Some sample data would definitely help here :)
I can say some things that might help though…
Fold functions are useful when you can perform a partial aggregation & then combine many of these partial aggregations to produce a result. The count function is the canonical example here: each task counts the elements it sees & then those partial counts are summed to get the total. However, you must have a key to group by (or fold all of the data) - there's no concept of state that carries from one record to the next as you scan over the data.
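As a rough illustration of that partial-then-combine shape, here's a sketch in plain Clojure (no PigPen involved). The chunk size of 4 and the names `data`, `chunks`, `partial-counts`, and `total` are made up for the example; a real platform decides how the data is split.

```clojure
;; Sample input, same as the examples in this reply.
(def data [1 2 2 2 3 4 4 4 5 6 6 6])

;; Pretend each chunk is handled by a separate task (chunk size is arbitrary).
(def chunks (partition-all 4 data))

;; Partial aggregation: each task counts only its own chunk.
(def partial-counts (map count chunks))
;; => (4 4 4)

;; Combine step: sum the partial counts to get the overall count.
(def total (reduce + partial-counts))
;; => 12
```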
That said, there's nothing stopping you from making a stateful function like this:
=> (->>
     (pig/return [1 2 2 2 3 4 4 4 5 6 6 6])
     (pig/mapcat (let [state (atom [])]
                   (fn [x]
                     (swap! state conj x)
                     (when (odd? x)
                       (let [result @state]
                         (reset! state [])
                         [result])))))
     (pig/dump))
([1] [2 2 2 3] [4 4 4 5])
There are some caveats though. This state is per-mapper, not global. The ordering of the data is dependent on the platform used, but I would speculate that most would give you the data in the order it's read from the underlying storage. There is no guarantee that any two records will be processed by the same mapper, though these boundaries usually end up matching the underlying splits (depending on the platform options used). And since there's no end-of-seq signal, it's going to drop any trailing data that isn't followed by a terminating record - that's why the final 6 6 6 is missing from the output above. If you used this technique immediately after a load command where you know that the data is ordered, it might work. If you use this after a join or grouping where the data is scrambled, it's likely not going to be very useful.
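To make the per-mapper caveat a bit more concrete, here's a rough simulation in plain Clojure (no PigPen). It assumes each mapper gets its own fresh copy of the state and sees one contiguous split; `make-grouper`, `run-split`, and the split point of 6 are hypothetical, picked only so the boundary lands in the middle of a group.

```clojure
(defn make-grouper
  "Returns a fresh stateful function, same logic as the mapcat fn above."
  []
  (let [state (atom [])]
    (fn [x]
      (swap! state conj x)
      (when (odd? x)
        (let [result @state]
          (reset! state [])
          [result])))))

(defn run-split
  "Simulates one mapper: a new state, applied to one split in order."
  [split]
  (doall (mapcat (make-grouper) split)))

(def data [1 2 2 2 3 4 4 4 5 6 6 6])

;; One mapper sees everything; only the unterminated tail is lost.
(def one-mapper (run-split data))
;; => ([1] [2 2 2 3] [4 4 4 5])

;; Two mappers, with the split boundary inside the run of 4s.
(def two-mappers (doall (mapcat run-split (split-at 6 data))))
;; => ([1] [2 2 2 3] [4 4 5])
```

In the two-mapper run, the first 4 is stranded at the end of the first split and silently dropped, so the third group comes out as [4 4 5] instead of [4 4 4 5].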
A slightly different version of this uses pig/reduce:
=> (->>
     (pig/return [1 2 2 2 3 4 4 4 5 6 6 6])
     (pig/reduce (fn [acc x]
                   (let [acc' (update-in acc [(dec (count acc))] conj x)]
                     (if (odd? x)
                       (conj acc' [])
                       acc')))
                 [[]])
     (pig/mapcat identity)
     (pig/dump))
([1] [2 2 2 3] [4 4 4 5] [6 6 6])
The downside to this one is that it's going to use a single reducer, so that's a bottleneck. And nothing is emitted until the entire input has been reduced, so the whole accumulated result has to be built up first. The benefit is that all of the data is processed using this single state, so you wouldn't have to worry about seeing partial data. And there's a clear end signal, so we can clean up when it's done (the trailing 6 6 6 makes it into the output this time). However, all of the ordering issues still apply.
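One possible cleanup step, sketched in plain Clojure so it can be tried at a REPL without PigPen: if the input happens to end on an odd number, the reducing function above leaves an empty group at the end of the accumulator. The names `group-step` and `finish` are hypothetical; `group-step` is just the reducing function from the example pulled out into a defn.

```clojure
(defn group-step
  "Appends x to the last group; starts a new empty group after an odd x."
  [acc x]
  (let [acc' (update-in acc [(dec (count acc))] conj x)]
    (if (odd? x)
      (conj acc' [])
      acc')))

(defn finish
  "Cleanup at the end of the reduce: drops any empty group."
  [acc]
  (vec (remove empty? acc)))

;; Input ends mid-group: the unterminated tail is kept.
(finish (reduce group-step [[]] [1 2 2 2 3 4 4 4 5 6 6 6]))
;; => [[1] [2 2 2 3] [4 4 4 5] [6 6 6]]

;; Input ends on an odd number: the raw reduce has a trailing [].
(reduce group-step [[]] [1 2 2 2 3])
;; => [[1] [2 2 2 3] []]

(finish (reduce group-step [[]] [1 2 2 2 3]))
;; => [[1] [2 2 2 3]]
```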
Is that anywhere close to what you're looking for? Hope that helps some. Let me know if you've got a more concrete example & I'll see what I can think up.
-Matt