To run your script using local files and no connection to a Hadoop cluster:

your/script.rb --run=local path/to/input_files path/to/output_dir

To run the command across a Hadoop cluster:

your/script.rb --run=hadoop path/to/input_files path/to/output_dir

You can set the default in the config/wukong-site.yaml file and then just use --run instead of --run=something; it will use the default run mode.

With --run=hadoop, all file paths are HDFS paths; with --run=local, all file paths are local paths. (Your script itself, of course, lives on the local filesystem.)
You can supply arbitrary command line arguments (they wind up as key-value pairs in the options hash your mapper and reducer receive), and you can use the Hadoop syntax to specify more than one input file:

./path/to/your/script.rb --any_specific_options --options=can_have_vals \
  --run "input_dir/part_*,input_file2.tsv,etc.tsv" path/to/output_dir

Note that all --options must precede (in any order) all non-options.
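Wukong's own option handling is more elaborate, but the shape of it can be sketched in a few lines: each --key=val flag becomes an entry in a hash, while everything else stays a positional argument. The parse_flags method below is purely illustrative and not part of wukong.

```ruby
# Illustrative sketch (not wukong's actual parser): split ARGV into an
# options hash and the remaining positional arguments.
def parse_flags(argv)
  options = {}
  rest    = []
  argv.each do |arg|
    if arg =~ /\A--(\w+)(?:=(.*))?\z/
      # "--key=val" stores "val"; a bare "--key" stores true
      options[$1.to_sym] = $2 || true
    else
      rest << arg
    end
  end
  [options, rest]
end

opts, args = parse_flags(%w[--run=local --fanout=8 input.tsv out_dir])
# opts => { :run => "local", :fanout => "8" }
# args => ["input.tsv", "out_dir"]
```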
To run the mapper on its own:

cat ./local/test/input.tsv | ./examples/word_count.rb --map | more

or, if your test data lies on the HDFS:

hdp-cat test/input.tsv | ./examples/word_count.rb --map | more

Next, graduate to --run=local mode so you can inspect the reducer.
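To see concretely what the map step emits per line, here is a from-scratch sketch in the spirit of examples/word_count.rb (illustrative only, not the actual wukong example code): each input line yields one "word<TAB>1" record per word, which sort then groups for the reducer.

```ruby
# Illustrative word-count map step: tokenize a line, emit "word\t1"
# for each token. The real examples/word_count.rb streams these to
# stdout, one record per line.
def map_line(line)
  line.downcase.scan(/[a-z']+/).map { |word| "#{word}\t1" }
end

map_line("The cat and the hat")
# => ["the\t1", "cat\t1", "and\t1", "the\t1", "hat\t1"]
```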
Wukong is friends with Hadoop the elephant, Pig the query language, and the cat on your command line. It even has limited support for martinis (DataMapper) and express trains (ActiveRecord).
There is preliminary support for dumping wukong classes as schemata for other tools. For example, given the following:
require 'wukong'
require 'wukong/schema'

User = TypedStruct.new(
  [:id,              Integer],
  [:scraped_at,      Bignum],
  [:screen_name,     String],
  [:followers_count, Integer],
  [:created_at,      Bignum]
)
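For comparison, a plain-Ruby Struct (no wukong required) gives the same positional-field access; what TypedStruct adds is the per-field type metadata that powers the schema dumps below. PlainUser and its sample values here are purely illustrative.

```ruby
# Plain-Ruby analogue of the TypedStruct above, minus type metadata.
PlainUser = Struct.new(:id, :scraped_at, :screen_name, :followers_count, :created_at)

u = PlainUser.new(1, 1240210643, "jo", 42, 1240210643)
u.screen_name      # => "jo"
u.to_a.join("\t")  # tab-separated, matching the TSV format wukong streams
```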
You can make a snippet for loading into Pig with puts User.load_pig:

LOAD users.tsv AS ( rsrc:chararray, id: int, scraped_at: long, screen_name: chararray, followers_count: int, created_at: long )
Export to SQL with puts User.sql_create_table; puts User.sql_load_mysql:
CREATE TABLE `users` (
`id` INT,
`scraped_at` BIGINT,
`screen_name` VARCHAR(255) CHARACTER SET ASCII,
`followers_count` INT,
`created_at` BIGINT
) ;
ALTER TABLE `user` DISABLE KEYS;
LOAD DATA LOCAL INFILE 'user.tsv'
REPLACE INTO TABLE `user`
COLUMNS
TERMINATED BY '\t'
OPTIONALLY ENCLOSED BY ''
ESCAPED BY ''
LINES STARTING BY 'user'
( @dummy,
`id`, `scraped_at`, `screen_name`, `followers_count`, `created_at`
);
ALTER TABLE `user` ENABLE KEYS ;
SELECT 'user', NOW(), COUNT(*) FROM `user`;
Here’s a somewhat detailed overview of a wukong script’s internal workflow. When you call

./myscript.rb --run infile outfile

your script’s Wukong::Script class (see wukong/script.rb) takes over. Depending on whether you’re running locally or against a cluster, it launches something along the lines of

cat infile | ./myscript.rb --map | sort | ./myscript.rb --reduce > outfile

With the --map or --reduce flag given, the Script instance turns over control to the corresponding class: either mapper_klass.new(self.options).stream or reducer_klass.new(self.options).stream.
When in --map or --reduce mode (we’ll just use --map as an example), the mapper class is typically a subclass of Streamer::Base, but in actual fact it can be anything that initializes from a hash of options and responds to #stream.

If you’re using wukong in local mode, you may not want to spawn new processes all over the place. Or your records may arrive not from the command line but from, say, a database call. In that case, just override #stream. The original:
#
# Pass each record to +#process+
#
def stream
  before_stream
  $stdin.each do |line|
    record = recordize(line.chomp)
    next unless record
    process(*record) do |output_record|
      emit output_record
    end
  end
  after_stream
end
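The same loop can be exercised without wukong at all. In the self-contained sketch below, records come from a StringIO rather than $stdin, recordize splits on tabs, and emit collects output in an array instead of printing; the class name and the doubling behavior in process are illustrative, not wukong's own.

```ruby
require 'stringio'

# Standalone sketch of the stream loop: read lines, recordize, process,
# emit. Everything here mimics the shape of the wukong method above.
class TinyStreamer
  attr_reader :output

  def initialize(input)
    @input  = input
    @output = []
  end

  # Wukong's default record format is tab-separated fields
  def recordize(line)
    line.split("\t")
  end

  # Illustrative process step: double the score field
  def process(name, score)
    yield "#{name}\t#{score.to_i * 2}"
  end

  def emit(record)
    @output << record
  end

  def stream
    @input.each_line do |line|
      record = recordize(line.chomp)
      next unless record
      process(*record) { |out| emit out }
    end
  end
end

streamer = TinyStreamer.new(StringIO.new("alice\t3\nbob\t5\n"))
streamer.stream
streamer.output # => ["alice\t6", "bob\t10"]
```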
Here’s a stream method, overridden to batch-process ActiveRecord objects (untested sample code):
class Mapper < Wukong::Streamer::Base
  # Set record_klass to the ActiveRecord class you'd like to batch process
  cattr_accessor :record_klass
  # Size of each batch to pull from the database
  cattr_accessor :batch_size

  #
  # Grab records from the database in batches,
  # pass each record to +#process+
  #
  # Everything downstream of this is agnostic of the fact that
  # records are coming from the database and not $stdin
  #
  def stream
    before_stream
    record_klass.find_in_batches(:batch_size => batch_size) do |record_batch|
      record_batch.each do |record|
        process(record.id, record) do |output_record|
          emit output_record
        end
      end
    end
    after_stream
  end

  # ....
end
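The batching pattern above can be seen in isolation with plain Ruby: each_slice stands in for ActiveRecord's find_in_batches, yielding fixed-size groups of records. The record hashes and batch size here are illustrative.

```ruby
# find_in_batches-style iteration without a database: each_slice yields
# groups of at most batch_size records, and the inner loop handles one
# record at a time, just as in the Mapper#stream override above.
records    = (1..5).map { |i| { :id => i, :screen_name => "user#{i}" } }
batch_size = 2

batches = []
records.each_slice(batch_size) do |record_batch|
  batches << record_batch.map { |record| record[:id] }
end
batches # => [[1, 2], [3, 4], [5]]
```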