Using Wukong and Wuclan – Parsing
In part 1 we began building a scraper to trawl our desired part of the social web. Now
we’re ready to start using Wukong to process the files.
Files come off the wire as one line per request, with these fields:
  :url               String
  :scraped_at        DateTime (flat)
  :response_code     Integer
  :response_message  String
  :contents          String (JSON-formatted, tab & newline-munged)
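As a rough illustration of that layout, here is a minimal plain-Ruby sketch that splits one line into the five fields and coerces their types. The ScrapedResponse struct and the parse_scraped_line helper are hypothetical names made up for this example, the example.com URL is a placeholder, and the YYYYMMDDhhmmss timestamp format is an assumption based on the sample value 20090701123456 that appears later in this post. Wukong does this work for you; the sketch only shows what sits in each column.

```ruby
require 'date'

# Hypothetical container for one scraped response. The field names follow the
# layout described above; the struct itself exists only for this example.
ScrapedResponse = Struct.new(:url, :scraped_at, :response_code, :response_message, :contents)

# Split one tab-separated line into its five fields and coerce the types.
# Assumes the flat timestamp looks like "20090701123456" (YYYYMMDDhhmmss).
def parse_scraped_line(line)
  url, scraped_at, code, message, contents = line.chomp.split("\t", 5)
  ScrapedResponse.new(
    url,
    DateTime.strptime(scraped_at, '%Y%m%d%H%M%S'),
    Integer(code),
    message,
    contents
  )
end

line = "http://example.com/followers.json\t20090701123456\t200\tOK\t[{\"id\":1}]\n"
response = parse_scraped_line(line)
puts response.response_code
puts response.scraped_at.iso8601
puts response.contents
```

The contents column is left as a plain string here; decoding the JSON is a separate step that each request type handles in its own way.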
The contents field is a JSON-formatted mix of records:
- TwitterFollowersRequest and TwitterFriendsRequest yield an Array[Hash{user => raw_tweet}]. We want to extract a stream of AFollowsB (with the request user as user_a for a friends request and user_b for a followers request) along with the included Tweet, TwitterUser, TwitterUserProfile and TwitterUserStyle records.
- TwitterFavoritesRequest yields an Array[Hash{tweet_hash => user_hash}]. We want to extract a stream of AFavoritesB along with the included Tweet, TwitterUser, TwitterUserProfile and TwitterUserStyle records.
- TwitterUser yields a single user_hash, making one each of TwitterUser, TwitterUserProfile and TwitterUserStyle.
- UserTimelineRequest and PublicTimelineRequest yield an Array[Hash{tweet => user}]. We want to extract the included Tweet, TwitterUser, TwitterUserProfile and TwitterUserStyle records.
- TwitterFollowerIdsRequest and TwitterFriendIdsRequest return an Array[user_ids] (each user_id is a simple Integer). We extract a series of AFollowsB, using the request’s user_id as user_a_id for a friends request or user_b_id for a followers request.
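The direction of each AFollowsB is the easy part to get backwards, so here is a small sketch of the ID-list case. It assumes a stand-in AFollowsB struct with only two fields and a hypothetical follows_from_ids helper that takes a :followers or :friends flag; none of these are the actual Wuclan classes.

```ruby
require 'json'

# Stand-in for the AFollowsB relationship record: user_a follows user_b.
# The real class lives in the Wuclan source; this Struct is illustration only.
AFollowsB = Struct.new(:user_a_id, :user_b_id)

# Build follow edges from an ID-list response.
#   kind            :followers or :friends (hypothetical flag for this sketch)
#   request_user_id the user the request was made about
#   contents        JSON array of integer user ids
def follows_from_ids(kind, request_user_id, contents)
  JSON.parse(contents).map do |other_id|
    case kind
    when :friends   then AFollowsB.new(request_user_id, other_id) # request user follows other
    when :followers then AFollowsB.new(other_id, request_user_id) # other follows request user
    else raise ArgumentError, "unknown kind: #{kind.inspect}"
    end
  end
end

follows_from_ids(:followers, 42, '[7, 8]').each do |edge|
  puts edge.to_a.join("\t")
end
```

For a followers request about user 42, every id in the list ends up as user_a_id and 42 as user_b_id; a friends request flips the two.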
We want to split each API response into a stream of those TwitterUser, Tweet, etc. records.
- Stream in each line (each line holds one request)
- Turn the line into the corresponding TwitterRequest
- Have the TwitterRequest parse its JSON contents and construct the TwitterUser, Tweet, etc.
- Serialize those records back out as tab-separated lines suitable for further processing with Wukong
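To make the last step in that list concrete, here is a hedged sketch of what "tab-separated lines" might look like for a couple of records. The Tweet and AFollowsB structs are cut-down stand-ins, and resource_name and to_flat_line are hypothetical helpers written for this example; the class-name prefix mirrors the sample lines shown later in this post, while the whitespace scrubbing is an assumption rather than a description of Wukong's own output.

```ruby
# Cut-down stand-ins; the real classes carry many more fields.
Tweet     = Struct.new(:id, :twitter_user_id, :text)
AFollowsB = Struct.new(:user_a_id, :user_b_id)

# Turn a CamelCase class name into a snake_case resource name.
def resource_name(klass)
  klass.name
       .gsub(/([a-z\d])([A-Z])/, '\1_\2')
       .gsub(/([A-Z])([A-Z][a-z])/, '\1_\2')
       .downcase
end

# Hypothetical helper: one record becomes one line, class name first.
# Tabs and newlines inside values are replaced by a space so that each
# record stays on a single line with a fixed number of columns.
def to_flat_line(record)
  values = record.to_a.map { |v| v.to_s.gsub(/[\t\r\n]+/, ' ') }
  [resource_name(record.class), *values].join("\t")
end

puts to_flat_line(Tweet.new(123, 42, "hello\tworld"))
puts to_flat_line(AFollowsB.new(7, 42))
```

Keeping the class name in the first column is what lets a later stage rebuild the right kind of object from each line.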
The basics of StructStreamer
Wukong handles the first and last steps through its StructStreamer and the standard .to_flat method. So the actual structure is really simple:
#
# Instantiate each incoming request.
# Stream out the contained classes it generates.
#
class TwitterRequestParser < Wukong::Streamer::StructStreamer
  def process request
    request.parse do |obj|
      yield obj
    end
  end
end

# This makes the script go.
Wukong::Script.new(TwitterRequestParser, nil).run
In practice, all you need to know is that a StructStreamer gets a stream of objects to parse. Here’s an outline of its internals. The Wukong StructStreamer:
- takes each flattened line:
  "twitter_friends_request http://… 20090701123456 …fields… [{…}, {…}, …json…, {…}]"
- splits it by tabs to create an array of fields:
  ["twitter_friends_request", "http://…", … "[{…}, {…}, …json…, {…}]"]
- instantiates the class named in the first field, using the values extracted from the remaining fields:
  TwitterFriendsRequest.new "http://…", "20090701123456", … "[{…}, {…}, …json…, {…}]"
The last (contents) field is still just a string: there’s nothing special about it to Wukong.
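The outline above can be imitated in a few lines of plain Ruby. This is only a sketch of the idea, not Wukong's implementation: the TwitterFriendsRequest struct here has an abbreviated, made-up field list, class_name_for and line_to_object are hypothetical helper names, and the example.com URL is a placeholder.

```ruby
# Stand-in request class with an abbreviated field list (illustration only).
TwitterFriendsRequest = Struct.new(:url, :scraped_at, :contents)

# "twitter_friends_request" -> "TwitterFriendsRequest"
def class_name_for(resource_name)
  resource_name.split('_').map(&:capitalize).join
end

# Rebuild an object from one flattened line: the first field picks the
# class, the remaining fields become the constructor arguments.
def line_to_object(line)
  resource_name, *fields = line.chomp.split("\t")
  klass = Object.const_get(class_name_for(resource_name))
  klass.new(*fields)
end

line = "twitter_friends_request\thttp://example.com/friends.json\t20090701123456\t[{\"id\":1}]"
request = line_to_object(line)
puts request.class
puts request.scraped_at
puts request.contents
```

Every field arrives as a string, including the timestamp and the JSON contents, which is why the request object is responsible for parsing its own contents.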
Parsing
Since each request’s contents are handled in a slightly (and brittlely) different manner, we just ask each request object to parse itself and feed out all the TwitterXXXX objects it generates.
class TwitterFollowersRequest
  # …
  def parse &block
    return unless healthy?
    # for each raw user/tweet pair in the parsed JSON contents,
    parsed_contents.each do |hsh|
      json_obj = JsonUserWithTweet.new(hsh, 'scraped_at' => scraped_at)
      next unless json_obj && json_obj.healthy?
      # Extract user, tweet and relationship
      yield AFollowsB.new(json_obj.user.id, self.twitter_user_id) if json_obj.user
      json_obj.each(&block)
    end
  end
  # …
end
The contents of a TwitterXXXRequest consist of one or many hashes, each holding either a raw user hash (possibly with its latest raw tweet hash) or a raw tweet hash with its raw user hash. The user hash might have only the fields for a TwitterPartialUser, or it might have the fields for a full set of TwitterUser, TwitterUserProfile and TwitterUserStyle. On top of that, the fields themselves need some massaging to be compatible with Wukong and other tools in our Map/Reduce toolkit (details explained in a later section).
The fiddly little details are handled by a JsonUserWithTweet or JsonTweetWithUser adapter, as appropriate:
class JsonUserTweetPair
  def initialize raw, moreinfo
    # clean up fields in entries (flatten date, true/false → 1/0, etc)
    fix_raw_user!
    fix_raw_tweet!
  end

  # generate all the contained TwitterXXX objects
  def each
  end

  # create TwitterUser object from raw info
  def user
  end

  # create Tweet object from raw tweet hash
  def tweet
  end

  # … and so forth
end
I’ll ignore the gory details; view the source if you’re interested.
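For a flavor of the cleanup mentioned above (flattening dates, turning true/false into 1/0), here is a minimal sketch. It is not the adapter's actual code: flatten_date, flatten_boolean and fix_raw_user are hypothetical helpers, the key names in the sample hash are assumptions chosen for the example, and the input date format is assumed to be the "Wed Jul 01 12:34:56 +0000 2009" style.

```ruby
require 'time'

# Flatten a timestamp such as "Wed Jul 01 12:34:56 +0000 2009" into the
# sortable YYYYMMDDhhmmss form (in UTC). The input format is an assumption.
def flatten_date(str)
  Time.strptime(str, '%a %b %d %H:%M:%S %z %Y').utc.strftime('%Y%m%d%H%M%S')
end

# Map booleans to 1/0 and leave every other value untouched.
def flatten_boolean(value)
  case value
  when true  then 1
  when false then 0
  else value
  end
end

# Hypothetical cleanup for one raw user hash. Returns a new hash rather
# than mutating the input, unlike the fix_raw_user! named in the outline.
def fix_raw_user(raw)
  fixed = raw.transform_values { |v| flatten_boolean(v) }
  fixed['created_at'] = flatten_date(fixed['created_at']) if fixed['created_at']
  fixed
end

raw = { 'id' => 42, 'protected' => false, 'created_at' => 'Wed Jul 01 12:34:56 +0000 2009' }
p fix_raw_user(raw)
```

Flat dates and 1/0 flags keep every value a plain token that sorts and compares sensibly once the record is written out as a tab-separated line.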
Running the script
Here, again, is the code (in full!) for the twitter_request_parser.rb script.
#
# Instantiate each incoming request.
# Stream out the contained classes it generates.
#
class TwitterRequestParser < Wukong::Streamer::StructStreamer
  def process request
    request.parse do |obj|
      yield obj
    end
  end
end

# This makes the script go.
Wukong::Script.new(TwitterRequestParser, nil).run
That last line is the runner: it makes this a Wukong script with a map phase only. (We’ll add in a reducer later on.)