I'm having an issue where the Rule Level is set to 7 on the dfir iris, but it's showing low data when it should be medium. Is there a wrong line of code? Please check my script.
#!/var/ossec/framework/python/bin/python3
# ============================================================================
# Custom Wazuh → DFIR-IRIS Integration (Final Version)
# Enhanced mapping for severity and event handling
# Author: XXXX
# Compatible with: Wazuh 4.7+ & DFIR-IRIS v2.x
# ============================================================================
import sys
import json
import requests
import logging
from datetime import datetime
# ----------------------------------------------------------------------------
# Logging configuration
# ----------------------------------------------------------------------------
logging.basicConfig(
filename='/var/ossec/logs/integrations.log',
level=logging.INFO,
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# ----------------------------------------------------------------------------
# Format alert details (for IRIS description)
# ----------------------------------------------------------------------------
def format_alert_details(alert_json):
rule = alert_json.get("rule", {})
agent = alert_json.get("agent", {})
data = alert_json.get("data", {})
# Extract MITRE info
mitre = rule.get("mitre", {})
mitre_ids = ', '.join(mitre.get("id", ["N/A"]))
mitre_tactics = ', '.join(mitre.get("tactic", ["N/A"]))
mitre_techniques = ', '.join(mitre.get("technique", ["N/A"]))
# Extract network info if available
src_ip = data.get("srcip", alert_json.get("srcip", "N/A"))
dst_ip = data.get("dstip", alert_json.get("dstip", "N/A"))
src_port = data.get("srcport", alert_json.get("srcport", "N/A"))
dst_port = data.get("dstport", alert_json.get("dstport", "N/A"))
protocol = data.get("protocol", alert_json.get("protocol", "N/A"))
details = [
f"Rule ID: {rule.get('id', 'N/A')}",
f"Rule Level: {rule.get('level', 'N/A')}",
f"Rule Description: {rule.get('description', 'N/A')}",
f"Agent ID: {agent.get('id', 'N/A')}",
f"Agent Name: {agent.get('name', 'N/A')}",
f"Source IP: {src_ip}",
f"Source Port: {src_port}",
f"Destination IP: {dst_ip}",
f"Destination Port: {dst_port}",
f"Protocol: {protocol}",
f"MITRE IDs: {mitre_ids}",
f"MITRE Tactics: {mitre_tactics}",
f"MITRE Techniques: {mitre_techniques}",
f"Location: {alert_json.get('location', 'N/A')}",
f"Full Log: {alert_json.get('full_log', 'N/A')}"
]
return '\n'.join(details)
# ----------------------------------------------------------------------------
# Main logic
# ----------------------------------------------------------------------------
def main():
# Validate arguments from Wazuh
if len(sys.argv) < 4:
logging.error("Insufficient arguments provided. Exiting.")
sys.exit(1)
alert_file = sys.argv[1]
api_key = sys.argv[2]
hook_url = sys.argv[3]
# Read alert file
try:
with open(alert_file) as f:
alert_json = json.load(f)
except Exception as e:
logging.error(f"Failed to read alert file: {e}")
sys.exit(1)
# ----------------------------------------------------------------------------
# Prepare alert details
# ----------------------------------------------------------------------------
alert_details = format_alert_details(alert_json)
# ----------------------------------------------------------------------------
# Determine severity mapping (Wazuh level → IRIS severity)
# ----------------------------------------------------------------------------
try:
alert_level = int(alert_json.get("rule", {}).get("level", 0))
except (ValueError, TypeError):
alert_level = 0
syscheck_event = (
alert_json.get("syscheck", {}).get("event", "")
or alert_json.get("data", {}).get("event", "")
).lower()
# Dynamic severity adjustment for FIM (added/modified/deleted)
if syscheck_event == "added":
severity = 3 # Low
elif syscheck_event == "modified":
severity = 4 # Medium
elif syscheck_event == "deleted":
severity = 5 # High
else:
# Mapping strictly follows Wazuh standard levels
if alert_level < 7:
severity = 3 # Low (0–6)
elif alert_level < 12:
severity = 4 # Medium (7–11)
elif alert_level < 15:
severity = 5 # High (12–14)
else:
severity = 6 # Critical (15+)
# ----------------------------------------------------------------------------
# Generate human-readable alert title
# ----------------------------------------------------------------------------
rule_desc = alert_json.get("rule", {}).get("description", "No Description")
syscheck_path = alert_json.get("syscheck", {}).get("path", None)
if syscheck_event and syscheck_path:
alert_title = f"File {syscheck_event}: {syscheck_path}"
else:
alert_title = rule_desc
# ----------------------------------------------------------------------------
# Construct payload for DFIR-IRIS webhook
# ----------------------------------------------------------------------------
payload = json.dumps({
"alert_title": alert_title,
"alert_description": alert_details,
"alert_source": "Wazuh",
"alert_source_ref": alert_json.get("id", "Unknown ID"),
"alert_source_link": "
https://192.168.1.X/app/wz-home",
"alert_severity_id": severity,
"alert_status_id": 2, # New
"alert_source_event_time": alert_json.get("timestamp", datetime.utcnow().isoformat()),
"alert_note": "",
"alert_tags": f"wazuh,{alert_json.get('agent', {}).get('name', 'N/A')},{syscheck_event or 'general'}",
"alert_customer_id": 1, # Default: IrisInitialClient
"alert_source_content": alert_json
})
# ----------------------------------------------------------------------------
# Send alert to DFIR-IRIS
# ----------------------------------------------------------------------------
try:
response =
requests.post(
hook_url,
data=payload,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
verify=False
)
if response.status_code in [200, 201, 202, 204]:
logging.info(f"✅ Sent alert '{alert_title}' to IRIS. Code: {response.status_code}")
else:
logging.error(f"❌ Failed to send alert to IRIS. HTTP {response.status_code}: {response.text}")
except Exception as e:
logging.error(f"❌ Error sending alert to IRIS: {e}")
sys.exit(1)
# ----------------------------------------------------------------------------
# Entry point
# ----------------------------------------------------------------------------
if __name__ == "__main__":
main()