I've tried many ways before, and the best way i found was to direct filebeat to send all logs through a logstash server where you can do additional processing. and create more indices custom indices. In the example below (my configuration) for multi-index it changes from wazuh-alerts-4.x-2024.01.01 to wazuh-alerts-clientone-2024.01.01.
The idea is to create a keydb type of list and reference it for rule processing. in this example logstash is looking for a field called location (for Fortigate we use the location field, which is an IP address from where the fortigate logs come from), then it checks if the IP found in the log exists in the /etc/logstash/keydb/location.csv file and assigns the index_suffix a value - either "default" or the value that correlates the location key, so if the location was 111.111.111.111 logstash would send the log to the custom index wazuh-alerts-4.x-fortigateclient1-2024.01.01, and if it does not meet any of values it will be wazuh-alerts-4.x-default-2024.01.01.
if [location] {
translate {
source => "[location]"
target => "[@metadata][index_suffix]"
dictionary_path => "/etc/logstash/keydb/location.csv"
fallback => "default"
}
The reason i use a custom wazuh-template for output is because from creating multi-custom indexes you will also start creating instead of 6 shards per day an additional 6 per each indice you create, which becomes a problem if you have many custom indexes (default maximum amount of shards is 1000, but it can be increased by either adding more indexer nodes or increasing the maximum shard per node, which will have impact on your resource). I've made it so i create 1 primary and 1 replica instead of 3 primaries and 1 replica for each primary per index.
input {
beats {
port => 5044
}
beats {
port => 5045
ssl_enabled => true
ssl_certificate_authorities => ["/etc/logstash/certs/root-ca.pem"]
ssl_certificate => "/etc/logstash/certs/logstash.pem"
ssl_key => "/etc/logstash/certs/logstash-key.pem"
ssl_client_authentication => "required"
}
}
filter {
json {
source => "message"
}
mutate {
replace => { "[host]" => "%{[host][name]}" }
}
if [location] {
translate {
source => "[location]"
target => "[@metadata][index_suffix]"
dictionary_path => "/etc/logstash/keydb/location.csv"
fallback => "default"
}
} else if [data][office365][OrganizationId] {
translate {
source => "[data][office365][OrganizationId]"
target => "[@metadata][index_suffix]"
dictionary_path => "/etc/logstash/keydb/o365.csv"
fallback => "default"
}
} else if [data][ms-graph][tennantId] {
translate {
source => "[data][ms-graph][tennantId]"
target => "[@metadata][index_suffix]"
dictionary_path => "/etc/logstash/keydb/o365.csv"
fallback => "default"
}
} else {
mutate {
add_field => { "[@metadata][index_suffix]" => "default" }
}
}
if [previous_output] {
mutate {
remove_field => ["previous_output"]
}
}
if [message] {
mutate {
remove_field => ["message"]
}
}
mutate {
remove_field => ["[event][original]"]
}
}
output {
opensearch {
hosts => ["ipofindexer1:9200", "ipofindexer2:9200"]
user => "$username"
password => "$password"
ssl => true
cacert => "/etc/logstash/certs/root-ca.pem"
index => "wazuh-alerts-4.x-%{[@metadata][index_suffix]}-%{+YYYY.MM.dd}"
template => "/etc/logstash/templates/wazuh-template.json"
template_name => "wazuh-template"
template_overwrite => true
manage_template => true
}
}