
wukong :: hadoop made easy

Wukong: Hadoop made so easy a Chimpanzee could run it.

Treat your dataset like a

Wukong is friends with Hadoop the elephant, Pig the query language, and the cat on your command line.

Send Wukong questions to the Infinite Monkeywrench mailing list

Documentation index

How to write a Wukong script

Everyone uses a word count example to demonstrate map/reduce. Here is our obligatory script to count words in a text stream:

    require 'wukong'
    module WordCount
      class Mapper < Wukong::Streamer::LineStreamer
        # Emit each word in the line.
        def process line
          words = line.strip.split(/\W+/).reject(&:blank?)
          words.each{|word| yield [word, 1] }
        end
      end
      
      class Reducer < Wukong::Streamer::ListReducer
        def finalize
          yield [ key, values.map(&:last).map(&:to_i).sum ]
        end
      end
    end
    
    Wukong::Script.new(
      WordCount::Mapper,
      WordCount::Reducer
      ).run # Execute the script

The first class, the Mapper, eats lines and craps [word, count] records: word is the key, the count (1) is the value.
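
To see what the Mapper does to a single line without starting a job, here is a minimal plain-Ruby sketch. It does not load Wukong; `map_line` is a hypothetical helper invented for illustration, and `empty?` stands in for `blank?`, which is not core Ruby and is assumed to come from a library the script loads.

```ruby
# Plain-Ruby sketch of the Mapper's per-line work (no Wukong needed).
# `map_line` is a hypothetical helper, not part of Wukong.
def map_line(line)
  # Split on runs of non-word characters. A line that starts with
  # punctuation produces a leading empty string, so drop empties.
  words = line.strip.split(/\W+/).reject(&:empty?)
  # One [word, 1] record per word, in the order the words appear.
  words.map { |word| [word, 1] }
end

# Print the records tab-separated, one per line.
map_line("the cat saw the hat").each do |word, count|
  puts [word, count].join("\t")
end
```

Repeated words are not merged at this stage; each occurrence gets its own record, and adding them up is left to the reducer.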

In the reducer, the values for each key are stacked up into a list; then the record(s) yielded by #finalize are emitted. There are many other ways to write the reducer (most of them are better); see the tutorial.
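
The stacking-and-summing step can be sketched in plain Ruby as well. This is an illustration under stated assumptions, not Wukong's implementation: `reduce_counts` is a hypothetical helper, the input is assumed to be already sorted by key (as it is when it reaches a reducer), and counts are strings because records travel between mapper and reducer as text.

```ruby
# Plain-Ruby sketch of the reduce step (no Wukong needed).
# `reduce_counts` is a hypothetical helper, not part of Wukong.
def reduce_counts(sorted_records)
  sorted_records
    # Stack up consecutive records that share a key.
    .chunk_while { |a, b| a.first == b.first }
    # For each stack, emit the key and the sum of its counts.
    .map { |group| [group.first.first, group.map(&:last).map(&:to_i).sum] }
end

# Mapper output, sorted by key before it reaches the reducer.
records = [["the", "1"], ["cat", "1"], ["the", "1"], ["hat", "1"]].sort

reduce_counts(records).each { |word, count| puts [word, count].join("\t") }
```

Grouping only consecutive records is enough here because sorting guarantees that all records for a key are adjacent.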

Structured data stream

You can also use structs to treat your dataset as a stream of objects:

    require 'wukong'
    require 'my_blog' #defines the blog models
    # structs for our input objects
    Tweet = Struct.new( :id, :created_at, :twitter_user_id,
      :in_reply_to_user_id, :in_reply_to_status_id, :text )
    TwitterUser  = Struct.new( :id, :username, :fullname,
      :homepage, :location, :description )
    module TwitBlog
      class Mapper < Wukong::Streamer::RecordStreamer
        # Watch for tweets by me
        MY_USER_ID = 24601
        #
        # If this is a tweet by me, convert it to a Post.
        #
        # If it is a tweet not by me, convert it to a Comment that
        # will be paired with the correct Post.
        #
        # If it is a TwitterUser, convert it to a User record and
        # a user_location record
        #
        def process record
          case record
          when TwitterUser
            user     = MyBlog::User.new.merge(record) # grab the fields in common
            user_loc = MyBlog::UserLoc.new(record.id, record.location, nil, nil)
            yield user
            yield user_loc
          when Tweet
            if record.twitter_user_id == MY_USER_ID
              post = MyBlog::Post.new.merge record
              post.link = "http://twitter.com/statuses/show/#{record.id}"
              post.body = record.text
              post.title = record.text[0..65] + "..."
              yield post
            else
              comment = MyBlog::Comment.new.merge record
              comment.body    = record.text
              comment.post_id = record.in_reply_to_status_id
              yield comment
            end
          end
        end
      end
    end
    Wukong::Script.new( TwitBlog::Mapper, nil ).run # identity reducer
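
The script above calls `merge` on a struct to "grab the fields in common", which core Ruby's Struct does not provide. As a rough sketch of what such a step could look like, assuming nothing about how Wukong or `my_blog` actually implement it: `merge_common` and the `Comment` struct below are hypothetical stand-ins.

```ruby
# Input struct, as defined in the script above.
Tweet = Struct.new(:id, :created_at, :twitter_user_id,
  :in_reply_to_user_id, :in_reply_to_status_id, :text)

# Hypothetical stand-in for a MyBlog::Comment model.
Comment = Struct.new(:id, :created_at, :post_id, :body)

# Hypothetical helper: copy every field that both structs define.
def merge_common(target, source)
  (target.members & source.members).each do |field|
    target[field] = source[field]
  end
  target
end

tweet = Tweet.new(7, "20090101", 99, 24601, 5, "nice post")

# Shared fields (:id, :created_at) are copied; the rest are set by hand,
# mirroring the else branch of the mapper above.
comment = merge_common(Comment.new, tweet)
comment.body    = tweet.text
comment.post_id = tweet.in_reply_to_status_id

puts comment.to_a.join("\t")
```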

Advanced Patterns

Wukong has a good collection of map/reduce patterns. For example, it’s quite common to accumulate all records for a given key and emit some result based on the whole group.

The AccumulatingReducer calls start! on the first record for each key, calls accumulate on every record for that key (including the first), and calls finalize once the last record for that key has been seen.
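
The order of those calls can be traced with a small stand-in class. This is a sketch of the sequence described above, not Wukong's code: `LifecycleSketch` and its `run` driver are hypothetical, and the input is assumed to arrive sorted by key. Note that a streaming reducer can only tell that a key's last record has passed when a different key shows up or the input ends.

```ruby
# Hypothetical stand-in that records the order of the three hooks.
class LifecycleSketch
  attr_reader :key, :calls

  def initialize
    @calls   = []
    @started = false
  end

  def start!(key, value)
    calls << [:start!, key]
  end

  def accumulate(key, value)
    calls << [:accumulate, key, value]
  end

  def finalize
    calls << [:finalize, key]
  end

  # Drive the hooks over [key, value] pairs already sorted by key.
  def run(sorted_pairs)
    sorted_pairs.each do |k, v|
      if !@started || k != @key
        finalize if @started  # a new key means the previous key is done
        @key     = k
        @started = true
        start!(k, v)
      end
      accumulate(k, v)        # every record, including the first
    end
    finalize if @started      # end of input closes the final key
    calls
  end
end

LifecycleSketch.new.run([["a", 1], ["a", 2], ["b", 3]]).each { |call| p call }
```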

Here’s an AccumulatingReducer that takes a long list of key-value pairs and emits, for each key, all its corresponding values in one line.

    #
    # Roll up all values for each key into a single line
    #
    class GroupByReducer < Wukong::Streamer::AccumulatingReducer
      attr_accessor :values

      # Start with an empty list
      def start! *args
        self.values = []
      end

      # Aggregate each value in turn 
      def accumulate key, value
        self.values << value
      end

      # Emit the key and all values, tab-separated
      def finalize
        yield [key, values].flatten
      end
    end

So given adjacency pairs for the following directed friend graph:


    @jerry      @elaine
    @elaine     @jerry
    @jerry      @kramer
    @kramer     @jerry
    @kramer     @bobsacamato
    @kramer     @newman
    @jerry      @superman
    @newman     @kramer
    @newman     @elaine
    @newman     @jerry

You’d end up with


    @elaine     @jerry
    @jerry      @elaine      @kramer     @superman
    @kramer     @bobsacamato @jerry      @newman
    @newman     @elaine      @jerry      @kramer   
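
The rollup can be checked against the listing above in plain Ruby. This sketch does not run the GroupByReducer itself: `group_by_key` is a hypothetical helper, and it sorts whole pairs (as a command-line `sort` would) so that the values within each key come out in order.

```ruby
# The adjacency pairs from the friend graph above.
pairs = <<~EDGES.lines.map(&:split)
  @jerry      @elaine
  @elaine     @jerry
  @jerry      @kramer
  @kramer     @jerry
  @kramer     @bobsacamato
  @kramer     @newman
  @jerry      @superman
  @newman     @kramer
  @newman     @elaine
  @newman     @jerry
EDGES

# Hypothetical helper: sort the pairs, then roll up each key's values
# into one flat row, as the finalize method above does.
def group_by_key(pairs)
  pairs.sort.group_by(&:first).map do |key, group|
    [key, group.map(&:last)].flatten
  end
end

grouped = group_by_key(pairs)
grouped.each { |row| puts row.join("\t") }
```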

Credits

Wukong was written by Philip (flip) Kromer (flip@infochimps.org / @infochimps) for the infochimps project.

Patches submitted by:

Thanks to:

Help!

Send Wukong questions to the Infinite Monkeywrench mailing list


