Here’s a script to count words in a text stream:
require 'wukong'
module WordCount
class Mapper < Wukong::Streamer::LineStreamer
# Emit each word in the line.
def process line
words = line.strip.split(/\W+/).reject(&:blank?)
words.each{|word| yield [word, 1] }
end
end
class Reducer < Wukong::Streamer::ListReducer
def finalize
yield [ key, values.map(&:last).map(&:to_i).sum ]
end
end
end
Wukong::Script.new(
WordCount::Mapper,
WordCount::Reducer
).run # Execute the script
The first class, the Mapper, eats lines and craps [word, count]
records. Here
the /key/ is the word, and the /value/ is its count.
The second class is an example of an accumulated list reducer. The values for
each key are stacked up into a list; then the record(s) yielded by #finalize
are emitted.
Here’s another way to write the Reducer: accumulate the count of each line, then
yield the sum in #finalize
:
class Reducer2 < Wukong::Streamer::AccumulatingReducer
attr_accessor :key_count
def start! *args
self.key_count = 0
end
def accumulate(word, count)
self.key_count += count.to_i
end
def finalize
yield [ key, key_count ]
end
end
Of course you can be really lazy (i.e. smart) and write your script as
class Script < Wukong::Script
def reducer_command
'uniq -c'
end
end
The previous example dealt with unstructured data. Wukong also lets you view your data as a stream of structured objects.
Let’s say you have a blog; its records look like
Post = Struct.new( :id, :created_at, :user_id, :title, :body, :link )
Comment = Struct.new( :id, :created_at, :post_id, :user_id, :body )
User = Struct.new( :id, :username, :fullname, :homepage, :description )
UserLoc = Struct.new( :user_id, :text, :lat, :lng )
You’ve been using twitter for a long time, and you’ve written something that from now on will inject all your tweets as Posts, and all replies to them as Comments (by a common ‘twitter_bot’ account on your blog).What about the past two years’ worth of tweets? Let’s assume you’re so chatty that a Map/Reduce script is warranted to handle the volume. (Actually, wukong makes a really nice ETL package, so this may be convienient even at small scale).
Cook up something that scrapes your tweets and all replies to your tweets:
Tweet = Struct.new( :id, :created_at, :twitter_user_id,
:in_reply_to_user_id, :in_reply_to_status_id, :text )
TwitterUser = Struct.new( :id, :username, :fullname,
:homepage, :location, :description )
Now we’ll just process all those in a big pile, converting to Posts, Comments and Users as appropriate. Serialize your scrape results so that each Tweet and each TwitterUser is a single lines containing first the class name (‘tweet’ or ‘twitter_user’) followed by its constituent fields, in order, separated by tabs.
The RecordStreamer takes each such line, constructs its corresponding class, and instantiates it with the
require 'wukong'
require 'my_blog' #defines the blog models
module TwitBlog
class Mapper < Wukong::Streamer::RecordStreamer
# Watch for tweets by me
MY_USER_ID = 24601
# structs for our input objects
Tweet = Struct.new( :id, :created_at, :twitter_user_id,
:in_reply_to_user_id, :in_reply_to_status_id, :text )
TwitterUser = Struct.new( :id, :username, :fullname,
:homepage, :location, :description )
#
# If this is a tweet is by me, convert it to a Post.
#
# If it is a tweet not by me, convert it to a Comment that
# will be paired with the correct Post.
#
# If it is a TwitterUser, convert it to a User record and
# a user_location record
#
def process record
case record
when TwitterUser
user = MyBlog::User.new.merge(record) # grab the fields in common
user_loc = MyBlog::UserLoc.new(record.id, record.location, nil, nil)
yield user
yield user_loc
when Tweet
if record.twitter_user_id == MY_USER_ID
post = MyBlog::Post.new.merge record
post.link = "http://twitter.com/statuses/show/#{record.id}"
post.body = record.text
post.title = record.text[0..65] + "..."
yield post
else
comment = MyBlog::Comment.new.merge record
comment.body = record.text
comment.post_id = record.in_reply_to_status_id
yield comment
end
end
end
end
end
Wukong::Script.new( TwitBlog::Mapper, nil ).run # identity reducer
The script above uses the identity reducer: every record from the mapper is sent
to the output. But what if you had grabbed the replying user’s record every time you saw a reply?
You’d like to just pass it through uniq
. But if something has changed in the interim, or if you record a timestamp for each sample, you won’t be able to use the simple uniq
command. You’d like to just get one example for each key!
Wukong includes just such a reducer, the UniqByLastReducer:
#
# UniqByLastReducer accepts all records for a given key and emits only the
# last-seen.
#
# It acts like an insecure high-school kid: for each record of a given key
# it discards whatever record it's holding and adopts this new value. When a
# new key comes on the scene it emits the last record, like an older brother
# handing off his Depeche Mode collection.
#
# For example, to extract the *latest* value for each property, emit your
# records as
#
# [resource_type, key, timestamp, ... fields ...]
#
# then set :sort_fields to 3 and :partition_fields to 2.
#
class UniqByLastReducer < Wukong::Streamer::AccumulatingReducer
attr_accessor :final_value
#
# Use first two fields as keys by default
#
def get_key *vals
vals[0..1]
end
#
# Adopt each value in turn: the last one's the one you want.
#
def accumulate *vals
self.final_value = vals
end
#
# Emit the last-seen value
#
def finalize
yield final_value if final_value
end
#
# Clear state on reset
#
def start! *args
self.final_value = nil
end
end
Wukong has a good collection of map/reduce patterns. For example, it’s quite common to accumulate all records for a given key and emit some result based on the whole group. The
The AccumulatingReducer calls start! on the first record for each key, calls accumulate() on every example for that key (including the first), and calls finalize() once the last record for that key is seen.
Here’s an AccumulatingReducer that takes a long list of key-value pairs and emits, for each key, all its corresponding values in one line.
#
# Roll up all values for each key into a single line
#
class GroupByReducer < Wukong::Streamer::AccumulatingReducer
attr_accessor :values
# Start with an empty list
def start! *args
self.values = []
end
# Aggregate each value in turn
def accumulate key, value
self.values << value
end
# Emit the key and all values, tab-separated
def finalize
yield [key, values].flatten
end
end
So given adjacency pairs for the following directed friend graph:
@jerry @elaine @elaine @jerry @jerry @kramer @kramer @jerry @kramer @bobsacamato @kramer @newman @jerry @superman @newman @kramer @newman @elaine @newman @jerry
You’d end up with
@elaine @jerry @jerry @elaine @kramer @superman @kramer @bobsacamato @jerry @newman @newman @elaine @jerry @kramer
Now we’re going to write this using the synthetic keys already extant in the
twitter records, making the unwarranted assumption that they won’t collide with
the keys in your database.
Map/Reduce paradigm does badly with synthetic keys. Synthetic keys demand
locality, and map/reduce’s remarkable scaling comes from not assuming
locality. In general, write your map/reduce scripts to use natural keys (the scre
There are many useful examples (including an actually-useful version of this
WordCount script) in the examples/ directory.