Asynchronously processing elements of a stream in C++


Vu Pham

Nov 21, 2018, 1:20:37 PM
to ISO C++ Standard - Future Proposals
Hi all,

I have an idea for a set of primitives for processing elements coming from never-ending streams in C++. The main class is stream<T>, with the following methods:
  - stream<U> map(function<U(T&)> mapper): map a stream<T> to stream<U>
  - void foreach(function<void(T&)> processor): run the given function on each element of the source stream.
  - void write_to(sink<T>& dest): write all elements to the given sink.
  - and so on.

All elements are processed asynchronously, in parallel, using either an Executor (an upcoming C++ feature) or std::async.
The usage will be something like this:

stream<MyData> source(...);
sink<TransformedData> destination(...);
source.map(...).filter(...).map(...).write_to(destination);
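To make the shape of that interface concrete, here is a minimal, purely synchronous sketch (all names here are hypothetical, not an existing API); a real implementation would dispatch each stage through an Executor or std::async, and would model a never-ending source rather than a vector:

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Toy, single-threaded sketch of the proposed interface. A real
// implementation would run each stage through an Executor or
// std::async and would not be backed by a finite vector.
template <typename T>
class stream {
public:
    explicit stream(std::vector<T> elems) : elems_(std::move(elems)) {}

    // map: stream<T> -> stream<U>
    template <typename U>
    stream<U> map(std::function<U(T&)> mapper) {
        std::vector<U> out;
        out.reserve(elems_.size());
        for (auto& e : elems_) out.push_back(mapper(e));
        return stream<U>(std::move(out));
    }

    // foreach: run the given function on each element
    void foreach(std::function<void(T&)> processor) {
        for (auto& e : elems_) processor(e);
    }

private:
    std::vector<T> elems_;
};
```

Because map takes a std::function, a caller either specifies U explicitly (s.map<int>(...)) or passes a std::function object so the template argument can be deduced.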

I found these constructs very helpful for my use case, where I process messages coming from a distributed queue, similar to Apache Kafka.
The logic of processing messages can now be expressed as a chain of transformations. I don't need to worry about maintaining a thread pool, fibers, and all those threading details.

I think it will be useful for other use cases as well, especially if we allow users to create their own sources.
Is there anything similar in the standard? Do you think something like this would be helpful?

Relatedly, I also have another design for parallelizing operations over standard collections (vector, list, map, ...).
Unlike streams, those elements are countable and stored in-memory, so we can define many other aggregation operations like reduce, aggregate, and so on.
Using these primitives, users can easily parallelize algorithms with functional constructs, without maintaining low-level threading machinery.
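As a taste of what such a primitive could hide, here is a small sketch (not a proposed API) that sums a vector by reducing its two halves concurrently with std::async:

```cpp
#include <cassert>
#include <future>
#include <numeric>
#include <vector>

// Sketch only: split the input in half, reduce one half on another
// thread via std::async, reduce the other half on this thread, then
// combine. A real library would generalize the split, the combiner,
// and the choice of executor.
int parallel_sum(const std::vector<int>& v) {
    auto mid = v.begin() + v.size() / 2;
    auto left = std::async(std::launch::async, [&v, mid] {
        return std::accumulate(v.begin(), mid, 0);
    });
    int right = std::accumulate(mid, v.end(), 0);
    return left.get() + right;
}
```

The point is that the user writes only the reduction; the splitting and threading are hidden behind the abstraction.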

I believe this is a useful low-overhead abstraction that will make people love C++ even more!

I would love to discuss the details.

Cheers,
Vu

P.S. This is my first email to this group. I am following the guides from your website. Please forward it to the right channel if this is not the correct place :)

Gaetano Checinski

Nov 21, 2018, 6:57:51 PM
to std-pr...@isocpp.org

> Do you think something like that will be helpful?

Definitely. Your idea is very powerful.
However, I'm not sure it is something that should be part of the standard library.
The reason is that there are many different ways to implement functional reactive programming (FRP) data processing pipelines, each with very different properties.

One of the most popular families of FRP libraries is the 'reactive extensions' (Rx) libraries, which have been implemented in almost every language:
RxCpp is the C++ version, and a nice visualisation of its operators can be found here.
RxCpp also has experimental support for the Coroutines TS. Check out their example.


--
You received this message because you are subscribed to the Google Groups "ISO C++ Standard - Future Proposals" group.
To unsubscribe from this group and stop receiving emails from it, send an email to std-proposal...@isocpp.org.
To post to this group, send email to std-pr...@isocpp.org.
To view this discussion on the web visit https://groups.google.com/a/isocpp.org/d/msgid/std-proposals/067d8f3c-a700-4a4b-9ca7-6596480b20e9%40isocpp.org.


--
Regards,

Gaetano Checinski
Founder of Loopperfect

Vu Pham

Nov 22, 2018, 10:45:58 AM
to std-pr...@isocpp.org
Thanks for the pointers. It looks like C++ is getting weirder :)

Since I mentioned two different things, let me consider each of them and why they should be part of the standard library. Apologies in advance: this is going to be a monologue with lots of (potentially ill-informed) opinions.

The stream<T> construct is a pattern that can be used in a wide variety of applications, including web services, data analysis (processing logs, for example), and distributed computing systems (especially for people doing scientific computing).
People have come up with many different asynchronous, event-driven networking libraries for C++, but they tend to be quite scattered.
stream<T> could serve as a lingua franca for these kinds of systems in C++.
In terms of APIs, the closest examples I can find in other languages are Apache Beam, Flink, and Spark. All of them are for the JVM.

The "parallel collection" construct is an easy way to do things in parallel without messing around with threads.
You may be thinking that our vectors and lists are usually not that big, and may not benefit much from parallelization.
But I think part of the problem is that it has never been easy to parallelize algorithms on collections; that's why we settle for our for-loops and call it good.
This construct would allow us to rethink a lot of collection-based algorithms. For example, I would be very happy to replace qsort() with a parallel, ideally non-blocking, version of it.

There are similar ideas in other languages: LINQ has a strong SQL flavor and heavily extends the language, which may not really be what we want.
Scala has parallel versions of all of its collections, which is great in terms of ease of use, but I think maintaining consistency between the sequential and parallel versions requires extra work.
By giving users a minimal set of abstractions, we avoid being too opinionated while still allowing efficient implementations on different hardware.

It is hard for me to categorize these constructs. If I must, they fall somewhere in the intersection of parallelism, functional programming, distributed systems, big data, and so on.

My only concern is that these constructs don't feel very C++-like (the way std::async does, for example). The first time I saw std::async, I thought: right, that's what a C++ person would do, not an Executor.
I understand it takes a long time for something to become part of the standard, so I am sharing these ideas now, so that maybe we can make them more of a C++ construct.
Using pipes as in RxCpp is a curious way to do it.

I come from an engineering background where I tend to build distributed systems to crunch numbers.
The JVM is great for building distributed systems at ease, but not so great for crunching numbers.
C++ is great for crunching numbers (with BLAS and friends), but not so great for building distributed systems (you can build great distributed systems in C++, but it takes significant effort).
I think that is quite unfortunate, and we can definitely expect more from C++.


By the way, when I said:
Unlike streams, those elements are countable and stored in-memory...
I meant to say "finite" instead of "countable", with the implication that streams are infinite (although nothing is practically infinite in our profession :)

Vu Pham

Nov 30, 2018, 8:15:18 AM
to std-pr...@isocpp.org
I think I had an overdose of caffeine and came up with another API to make this feel more like C++. The idea is to use the extraction and insertion operators on streams:

stream<T> source(...);
sink<U> my_sink(...);
function<U(T&)> map_fn = ...;           // a map function
function<void(U&)> foreach_fn = ...;    // a foreach function

// the API:
source >> mapper(map_fn) >> foreach(foreach_fn);

// intermediate results can be stored:
auto mapped = source >> mapper(map_fn);
mapped >> write(my_sink);

- mapper(), foreach(), write(), etc. are utility helpers that turn a std::function<...> into an instance of stream_processor.
- The expression source >> mapper(map_fn) is implemented as an overload of operator>>(stream& s, stream_processor& processor), which returns another stream or void, depending on the type of the given stream_processor.
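A minimal sketch of that overload (stream, stream_processor, and mapper here are hypothetical toy types, and everything runs synchronously rather than on an executor):

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical toy types standing in for the proposed stream,
// stream_processor, and mapper(); not a worked-out design.
template <typename T>
struct stream {
    std::vector<T> elems;
};

template <typename T, typename U>
struct stream_processor {
    std::function<U(T&)> fn;
};

// mapper(): boilerplate that wraps a std::function in a stream_processor.
template <typename T, typename U>
stream_processor<T, U> mapper(std::function<U(T&)> fn) {
    return {std::move(fn)};
}

// operator>> applies the processor to every element and returns a new
// stream, so calls can be chained: source >> mapper(f) >> mapper(g).
template <typename T, typename U>
stream<U> operator>>(stream<T> s, stream_processor<T, U> p) {
    stream<U> out;
    out.elems.reserve(s.elems.size());
    for (auto& e : s.elems) out.elems.push_back(p.fn(e));
    return out;
}
```

As in the snippet above, the lambda is first stored in a std::function (function<U(T&)> map_fn = ...;) so that mapper() can deduce T and U.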

I think this could be a C++ way to construct complicated pipelines (I almost feel like a C++ fashionista).
The extreme version is to get rid of mapper(), foreach(), etc. altogether: source >> map_fn >> foreach_fn;
However, that may be confusing for developers.

For parallel collections, the summary operations can be supported similarly:
future<U> result = source >> mapper(map_fn) >> aggregator(<initial value>, <combiner>, <aggregator>);

What do you think?
If this doesn't look good enough, I will double my caffeine consumption and may come up with something better (or worse, no promises).

Cheers,
Vu



--
PHAM Hoai Vu

Tony V E

Nov 30, 2018, 10:40:30 AM
to Vu Pham
Who decides the parallelism? The stream?

Your stream might make a good high-level API, but we tend to standardize lower-level APIs first (from which you can build higher-level ones), like executors, queues, and ranges.

(It may be a mistake that we focus so much on the low level, but if we don't provide tools that offer 100% performance for all cases (e.g. by allowing custom allocators), people just write their own anyhow.)

If we were to separate your APIs into parts, what do we end up with?


Sent from my BlackBerry portable Babbage Device


Vu Pham

Nov 30, 2018, 12:00:47 PM
to std-pr...@isocpp.org
Good question. I was imagining that the level of parallelism is decided by a stream system, which serves as both the scheduler and the factory for stream sources. Something like this:

stream_system system;
stream<U> source = system.my_source(....);

Another possibility is to add a few utility functions that can be injected into the >> operators:
source >> mapper(map_fn) >> parallelize(5) >> ...
Much like setprecision() for std::cout.
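A sketch of how such an injected manipulator could work, loosely modelled on iostream manipulators like setprecision(); the stream type, the parallelism field, and the tag type are all purely illustrative:

```cpp
#include <cassert>
#include <vector>

// Hypothetical: a toy stream that carries a desired degree of
// parallelism, plus a setprecision()-style manipulator to set it.
template <typename T>
struct stream {
    std::vector<T> elems;
    int parallelism = 1;  // how many workers later stages may use
};

struct parallelize_tag {
    int degree;
};

// parallelize(n) returns a tag object that operator>> recognizes.
inline parallelize_tag parallelize(int degree) { return {degree}; }

template <typename T>
stream<T> operator>>(stream<T> s, parallelize_tag p) {
    s.parallelism = p.degree;  // record the hint for downstream stages
    return s;
}
```

The tag object carries no elements itself; it only adjusts how the pipeline would be scheduled.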

Relatedly, how is the parallelism decided in std::async?

If there is enough interest, I can consolidate this into a concrete proposal doc.

Gaetano Checinski

Nov 30, 2018, 12:40:13 PM
to std-pr...@isocpp.org
I'd like to emphasize by repeating my previous statement:

> There are many different ways to implement functional reactive(FRP) data processing pipelines, each with very different properties.

I think this discussion would benefit from talking about the various properties your API implies.

As a side note: I implemented a coroutine-based, pull-based stream processing library: https://github.com/LoopPerfect/conduit
The API you depicted is very similar to conduit's.

Here is a list of questions that are important in my opinion:

Push or pull? Or an intersection of both?

Examples

Stream<tuple<T, U>> stream3 = zipWithLatest(stream1, stream2);
Interval(ms)               // emits a counter that increments every ms
stream >> throttle(n, ms)  // emits up to n events per ms


First-order or higher-order FRP?

Example:
 
stream1 >> map([](int ms){ return Interval(ms);  }); 
counter() >> switchMap({stream1, stream2}, [](int i){ return i % 2; });
// alternates between elements of stream1 and stream2

Questions:
Does the counter fire if no one is listening?
Can you subscribe after unsubscribing? Can you suspend and resume the event source?

Backpressure and Buffering
Example:

counter() 
  >> flatMap([](auto x){ return delayAndEmit(10ms, x); })
  >> flatMap([](auto x){ return delayAndEmit(100ms, x); }) 

Questions:

After every emission, 10 more events are generated; are they buffered, swallowed, or delayed via backpressure?
Do you buffer events?
What happens if the buffer is full?
How do you buffer?


State Management and Purity

counter() 
  >> scan(0, [](int a, int b){ return a+b; }) // emits the partial sum 0...n

In order to make scan work, we need to store its state somewhere. Where do we store it?

It might sound like a silly question but bear with me:

What if you want to serialize the state of the eventstream, close the application and resume the stream after restart?

Transducer-based stream processing libraries often maintain a separation of sinks, producers, and pipes (transformers).
The subscriber/sink then receives a stateless pipe and an associated state object, and drives the iteration.
This also lets the end user decide various FRP questions, like push vs. pull, or the behaviour in the switchMap example.
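To illustrate that transducer-style separation, here is a small sketch in which scan's accumulator lives in a caller-owned state object; the names (scan_state, scan_step, drive_scan) are illustrative, and the caller could serialize the state object between runs:

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Illustrative only: the pipe (scan_step) is stateless; all state lives
// in scan_state, which the driver owns and could checkpoint/serialize.
struct scan_state {
    int acc = 0;
};

int scan_step(scan_state& st, int x,
              const std::function<int(int, int)>& combine) {
    st.acc = combine(st.acc, x);
    return st.acc;  // emit the running result
}

// The sink/driver owns the state and drives the iteration.
std::vector<int> drive_scan(const std::vector<int>& input, scan_state& st) {
    std::vector<int> out;
    for (int x : input)
        out.push_back(scan_step(st, x, [](int a, int b) { return a + b; }));
    return out;
}
```

Because the state object is separate from the pipe, resuming the stream after a restart is just a matter of restoring scan_state and driving the iteration again.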

Subscriptions: static or dynamic?

Can I create a firing observable and subscribe to it later?
Can you have multiple subscribers? Will everyone receive the same events?

Regarding Schedulers and Executors:

RxCpp has shown that the decision of how things are executed can be deferred to the end user.
I don't see a big reason why your proposed API should not be able to support this.
However, I'm not sure whether support for custom schedulers and executors will be free (in either performance or complexity).


Now the real question: can we design one generic/abstract API that defers all those questions to the implementer?



Bjorn Reese

Dec 1, 2018, 10:55:54 AM
to std-pr...@isocpp.org
On 11/30/18 6:39 PM, Gaetano Checinski wrote:

> As a side note: I implemented a coroutine-based, pull-based stream
> processing library: https://github.com/LoopPerfect/conduit
> The API you depicted is very similar to conduit's.

Can you elaborate on the design decision to compose operations
into a stream, and on why you have operations like orElse and forEach?

Given that you are using coroutines, why didn't you compose operations
by writing a coroutine with normal C++ syntax, like the if-else
statement?

Gaetano Checinski

Dec 1, 2018, 1:21:08 PM
to std-pr...@isocpp.org
> why you have operations like orElse and forEach?

lhs >> orElse(rhs)

orElse is just a special compositional operator that streams the rhs stream only if the lhs is an empty stream.
It is the same idea as getOrDefault(value) in other monads (e.g. maybe), applied to streams.

lhs >> forEach(side-effect)

First of all, forEach is not a sink and does not transform the stream.
Admittedly, this one is questionable from an FP perspective.
Its purpose is to inject a side effect into the processing pipeline.

For both of those operators other names might have been better.

> Can you elaborate on the design decision to compose operations into a stream
We aimed to implement the common functional operators we were using on a daily basis for data processing in our various products.

> Given that you are using coroutines, why didn't you compose operations by writing a coroutine with normal C++ syntax, like the if-else statement?

I'm not sure what you mean. I'd argue I did. Every operator is implemented as a coroutine.

Are you asking why you should use Conduit's operators as opposed to just writing coroutines?

Well, Conduit implements high-level operations that naturally occur in stream processing.
In the process of DRYing up your codebase, you would end up refactoring your code into those operators anyway.
They are part of the functional programmer's vocabulary.

> Can you elaborate on the design decision to compose operations into a stream

We wanted to offer zero-cost compositional operators that work with the current Coroutines TS.
RxCpp already provides a reactive, push-based solution that works nicely with open systems.
We focused on building a solution for closed systems using a pull-based model.

Composability and Reusability

We wanted to make composition cheap and easy.
We realized that we can maximize those metrics by deferring control of the computation to the consumer.
This way, the algorithm implementer can defer decisions like storage and the number of iterations to the consumer of the sequence.
As a solution, we implemented inversion of control via laziness. As a result, all operators are lazy.

Zero-Cost

As we wanted to enable users to write high-level abstractions, runtime overhead was not acceptable to us.
We were intrigued by the Coroutines TS and investigated what is doable in its current state.
We were mostly successful, but there are a couple of cases where the optimizer fails to remove a heap allocation.

Simplicity

We wanted to make sure it is easy to add new operators.
All existing solutions are quite complicated, as they try to handle and optimize for all the different types of iterators.
Some of them implement the Ranges TS's '.base' method.
We don't think it is required in any scenario, and we think it is a bad idea, as it encourages place-oriented programming (a source of many bugs).

Thanks to the Coroutines TS, the implementation is quite trivial.
The Coroutines TS even type-erases the handle, enabling Conduit to provide one type, Seq<T>, for all coroutines.
As a result, its compile times are very low compared to range-v3, for instance.

Detour: Open vs. Closed Systems, or Push vs. Pull

Open-Systems

A system where data streams are external events you have no control over.
Examples are timers and I/O such as sockets or mouse moves.
The metrics you usually want to control in open systems are latency and concurrency.
These systems often follow a push-based model, giving the data source control over the consumer.

Closed-Systems

A system where data (generation) is fully controlled by the system.
Examples are simulations or parsers, where determinism and predictable performance matter.
These systems often follow a pull-based model, giving the consumer control over the computation.


There is a big overlap in what you can do in pull-based and push-based systems.
However, I don't believe it makes sense to use one model for everything.
Many open systems consist of many parts that are closed systems.
HTTP servers are a good example here:
incoming requests are outside your control, but once you have received a request, you have full control over that chunk of data.
Parsing the request using a pull-based solution might be sensible.




Vu Pham

Dec 5, 2018, 8:22:26 AM
to std-pr...@isocpp.org
Lots of good ideas flying around.

I think all your questions boil down to two things (actually one thing):
- how to manage state in the different stages of the stream processing pipeline, and
- how to buffer (of course, the buffer itself can be seen as state, but it may need special care because it can grow really big).

If we have good answers to both questions, all your examples can be implemented:
- The example in "Push or Pull" can be implemented by a map() that keeps track of the timestamp of the last message. That timestamp can be considered state.
- The "alternates between two streams" example is similar to how you would implement a zip() operation, which is essentially a map() with buffers for both streams.
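A tiny illustration of that zip-as-buffered-map idea (single-threaded and purely illustrative; a real version would need synchronization and bounds on the buffers):

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <utility>

// Illustrative zip step: push incoming elements from either input into
// a per-input buffer, and emit a pair whenever both buffers are
// non-empty. The buffers are exactly the state that needs managing.
struct zipper {
    std::deque<int> left, right;

    std::optional<std::pair<int, int>> on_left(int x) {
        left.push_back(x);
        return try_emit();
    }
    std::optional<std::pair<int, int>> on_right(int y) {
        right.push_back(y);
        return try_emit();
    }

private:
    std::optional<std::pair<int, int>> try_emit() {
        if (left.empty() || right.empty()) return std::nullopt;
        std::pair<int, int> p{left.front(), right.front()};
        left.pop_front();
        right.pop_front();
        return p;
    }
};
```

An empty optional means "no output yet"; a value means one zipped pair has been produced and the matched elements have been consumed from both buffers.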

Backpressure is tricky. Since we are doing things locally, we can apply backpressure, but it doesn't quite follow the usual principles.
I think we should try to build a system in which the producer produces as fast as possible. If the consumer is slower, that is the consumer's problem.
In a distributed setting, this could be solved by simply adding a bunch of new consumers (on different machines).
Since we do it locally here, it becomes a trade-off between memory (how much you buffer) and processing power (how many new threads you can spawn). When it is unclear, we can leave it to the users to configure.
Buffering may actually not be too bad. We can use memory-mapped files when necessary.

The question of state management is a good one. In general, the functions used in these frameworks don't have to be stateless; they only need to be thread-safe. You can certainly have processing stages that are stateful and thread-safe.
Usually the users (i.e. developers) take care of the state themselves, with locks and whatever else they are comfortable with.

If we choose to have built-in support for state, we most likely can do so by recruiting a ConcurrentHashMap that is checkpointed along with the whole pipeline.
Elements in the state map must implement some API so that they can be checkpointed. Everything is local here, which simplifies the job a lot.
The state map will then be passed into user-provided functions.




Kirk Shoop

Dec 6, 2018, 1:04:13 AM
to ISO C++ Standard - Future Proposals
Hi,

I am arriving late to this thread. I am happy to see more ideas and discussion on this topic. This space is a passion of mine. This year I joined Eric Niebler and Lewis Baker to build async ranges in a way that integrates coroutines and executors with the existing ranges library. 

Executors
-------------

We realized earlier this year that the first step was to be involved in the Executors design. We wrote a paper (http://wg21.link/P1055r0) to start the conversation. If you look at the paper you will see that the design is planned to extend to support Future and Stream (not with those names). The paper resulted in a meeting in Bellevue, before CppCon, to agree on a unified approach for Executors. This was followed by a paper (https://wg21.link/P1194r0) at the San Diego meeting describing the agreed approach for Executors design.

Concepts
-------------

The core idea in the papers we have written is to define a set of Concepts (rather than types) for async values, similar to how Iterators define sequences of sync values. The San Diego post-mailing should have a paper (http://wg21.link/P1341R0)(?) that will show the work Lewis has been doing to combine the Coroutines TS with Senders in a way that can be applied generally to async in C++.

Push and Pull
-------------------

The Sender/Receiver concepts can be described in different ways. They are formally defined callbacks. They are Async Iterators. They are Streams. They Push values.

The Coroutines TS enables the compiler to transform a function that looks like a synchronous Pull into an implementation that is Push. Another way to express this: the Coroutines TS is syntactic sugar that hides callbacks.

When Senders and Coroutines-TS are combined, the same expression can be used in the Push model or the Pull model as desired.

Pipelines
------------

There was another paper at San Diego to add pipelines to C++ (http://wg21.link/p1261r0). This paper came from the hetero-c group in SG14. I am working with them to write a paper that will incorporate the Sender/Receiver model.

Implementation
--------------------

There is an implementation under active development here: https://github.com/facebook/folly/tree/master/folly/experimental/pushmi
To see that implementation in action: https://godbolt.org/z/woVAi9

RXCPP
----------

I also worked on rxcpp. rxcpp supports both operator| and operator>>. rxcpp also supports chaining with 'dot': source().map(..).filter(..).subscribe(). The original preference was 'dot'; operator| was adopted to align with the range library, which is now in the standard. pushmi and rxcpp solve the same problem that the range library solves, except that they Push values distributed in time, where ranges Pull values distributed in space. It helps usage when they share the same structure and composition.

Contribute
--------------

Please do play with the existing libraries and get involved with feedback, papers, etc. The goal is to have a solid foundation for async in C++, and the more perspectives the better.

Kirk