Question about streams of futures

Malibu

May 8, 2023, 9:20:15 AM
to Dart Misc
Hi there,

I have a generic function to subscribe to a WAMP topic (Connectanum plugin), and session.subscribe() returns a Future<Subscribed>. I need to wait for this future to complete to obtain the Subscribed object, and inside this Subscribed object there is a Stream<Event> that carries the actual events coming from the topic. So that is a Future that contains an object that contains a Stream. I want a stream that accepts a command object, creates the subscription, and then processes the Subscribed object downstream.

So I have started with this:

_commandInputSubject.map( (command) => client.subscribe(command) );

But this map returns a Stream<Future<Subscribed>>, and there is no point in continuing with the stream until the future completes. What is the best reactive way to do this?

I think the following will work:

_commandInputSubject
     .map( (command) => client.subscribe(command) )
     .flatMap( (subscribedFuture) => subscribedFuture.asStream() )....

But it seems like overkill to convert the futures to streams just to flatten them back into one stream. It seems like there should be a way to write a map function that simply waits for the future to complete and emits the Subscribed object, but I can't figure out how. Any suggestions?

Thanks.

William Hesse

May 8, 2023, 10:12:31 AM
to Dart Misc, Malibu
I think the function you are looking for is Stream.asyncMap instead of Stream.map. I am assuming that the multiple subscription commands are arriving as a stream, as the example code seems to show.

You can also construct a stream using an async* function, which can be easier to read. If your input is a stream of commands, and for each command you want to asynchronously fetch a subscription and then put all the events from that subscription's stream onto the result stream before fetching the next subscription, then I think something like this should work. The await for loop will pause the stream commandInputSubject while it is in the body of the loop, forwarding the events from the current subscription.

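Stream<Event> eventsFromCommands(Stream<Command> commandInputSubject) async* {
  await for (final command in commandInputSubject) {
    // Wait for the subscription to be established.
    final subscribed = await client.subscribe(command);
    // Forward every event of this subscription before taking the next command.
    yield* subscribed.stream;
  }
}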
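To see the ordering this produces, here is a minimal runnable sketch of the same shape. It does not use Connectanum: FakeClient, the Subscribed class below, and the use of plain strings for commands and events are hypothetical stand-ins, chosen only so the example runs on its own.

```dart
import 'dart:async';

// Hypothetical stand-in for the Subscribed object from the thread.
class Subscribed {
  Subscribed(this.stream);
  final Stream<String> stream;
}

// Hypothetical stand-in for the WAMP client.
class FakeClient {
  // Pretends to subscribe: completes after a short delay with two events.
  Future<Subscribed> subscribe(String topic) async {
    await Future<void>.delayed(const Duration(milliseconds: 10));
    return Subscribed(Stream.fromIterable(['$topic/1', '$topic/2']));
  }
}

final client = FakeClient();

Stream<String> eventsFromCommands(Stream<String> commands) async* {
  await for (final command in commands) {
    final subscribed = await client.subscribe(command);
    // All events of this subscription are forwarded before the loop resumes.
    yield* subscribed.stream;
  }
}

Future<void> main() async {
  final events =
      await eventsFromCommands(Stream.fromIterable(['a', 'b'])).toList();
  print(events); // [a/1, a/2, b/1, b/2]
}
```

Each fake subscription here closes after two events, which is what lets the loop move on to the next command.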

The function Stream.asyncMap can apply an async function to a stream's elements, and await the resulting futures as part of the mapping process, pausing the input stream while it waits.
And the function Stream.asyncExpand maps each element to a stream and emits the events of those streams, one stream after the other, on a single result stream. So this could also be written as:

final subscriptions = _commandInputSubject.asyncMap((command) => client.subscribe(command));
final events = subscriptions.asyncExpand((subscribed) => subscribed.stream);
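As a self-contained illustration of what each step does to the element type, here is a small sketch using only dart:async. The function lengthLater is a made-up stand-in for any call that returns a Future, such as client.subscribe.

```dart
import 'dart:async';

// Hypothetical stand-in for any function that returns a Future.
Future<int> lengthLater(String s) async {
  await Future<void>.delayed(const Duration(milliseconds: 5));
  return s.length;
}

Future<void> main() async {
  final words = Stream.fromIterable(['a', 'bbb', 'cc']);

  // Stream.map would give Stream<Future<int>>; asyncMap awaits each
  // future and emits its result, so the element type is plain int.
  final Stream<int> lengths = words.asyncMap(lengthLater);

  // asyncExpand turns each element into a stream and emits those
  // streams' events one stream after the other.
  final Stream<int> counted = lengths.asyncExpand(
      (n) => Stream.fromIterable(List.generate(n, (i) => i)));

  print(await counted.toList()); // [0, 0, 1, 2, 0, 1]
}
```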

I haven't tested these answers, so there may be minor mistakes. If you don't want to wait for one subscription's stream to complete before starting the next command, then you will have to use different code and decide how you want to interleave the events.
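This matters for long-lived topics: both yield* and asyncExpand wait for the current inner stream to finish, so a topic stream that never closes would keep the later commands waiting. One possible shape for the "different code" is sketched below, assuming events should simply be forwarded in arrival order. The helper mergeAll and the ticks generator are hypothetical names, not part of dart:async or Connectanum, and the sketch makes no attempt at pause/resume handling.

```dart
import 'dart:async';

// Hypothetical helper: opens one inner stream per source item and forwards
// events from all of them as they arrive, without waiting for any to finish.
Stream<E> mergeAll<T, E>(
    Stream<T> source, Future<Stream<E>> Function(T item) open) {
  final controller = StreamController<E>();
  final subs = <StreamSubscription<Object?>>[];
  var pending = 1; // the source itself counts as one open stream

  void doneOne() {
    // Close the output only when the source and every inner stream are done.
    if (--pending == 0) controller.close();
  }

  controller.onListen = () {
    subs.add(source.listen((item) async {
      pending++;
      try {
        final inner = await open(item);
        subs.add(inner.listen(controller.add,
            onError: controller.addError, onDone: doneOne));
      } catch (error, stack) {
        controller.addError(error, stack);
        doneOne();
      }
    }, onError: controller.addError, onDone: doneOne));
  };
  controller.onCancel = () async {
    for (final sub in List.of(subs)) {
      await sub.cancel();
    }
  };
  return controller.stream;
}

// Hypothetical event source: emits name1 and name2, everyMs apart.
Stream<String> ticks(String name, int everyMs) async* {
  for (var i = 1; i <= 2; i++) {
    await Future<void>.delayed(Duration(milliseconds: everyMs));
    yield '$name$i';
  }
}

Future<void> main() async {
  final merged = mergeAll<String, String>(
    Stream.fromIterable(['a', 'b']),
    (name) async => ticks(name, name == 'a' ? 100 : 150),
  );
  // Events from both sources are interleaved; the order depends on timing.
  print(await merged.toList());
}
```

Whether arrival-order merging is the right policy depends on the application; switching to only the latest subscription, or tagging each event with its topic, are other ways to decide how the events interleave.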

Malibu

May 8, 2023, 11:39:52 AM
to Dart Misc, William Hesse, Malibu
Ok thank you very much.  I was thinking of the asyncMap but for some reason I thought it returned a Future instead of a Stream.  I'm still getting all these little nuances straight in my mind.

Thank you for the async* suggestion, which is also an example of why I find reactive programming so mind-bending. It does work that way too and has indeed simplified my stream a little bit.

Is it considered just as appropriate to pass in the subject stream argument as to chain off of it?  I guess I didn't do that because I felt as if I should be chaining off instead of passing in as an argument.
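For what it is worth, the two styles need not be exclusive. The sketch below shows, under made-up names (shout, ShoutExtension, shouted), how a function that takes the stream as an argument can also be used in chained position, either through a Dart extension method or through Stream.transform with StreamTransformer.fromBind. It is only an illustration of the mechanics, not a statement about which style is preferred.

```dart
import 'dart:async';

// Plain function style: the stream is passed in as an argument.
Stream<String> shout(Stream<String> input) async* {
  await for (final s in input) {
    yield s.toUpperCase();
  }
}

// Hypothetical extension exposing the same function in chained position.
extension ShoutExtension on Stream<String> {
  Stream<String> shouted() => shout(this);
}

Future<void> main() async {
  final viaArgument = await shout(Stream.fromIterable(['a', 'b'])).toList();
  final viaExtension =
      await Stream.fromIterable(['a', 'b']).shouted().toList();
  // transform also accepts the function, wrapped as a StreamTransformer.
  final viaTransform = await Stream.fromIterable(['a', 'b'])
      .transform(StreamTransformer<String, String>.fromBind(shout))
      .toList();

  print(viaArgument); // [A, B]
  print(viaExtension); // [A, B]
  print(viaTransform); // [A, B]
}
```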