Streaming a large file onto a channel


Adrian Mowat

Mar 17, 2015, 8:52:17 AM
to clo...@googlegroups.com
Hi,

I've played around with core.async a bit, but now I'm trying to use it for a real project and I'm running into a problem getting data out of a file and into a channel on the JVM (i.e. as opposed to ClojureScript).

I have around 1GB of data sitting in a file. Each line of the file contains a separate JSON document. There are different types of document in the file, and I would like to use core.async to set up a pipeline of concurrent operations, as follows, so that I can start processing the data before I've finished reading the file.

1. Stream the raw data out of the file one line at a time, parse it as JSON and write each line to channel (1)
2. Read channel (1) and divide the messages up by type and write them to new channels (2..n)
3. Read channels (2..n) and apply business logic as appropriate

I'd like the initial read to run in its own thread because it will block on IO. The others can run in core.async's thread pool.
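The three stages above could be wired together roughly as follows. This is a minimal sketch, not a tested design. It assumes Cheshire for JSON (which matches the `json/parse-string` call with a key function in the code below) and that every document carries a `type` field. The names `read-docs`, `route-by-type`, `consume` and `run-pipeline` are hypothetical, and the buffer size of 100 is arbitrary. Stage 2 uses a hand-written router loop; core.async's `pub`/`sub` is a built-in alternative.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.java.io :as io]
         '[clojure.string :as str]
         '[cheshire.core :as json]) ; assumption: Cheshire is the JSON library

;; Stage 1: a dedicated thread does the blocking reads and parses each line.
;; The channel is closed when the input ends (or the read fails), so that
;; the later stages know to stop.
(defn read-docs [source buf-size]
  (let [out (async/chan buf-size)]
    (async/thread
      (try
        (with-open [rdr (io/reader source)]
          (doseq [line (line-seq rdr)]
            (async/>!! out (json/parse-string line (comp keyword str/lower-case)))))
        (finally
          (async/close! out))))
    out))

;; Stage 2: route each document to the channel for its type.
;; ASSUMPTION: every document has a :type field. Documents whose type is not
;; listed in `types` are dropped. All outputs are closed when `in` closes.
(defn route-by-type [in types buf-size]
  (let [outs (into {} (for [t types] [t (async/chan buf-size)]))]
    (async/go-loop []
      (if-some [doc (async/<! in)]
        (do (when-some [out (get outs (:type doc))]
              (async/>! out doc))
            (recur))
        (doseq [out (vals outs)]
          (async/close! out))))
    outs))

;; Stage 3: apply handler to each document inside a go block.
;; handler must not block, because go blocks share a small thread pool.
;; Returns a channel that yields the number of documents handled.
(defn consume [ch handler]
  (async/go-loop [n 0]
    (if-some [doc (async/<! ch)]
      (do (handler doc)
          (recur (inc n)))
      n)))

;; Runs stages 1-3 and blocks the calling thread until every consumer is done.
;; handlers maps a :type value to a non-blocking fn of one document.
;; Returns a vector with one document count per type.
(defn run-pipeline [source handlers]
  (let [docs   (read-docs source 100)           ; 100 is an arbitrary buffer size
        routes (route-by-type docs (keys handlers) 100)
        done   (mapv (fn [[t ch]] (consume ch (get handlers t))) routes)]
    (mapv async/<!! done)))
```

A call would look like `(run-pipeline "10_events.json" {"click" handle-click "purchase" handle-purchase})`, where the type values and handler functions are placeholders for whatever the real documents and business logic are.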

I'm running into problems getting channels (1) and (2) to talk to one another. Here's my initial spike; I would expect it to write the 10 lines of JSON from the example file to stdout.

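(require '[clojure.core.async :as async :refer [chan go <! >!!]]
         '[clojure.java.io :as io]
         '[clojure.string :as str]
         '[cheshire.core :as json]) ; assumed: parse-string below is Cheshire's

;; Read the file on a dedicated thread (it blocks on IO), putting each line on ch.
(defn file-to-chan [ch file]
  (do
    (async/thread
      (with-open [rdr (io/reader file)]
        (doseq [line (line-seq rdr)]
          (>!! ch line))))
    ch))

;; Parse one line of JSON, turning its keys into lower-case keywords.
(defn parse-line [s]
  (json/parse-string s (comp keyword str/lower-case)))

;; The (map parse-line) transducer parses each line as it passes through the channel.
(def events (chan 1 (map parse-line)))

;; Print each event from a go block. This loop never exits
;; (and would print nil forever if events were ever closed).
(go
  (while true
    (println (<! events))))

(file-to-chan events "10_events.json")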

I have a few questions...

* Can anyone help me understand what's going wrong? (I'm sure it's something silly, but I'm going cross-eyed looking at it.)
* It's effectively a batch process.  Is this an appropriate use case for core.async?
* If so, am I on the right track or is there a better way to approach this?

Many Thanks

Adrian





Erick Pintor

Mar 18, 2015, 9:45:33 AM
to clo...@googlegroups.com
Hi Adrian,

What exactly is the issue you're facing?
I did my own version and it seems to be working fine.

Please take a look; I hope it helps.

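(require '[clojure.core.async :as async]
         '[clojure.java.io :as io])

;; Reads the file on its own thread, then closes ch to signal end of input.
(defn process-file [ch file]
  (async/thread
    (with-open [input (io/reader file)]
      (doseq [line (line-seq input)]
        (async/>!! ch line)))
    (async/close! ch)))

(defn parse [line]
  (str "Parsed: " line)) ; change it to do whatever you want

;; Returns a new channel carrying (parse line) for every line taken from ch.
(defn mapping [ch]
  (async/map parse [ch]))

(defn start []
  (let [lines  (async/chan)
        events (mapping lines)]        ; write raw lines to lines, read parsed values from events
    (process-file lines "10_events.json")
    (async/go-loop []
      (when-let [v (async/<! events)]  ; nil means events has closed, so stop
        (println v)
        (recur)))))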

As for your approach, it seems to me a legitimate use of core.async.
Please send us your impressions once you finish.

Cheers,

Adrian Mowat

Mar 18, 2015, 11:19:45 AM
to clo...@googlegroups.com
Hi Erick

Thanks for getting back to me.  On my system, I wasn't seeing the contents of my file being listed in the REPL.  Your code is working fine though and I can't see anything significantly different so I wonder if I had managed to corrupt my session in some way.

Anyway, it's good to know I'm on the right path. I'll post my solutions as I get things up and running.

Cheers

Adrian

Adam Clements

Mar 18, 2015, 2:57:35 PM
to clo...@googlegroups.com

It's possible you are simply not seeing the println output from a background thread, depending on how your REPL, etc., is set up.
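One way to rule this out is to hand results back to the calling (REPL) thread instead of printing inside a go block. A minimal sketch, assuming the producer closes the channel when it is done; `drain` and `demo` are hypothetical names.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.string :as str])

;; Hypothetical helper: blocks the *calling* thread until ch is closed and
;; returns everything taken from it as a vector. Nothing is printed on a
;; pool thread, so the REPL prints the returned value itself.
;; Only use it on a channel that will eventually be closed.
(defn drain [ch]
  (async/<!! (async/into [] ch)))

;; Hypothetical demo: a producer thread feeds a channel with a transducer
;; and then closes it; drain collects the transformed values.
(defn demo []
  (let [ch (async/chan 10 (map str/upper-case))]
    (async/thread
      (doseq [s ["a" "b" "c"]]
        (async/>!! ch s))
      (async/close! ch))
    (drain ch)))
```

Evaluating `(demo)` at the REPL returns `["A" "B" "C"]`. If that shows up but the equivalent println output from a go block does not, the output is most likely going to another stream (for example the nREPL server's own output).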


--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en

Adrian Mowat

Mar 18, 2015, 3:14:55 PM
to clo...@googlegroups.com
Hi Adam

I'm using the latest version of CIDER + cider-nrepl, but it's a possibility. I suspect it's more a case of having tried so many different combinations that I polluted my REPL beyond repair. My fault for not just using components from the outset :-(

Thanks

Adrian 


Christopher Small

Mar 18, 2015, 6:07:47 PM
to clo...@googlegroups.com
It seems like you're generally on the right track here (though Erick Pintor's code has some nice cleanup, like removal of the unnecessary do, etc.). The one thing I'd recommend is testing what happens with a larger channel buffer: if the file IO isn't the bottleneck, but rather the processing, this could help concurrency performance by making sure there's always something ready to be taken and worked on. It would be cool to see some metrics on that for your use case.
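A rough way to gather such metrics is to time the same run with several buffer sizes. This is a minimal sketch: `run-with-buffer` and `produce-numbers!` are hypothetical, the producer is synthetic, and for real numbers `produce!` would be the loop that reads the file. The consumer runs via `async/thread` rather than a go block because real processing may block.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical harness: runs produce! and consume against a channel with a
;; buffer of buf-size and reports how long the whole run took.
;; produce! must put values on the channel and close it when it is done;
;; consume is called once per value on a separate (non-go) thread.
(defn run-with-buffer [buf-size produce! consume]
  (let [ch    (async/chan buf-size)
        start (System/nanoTime)
        done  (async/thread
                (loop [n 0]
                  (if-some [v (async/<!! ch)]
                    (do (consume v)
                        (recur (inc n)))
                    n)))]
    (async/thread (produce! ch))
    (let [n       (async/<!! done)   ; wait for the consumer to see the close
          elapsed (/ (- (System/nanoTime) start) 1e6)]
      {:buf-size buf-size :items n :elapsed-ms elapsed})))

;; Hypothetical synthetic producer: puts 0..999 on ch, then closes it.
(defn produce-numbers! [ch]
  (doseq [i (range 1000)]
    (async/>!! ch i))
  (async/close! ch))

(comment
  ;; Compare a few buffer sizes with a consumer that takes ~1 ms per item.
  (doseq [b [1 10 100 1000]]
    (println (run-with-buffer b produce-numbers! (fn [_] (Thread/sleep 1))))))
```

With a single consumer, a bigger buffer mostly lets the reader run ahead and smooths out bursts rather than adding parallelism; whether that helps in this case is what the measurement would show.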

Best

Chris