Wukong: Hadoop made so easy a Chimpanzee could run it.
Treat your dataset like a
Wukong is friends with Hadoop the elephant, Pig the query language, and the cat on your command line.
Send Wukong questions to the Infinite Monkeywrench mailing list
Everyone uses a word count example to demonstrate map/reduce. Here is our obligatory script to count words in a text stream:
require 'wukong'

module WordCount
  class Mapper < Wukong::Streamer::LineStreamer
    # Emit each word in the line.
    def process line
      words = line.strip.split(/\W+/).reject(&:blank?)
      words.each{|word| yield [word, 1] }
    end
  end

  class Reducer < Wukong::Streamer::ListReducer
    # values holds every [word, count] record for this key:
    # take the count from each one and add them up.
    def finalize
      yield [ key, values.map(&:last).map(&:to_i).sum ]
    end
  end
end

Wukong::Script.new(
  WordCount::Mapper,
  WordCount::Reducer
  ).run # Execute the script
The first class, the Mapper, eats lines and craps [word, count] records: the word is the key and the count (always 1) is the value.
In the reducer, the records for each key are stacked up into a list (values); once the last record for that key arrives, the record(s) yielded by #finalize are emitted. There are many other ways to write the reducer, most of them better; see the tutorial.
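Hadoop streaming sorts the mapper's output by key before the reducer sees it, so every record for a given word arrives together. The sketch below simulates that map, sort, reduce flow in plain Ruby, without the Wukong gem, to show what the two classes compute. The method names are hypothetical, it assumes Ruby 2.4+ (for chunk_while and Array#sum), and reject(&:empty?) stands in for blank?, which comes from a support library rather than core Ruby.

```ruby
# Plain-Ruby simulation of the word-count flow; not Wukong code.
# map_words, sort_by_key and reduce_counts are hypothetical names.

# Map phase: like WordCount::Mapper#process, emit [word, 1] per word.
def map_words(lines)
  lines.flat_map do |line|
    line.strip.split(/\W+/).reject(&:empty?).map { |word| [word, 1] }
  end
end

# Sort phase: records reach the reducer ordered by key,
# so all records for one word are adjacent.
def sort_by_key(records)
  records.sort_by(&:first)
end

# Reduce phase: like WordCount::Reducer#finalize, each group holds whole
# [word, count] records; take the last field of each and sum them.
def reduce_counts(sorted_records)
  sorted_records.chunk_while { |a, b| a.first == b.first }.map do |group|
    [group.first.first, group.map(&:last).map(&:to_i).sum]
  end
end

counts = reduce_counts(sort_by_key(map_words(["the cat sat", "the cat ran!"])))
counts.each { |word, count| puts "#{word}\t#{count}" }
```

Splitting the work into three functions mirrors the division of labor in the real script: you write only the first and last steps, and Hadoop supplies the sort.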
You can also use structs to treat your dataset as a stream of objects:
require 'wukong'
require 'my_blog' # defines the blog models

# structs for our input objects
Tweet = Struct.new( :id, :created_at, :twitter_user_id,
  :in_reply_to_user_id, :in_reply_to_status_id, :text )
TwitterUser = Struct.new( :id, :username, :fullname,
  :homepage, :location, :description )

module TwitBlog
  class Mapper < Wukong::Streamer::RecordStreamer
    # Watch for tweets by me
    MY_USER_ID = 24601

    #
    # If this is a tweet by me, convert it to a Post.
    #
    # If it is a tweet not by me, convert it to a Comment that
    # will be paired with the correct Post.
    #
    # If it is a TwitterUser, convert it to a User record and
    # a user_location record.
    #
    def process record
      case record
      when TwitterUser
        user     = MyBlog::User.new.merge(record) # grab the fields in common
        user_loc = MyBlog::UserLoc.new(record.id, record.location, nil, nil)
        yield user
        yield user_loc
      when Tweet
        if record.twitter_user_id == MY_USER_ID
          post       = MyBlog::Post.new.merge record
          post.link  = "http://twitter.com/statuses/show/#{record.id}"
          post.body  = record.text
          post.title = record.text[0..65] + "..."
          yield post
        else
          comment         = MyBlog::Comment.new.merge record
          comment.body    = record.text
          comment.post_id = record.in_reply_to_status_id
          yield comment
        end
      end
    end
  end
end

Wukong::Script.new( TwitBlog::Mapper, nil ).run # identity reducer
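The mapper above relies on Ruby's case/when to pick a branch by record type: `when Tweet` calls `Tweet === record`, which (through Module#===) is true exactly when the record is a Tweet instance. The standalone sketch below isolates that dispatch for the Tweet branch. It does not use Wukong or MyBlog; Post and Comment here are hypothetical stand-in structs, and convert is a hypothetical name.

```ruby
# Standalone sketch of struct-based dispatch; no Wukong, no MyBlog.
Tweet       = Struct.new(:id, :created_at, :twitter_user_id,
                         :in_reply_to_user_id, :in_reply_to_status_id, :text)
TwitterUser = Struct.new(:id, :username, :fullname,
                         :homepage, :location, :description)
Post        = Struct.new(:id, :link, :title, :body) # hypothetical stand-in
Comment     = Struct.new(:id, :post_id, :body)      # hypothetical stand-in

MY_USER_ID = 24601

# `when Tweet` is true only for Tweet instances, so each struct
# class doubles as the record's type tag.
def convert(record)
  case record
  when Tweet
    if record.twitter_user_id == MY_USER_ID
      Post.new(record.id,
               "http://twitter.com/statuses/show/#{record.id}",
               record.text[0..65] + "...",
               record.text)
    else
      Comment.new(record.id, record.in_reply_to_status_id, record.text)
    end
  else
    nil # TwitterUser and any other record type are skipped in this sketch
  end
end

mine  = Tweet.new(1, nil, MY_USER_ID, nil, nil, "hello world")
reply = Tweet.new(2, nil, 42, MY_USER_ID, 1, "nice post")
p convert(mine).class    # prints Post
p convert(reply).post_id # prints 1
```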
Wukong has a good collection of map/reduce patterns. For example, it’s quite common to accumulate all records for a given key and emit some result based on the whole group.
The AccumulatingReducer calls start! on the first record for each key, calls accumulate() on every record for that key (including the first), and calls finalize() once the last record for that key has been seen.
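To make that lifecycle concrete, here is a plain-Ruby driver that follows the same contract over input already sorted by key. It is a sketch, not Wukong's implementation: MiniAccumulatingReducer and CountReducer are hypothetical names, and the driver assumes no record has a nil key, since nil marks "no key seen yet".

```ruby
# Hypothetical driver for the start!/accumulate/finalize contract.
# Not Wukong's code; assumes records are sorted by key and keys are non-nil.
class MiniAccumulatingReducer
  attr_reader :key

  def initialize
    @key = nil
  end

  # records: arrays of [key, *fields]; returns everything finalize yields
  def run(records)
    out  = []
    emit = ->(rec) { out << rec }
    records.each do |key, *fields|
      if key != @key
        finalize(&emit) unless @key.nil? # close out the previous key
        @key = key
        start!(key, *fields)             # first record of a new key
      end
      accumulate(key, *fields)           # every record, including the first
    end
    finalize(&emit) unless @key.nil?     # close out the last key
    out
  end

  def start!(*args); end
  def accumulate(*args); end
  def finalize; end
end

# Example subclass: count the records seen for each key.
class CountReducer < MiniAccumulatingReducer
  def start!(*args)
    @count = 0
  end

  def accumulate(*args)
    @count += 1
  end

  def finalize
    yield [key, @count]
  end
end

p CountReducer.new.run([["a", 1], ["a", 2], ["b", 3]]) # prints [["a", 2], ["b", 1]]
```

The driver keeps per-key state on the instance, so use a fresh instance for each run.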
Here’s an AccumulatingReducer that takes a long list of key-value pairs and emits, for each key, all its corresponding values in one line.
#
# Roll up all values for each key into a single line
#
class GroupByReducer < Wukong::Streamer::AccumulatingReducer
  attr_accessor :values

  # Start with an empty list
  def start! *args
    self.values = []
  end

  # Aggregate each value in turn
  def accumulate key, value
    self.values << value
  end

  # Emit the key and all values, tab-separated
  def finalize
    yield [key, values].flatten
  end
end
So given adjacency pairs for the following directed friend graph:
@jerry @elaine
@elaine @jerry
@jerry @kramer
@kramer @jerry
@kramer @bobsacamato
@kramer @newman
@jerry @superman
@newman @kramer
@newman @elaine
@newman @jerry
You’d end up with:
@elaine @jerry
@jerry @elaine @kramer @superman
@kramer @bobsacamato @jerry @newman
@newman @elaine @jerry @kramer
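The plain-Ruby sketch below re-runs the GroupByReducer logic on the friend graph, with comments marking where start!, accumulate and finalize would fire. It assumes the pairs are whitespace-separated and sorts whole records, as the shell sort command would in a local `cat | mapper | sort | reducer` pipeline; that full-line sort is what puts each key's values in alphabetical order. Hadoop's shuffle only guarantees grouping by key, so on a cluster the values within a line need not come out in this order. group_by_key is a hypothetical name.

```ruby
# Plain-Ruby re-run of the GroupByReducer example; not Wukong code.
EDGES = <<~PAIRS
  @jerry @elaine
  @elaine @jerry
  @jerry @kramer
  @kramer @jerry
  @kramer @bobsacamato
  @kramer @newman
  @jerry @superman
  @newman @kramer
  @newman @elaine
  @newman @jerry
PAIRS

def group_by_key(lines)
  rows = []
  key, values = nil, []
  lines.map(&:split).sort.each do |k, v|  # full-record sort, like `sort`
    if k != key
      rows << [key, values].flatten unless key.nil? # finalize previous key
      key, values = k, []                           # start!
    end
    values << v                                     # accumulate
  end
  rows << [key, values].flatten unless key.nil?     # finalize last key
  rows
end

group_by_key(EDGES.lines).each { |row| puts row.join("\t") }
```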
Wukong was written by Philip (flip) Kromer (flip@infochimps.org / @infochimps) for the infochimps project.
Patches submitted by:
Thanks to: