
wukong :: usage

How to run a Wukong script

To run your script using local files and no connection to a hadoop cluster,

your/script.rb --run=local path/to/input_files path/to/output_dir

To run the command across a Hadoop cluster,

your/script.rb --run=hadoop path/to/input_files path/to/output_dir

You can set the default in the config/wukong-site.yaml file, and then just use --run instead of --run=something; it will use whatever run mode you set as the default.

If you’re running with --run=hadoop, all file paths are HDFS paths. If you’re running with --run=local, all file paths are local paths. (Your script itself, of course, lives on the local filesystem.)

You can supply arbitrary command line arguments (they wind up as key-value pairs in the options hash your mapper and reducer receive; see the sketch below), and you can use the hadoop syntax to specify more than one input file:

./path/to/your/script.rb --any_specific_options --options=can_have_vals \
   --run "input_dir/part_*,input_file2.tsv,etc.tsv" path/to/output_dir

Note that all --options must precede (in any order) all non-options.
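
For instance, here is a minimal sketch of how those flags might show up inside a mapper. The class name is hypothetical, and it assumes your streamer exposes the options hash it was initialized with (and that a bare flag arrives as true while a --flag=value arrives as the string value):

    require "wukong"

    class OptionAwareMapper < Wukong::Streamer::Base
      def process *fields
        # --any_specific_options and --options=can_have_vals from the command
        # line above arrive as entries in the options hash (assumed behavior).
        yield fields + [options[:options]] if options[:any_specific_options]
      end
    end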

How to test your scripts

To run mapper on its own:

cat ./local/test/input.tsv | ./examples/word_count.rb --map | more

or if your test data lies on the HDFS,

hdp-cat test/input.tsv | ./examples/word_count.rb --map | more

Next, graduate to running in --run=local mode so you can inspect the reducer.
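
For example, to exercise the full map-sort-reduce flow locally (the output path is illustrative):

./examples/word_count.rb --run=local ./local/test/input.tsv ./local/test/word_count_output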

Wukong plays nicely with others

Wukong is friends with Hadoop the elephant, Pig the query language, and the cat on your command line. It even has limited support for martinis (Datamapper) and express trains (ActiveRecord).

Schema export to Pig or SQL

There is preliminary support for dumping wukong classes as schemata for other tools. For example, given the following:

    require "wukong" ;
    require "wukong/schema"
    User = TypedStruct.new(
        [:id,                     Integer],
        [:scraped_at,             Bignum],
        [:screen_name,            String],
        [:followers_count,        Integer],
        [:created_at,             Bignum]
        );

You can make a snippet for loading into Pig with puts User.load_pig:

    LOAD users.tsv AS ( rsrc:chararray, id: int, scraped_at: long, screen_name: chararray, followers_count: int, created_at: long )

Export to SQL with puts User.sql_create_table ; puts User.sql_load_mysql:

    CREATE TABLE         `users` (
      `id`                  INT,
      `scraped_at`          BIGINT,
      `screen_name`         VARCHAR(255) CHARACTER SET ASCII,
      `followers_count`     INT,
      `created_at`          BIGINT
      )  ;
    ALTER TABLE            `user` DISABLE KEYS;
    LOAD DATA LOCAL INFILE 'user.tsv'
      REPLACE INTO TABLE   `user`
      COLUMNS
        TERMINATED BY           '\t'
        OPTIONALLY ENCLOSED BY  ''
        ESCAPED BY              ''
      LINES STARTING BY     'user'
      ( @dummy,
        `id`, `scraped_at`, `screen_name`, `followers_count`, `created_at`
      );
    ALTER TABLE `user` ENABLE KEYS ;
    SELECT 'user', NOW(), COUNT(*) FROM `user`;

Wukong’s internal workflow

Here’s a somewhat detailed overview of a wukong script’s internal workflow.

  1. You call ./myscript.rb --run infile outfile
  2. Execution begins in the run method of the Script class (wukong/script.rb). It launches (depending on whether you’re running locally or on hadoop) one of
    • cat infile | ./myscript.rb --map | sort | ./myscript.rb --reduce > outfile
    • hadoop [a_crapton_of_streaming_args] -mapper './myscript.rb --map' -reducer './myscript.rb --reduce'
  3. In either case, the effect is to spawn the exact same script you ran at the command line: one or more times with the --map command in place of the --run command, and one or more times with the --reduce command in place of the --run command. (Well, unless you specify no reducers or a :map_command or something.)
  4. With the --map or --reduce flag given, the Script class turns over control to the corresponding class: either mapper_klass.new(self.options).stream or reducer_klass.new(self.options).stream. (A minimal script skeleton is sketched below.)
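
To make that dispatch concrete, here is a minimal script skeleton. It is a sketch, not the shipped examples/word_count.rb; the class names (Wukong::Streamer::LineStreamer, Wukong::Streamer::Base) and the Wukong::Script.new(...).run wiring follow the conventions described in the next section:

    #!/usr/bin/env ruby
    require "wukong"

    module WordCount
      class Mapper < Wukong::Streamer::LineStreamer
        # Called once per input line; each yielded array is emitted as a
        # tab-separated record.
        def process line
          line.downcase.scan(/\w+/).each{|word| yield [word, 1] }
        end
      end

      class Reducer < Wukong::Streamer::Base
        # A do-nothing reducer: pass each (word, count) pair straight through.
        # A real word count would use the accumulator pattern described below.
        def process word, count
          yield [word, count]
        end
      end
    end

    # --run dispatches to local or hadoop mode; --map / --reduce stream records
    # through the corresponding class, exactly as described above.
    Wukong::Script.new(WordCount::Mapper, WordCount::Reducer).run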

When in --map or --reduce mode (we’ll just use --map as an example):

  1. The mapper_klass is usually a subclass of Streamer::Base, but in actual fact it can be anything that initializes from a hash of options and responds to #stream.
  2. The default #stream method
    • calls the before_stream hook
    • reads each line from stdin; #recordizes it; passes it (if non-nil) to #process; and emits each object yielded by #process
    • calls its after_stream hook
  3. You typically leave #stream alone and just override #process.
  4. The accumulator classes build on these patterns (they’re proper subclasses of Streamer::Base), but are used differently. With an accumulator, you should implement some or all of the following (a sketch follows this list):
    • #start! — called at the start of each accumulation, passing in the first record for that key
    • #accumulate — called on each record (including that first one)
    • #finalize — called after the last record for this key has been seen.
    • #get_key — called on each record to recover its key.
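
For instance, a word-count reducer built on the accumulator pattern might look like the following sketch. The Wukong::Streamer::AccumulatingReducer class name is an assumption about your Wukong version; the four hooks are the ones listed above:

    class WordCountReducer < Wukong::Streamer::AccumulatingReducer
      # Recover the grouping key from each record (here, the word itself).
      def get_key word, count
        word
      end

      # Called with the first record for each new key: reset the tally.
      def start! word, count
        @word, @count = word, 0
      end

      # Called on every record for the key, including the first one.
      def accumulate word, count
        @count += count.to_i
      end

      # Called after the last record for this key: emit the final tally.
      def finalize
        yield [@word, @count]
      end
    end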

Using wukong with internal streaming

If you’re using wukong in local mode, you may not want to spawn new processes all over the place. Or your records may arrive not on STDIN but from, say, a database call.

In that case, just override #stream. The original:

      #
      # Pass each record to +#process+
      #
      def stream
        before_stream
        $stdin.each do |line|
          record = recordize(line.chomp)
          next unless record
          process(*record) do |output_record|
            emit output_record
          end
        end
        after_stream
      end

Using wukong to Batch-Process ActiveRecord Objects

Here’s a stream method, overridden to batch-process ActiveRecord objects (untested sample code):

    class Mapper < Wukong::Streamer::Base
      # Set record_klass to the ActiveRecord class you'd like to batch process
      cattr_accessor :record_klass
      # Size of each batch to pull from the database
      cattr_accessor :batch_size

      #
      # Grab records from the database in batches,
      # pass each record to +#process+
      #
      # Everything downstream of this is agnostic of the fact that
      # records are coming from the database and not $stdin
      #
      def stream
        before_stream
        record_klass.find_in_batches(:batch_size => batch_size ) do |record_batch|
          record_batch.each do |record|
            process(record.id, record) do |output_record|
              emit output_record
            end
          end
        end
        after_stream
      end

      # ....
    end
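
And a hypothetical way to wire it up at the bottom of the script (the model name and batch size are illustrative, and the nil-reducer map-only invocation is an assumption):

    # Point the mapper at an ActiveRecord model, then hand control to Wukong as usual.
    Mapper.record_klass = User     # any ActiveRecord model
    Mapper.batch_size   = 1000
    Wukong::Script.new(Mapper, nil).run  # nil reducer => map-only job (assumption)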
