Chaining streams issue

91 views
Skip to first unread message

Don Olmstead

unread,
May 20, 2015, 3:40:43 PM5/20/15
to mi...@dartlang.org
I'm running into an issue with streams where I'm wondering what the best way to do things would be.

In this particular scenario I'm using the PostgreSQL connector library to execute statements. The library has the concept of a connection pool which you can request from. After getting the connection you can run the query which returns a Stream. An attempt using async/await syntax looks like this.

  Stream<dynamic> executeSql(String statement) async* {
   
// Get the connection from the pool
   
var connection = await _connectionPool.connect() as postgres.Connection;


   
// Get the values
   
var values = connection.query(statement).map((row) => row.toList());


   
// Return the connection to the pool
    connection
.close();


   
yield* values;
 
}

The issue I'm encountering is that the Stream gets in a bad state, where its already being listened to.

So I'm wondering what the best way to accomplish something like that is.

Natalie Weizenbaum

unread,
May 20, 2015, 4:01:35 PM5/20/15
to General Dart Discussion
It looks like you're closing the connection without waiting for the query to complete. If you call connection.close() after yield* values, it should work.

--
For other discussions, see https://groups.google.com/a/dartlang.org/
 
For HOWTO questions, visit http://stackoverflow.com/tags/dart
 
To file a bug report or feature request, go to http://www.dartbug.com/new

To unsubscribe from this group and stop receiving emails from it, send an email to misc+uns...@dartlang.org.

Don Olmstead

unread,
May 20, 2015, 4:20:09 PM5/20/15
to mi...@dartlang.org
Good catch Natalie. That's something I missed when transitioning over to a Stream based API. I'm still getting an error from the VM about the Stream being in a bad state if I just comment out the close part to see if that was my problem.

At a high level if you had something like this where the stream creation is deferred.

Future<Stream> getStream() async => _stream;

Stream transformStream() async* {
 
var stream = await getStream();

 
yield* stream.map(doMapping);
}

Does that work as a general pattern for this? Or should I be avoiding the async* method when trying to accomplish something like this?

Natalie Weizenbaum

unread,
May 20, 2015, 4:27:54 PM5/20/15
to General Dart Discussion
Unfortunately, I don't have enough experience with async* to know :(.

On Wed, May 20, 2015 at 1:20 PM, Don Olmstead <don.j.o...@gmail.com> wrote:
Good catch Natalie. That's something I missed when transitioning over to a Stream based API. I'm still getting an error from the VM about the Stream being in a bad state if I just comment out the close part to see if that was my problem.

At a high level if you had something like this where the stream creation is deferred.

Future<Stream> getStream() async => _stream;

Stream transformStream() async* {

 
var stream = getStream();

 
yield* stream.map(doMapping);
}

Does that work as a general pattern for this? Or should I be avoiding the async* method when trying to accomplish something like this?

--

Don Olmstead

unread,
May 20, 2015, 4:30:18 PM5/20/15
to mi...@dartlang.org
Would there be a way you'd approach it without the async bits?

Natalie Weizenbaum

unread,
May 20, 2015, 4:33:48 PM5/20/15
to General Dart Discussion
I'd probably write a utility function that takes a Future<Stream> and returns a Stream, and use that.

Jesse Riggins

unread,
May 20, 2015, 4:42:58 PM5/20/15
to mi...@dartlang.org
I'm so glad you shared this (and of course Natalie's response).  I'm not using async/await yet (but I am using the postgres driver), but I KNOW this would have been an issue I would have spent hours on.

Róbert Tóth

unread,
May 20, 2015, 4:44:19 PM5/20/15
to mi...@dartlang.org
You forgot to put await before getStream();
I'm not sure if yield* is needed here. A simple return would be better with async instead of async*.


2015. május 20., szerda 22:20:09 UTC+2 időpontban Don Olmstead a következőt írta:
Good catch Natalie. That's something I missed when transitioning over to a Stream based API. I'm still getting an error from the VM about the Stream being in a bad state if I just comment out the close part to see if that was my problem.

At a high level if you had something like this where the stream creation is deferred.

Future<Stream> getStream() async => _stream;

Stream transformStream() async* {

 
var stream = getStream();

 
yield* stream.map(doMapping);
}

Don Olmstead

unread,
May 20, 2015, 4:48:09 PM5/20/15
to mi...@dartlang.org
Edited on the mailing list cause I forgot await in the getStream example. That's what I get for writing code in Google Groups.

@Robert if you use async you implicitly return a Future. async* is for returning a Stream.

--

Róbert Tóth

unread,
May 20, 2015, 5:33:37 PM5/20/15
to mi...@dartlang.org
I did a quick check and you were right. I thought that yield* is only good, if it's used with yield, but I was wrong.

import 'dart:async';

main() async{
  Stream otherStream = transformStream();
  await for(var stuff in otherStream){
    print(stuff);
  }
}

Stream transformStream() async* {
  var fooStream = await foo();

  yield* fooStream.map((item)=>item*2);
}

Future<Stream> foo() async{
  return aStream(6).map((item)=>2*item);
}

Stream aStream(n) async*{
  int k = 1;
  while (k < n) yield k++;
}

Don Olmstead

unread,
May 20, 2015, 6:31:54 PM5/20/15
to mi...@dartlang.org, kkdst...@gmail.com
Found my issue. I had an await hanging out from when the API was Futures based which was causing the Stream to get two listeners and thus get in a bad state. The analyzer still isn't that great about reasoning around async/await code so no problems were reported.

Just a heads up that something like the following works perfectly when you want to return a stream but the stream you want to create is created asynchronously.

import 'dart:async';

Future<int> getMultiplier() async => 42;
Future<Stream> getStream() async => new Stream.fromIterable([0, 1, 2, 3, 4]);

Stream transformStream() async* {
 
var stream = await getStream();

 
yield* stream.map((value) => value * 2);;
}

Stream transformAgain() async* {
 
var multiplier = await getMultiplier();

 
yield* transformStream().map((value) => value * multiplier);
}

main
(List<String> arguments) async {
  await
for (var value in transformAgain()) {
   
print(value);
 
}
}

Thanks for all the help everybody! Appreciate it.

Greg Lowe

unread,
May 21, 2015, 5:07:42 AM5/21/15
to mi...@dartlang.org, kkdst...@gmail.com
In many web apps the database is actually the bottleneck, not the application server cpu or memory. This means it's often best to do the following:

  List result = await connection.query("...").toList();

This means that the application server will read the data from the connection as fast as possible, which frees up the database to handle another request sooner.

The other advantage, is it makes the code simpler.

The caveat is for very large datasets you will need to use streams, or you will use a lot of memory.

For mapping result sets to objects I usually just use the List.map(). The mapper functions are synchronous so pretty easy to work with. For example:

class Crayon {
   
String color;
   
int length;
}


List<Crayon> crayons = await connection
 
.query('select color, length from crayons')
 
.map((row) => new Crayon()
                 
..color = row.color
                 
..length = row.length)
 
.toList();


I look forward to seeing how dogma progresses - cheers!

Lasse R.H. Nielsen

unread,
May 21, 2015, 11:13:00 AM5/21/15
to mi...@dartlang.org, kkdst...@gmail.com
As a general comment, I don't recommend returning a Future<Stream> ever. Just return the stream.
If the stream isn't ready immediately, then just wait as long as necessary before providing the first stream event.

As Natalie wrote, you can write a function from Future<Stream> to Stream in case someone else didn't take the advice:

  Stream unFuturizeStream(Future<Stream> bad) async* {
     yield* await bad;
  }

That's what the code returning the Future<Stream> should have done :)

/L

/L

--
For other discussions, see https://groups.google.com/a/dartlang.org/
 
For HOWTO questions, visit http://stackoverflow.com/tags/dart
 
To file a bug report or feature request, go to http://www.dartbug.com/new

To unsubscribe from this group and stop receiving emails from it, send an email to misc+uns...@dartlang.org.



--
Lasse R.H. Nielsen - l...@google.com  
'Faith without judgement merely degrades the spirit divine'
Google Denmark ApS - Frederiksborggade 20B, 1 sal - 1360 København K - Denmark - CVR nr. 28 86 69 84
Reply all
Reply to author
Forward
0 new messages