
Using Wukong and Wuclan – Parsing

In part 1 we began a scraper to trawl our desired part of the social web. Now
we’re ready to start using Wukong to process the files.

Files come off the wire as

    Field               Type
    :url                String
    :scraped_at         DateTime (flat)
    :response_code      Integer
    :response_message   String
    :contents           String (JSON-formatted, tab&newline-munged)

The contents field is a JSON-formatted mix of records.

We want to split each API response into a stream of those TwitterUser, Tweet, etc. records.

  1. Stream in each line (each line holds one request).
  2. Turn the line into the corresponding TwitterRequest.
  3. Have the TwitterRequest parse its JSON contents and construct the TwitterUser, Tweet, etc.
  4. Serialize those records back out as tab-separated lines suitable for further processing with Wukong.
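Steps 3 and 4 can be sketched in plain Ruby, without Wukong. This is only an illustration: the two-field TwitterUser and three-field Tweet structs, the nested "status" key, and the helper names records_from and flat_line are assumptions made for the example, not the actual Wuclan classes, and in a real script the serialization is done by Wukong's .to_flat.

```ruby
require 'json'

# Hypothetical record types; the real Wuclan classes carry many more fields.
TwitterUser = Struct.new(:id, :screen_name)
Tweet       = Struct.new(:id, :twitter_user_id, :text)

# Step 3: turn the JSON contents of one response into record objects.
# Assumes each entry is a user hash with an optional nested "status" (tweet) hash.
def records_from(contents)
  JSON.parse(contents).flat_map do |hsh|
    user   = TwitterUser.new(hsh['id'], hsh['screen_name'])
    status = hsh['status']
    tweet  = status && Tweet.new(status['id'], hsh['id'], status['text'])
    [user, tweet].compact
  end
end

# Step 4: one tab-separated line per record, led by an underscored resource name.
# (Illustrative stand-in for what .to_flat provides.)
def flat_line(record)
  name = record.class.name.gsub(/([a-z])([A-Z])/, '\1_\2').downcase
  ([name] + record.to_a).join("\t")
end

contents = '[{"id": 1, "screen_name": "alice", "status": {"id": 10, "text": "hi"}}, ' \
           '{"id": 2, "screen_name": "bob"}]'
lines = records_from(contents).map { |rec| flat_line(rec) }
puts lines
```

Each response fans out into several lines, one per record, and the leading field tells a later stage which class to rebuild.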

The basics of StructStreamer

Wukong handles the first and last steps through its StructStreamer and the standard .to_flat method. So the actual structure is really simple:

    #
    # Instantiate each incoming request.
    # Stream out the contained classes it generates.
    #
    class TwitterRequestParser < Wukong::Streamer::StructStreamer
      def process request
        request.parse do |obj|
          yield obj
        end
      end
    end

    # This makes the script go.
    Wukong::Script.new(TwitterRequestParser, nil).run

In practice, all you need to know is that a StructStreamer gets a stream of objects to parse. Here’s an outline of its internals. The Wukong StructStreamer:

  1. takes each flattened line:
       "twitter_friends_request http://…. 20090701123456 …fields… [{…}, {…}, …json…, {…}]"
  2. splits it by tabs to create an array of fields:
       ["twitter_friends_request", "http://…", … "[{…}, {…}, …json…, {…}]"]
  3. constructs an instance of the class named in the first field, using the values extracted from the remaining fields:
       TwitterFriendsRequest.new "http://…", "20090701123456", … "[{…}, {…}, …json…, {…}]"

The last (contents) field is still just a string: there’s nothing special about it to Wukong.
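The outline above can be approximated in a few lines of plain Ruby. This is a sketch of the idea, not Wukong's actual implementation: the Struct standing in for TwitterFriendsRequest, its field list, the example URL, and the helper names class_name_for and struct_from_line are all assumptions made for illustration.

```ruby
# Stand-in for the real request class (hypothetical field list).
TwitterFriendsRequest = Struct.new(:url, :scraped_at, :response_code,
                                   :response_message, :contents)

# "twitter_friends_request" -> "TwitterFriendsRequest"
def class_name_for(resource_name)
  resource_name.split('_').map(&:capitalize).join
end

# Split a flattened line on tabs, look up the class named by the first
# field, and instantiate it from the remaining fields.
def struct_from_line(line)
  resource_name, *fields = line.chomp.split("\t", -1)
  klass = Object.const_get(class_name_for(resource_name))
  klass.new(*fields)
end

line = ['twitter_friends_request', 'http://example.com/friends.json',
        '20090701123456', '200', 'OK', '[{"id": 1}]'].join("\t")
request = struct_from_line(line)

puts request.class           # the class named by the first field
puts request.contents.class  # contents is still an unparsed String
```

Every field arrives as a string at this point, including the response code and the JSON contents; any conversion happens later, when the request parses itself.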

Parsing

Since each request’s contents are handled in a slightly different (and brittle) manner, we just ask each request object to parse itself and feed out all the TwitterXXXX objects it generates.

    class TwitterFollowersRequest
      def parse &block
        return unless healthy?
        # for each raw user/tweet pair in the parsed JSON contents,
        parsed_contents.each do |hsh|
          json_obj = JsonUserWithTweet.new(hsh, 'scraped_at' => scraped_at)
          next unless json_obj && json_obj.healthy?
          # Extract user, tweet and relationship
          yield AFollowsB.new(json_obj.user.id, self.twitter_user_id) if json_obj.user
          json_obj.each(&block)
        end
      end
    end

The contents of a TwitterXXXRequest consist of one or more hashes, each holding either a raw user hash (and possibly its latest raw tweet hash) or a raw tweet hash and its raw user hash. The user hash might have only the fields for a TwitterPartialUser, or it might have the fields for a full set of TwitterUser, TwitterUserProfile, and TwitterUserStyle. On top of that, the fields themselves need some massaging to be compatible with Wukong and other tools in our Map/Reduce toolkit (details explained in a later section).

The fiddly little details are handled by a JsonUserWithTweet or JsonTweetWithUser adapter, as appropriate:

    class JsonUserTweetPair
      def initialize raw, moreinfo
        # clean up fields in entries (flatten date, true/false → 1/0, etc)
        fix_raw_user!
        fix_raw_tweet!
      end

      # generate all the contained TwitterXXX objects
      def each
      end

      # create TwitterUser object from raw info
      def user
      end

      # create Tweet object from raw tweet hash
      def tweet
      end

      # … and so forth
    end

I’ll ignore the gory details; view the source if you’re interested.
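For a flavor of the cleanup involved, here is a minimal sketch of the two fixes named in the comments: flattening a date and turning true/false into 1/0. The helper names flatten_date, flatten_boolean, and clean_raw_user, the sample hash, and the assumed input date format are all hypothetical; the real fix_raw_user! and fix_raw_tweet! methods differ in detail.

```ruby
require 'time'

# Assumed input format: "Wed Jul 01 12:34:56 +0000 2009"
# Output: a flat, sortable timestamp such as "20090701123456" (UTC).
def flatten_date(str)
  Time.strptime(str, '%a %b %d %H:%M:%S %z %Y').utc.strftime('%Y%m%d%H%M%S')
end

# true/false -> 1/0; anything else passes through untouched.
def flatten_boolean(val)
  case val
  when true  then 1
  when false then 0
  else val
  end
end

# Returns a cleaned copy of a raw user hash (hypothetical helper).
def clean_raw_user(raw)
  cleaned = raw.transform_values { |val| flatten_boolean(val) }
  cleaned['created_at'] = flatten_date(cleaned['created_at']) if cleaned['created_at']
  cleaned
end

raw = { 'id' => 1, 'protected' => false, 'verified' => true,
        'created_at' => 'Wed Jul 01 12:34:56 +0000 2009' }
p clean_raw_user(raw)
```

Flat values like these survive a round trip through a tab-separated line without quoting or escaping, which is the point of the massaging.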

Running the script

Here, again, is the code (in full!) for the twitter_request_parser.rb script.

    #
    # Instantiate each incoming request.
    # Stream out the contained classes it generates.
    #
    class TwitterRequestParser < Wukong::Streamer::StructStreamer
      def process request
        request.parse do |obj|
          yield obj
        end
      end
    end

    # This makes the script go.
    Wukong::Script.new(TwitterRequestParser, nil).run

That last line is the runner: it makes this a Wukong script with a map phase only. (We’ll add in a reducer later on.)

