[Kayak] Sending large files


JohnACarruthers

May 5, 2012, 2:39:31 AM
to Kayak HTTP
Hi folks,

I was wondering how I would go about sending a large file in an HTTP
response? If the given file is 2GB, for example, loading it entirely
into a buffer before sending it is not going to be the ideal solution.

I've been looking at the example code and it only gives an example on
how to send a buffered response. Is there any example code floating
around for sending larger responses?

Benjamin van der Veen

May 5, 2012, 2:41:53 PM
to kayak...@googlegroups.com
By all rights, this functionality should exist within Kayak. So you know, patches welcome. :)

At a high level, you'll need to implement IDataProducer (defined in https://github.com/kayak/kayak/blob/master/Kayak/Net/Net.cs) and pass it to the IHttpResponseDelegate (https://github.com/kayak/kayak/blob/master/Kayak/Http/Http.cs).

On connect, open the file and call BeginRead on it. When the read completes, call the consumer's OnData method and call BeginRead again. In the cancellation/disposable returned from connect, cancel any pending reads and close the file. If an error occurs, call the consumer's OnError. When you reach EOF (read 0 bytes), call the consumer's OnEnd.

You may find it useful to reference the socket implementation for the details of handling the synchronous, asynchronous, and exception cases:

https://github.com/kayak/kayak/blob/master/Kayak/Net/Socket/KayakSocket.cs

sir.kny...@gmail.com

May 5, 2012, 5:57:40 PM
to kayak...@googlegroups.com
This is about as far as I had gotten on my own and I've been tinkering with multiple OnData calls using strings (I used the example BufferedProducer and duplicated the ArraySegment so if the code worked the string should appear in my browser twice). I must be missing something critical though; no matter what I try consecutive calls to OnData don't appear to work, either calling OnData immediately after or adding another OnData call to a delegate passed as the continuation parameter.

My hunch is the secret lies in the IDisposable returned from the Connect method. I have no idea what the Connect method is supposed to return, can't find any documentation or reference to it yet either. Care to clue me in?

Benjamin van der Veen

May 5, 2012, 7:01:36 PM
to kayak...@googlegroups.com
On Sat, May 5, 2012 at 2:57 PM, <sir.kny...@gmail.com> wrote:
I have no idea what the Connect method is supposed to return, can't find any documentation or reference to it yet either. Care to clue me in?

IDataProducer/IDataConsumer are analogous to IObservable/IObserver, with a modified OnNext/OnData signature (a bool return value and a continuation, so that a 'slower' consumer can 'throttle back' a 'faster' producer). The return value of Connect is 'disconnect', just like the return value of IObservable.Subscribe. When a consumer no longer wants to receive values from a producer, it should disconnect. In disconnect, a producer should free any external resources it owns (open file handles, sockets, etc.) and release the consumer. IEnumerator<T>.Dispose is also analogous.
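
For anyone following along, the shapes being compared can be sketched roughly as below. The two interfaces are stand-ins reconstructed from how they are used in this thread, not copied from Kayak's source, so details may differ. `Disconnect` is a hypothetical helper name, shown only to illustrate what the IDisposable returned from Connect is for.

```csharp
using System;

// Stand-ins reconstructed from usage in this thread (assumption: the real
// definitions live in Kayak/Net/Net.cs and may differ in detail).
public interface IDataConsumer
{
    void OnError(Exception e);
    // Returns true when the consumer WILL invoke the continuation later.
    bool OnData(ArraySegment<byte> data, Action continuation);
    void OnEnd();
}

public interface IDataProducer
{
    // The returned IDisposable is "disconnect", as with IObservable.Subscribe.
    IDisposable Connect(IDataConsumer channel);
}

// Hypothetical helper: runs a cleanup action at most once.
// Not thread-safe; a sketch only.
public sealed class Disconnect : IDisposable
{
    Action cleanup;

    public Disconnect(Action cleanup)
    {
        this.cleanup = cleanup;
    }

    public void Dispose()
    {
        var c = cleanup;
        cleanup = null;          // a second Dispose() becomes a no-op
        if (c != null) c();
    }
}
```

A producer could then return something like `new Disconnect(() => stream.Close())` from Connect, so that disconnecting releases whatever the producer opened.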

sir.kny...@gmail.com

May 5, 2012, 7:41:15 PM
to kayak...@googlegroups.com
This is a very rough trial of getting this to work with strings:

    class BufferedProducer : IDataProducer
    {
        ArraySegment<byte> data;

        public BufferedProducer(string data) : this(data, Encoding.UTF8) { }
        public BufferedProducer(string data, Encoding encoding) : this(encoding.GetBytes(data)) { }
        public BufferedProducer(byte[] data) : this(new ArraySegment<byte>(data)) { }
        public BufferedProducer(ArraySegment<byte> data)
        {
            this.data = data;
        }

        public IDisposable Connect(IDataConsumer channel)
        {
            // null continuation, consumer must swallow the data immediately.
            //channel.OnData(data, null);
            //channel.OnData(new ArraySegment<byte>(Encoding.UTF8.GetBytes("footer")), null);
            //channel.OnEnd();
            //return null;
            return new BufferedDisconnector(channel, data);
        }
    }

    class BufferedDisconnector : IDisposable
    {
        ArraySegment<byte> data;
        IDataConsumer consumer;

        public BufferedDisconnector(IDataConsumer consumer, ArraySegment<byte> data)
        {
            this.data = data;
            this.consumer = consumer;

            var watch = consumer.OnData(data, SendMore);
        }

        public void SendMore()
        {
            consumer.OnData(new ArraySegment<byte>(Encoding.UTF8.GetBytes("footer")), null);
            consumer.OnEnd();
        }

        public void Dispose()
        {
            
        }
    }

My observation is that consumer.OnData always returns false and the continuation method (SendMore) is never called. Am I still missing a piece of the puzzle?

Benjamin van der Veen

May 5, 2012, 7:54:53 PM
to kayak...@googlegroups.com
On Sat, May 5, 2012 at 4:41 PM, <sir.kny...@gmail.com> wrote:
My observation is that consumer.OnData always returns false and the continuation method (SendMore) is never called. Am I still missing a piece of the puzzle?

Yep, sounds like you're missing the semantics of the bool/continuation. Not surprising—I really need to formally document this somewhere. For now, read the OWIN spec for a detailed, if dry, explanation. http://owin.org/spec/owin-1.0.0draft5.html#BodyDelegate

The bool value has the meaning 'consumer will invoke continuation'.

A return value of true is a signal from the consumer to the producer ('I will invoke the continuation') that the producer should 'back off' and not call OnData for a time—specifically, until the consumer invokes the continuation. A return value of false is a signal that the consumer is immediately ready for more data from the producer.

Producers may or may not provide a continuation. If no continuation is provided, the consumer must return false and be prepared to buffer the data coming from the producer.

This mechanism is to prevent a fast producer, like a disk, from overwhelming a slow consumer, like a network connection to a mobile device. If this mechanism wasn't in place, data would be read from disk as fast as possible and buffered in memory while the connected mobile device slowly drained the memory buffer. Obviously, for large files this is unacceptable.
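
The bool/continuation handshake described above can be sketched with an in-memory producer and a deliberately slow consumer. Everything here is illustrative: `IDataConsumer` is a stand-in reconstructed from this thread rather than Kayak's own definition, and `SlowConsumer`, `ChunkSender` and `Drain` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for Kayak's consumer interface, reconstructed from this thread.
public interface IDataConsumer
{
    void OnError(Exception e);
    bool OnData(ArraySegment<byte> data, Action continuation);
    void OnEnd();
}

// Hypothetical slow consumer: takes one chunk, then asks the producer to
// wait until Drain() is called.
public class SlowConsumer : IDataConsumer
{
    public readonly List<byte> Received = new List<byte>();
    public bool Ended;
    Action pending;

    public bool OnData(ArraySegment<byte> data, Action continuation)
    {
        for (int i = 0; i < data.Count; i++)
            Received.Add(data.Array[data.Offset + i]);
        if (continuation == null)
            return false;        // no continuation offered: must just buffer
        pending = continuation;
        return true;             // "I will invoke the continuation"
    }

    public void OnError(Exception e) { Ended = true; }
    public void OnEnd() { Ended = true; }

    // Simulates the network catching up. Returns false if nothing was pending.
    public bool Drain()
    {
        var c = pending;
        pending = null;
        if (c == null) return false;
        c();
        return true;
    }
}

// Hypothetical producer that sends a byte array in fixed-size chunks.
public class ChunkSender
{
    readonly byte[] source;
    readonly int chunkSize;
    readonly IDataConsumer consumer;
    int offset;

    public ChunkSender(byte[] source, int chunkSize, IDataConsumer consumer)
    {
        this.source = source;
        this.chunkSize = chunkSize;
        this.consumer = consumer;
    }

    public void Send()
    {
        while (offset < source.Length)
        {
            int count = Math.Min(chunkSize, source.Length - offset);
            var chunk = new ArraySegment<byte>(source, offset, count);
            offset += count;
            // true: back off here; the consumer calls Send again when ready.
            if (consumer.OnData(chunk, Send))
                return;
        }
        consumer.OnEnd();
    }
}
```

With a 5-byte source and a chunk size of 2, the first `Send()` delivers 2 bytes and stops; each `Drain()` then pulls the next chunk, and the final `Drain()` leads to `OnEnd()`. Only one chunk is ever outstanding, which is the point of the mechanism.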


sir.kny...@gmail.com

May 5, 2012, 8:58:59 PM
to kayak...@googlegroups.com
Ah, so here is what it should look like, I think. I've tested it with some basic MemoryStreams and it appears to work; I'll post it here in case somebody else is looking for a similar solution:

    class StreamedProducer : IDataProducer
    {
        Stream _stream;
        IDataConsumer consumer;

        public StreamedProducer(Stream stream)
        {
            _stream = stream;
        }

        public IDisposable Connect(IDataConsumer channel)
        {
            consumer = channel;
            Send();
            return null;
        }

        void Send()
        {
            byte[] buffer = new byte[1024];
            _stream.BeginRead(buffer, 0, buffer.Length, new AsyncCallback(x => {
                int read = _stream.EndRead(x);
                var waitForContinuation = consumer.OnData(new ArraySegment<byte>(buffer, 0, read), Send);
                if (_stream.Position >= _stream.Length)
                {
                    _stream.Close();
                    consumer.OnEnd();
                }
                else if (!waitForContinuation)
                {
                    Send();
                }
            }), null);
        }
    }

    class StreamedDisconnector : IDisposable
    {
        Stream _stream;

        public StreamedDisconnector(Stream stream)
        {
            _stream = stream;
        }

        public void Dispose()
        {
            _stream.Close();
        }
    }

sir.kny...@gmail.com

May 5, 2012, 9:00:12 PM
to kayak...@googlegroups.com
Oops, "return null;" should be "return new StreamedDisconnector(_stream);"

Benjamin van der Veen

May 5, 2012, 9:28:00 PM
to kayak...@googlegroups.com
On Sat, May 5, 2012 at 6:00 PM, <sir.kny...@gmail.com> wrote:
Oops, "return null;" should be "return new StreamedDisconnector(_stream);"


On Saturday, May 5, 2012 7:58:59 PM UTC-5, sir.kny...@gmail.com wrote:
Ah, so here is what it should look like, I think. I've tested it with some basic MemoryStreams and it appears to work; I'll post it here in case somebody else is looking for a similar solution:

Looks pretty good! Perhaps you can make a gist or project on Github including some tests?

RichB

Aug 18, 2012, 7:35:09 AM
to kayak...@googlegroups.com, b...@bvanderveen.com

Hi,

Many thanks for this! I've adapted it slightly so that it works on a FileStream. You pass in the name of the file that you want to serve to the client, and it opens a stream on the file and serves it out. It wouldn't be very difficult to modify it a little further so that it supports HTTP/1.1 chunked encoding.

It's very easy to modify the examples provided to serve a file rather than a string. You will need to ensure that you send the Content-Length header though, otherwise your browser will sit there forever waiting for more data to arrive - implementing chunked encoding would solve this.
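
As a rough illustration of the Content-Length point, the header values for a file of known size could be built like this. `FileHeaders.Build` is a hypothetical helper, and the Content-Type value is just a generic placeholder; how the dictionary gets attached to the response head passed to OnResponse is not shown here.

```csharp
using System.Collections.Generic;
using System.Globalization;
using System.IO;

public static class FileHeaders
{
    // Hypothetical helper: header values for serving a file whose length
    // is known up front, so the client can tell when the body is complete.
    public static Dictionary<string, string> Build(string fileName)
    {
        long length = new FileInfo(fileName).Length;
        return new Dictionary<string, string>
        {
            { "Content-Type", "application/octet-stream" },
            { "Content-Length", length.ToString(CultureInfo.InvariantCulture) },
        };
    }
}
```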

Call the FileProducer (below) using httpResponse.OnResponse(responseHeaders, new FileProducer(fileName));



    class FileProducer : IDataProducer
    {

        // Members
        private string m_fileName;
        private FileStream m_fileStream;
        private IDataConsumer m_consumer;
       
        // Constructor
        public FileProducer(string fileName)
        {

            // The stream that we'll serve up
            m_fileStream = null;

            // Check that the file that we are going to serve actually exists
            if (File.Exists(fileName) == false)
            {
                throw new FileNotFoundException("File does not exist", fileName);
            }
            else
            {
                m_fileName = fileName;
            }

        }


        // Returns a disconnector; disposing it closes the file
        public IDisposable Connect(IDataConsumer channel)
        {

            // Store a handle to the data consumer
            m_consumer = channel;

            // Open the file that we want to send
            m_fileStream = File.OpenRead(m_fileName);

            // Send data
            Send();

            // Return a handle to a disconnector
            return new FileDisconnector(m_fileStream);

        }


        private void Send()
        {

            // Buffer to hold bytes read from file

            byte[] buffer = new byte[1024];

            // If the file is closed
            if (m_fileStream.CanRead == false)
            {
                return;
            }

            // Set up an async callback to write data to the consumer
            m_fileStream.BeginRead(buffer, 0, buffer.Length, new AsyncCallback(x =>
                {

                    // Wait for a read on the file to complete
                    int bytesRead = m_fileStream.EndRead(x);

                    // Will the consumer invoke continuation?
                    bool waitOnClient = m_consumer.OnData(new ArraySegment<byte>(buffer, 0, bytesRead), Send);

                    // If we've hit the end of the stream
                    if (m_fileStream.Position >= m_fileStream.Length)
                    {
                        m_fileStream.Close();
                        m_consumer.OnEnd();
                        return;
                    }

                    // If we aren't waiting on the client to request more data
                    else if (waitOnClient == false)
                    {
                        Send();
                    }

                }), null);

        }

    }


    class FileDisconnector : IDisposable
    {

        // Members
        private FileStream m_fileStream;

        // Constructor
        public FileDisconnector(FileStream fileStream)
        {
            m_fileStream = fileStream;
        }

        public void Dispose()
        {
            m_fileStream.Close();
        }

    }
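
A variation on the producer above that sticks more closely to the advice from earlier in the thread: end-of-file is detected by a read of 0 bytes, read failures go to OnError, and nothing is read after disconnect. This is a sketch under assumptions, not a tested fix for anything. The interfaces are stand-ins reconstructed from this thread, `StreamProducer` and `CollectingConsumer` are hypothetical names, there is no locking (it assumes callbacks and Dispose do not run concurrently), and synchronously completing reads get no special treatment.

```csharp
using System;
using System.IO;
using System.Threading;

// Stand-ins reconstructed from this thread; Kayak's own definitions may differ.
public interface IDataConsumer
{
    void OnError(Exception e);
    bool OnData(ArraySegment<byte> data, Action continuation);
    void OnEnd();
}

public interface IDataProducer
{
    IDisposable Connect(IDataConsumer channel);
}

public class StreamProducer : IDataProducer, IDisposable
{
    readonly Stream stream;
    IDataConsumer consumer;
    bool stopped;

    public StreamProducer(Stream stream) { this.stream = stream; }

    public IDisposable Connect(IDataConsumer channel)
    {
        consumer = channel;
        ReadNext();
        return this;                       // Dispose() acts as "disconnect"
    }

    void ReadNext()
    {
        if (stopped) return;               // disconnected or already finished
        var buffer = new byte[4096];       // fresh buffer per read, as in the posts above
        try { stream.BeginRead(buffer, 0, buffer.Length, OnRead, buffer); }
        catch (Exception e) { Fail(e); }
    }

    void OnRead(IAsyncResult result)
    {
        if (stopped) return;
        int read;
        try { read = stream.EndRead(result); }
        catch (Exception e) { Fail(e); return; }

        if (read == 0)                     // EOF: a read of 0 bytes
        {
            Stop();
            consumer.OnEnd();
            return;
        }

        var data = new ArraySegment<byte>((byte[])result.AsyncState, 0, read);
        // false: consumer is ready now; true: it will call ReadNext itself.
        if (!consumer.OnData(data, ReadNext))
            ReadNext();
    }

    void Fail(Exception e)
    {
        if (stopped) return;
        Stop();
        consumer.OnError(e);
    }

    void Stop() { stopped = true; stream.Dispose(); }

    public void Dispose() { Stop(); }
}

// Hypothetical consumer for trying the producer out: copies everything it
// is given and signals Done on end or error.
public class CollectingConsumer : IDataConsumer
{
    public readonly MemoryStream Received = new MemoryStream();
    public readonly ManualResetEvent Done = new ManualResetEvent(false);
    public Exception Error;

    public bool OnData(ArraySegment<byte> data, Action continuation)
    {
        Received.Write(data.Array, data.Offset, data.Count);
        return false;                      // copied already; ready for more
    }

    public void OnError(Exception e) { Error = e; Done.Set(); }
    public void OnEnd() { Done.Set(); }
}
```

Because OnEnd is only reached through a read that returns 0 bytes, it is never called while the consumer is still holding a continuation from the previous OnData.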

Benjamin van der Veen

Aug 18, 2012, 9:12:46 PM
to RichB, kayak...@googlegroups.com
On Sat, Aug 18, 2012 at 4:35 AM, RichB <ric...@gmail.com> wrote:
> class FileProducer : IDataProducer

This is a spot-on implementation of a producer! Thanks for posting
that here! Google, please index this—I'm sure there are people
searching for it. ;)

Would love to see some tests and a pull request on
https://github.com/kayak/kayak. ;)

Many thanks!

And in the name of SEO,

Gist: https://gist.github.com/3390660
Tweet: https://twitter.com/kayakhttp/status/236992196107198464

RichB

Aug 20, 2012, 4:23:03 PM
to kayak...@googlegroups.com


On Sunday, August 19, 2012 2:12:46 AM UTC+1, Benjamin van der Veen wrote:
 
This is a spot-on implementation of a producer! Thanks for posting
that here! Google, please index this—I'm sure there are people
searching for it. ;)

Thanks! To be perfectly honest, I threw it together to see if it would work. It seemed to work, so I thought I'd post it here.

Having played with it for a few days, I'm encountering random exceptions here and there, though I can't work out whether it's Kayak or my producer that is causing the problem. I get an unhandled exception thrown in OutputBuffer.cs at the line "Size -= howmuch;" and an ObjectDisposedException in BeginWrite() (KayakSocketState.cs). I saw the other thread which mentioned the Disposed exception, though in my case it is causing a real problem: the server bails halfway through serving a file. It's all very random though - sometimes everything works perfectly and other times it all blows up.

To be honest, I don't understand enough about how the async magic of Kayak works to debug it properly. If anybody else has any ideas about how to fix it, I'd love to hear them.


Benjamin van der Veen

Aug 20, 2012, 5:27:02 PM
to kayak...@googlegroups.com
On Mon, Aug 20, 2012 at 1:23 PM, RichB <ric...@gmail.com> wrote:
> It's all very random though - sometimes everything works perfectly and other
> times it all blows up.

Are you compiling yourself using the latest bits from GitHub? Some
things are addressed there which caused problems in the NuGet version.