How to throttle new Future operations?

Istvan Soos

Jun 19, 2012, 6:07:42 PM
to General Dart Discussion
The following simplified example tries to calculate the total length of
your files:

#import("dart:io");
main() {
  int sum = 0;

  DirectoryLister lister = new Directory("/home/username/").list(true);
  lister.onError = (e) { throw e; };
  lister.onFile = (String file) {
    // print("File: $file");

    // Please ignore the length() vs lengthSync(), because my
    // actual use case is a longer IO operation, e.g. DB call.
    Future f = new File(file).length();

    f.then((int c) {
      sum += c;
      // print(" - sum: $sum ($file)");
    });
  };

  lister.onDone = (bool success) {
    print("Final sum: $sum");
  };
}

There are two major problems with this example:
- the sum might not be ready when onDone is being called
- it runs out of memory

The first problem can be worked around: an "int actualRunningFutures"
counter and a "bool hasDoneCalledYet" flag, with a bit of boilerplate
code in both onDone and onFile, solve it. However, the major question
is: what would be the easiest way to throttle the creation and
execution of the Future objects in this and similar scenarios? E.g. is
there any plan for a FuturePool that handles this?
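
For illustration, the counter-and-flag workaround could look roughly like the sketch below. It is written in current Dart syntax rather than the 2012 API used in this thread, and `PendingTracker`, `track`, and `producerDone` are made-up names. It only answers "when is everything finished?" and does nothing to limit how many futures get created.

```dart
import 'dart:async';

/// Reports completion only after the producer has finished AND every
/// tracked future has settled. (Hypothetical helper, not a library class.)
class PendingTracker {
  int _running = 0; // plays the role of "actualRunningFutures"
  bool _producerDone = false; // plays the role of "hasDoneCalledYet"
  final Completer<void> _allDone = Completer<void>();

  /// Completes once [producerDone] was called and nothing is in flight.
  Future<void> get allDone => _allDone.future;

  /// Registers [work]; call this from the per-file callback.
  /// Errors of [work] are only counted here, not reported.
  void track(Future<Object?> work) {
    _running++;
    work.then<void>((_) => _settled(), onError: (Object _) => _settled());
  }

  /// Call this from the producer's "done" callback.
  void producerDone() {
    _producerDone = true;
    _check();
  }

  void _settled() {
    _running--;
    _check();
  }

  void _check() {
    if (_producerDone && _running == 0 && !_allDone.isCompleted) {
      _allDone.complete();
    }
  }
}
```

In the example above, onFile would call `track(...)` for each future and onDone would call `producerDone()`; the final sum is then printed once `allDone` completes.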

Thanks,
Istvan

Vadim Tsushko

Jun 19, 2012, 11:31:20 PM
to General Dart Discussion
Hi, Istvan.

I guess you should use Futures.wait.
I've modified your example and it now works for me: https://gist.github.com/2957978
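
The linked gist is not reproduced here. As a rough sketch of the idea only, in current Dart (where the call is `Future.wait`) and with a made-up `fakeLength` standing in for the slow per-file operation, collecting the futures and waiting for all of them might look like this:

```dart
import 'dart:async';

/// Hypothetical stand-in for a slow per-file operation (e.g. a DB call).
Future<int> fakeLength(String path) =>
    Future.delayed(const Duration(milliseconds: 10), () => path.length);

/// Starts one future per path, then waits for all of them to complete.
Future<int> sumLengths(Iterable<String> paths) async {
  // Every future is created up front, so this waits correctly but does
  // not limit how many operations are in flight at once.
  final lengths = await Future.wait(paths.map(fakeLength));
  return lengths.fold<int>(0, (total, n) => total + n);
}
```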

Istvan Soos

Jun 20, 2012, 1:33:13 AM
to Vadim Tsushko, General Dart Discussion
Hi Vadim,

Thanks for the writeup, it certainly solves the "waiting for the result" part.
Unfortunately, collecting the Future objects doesn't stop them from
being created. Your code runs out of memory in the same way as the
version that doesn't collect the futures.

A blocking semaphore would solve the issue, but I haven't seen
anything similar in the runtime yet. Having said that, I think I could
emulate a blocking semaphore with sync filesystem calls, but that is
definitely awkward and definitely not "Darty".

Istvan

Seth Ladd

Jun 20, 2012, 1:38:46 AM
to Istvan Soos, Vadim Tsushko, General Dart Discussion
Hi Istvan,

Are you running out of memory with the code snippet you posted? I got the sense you were showing that code only as an example, but it's not actually what you are doing.

Have you considered a work queue so you can throttle the work? Perhaps not starting the next Future worker until the current Future worker is complete.

Seth

Istvan Soos

Jun 20, 2012, 1:51:31 AM
to Seth Ladd, Vadim Tsushko, General Dart Discussion
On Tue, Jun 19, 2012 at 10:38 PM, Seth Ladd <seth...@google.com> wrote:
> Are you running out of memory with the code snippet you posted? I got the
> sense you were showing that code only as an example, but it's not actually
> what you are doing.

The length() part is bogus, but otherwise, I'd like to scan a
directory recursively, process files, wait for the individual results
and provide a summary.

> Have you considered a work queue so you can throttle the work? Perhaps not
> starting the next Future worker until the current Future worker is complete.

I'd be really happy if there were a mechanism to do that. Is there
something already available that I am not aware of?

Istvan

Seth Ladd

Jun 20, 2012, 2:25:15 AM
to Istvan Soos, Vadim Tsushko, General Dart Discussion
Hi Istvan,

You could scan the directory, adding all the work items into a list (assuming you have enough memory to store the list of filenames) and then either work on the list one item at a time (assuming you have enough memory to work on a single item) or open a few isolates and farm work out to the isolates if your algorithm allows you to run a few workers in parallel.

Note that I'm not sure if you can pass a File object to another isolate, but you could pass the filename as a string to an isolate.

As for starting work on a Future when a previous Future is done, this should be straightforward if you collect all the filenames into a List, thanks to Future's chain method. Here's an example:

#import('dart:io');

Future workOnSingleFile(String fileName) {
  var completer = new Completer();
  print("Starting work on $fileName");
  
  // this represents some long work
  new Timer(1000, (_) {
    print("Done work on $fileName");
    completer.complete(fileName);
  });
  
  return completer.future;
}

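workOnFiles(List fileNames) {
  // Nothing to chain if the directory listing produced no files.
  if (fileNames.length == 0) {
    print("all done!");
    return;
  }

  Future work = workOnSingleFile(fileNames[0]);
  
  // Each chain() call starts the next file only after the previous one completes.
  for (var i = 1; i < fileNames.length; i++) {
    work = work.chain((_) => workOnSingleFile(fileNames[i]));
  }
  
  work.then((_) => print("all done!"));
}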

main() {
  var allFiles = [];
  var script = new File(new Options().script);
  var directory = script.directorySync();
  var lister = directory.list(recursive:true);
  lister.onFile = (fileName) => allFiles.add(fileName);
  lister.onDone = (_) => workOnFiles(allFiles);
}

Hope that helps,
Seth

Mads Ager

Jun 20, 2012, 2:26:11 AM
to Istvan Soos, Seth Ladd, Vadim Tsushko, General Dart Discussion
There is no rate-limiting queue in the system at this point and you
will have to implement one yourself to balance how many operations you
want to have in flight at any given point in time. We could consider
adding something to the libraries for that, but nothing like that
exists at this point.

Cheers, -- Mads

William Hesse

Jun 20, 2012, 4:16:17 AM
to Mads Ager, Istvan Soos, Seth Ladd, Vadim Tsushko, General Dart Discussion
We have implemented rate-limiting queues where we need them, for
example in the test scripts
tools/test.dart and tools/testing/dart.

One can just use a counter for the outstanding jobs, and when it
reaches a limit, put the next
job in a queue instead of starting it. When a job finishes, the
counter is decremented, and the
queue is checked for a job to start.

One often needs the same kind of counter to ensure that all jobs in a
set of jobs have completed, so
that the next phase of the program can start.

Tricky points are to make sure that the active jobs decrement the
counter whether they finish normally
or call an error handler or throw an exception. The queue can often
consist of closures, just containing the
function call to do the job.
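
A minimal sketch of the counter-plus-queue scheme described above, in current Dart syntax. This is not the code from the test scripts; `JobLimiter` and `run` are hypothetical names. The `finally` block is what guarantees that the counter is decremented whether a job finishes normally or throws.

```dart
import 'dart:async';
import 'dart:collection';

/// Runs at most [limit] jobs at once; extra jobs wait as closures in a queue.
class JobLimiter {
  JobLimiter(this.limit);

  final int limit;
  int _active = 0; // counter of outstanding jobs
  final Queue<void Function()> _waiting = Queue<void Function()>();

  /// Schedules [job]; the returned future mirrors the job's result or error.
  Future<T> run<T>(Future<T> Function() job) {
    final result = Completer<T>();

    Future<void> start() async {
      _active++;
      try {
        result.complete(await job());
      } catch (error, stack) {
        result.completeError(error, stack);
      } finally {
        // Runs on success and on failure, so the counter cannot leak.
        _active--;
        if (_waiting.isNotEmpty) _waiting.removeFirst()();
      }
    }

    if (_active < limit) {
      start();
    } else {
      _waiting.add(start); // limit reached: queue the job instead of starting it
    }
    return result.future;
  }
}
```

Note that the waiting queue in this sketch is unbounded, so it limits the work in flight but not the number of queued closures.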
--
William Hesse

Istvan Soos

Jun 20, 2012, 4:33:13 PM
to William Hesse, Mads Ager, Seth Ladd, Vadim Tsushko, General Dart Discussion
On Wed, Jun 20, 2012 at 1:16 AM, William Hesse <whe...@google.com> wrote:
> One can just use a counter for the outstanding jobs, and when it
> reaches a limit, put the next
> job in a queue instead of starting it.  When a job finishes, the
> counter is decremented, and the
> queue is checked for a job to start.

As Seth rightly pointed out, this solution works only as long as the
queue fits in memory. If the "producer side" is faster than the
consumer, any memory-intensive processing will eventually hit the
memory limit.

What do you think about introducing a pause/continue handler?
At least the following IO callbacks come to mind, all of which would
benefit from such an approach:
- DirectoryLister.onFile
- DirectoryLister.onDir
- InputStream.onData
- StringInputStream.onData
- StringInputStream.onLine

If we know that we are about to trigger an async task, and we don't
want to fill up the memory, we could pause the original processing of
the stream. Once our async task(s) have finished, we just continue the
stream processing where it left off.
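
For readers on current Dart: later releases model these sources as `Stream`s, and a `StreamSubscription` has `pause()` and `resume()`, which is close to the pause/continue idea above. The sketch below uses that newer API, not the 2012 callbacks listed here; `processThrottled`, `handle`, and `limit` are made-up names.

```dart
import 'dart:async';

/// Handles items from [source] with at most [limit] [handle] calls in
/// flight, pausing the subscription whenever the limit is reached.
Future<void> processThrottled<T>(
  Stream<T> source,
  Future<void> Function(T item) handle, {
  int limit = 4,
}) {
  final done = Completer<void>();
  var inFlight = 0;
  var sourceDone = false;
  late final StreamSubscription<T> sub;

  void fail(Object error, StackTrace stack) {
    if (!done.isCompleted) done.completeError(error, stack);
    sub.cancel(); // stop listening after the first failure
  }

  sub = source.listen(
    (item) async {
      inFlight++;
      if (inFlight >= limit) sub.pause(); // "pause": stop receiving items
      try {
        await handle(item);
      } catch (error, stack) {
        fail(error, stack);
      } finally {
        inFlight--;
        if (sub.isPaused) sub.resume(); // "continue": a slot is free again
        if (sourceDone && inFlight == 0 && !done.isCompleted) done.complete();
      }
    },
    onError: fail,
    onDone: () {
      sourceDone = true;
      if (inFlight == 0 && !done.isCompleted) done.complete();
    },
  );
  return done.future;
}
```

In current Dart, `Directory.list(recursive: true)` returns such a stream, so it could presumably be passed as `source`.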

What do you think about it?

Istvan

Seth Ladd

Jun 20, 2012, 4:50:51 PM
to Istvan Soos, William Hesse, Mads Ager, Vadim Tsushko, General Dart Discussion
This reminds me of the difference between two XML parsing strategies. SAX is an event-based parser, and StAX is a pull-based parser. In the former, you handle callbacks. In the latter, you request the next event.

Istvan, please feel free to file a feature request at http://dartbug.com/new with your use case(s) and pain points.

Thanks,
Seth

Istvan Soos

Jun 21, 2012, 2:51:24 AM
to Seth Ladd, William Hesse, Mads Ager, Vadim Tsushko, General Dart Discussion
On Wed, Jun 20, 2012 at 1:50 PM, Seth Ladd <seth...@google.com> wrote:
> This reminds me of the difference between two XML parsing strategies. SAX is
> an event based parser, and StAX is a pull based parser. In the former, you
> handle callbacks. In the later, you request the next event.

My feeling is that the Dart API is closer to the SAX philosophy,
and the pause/continue approach fits the API while solving my
issues. I've filed the request:
http://code.google.com/p/dart/issues/detail?id=3807

Regards,
Istvan

Mads Ager

Jun 21, 2012, 3:11:12 AM
to Istvan Soos, William Hesse, Seth Ladd, Vadim Tsushko, General Dart Discussion
On Wed, Jun 20, 2012 at 10:33 PM, Istvan Soos <is...@google.com> wrote:
> On Wed, Jun 20, 2012 at 1:16 AM, William Hesse <whe...@google.com> wrote:
>> One can just use a counter for the outstanding jobs, and when it
>> reaches a limit, put the next
>> job in a queue instead of starting it.  When a job finishes, the
>> counter is decremented, and the
>> queue is checked for a job to start.
>
> As Seth rightfully pointed out, this solution works as long as the
> queue fits in the memory. If the "producer-side" is faster, eventually
> every memory-intensive processing will face memory limit problem.
>
> What do you think about introducing a pause/continue handler?
> At least the following IO callbacks come in my mind, all of them would
> benefit from such approach:
> - DirectoryLister.onFile
> - DirectoryLister.onDir
> - InputStream.onData
> - StringInputStream.onData
> - StringInputStream.onLine

The streaming interfaces already have the pausing functionality in the
sense that you can set the onData and onLine handler to null and the
data will stay in the OS buffers until you read it out. When you set
back an onData or onLine handler they will resume. I'm not entirely
sure about the directory lister, but I think it will just drop
directories and files if the handlers are set to null. It might make
sense to have some way of pausing a directory lister.

Cheers, -- Mads

Sam McCall

Jun 21, 2012, 3:28:45 AM
to Mads Ager, Istvan Soos, William Hesse, Seth Ladd, Vadim Tsushko, General Dart Discussion
In many cases you need a mechanism that actually stops the underlying process, because if a producer outpaces a consumer, buffering indefinitely will run out of memory.

Pausing the operation once the buffer is full would work, as would a pull-based API. (Personally, I find the latter much easier to understand)
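
To make the pull-based option concrete, here is a sketch in current Dart using `StreamIterator` from `dart:async`, where the consumer explicitly asks for the next item with `moveNext()`. That class is not part of the 2012 API discussed in this thread, and `sumSequentially` and `measure` are made-up names.

```dart
import 'dart:async';

/// Pull-style consumption: the next path is requested only after the
/// previous one has been fully handled.
Future<int> sumSequentially(
  Stream<String> paths,
  Future<int> Function(String path) measure,
) async {
  var sum = 0;
  final it = StreamIterator<String>(paths);
  try {
    // Nothing new is requested from the stream until moveNext() is called.
    while (await it.moveNext()) {
      sum += await measure(it.current);
    }
  } finally {
    await it.cancel(); // releases the subscription if the loop exits early
  }
  return sum;
}
```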

Cheers,
Sam 

Mads Ager

Jun 21, 2012, 3:32:15 AM
to Sam McCall, Istvan Soos, William Hesse, Seth Ladd, Vadim Tsushko, General Dart Discussion
Exactly. And we have that pausing for streams exactly because the API
is that you have to call read to get the data. If you do not call
read, the data will not be read out from the OS buffer and the OS will
not read more into its buffers. This is also exactly the reason why we
don't have it for directory listers: we would have to make sure to
actually pause a directory listing and not just buffer up the data for
later processing.

Cheers, -- Mads

peteroc

Jun 21, 2012, 1:11:57 PM
to General Dart Discussion
I implemented a small actor system recently in Java. Nothing original.
Every actor is a pooled thread that processes a blocking queue. The
blocking queue (provided by the Java concurrency library) really does
everything for you and handles the concerns expressed above very
naturally. You just size the queue and the number of actors so
that you do not run out of memory.

In this case you would create 1 Dir actor/isolate, 5 FileLength
actors, 1 Distributor actor and 1 Sum actor. Give them all a mailbox
size of 100, say. Then connect the output of the Dir actor to the
Distributor actor, which distributes messages equally to the 5
FileLength actors, which all post their output to the Sum actor.
Waiting for termination is handled by sending a special End message
from the Dir actor, which gets distributed around the system.

Thinking about it is really simple because it's like wiring a circuit
or Unix pipes. (Actors and dataflow are very close, and
dataflow/wiring is a really easy way to think about these kinds of
problems.) If it were possible to size the queue of an isolate such
that the sending isolate blocks when the size is reached, then Dart
could do this. Seth mentioned a work queue above. Is there a
blocking queue available for Dart?
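
As a rough illustration of what a sized mailbox could look like inside a single isolate, here is a sketch in current Dart. Dart code cannot block a thread, so "blocking" is approximated by futures that stay pending until there is room; this does not cover sending between isolates. `BoundedQueue`, `put`, and `take` are hypothetical names, not an existing library class.

```dart
import 'dart:async';
import 'dart:collection';

/// Fixed-capacity mailbox: `put` waits while full, `take` waits while empty.
class BoundedQueue<T> {
  BoundedQueue(this.capacity);

  final int capacity;
  final Queue<T> _items = Queue<T>();
  final Queue<Completer<void>> _putters = Queue<Completer<void>>();
  final Queue<Completer<void>> _takers = Queue<Completer<void>>();

  int get length => _items.length;

  Future<void> put(T item) async {
    while (_items.length >= capacity) {
      final slot = Completer<void>();
      _putters.add(slot);
      await slot.future; // suspended until a consumer frees a slot
    }
    _items.add(item);
    if (_takers.isNotEmpty) _takers.removeFirst().complete();
  }

  Future<T> take() async {
    while (_items.isEmpty) {
      final ready = Completer<void>();
      _takers.add(ready);
      await ready.future; // suspended until a producer adds an item
    }
    final item = _items.removeFirst();
    if (_putters.isNotEmpty) _putters.removeFirst().complete();
    return item;
  }
}
```

A producer that awaits every `put` can never get more than `capacity` items ahead of its consumers, which is the property the sized mailboxes above rely on.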

William Hesse

Jun 22, 2012, 5:33:45 AM
to Mads Ager, Sam McCall, Istvan Soos, Seth Ladd, Vadim Tsushko, General Dart Discussion
I put together a document listing the built-in data sources and
consumers, and how they support throttling.
Others should add data sources that they know about.
It is called "Dart data source throttling", at
https://docs.google.com/a/google.com/document/d/1mOudLn3AKYDZsIDxRbuVER-e3X1VrPdwbVdRJpNe9b4/edit
--
William Hesse

William Hesse

Jun 22, 2012, 5:35:24 AM
to Mads Ager, Sam McCall, Istvan Soos, Seth Ladd, Vadim Tsushko, General Dart Discussion
Oops, that was an internal document only available inside Google. I
will include its content here:

Dart data source throttling support


For major Dart data/event sources and consumers, describe how they can
request throttling, and how they can be throttled.


dart:io InputStream
Provides data through onData events and .read().
RequestThrottling: no
ToThrottle: set onData handler to null.
ToUnthrottle: set onData handler to non-null.

dart:io OutputStream
Accepts data through write()
RequestThrottling: write() returns false, instead of true.
RequestEndThrottling: onNoPendingWrites event.
ToThrottle: no

dart:io DirectoryLister
Provides data by onDirectory and onFile callbacks
RequestThrottling: no
ToThrottle: no

dart:io Socket
Provides data through onData event, readList()
Accepts data through onWrite, writeList()
RequestThrottling: write() returns 0 bytes written, instead of
requested bytes written.
RequestEndThrottling: onWrite event.
ToThrottle: ignore onData event, throttles automatically
ToUnthrottle: start reading data again.

dart:isolate Timer.repeating
Provides events through callback
RequestThrottling: no

dart:isolate Spawn...
Through ports

dart:core ReceivePort
Provides data through receive() callback.
ToThrottle: no

dart:isolate SendPort
Sends data through send() or call() methods
RequestThrottling: no