All,
I'll probably write a blog post on this, but I wanted to share some work I've done today.
http://vichargrave.com/ossec-log-management-with-elasticsearch/ shows how to use OSSEC's syslog output to route messages to Elasticsearch. The problem with this method is that it uses UDP. Even when sending packets to a local process, UDP is by definition unreliable. Garbage collection and other system events can cause packets to be lost. I've found it tends to cap out at around 1,500 messages per minute.
To address this issue, I've put together a Logstash config that reads the alerts directly from /var/ossec/logs/alerts/alerts.log. On top of solving the reliability issue, it also fixes the problem of multi-line alerts being lost, and adds GeoIP lookups for the src_ip field. I tested it against approximately 1GB of alerts (3M events).
input {
  # Tail the OSSEC alerts file and tag every event with type "ossec" so the
  # filter section can select them.
  file {
    type => "ossec"
    path => "/var/ossec/logs/alerts/alerts.log"
    # sincedb_path must name a file, not a directory: the file input stores
    # its read position in this file so it can resume after a restart.
    sincedb_path => "/opt/logstash/.sincedb_ossec_alerts"
    # An alert spans several lines and begins with a line starting with "**".
    # Any line that does NOT start with "**" (negate => true) is appended to
    # the previous line, so each alert becomes a single event.
    codec => multiline {
      pattern => "^\*\*"
      negate => true
      what => "previous"
    }
  }
}
filter {
  # Only touch events that came from the OSSEC alerts file input.
  if [type] == "ossec" {
    # Parse the header of the alert. Two header layouts are tried; both
    # capture the same fields and leave the rest of the alert in
    # remaining_message.
    grok {
      # Matches 2014 Mar 08 01:00:59 (someserver.somedomain.com) 10.1.2.3->/var/log/auth.log
      match => ["message", "(?m)\*\* Alert %{DATA:timestamp_seconds}:%{SPACE}%{WORD}?%{SPACE}\- %{DATA:ossec_group}\n%{YEAR} %{SYSLOGTIMESTAMP:syslog_timestamp} \(%{DATA:reporting_host}\) %{IP:reporting_ip}\-\>%{DATA:reporting_source}\nRule: %{NONNEGINT:rule_number} \(level %{NONNEGINT:severity}\) \-\> '%{DATA:signature}'\n%{GREEDYDATA:remaining_message}"]
      # Matches 2014 Mar 08 00:00:00 ossec-server01->/var/log/auth.log
      match => ["message", "(?m)\*\* Alert %{DATA:timestamp_seconds}:%{SPACE}%{WORD}?%{SPACE}\- %{DATA:ossec_group}\n%{YEAR} %{SYSLOGTIMESTAMP:syslog_timestamp} %{DATA:reporting_host}\-\>%{DATA:reporting_source}\nRule: %{NONNEGINT:rule_number} \(level %{NONNEGINT:severity}\) \-\> '%{DATA:signature}'\n%{GREEDYDATA:remaining_message}"]
    }
    # Attempt to parse additional data from the alert. Every labelled field
    # (Src IP, Src Port, Dst IP, Dst Port, User) is optional; whatever follows
    # them is captured as real_message.
    grok {
      match => ["remaining_message", "(?m)(Src IP: %{IP:src_ip}%{SPACE})?(Src Port: %{NONNEGINT:src_port}%{SPACE})?(Dst IP: %{IP:dst_ip}%{SPACE})?(Dst Port: %{NONNEGINT:dst_port}%{SPACE})?(User: %{USER:acct}%{SPACE})?%{GREEDYDATA:real_message}"]
    }
    # Alerts without a "Src IP:" line have no src_ip field, so there is
    # nothing to look up; only run geoip when the field exists.
    if [src_ip] {
      geoip {
        source => "src_ip"
      }
    }
    # Only reshape the event when the header was parsed. If grok failed, the
    # %{real_message} / %{reporting_host} references below would be written
    # out as literal placeholders and the _grokparsefailure tag would be
    # removed along with "tags", hiding the failure. Unparsed events are left
    # untouched so they can be found and diagnosed.
    if "_grokparsefailure" not in [tags] {
      mutate {
        # severity is captured as a string; store it as a number.
        convert => [ "severity", "integer"]
        replace => [ "@message", "%{real_message}" ]
        replace => [ "@fields.hostname", "%{reporting_host}"]
        add_field => [ "@fields.product", "ossec"]
        # Keep the complete original alert text before "message" is removed.
        add_field => [ "raw_message", "%{message}"]
        add_field => [ "ossec_server", "%{host}"]
        # Drop intermediate capture fields and input metadata that have been
        # copied into the fields above.
        remove_field => [ "type", "syslog_program", "syslog_timestamp", "reporting_host", "message", "timestamp_seconds", "real_message", "remaining_message", "path", "host", "tags"]
      }
    }
  }
}
output {
  # Send every processed event to Elasticsearch. The cluster name and host
  # address below are example values; replace them with your own.
  elasticsearch {
    cluster => "mycluster"
    host => "10.0.0.1"
  }
}
Here are a few examples of the output this generates.
{
"@timestamp":"2014-03-08T20:34:08.847Z",
"@version":"1",
"ossec_group":"syslog,sshd,invalid_login,authentication_failed,",
"reporting_ip":"10.1.2.3",
"reporting_source":"/var/log/auth.log",
"rule_number":"5710",
"severity":5,
"signature":"Attempt to login using a non-existent user",
"src_ip":"112.65.211.164",
"geoip":{
"ip":"112.65.211.164",
"country_code2":"CN",
"country_code3":"CHN",
"country_name":"China",
"continent_code":"AS",
"region_name":"23",
"city_name":"Shanghai",
"latitude":31.045600000000007,
"longitude":121.3997,
"timezone":"Asia/Shanghai",
"real_region_name":"Shanghai",
"location":[
121.3997,
31.045600000000007
]
},
"@message":"Mar 8 01:00:59 someserver sshd[22874]: Invalid user oracle from 112.65.211.164\n",
"@fields.product":"ossec",
"raw_message":"** Alert 1394240459.2305861: - syslog,sshd,invalid_login,authentication_failed,\n2014 Mar 08 01:00:59 (someserver.somedomain.com) 10.1.2.3->/var/log/auth.log\nRule: 5710 (level 5) -> 'Attempt to login using a non-existent user'\nSrc IP: 112.65.211.164\nMar 8 01:00:59 someserver sshd[22874]: Invalid user oracle from 112.65.211.164\n"
}
and
{
"@timestamp":"2014-03-08T21:15:23.278Z",
"@version":"1",
"ossec_group":"syslog,sudo",
"reporting_source":"/var/log/auth.log",
"rule_number":"5402",
"severity":3,
"signature":"Successful sudo to ROOT executed",
"acct":"nagios",
"@message":"Mar 8 00:00:03 ossec-server sudo: nagios : TTY=unknown ; PWD=/ ; USER=root ; COMMAND=/usr/lib/some/command",
"@fields.hostname":"ossec-server",
"@fields.product":"ossec",
"raw_message":"** Alert 1394236804.1451: - syslog,sudo\n2014 Mar 08 00:00:04 ossec-server->/var/log/auth.log\nRule: 5402 (level 3) -> 'Successful sudo to ROOT executed'\nUser: nagios\nMar 8 00:00:03 ossec-server sudo: nagios : TTY=unknown ; PWD=/ ; USER=root ; COMMAND=/usr/lib/some/command"
}
If you combine the above with a custom Elasticsearch template, you can put together some really nice Kibana dashboards.
--Josh