decode-stream for tcp client


Vladimir Lebedev

Nov 27, 2014, 11:14:07 AM
to alep...@googlegroups.com
Hi,

I'm trying to make decode-stream work in the new version of Aleph (0.4.0), but without any success. Here is the code:

(ns avro-producer.core
  (:use
   [lamina.core]
   [gloss.core]
   [gloss.io :only (decode-stream)])
  (:require
   [aleph.tcp :as tcp]
   [manifold.stream :as s]
   [byte-streams :as bs]))

(def tcp-stream
  @(tcp/client {:host "bravo" :port 5353}))

(def messages
  (decode-stream
   tcp-stream
   (string :utf-8 :delimiters ["\n"])))

(defn -main
  [& args]
  (s/consume bs/print-bytes messages))

If I substitute 'messages' with 'tcp-stream' in the last line of code, I get a nice printout of my incoming data. If this line is left intact, my program just hangs and doesn't print anything. Here is the data that is pumped into the client; it is just JSON strings separated by "\n" (0x0A):

7B 22 69 64 22 3A 22 34  34 37 36 36 34 33 64 2D      {"id"."4476643d-
32 30 63 32 2D 34 30 65  32 2D 37 36 30 37 2D 37      20c2-40e2-7607-7
39 63 37 30 30 65 64 61  38 66 64 22 2C 22 75 72      9c700eda8fd","ur
6C 22 3A 22 68 74 74 70  73 3A 2F 2F 77 77 77 2E      l"."https.//www.
61 76 69 74 6F 2E 72 75  2F 61 6E 61 70 61 2F 70      avito.ru/anapa/p
72 65 64 6C 6F 7A 68 65  6E 69 79 61 5F 75 73 6C      redlozheniya_usl
75 67 2F 6D 61 6E 69 6B  79 75 72 6C 61 6B 2E 5F      ug/manikyurlak._
6D 61 6E 69 6B 79 75 72  67 65 6C 5F 70 65 64 69      manikyurgel_pedi
6B 79 75 72 67 65 6C 5F  69 5F 6C 61 6B 5F 76 79      kyurgel_i_lak_vy
5F 34 36 37 34 31 37 32  31 35 22 2C 22 69 70 22      _467417215","ip"
3A 22 38 35 2E 31 37 35  2E 31 33 32 2E 35 36 22      ."85.175.132.56"
2C 22 75 61 22 3A 22 4D  6F 7A 69 6C 6C 61 2F 35      ,"ua"."Mozilla/5
2E 30 20 28 57 69 6E 64  6F 77 73 20 4E 54 20 36      .0 (Windows NT 6
2E 31 3B 20 72 76 3A 33  33 2E 30 29 20 47 65 63      .1; rv.33.0) Gec
6B 6F 2F 32 30 31 30 30  31 30 31 20 46 69 72 65      ko/20100101 Fire
66 6F 78 2F 33 33 2E 30  22 2C 22 74 73 22 3A 31      fox/33.0","ts".1
34 31 37 30 39 31 39 38  38 39 33 33 7D 0A            417091988933}.
7B 22 69 64 22 3A 22 66  65 38 33 38 30 32 63 2D      {"id"."fe83802c-
39 33 39 38 2D 34 66 34  36 2D 63 66 37 39 2D 36      9398-4f46-cf79-6
63 30 34 39 61 30 33 34  32 32 64 22 2C 22 75 72      c049a03422d","ur
6C 22 3A 22 68 74 74 70  3A 2F 2F 79 61 6E 64 65      l"."http.//yande
[.......]

The goal is to have each individual JSON string in a separate message on the 'messages' stream.

Many thanks in advance,

Vladimir
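
For readers following along, the framing that a newline-delimited string codec is expected to perform can be sketched in plain Clojure, without Aleph or Gloss. `split-frames` is a hypothetical helper, not a Gloss function; it assumes the chunks arrive as byte arrays and that every frame ends in 0x0A. Decoding happens only once a whole frame has been buffered, so a multi-byte UTF-8 character that is split across two chunks still decodes correctly.

```clojure
(ns framing-sketch
  (:import [java.io ByteArrayOutputStream]))

(defn split-frames
  "Splits a seq of byte-array chunks into UTF-8 strings, one per
  newline-terminated frame. Hypothetical helper for illustration only.
  Bytes after the last newline stay in the buffer and are not returned."
  [chunks]
  (let [buf (ByteArrayOutputStream.)]
    (reduce
      (fn [frames chunk]
        (reduce
          (fn [frames b]
            (if (= 10 b)                       ; 0x0A ends the current frame
              (let [frame (String. (.toByteArray buf) "UTF-8")]
                (.reset buf)
                (conj frames frame))
              (do (.write buf (int b))         ; keep buffering raw bytes
                  frames)))
          frames
          chunk))
      []
      chunks)))

;; Usage: two records, with the first chunk ending in the middle of a
;; two-byte Cyrillic character.
(let [data (.getBytes "{\"a\":\"привет\"}\n{\"b\":2}\n" "UTF-8")
      c1   (byte-array (take 9 data))
      c2   (byte-array (drop 9 data))]
  (println (split-frames [c1 c2])))
```

Buffering bytes and decoding per frame, rather than decoding each chunk as it arrives, is the design choice that keeps chunk boundaries from corrupting non-ASCII text.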

Vladimir Lebedev

Nov 27, 2014, 2:41:13 PM
to alep...@googlegroups.com
I've managed to figure it out by myself, using the gist from Jonatan Jacob's Oct 27th post. My problem was that I did not convert the original TCP stream into a stream of byte-buffers. Here is the solution that worked:

(ns avprod.core
  (:require [aleph.tcp :as tcp]
            [gloss.core :as gloss]
            [gloss.io :as gio]
            [byte-streams :as bs]
            [manifold.stream :as s]
            [manifold.deferred :as d])
  (:import [java.nio ByteBuffer])
  (:gen-class))

(defn- to-bb
  [value]
  (bs/convert value ByteBuffer))

(def json-frame
  (gloss/string :ascii :delimiters ["\n"]))

(defn main-loop
  []
  (d/let-flow [src (tcp/client {:host "bravo"
                                :port 5353})
               dst (s/stream)]
    (s/connect-via src #(s/put! dst (to-bb %)) dst)
    (s/consume #(println "msg =>" %) (gio/decode-stream dst json-frame))))

(defn -main
  [& args]
  (main-loop))

It works well, though from time to time I get the following exceptions:

Nov 27, 2014 11:32:48 PM clojure.tools.logging$eval1920$fn__1924 invoke
SEVERE: error in consume
java.nio.charset.MalformedInputException: Input length = 1
at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:798)
at gloss.data.string.codecs$string_codec$reify__6012.read_bytes(codecs.clj:60)
at gloss.data.bytes.delimited$delimited_codec$fn__5321$fn__5323.invoke(delimited.clj:188)
at gloss.data.bytes.delimited$delimited_codec$fn__5321.invoke(delimited.clj:187)
at gloss.core.protocols$compose_callback$reify__5233.read_bytes(protocols.clj:73)
at gloss.data.bytes.delimited$delimited_codec$reify__5326.read_bytes(delimited.clj:195)
at gloss.core.protocols$compose_callback$reify__5233.read_bytes(protocols.clj:71)
at gloss.core.structure$compile_frame$reify__6103.read_bytes(structure.clj:120)
at gloss.io$decode_byte_sequence.invoke(io.clj:133)
at gloss.io$decode_stream$f__6335.invoke(io.clj:151)
at manifold.stream.Callback.put(stream.clj:541)
at manifold.stream.graph$async_send.invoke(graph.clj:50)
at manifold.stream.graph$async_connect$this__2838.invoke(graph.clj:172)
at manifold.stream.graph$async_connect$this__2838.invoke(graph.clj:151)
at clojure.core$trampoline.invoke(core.clj:5801)
at manifold.stream.graph$async_connect$this__2838$f__2843.invoke(graph.clj:180)
at manifold.deferred.Listener$f__1980__auto____2181.invoke(deferred.clj:151)
at manifold.deferred.Listener.onSuccess(deferred.clj:151)
at manifold.deferred.Deferred$fn__2257$fn__2258.invoke(deferred.clj:216)
at manifold.deferred.Deferred$fn__2257.invoke(deferred.clj:306)
at manifold.deferred.Deferred.success(deferred.clj:306)
at manifold.deferred$success_BANG_.invoke(deferred.clj:175)
at manifold.stream$put_all_BANG_$this__2454__auto____2764.invoke(stream.clj:349)
at clojure.lang.AFn.applyToHelper(AFn.java:156)
at clojure.lang.AFn.applyTo(AFn.java:144)
at clojure.core$apply.invoke(core.clj:626)
at manifold.stream$put_all_BANG_$this__2454__auto____2764$fn__2769.invoke(stream.clj:349)
at manifold.deferred.Listener$f__1980__auto____2181.invoke(deferred.clj:151)
at manifold.deferred.Listener.onSuccess(deferred.clj:151)
at manifold.deferred.Deferred$fn__2257$fn__2258.invoke(deferred.clj:216)
at manifold.deferred.Deferred$fn__2257.invoke(deferred.clj:306)
at manifold.deferred.Deferred.success(deferred.clj:306)
at manifold.deferred$success_BANG_.invoke(deferred.clj:175)
at manifold.deferred$chain_.invoke(deferred.clj:593)
at manifold.deferred$chain_$fn__2384.invoke(deferred.clj:581)
at manifold.deferred.Listener$f__1980__auto____2181.invoke(deferred.clj:151)
at manifold.deferred.Listener.onSuccess(deferred.clj:151)
at manifold.deferred.Deferred$fn__2257$fn__2258.invoke(deferred.clj:216)
at manifold.deferred.Deferred$fn__2257.invoke(deferred.clj:306)
at manifold.deferred.Deferred.success(deferred.clj:306)
at manifold.deferred$success_BANG_.invoke(deferred.clj:175)
at manifold.deferred$chain_SINGLEQUOTE__.invoke(deferred.clj:555)
at manifold.deferred$chain_SINGLEQUOTE__$fn__2357.invoke(deferred.clj:543)
at manifold.deferred.Listener$f__1980__auto____2181.invoke(deferred.clj:151)
at manifold.deferred.Listener.onSuccess(deferred.clj:151)
at manifold.deferred.Deferred$fn__2262$fn__2263.invoke(deferred.clj:216)
at manifold.deferred.Deferred$fn__2262.invoke(deferred.clj:308)
at manifold.deferred.Deferred.success(deferred.clj:308)
at manifold.deferred$success_BANG_.invoke(deferred.clj:177)
at manifold.stream.core.Stream$fn__2934.invoke(core.clj:193)
at manifold.stream.core.Stream.take(core.clj:192)
at manifold.stream.core.Stream.take(core.clj:207)
at manifold.stream.graph$async_connect$this__2838.invoke(graph.clj:149)
at clojure.core$trampoline.invoke(core.clj:5801)
at manifold.stream.graph$async_connect$this__2838$fn__2839.invoke(graph.clj:153)
at manifold.deferred.Listener$f__1980__auto____2181.invoke(deferred.clj:151)
at manifold.deferred.Listener$f__1970__auto____2183.invoke(deferred.clj:151)
at clojure.lang.AFn.run(AFn.java:22)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at manifold.utils$thread_factory$reify__1943$fn__1945.invoke(utils.clj:25)
at clojure.lang.AFn.run(AFn.java:22)
at java.lang.Thread.run(Thread.java:744)

Any clues how to get rid of them?

Best regards,
Vladimir

Zach Tellman

Nov 27, 2014, 3:22:51 PM
to alep...@googlegroups.com
Hi Vladimir,

You shouldn't have to coerce the types into byte-buffers.  Can you give me the versions of aleph, gloss, and byte-streams that you're using?  Using 'lein deps :tree' will give you the version if they're transitively defined.

Zach


Vladimir Lebedev

Nov 27, 2014, 3:58:14 PM
to alep...@googlegroups.com
$ lein deps :tree
Possibly confusing dependencies found:
[manifold "0.1.0-beta3"]
 overrides
[aleph "0.4.0-alpha9"] -> [manifold "0.1.0-beta5"]

Consider using these exclusions:
[aleph "0.4.0-alpha9" :exclusions [manifold]]

[byte-streams "0.2.0-alpha3"]
 overrides
[aleph "0.4.0-alpha9"] -> [byte-streams "0.2.0-alpha4"]

Consider using these exclusions:
[aleph "0.4.0-alpha9" :exclusions [byte-streams]]

warn
 [aleph "0.4.0-alpha9"]
   [io.aleph/dirigiste "0.1.0-alpha3"]
   [io.netty/netty-all "4.0.23.Final"]
   [org.clojure/tools.logging "0.3.1"]
   [potemkin "0.3.11"]
 [byte-streams "0.2.0-alpha3"]
   [clj-tuple "0.1.7"]
   [primitive-math "0.1.4"]
 [clojure-complete "0.2.3" :scope "test" :exclusions [[org.clojure/clojure]]]
 [gloss "0.2.3"]
 [manifold "0.1.0-beta3"]
   [riddley "0.1.7"]
 [org.clojure/clojure "1.6.0"]
 [org.clojure/tools.nrepl "0.2.6" :scope "test" :exclusions [[org.clojure/clojure]]]

Vladimir Lebedev

Nov 27, 2014, 4:08:33 PM
to alep...@googlegroups.com
Zach,

I fixed the dependency conflicts and modified main-loop as follows:

(defn main-loop
  []
  (d/let-flow [src (tcp/client {:host "bravo"
                                :port 5353})
               dst (s/stream)]
              (s/consume #(println "msg =>" %)
                         (gio/decode-stream src json-frame))))

It works now as expected, though I still get the same exception randomly:

java.nio.charset.MalformedInputException: Input length = 1
at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
at gloss.data.string.codecs$take_string_from_buf_seq.invoke(codecs.clj:48)
at gloss.data.string.codecs$string_codec$reify__6010.read_bytes(codecs.clj:61)
at gloss.data.bytes.delimited$delimited_codec$fn__5319$fn__5321.invoke(delimited.clj:188)
at gloss.data.bytes.delimited$delimited_codec$fn__5319.invoke(delimited.clj:187)
at gloss.core.protocols$compose_callback$reify__5231.read_bytes(protocols.clj:73)
at gloss.core.protocols$compose_callback$reify__5231.read_bytes(protocols.clj:71)
at gloss.io$decode_byte_sequence.invoke(io.clj:133)
at gloss.io$decode_stream$f__6333.invoke(io.clj:151)
at manifold.stream.Callback.put(stream.clj:541)
[.......]

Vladimir

Zach Tellman

Nov 27, 2014, 4:11:44 PM
to alep...@googlegroups.com

I know JSON is UTF-8, but maybe there's a malformed encoding? It might be worthwhile to decode everything as ASCII and see what the output looks like.
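
As a point of reference, the `MalformedInputException: Input length = 1` in the traces is what a strict JDK charset decoder reports when it meets a byte it cannot map. The sketch below uses only JDK classes and shows a strict US-ASCII decode failing on UTF-8 encoded Cyrillic text. `strict-decode` is a hypothetical helper, and the assumption that Gloss's `:ascii` resolves to the JDK's US-ASCII charset is mine, not something confirmed in this thread.

```clojure
(ns charset-sketch
  (:import [java.nio ByteBuffer]
           [java.nio.charset Charset CharacterCodingException]))

(defn strict-decode
  "Decodes a byte array with a fresh CharsetDecoder, which reports
  malformed input instead of silently replacing it. Returns {:ok string}
  on success or {:error message} on a decoding failure.
  Hypothetical helper for illustration only."
  [^bytes bs ^String charset-name]
  (try
    {:ok (str (.decode (.newDecoder (Charset/forName charset-name))
                       (ByteBuffer/wrap bs)))}
    (catch CharacterCodingException e
      {:error (.getMessage e)})))

;; Usage: the same UTF-8 bytes decoded with two different charsets.
(let [bs (.getBytes "привет" "UTF-8")]
  (println (strict-decode bs "US-ASCII"))
  (println (strict-decode bs "UTF-8")))
```

If the incoming JSON only occasionally contains non-ASCII characters, a failure of this kind would show up only on those records, which would be consistent with an exception that appears at random.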

Vladimir Lebedev

Nov 27, 2014, 4:19:17 PM
to alep...@googlegroups.com
Actually, I already use :ascii; here is the definition of json-frame:

(def json-frame
  (gloss/string :ascii :delimiters ["\n"]))

If I change :ascii to :utf-8, right at the start I get the following exception:

Nov 28, 2014 1:14:30 AM clojure.tools.logging$eval1920$fn__1924 invoke
SEVERE: error in consume
java.lang.IllegalArgumentException: Don't know how to create ISeq from: io.netty.buffer.SimpleLeakAwareByteBuf
at clojure.lang.RT.seqFrom(RT.java:505)
at clojure.lang.RT.seq(RT.java:486)
at clojure.core$seq.invoke(core.clj:133)
at clojure.core$empty_QMARK_.invoke(core.clj:5706)
at gloss.data.bytes.core$create_buf_seq.invoke(core.clj:250)
at gloss.core.formats$to_buf_seq.invoke(formats.clj:30)
at gloss.io$decode_stream$f__6333.invoke(io.clj:148)
[.......]

The stream itself contains UTF-8 (Cyrillic) symbols, though quite rarely, so I cannot use the :ascii decoder. Could you please look into what the problem is with the :utf-8 decoder? Also, is there any way to catch the exceptions in s/consume?
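
On the question of catching exceptions, one general approach is to wrap the function handed to `s/consume` in a `try`/`catch`. The sketch below is plain Clojure; `guarded` and its `on-error` argument are hypothetical names, not part of Manifold. Note the limit of this approach: it only guards the handler itself. In the traces above the exception is thrown inside `gloss.io/decode-stream`, before the handler ever sees a message, so a wrapper like this would not intercept those.

```clojure
(defn guarded
  "Wraps handler f so that an exception thrown while handling msg is
  passed to on-error (with the exception and the offending message)
  instead of propagating to the caller.
  Hypothetical helper for illustration only."
  [f on-error]
  (fn [msg]
    (try
      (f msg)
      (catch Exception e
        (on-error e msg)))))

;; Usage: a handler that fails on bad input, with errors collected
;; in an atom rather than thrown.
(def errors (atom []))

(def parse-handler
  (guarded #(Long/parseLong %)
           (fn [e msg]
             (swap! errors conj msg)
             nil)))

(parse-handler "42")
(parse-handler "oops")
```

In the thread's code this would be used as `(s/consume (guarded handler on-error) stream)`, with `handler` and `on-error` supplied by the application.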

Zach Tellman

Nov 28, 2014, 3:02:44 PM
to alep...@googlegroups.com
The issue is that Gloss wasn't using the byte-streams library to allow conversion from Netty's byte representation to something it could use.  I thought I had already made these changes, sorry for the confusion.  If you target Gloss 0.2.4-SNAPSHOT, it should work, or at least show different behavior.

Zach

Vladimir Lebedev

Nov 28, 2014, 3:49:31 PM
to alep...@googlegroups.com
Zach,

My current dependencies are:

                [[org.clojure/clojure "1.6.0"]
                 [aleph "0.4.0-alpha9"]
                 [manifold "0.1.0-beta5"]
                 [byte-streams "0.2.0-alpha7"]
                 [gloss "0.2.4-SNAPSHOT"]]

With the :utf-8 decoder I get an exception right at the start:

Nov 29, 2014 12:45:29 AM clojure.tools.logging$eval1920$fn__1924 invoke
SEVERE: error in consume
java.lang.NullPointerException
at byte_streams$convert.invoke(byte_streams.clj:186)
at byte_streams$convert.invoke(byte_streams.clj:164)
at gloss.core.formats$to_buf_seq.invoke(formats.clj:28)
at gloss.data.bytes.delimited$delimited_bytes_codec$reify__5316.read_bytes(delimited.clj:157)
at gloss.core.protocols$compose_callback$reify__5233.read_bytes(protocols.clj:71)
at gloss.data.bytes.delimited$delimited_codec$reify__5326.read_bytes(delimited.clj:195)
at gloss.core.protocols$compose_callback$reify__5233.read_bytes(protocols.clj:71)
at gloss.core.structure$compile_frame$reify__6103.read_bytes(structure.clj:120)
at gloss.io$decode_byte_sequence.invoke(io.clj:133)

With the :ascii decoder nothing changed.

Vladimir

ztellman

Dec 13, 2014, 2:33:24 PM
to alep...@googlegroups.com
I've pushed 0.2.4, which should fix your issue.  Please let me know if it doesn't.

Vladimir Lebedev

Dec 15, 2014, 7:19:23 AM
to alep...@googlegroups.com
Zach,

Yes, it worked! Many thanks for the help!

Vladimir