help for iris - wazuh integration

22 views
Skip to first unread message

Alfian Syahputra

unread,
Nov 11, 2025, 8:04:36 AMNov 11
to Wazuh | Mailing List

I'm having an issue where the Rule Level is set to 7 on the dfir iris, but it's showing low data when it should be medium. Is there a wrong line of code? Please check my script.


#!/var/ossec/framework/python/bin/python3
# ============================================================================
#  Custom Wazuh → DFIR-IRIS Integration (Final Version)
#  Enhanced mapping for severity and event handling
#  Author: XXXX
#  Compatible with: Wazuh 4.7+ & DFIR-IRIS v2.x
# ============================================================================

import sys
import json
import requests
import logging
from datetime import datetime

# ----------------------------------------------------------------------------
# Logging configuration
# ----------------------------------------------------------------------------
logging.basicConfig(
    filename='/var/ossec/logs/integrations.log',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# ----------------------------------------------------------------------------
# Format alert details (for IRIS description)
# ----------------------------------------------------------------------------
def format_alert_details(alert_json):
    rule = alert_json.get("rule", {})
    agent = alert_json.get("agent", {})
    data = alert_json.get("data", {})

    # Extract MITRE info
    mitre = rule.get("mitre", {})
    mitre_ids = ', '.join(mitre.get("id", ["N/A"]))
    mitre_tactics = ', '.join(mitre.get("tactic", ["N/A"]))
    mitre_techniques = ', '.join(mitre.get("technique", ["N/A"]))

    # Extract network info if available
    src_ip = data.get("srcip", alert_json.get("srcip", "N/A"))
    dst_ip = data.get("dstip", alert_json.get("dstip", "N/A"))
    src_port = data.get("srcport", alert_json.get("srcport", "N/A"))
    dst_port = data.get("dstport", alert_json.get("dstport", "N/A"))
    protocol = data.get("protocol", alert_json.get("protocol", "N/A"))

    details = [
        f"Rule ID: {rule.get('id', 'N/A')}",
        f"Rule Level: {rule.get('level', 'N/A')}",
        f"Rule Description: {rule.get('description', 'N/A')}",
        f"Agent ID: {agent.get('id', 'N/A')}",
        f"Agent Name: {agent.get('name', 'N/A')}",
        f"Source IP: {src_ip}",
        f"Source Port: {src_port}",
        f"Destination IP: {dst_ip}",
        f"Destination Port: {dst_port}",
        f"Protocol: {protocol}",
        f"MITRE IDs: {mitre_ids}",
        f"MITRE Tactics: {mitre_tactics}",
        f"MITRE Techniques: {mitre_techniques}",
        f"Location: {alert_json.get('location', 'N/A')}",
        f"Full Log: {alert_json.get('full_log', 'N/A')}"
    ]

    return '\n'.join(details)

# ----------------------------------------------------------------------------
# Main logic
# ----------------------------------------------------------------------------
def main():
    # Validate arguments from Wazuh
    if len(sys.argv) < 4:
        logging.error("Insufficient arguments provided. Exiting.")
        sys.exit(1)

    alert_file = sys.argv[1]
    api_key = sys.argv[2]
    hook_url = sys.argv[3]

    # Read alert file
    try:
        with open(alert_file) as f:
            alert_json = json.load(f)
    except Exception as e:
        logging.error(f"Failed to read alert file: {e}")
        sys.exit(1)

    # ----------------------------------------------------------------------------
    # Prepare alert details
    # ----------------------------------------------------------------------------
    alert_details = format_alert_details(alert_json)

    # ----------------------------------------------------------------------------
    # Determine severity mapping (Wazuh level → IRIS severity)
    # ----------------------------------------------------------------------------
    try:
        alert_level = int(alert_json.get("rule", {}).get("level", 0))
    except (ValueError, TypeError):
        alert_level = 0

    syscheck_event = (
        alert_json.get("syscheck", {}).get("event", "")
        or alert_json.get("data", {}).get("event", "")
    ).lower()

    # Dynamic severity adjustment for FIM (added/modified/deleted)
    if syscheck_event == "added":
        severity = 3     # Low
    elif syscheck_event == "modified":
        severity = 4     # Medium
    elif syscheck_event == "deleted":
        severity = 5     # High
    else:
        # Mapping strictly follows Wazuh standard levels
        if alert_level < 7:
            severity = 3      # Low (0–6)
        elif alert_level < 12:
            severity = 4      # Medium (7–11)
        elif alert_level < 15:
            severity = 5      # High (12–14)
        else:
            severity = 6      # Critical (15+)

    # ----------------------------------------------------------------------------
    # Generate human-readable alert title
    # ----------------------------------------------------------------------------
    rule_desc = alert_json.get("rule", {}).get("description", "No Description")
    syscheck_path = alert_json.get("syscheck", {}).get("path", None)

    if syscheck_event and syscheck_path:
        alert_title = f"File {syscheck_event}: {syscheck_path}"
    else:
        alert_title = rule_desc

    # ----------------------------------------------------------------------------
    # Construct payload for DFIR-IRIS webhook
    # ----------------------------------------------------------------------------
    payload = json.dumps({
        "alert_title": alert_title,
        "alert_description": alert_details,
        "alert_source": "Wazuh",
        "alert_source_ref": alert_json.get("id", "Unknown ID"),
        "alert_source_link": "https://192.168.1.X/app/wz-home", 
        "alert_severity_id": severity,
        "alert_status_id": 2,  # New
        "alert_source_event_time": alert_json.get("timestamp", datetime.utcnow().isoformat()),
        "alert_note": "",
        "alert_tags": f"wazuh,{alert_json.get('agent', {}).get('name', 'N/A')},{syscheck_event or 'general'}",
        "alert_customer_id": 1,  # Default: IrisInitialClient
        "alert_source_content": alert_json
    })

    # ----------------------------------------------------------------------------
    # Send alert to DFIR-IRIS
    # ----------------------------------------------------------------------------
    try:
        response = requests.post(
            hook_url,
            data=payload,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            },
            verify=False
        )

        if response.status_code in [200, 201, 202, 204]:
            logging.info(f"✅ Sent alert '{alert_title}' to IRIS. Code: {response.status_code}")
        else:
            logging.error(f"❌ Failed to send alert to IRIS. HTTP {response.status_code}: {response.text}")
    except Exception as e:
        logging.error(f"❌ Error sending alert to IRIS: {e}")
        sys.exit(1)

# ----------------------------------------------------------------------------
# Entry point
# ----------------------------------------------------------------------------
if __name__ == "__main__":
    main()
dfir-iris.png

Olamilekan Abdullateef Ajani

unread,
Nov 11, 2025, 8:35:08 AMNov 11
to Wazuh | Mailing List
Hello,

I reviewed your script and the rule logic and it looks fine. Your comparison uses “less than” (<), not “less than or equal to” (<=). So when alert_level == 7, the python script skips the first branch (since 7 < 7 is false), enters the next (< 12), and actually assigns severity = 4, which is correct. So based on this, the problem seems to be from how DFIR IRIS categorizes alerts on its own. I looked it up online and found out some could equate 4 to Low, 5 to High and 6 to Critical.

I think you need to review the alert level category on the DFIR platform and map the information there to your script so it reflects your intentions appropriately.

Regards,





Reply all
Reply to author
Forward
0 new messages