recursive fold functions


Andy L

Aug 5, 2015, 11:05:33 PM
to PigPen Support
Hi,

As the French say, "L'appétit vient en mangeant" ("appetite comes with eating"). My first steps with Pig/PigPen are very promising, and now I want to convert my other Clojure algorithms to PigPen (all below is me thinking out loud - it might not make sense):

My data in Hadoop can be interpreted in two ways:
1) as independent tuples (I already have addressed this)
2) as tuples related to each other in a hierarchical fashion, very similar to nested tags in an XML document

I already have a (non-trivial) recursive parsing function which turns a sequence of those tuples into a nested data structure. The output, the nested values, is relatively small; however, there is no key I could group them by, which makes it a bit more challenging. In other words, I need something like a stateful transducer, or a Python generator, which can recognize boundaries - when something starts or ends.
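For illustration, the kind of boundary-recognizing stateful transducer I mean might be sketched like this in plain Clojure (the `chunk-until` name and using odd numbers as end-of-chunk markers are just stand-ins for my real boundary logic):

```clojure
;; A stateful transducer that buffers elements and emits a chunk
;; whenever the boundary predicate matches. Note that a trailing
;; partial chunk (no terminating boundary) is dropped.
(defn chunk-until [boundary?]
  (fn [rf]
    (let [buf (volatile! [])]
      (fn
        ([] (rf))
        ([result] (rf result))          ; completion: trailing data dropped
        ([result x]
         (vswap! buf conj x)
         (if (boundary? x)
           (let [chunk @buf]
             (vreset! buf [])
             (rf result chunk))         ; emit the finished chunk
           result))))))

(into [] (chunk-until odd?) [1 2 2 2 3 4 4 4 5 6 6 6])
;; => [[1] [2 2 2 3] [4 4 4 5]]
```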

Now, what I can do at that point is group tuples by a key one level up, and then apply my recursive function, which will output a sequence of those nested data structures. Then I can split this sequence to get back what I need.
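In plain-Clojure terms (using clojure.core in place of the PigPen operators), that "group one level up, parse, then flatten" plan would look roughly like this - `:parent` and `parse-group` are illustrative stand-ins for my real key and recursive parser:

```clojure
;; Placeholder for the real recursive parser: here it just collapses
;; a group of tuples into one nested value.
(defn parse-group [tuples]
  [(mapv :value tuples)])

(def tuples
  [{:parent 1 :value :a} {:parent 1 :value :b} {:parent 2 :value :c}])

;; Group by the parent-level key, parse each group, flatten the results.
(->> tuples
     (group-by :parent)
     (mapcat (fn [[k group]] (parse-group group))))
;; => ([:a :b] [:c])
```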

However, I was wondering if there is a way to code it up more "natively", perhaps using the fold module. I will supply code examples if there is some "hope" for a better solution.

Thanks,
Andy


Matt Bossenbroek

Aug 6, 2015, 1:18:59 PM
to Andy L, PigPen Support
Some sample data would definitely help here :)

I can say some things that might help though…

Fold functions are useful when you can perform a partial aggregation and then combine many of these partial aggregations to produce a result. The count function is the canonical example here: you can partially aggregate by counting elements and then sum those partial counts to compute the final result. However, you must have a key to group by (or fold all of the data) - there's no concept of state as you scan over the data.
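To make the count example concrete, here's a sketch of the idea in plain Clojure - pretend each partition is handled by a different mapper:

```clojure
(def data [1 2 2 2 3 4 4 4 5 6 6 6])

;; Pretend each chunk is processed independently by a different mapper.
(def partitions (partition-all 4 data))

;; Partial aggregation: count each partition on its own.
(def partial-counts (map count partitions))
;; => (4 4 4)

;; Combine: sum the partial counts to get the final result.
(reduce + partial-counts)
;; => 12
```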


That said, there's nothing stopping you from making a stateful function like this:

=> (->>
     (pig/return [1 2 2 2 3 4 4 4 5 6 6 6])
     (pig/mapcat (let [state (atom [])]
                   (fn [x]
                     (swap! state conj x)
                     (when (odd? x)
                       (let [result @state]
                         (reset! state [])
                         [result])))))
     (pig/dump))
([1] [2 2 2 3] [4 4 4 5])


There are some caveats though. This state is per-mapper, not global. The ordering of the data is dependent on the platform used, but I would speculate that most would give you the data in the order it's read from the underlying storage. There is no guarantee that any two records will be processed by the same mapper, though these boundaries usually end up matching the underlying splits (depending on the platform options used). And since there's no end-of-seq signal, it's going to drop any trailing data that doesn't have a terminating signal. If you used this technique immediately after a load command where you know that the data is ordered, it might work. If you use this after a join or grouping where the data is scrambled, it's likely not going to be very useful.


A slightly different version of this uses pig/reduce:

=> (->>
     (pig/return [1 2 2 2 3 4 4 4 5 6 6 6])
     (pig/reduce (fn [acc x]
                   (let [acc' (update-in acc [(dec (count acc))] conj x)]
                     (if (odd? x)
                       (conj acc' [])
                       acc')))
                 [[]])
     (pig/mapcat identity)
     (pig/dump))
([1] [2 2 2 3] [4 4 4 5] [6 6 6])


The downside to this one is that it's going to use a single reducer, so that's a bottleneck. And the entire accumulated result must be processed before emitting results. The benefit is that it's going to process all of the data using this single state, so you wouldn't have to worry about seeing partial data. And there's a clear end signal, so we can clean up when it's done. However, all of the ordering issues still apply.


Is that anywhere close to what you're looking for? Hope that helps some. Let me know if you've got a more concrete example & I'll see what I can think up.

-Matt


Andy L

Aug 7, 2015, 9:27:10 PM
to PigPen Support, core....@gmail.com


> Is that anywhere close to what you're looking for? Hope that helps some. Let me know if you've got a more concrete example & I'll see what I can think up.


That helps a lot and gives me a better feel for what map/reduce jobs are about. I know for a fact that my mappers retain the original sequence of records - quite an essential property in my case. Hence, the first solution might work. However, after further analysis, your examples convinced me that I should write a higher-level custom loader. I can easily rewrite my recursive algorithm/parser from Clojure to Java. That would make it a less leaky abstraction and give me more confidence that I can apply whatever map/reduce flows I like without any additional presumptions.

Using my XML analogy, the loader would produce entire documents as single records, which I could easily process with my Clojure functions downstream. Total freedom :-). Again, thanks for shedding so much light on my case. I will try to post some simplified examples later.

Best regards,
Andy