Problems with riemann.index/lookup


Lorenz Leenen

Mar 6, 2015, 5:27:42 AM
to rieman...@googlegroups.com
Hi,

I tried to use the lookup function to read the events I need out of the index. I got a lot of NullPointerExceptions, so I tried to figure out why by doing the following:

        (where (host "doesreallynotexist")
            (prn (riemann.index/lookup (:index @core) "cpu1avg1" "avg"))
        )

and surprisingly I got this output:

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
INFO [2015-03-06 10:32:12,386] main - riemann.bin - PID 14732
INFO [2015-03-06 10:32:13,027] clojure-agent-send-off-pool-0 - riemann.transport.websockets - Websockets server 0.0.0.0 5556 online
INFO [2015-03-06 10:32:13,066] clojure-agent-send-off-pool-3 - riemann.transport.udp - UDP server 0.0.0.0 5555 16384 online
INFO [2015-03-06 10:32:13,066] clojure-agent-send-off-pool-2 - riemann.transport.tcp - TCP server 0.0.0.0 5555 online
INFO [2015-03-06 10:32:13,069] main - riemann.core - Hyperspace core online
nil
nil
nil
nil
nil
nil
nil
nil
nil
nil
nil
nil
nil
#riemann.codec.Event{:host "cpu1avg1", :service "avg", :state "ok", :description "10.55% user+nice+sytem\n\n21.7  9811 beam.smp\n 0.5 29165 riemann-health\n 0.1  1080 beam.smp\n%CPU   PID COMMAND\n 0.0     9 rcuos/1\n 0.0   994 acpid\n 0.0   990 sshd\n 0.0  9890 head\n 0.0  9889 sort\n 0.0  9888 ps", :metric 0, :tags nil, :time 356408583423/250, :ttl 600}
#riemann.codec.Event{:host "cpu1avg1", :service "avg", :state "ok", :description "10.55% user+nice+sytem\n\n21.7  9811 beam.smp\n 0.5 29165 riemann-health\n 0.1  1080 beam.smp\n%CPU   PID COMMAND\n 0.0     9 rcuos/1\n 0.0   994 acpid\n 0.0   990 sshd\n 0.0  9890 head\n 0.0  9889 sort\n 0.0  9888 ps", :metric 0, :tags nil, :time 356408583423/250, :ttl 600}
#riemann.codec.Event{:host "cpu1avg1", :service "avg", :state "ok", :description "10.55% user+nice+sytem\n\n21.7  9811 beam.smp\n 0.5 29165 riemann-health\n 0.1  1080 beam.smp\n%CPU   PID COMMAND\n 0.0     9 rcuos/1\n 0.0   994 acpid\n 0.0   990 sshd\n 0.0  9890 head\n 0.0  9889 sort\n 0.0  9888 ps", :metric 0, :tags nil, :time 356408583423/250, :ttl 600}

and this event appears 39 times.

After that, nothing else happens. I assume that there is some kind of initialization and after that no events get into that stream. Is that right? And why is this prn executed in the first place?

I tried

        (where (host "doesreallynotexist")
            prn
        )

but got no output at all after the INFO lines.

Thanks for your help.
Lorenz

Lorenz Leenen

Mar 9, 2015, 7:34:53 AM
to rieman...@googlegroups.com
Hi again,

Perhaps it is better to explain what I want Riemann to do.

I've got 3 VMs that I want to monitor. If all three of them have a CPU load above 70% over 1 minute, I want to scale out, i.e. add an additional VM.

Here is my entire config:

; -*- mode: clojure; -*-
; vim: filetype=clojure:
; vim: set tabstop=2:
; vim: set shiftwidth=2:

(logging/init {:file "riemann.log"})

; Listen on the local interface over TCP (5555), UDP (5555), and websockets
; (5556)
;(let [host "127.0.0.1"]
(let [host "0.0.0.0"]
  (tcp-server {:host host})
  (udp-server {:host host})
  (ws-server  {:host host})
)

; Expire old events from the index every 5 seconds.
(periodically-expire 5)

(let [index (index)]
  ; Inbound events will be passed to these streams:
  (streams
        (default :ttl 60
            ; Index all events immediately.
            index
            ; Log expired events.
            ;(expired
            ;  (fn [event] (info "expired" event)))
        )
  )
)

(streams

    (by [:host :service]

        (where (service "cpu")

            (moving-time-window 60
                (smap folds/mean
                    (where (host "vm1")
                        (where (< 0.7 metric)

                            (smap #(assoc % :host "cpu1" :service "avg" :metric 1 :state "critical" :time (unix-time) :ttl 20) reinject)

                            (else
                                (smap #(assoc % :host "cpu1" :service "avg" :metric 0 :state "ok" :time (unix-time) :ttl 600) reinject)
                            )
                        )
                    )
                    (where (host "vm2")
                        (where (< 0.7 metric)
                            (smap #(assoc % :host "cpu2" :service "avg" :metric 1 :state "critical" :time (unix-time) :ttl 20) reinject)
                            (else
                                (smap #(assoc % :host "cpu2" :service "avg" :metric 0 :state "ok" :time (unix-time) :ttl 600) reinject)
                            )
                        )
                    )
                    (where (host "vm3")
                        (where (< 0.7 metric)
                            (smap #(assoc % :host "cpu3" :service "avg" :metric 1 :state "critical" :time (unix-time) :ttl 20) reinject)
                            (else
                                (smap #(assoc % :host "cpu3" :service "avg" :metric 0 :state "ok" :time (unix-time) :ttl 600) reinject)
                            )
                        )
                    )
                )
            )
        )
    )
)

(streams
    (changed-state
        (if
            (and
                (= 1 (:metric (riemann.index/lookup (:index @core) "cpu1" "avg")))
                (= 1 (:metric (riemann.index/lookup (:index @core) "cpu2" "avg")))
                (= 1 (:metric (riemann.index/lookup (:index @core) "cpu3" "avg")))
            )
            (prn "Scale out!")
            (prn "Do nothing!")
        )
    )
)


All three VMs are running "riemann-health -h myhost -i 1" with my actual IP as myhost.
The dashboard shows that this works just fine, but I get a lot of NullPointerExceptions, caused by riemann.index/lookup.

I tried to figure out why, so I changed the last changed-state stream into this:

(streams
    (changed-state
        (prn (riemann.index/lookup (:index @core) "cpu1" "avg"))
    )
)

After the initialization, I started to send events with riemann-health from the three VMs.
Immediately the output shows 9 warnings caused by NullPointerExceptions, followed by a "nil" (I think it is "nil" because the index is empty at this time):

WARN [2015-03-09 11:49:48,444] defaultEventExecutorGroup-2-3 - riemann.streams -  threw
java.lang.NullPointerException
        at riemann.streams$changed$stream__5078$fn__5090.invoke(streams.clj:1546)
        at riemann.streams$changed$stream__5078.invoke(streams.clj:1546)
        at riemann.streams$by_fn$stream__5054$fn__5059.invoke(streams.clj:1510)
        at riemann.streams$by_fn$stream__5054.invoke(streams.clj:1510)
        at riemann.core$stream_BANG_$fn__5678.invoke(core.clj:19)
        at riemann.core$stream_BANG_.invoke(core.clj:18)
        at riemann.transport$handle.invoke(transport.clj:159)
        at riemann.transport.tcp$tcp_handler.invoke(tcp.clj:93)
        at riemann.transport.tcp$gen_tcp_handler$fn__5904.invoke(tcp.clj:65)
        at riemann.transport.tcp.proxy$io.netty.channel.ChannelInboundHandlerAdapter$ff19274a.channelRead(Unknown Source)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.access$700(AbstractChannelHandlerContext.java:32)
        at io.netty.channel.AbstractChannelHandlerContext$8.run(AbstractChannelHandlerContext.java:324)
        at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:36)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
nil

After that, I get 6 events, each followed by a NullPointerException:

#riemann.codec.Event{:host "cpu1", :service "avg", :state "ok", :description "0.84% user+nice+sytem\n\n18.9 12908 beam.smp\n 2.6 12943 riemann-health\n 0.1  1080 beam.smp\n%CPU   PID COMMAND\n 0.0     9 rcuos/1\n 0.0   994 acpid\n 0.0   990 sshd\n 0.0   959 getty\n 0.0   957 getty\n 0.0   956 getty", :metric 0, :tags nil, :time 1425898189797/1000, :ttl 600}
WARN [2015-03-09 11:49:49,815] defaultEventExecutorGroup-2-3 - riemann.streams -  threw
java.lang.NullPointerException
        at riemann.streams$changed$stream__5078$fn__5090.invoke(streams.clj:1546)
        at riemann.streams$changed$stream__5078.invoke(streams.clj:1546)
        at riemann.streams$by_fn$stream__5054$fn__5059.invoke(streams.clj:1510)
        at riemann.streams$by_fn$stream__5054.invoke(streams.clj:1510)
        at riemann.core$stream_BANG_$fn__5678.invoke(core.clj:19)
        at riemann.core$stream_BANG_.invoke(core.clj:18)
        at riemann.config$reinject.invoke(config.clj:231)
        at riemann.streams$smap$stream__3695$fn__3706.invoke(streams.clj:163)
        at riemann.streams$smap$stream__3695.invoke(streams.clj:163)
        at riemann.config$eval103$new_fork__5052__auto____104$stream__109$fn__130.invoke(test:43)
        at riemann.config$eval103$new_fork__5052__auto____104$stream__109.invoke(test:43)
        at riemann.config$eval103$new_fork__5052__auto____104$stream__152$fn__157.invoke(test:42)
        at riemann.config$eval103$new_fork__5052__auto____104$stream__152.invoke(test:42)
        at riemann.streams$smap$stream__3695$fn__3706.invoke(streams.clj:163)
        at riemann.streams$smap$stream__3695.invoke(streams.clj:163)
        at riemann.streams$moving_time_window$stream__3915$fn__3934.invoke(streams.clj:326)
        at riemann.streams$moving_time_window$stream__3915.invoke(streams.clj:326)
        at riemann.config$eval103$new_fork__5052__auto____104$stream__375$fn__380.invoke(test:38)
        at riemann.config$eval103$new_fork__5052__auto____104$stream__375.invoke(test:38)
        at riemann.streams$by_fn$stream__5054$fn__5059.invoke(streams.clj:1510)
        at riemann.streams$by_fn$stream__5054.invoke(streams.clj:1510)
        at riemann.core$stream_BANG_$fn__5678.invoke(core.clj:19)
        at riemann.core$stream_BANG_.invoke(core.clj:18)
        at riemann.transport$handle.invoke(transport.clj:159)
        at riemann.transport.tcp$tcp_handler.invoke(tcp.clj:93)
        at riemann.transport.tcp$gen_tcp_handler$fn__5904.invoke(tcp.clj:65)
        at riemann.transport.tcp.proxy$io.netty.channel.ChannelInboundHandlerAdapter$ff19274a.channelRead(Unknown Source)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.access$700(AbstractChannelHandlerContext.java:32)
        at io.netty.channel.AbstractChannelHandlerContext$8.run(AbstractChannelHandlerContext.java:324)
        at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:36)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)

Nothing more happens, because no state changes. So I ran "stress" on VM1 to change the state. Immediately there is one more exception:

WARN [2015-03-09 11:50:55,793] defaultEventExecutorGroup-2-3 - riemann.streams -  threw
java.lang.NullPointerException
        at riemann.streams$changed$stream__5078$fn__5090.invoke(streams.clj:1546)
        at riemann.streams$changed$stream__5078.invoke(streams.clj:1546)
        at riemann.streams$by_fn$stream__5054$fn__5059.invoke(streams.clj:1510)
        at riemann.streams$by_fn$stream__5054.invoke(streams.clj:1510)
        at riemann.core$stream_BANG_$fn__5678.invoke(core.clj:19)
        at riemann.core$stream_BANG_.invoke(core.clj:18)
        at riemann.transport$handle.invoke(transport.clj:159)
        at riemann.transport.tcp$tcp_handler.invoke(tcp.clj:93)
        at riemann.transport.tcp$gen_tcp_handler$fn__5904.invoke(tcp.clj:65)
        at riemann.transport.tcp.proxy$io.netty.channel.ChannelInboundHandlerAdapter$ff19274a.channelRead(Unknown Source)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.access$700(AbstractChannelHandlerContext.java:32)
        at io.netty.channel.AbstractChannelHandlerContext$8.run(AbstractChannelHandlerContext.java:324)
        at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:36)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)

and once the moving-time-window average passes the threshold, one more (but without the changed event!):

WARN [2015-03-09 11:51:34,795] defaultEventExecutorGroup-2-3 - riemann.streams -  threw
java.lang.NullPointerException
        at riemann.streams$changed$stream__5078$fn__5090.invoke(streams.clj:1546)
        at riemann.streams$changed$stream__5078.invoke(streams.clj:1546)
        at riemann.streams$by_fn$stream__5054$fn__5059.invoke(streams.clj:1510)
        at riemann.streams$by_fn$stream__5054.invoke(streams.clj:1510)
        at riemann.core$stream_BANG_$fn__5678.invoke(core.clj:19)
        at riemann.core$stream_BANG_.invoke(core.clj:18)
        at riemann.config$reinject.invoke(config.clj:231)
        at riemann.streams$smap$stream__3695$fn__3706.invoke(streams.clj:163)
        at riemann.streams$smap$stream__3695.invoke(streams.clj:163)
        at riemann.config$eval103$new_fork__5052__auto____104$stream__109$fn__114.invoke(test:43)
        at riemann.config$eval103$new_fork__5052__auto____104$stream__109.invoke(test:43)
        at riemann.config$eval103$new_fork__5052__auto____104$stream__152$fn__157.invoke(test:42)
        at riemann.config$eval103$new_fork__5052__auto____104$stream__152.invoke(test:42)
        at riemann.streams$smap$stream__3695$fn__3706.invoke(streams.clj:163)
        at riemann.streams$smap$stream__3695.invoke(streams.clj:163)
        at riemann.streams$moving_time_window$stream__3915$fn__3934.invoke(streams.clj:326)
        at riemann.streams$moving_time_window$stream__3915.invoke(streams.clj:326)
        at riemann.config$eval103$new_fork__5052__auto____104$stream__375$fn__380.invoke(test:38)
        at riemann.config$eval103$new_fork__5052__auto____104$stream__375.invoke(test:38)
        at riemann.streams$by_fn$stream__5054$fn__5059.invoke(streams.clj:1510)
        at riemann.streams$by_fn$stream__5054.invoke(streams.clj:1510)
        at riemann.core$stream_BANG_$fn__5678.invoke(core.clj:19)
        at riemann.core$stream_BANG_.invoke(core.clj:18)
        at riemann.transport$handle.invoke(transport.clj:159)
        at riemann.transport.tcp$tcp_handler.invoke(tcp.clj:93)
        at riemann.transport.tcp$gen_tcp_handler$fn__5904.invoke(tcp.clj:65)
        at riemann.transport.tcp.proxy$io.netty.channel.ChannelInboundHandlerAdapter$ff19274a.channelRead(Unknown Source)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.access$700(AbstractChannelHandlerContext.java:32)
        at io.netty.channel.AbstractChannelHandlerContext$8.run(AbstractChannelHandlerContext.java:324)
        at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:36)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)

I wonder why these exceptions are being thrown and why the last situation (exceeding the threshold) doesn't print the event that exceeds the threshold.

I think Riemann is perfect for this monitoring/scaling problem, but I really don't know how to solve it with lookup.
I am thankful for any hints and help.

Thanks in advance,
Lorenz
Attachments: 1.png, 2 with stress.png

Aphyr

Mar 9, 2015, 11:52:08 AM
to rieman...@googlegroups.com
On 03/09/2015 04:34 AM, Lorenz Leenen wrote:
> (= 1 (:metric (riemann.index/lookup (:index @core) "cpu1" "avg")))

You're trying to compare the number 1 with the metric of the current value in
the index, which is nil. But on top of that, you're doing this at *config* time
and returning `(prn "do nothing")` (which evaluates to nil) to
`(changed-state)`. Since changed-state has a nil child, and expects a function,
it's throwing NullPointerException on every state change.

I'd probably just do something like

(where (service #"cpu\d+")
  (coalesce
    (smap (fn [events]
            (if (every? #{"critical"} (map :state events))
              (prn "scale out")
              (prn "fine"))))))

No need to query the index; coalesce will preserve all the cpu events and pass
them off to the next stream nicely.
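
To make the config-time point concrete, here is a minimal sketch in plain Clojure with no Riemann dependency. `toy-stream` and `throws-npe?` are hypothetical names invented for this illustration; `toy-stream` only imitates the general shape of a stream that takes child functions, and it is not how Riemann implements `changed-state`. It shows why a bare `(prn ...)` in child position prints while the config is loaded and then leaves a nil child behind.

```clojure
;; Hypothetical stand-in for a stream (not Riemann's real code):
;; takes child functions, returns a function that forwards each event to them.
(defn toy-stream [& children]
  (fn [event]
    (doseq [child children]
      (child event))))

;; Wrong: (prn ...) runs once, while the stream is being built, and returns nil.
;; The stream is therefore constructed with a nil child.
(def broken (toy-stream (prn "evaluated at config time")))

;; Right: pass a function; its body runs once per event.
(def working (toy-stream (fn [event] (prn "saw event" (:service event)))))

(defn throws-npe?
  "Returns true if calling (f) throws a NullPointerException."
  [f]
  (try
    (f)
    false
    (catch NullPointerException _ true)))

(throws-npe? #(broken {:service "avg"}))   ; invoking the nil child fails
(throws-npe? #(working {:service "avg"}))  ; the function child runs normally
```

The same reasoning applies to an `(if ...)` form placed directly under a stream: it is evaluated once at load time, and whatever it returns becomes the child.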

--Kyle

Lorenz Leenen

Mar 11, 2015, 12:09:47 PM
to rieman...@googlegroups.com
Thanks very much for your help, Kyle. Now I understand that NullPointerException. That really was a beginner's mistake.

Your solution works fine. Now I need only one more detail to complete my work. I thought I could manage it by myself, but it seems I can't.

The scaling decision isn't only about all three VMs being above 70% CPU load over 1 minute; one of them also has to be above 85% over 5 minutes.
So I made two moving-time-windows, one over 60 seconds and one over 300. I reinject the events that pass the respective threshold with service "thresh1" (1-minute window) or "thresh5" (5-minute window).
I tried to do something like your solution, but now coalesce gets two different services and I don't know how to split the sequence that coalesce passes on.
Something like this:

(where (or (service "thresh1") (service "thresh5"))
  (coalesce (fn [events]
              (if (and (every? *thresh1 is critical*)
                       (any?   *thresh5 is critical*))
                (prn "scale out")
                (prn "fine")))))

Just to be clear, the scaling decision should be made if (and (and vm1thresh1 vm2thresh1 vm3thresh1) (or vm1thresh5 vm2thresh5 vm3thresh5)) is true.
I thought the "or" could be done by a function "any?", which I didn't find in the Clojure API. What I found was "not-any?", so perhaps I need to do (not (not-any? ...)) to get an "or"?

Thanks again in advance for your help,
Lorenz

Kyle Kingsbury

Mar 11, 2015, 12:12:52 PM
to Lorenz Leenen, rieman...@googlegroups.com

Oh, for (any? ...) use (some pred coll). I agree, that is a tad confusing. ;-)
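
A small sketch of the `some` idiom on made-up event maps (the hosts and states below are illustrative only, and `critical?` is a hypothetical helper). Note that `some` returns the first truthy result of the predicate rather than a strict boolean, which is fine inside `if`. Newer Clojure versions (1.9 and later) do define `any?`, but it is unrelated: it returns true for any argument.

```clojure
;; Illustrative events; hosts and states are made up.
(def events
  [{:host "cpu1" :service "thresh5" :state "ok"}
   {:host "cpu2" :service "thresh5" :state "critical"}
   {:host "cpu3" :service "thresh5" :state "ok"}])

(defn critical? [event]
  (= "critical" (:state event)))

;; "any" semantics: truthy if at least one event is critical.
(def any-critical (some critical? events))

;; Same answer via the double negation, just harder to read.
(def any-critical-2 (not (not-any? critical? events)))

;; A set also works as a predicate, as in (every? #{"critical"} ...).
;; Here some returns the matching element itself, not true.
(def first-critical-state (some #{"critical"} (map :state events)))
```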


Lorenz Leenen

Mar 11, 2015, 12:23:13 PM
to rieman...@googlegroups.com, lorenz...@gmail.com
Oh, OK. Thanks. But how do I write the predicate when two things matter, in this case service "thresh1" and state "critical"?
You did it like this: (every? #{"critical"} (map :state events)).
I need to know whether every event with service "thresh1" has the state "critical".

Kyle Kingsbury

Mar 11, 2015, 12:27:22 PM
to Lorenz Leenen, rieman...@googlegroups.com

You can use (filter) to filter events to those with a particular service. See Clojure from the Ground Up, chapter 3, "sequences". :)
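
Putting `filter`, `every?` and `some` together, the decision rule described earlier in the thread could be sketched as follows. This is plain Clojure on illustrative event maps; `critical?`, `by-service` and `scale-out?` are hypothetical helper names, and the services "thresh1" and "thresh5" follow the naming used above. One caveat: `every?` returns true for an empty collection, so the sketch also requires at least one "thresh1" event to be present.

```clojure
(defn critical? [event]
  (= "critical" (:state event)))

(defn by-service
  "Keeps only the events whose :service equals service."
  [service events]
  (filter #(= service (:service %)) events))

(defn scale-out?
  "True when every thresh1 event is critical and at least one thresh5 event is."
  [events]
  (let [t1 (by-service "thresh1" events)
        t5 (by-service "thresh5" events)]
    (boolean
      (and (seq t1)                ; guard: every? is true for an empty seq
           (every? critical? t1)
           (some critical? t5)))))

;; Made-up sample: all 1-minute windows critical, one 5-minute window critical.
(def sample
  [{:host "vm1" :service "thresh1" :state "critical"}
   {:host "vm2" :service "thresh1" :state "critical"}
   {:host "vm3" :service "thresh1" :state "critical"}
   {:host "vm1" :service "thresh5" :state "ok"}
   {:host "vm2" :service "thresh5" :state "critical"}
   {:host "vm3" :service "thresh5" :state "ok"}])

(scale-out? sample)
```

In a Riemann config, a function like this would be called from inside the function that receives the events from coalesce, in place of the inline `(every? ...)` test.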

Lorenz Leenen

Mar 12, 2015, 8:10:01 AM
to rieman...@googlegroups.com, lorenz...@gmail.com
Thank you again very much, Kyle. Your Clojure tutorial helped me write a little recursive function that splits the sequence.

(defn buildseqbyserv [s xs]
  (if (first xs)
    (if (= s (:service (first xs)))
      (cons (first xs)
            (buildseqbyserv s (rest xs)))
      (buildseqbyserv s (rest xs)))
    (list)))
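
For comparison, the same result can be had from `filter` directly. The sketch below repeats the function so that it runs on its own and checks the two against each other on made-up events; `events-for-service` is a hypothetical name. One difference worth knowing: the hand-written version recurses once per element without `recur` or `lazy-seq`, so a very long input could overflow the stack, whereas `filter` is lazy. For the handful of events coalesce passes on here, that should not matter.

```clojure
;; The recursive version from the post, repeated so this block is self-contained.
(defn buildseqbyserv [s xs]
  (if (first xs)
    (if (= s (:service (first xs)))
      (cons (first xs)
            (buildseqbyserv s (rest xs)))
      (buildseqbyserv s (rest xs)))
    (list)))

;; The same selection expressed with filter.
(defn events-for-service [s xs]
  (filter #(= s (:service %)) xs))

;; Illustrative events only.
(def events
  [{:host "vm1" :service "thresh1" :state "critical"}
   {:host "vm1" :service "thresh5" :state "ok"}
   {:host "vm2" :service "thresh1" :state "ok"}])

;; Both approaches select the same events in the same order.
(def same-result?
  (= (buildseqbyserv "thresh1" events)
     (events-for-service "thresh1" events)))
```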


Lorenz