verticles with a streaming API

Bert Robben

Dec 4, 2021, 10:27:26 AM
to vert.x
I'm designing a streaming service and am struggling to express it properly using Vertx concepts. Help would be appreciated.

Note that, for the sake of discussion, I've greatly simplified the example so that it focuses only on the streaming aspect. The point is not to find some existing technology that already does this, but rather to find out how certain patterns (in this case, shared objects with a streaming API) can be expressed with Vertx.

----------------

I want to design a reporting service that can handle a set of reports. Data can be appended to each report, and each report can also be read. Since a report can be quite large, I want to design it with a streaming API.

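import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;

interface Report {
    WriteStream<String> append();            // stream to which new data is appended
    ReadStream<String> read(int position);   // the report's content from position onwards
}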

It's important to understand that the report keeps state in memory. For instance, it might immediately stream newly appended data to open read connections. Since its implementation is complex, I want to design it so that, for any given Report instance, all its logic always executes on the same Vert.x Context (just like a verticle).

On top of this, I have a reporting service exposed through some REST API.

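import io.vertx.ext.web.RoutingContext;

interface ReportingService {
    void append(RoutingContext rc);   // streams the request body into the named report
    void read(RoutingContext rc);     // streams the named report into the response
}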

The implementation of ReportingService#append should look up the report by name (extracted from the request) and then set up a pipe that streams all the data from the request body into the report. Once that pipe has finished, it can end the response.

The implementation of ReportingService#read is similar. It should look up the report by name (extracted from the request) and then set up a pipe that streams all the data from the report into the HTTP response. Once that pipe has finished, the response can be ended.
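A minimal sketch of these two handlers, under two assumptions that are not in the post: a hypothetical Buffer-typed variant of Report (`BufferReport`), because an HTTP request body is a `ReadStream<Buffer>` and cannot be piped directly into a `WriteStream<String>`, and a hypothetical in-memory registry whose reports all live on the same context as the router, which sidesteps the cross-context question entirely. It relies on Vert.x 4's `ReadStream.pipeTo`, which applies back-pressure and ends the destination when the source ends. The `:name` path parameter and `position` query parameter are likewise assumptions.

```java
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.web.RoutingContext;

import java.util.HashMap;
import java.util.Map;

// Hypothetical Buffer-typed variant of Report: an HTTP body is a ReadStream<Buffer>.
interface BufferReport {
  WriteStream<Buffer> append();
  ReadStream<Buffer> read(int position);
}

public class SingleContextReportingService {

  // Hypothetical registry; a plain HashMap suffices because, in this sketch,
  // the router and all reports run on one context.
  private final Map<String, BufferReport> reports = new HashMap<>();

  public void register(String name, BufferReport report) {
    reports.put(name, report);
  }

  public void append(RoutingContext rc) {
    BufferReport report = reports.get(rc.pathParam("name"));
    if (report == null) {
      rc.fail(404);
      return;
    }
    // pipeTo pauses/resumes the request as needed and ends report.append() when the body ends.
    rc.request().pipeTo(report.append())
      .onSuccess(v -> rc.response().setStatusCode(204).end())
      .onFailure(rc::fail);
  }

  public void read(RoutingContext rc) {
    BufferReport report = reports.get(rc.pathParam("name"));
    if (report == null) {
      rc.fail(404);
      return;
    }
    String pos = rc.request().getParam("position");
    int position = pos == null ? 0 : Integer.parseInt(pos);
    rc.response().setChunked(true);   // the total length is not known up front
    // pipeTo ends the response when the report stream ends.
    report.read(position).pipeTo(rc.response())
      .onFailure(err -> {
        if (!rc.response().ended()) {
          rc.response().close();
        }
      });
  }
}
```

The routes using these handlers should not install a `BodyHandler`, since that would buffer the whole body instead of streaming it.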

---------------

The vanilla approach (AFAIK) would be to make each report a verticle and use the event bus to talk from the reporting service to the report. However, that doesn't work, since the EventBus doesn't support streaming (see, e.g., https://groups.google.com/g/vertx/c/KX0qopBJoTo/m/lvSgkf3lAwAJ).

Would it be ok to pass the RoutingContext object through the EventBus (I don't want or need any HA or distribution; local communication within the vertx instance is good enough) and then use it from within a Report verticle? 
It feels like cheating and I fear I might break some concurrency contract while doing so.

A similar approach (which also feels like cheating) that would bypass the event bus completely might look like this:

// pseudo-code
// getContext() is hypothetical: Report exposes no such method, and it would be
// problematic, for instance, to get hold of the report object in the first place
report.getContext().runOnContext(v -> report.read(position).pipeTo(rc.response()));


So, what would be a good way to do this?

thanks,

Bert

Thomas SEGISMONT

Dec 6, 2021, 4:43:27 AM
to ve...@googlegroups.com
Hi,

If the report can be large, are you sure you must keep it in memory?
I'm asking because I think this could be solved nicely with the Vert.x FileSystem API.
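As a hedged illustration of the FileSystem suggestion, here is a sketch assuming Vert.x 4 with Vert.x Web, one file per report under a hypothetical `reports` directory, port 8080, and `position` interpreted as a byte offset. `AsyncFile` is both a `ReadStream<Buffer>` and a `WriteStream<Buffer>`, so appending and reading become plain pipes. Creating the `Pipe` before the asynchronous `open` pauses the request, so no body chunk is lost while the file is being opened.

```java
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.streams.Pipe;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;

public class FileReportVerticle extends AbstractVerticle {

  private static final String DIR = "reports";   // hypothetical storage directory

  @Override
  public void start(Promise<Void> startPromise) {
    Router router = Router.router(vertx);
    router.post("/reports/:name").handler(this::append);   // no BodyHandler: body is streamed
    router.get("/reports/:name").handler(this::read);
    vertx.fileSystem().mkdirs(DIR)
      .compose(v -> vertx.createHttpServer().requestHandler(router).listen(8080))
      .<Void>mapEmpty()
      .onComplete(startPromise);
  }

  // Maps :name to a file path, rejecting names that could escape DIR.
  private static String pathFor(RoutingContext rc) {
    String name = rc.pathParam("name");
    return name != null && name.matches("[A-Za-z0-9_-]+") ? DIR + "/" + name : null;
  }

  private void append(RoutingContext rc) {
    String path = pathFor(rc);
    if (path == null) {
      rc.fail(400);
      return;
    }
    Pipe<Buffer> pipe = rc.request().pipe();   // pauses the request until pipe.to(...)
    vertx.fileSystem().open(path, new OpenOptions().setCreate(true).setAppend(true))
      .onFailure(err -> {
        pipe.close();
        rc.fail(err);
      })
      .onSuccess(file -> pipe.to(file)          // ends (and so closes) the file when the body ends
        .onSuccess(v -> rc.response().setStatusCode(204).end())
        .onFailure(rc::fail));
  }

  private void read(RoutingContext rc) {
    String path = pathFor(rc);
    if (path == null) {
      rc.fail(400);
      return;
    }
    String pos = rc.request().getParam("position");
    long position = pos == null ? 0L : Long.parseLong(pos);   // assumed: a byte offset
    vertx.fileSystem().open(path, new OpenOptions().setRead(true).setWrite(false).setCreate(false))
      .onFailure(err -> rc.fail(404))   // most likely the report does not exist
      .onSuccess(file -> {
        rc.response().setChunked(true);
        file.setReadPos(position)
          .pipeTo(rc.response())             // ends the response at end of file
          .onComplete(ar -> file.close());   // the pipe does not close its source
      });
  }
}
```

Each append opens its own file handle, so two concurrent uploads to the same report could interleave chunks; a real design would still need a single writer per report.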

About having a verticle per report, I would personally refrain from going down that route.
Usually it's best to think of verticles as processing units.

Bert Robben

Dec 10, 2021, 1:55:43 AM
to vert.x
The report was just an example I made up which I hoped would be easy to understand. The underlying question is how I can stream data from one verticle to another. Think of the first verticle as the verticle that handles REST requests. The second verticle is the verticle that can stream (read/write) the report (yes, it uses the file system). The first verticle wants to "forward" incoming streaming REST requests to the correct report.

I can do that by writing to the file system and reading it back, or by writing it to a socket and reading it back, or by writing it over an HTTP connection and reading it back. But all of that feels rather silly.

Bert

Bert Robben

Dec 10, 2021, 3:59:14 AM
to vert.x
What do you think of the following approach for a WriteStream? The idea is that the report verticle wraps a ContextSwitchingWriteStream around its own internal WriteStream and passes this to the request handler verticle. The wrapper ensures both the report verticle and the request handler verticle get all their respective callbacks on the proper event thread.
To pass this object through the EventBus, I created a special codec for ContextSwitchingWriteStream that performs no transformation and returns the object as-is.
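A sketch of such a pass-through codec, assuming Vert.x 4 and purely local delivery; `LocalObjectCodec` is a hypothetical name. For local delivery the event bus hands the receiver the result of `transform`, so returning the argument passes the very same object reference. The wire methods throw, so an accidental clustered send fails loudly instead of misbehaving silently.

```java
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;

// Hypothetical pass-through codec for local (same Vertx instance) delivery only.
public class LocalObjectCodec<T> implements MessageCodec<T, T> {

  private final String name;

  public LocalObjectCodec(String name) {
    this.name = name;
  }

  @Override
  public void encodeToWire(Buffer buffer, T value) {
    // Only needed when a message crosses the wire; refuse explicitly.
    throw new UnsupportedOperationException(name + " is a local-only codec");
  }

  @Override
  public T decodeFromWire(int pos, Buffer buffer) {
    throw new UnsupportedOperationException(name + " is a local-only codec");
  }

  @Override
  public T transform(T value) {
    return value;   // hand the receiver the same object, no copy
  }

  @Override
  public String name() {
    return name;
  }

  @Override
  public byte systemCodecID() {
    return -1;      // -1 identifies a user-defined codec
  }
}
```

Registration might then look like `vertx.eventBus().registerDefaultCodec(ContextSwitchingWriteStream.class, new LocalObjectCodec<>("context-switching-write-stream"))`, after which a `ContextSwitchingWriteStream` can be used as a message or reply body without naming the codec explicitly.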

Would this be acceptable in the Vertx universe?

Bert

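import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.streams.WriteStream;

// Wraps a WriteStream owned by one context (the report verticle) so that it can be used
// from another context (the request handler verticle). Calls into the delegate run on the
// owner's context; callbacks run on the caller's context. Callers must be on a Vert.x context.
public class ContextSwitchingWriteStream<T> implements WriteStream<T> {

    private final Context context;          // context that owns the delegate
    private final WriteStream<T> delegate;
    private int writesInTransit;            // only touched on the caller's context
    private int bufferSize = 5;
    private boolean full;
    private Handler<Void> drainHandler;

    // Must be constructed on the delegate's (report verticle's) context.
    public ContextSwitchingWriteStream(Vertx vertx, WriteStream<T> delegate) {
        this.context = vertx.getOrCreateContext();
        this.delegate = delegate;
    }

    @Override
    public WriteStream<T> exceptionHandler(Handler<Throwable> handler) {
        Context callingContext = Vertx.currentContext();
        // A pipe clears its handlers by passing null, so null must be forwarded as-is.
        Handler<Throwable> bridged = handler == null ? null
            : t -> callingContext.runOnContext(v -> handler.handle(t));
        context.runOnContext(v -> delegate.exceptionHandler(bridged));
        return this;
    }

    @Override
    public Future<Void> write(T data) {
        Promise<Void> promise = Promise.promise();
        write(data, promise);
        return promise.future();
    }

    @Override
    public void write(T data, Handler<AsyncResult<Void>> handler) {
        writesInTransit++;
        if (writesInTransit >= bufferSize) {   // >= so a lowered max size cannot skip the mark
            full = true;
        }
        Context callingContext = Vertx.currentContext();
        context.runOnContext(v -> delegate.write(data, ar -> callingContext.runOnContext(vv -> {
            writesInTransit--;
            drainIfNeeded();
            if (handler != null) {
                handler.handle(ar);
            }
        })));
    }

    private void drainIfNeeded() {
        // <= rather than <: with a max size of 1, bufferSize / 2 == 0 and "< 0" never holds
        if (full && writesInTransit <= bufferSize / 2 && drainHandler != null) {
            full = false;
            drainHandler.handle(null);
        }
    }

    @Override
    public void end(Handler<AsyncResult<Void>> handler) {
        Context callingContext = Vertx.currentContext();
        context.runOnContext(v -> delegate.end(ar -> {
            if (handler != null) {
                callingContext.runOnContext(vv -> handler.handle(ar));
            }
        }));
    }

    @Override
    public WriteStream<T> setWriteQueueMaxSize(int maxSize) {
        bufferSize = maxSize;
        return this;
    }

    @Override
    public boolean writeQueueFull() {
        return full;
    }

    @Override
    public WriteStream<T> drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        drainIfNeeded();
        return this;
    }
}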
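The back-pressure bookkeeping in this class can be checked in isolation. The following is a plain-JDK model (no Vert.x; `WriteQueueModel` is a hypothetical name and `Runnable` stands in for `Handler<Void>`) of the same high-water/low-water logic. It uses `>=` for the high-water mark and `<=` for the low-water mark: with a strict `<` and a queue size of 1, `maxSize / 2` is 0, `inFlight < 0` never holds, and the drain handler would never fire.

```java
// Plain-JDK model of the write-queue accounting; Runnable stands in for Handler<Void>.
public class WriteQueueModel {

  private final int maxSize;
  private int inFlight;          // writes handed to the delegate but not yet acknowledged
  private boolean full;
  private Runnable drainHandler;

  public WriteQueueModel(int maxSize) {
    this.maxSize = maxSize;
  }

  public void onWriteStarted() {
    inFlight++;
    if (inFlight >= maxSize) {   // high-water mark
      full = true;
    }
  }

  public void onWriteCompleted() {
    inFlight--;
    drainIfNeeded();
  }

  public boolean writeQueueFull() {
    return full;
  }

  public void drainHandler(Runnable handler) {
    this.drainHandler = handler;
    drainIfNeeded();
  }

  private void drainIfNeeded() {
    // Low-water mark: with maxSize 1, maxSize / 2 == 0, so a strict "<" would never fire.
    if (full && inFlight <= maxSize / 2 && drainHandler != null) {
      full = false;
      drainHandler.run();
    }
  }

  public static void main(String[] args) {
    WriteQueueModel q = new WriteQueueModel(1);
    int[] drains = {0};
    q.drainHandler(() -> drains[0]++);
    q.onWriteStarted();
    System.out.println(q.writeQueueFull());                   // true
    q.onWriteCompleted();
    System.out.println(q.writeQueueFull() + " " + drains[0]); // false 1
  }
}
```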

Thomas SEGISMONT

Dec 20, 2021, 11:01:28 AM
to ve...@googlegroups.com
That might work, but I don't understand why you can't implement everything in a single verticle.

Bert Robben

Jan 6, 2022, 5:46:36 AM
to vert.x
I can have many different reports, each of which I want to represent by a separate verticle. The REST call arrives in some context, which is definitely not in the verticle of the report that the request is targeting. If I want to stream data from my report verticle to the web request response stream, then I need to bridge this stream across context boundaries.

The alternative approach would be not to put the reports in separate verticles, but rather to have one single verticle that contains both my REST layer and all report objects. Is that what you mean by implementing everything in a single verticle? As I understand this approach, that would basically mean that I do all my "business logic" in a single thread, which to me feels too limiting.

Thomas SEGISMONT

Jan 7, 2022, 4:28:41 AM
to ve...@googlegroups.com
On Thu, Jan 6, 2022 at 11:46, Bert Robben <bert....@gmail.com> wrote:
> I can have many different reports, each of which I want to represent by a separate verticle. The REST call arrives in some context, which is definitely not in the verticle of the report that the request is targeting. If I want to stream data from my report verticle to the web request response stream, then I need to bridge this stream across context boundaries.
>
> The alternative approach would be not to put the reports in separate verticles, but rather to have one single verticle that contains both my REST layer and all report objects. Is that what you mean by implementing everything in a single verticle?

Yes, this is what I mean.

> As I understand this approach, that would basically mean that I do all my "business logic" in a single thread, which to me feels too limiting.

If you deploy a single instance of the verticle, yes. But if you deploy multiple instances of the same verticle, you can use all the cores available.
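A sketch of deploying several instances, assuming Vert.x 4; `ScaledDeployment` and `ApiVerticle` are hypothetical stand-ins for the application and the verticle holding the REST layer, and port 8080 is an assumption. Each instance gets its own event-loop context, and when several instances in one Vertx instance listen on the same port, Vert.x spreads incoming connections across them.

```java
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

public class ScaledDeployment {

  // Hypothetical REST verticle; every deployed instance runs on its own event-loop context.
  public static class ApiVerticle extends AbstractVerticle {
    @Override
    public void start(Promise<Void> startPromise) {
      vertx.createHttpServer()
        .requestHandler(req -> req.response().end("handled on " + Thread.currentThread().getName()))
        .listen(8080)   // instances share the port; connections are distributed among them
        .<Void>mapEmpty()
        .onComplete(startPromise);
    }
  }

  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    int instances = Runtime.getRuntime().availableProcessors();
    vertx.deployVerticle(ApiVerticle.class, new DeploymentOptions().setInstances(instances))
      .onSuccess(id -> System.out.println("deployed " + instances + " instances: " + id))
      .onFailure(Throwable::printStackTrace);
  }
}
```

One consequence for this thread: each instance has its own fields, so in-memory report state is not shared between instances, and a request for a given report may reach any instance. Keeping report state in files, as suggested earlier in the thread, avoids that problem.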
 

vinste...@gmail.com

Jan 8, 2022, 3:43:41 AM
to vert.x
I don't understand why you would want to have a verticle for every incoming report. This seems like a lot of overhead to me. Other actor-based implementations might make this more natural, but for Vert.x I think a single processing verticle should be sufficient.

Just handle everything in one verticle, possibly deployed as multiple instances across all your CPUs, as suggested.