defining variables with a value from an event

Nickolaos Kas

Aug 10, 2022, 2:29:34 PM
to Riemann Users
Hello, I have a peculiar issue manipulating an event that comes from a docker plugin before sending it to Graphite for storage. Before processing, the events look like this:
{:host docker01
 :service docker-proxy_traefik.okxa5m88qp7e86hb7lx2rkt51.888ri1xfqargsw6q64tk4auu2[com.docker.stack.namespace=proxy,com.docker.swarm.node.id=okx/memory.stats-mapped_file
 :state nil
 :description nil
 :metric 0.0
 :tags [collectd docker vm]
 :time 1660152450
 :ttl 60.0
 :plugin docker
 :type memory.stats
 :ds_name value
 :plugin_instance proxy_traefik.okxa5m88qp7e86hb7lx2rkt51.888ri1xfqargsw6q64tk4auu2[com.docker.stack.namespace=proxy,com.docker.swarm.node.id=okx
 :type_instance mapped_file
 :ds_type gauge
 :ds_index 0
 :true true
}

What I am trying to do is change the service to type.type_instance.ds_name, and change the host from the docker host to the container host, which is provided by the :plugin_instance key. For the host, I would like to get rid of the long hex ids added by docker swarm. The container name typically follows this format:
<base name>.<instance number>.<swarm node id>.<swarm task id>[swarm info]
The instance number is optional and appears only if more than one instance is requested, and the swarm node id seems to appear when the instance number is not present. In the above example there is no instance number.
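A minimal sketch in plain Clojure (no Riemann required) of the naming rule described in this message. `container-name` is a hypothetical helper, not code from this thread, and it assumes the second dot-separated segment is either a short instance number or a long swarm id. Note the argument order of `<`: `(< (count s) 4)` means "shorter than 4 characters", whereas `(< 4 (count s))` means "longer than 4".

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper: derive a short container name from a docker
;; :plugin_instance of the form
;; <base name>.<instance number>.<swarm node id>.<swarm task id>[...]
(defn container-name [plugin-instance]
  (let [[base second-part] (str/split plugin-instance #"\.")]
    ;; Keep the second segment only when it is shorter than 4 characters
    ;; (an instance number up to 999); long segments are swarm ids.
    (if (and second-part (< (count second-part) 4))
      (str base "." second-part)
      base)))

;; The plugin_instance from the event above (no instance number):
(container-name "proxy_traefik.okxa5m88qp7e86hb7lx2rkt51.888ri1xfqargsw6q64tk4auu2[com.docker.stack.namespace=proxy,com.docker.swarm.node.id=okx")
;; => "proxy_traefik"

;; A made-up name with an instance number:
(container-name "web.2.taskid")
;; => "web.2"
```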

My thought was to split the string on the dots (.), keep the first element of the list, and possibly also the second one if it is less than 4 characters long (up to 999 instances). The code I was hoping to use is the following:

(where (= (:plugin event) "docker")
  (def plugin_inst (str/split (:plugin_instance event) #"\."))
  (def container_name (cond-> (first plugin_inst)
                        (< 4 (count (second plugin_inst))) (str "." (second plugin_inst))))
  (smap #(assoc % :host container_name
                  :service (cond-> (str (:type %))
                             (:type_instance %) (str "." (:type_instance %))
                             (:ds_name %) (str "." (:ds_name %))))
    (smap rewrite-service graph)))

Needless to say, the code crashes and burns with the error
clojure.lang.Compiler$CompilerException: java.lang.NullPointerException
which seems to originate from my first def statement, the one defining the plugin_inst variable. I am guessing this is more of a Riemann issue than a Clojure one, given that the expression (:plugin_instance event) seems to return nil.

Given that there is a key in the event called :plugin_instance, what is wrong with the above code?

Thanks in advance


Sanel Zukan

Aug 11, 2022, 3:52:21 AM
to Nickolaos Kas, Riemann Users
Nickolaos Kas <redh...@gmail.com> writes:
> ...
> No point mentioning that the code crashes and burns with the error
> *clojure.lang.Compiler$CompilerException: java.lang.NullPointerException*
> which seems to be originating from my first *def* statement trying to
> define the plugin_inst variable. I am guessing this is more of a riemann
> issue rather than clojure given that the statement *(:plugin_instance
> event)* seems to return null.
>
> Given that there is a key in the event called :plugin_instances, what is
> wrong with the above code?

Hard to tell without a full stack trace. My first guess is that the "where"
macro, while rewriting its body and looking for "event" symbols, found
something unexpected.

Why not use "smap" directly? It is intended for adjusting events and
will make the code easier to test.

I'd approach it with something like this (please test it first):

(where (= (:plugin event) "docker")
  (smap
    (fn [ev]
      ;; Destructuring makes code readable. Also, some-> checks that the
      ;; ':plugin_instance' key is present in the map; otherwise str/split
      ;; would throw an exception.
      (let [[first-name second-name] (some-> ev :plugin_instance (str/split #"\."))
            ;; keep the second part only if it is short (an instance number)
            container-name (when first-name
                             (if (and second-name (< (count second-name) 4))
                               (str first-name "." second-name)
                               first-name))
            ;; get values for the given keys, but only those not nil
            vals (remove nil? (map ev [:type :type_instance :ds_name]))
            ;; generate service name
            service-name (str/join "." vals)]
        (assoc ev
               :service service-name
               :host (or container-name (:host ev)))))
    (smap rewrite-service graph)))
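For comparison, a sketch of the two ways of building the service name that appear in this thread: the `cond->` chain from the first message and the `remove nil?` / `str/join` approach above. `sample-event` is a trimmed copy of the event from the first message; the function names are made up for illustration.

```clojure
(require '[clojure.string :as str])

;; Trimmed copy of the event from the first message.
(def sample-event
  {:host "docker01" :plugin "docker"
   :type "memory.stats" :type_instance "mapped_file" :ds_name "value"})

;; cond-> style: start from :type and append each part only when present.
(defn service-via-cond [ev]
  (cond-> (str (:type ev))
    (:type_instance ev) (str "." (:type_instance ev))
    (:ds_name ev)       (str "." (:ds_name ev))))

;; join style: look up the keys (a map is a function of its keys),
;; drop nils, and join the rest with dots.
(defn service-via-join [ev]
  (str/join "." (remove nil? (map ev [:type :type_instance :ds_name]))))

(service-via-cond sample-event) ;; => "memory.stats.mapped_file.value"
(service-via-join sample-event) ;; => "memory.stats.mapped_file.value"
```

The two agree whenever :type is present; if :type is missing, the cond-> version produces a leading dot (".mapped_file.value") while the join version does not.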

Now, because you already call "rewrite-service" through "smap", you can
move the initial code into a separate function.

(defn rewrite-service-host [ev]
  (let [[first-name second-name]...
  ;; copy the rest of the (fn [ev] ...) body from above
  )

;; initial code is now in two lines only;
;; comp applies right to left, so rewrite-service-host runs first
(where (= (:plugin event) "docker")
  (smap (comp rewrite-service rewrite-service-host) graph))
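A quick check of `comp` ordering, since it is easy to get backwards: `comp` applies its arguments right to left, so `((comp g f) ev)` is `(g (f ev))`, and the transformation that should run first goes last. `step-host` and `step-service` are hypothetical stand-ins that only record the order in which they ran.

```clojure
;; Hypothetical steps that append a marker to :order when they run.
(defn step-host [ev] (update ev :order (fnil conj []) :host))
(defn step-service [ev] (update ev :order (fnil conj []) :service))

;; Right to left: step-host runs first, then step-service.
(def host-then-service (comp step-service step-host))

(host-then-service {}) ;; => {:order [:host :service]}
```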

Best,
Sanel

Nickolaos Kas

Aug 11, 2022, 9:37:51 AM
to Riemann Users
Thank you for the reply. I see your point and will look into refactoring, but I am new to all this and I think I am missing something fundamental here.

I set up a very simple test case with the following code:
(logging/init {:file "/var/log/riemann/riemann.log"})
(let [host "0.0.0.0"]
  (tcp-server {:host host :port 5555})
)
(let [index (index)]
  (streams
    (default :ttl 15
      index

      (expired
        (def container_name (:host event) )
        #(info container_name %))
    )
  )
)

When I stop collectd and the event expires, I get the same null issue. Here is one of the messages. You can see from the INFO at the end that the value of container_name is actually nil.
java.lang.NullPointerException: null
    at clojure.lang.Var.invoke(Var.java:381)
    at riemann.streams$expired$stream__8801$fn__8816.invoke(streams.clj:1337)
    at riemann.streams$expired$stream__8801.invoke(streams.clj:1337)
    at riemann.streams$default$stream__9068$fn__9083.invoke(streams.clj:1417)
    at riemann.streams$default$stream__9068.invoke(streams.clj:1417)
    at riemann.core$stream_BANG_$fn__10009.invoke(core.clj:20)
    at riemann.core$stream_BANG_.invokeStatic(core.clj:19)
    at riemann.core$stream_BANG_.invoke(core.clj:15)
    at riemann.core$reaper$worker__10125$fn__10135.invoke(core.clj:307)
    at riemann.core$reaper$worker__10125.invoke(core.clj:300)
    at riemann.service.ScheduledTaskService$fn__6678.invoke(service.clj:69)
    at riemann.time.Every.run(time.clj:55)
    at riemann.time$run_tasks_BANG_$fn__5402$fn__5403.invoke(time.clj:154)
    at riemann.time$run_tasks_BANG_$fn__5402.invoke(time.clj:153)
    at riemann.time$run_tasks_BANG_.invokeStatic(time.clj:147)
    at riemann.time$run_tasks_BANG_.invoke(time.clj:142)
    at riemann.time$start_BANG_$fn__5422$fn__5423.invoke(time.clj:193)
    at clojure.lang.AFn.applyToHelper(AFn.java:152)
    at clojure.lang.AFn.applyTo(AFn.java:144)
    at clojure.core$apply.invokeStatic(core.clj:657)
    at clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1965)
    at clojure.core$with_bindings_STAR_.doInvoke(core.clj:1965)
    at clojure.lang.RestFn.invoke(RestFn.java:425)
    at clojure.lang.AFn.applyToHelper(AFn.java:156)
    at clojure.lang.RestFn.applyTo(RestFn.java:132)
    at clojure.core$apply.invokeStatic(core.clj:661)
    at clojure.core$bound_fn_STAR_$fn__5471.doInvoke(core.clj:1995)
    at clojure.lang.RestFn.invoke(RestFn.java:397)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:750)
INFO [2022-08-11 11:26:07,024] riemann task 1 - riemann.config - nil {:host ree-du1rws3, :service protocols-Tcp/protocol_counter-RtoMin, :tags [collectd vm], :description nil, :metric 200, :state expired, :time 1660209967011/1000, :ttl 15}

If, on the other hand, I put (:host event) straight into the info macro, it works great. So if I change the code around the expired statement to
      (expired
        #(info (:host %) %))
I get this in the log:
INFO [2022-08-11 15:33:11,256] riemann task 1 - riemann.config - ree-du1rws3 {:host ree-du1rws3, :service protocols-Tcp/protocol_counter-RtoMin, :tags [collectd vm], :description nil, :metric 200, :state expired, :time 332044958249/200, :ttl 15}

How would I go about debugging why something that looks like a normal Clojure statement (normal to me, at least), (def container_name (:host event)), creates an error and a nil value as above? There are some other places where I am trying to use def statements and I get similar issues. If there are any articles you would recommend reading, I am all ears. Currently I am trying to learn some more Clojure in case all this becomes clear.

Thanks
Nick

Sanel Zukan

Aug 11, 2022, 11:53:33 AM
to Nickolaos Kas, Riemann Users
Nickolaos Kas <redh...@gmail.com> writes:
> (expired
> (def container_name (:host event) )
> #(info container_name %))

I think the problem is the code above: "expired" is a function and expects
its arguments to be functions, which it calls internally, passing the
expired event to each of them. Using "def", with a (presumably) nil event
at expansion time, will create something like this:

((def foo nil) 3)

In short, don't use "def" inside expressions. In Clojure, "def" is
reserved for, simply put, creating something global that is not going to
be changed (unless you know what you are doing).
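A plain Clojure sketch of that failure, assuming a stream calls each child as `(child event)`; `call-child` is a hypothetical stand-in for that, not Riemann's code. Evaluating a `def` form returns the var it defines, and calling a var whose value is nil throws a NullPointerException from `clojure.lang.Var.invoke`, which matches the top frame of the stack trace earlier in this thread.

```clojure
;; Evaluating a `def` form defines a global var and returns the var
;; itself, not its value. Here the value is nil, as (:host event) was.
(def child-from-def (def container_name nil))

;; Simplified stand-in for how a stream hands an event to a child.
(defn call-child [child event]
  (try
    (child event)
    :no-error
    (catch NullPointerException _ :npe)))

(call-child child-from-def {:host "ree-du1rws3"}) ;; => :npe
(call-child #(:host %) {:host "ree-du1rws3"})     ;; => :no-error
```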

To get more sense of how "expired" works:

;;; this will log expired events
(defn log-it [event]
  (info (:container_name event)))

(expired log-it)
;;;

"#(info ...)" is shorthand for an anonymous function, just as if we had
used "log-it", or maybe this:

(expired
  (fn [event]
    (info (:container_name event))))

Now, inside those functions you can use "let", which will create _local_
bindings (or local variables), like:

(expired
  (fn [event]
    (let [my-container-name (str "my-magic-" (:container_name event) "-container")]
      ;; prints "my-magic-...-container"
      (info my-container-name))))

Again, don't use "def" inside of it as well ;)
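A sketch of the same ideas that runs without Riemann: `record!` is a hypothetical stand-in for `info` that collects strings in an atom, and the `doseq` plays the role of a stream handing one event to each child. It shows that `#(...)` and `(fn [event] ...)` build equivalent functions, and that a `let` binding is computed afresh for every event.

```clojure
;; Hypothetical stand-in for `info`: collect log lines in an atom.
(def logged (atom []))
(defn record! [& parts] (swap! logged conj (apply str parts)))

;; #(...) is shorthand for an anonymous fn; these two are equivalent.
(def child-a #(record! "host=" (:host %)))
(def child-b (fn [event] (record! "host=" (:host event))))

;; `let` creates a local binding, recomputed each time the fn is called.
(def child-c
  (fn [event]
    (let [container-name (str "my-magic-" (:host event) "-container")]
      (record! container-name))))

;; Feed one event to each child, as a stream would.
(doseq [child [child-a child-b child-c]]
  (child {:host "ree-du1rws3"}))

@logged
;; => ["host=ree-du1rws3" "host=ree-du1rws3" "my-magic-ree-du1rws3-container"]
```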

Best,
Sanel

Nickolaos Kas

Aug 12, 2022, 5:42:49 PM
to Riemann Users
Thank you for the suggestions. All your examples worked great, but when I try to adapt them to what I need to do, I end up with egg on my face. It seems I am missing something about how streams work.

So, going back to the original case, I defined a simple function to use inside an smap to do the name substitution. I kept it to a minimum here, since the assoc function will change more fields:
(defn rewrite-docker [event]
   (let [plugin_inst (str/split (:plugin_instance event) #"\.")]
      #(assoc % :host (first plugin_inst))
   )
)

Then I have the following in the stream section of the main riemann file
(where (= (:plugin event) "docker")
  #(info "def " (:host %)  (:plugin_instance %) )
  (fn [event]
     (smap rewrite-docker
        (info "smap" (:host event) (:plugin_instance event) )
     )
  )
)

Now when I run the above code, I expect to see the original host and plugin_instance entries of the event in the first info (with the "def" string), and the changed values in the second info (with the "smap" string) inside the smap. Unfortunately, what I get is the following:
INFO [2022-08-12 23:22:02,644] defaultEventExecutorGroup-2-2 - riemann.config - def  eur-docker01 proxy_traefik.okxa5m88qp7e86hb7lx2rkt51.888ri1xfqargsw6q64tk4auu2[com.docker.stack.namespace=proxy,com.docker.swarm.node.id=okx
INFO [2022-08-12 23:22:02,644] defaultEventExecutorGroup-2-2 - riemann.config - smap eur-docker01 proxy_traefik.okxa5m88qp7e86hb7lx2rkt51.888ri1xfqargsw6q64tk4auu2[com.docker.stack.namespace=proxy,com.docker.swarm.node.id=okx

The host field refuses to change. I am not sure if the issue is in the rewrite-docker function definition, the let statement or the assoc function. The smap function looks too simple to be the source of the problem, but then again I am new to Riemann. Any insights into what may be wrong?

Best regards
Nick

ap...@aphyr.com

Aug 13, 2022, 12:23:10 AM
to rieman...@googlegroups.com
Typically you'd invoke smap directly as a stream--here you're making an anonymous stream fn and having that call smap, which... I think constructs an smap stream and returns it, and the return value gets discarded? Not clear how that's supposed to work, but you might try looking at the API docs and howto examples for smap. As the Riemann home page notes, streams are functions that take events--when in doubt, look for that function and ask if it's receiving an event and doing something with it.

Likewise, this rewrite-docker fn is constructing an anonymous function, rather than calling assoc directly--probably not what you want.
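To illustrate the difference between building a stream and calling one, here is a toy streaming map. `toy-smap` is a simplified model written for this example, not Riemann's `smap`, and `seen` records what the child receives. In the `(fn [event] (smap ...))` version, each event builds a new stream that is never called; and since the `info` inside that fn ran with the original, unmodified event, that would be consistent with the host appearing unchanged in the log.

```clojure
;; Toy model of a streaming map (NOT Riemann's implementation): returns
;; a stream fn that applies f and passes non-nil results to children.
(defn toy-smap [f & children]
  (fn [event]
    (when-some [out (f event)]
      (doseq [child children] (child out)))))

(def seen (atom []))   ; records the :host each child actually received

;; Used directly: toy-smap returns the stream, called once per event.
(def good-stream
  (toy-smap #(assoc % :host "rewritten")
            #(swap! seen conj (:host %))))

;; Wrapped in a fn: each event builds a new stream and returns it
;; without calling it, so the child never sees the event.
(def discarded-stream
  (fn [event]
    (toy-smap #(assoc % :host "rewritten")
              #(swap! seen conj (:host %)))))

(good-stream {:host "docker01"})
(discarded-stream {:host "docker01"})
@seen ;; => ["rewritten"]
```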

--Kyle


Nickolaos Kas

Aug 13, 2022, 6:30:14 AM
to Riemann Users
Hello
I do see your point about calling smap directly, but I get a NullPointerException when I do that. It is really puzzling me. So if I refactor to the following:

(where (= (:plugin event) "docker")
  #(info "def " (:host %)  (:plugin_instance %) )
  (smap rewrite-docker
    (info "smap" (:host event) (:plugin_instance event) )
  )
)

I get errors like the following in the log
INFO [2022-08-13 12:07:07,805] defaultEventExecutorGroup-2-1 - riemann.config - def  eur-docker01 metrics_node-exporter.okxa5m88qp7e86hb7lx2rkt51.sc5j5ji8yjq3e3t0hrogrhidi[com.docker.stack.namespace=metrics,com.docker.swarm.n
WARN [2022-08-13 12:07:07,808] defaultEventExecutorGroup-2-1 - riemann.streams -  threw
java.lang.NullPointerException: null
    at riemann.streams$smap$stream__7207$fn__7222.invoke(streams.clj:175)
    at riemann.streams$smap$stream__7207.invoke(streams.clj:175)
    at riemann.config$eval527$stream__9318__auto____649$fn__654.invoke(riemann.config:52)
    at riemann.config$eval527$stream__9318__auto____649.invoke(riemann.config:52)
    at riemann.streams$tagged_all$stream__8738$fn__8753.invoke(streams.clj:1304)
    at riemann.streams$tagged_all$stream__8738.invoke(streams.clj:1304)

    at riemann.streams$default$stream__9068$fn__9083.invoke(streams.clj:1417)
    at riemann.streams$default$stream__9068.invoke(streams.clj:1417)
    at riemann.core$stream_BANG_$fn__10009.invoke(core.clj:20)
    at riemann.core$stream_BANG_.invokeStatic(core.clj:19)
    at riemann.core$stream_BANG_.invoke(core.clj:15)
    at riemann.transport$handle.invokeStatic(transport.clj:173)
    at riemann.transport$handle.invoke(transport.clj:167)
    at riemann.transport.tcp$tcp_handler.invokeStatic(tcp.clj:109)
    at riemann.transport.tcp$tcp_handler.invoke(tcp.clj:102)
    at riemann.transport.tcp$gen_tcp_handler$fn__13525.invoke(tcp.clj:68)
    at riemann.transport.tcp.proxy$io.netty.channel.ChannelInboundHandlerAdapter$ff19274a.channelRead(Unknown Source)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:56)
    at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:365)
    at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:750)

Notice that the first info statement works fine, but I get this NullPointerException somewhere in the smap. When I put the (fn [event] ...) around it there is no error, but it's as if the function is ignored. My biggest problem with Riemann is that I do not know how to debug these issues. What would be a good methodology to identify where the null comes from?
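One plausible reading of the NullPointerException above, offered as a hedged guess: unlike the first child, `(info "smap" ...)` is not wrapped in `#(...)`, so it runs once when the config is loaded and smap receives its return value, nil, as a child; calling nil then throws. The empty child name in the `riemann.streams -  threw` warning would fit that. The sketch models this in plain Clojure with a hypothetical `log-line` standing in for `info` and a simplified `call-child`.

```clojure
;; Hypothetical stand-in for `info`: prints its arguments, returns nil.
(defn log-line [& args]
  (apply println args)
  nil)

;; Evaluated once, while the stream is being built: what gets passed
;; on as a "child" is log-line's return value, i.e. nil.
(def eager-child (log-line "smap" "evaluated at load time"))

;; A function: evaluated later, once per event.
(def lazy-child #(log-line "smap" (:host %)))

;; Simplified stand-in for how a stream calls a child.
(defn call-child [child event]
  (try
    (child event)
    :ok
    (catch NullPointerException _ :npe)))

(call-child eager-child {:host "eur-docker01"}) ;; => :npe
(call-child lazy-child {:host "eur-docker01"})  ;; => :ok
```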

Concerning the rewrite-docker function, I am not sure what you mean about the anonymous function, unless this is what the let form does. I actually need the let block to define the plugin_inst variable, which will be used multiple times in that function once I solve this issue. This was derived from the original question I asked in this thread. I am not sure how I could refactor the code to be better in that case.

Thanks
Nick

Nickolaos Kas

Aug 13, 2022, 8:29:22 AM
to Riemann Users
After a bit of digging, I believe I understand what you meant by the anonymous function in the rewrite-docker function. Sorry it takes me a while; I'm new to this. I changed my function to

(defn rewrite-docker
   [event]
      (let [plugin_inst (str/split (:plugin_instance event) #"\.")]
        (assoc event :host (first plugin_inst))
    )
)

but there is still no difference.

Nickolaos Kas

Aug 15, 2022, 2:06:01 PM
to Riemann Users
Digging around I managed to find a solution, but I don't fully understand it. The resulting code is the following.
Outside of the streams block I define 2 functions:
(defn rewrite-docker []
   (fn [{:keys [] :as event}]

      (let [plugin_inst (str/split (:plugin_instance event) #"\.")]
         (assoc event :host (cond-> (first plugin_inst)
                                (< (count (second plugin_inst)) 4) (str "." (second plugin_inst)))
                  :service (cond-> (str (:type event))
                                   (:type_instance event) (str "." (:type_instance event))
                                   (:ds_name event) (str "." (:ds_name event)))
         );assoc
      );let
   );fn
)

(def rewrite-service
   (rewrite-docker)
)

And inside the streams block:
(where (= (:plugin event) "docker")
   #(info "def " (:host %) (:plugin_instance %))
   (smap rewrite-service
      #(info "smap " (:host %) (:plugin_instance %)) )
)

After running the above, the event is manipulated as I wanted
INFO [2022-08-15 19:41:49,545] defaultEventExecutorGroup-2-2 - riemann.config - def  eur-docker01 metrics_cadvisor.okxa5m88qp7e86hb7lx2rkt51.ekka95ygbw7e7yvdqqjn8zim9[com.docker.stack.namespace=metrics,com.docker.swarm.node.i
INFO [2022-08-15 19:41:49,549] defaultEventExecutorGroup-2-2 - riemann.config - smap  metrics_cadvisor metrics_cadvisor.okxa5m88qp7e86hb7lx2rkt51.ekka95ygbw7e7yvdqqjn8zim9[com.docker.stack.namespace=metrics,com.docker.swarm.node.i

There are two parts of this that I don't fully understand: the smap line in the streams block and the (fn [{:keys [] :as event}] ...) line in rewrite-docker.
Starting with the smap line: I had the impression that inside a stream I am supposed to call functions in the form (fn [event] ...), and this smap is not exactly formed like that, but maybe that is fine since it is a streaming function. Also, if I understand correctly, smap is supposed to call the function (rewrite-service in my example) with the event as its argument. This is what the documentation says (https://riemann.io/api/riemann.streams.html#var-smap)
        smap
        (smap f & children)
        Streaming map. Calls children with (f event), whenever (f event) is non-nil.
But what is passed to smap is a symbol, not a function, and the function that symbol is bound to does not take the event argument. Why does this work, and what is it called, so that I can look at some documentation?

Concerning the (fn [{:keys [] :as event}] ...) line, I have no idea what it does, and I could not find anything in the Clojure hash-map documentation to explain it. If this is not a hash-map feature, what is it called, so that I can look it up?

Any explanation or redirection to an article/documentation would be greatly appreciated.

Thanks
Nick

Sanel Zukan

Aug 16, 2022, 2:41:15 PM
to Nickolaos Kas, Riemann Users
Nickolaos Kas <redh...@gmail.com> writes:
> I have highlighted in blue what I don't fully understand.
> Starting with the second (smap) line, I had the impression that inside a
> stream, I am supposed to call functions with this format (fn [event] ...),

Inside the stream you don't call functions; they are called by Riemann
when an event flows through them. However, you *define* functions with
(fn [event] ...).

> and this smap is not exactly formed like that but maybe it being a straming
> function maybe this is ok.

You have a couple of things going on here, which are mostly Clojure
stuff. First, you defined a function like this:

(defn rewrite-docker []
  (fn [{:keys [] :as event}]
    ...))

which basically says: "when I call (rewrite-docker), it will *return* a
new function object that is going to be called later". You also used
Clojure destructuring (check that in the Clojure docs [1]) and bound the
map to the name "event". Essentially, what you've done is the same as:

(defn rewrite-docker []
  (fn [event] ...))

This is why you had to use this:

(def rewrite-service
  (rewrite-docker)) ;; <-- you are calling a function to return a new function

You could write the above code just like this:

(def rewrite-service
  (fn [event] ...))

or like:

(defn rewrite-service [event]
  ...)
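A sketch of these points in plain Clojure: `make-rewriter` mirrors the shape of `rewrite-docker` (a function that returns a function), `rewrite` is the direct equivalent, and `service-name` shows associative destructuring doing real work by naming the keys it needs. All names here are illustrative.

```clojure
(require '[clojure.string :as str])

;; A factory: calling (make-rewriter) *returns* a new function.
;; {:keys [] :as event} destructures no keys and just names the whole
;; map `event`, so it behaves like a plain [event] parameter.
(defn make-rewriter []
  (fn [{:keys [] :as event}]
    (assoc event :rewritten true)))

(def rewriter (make-rewriter))   ; call the factory once to get the fn

;; Equivalent, without the factory or the destructuring:
(defn rewrite [event]
  (assoc event :rewritten true))

;; Destructuring pays off when :keys actually lists the fields you use.
(defn service-name [{:keys [type type_instance ds_name]}]
  (str/join "." (remove nil? [type type_instance ds_name])))

(service-name {:type "memory.stats" :type_instance "mapped_file" :ds_name "value"})
;; => "memory.stats.mapped_file.value"
```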

> This is what the documentation says
> (https://riemann.io/api/riemann.streams.html#var-smap)
> smap
> (smap f & children)
> Streaming map. Calls children with (f event), whenever (f event) is
> non-nil.
> But what is called is a symbol not a function, and the function it is
> assigned to, does not take the event argument. What would be the reason why
> this is working, and what is it called so that I can look at some
> documentation.

What the documentation states is exactly what I mentioned above: you define
a function like:

(defn f [event]
  ;; you do something
  )

and Riemann's "smap" will call it as (f event) at some point. It calls a
function object, not a symbol (in Clojure these things are different),
but it is always called as (f event). For example, your "graph" function,
from the beginning of this thread, will be called by Riemann as
(graph event).

I'm hoping this addresses your questions.

[1] https://clojure.org/guides/destructuring#_associative_destructuring

Best,
Sanel

Nickolaos Kas

Aug 17, 2022, 1:12:39 PM
to Riemann Users
Thanks for clarifying this, and thanks to everyone else who helped save my sanity. My newness to Clojure got the better of me. It all makes sense now, and works as it should.
Also, thank you for the link to Clojure destructuring. I will read up on that.


Thanks
Nick