Tutorial by Examples

Count Words

Here’s a script to count words in a text stream:

    require 'wukong'
    module WordCount
      class Mapper < Wukong::Streamer::LineStreamer
        # Emit each word in the line.
        def process line
          words = line.strip.split(/\W+/).reject(&:blank?)
          words.each{|word| yield [word, 1] }
        end
      end

      class Reducer < Wukong::Streamer::ListReducer
        def finalize
          yield [ key, values.map(&:last).map(&:to_i).sum ]
        end
      end
    end

    Wukong::Script.new(
      WordCount::Mapper,
      WordCount::Reducer
      ).run # Execute the script

The first class, the Mapper, eats lines and craps [word, count] records. Here
the /key/ is the word, and the /value/ is its count.

The second class is an example of an accumulated list reducer. The values for
each key are stacked up into a list; then the record(s) yielded by #finalize
are emitted.
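
To see the flow without Hadoop in the picture, here is a rough plain-Ruby stand-in for the map, sort and reduce steps. It uses no Wukong classes; `map_line`, `reduce_group` and `word_count` are names made up for this sketch, and `empty?` stands in for `blank?`.

```ruby
# Plain-Ruby stand-in for map -> sort -> reduce (no Wukong, no Hadoop).

# Map step: one [word, 1] pair per word on the line.
def map_line(line)
  line.strip.split(/\W+/).reject(&:empty?).map { |word| [word, 1] }
end

# Reduce step: every pair for a key arrives together; sum the counts.
def reduce_group(key, pairs)
  [key, pairs.map(&:last).map(&:to_i).sum]
end

def word_count(lines)
  pairs = lines.flat_map { |line| map_line(line) }
  # Between the two phases the framework sorts mapper output by key,
  # so identical words end up next to each other.
  pairs.sort_by(&:first)
       .chunk_while { |a, b| a.first == b.first }
       .map { |group| reduce_group(group.first.first, group) }
end

counts = word_count(["the quick brown fox", "jumps over the lazy dog, the end"])
counts.each { |word, count| puts [word, count].join("\t") }
```

The only thing the reducer relies on is that all records for a key are adjacent, which is what the sort in the middle guarantees.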

Here’s another way to write the Reducer: accumulate the count of each line, then
yield the sum in #finalize:

    class Reducer2 < Wukong::Streamer::AccumulatingReducer
      attr_accessor :key_count
      def start! *args
        self.key_count = 0
      end
      def accumulate(word, count)
        self.key_count += count.to_i
      end
      def finalize
        yield [ key, key_count ]
      end
    end

Of course you can be really lazy (i.e. smart) and write your script as

    class Script < Wukong::Script
      def reducer_command
        'uniq -c'
      end
    end
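
This works because the mapper output for a given word is the same line repeated, and the lines reach the reducer sorted. A rough Ruby picture of what `uniq -c` then does (the helper name `uniq_c` is invented for this sketch):

```ruby
# Rough Ruby equivalent of `uniq -c`: count runs of identical adjacent lines.
def uniq_c(sorted_lines)
  sorted_lines
    .chunk_while { |a, b| a == b }
    .map { |run| [run.size, run.first] }
end

uniq_c(["fox\t1", "the\t1", "the\t1", "the\t1"]).each do |count, line|
  puts "#{count} #{line}"
end
```

Note that `uniq -c` puts the count in front of the whole original line, so the output is shaped differently from the [word, count] records the Ruby reducers emit.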

Structured data

The previous example dealt with unstructured data. Wukong also lets you view your data as a stream of structured objects.

Let’s say you have a blog; its records look like

    Post    = Struct.new( :id, :created_at, :user_id, :title, :body, :link )
    Comment = Struct.new( :id, :created_at, :post_id, :user_id, :body )
    User    = Struct.new( :id, :username, :fullname, :homepage, :description )
    UserLoc = Struct.new( :user_id, :text, :lat, :lng )

You’ve been using Twitter for a long time, and you’ve written something that from now on will inject all your tweets as Posts, and all replies to them as Comments (by a common ‘twitter_bot’ account on your blog). What about the past two years’ worth of tweets? Let’s assume you’re so chatty that a Map/Reduce script is warranted to handle the volume. (Actually, Wukong makes a really nice ETL package, so this may be convenient even at small scale.)

Cook up something that scrapes your tweets and all replies to your tweets:

    Tweet = Struct.new( :id, :created_at, :twitter_user_id,
      :in_reply_to_user_id, :in_reply_to_status_id, :text )
    TwitterUser  = Struct.new( :id, :username, :fullname,
      :homepage, :location, :description )

Now we’ll just process all those in a big pile, converting to Posts, Comments and Users as appropriate. Serialize your scrape results so that each Tweet and each TwitterUser is a single line containing first the class name (‘tweet’ or ‘twitter_user’) followed by its constituent fields, in order, separated by tabs.
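
As a sketch of that flat format, here is one way to write a record out and read it back in plain Ruby. The `RECORD_CLASSES` table and the `to_flat_line` / `from_flat_line` helpers are hypothetical names for this example, not part of Wukong, and squashing embedded tabs to spaces is an assumption about how you would want to clean the text.

```ruby
Tweet = Struct.new(:id, :created_at, :twitter_user_id,
                   :in_reply_to_user_id, :in_reply_to_status_id, :text)
TwitterUser = Struct.new(:id, :username, :fullname,
                         :homepage, :location, :description)

# Hypothetical lookup from the leading tag to the struct it names.
RECORD_CLASSES = { 'tweet' => Tweet, 'twitter_user' => TwitterUser }.freeze

# Flatten a record to one line: tag first, then the fields in order.
def to_flat_line(record)
  tag = RECORD_CLASSES.key(record.class)
  # A tab or newline inside a field would break the format, so squash them.
  fields = record.to_a.map { |field| field.to_s.gsub(/[\t\r\n]+/, ' ') }
  [tag, *fields].join("\t")
end

# Rebuild a record from such a line. Every field comes back as a String.
def from_flat_line(line)
  tag, *fields = line.chomp.split("\t", -1) # -1 keeps trailing empty fields
  RECORD_CLASSES.fetch(tag).new(*fields)
end

tweet = Tweet.new(123, '20090101120000', 24601, nil, nil, "hello\tworld")
line  = to_flat_line(tweet)
puts line
copy  = from_flat_line(line)
puts copy.text
```

The round trip is lossy on types: ids come back as strings and nil comes back as an empty string, so convert in the mapper where it matters.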

The RecordStreamer takes each such line, constructs its corresponding class, and instantiates it with the fields that follow the class name.

    require 'wukong'
    require 'my_blog' #defines the blog models
    module TwitBlog
      class Mapper < Wukong::Streamer::RecordStreamer
        # Watch for tweets by me
        MY_USER_ID = 24601
        # structs for our input objects
        Tweet = Struct.new( :id, :created_at, :twitter_user_id,
          :in_reply_to_user_id, :in_reply_to_status_id, :text )
        TwitterUser  = Struct.new( :id, :username, :fullname,
          :homepage, :location, :description )
        #
        # If this is a tweet by me, convert it to a Post.
        #
        # If it is a tweet not by me, convert it to a Comment that
        # will be paired with the correct Post.
        #
        # If it is a TwitterUser, convert it to a User record and
        # a user_location record
        #
        def process record
          case record
          when TwitterUser
            user     = MyBlog::User.new.merge(record) # grab the fields in common
            user_loc = MyBlog::UserLoc.new(record.id, record.location, nil, nil)
            yield user
            yield user_loc
          when Tweet
            if record.twitter_user_id == MY_USER_ID
              post = MyBlog::Post.new.merge record
              post.link = "http://twitter.com/statuses/show/#{record.id}"
              post.body = record.text
              post.title = record.text[0..65] + "..."
              yield post
            else
              comment = MyBlog::Comment.new.merge record
              comment.body    = record.text
              comment.post_id = record.in_reply_to_status_id
              yield comment
            end
          end
        end
      end
    end
    Wukong::Script.new( TwitBlog::Mapper, nil ).run # identity reducer
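
The mapper leans on a `merge` that copies the fields two structs have in common. Plain Ruby Structs have no such method, so it has to come from a library or from your own models. A minimal sketch of the idea, using a hypothetical helper called `copy_common_fields`:

```ruby
User        = Struct.new(:id, :username, :fullname, :homepage, :description)
TwitterUser = Struct.new(:id, :username, :fullname,
                         :homepage, :location, :description)

# Hypothetical helper: copy every member the two structs share,
# leaving members that exist only on the target untouched.
def copy_common_fields(target, source)
  (target.members & source.members).each do |member|
    target[member] = source[member]
  end
  target
end

twitter_user = TwitterUser.new(1, 'jerry', 'Jerry S', 'http://example.com',
                               'New York', 'comedian')
user = copy_common_fields(User.new, twitter_user)
p user.to_a
```

Fields that exist only on the source (here, location) are simply dropped, which is why the mapper builds the UserLoc record separately.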

Accumulators

A Uniqifying Accumulator

The script above uses the identity reducer: every record from the mapper is sent
to the output. But what if you had grabbed the replying user’s record every time you saw a reply?

You’d like to just pass it through uniq. But if something has changed in the interim, or if you record a timestamp for each sample, you won’t be able to use the simple uniq command. You’d like to just get one example for each key!

Wukong includes just such a reducer, the UniqByLastReducer:

    #
    # UniqByLastReducer accepts all records for a given key and emits only the
    # last-seen.
    #
    # It acts like an insecure high-school kid: for each record of a given key
    # it discards whatever record it's holding and adopts this new value. When a
    # new key comes on the scene it emits the last record, like an older brother
    # handing off his Depeche Mode collection.
    #
    # For example, to extract the *latest* value for each property, emit your
    # records as
    #
    #    [resource_type, key, timestamp, ... fields ...]
    #
    # then set :sort_fields to 3 and :partition_fields to 2.
    #
    class UniqByLastReducer < Wukong::Streamer::AccumulatingReducer
      attr_accessor :final_value

      #
      # Use first two fields as keys by default
      #
      def get_key *vals
        vals[0..1]
      end

      #
      # Adopt each value in turn: the last one's the one you want.
      #
      def accumulate *vals
        self.final_value = vals
      end

      #
      # Emit the last-seen value
      #
      def finalize
        yield final_value if final_value
      end

      #
      # Clear state on reset
      #
      def start! *args
        self.final_value = nil
      end
    end
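
Stripped of the streaming machinery, the effect is roughly the following. This is a plain-Ruby illustration, not Wukong code; `uniq_by_last` is a name invented here, and it assumes the records are already sorted so that the newest sample for each key comes last.

```ruby
# Each record: [resource_type, key, timestamp, *fields], sorted on the
# first three fields so the latest timestamp is last within each key.
def uniq_by_last(sorted_records)
  sorted_records
    .chunk_while { |a, b| a[0..1] == b[0..1] } # same two-field key as get_key
    .map(&:last)                               # keep only the last-seen record
end

records = [
  ['user', 'jerry',  '20090101', 'New York'],
  ['user', 'jerry',  '20090301', 'Los Angeles'],
  ['user', 'kramer', '20090201', 'New York']
]
uniq_by_last(records).each { |record| puts record.join("\t") }
```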

A GroupBy Accumulator

Wukong has a good collection of map/reduce patterns. For example, it’s quite common to accumulate all records for a given key and emit some result based on the whole group.

The AccumulatingReducer calls start! on the first record for each key, calls accumulate() on every example for that key (including the first), and calls finalize() once the last record for that key is seen.
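
That lifecycle can be pictured with a toy driver. This is a sketch of the calling order described above, not Wukong's actual implementation; the `ToyAccumulatingReducer` and `ToyCounter` classes and the `run` method are invented for the example.

```ruby
class ToyAccumulatingReducer
  attr_reader :key

  # Subclasses override these three hooks.
  def start!(*record); end
  def accumulate(*record); end
  def finalize; end

  # By default the first field is the key.
  def get_key(*record)
    record.first
  end

  # Feed key-sorted records through the hooks and collect
  # whatever finalize yields.
  def run(sorted_records)
    out = []
    started = false
    sorted_records.each do |record|
      record_key = get_key(*record)
      if !started || record_key != @key
        finalize { |result| out << result } if started # close the previous key
        @key = record_key
        start!(*record)                                # first record of a new key
        started = true
      end
      accumulate(*record)                              # every record, first included
    end
    finalize { |result| out << result } if started     # close the final key
    out
  end
end

class ToyCounter < ToyAccumulatingReducer
  def start!(*_record)
    @count = 0
  end

  def accumulate(_word, count)
    @count += count.to_i
  end

  def finalize
    yield [key, @count]
  end
end

p ToyCounter.new.run([%w[a 1], %w[a 1], %w[b 1]])
```

The point of the pattern is that a reducer only ever holds state for one key at a time, so memory use does not grow with the number of keys.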

Here’s an AccumulatingReducer that takes a long list of key-value pairs and emits, for each key, all its corresponding values in one line.

    #
    # Roll up all values for each key into a single line
    #
    class GroupByReducer < Wukong::Streamer::AccumulatingReducer
      attr_accessor :values

      # Start with an empty list
      def start! *args
        self.values = []
      end

      # Aggregate each value in turn
      def accumulate key, value
        self.values << value
      end

      # Emit the key and all values, tab-separated
      def finalize
        yield [key, values].flatten
      end
    end

So given adjacency pairs for the following directed friend graph:

    @jerry      @elaine
    @elaine     @jerry
    @jerry      @kramer
    @kramer     @jerry
    @kramer     @bobsacamato
    @kramer     @newman
    @jerry      @superman
    @newman     @kramer
    @newman     @elaine
    @newman     @jerry

You’d end up with

    @elaine     @jerry
    @jerry      @elaine      @kramer     @superman
    @kramer     @bobsacamato @jerry      @newman
    @newman     @elaine      @jerry      @kramer
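
You can check that result with a few lines of plain Ruby standing in for the sort and the reducer. `group_by_key` is a helper name made up for this sketch; sorting on both fields is what puts the values within each line in alphabetical order.

```ruby
# Sort the pairs, then roll up all values for each key into one row.
def group_by_key(pairs)
  pairs.sort
       .chunk_while { |a, b| a.first == b.first }
       .map { |group| [group.first.first, *group.map(&:last)] }
end

pairs = [
  %w[@jerry @elaine],  %w[@elaine @jerry],
  %w[@jerry @kramer],  %w[@kramer @jerry],
  %w[@kramer @bobsacamato], %w[@kramer @newman],
  %w[@jerry @superman], %w[@newman @kramer],
  %w[@newman @elaine], %w[@newman @jerry]
]

group_by_key(pairs).each { |row| puts row.join("\t") }
```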

A note about keys

Now we’re going to write this using the synthetic keys already extant in the
twitter records, making the unwarranted assumption that they won’t collide with
the keys in your database.

The map/reduce paradigm does badly with synthetic keys. Synthetic keys demand
locality, and map/reduce’s remarkable scaling comes from not assuming
locality. In general, write your map/reduce scripts to use natural keys (the scre

More…

There are many useful examples (including an actually-useful version of this
WordCount script) in the examples/ directory.

