On Mon, Nov 7, 2011 at 2:31 PM, Boris Scheiman <bsch...@gmail.com> wrote:
> The main issue is that my data is being padded in 4 KB blocks in seemingly
> random ways: I send a constant 6144-byte block, but sometimes Kayak pads it
> up to 12 KB. I've noticed 4096-byte increments... but I have no idea what
> I'm doing wrong.
> public class BufferedConsumer : IDataConsumer {
>     protected readonly Action<Exception> ErrorCallback;
>     protected readonly Action<string, byte[]> ResultCallback;
>     public List<byte> ByteArray { get; set; }
>     public string File { get; set; }
>
>     public BufferedConsumer(string file, Action<string, byte[]> resultCallback, Action<Exception> errorCallback) {
>         File = file;
>         ResultCallback = resultCallback;
>         ErrorCallback = errorCallback;
>
>         ByteArray = new List<byte>();
>     }
>
>     private static byte[] Slice(byte[] source, int length) {
>         var destfoo = new byte[length];
>
>         Array.Copy(source, 0, destfoo, 0, length);
>         return destfoo;
>     }
>
>     #region IDataConsumer Members
>     public bool OnData(ArraySegment<byte> data, Action continuation) {
>         MessageBox.Show(data.Array.Length + "b");
>
>         ByteArray.AddRange(data.Array);
>
>         return false;
>     }
>
>     public void OnEnd() {
>         var bytes = Slice(ByteArray.ToArray(), 6144);
>         MessageBox.Show(bytes.Length.ToString());
>
>         ResultCallback(File, bytes);
>     }
>
>     public void OnError(Exception error) {
>         ErrorCallback(error);
>     }
>     #endregion
> }
>
Not sure if this will fully solve your problem, but this jumps out at me:
in OnData you're calling ByteArray.AddRange(data.Array), which copies the
entire backing array rather than just the data.Offset/data.Count window;
that would explain the 4096-byte increments you're seeing.
Not sure I've explicitly stated this anywhere, but the buffers that
Kayak passes to OnData are considered internal to the implementation
and only guaranteed to be valid for the duration of the callback. The
proper way to build up the data is to copy it out of the buffer Kayak
hands you and into your own buffer.
Hm, I was about to link you to KayakExamples/Program.cs, but it's
actually incorrect in this regard as well! Will fix.
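For your BufferedConsumer, roughly something like this (an untested sketch,
but it copies only the data.Offset/data.Count window instead of the whole
backing array):

public bool OnData(ArraySegment<byte> data, Action continuation)
{
    // data.Array is Kayak's internal buffer and may be larger than the
    // payload; copy just the valid window into our own storage.
    var copy = new byte[data.Count];
    System.Buffer.BlockCopy(data.Array, data.Offset, copy, 0, data.Count);
    ByteArray.AddRange(copy);
    return false;
}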
On Mon, Nov 7, 2011 at 4:20 PM, Boris Scheiman <bsch...@gmail.com> wrote:
> Hrm. Would you happen to have an example w/ POST? I.e., posting JSON or
> something?
Here's one way to buffer stuff up (DataConsumer is an implementation
of IDataConsumer). You could use GetString to feed the result to a
JSON parser (although ideally you'd use a streaming parser and feed it
data incrementally in OnData).
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Kayak; // IDataProducer / IDataConsumer

static class Input
{
    public static void Read(this IDataProducer producer,
        Action<Buffer> result, Action<Exception> fault)
    {
        var buffer = new Buffer();
        producer.Subscribe(new DataConsumer(
            (data, ack) => { buffer.Add(data); return false; },
            fault,
            () => result(buffer)));
    }
}

public class Buffer : IEnumerable<ArraySegment<byte>>
{
    List<ArraySegment<byte>> buffer = new List<ArraySegment<byte>>();

    public string GetString()
    {
        return GetString(Encoding.UTF8);
    }

    public string GetString(Encoding encoding)
    {
        return buffer.Aggregate("", (acc, next) => acc +
            encoding.GetString(next.Array, next.Offset, next.Count));
    }

    public int GetCount()
    {
        return buffer.Aggregate(0, (c, d) => c + d.Count);
    }

    // Copy the segment's bytes into our own array; the buffer Kayak hands us
    // is only valid for the duration of the callback.
    public void Add(ArraySegment<byte> d)
    {
        byte[] b = new byte[d.Count];
        System.Buffer.BlockCopy(d.Array, d.Offset, b, 0, d.Count);
        buffer.Add(new ArraySegment<byte>(b));
    }

    public void Add(IEnumerable<ArraySegment<byte>> data)
    {
        foreach (var d in data)
            Add(d);
    }

    public IEnumerator<ArraySegment<byte>> GetEnumerator()
    {
        return buffer.GetEnumerator();
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return buffer.GetEnumerator();
    }
}
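
For completeness, the DataConsumer referenced above isn't shown in this
thread; a minimal version, assuming its constructor simply takes the three
callbacks in (data, error, end) order as the Read call above passes them,
could look like this:

public class DataConsumer : IDataConsumer
{
    readonly Func<ArraySegment<byte>, Action, bool> onData;
    readonly Action<Exception> onError;
    readonly Action onEnd;

    public DataConsumer(
        Func<ArraySegment<byte>, Action, bool> onData,
        Action<Exception> onError,
        Action onEnd)
    {
        this.onData = onData;
        this.onError = onError;
        this.onEnd = onEnd;
    }

    // Forward each IDataConsumer call to the corresponding delegate.
    public bool OnData(ArraySegment<byte> data, Action continuation)
    {
        return onData(data, continuation);
    }

    public void OnError(Exception error)
    {
        onError(error);
    }

    public void OnEnd()
    {
        onEnd();
    }
}

With that in place, reading a POSTed JSON body is just:

// requestBody stands in for whatever IDataProducer your handler receives,
// and HandleJson for whatever you do with the buffered text.
requestBody.Read(
    buffer => HandleJson(buffer.GetString()),
    error => Console.WriteLine("error reading body: " + error));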