blocking, pull style input streams


John Vasileff

Aug 1, 2016, 12:04:27 AM
to ceylon...@googlegroups.com
Building on recent discussions involving memory efficient lazy streams, and in an attempt to organize and share some thoughts, the following contemplates a scenario in which:

1) you have a large 2GB file (or other resource) to process
2) you'd like to leverage Ceylon's rich and growing support for working with lazy streams
3) the work may involve things like XML parsing, performing database updates, streaming transformation to an output stream, etc.

The idea is that we need a streaming solution. Working with such a large amount of input data in memory is impractical.

So, considering XML parsing, a function like:

    [XmlEvent*] parseXml(Byte|Finished input());

simply won't work. The 2GB XML document is too large to fit in memory, and for our imaginary application, we don’t need it to.

Instead, a workable signature using both lazy input and lazy output streams is:

    {XmlEvent*} parseXml({Byte*} input);

Further processing can easily be performed on the returned XmlEvent stream. (Note that this function imposes no restrictions on the returned stream: absent limitations of the provided input (hint: see below!), the returned {XmlEvent*} may be iterated as many times as desired.)

That leaves us with the final task of turning the 2GB file resource into a {Byte*}. Ideally, we'd be able to use something like:

    {Byte*} fileToByteStream(File file);

Every call to iterator() would open a new file input stream which would be returned in the form of {Byte*}. But, unfortunately, this would lead to file handle leaks since there would be no way to close abandoned/unfinished iterators.

Instead, the best solution may be to use a class that satisfies both Destroyable and {Byte*}, and permits at most one iterator to be created:

    class FileByteIterable(File f) satisfies {Byte*} & Destroyable {}

Instances of this class can safely be created in try blocks, ensuring that their associated Iterators are properly closed.
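
A minimal sketch of the try-with-resources mechanism this relies on, using only the language module. LoggingResource and tryDemo are hypothetical names introduced for illustration and are not part of the proposal; the class records events instead of opening a file.

```ceylon
// Hypothetical stand-in for a file-backed resource: it only records
// what happens to it, via the supplied log function.
class LoggingResource(String name, void log(String message))
        satisfies Destroyable {
    log("opened ``name``");
    shared actual void destroy(Throwable? error) {
        log("destroyed ``name``");
    }
}

// Returns the recorded events in the order they occurred.
String[] tryDemo() {
    variable String[] events = [];
    void log(String message) {
        events = events.withTrailing(message);
    }
    // destroy() is called when the block exits, normally or not.
    try (resource = LoggingResource("demo", log)) {
        log("working");
    }
    return events;
}

shared void run() {
    print(tryDemo());
}
```

The events should come back in the order opened, working, destroyed, which is the guarantee a Destroyable iterable would lean on to close its underlying reader.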


The following is a working demo of these ideas, but instead of processing XML, we’ll simply count uppercase characters:

import ceylon.file { File, temporaryDirectory }

shared void run() {
    value tempFile = temporaryDirectory.TemporaryFile(null, null);

    // create a 2GB+ file
    try (writer = tempFile.Overwriter()) {
        for (i in 0:40M) {
            writer.writeLine {
                "Lorem ipsum dolor sit amet, \
                 consectetur adipiscing elit";
            };
        }
    }

    // count uppercase characters using a stream
    try (bytes = FileByteIterable(tempFile)) {
        print {
            // Don't use collect here, or OOME!
            // Better would be a lazy utf8.decoder
            bytes.map((b) => b.unsigned.character)
                 .count(Character.uppercase);
        }; 
    }
    // prints 40000000
}

Note how convenient the file processing is, using familiar constructs! And, even with a much smaller 15MB input file, changing “map” to “collect” above causes the program to run *significantly* slower (multiple seconds vs. sub-second).

The FileByteIterable class is written in two parts: the class itself, which deals specifically with Files, and a reusable functionIterable() function, which can turn any function into a single-shot iterable:

class FileByteIterable(File f, Integer bufferSize=1k)
        satisfies {Byte*} & Destroyable {
    assert (bufferSize >= 1);
    value reader = f.Reader();
    destroy = reader.destroy;
    iterator = expand {
        functionIterable {
            () => let (bytes = reader.readBytes(bufferSize))
                  if (bytes.empty)
                  then finished
                  else bytes;
        };
    }.iterator;
}

{Element*} functionIterable<Element>
        (Element|Finished generate()) => object
        satisfies {Element*} {
    variable value used = false;
    shared actual Iterator<Element> iterator() {
        "Only one iterator may be obtained from \
         each Iterable created by functionIterable()."
        assert (!used);
        used = true;
        return object satisfies Iterator<Element> {
            next() => generate();
        };
    }
};
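
To see the single-shot behavior in isolation, here is a small sketch that feeds functionIterable() from an in-memory counter instead of a file. The function is repeated from above so the block stands alone, with the assertion message shortened to one line; nextNumber is a hypothetical generator introduced only for this example.

```ceylon
// functionIterable() as given in this post, repeated so the sketch is
// self-contained.
{Element*} functionIterable<Element>
        (Element|Finished generate()) => object
        satisfies {Element*} {
    variable value used = false;
    shared actual Iterator<Element> iterator() {
        "Only one iterator may be obtained from each Iterable."
        assert (!used);
        used = true;
        return object satisfies Iterator<Element> {
            next() => generate();
        };
    }
};

shared void run() {
    variable value n = 0;
    // Hypothetical generator: yields 0, 1, 2, then finished.
    Integer|Finished nextNumber() => n < 3 then n++ else finished;
    value numbers = functionIterable<Integer>(nextNumber);

    // map() is lazy; sequence() performs the one permitted iteration.
    print(numbers.map((i) => i * 10).sequence());

    // Asking for a second iterator trips the assertion.
    try {
        numbers.iterator();
    }
    catch (AssertionError e) {
        print("second iteration rejected");
    }
}
```

Lazy operations such as map() do not consume the stream by themselves; only the terminal sequence() call obtains the iterator, so chaining stays within the one-iterator limit.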

The next step of course would be to add additional classes for dealing with other types of resources.

Note that this approximates two open issues that go a bit further, with the idea that resource Iterables wouldn’t need the single-iterator restriction:


Of course, another option would be to allow multiple Iterators *until* theIterable.destroy() is called, at which time all file handles would be destroyed and no further Iterators would be allowed. That may provide little practical benefit though, since only one Iterator should ever be necessary!

John

Stephane Epardaud

Aug 1, 2016, 5:40:27 AM
to ceylon...@googlegroups.com
This looks cool, but it is incompatible with the non-blocking style that lots of people count on nowadays. It's rare to find APIs that actually work in both blocking and non-blocking modes, rather than two different incompatible APIs, but I still think it's a desirable property that an API could support both styles.

--
You received this message because you are subscribed to the Google Groups "ceylon-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to ceylon-users...@googlegroups.com.
To post to this group, send email to ceylon...@googlegroups.com.
Visit this group at https://groups.google.com/group/ceylon-users.
To view this discussion on the web visit https://groups.google.com/d/msgid/ceylon-users/E2D5D23C-1F18-4F37-B248-94489C0B11A5%40vasileff.com.
For more options, visit https://groups.google.com/d/optout.



--
Stéphane Épardaud

Tako Schotanus

Aug 1, 2016, 6:03:28 AM
to ceylon-users

On Mon, Aug 1, 2016 at 11:40 AM, Stephane Epardaud <stephane...@gmail.com> wrote:
but I still think it's a desirable property that an API could support both styles. 

That might be desirable, but I think John's suggestion is desirable too; after all, Iterable is a very big cornerstone of the Ceylon language. So much example code shows the use of Iterables, Sequences, etc. that people are naturally going to look for solutions to their problems using those. Given that, I think not having something like this, where you can use all the nice features of Iterable for any kind of data stream imaginable, would be an oversight.

-Tako

Stephane Epardaud

Aug 1, 2016, 6:19:46 AM
to ceylon...@googlegroups.com
Sure, I'm not saying it's not useful. I don't even know if it's reconcilable with non-blocking style. I'm just saying lots of people want non-blocking, so this would not help them, and we should at least try to see if we can support both. If not, fine; this is already useful by itself.




--
Stéphane Épardaud

Tako Schotanus

Aug 1, 2016, 6:30:34 AM
to ceylon-users
Oh I agree non-blocking is useful too and an API should definitely exist for that.
But given the discussions we've had these past days I just don't want this side-tracked and buried again ;)
If most of us can agree that this is useful we should just add them to the language module and SDK


-Tako

John Vasileff

Aug 1, 2016, 4:10:34 PM
to ceylon...@googlegroups.com
Thanks, yes, this is only about blocking input streams, which is why I put that in the title.

I think it would be up to the XML parser implementor if a non-blocking variation of the parser should be made available (I’m guessing the version of the parser suggested here would be backed by code used for the non-blocking version.)

John

John Vasileff

Aug 1, 2016, 4:11:55 PM
to ceylon...@googlegroups.com

On Aug 1, 2016, at 12:04 AM, John Vasileff <jo...@vasileff.com> wrote:

Of course, another option would be to allow multiple Iterators *until* theIterable.destroy() is called, at which time all file handles would be destroyed and no further Iterators would be allowed. That may provide little practical benefit though, since only one Iterator should ever be necessary!

Below is a variation of the file-byte-iterable that does exactly that.

I believe this answers all previous concerns about single-shot iterables, because it isn’t one. You of course can’t use the Iterable after destroy() is called, but that seems like a very fair restriction.

class FileByteIterable2(File f, Integer bufferSize=1k)
        satisfies {Byte*} & Destroyable {

    assert (bufferSize >= 1);

    variable {Destroyable*} iterators = [];
    variable Boolean iterableDestroyed = false;

    shared actual void destroy(Throwable? error) {
        for (it in iterators) {
            try { it.destroy(error); }
            catch (e) { /* TODO? */ }
        }
        iterableDestroyed = true;
    }

    shared actual Iterator<Byte> iterator() {
        assert (!iterableDestroyed);
        value iterator = object
                satisfies Destroyable & Iterator<Byte> {
            variable Byte[] bytes = [];
            variable Boolean readerDestroyed = false;
            variable value nextIndex = 0;

            value reader = f.Reader();

            shared actual void destroy(Throwable? error) {
                if (!readerDestroyed) {
                    reader.destroy(error);
                    readerDestroyed = true;
                }
            }

            shared actual Byte | Finished next() {
                assert (!iterableDestroyed);
                if (readerDestroyed) {
                    return finished;
                }
                if (nextIndex >= bytes.size) {
                    bytes = reader.readBytes(bufferSize);
                    nextIndex = 0;
                }
                if (nextIndex < bytes.size) {
                    assert (exists byte = bytes.get(nextIndex++));
                    return byte;
                }
                else {
                    destroy(null);
                    bytes = [];
                    return finished;
                }
            }
        };
        iterators = iterators.follow(iterator);
        return iterator;
    }
}


John

Tako Schotanus

Aug 1, 2016, 5:21:08 PM
to ceylon-users
In general it seems the code has parts that could be re-used between Iterables like this dealing with Destroyable objects, right?

I mean, once we have a `FileByteIterable` I also want a `FileLineIterable` and then `Socket`s ;)


-Tako


John Vasileff

Aug 1, 2016, 5:57:31 PM
to ceylon...@googlegroups.com

> On Aug 1, 2016, at 5:20 PM, Tako Schotanus <ta...@codejive.org> wrote:
>
> In general it seems the code has parts that could be re-used between Iterables like this dealing with Destroyable objects, right?
>
> I mean, once we have a `FileByteIterable` I also want a `FileLineIterable` and then `Socket`s ;)
>

Yes, absolutely!

Although, it might also be nice to have reusable components to create the more specialized iterables, like:

{Byte*} byteIterable = …; // any sort of resource
value lineIterable = byteIterable
        .transform(utf8Decoder)
        .transform(characterToLineConverter);

with lazy Converters:

shared alias Converter<Out, In> => {Out*}({In*});

{Character*} utf8Decoder({Byte*} bytes) => ...;
{String*} characterToLineConverter({Character*} characters) => ...;

and a new Iterable.transform method:

shared {Out*} transform<Out>(Converter<Out, Element> convert)
=> convert(this);
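
As a rough sketch of how these pieces could compose, the block below uses a top-level transform() function as a stand-in for the proposed Iterable.transform method, since that method does not exist today. Both converter bodies are assumptions written for illustration: latin1Decoder maps each byte to one character (it is not a real UTF-8 decoder), and this characterToLineConverter splits on '\n' only.

```ceylon
// Alias from the post: a converter maps one lazy stream to another.
alias Converter<Out, In> => {Out*}({In*});

// Top-level stand-in for the proposed Iterable.transform method.
{Out*} transform<Out, In>({In*} stream, Converter<Out, In> convert)
        => convert(stream);

// Illustrative only: one byte per character, not real UTF-8 decoding.
{Character*} latin1Decoder({Byte*} bytes)
        => bytes.map((b) => b.unsigned.character);

// Illustrative only: lazily groups characters into lines, splitting
// on '\n'. Each iterator pulls from the source only as far as needed.
{String*} characterToLineConverter({Character*} characters)
        => object satisfies {String*} {
    shared actual Iterator<String> iterator() {
        value source = characters.iterator();
        variable value exhausted = false;
        return object satisfies Iterator<String> {
            shared actual String|Finished next() {
                if (exhausted) {
                    return finished;
                }
                value line = StringBuilder();
                while (!is Finished c = source.next()) {
                    if (c == '\n') {
                        return line.string;
                    }
                    line.appendCharacter(c);
                }
                exhausted = true;
                // A trailing fragment without a newline is still a line.
                return if (line.size == 0) then finished else line.string;
            }
        };
    }
};

shared void run() {
    {Byte*} bytes = "one\ntwo".map((c) => c.integer.byte);
    value lines = transform(
        transform(bytes, latin1Decoder),
        characterToLineConverter);
    print(lines.sequence());
}
```

Because each converter only wraps the iterator it is given, the whole chain obtains a single iterator from the underlying byte stream per pass, which fits the resource-backed iterables discussed earlier in the thread.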

John