Issue with Sending Events to InfluxDB via Riemann

9 views
Skip to first unread message

Aniruddh Singh

unread,
May 8, 2024, 7:08:21 AMMay 8
to Riemann Users
I'm facing two issues while attempting to send events to InfluxDB using Riemann:

Warning about Line Protocol: I keep receiving the following warning message:

WARNING: The writes: java.util.stream.ReferencePipeline$3992913c doesn't contains any Line Protocol, skipping
This warning persists, and I suspect it's related to the format of the data being sent to InfluxDB. How can I resolve this issue 
Missing Fields in InfluxDB Measurement: I've noticed that only events from riemann task 0 are being sent to InfluxDB, while events from defaultEventExecutorGroup are not included. However, I need all relevant events to be included in the InfluxDB measurement. Below is my Riemann configuration file:
clojure

; -*- mode: clojure; -*-
; vim: filetype=clojure

(logging/init {:file "/var/log/riemann/riemann.log"})

; Listen on the local interface over TCP (5555), UDP (5555), and websockets
; (5556)
(let [host "127.0.0.1"]
  (tcp-server {:host host})
  (udp-server {:host host})
  (ws-server  {:host host}))

; Expire old events from the index every 5 seconds.
(periodically-expire 5)

(let [index (index)]
  ; Inbound events will be passed to these streams:
  (streams
    index
    ; Log expired events.
    (expired
      (fn [event] (info "expired" event)))
    ; Transform events
    (smap
      (fn [event]
        (merge event
          {:metric (or (:metric event) 0.0) ; Add a default metric if not present
           :tags (or (:tags event) [])
           :time (or (:time event) (System/currentTimeMillis)) ; Use current time if not present
           ;; Add other fields as needed
           :service (or (:service event) "unknown")
           :state (or (:state event) "unknown")
           :description (or (:description event) "unknown")
           :host (or (:host event) "unknown")
           :user_agent (or (:user_agent event) "unknown")
           :file (or (:file event) "unknown")
           :data (or (:data event) "unknown")
           :unique_id (or (:unique_id event) "unknown")
           :client_ip (or (:client_ip event) "unknown")})))
    ; Forward events to InfluxDB2
    (influxdb2 {:host "192.168.64.13"
                :organization "riemann"
                :bucket "riemann"
                :token "xxx"
                :port 8086}))))
And here are some example logs from Riemann:

INFO [2024-05-08 16:16:47,328] defaultEventExecutorGroup-2-1 - riemann.config - expired #riemann.codec.Event{:host localhost, :service modsecurity, :state nil, :description nil, :metric 0.0, :tags nil, :time 1715164806, :ttl nil, :user_agent curl/8.5.0, :file /etc/nginx/modsec/coreruleset/rules/REQUEST-932-APPLICATION-ATTACK-RCE.conf, :data Matched Data: bin/bash found within ARGS:exec: /bin/bash, :unique_id 171516480691.657915, :client_ip None}
INFO [2024-05-08 16:16:51,599] riemann task 0 - riemann.config - expired {:host localhost, :service modsecurity, :state expired, :time 857582605793/500}
Below is thr Python script i have written to parse

import re

import datetime

import time

from bernhard import Client


def parse_modsec_log(log_file_path):

    parsed_logs = []

    current_log = {}

    current_section = None


    with open(log_file_path, 'r') as file:

        for line in file:

            line = line.strip()

            if line.startswith('---'):

                current_section = line[14:15]

                if current_section == 'Z':

                    # End of log entry, add current_log to parsed_logs

                    parsed_logs.append(current_log)

                    # Reset current_log for the next log entry

                    current_log = {}

            else:

                if current_section == 'A':

                    time_match = re.search(r'\[(\d+/\w+/\d+:\d+:\d+:\d+)', line)

                    if time_match:

                        # Convert timestamp to Unix epoch format

                        timestamp_str = time_match.group(1)

                        timestamp_obj = datetime.datetime.strptime(timestamp_str, '%d/%b/%Y:%H:%M:%S')

                        current_log['timestamp'] = int(timestamp_obj.timestamp())


                        # Extract client IP

                        client_ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', line)

                        if client_ip_match:

                            current_log['client_ip'] = client_ip_match.group(1)

                elif current_section == 'B':

                    match_host = re.match(r'Host: (.+)', line)

                    match_user_agent = re.match(r'User-Agent: (.+)', line)

                    if match_host:

                        current_log['host'] = match_host.group(1)

                    elif match_user_agent:

                        current_log['user_agent'] = match_user_agent.group(1)

                elif current_section == 'D':

                    current_log['html_response'] = line

                elif current_section == 'E':

                    current_log['http_response'] = line

                elif current_section == 'F':

                    response_headers = line.split(': ')

                    if len(response_headers) == 2:

                        current_log[response_headers[0].lower()] = response_headers[1]

                elif current_section == 'H':

                    modsec_warning_match = re.search(r'ModSecurity: Warning.*?\[msg "([^"]+)"\].*?\[tag "([^"]+)"\]', line)

                    if modsec_warning_match:

                        current_log['action'] = 'Warning'

                        current_log['msg'] = modsec_warning_match.group(1)

                        current_log['tag'] = modsec_warning_match.group(2)

                        file_match = re.search(r'\[file "([^"]+)"\]', line)

                        if file_match:

                            current_log['file'] = file_match.group(1)

                        data_match = re.search(r'\[data "([^"]+)"\]', line)

                        if data_match:

                            current_log['data'] = data_match.group(1)

                        unique_id_match = re.search(r'\[unique_id "([^"]+)"\]', line)

                        if unique_id_match:

                            current_log['unique_id'] = unique_id_match.group(1)

    return parsed_logs 

def send_to_riemann(event):

    # Initialize Riemann client

    client = Client()


    # Send event to Riemann server

    client.send(event)



log_file_path = "/var/log/modsec_audit.log"

parsed_logs = parse_modsec_log(log_file_path)

for log in parsed_logs:

    event = {

        'time': log.get('timestamp'),

        'service': 'modsecurity',

        'metric': 0,

        'attributes': { 'host': log.get('host'),

        'user_agent': log.get('user_agent'),

        'file': log.get('file'),

        'data': log.get('data'),

        'unique_id': log.get('unique_id'),

        'client_ip': log.get('client_ip')

        }

        }

    send_to_riemann(event)

in influx i am only getting the service name and host ip which seems to be coming from riemann task 0

and nothing else
I would appreciate any guidance or insights on resolving these issues. Thank you!


Reply all
Reply to author
Forward
0 new messages