I modified the Iris.py so that I could send notifications to a ntfy server. Here is my config:
#!/var/ossec/framework/python/bin/python3
# custom-wazuh_ntfy.py
# Custom Wazuh integration script to send alerts to ntfy
import sys
import json
import requests
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(filename='/var/ossec/logs/integrations.log', level=logging.INFO,
format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
# Function to create a formatted string from alert details
def format_alert_details(alert_json):
rule = alert_json.get("rule", {})
agent = alert_json.get("agent", {})
# Extracting MITRE information from the nested 'rule' structure
mitre = rule.get("mitre", {})
mitre_ids = ', '.join(mitre.get("id", ["N/A"]))
mitre_tactics = ', '.join(mitre.get("tactic", ["N/A"]))
mitre_techniques = ', '.join(mitre.get("technique", ["N/A"]))
details = [
f"Rule ID: {rule.get('id', 'N/A')}",
f"Rule Level: {rule.get('level', 'N/A')}",
f"Rule Description: {rule.get('description', 'N/A')}",
f"Agent ID: {agent.get('id', 'N/A')}",
f"Agent Name: {agent.get('name', 'N/A')}",
f"MITRE IDs: {mitre_ids}",
f"MITRE Tactics: {mitre_tactics}",
f"MITRE Techniques: {mitre_techniques}",
f"Location: {alert_json.get('location', 'N/A')}",
f"Full Log: {alert_json.get('full_log', 'N/A')}"
]
return '\n'.join(details)
def main():
# Read parameters when integration is run
if len(sys.argv) < 4:
logging.error("Insufficient arguments provided. Exiting.")
sys.exit(1)
alert_file = sys.argv[1]
api_key = sys.argv[2]
hook_url = sys.argv[3]
# Read the alert file
try:
with open(alert_file) as f:
alert_json = json.load(f)
except Exception as e:
logging.error(f"Failed to read alert file: {e}")
sys.exit(1)
# Prepare alert details
alert_details = format_alert_details(alert_json)
# Convert Wazuh rule levels(0-15) -> ntfy severity(1-6)
alert_level = alert_json.get("rule", {}).get("level", 0)
if alert_level < 5:
severity = 1
elif alert_level >= 5 and alert_level < 7:
severity = 2
elif alert_level >= 7 and alert_level < 10:
severity = 3
elif alert_level >= 10 and alert_level < 13:
severity = 4
elif alert_level >= 13:
severity = 5
else:
severity = 1
# Convert Wazuh rule levels(0-15) -> ntfy icons
alert_level = alert_json.get("rule", {}).get("level", 0)
if alert_level < 5:
tag = "love_you_gesture"
elif alert_level >= 5 and alert_level < 7:
tag = "+1"
elif alert_level >= 7 and alert_level < 10:
tag = "pinching_hand"
elif alert_level >= 10 and alert_level < 13:
tag = "-1"
elif alert_level >= 13:
tag = "middle_finger"
else:
tag = "+1"
# Generate request
payload = json.dumps({
"topic": "wazuh",
"title": alert_json.get("rule", {}).get("description", "No Description"),
"message": "Details:" alert_details,
"click": "
https://wazuhdashboard-0.dauntless.local/app/wz-home",
"priority": severity,
"tags": tag
})
# Send request to ntfy
try:
response =
requests.post(hook_url, data=payload, headers={"Authorization": "Bearer " + api_key, "content-type": "application/json"}, verify=False)
if response.status_code in [200, 201, 202, 204]:
logging.info(f"Sent alert to ntfy. Response status code: {response.status_code}")
else:
logging.error(f"Failed to send alert to ntfy. Response status code: {response.status_code}")
except Exception as e:
logging.error(f"Failed to send alert to ntfy: {e}")
sys.exit(1)
if __name__ == "__main__":
main()