[Lamina] Aggregate channel messages to chunks and apply a reduce function to each chunk?

Paul Lam

Mar 27, 2013, 6:25:00 PM
to alep...@googlegroups.com
Hi,

I have a channel of streaming, timestamped messages, e.g. [{:timestamp 1000, :value 1}, {:timestamp 1001, :value 4} ...]. I want to return a channel that emits the max of each periodic chunk, using each message's timestamp for grouping. How do I aggregate messages into chunks and apply a reduce function to each chunk to produce a new channel?


Cheers,
Paul

Zach Tellman

Mar 27, 2013, 7:44:58 PM
to alep...@googlegroups.com
Hey Paul,

In the latest release (0.5.0-beta15), this functionality can be found in lamina.query/query-stream.  Your query would look something like:

(use 'lamina.query 'lamina.core)
(require '[lamina.time :as t])

(query-stream 
  (fn [ch] (->> ch partition-every (map* #(apply max %))))
  {:period (t/minutes 1), :timestamp :timestamp, :payload :value} 
  ch)

This will emit a stream of tuples looking like {:timestamp X, :value Y}, where X is the logical timestamp at which the tuple would have been emitted and Y is the max value for the preceding period, i.e. the time range from (X - period) to X.

If you change 'query-stream' to 'query-seq', you can do the same for a normal sequence of the values you describe; the returned value will be a lazy sequence of max values and timestamps.
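
To make the intended result concrete, here is a minimal sketch in plain clojure.core of a per-period max over an ordinary sequence. It is not Lamina's implementation and does not use query-seq; the function name max-per-period is hypothetical, the input is assumed to be sorted by :timestamp, and labelling each chunk with the end of its window is an assumption about the boundary convention.

```clojure
;; Plain clojure.core sketch; NOT Lamina's implementation.
;; Assumes `messages` is sorted by :timestamp.
(defn max-per-period
  "Groups messages into fixed windows of `period` timestamp units and
  returns one {:timestamp window-end, :value max} map per non-empty window."
  [period messages]
  (->> messages
       ;; consecutive messages in the same window share the same quotient
       (partition-by #(quot (:timestamp %) period))
       (map (fn [chunk]
              ;; label the chunk with the end of its window (an assumption)
              {:timestamp (* period (inc (quot (:timestamp (first chunk)) period)))
               :value     (apply max (map :value chunk))}))))

(max-per-period 10 [{:timestamp 1000 :value 1}
                    {:timestamp 1001 :value 4}
                    {:timestamp 1012 :value 2}])
;; => ({:timestamp 1010, :value 4} {:timestamp 1020, :value 2})
```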

Let me know if you have any further questions,
Zach
   




Paul Lam

Mar 28, 2013, 4:50:24 PM
to alep...@googlegroups.com
Hi Zach,

I'm getting a null pointer exception, and I'm probably doing something silly. My stacktrace is at http://pastebin.com/yfbEPN2h and all I'm doing is streaming messages to a channel with (enqueue), then running (query-stream) on it. It's OK if the channel is never-ending, isn't it?

Does anything stand out to you from the stacktrace? I should write a unit test and debug it myself, but I was hoping you might spot something easily.


Cheers,
Paul

Zach Tellman

Mar 28, 2013, 5:01:30 PM
to alep...@googlegroups.com
Hey Paul,

Sorry, I was just about to board a plane yesterday as I sent that email, and I rushed over some aspects of query-stream. The instructions I gave will work for query-seq, but query-stream currently requires a little more work:

(use 'lamina.query 'lamina.core)
(require '[lamina.time :as t])

(def t-q (t/task-queue))

(query-stream 
  (fn [ch] (->> ch partition-every (map* #(apply max %))))
  {:task-queue t-q, :period (t/minutes 1), :timestamp :timestamp, :payload :value} 
  ch)

(t/advance-until t-q 1234)

The 'task-queue' here is basically the clock, which must be manually advanced. In retrospect, if no :task-queue is explicitly defined, this can be taken care of for you, so I'll be pushing some code that makes my earlier version work properly.

Zach



Zach Tellman

Mar 28, 2013, 5:24:19 PM
to alep...@googlegroups.com
Sorry, that should read:

(def t-q (t/non-realtime-task-queue))

I've just pushed a change to 0.5.0-SNAPSHOT which will make it so that the :task-queue doesn't need to be specified, however.

Zach

Paul Lam

Mar 28, 2013, 5:59:07 PM
to alep...@googlegroups.com
Hi Zach,

Thanks a lot for your help! Now I seem to be running into an arity error:

ArityException Wrong number of args (1) passed to: operators$partition-every  clojure.lang.AFn.throwArity (AFn.java:437)

So I added the first arg to partition-every, like this:

(fn [ch] (->> ch (partial partition-every {:period (t/seconds 10)}) (map* #(apply max %))))

Then it complains about a cast:

ClassCastException clojure.core$partial$fn__4192 cannot be cast to lamina.core.channel.IChannel  lamina.core.channel/eval4790/emitter-node--4793 (channel.clj:41)



Regards,
Paul

Zach Tellman

Mar 28, 2013, 6:30:47 PM
to alep...@googlegroups.com
The ->> operator does code-level reordering, and is not a proper thrush combinator.  Therefore, if you want to write (c x (b y (a z))), it would just be

  (->> z a (b y) (c x))

not
  
  (->> z a (partial b y) (partial c x))

so you just need to do:

  (->> ch (partition-every {:period (t/seconds 10)}) (map* #(apply max %)))
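
The reordering can be checked at a REPL without Lamina. This is a small sketch using only clojure.walk/macroexpand-all on quoted forms; the symbols a, b, c, x, y and z are placeholders and are never evaluated.

```clojure
(require '[clojure.walk :as walk])

;; ->> threads the previous form in as the LAST argument of the next form.
(walk/macroexpand-all '(->> z a (b y) (c x)))
;; => (c x (b y (a z)))

;; With partial, the threaded value becomes one more argument to partial
;; itself, so the expression evaluates to a function, not to the result
;; of calling b.
(walk/macroexpand-all '(->> z a (partial b y)))
;; => (partial b y (a z))
```

The second expansion is consistent with the ClassCastException above: the step after the partial form receives a function where it expects a channel.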

Zach
