[ANN] Concurrently - A library for building concurrent process-pipelines backed by core.async

Tsutomu YANO

Sep 12, 2021, 8:30:15 AM
to clo...@googlegroups.com
Hi clojurians,

We have published our library for concurrent data processing with core.async, named 'concurrently'.



With 'concurrently', programmers can create shared process-pipelines backed by core.async and
can share the pipelines safely and easily among multiple requester-threads.

A shared pipeline accepts requests from many requester-threads, handles them in a
shared concurrent core.async pipeline, and then splits the calculated results back to each requester.

Using core.async directly to create a shared pipeline causes many difficulties for programmers because:

* Shared channels easily get stuck. If a requester-thread stops reading a channel because of an
  unexpected exception, some data remains unread and nobody can write to the channel. The shared
  pipeline may then stall while waiting to write data to the unread channel.
* You must handle channels carefully so that they NEVER get stuck.
* All input data is put onto the same pipeline, so you must somehow SPLIT the output of the shared
  pipeline in order to return the processed results to each requester-thread.
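
The blocked-writer situation in the first point can be reproduced with plain core.async. This is only a minimal sketch: the channel name `out` and the keyword values are made up for illustration, and `offer!` is used in place of a blocking put so that the example itself does not hang.

```clojure
(require '[clojure.core.async :as a])

;; A shared output channel with room for exactly one pending result.
(def out (a/chan 1))

;; The first result fits in the buffer, so the non-blocking put succeeds.
(def first-put (a/offer! out :result-1))

;; Assume the reader died from an exception: nothing drains the buffer.
;; A second write cannot complete. A blocking put (>!!) here would wait
;; forever, which is how a shared pipeline ends up stalled.
(def second-put (a/offer! out :result-2))
```

Here `first-put` is true and `second-put` is falsey, because the buffer is still full.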


'concurrently' handles all the problems described above.
Each requester-thread just passes input data by calling the 'concurrently' function. The shared pipeline returns a 'Job',
and each requester-thread can then get all calculated results for its input data by calling the 'get-results' function on the job.
There are no verbose tasks for protecting channels from accidentally getting stuck or for splitting calculated results among requesters.


UZABASE is a company based in Japan that processes and publishes financial data through SaaS applications,
and it uses Clojure for many tasks.

The 'concurrently' library was made for our streaming-data-processing tasks and has already been in use for over a year.
Problems in the library were found and fixed during this long period of use on real tasks,
so we decided to publish it for other programmers.

We hope that it is useful for someone.

Thank you.



Tsutomu YANO

Rangel

Sep 12, 2021, 11:36:35 AM
to clo...@googlegroups.com
Interesting project. 

Can you expand on any differences or similarities with core.async's pipeline, pipeline-async, etc.?


--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/clojure/47D5CA02-79AA-4267-9EFA-4B63D415957D%40me.com.

Tsutomu YANO

Sep 13, 2021, 12:38:51 AM
to clo...@googlegroups.com
(Sorry, I sent a response before finishing the text. This is a repost.)

The biggest differences are safe exception handling and easy per-request result handling.

'concurrently' is built on top of pipeline/pipeline-blocking of core.async. It has functions like
`concurrent-process` and `concurrent-process-blocking` that depend on `pipeline` and `pipeline-blocking`,
so you can pass input and output channels and a parallel count to these functions, the same as with core.async.
Internally, 'concurrently' creates a pipeline and uses it to execute the supplied transducer.
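
For reference, this is roughly what the underlying core.async call looks like when used directly. It is a sketch of plain `pipeline` only, not of the 'concurrently' functions, and the channel names and buffer sizes are arbitrary choices for the example.

```clojure
(require '[clojure.core.async :as a])

(def in  (a/chan 10))
(def out (a/chan 10))

;; Apply the transducer with a parallel count of 4.
;; By default `out` is closed once `in` is closed and drained.
(a/pipeline 4 out (map inc) in)

(doseq [x [1 2 3]] (a/>!! in x))
(a/close! in)

;; pipeline keeps outputs in input order.
(def results (a/<!! (a/into [] out)))
;; results => [2 3 4]
```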

But the APIs of core.async are very primitive. Programmers must handle channels very carefully to protect
the program from unexpected exceptions, passing exception handlers for ALL transducers (if you forget,
exceptions thrown in the transducer are never caught and just a stack trace is printed to the console,
so the application easily loses the chance to handle them).
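
As an illustration of the manual work involved, plain `pipeline` takes an optional exception handler as its last argument, and any non-nil value the handler returns is placed on the output channel. The sketch below uses made-up names (`risky`, `on-error`) and shows core.async alone, not how 'concurrently' does it internally.

```clojure
(require '[clojure.core.async :as a])

(def in  (a/chan 10))
(def out (a/chan 10))

;; A transformation that fails for one particular input.
(defn risky [x]
  (if (= x 2)
    (throw (ex-info "boom" {:input x}))
    (* x 10)))

;; Turn the exception into a value so the reader can see the failure.
(defn on-error [^Throwable ex]
  {:error (.getMessage ex)})

;; Arguments: parallel count, output, transducer, input, close?, ex-handler.
(a/pipeline 2 out (map risky) in true on-error)

(doseq [x [1 2 3]] (a/>!! in x))
(a/close! in)

(def results (a/<!! (a/into [] out)))
;; results => [10 {:error "boom"} 30]
```

Without `on-error`, the failing item would simply be missing from the output and the caller would have no value to react to.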

And in a use case where you are building a web application that has a single shared pipeline, and many
requests use that pipeline for calculation, you must carefully handle the output of the shared
pipeline to retrieve only the results for a given request (because the pipeline is shared, the output contains
results for other requester-threads). If some requester-thread mishandles the output channel and stops reading
its own results, the data will remain in the output channel forever and the pipeline will stop working.
This means that something similar to a DEADLOCK in multithreaded programming can easily occur.
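
One way to do this splitting by hand is to attach a reply channel to every request and let a single router drain the shared output. This is a hypothetical sketch in plain core.async, not the implementation used by 'concurrently'; the `request` function and the `:value`, `:reply` and `:result` keys are invented for the example.

```clojure
(require '[clojure.core.async :as a])

(def in  (a/chan 10))
(def out (a/chan 10))

;; Shared pipeline: every item keeps the reply channel of its requester.
(a/pipeline 4 out
            (map (fn [req] (assoc req :result (* (:value req) (:value req)))))
            in)

;; A single router drains the shared output so the pipeline never waits
;; on an individual requester. offer! never blocks: if a requester's
;; buffer is already full, that result is dropped instead of stalling.
(a/go-loop []
  (when-let [msg (a/<! out)]
    (a/offer! (:reply msg) (:result msg))
    (recur)))

;; Each requester gets its own buffered channel for its own result.
(defn request [value]
  (let [reply (a/chan 1)]
    (a/>!! in {:value value :reply reply})
    reply))

(def r1 (request 3))
(def r2 (request 4))
(def v1 (a/<!! r1))
(def v2 (a/<!! r2))
;; v1 => 9, v2 => 16
```

Even this small version has to decide what happens when a requester stops reading, which is the kind of detail that is easy to get wrong.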

'concurrently' is useful for such use cases. Most of the verbose exception handling and per-thread result handling is
done by 'concurrently'. All programmers have to do is pass input data to the 'concurrently' function
and read the result channel returned by that function by calling 'get-results'.

'concurrently' is a kind of high-level API for core.async.


Tsutomu YANO



Christopher Small

Sep 13, 2021, 11:11:40 PM
to Clojure
Cool project; thanks for working on and sharing this.

Worth mentioning that Christian Weilbach built a thing called superv (based on the supervisor pattern in Erlang) which solves some similar problems using macros with some of the other core.async API, but I don't think it implements a version of either pipeline or pipeline-blocking as you have here. So maybe something to look at either for complementary tooling, or for other ideas about how to approach this problem.

Tsutomu YANO

Sep 14, 2021, 12:03:58 PM
to clo...@googlegroups.com
Thank you, Chris.

I didn’t know about the project you mentioned. It seems like a very interesting project. I will check it out to learn about another approach to this problem.


Tsutomu YANO