I collect server stats with collectd and send them to Riemann.
In Riemann I filter events into different streams and would like to add additional tags to some of them before sending them to InfluxDB.
For some reason the attributes are recognized and forwarded to InfluxDB as fields, but the tags are not listed. Also, the :tag-fields parameter in the def influxdb no longer seems to work. It does work in the def influxdb-09, but that one doesn't connect to my Influx instance.
I'd highly appreciate some help. I am new to Riemann and Clojure and quite puzzled by this. The custom tags are important for the InfluxDB design and performance, so I can't imagine it isn't possible to adjust them in Riemann.
I use Riemann 0.2.11 and InfluxDB 0.9.
Here is the riemann.config I use to test the tagging functionality:
(logging/init {:file "/var/log/riemann/riemann.log"})

(def influxdb-creds {:version :0.9
                     :host "localhost"
                     :db "riemann_071116"
                     :port 8086
                     :tag-fields #{:host :sys :env}}) ;doesn't transmit

;; Riemann log file location
(logging/init {:file "/var/log/riemann/riemann.log"})

(let [host "0.0.0.0"]
  (tcp-server {:host host})
  (udp-server {:host host})
  (ws-server {:host host}))

(periodically-expire 10 {:keep-keys [:host :service :tags, :state, :description, :metric]})

(let [index (index)
      influxBatchSender (batch 100 1/10
                          (async-queue! :agg {:queue-size 1000
                                              :core-pool-size 4
                                              :max-pool-size 32
                                              :keep-alive-time 60000}
                            (influxdb influxdb-creds)))]
  ; Inbound events will be passed to these streams:
  (streams
    (default :ttl 60
      ; add tag to all events - does not work
      (tag "foo" index)
      ; Index all events immediately.
      (where (not (tagged "notification")) index)
      (where (not (service #"^riemann."))
        (where (service "myservice")
          influxBatchSender)))))

;print events to log
(streams
  (where (tagged "test")
    ; prn
    #(info %)))
--
You received this message because you are subscribed to the Google Groups "Riemann Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to riemann-users+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
This may be slightly confusing: Riemann events are maps of keys to values. One of those keys is :tags, whose value is a sequence of strings. (with :sys "sys!" ...) adds a :sys key to an event, but leaves its tags unchanged.
I think Influx uses the terms a little differently, which might account for some confusion ;)
--Kyle
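To make the distinction concrete, here is a minimal sketch in plain Clojure, with no Riemann required. The event map is made up for illustration, and the two expressions only approximate the effect of the tag and with streams on a single event; the real streams may order or de-duplicate tags differently.

```clojure
;; A Riemann event is a plain map; this one is hypothetical.
(def event {:host "web01" :service "cpu" :metric 0.42 :tags ["prod"]})

;; Roughly the effect of (tag "foo" ...): a string is added to :tags.
(def tagged (update event :tags conj "foo"))

;; Roughly the effect of (with :sys "sys!" ...): a top-level key is set,
;; and :tags is left alone.
(def with-sys (assoc event :sys "sys!"))
```

So a Riemann "tag" lives inside the :tags value, while :sys is just another key on the event; keys like :sys are what the :tag-fields set in the question's config refers to.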
(ns testing.etc.influx
  (require [clojure.string :as str]))

(defn measurement [statsdstring]
  (first (str/split statsdstring #",")))

(defn tags [statsdstring]
  (def tagsvector (rest (str/split statsdstring #",")))
  (reduce (fn [m v] (assoc m (keyword (first v)) (second v)))
          {}
          (map (fn [s] (str/split s #"=")) tagsvector)))

(require [testing.etc.influx :refer :all])

(def influxdb-creds {:version :0.9
                     :host "localhost"
                     :db "riemann_091116"
                     :port 8086
                     :tag-fields #{}
                     :table-name ""})

;; Riemann log file location
(logging/init {:file "/var/log/riemann/riemann.log"})

(let [host "0.0.0.0"]
  (tcp-server {:host host})
  (udp-server {:host host})
  (ws-server {:host host}))

(periodically-expire 10 {:keep-keys [:host :service :tags, :state, :description, :metric]})

(let [index (index)
      influxBatchSender (batch 100 1/10
                          (async-queue! :agg {:queue-size 1000
                                              :core-pool-size 4
                                              :max-pool-size 32
                                              :keep-alive-time 60000}
                            (influxdb influxdb-creds)))]
  ; Inbound events will be passed to these streams:
  (streams
    (default :ttl 60
      ; Index events immediately.
      (where (not (tagged "notification")) index)
      (where (service "myservice")
        (where (service #"^statsd")
          (with :table-name (measurement (#(info (:service %))))
                :tag-fields (tags (#(info (:service %))))
            influxBatchSender))))))

;print events to log
(streams
  (where (service "myservice")
    #(info %)))
Regarding this part of your config:
(with :table-name (measurement (#(info (:service %)))) :tag-fields (tags (#(info (:service %)))) influxBatchSender)
and in particular
(measurement (#(info (:service %))))
You probably want to get the :service value of your event. You don't have to call info for that; info is a logging function. Unfortunately, as far as I know, you cannot access the event in the "with" stream. You can do (with :table-name "foo"), but you cannot do (with :table-name (:service event)).
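One way around this limitation is to compute the value in an ordinary function of the event and apply it with Riemann's smap stream, which passes each event through a function before handing it to its children. The sketch below is only an illustration: service->table-name is a hypothetical helper name, and whether the influxdb output pays any attention to a :table-name key on the event is not confirmed here.

```clojure
;; Hypothetical helper: derive a per-event value from the event itself.
;; Unlike (with :table-name "foo"), the value is computed for every event.
(defn service->table-name [event]
  (assoc event :table-name (:service event)))

;; In riemann.config this would be wired in roughly as follows (not run here):
;; (smap service->table-name influxBatchSender)
```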
(require '[testing.influx :refer :all])

(def influxdb-creds {:version :0.9
                     :host "localhost"
                     :db "riemann_091116"
                     :port 8086
                     :tag-fields #{:farm :farm_node_id :host}})

;; Riemann log file location
(logging/init {:file "/var/log/riemann/riemann.log"})

(let [host "0.0.0.0"]
  (tcp-server {:host host})
  (udp-server {:host host})
  (ws-server {:host host}))

(periodically-expire 10 {:keep-keys [:host :service :tags, :state, :description, :metric]})

(let [index (index)
      influxBatchSender (batch 100 1/10
                          (async-queue! :agg {:queue-size 1000
                                              :core-pool-size 4
                                              :max-pool-size 32
                                              :keep-alive-time 60000}
                            (influxdb influxdb-creds)))]
  (streams
    (default :ttl 60
      (where (service #"^statsd")
        (tag "influx" #(info "Tagged with influx: " %))
        (smap (fn [ev]
                (conj (assoc ev
                             :service (getmeasurement (:service ev))
                             :tag-fields (keys (gettags (:service ev))))
                      (gettags (:service ev))))
          influxBatchSender)))))

(defn gettags [statsdstring]
  (def tagsvector (rest (str/split statsdstring #",")))
  (reduce (fn [m v] (assoc m (keyword (first v)) (second v)))
          {}
          (map (fn [s] (str/split s #"=")) tagsvector)))

WARN [2016-11-10 18:54:19,444] pool-1-thread-4 - riemann.streams - riemann.influxdb$influxdb_9$stream__9428@5c5aa320 threw
java.lang.ClassCastException: clojure.lang.APersistentMap$KeySeq cannot be cast to java.lang.Number
    at clojure.lang.Numbers.isNeg(Numbers.java:100)
    at clojure.pprint$fixed_float.invokeStatic(cl_format.clj:676)
    at clojure.pprint$fixed_float.invoke(cl_format.clj:672)
    at clojure.lang.AFn.applyToHelper(AFn.java:160)
    at clojure.lang.AFn.applyTo(AFn.java:144)
    at clojure.core$apply.invokeStatic(core.clj:646)
    at clojure.pprint$execute_format$fn__9038.invoke(cl_format.clj:1907)
    at clojure.lang.AFn.applyToHelper(AFn.java:156)
    at clojure.lang.AFn.applyTo(AFn.java:144)
    at clojure.core$apply.invokeStatic(core.clj:646)
    at clojure.pprint$map_passing_context.invokeStatic(utilities.clj:30)
    at clojure.pprint$execute_format.invokeStatic(cl_format.clj:1879)
    at clojure.pprint$execute_format.invoke(cl_format.clj:1879)
    at clojure.pprint$execute_format$fn__9036.invoke(cl_format.clj:1893)
    at clojure.pprint$execute_format.invokeStatic(cl_format.clj:1892)
    at clojure.pprint$execute_format.invoke(cl_format.clj:1879)
    at clojure.pprint$cl_format.invokeStatic(cl_format.clj:64)
    at clojure.pprint$cl_format.doInvoke(cl_format.clj:27)
    at clojure.lang.RestFn.invoke(RestFn.java:442)
    at riemann.influxdb$kv_encode_9$fn__9379.invoke(influxdb.clj:25)
    at clojure.core$map$fn__4785.invoke(core.clj:2646)
    at clojure.lang.LazySeq.sval(LazySeq.java:40)
    at clojure.lang.LazySeq.seq(LazySeq.java:49)
    at clojure.lang.Cons.next(Cons.java:39)
    at clojure.lang.RT.next(RT.java:688)
    at clojure.core$next__4341.invokeStatic(core.clj:64)
    at clojure.string$join.invokeStatic(string.clj:191)
    at clojure.string$join.invoke(string.clj:180)
    at riemann.influxdb$kv_encode_9.invokeStatic(influxdb.clj:21)
    at riemann.influxdb$kv_encode_9.invoke(influxdb.clj:20)
    at riemann.influxdb$lineprotocol_encode_9.invokeStatic(influxdb.clj:29)
    at riemann.influxdb$lineprotocol_encode_9.invoke(influxdb.clj:28)
    at clojure.core$map$fn__4785.invoke(core.clj:2644)
    at clojure.lang.LazySeq.sval(LazySeq.java:40)
    at clojure.lang.LazySeq.seq(LazySeq.java:49)
    at clojure.lang.LazySeq.first(LazySeq.java:71)
    at clojure.lang.RT.first(RT.java:667)
    at clojure.core$first__4339.invokeStatic(core.clj:55)
    at clojure.string$join.invokeStatic(string.clj:180)
    at clojure.string$join.invoke(string.clj:180)
    at riemann.influxdb$influxdb_9$stream__9428.invoke(influxdb.clj:181)
    at riemann.streams$execute_on$stream__6765$runner__6766$fn__6777.invoke(streams.clj:268)
    at riemann.streams$execute_on$stream__6765$runner__6766.invoke(streams.clj:268)
    at clojure.lang.AFn.applyToHelper(AFn.java:152)
    at clojure.lang.AFn.applyTo(AFn.java:144)
    at clojure.core$apply.invokeStatic(core.clj:646)
    at clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1881)
    at clojure.core$with_bindings_STAR_.doInvoke(core.clj:1881)
    at clojure.lang.RestFn.invoke(RestFn.java:425)
    at clojure.lang.AFn.applyToHelper(AFn.java:156)
    at clojure.lang.RestFn.applyTo(RestFn.java:132)
    at clojure.core$apply.invokeStatic(core.clj:650)
    at clojure.core$bound_fn_STAR_$fn__4671.doInvoke(core.clj:1911)
    at clojure.lang.RestFn.invoke(RestFn.java:397)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
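
One possible reading of this trace, offered as a guess rather than a confirmed diagnosis: the exception comes from kv_encode_9 trying to format a KeySeq as a number, and the only KeySeq in the config is the result of (keys ...) stored under :tag-fields on the event. If that is right, merging just the parsed key/value pairs into the event, without storing the key sequence, should avoid it. The names parse-service and expand-event below are hypothetical and the service string format is assumed to be "measurement,key=value,...".

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper: "name,k1=v1,k2=v2" -> ["name" {:k1 "v1" :k2 "v2"}]
(defn parse-service [service]
  (let [[measurement & pairs] (str/split service #",")]
    [measurement
     (into {}
           (map (fn [pair]
                  (let [[k v] (str/split pair #"=" 2)]
                    [(keyword k) v])))
           pairs)]))

;; Hypothetical helper: put the parsed pairs on the event as plain string
;; values and shorten :service to the measurement name. No seq-valued key
;; is added to the event.
(defn expand-event [event]
  (let [[measurement tag-map] (parse-service (:service event))]
    (-> event
        (merge tag-map)
        (assoc :service measurement))))

;; In riemann.config, roughly (not run here):
;; (smap expand-event influxBatchSender)
```

With the :tag-fields set in influxdb-creds naming :farm, :farm_node_id and :host, the merged keys would then be the ones expected to be sent as tags.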

The gettags function could be written like this:

(defn get-tags [statsd-string]
  (->> (rest (str/split statsd-string #",")) ;; get the tags
       (map #(str/split % #"="))              ;; split => [key, value]
       (map #(assoc % 0 (keyword (first %)))) ;; transform keyword => [:key, value]
       (into {})))                            ;; array => map

->> is a threading macro (http://clojure.org/guides/threading_macros)
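
For a quick check at the REPL, here is the suggested function again together with a sample call. The function is repeated only so the snippet runs on its own; the service string is a made-up example in the "measurement,key=value,..." form, reusing the tag names from the config in this thread.

```clojure
(require '[clojure.string :as str])

;; get-tags as suggested in this thread, repeated so the snippet is standalone.
(defn get-tags [statsd-string]
  (->> (rest (str/split statsd-string #","))  ;; drop the measurement name
       (map #(str/split % #"="))               ;; "k=v" => ["k" "v"]
       (map #(assoc % 0 (keyword (first %))))  ;; ["k" "v"] => [:k "v"]
       (into {})))                             ;; pairs => map

;; Hypothetical input.
(get-tags "cpu.load,farm=web,farm_node_id=3")
;; => {:farm "web", :farm_node_id "3"}
```

A service name without any tags yields an empty map, so the function is safe to call on every statsd event.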