Wazuh Integration with MISP (Threat Intelligence Feed)


3

Dec 1, 2025, 1:55:10 AM
to Wazuh | Mailing List
Hello Wazuh Team,
I need assistance regarding the integration of Wazuh with MISP (Threat Feed).

I am currently working on a project where Wazuh receives attack data from a honeypot. Wazuh then
has to forward these logs and data to MISP so they can be displayed in the MISP dashboard.
What are the possible ways of accomplishing this integration? (The honeypot is already integrated with Wazuh, but MISP is not, so how can I integrate Wazuh with MISP?)

Md. Nazmur Sakib

Dec 1, 2025, 2:37:53 AM
to Wazuh | Mailing List

Hello,
You can integrate MISP to cross-check data from your alerts against MISP IOCs once the alerts are on the Wazuh manager; a match triggers a new alert.
You can follow this document to integrate Wazuh with MISP:
External API integration


I am also sharing some reference documents to integrate Wazuh with MISP.
https://github.com/wazuh/integrations/tree/main/integrations/misp

https://opensecure.medium.com/wazuh-and-misp-integration-242dfa2f2e19

Let me know if you need further assistance on this.

3

Dec 2, 2025, 8:58:56 AM
to Wazuh | Mailing List

However, this guide is for Windows (as it uses Sysmon).
Is there a way to adapt this guide (Wazuh + MISP) to use T-Pot as the Wazuh agent?

Md. Nazmur Sakib

Dec 4, 2025, 2:09:13 AM
to Wazuh | Mailing List
Yes, it is possible. Sysmon event 22 was just an example use case.

The script I shared previously checks these fields against the MISP IOC database.

SUPPORTED_KEYS = [
    ("ip_src", ["src_ip", "source_ip", "srcip", "SourceIP", "aws.source_ip_address", "client_ip", "clientIP_s", "IPAddress", "originalHost_s", "CallerIPAddress"]),
    ("ip_dst", ["dst_ip", "destination_ip", "dstip", "DestinationIP", "remote_ip", "external_ip"]),
    ("sha1",   ["sha1", "sha1sum", "file_sha1", "ciscoendpoint.file.identity.sha1"]),
    ("sha256", ["sha256", "sha256sum", "file_sha256", "ciscoendpoint.file.identity.sha256"]),
    ("md5",    ["md5", "md5sum", "file_md5", "ciscoendpoint.file.identity.md5"]),
    ("url",    ["url", "source_url", "TargetURL", "download_url", "http_url"]),
    ("domain", ["domain", "hostname", "base_domain", "fqdn", "TargetDestination", "Fqdn_s", "win.eventdata.queryName"]),
]

If your alerts contain any of these fields, all you need to do is configure the <integration> block to forward the alerts to your MISP instance.

<integration>
------------------
  <!-- Optional filters -->
  <rule_id> </rule_id>
  <level> </level>
  <group> </group>
  <event_location> </event_location>
  <options> </options>
</integration>


Ref: Optional filters
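
To check in advance whether a T-Pot alert carries any of the fields listed above, a small standalone helper can mirror the script's lookup. This is only a minimal sketch and not the integration script itself: find_ioc_fields and get_nested are hypothetical names, the key list is abbreviated, and the sample alert is invented for illustration.

```python
from typing import Any, Dict, List, Optional, Tuple

# Abbreviated copy of the SUPPORTED_KEYS list quoted above.
SUPPORTED_KEYS: List[Tuple[str, List[str]]] = [
    ("ip_src", ["src_ip", "source_ip", "srcip"]),
    ("ip_dst", ["dst_ip", "destination_ip", "dstip"]),
    ("sha256", ["sha256", "sha256sum", "file_sha256"]),
    ("domain", ["domain", "hostname", "win.eventdata.queryName"]),
]


def get_nested(data: Dict[str, Any], dotted: str) -> Optional[Any]:
    """Follow a dot-delimited path through nested dicts; None if absent."""
    current: Any = data
    for part in dotted.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def find_ioc_fields(alert: Dict[str, Any]) -> Dict[str, Tuple[str, Any]]:
    """Map each IOC type to the (field name, value) found under alert['data']."""
    data = alert.get("data", {})
    found: Dict[str, Tuple[str, Any]] = {}
    for ioc_type, candidates in SUPPORTED_KEYS:
        for field in candidates:
            value = get_nested(data, field)
            if value:
                found[ioc_type] = (field, value)
                break  # first matching candidate wins, as in the script
    return found


# Invented sample alert, shaped like a JSON-decoded Cowrie event.
sample_alert = {
    "rule": {"id": "90005"},
    "data": {"eventid": "cowrie.login.failed", "src_ip": "203.0.113.7"},
}
print(find_ioc_fields(sample_alert))
```

An empty result for one of your honeypot alerts would suggest that the decoder for that sensor is not producing any of the field names the script expects.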


If you need further customisation, you can take the custom MISP script I shared previously and follow this document to build a script that fits your needs.

Creating an integration script
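
As a rough starting point for a custom script, the skeleton below reads the same positional arguments that the custom-misp.py script in this thread reads (alert file, API key, hook URL). It is a sketch under assumptions: parse_integration_args, load_alert and main are illustrative names, the exit codes are arbitrary, and the MISP lookup itself is left out.

```python
import json
import sys
from typing import Any, Dict, List


def parse_integration_args(argv: List[str]) -> Dict[str, str]:
    """Pick out the positional arguments: alert file, API key, hook URL."""
    if len(argv) < 4:
        raise ValueError("expected: <alert_file> <api_key> <hook_url>")
    return {"alert_file": argv[1], "api_key": argv[2], "hook_url": argv[3]}


def load_alert(path: str) -> Dict[str, Any]:
    """Read a single alert stored as one JSON document."""
    with open(path, "r", encoding="utf-8") as handle:
        return json.load(handle)


def main(argv: List[str]) -> int:
    """Return 0 on success, 1 on bad arguments (arbitrary codes for this sketch)."""
    try:
        args = parse_integration_args(argv)
    except ValueError as exc:
        print(f"bad arguments: {exc}", file=sys.stderr)
        return 1
    alert = load_alert(args["alert_file"])
    # A real script would extract fields from alert["data"] here and query MISP.
    print(f"received alert for rule {alert.get('rule', {}).get('id')}")
    return 0
```

In an actual script you would finish with something like sys.exit(main(sys.argv)) so the exit status reflects the result.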

Let me know if you need any further information.

3

Dec 14, 2025, 5:17:19 PM
to Wazuh | Mailing List
I am trying to integrate MISP with Wazuh, using T-Pot as the honeypot agent (three EC2 instances, all running Ubuntu), but I’m facing several challenges:
  • Most tutorials assume Sysmon, which is not available in my setup.
  • All T-Pot sensors are already generating alerts, and these are visible on the Wazuh dashboard.
  • I cannot figure out how to trigger custom-misp.py automatically when attacks occur to validate hashes.
My goal is for Wazuh alerts to automatically interact with MISP to:
  • Validate attacks using existing threat intelligence feeds.
  • Enrich the feeds by adding new indicators from detected attacks, all without creating duplicates.

I need a step-by-step approach to accomplish this integration from scratch, including automatic detection, hash validation, and event creation/enrichment in MISP.
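
The "without creating duplicates" requirement can be sketched as a filter over lookup results. The example below assumes each lookup result has the {"response": {"Attribute": [...]}} shape that the custom-misp.py script in this thread reads from MISP's attribute search; is_known_indicator and indicators_to_add are hypothetical helper names, and the sample data is invented.

```python
from typing import Any, Dict, Iterable, List


def is_known_indicator(misp_result: Dict[str, Any], value: str) -> bool:
    """True if the search result holds an attribute with exactly this value."""
    attributes = misp_result.get("response", {}).get("Attribute") or []
    return any(attr.get("value") == value for attr in attributes)


def indicators_to_add(
    candidates: Iterable[str], results: Dict[str, Dict[str, Any]]
) -> List[str]:
    """Return candidates not yet in MISP, in first-seen order, without repeats."""
    pending: List[str] = []
    seen = set()
    for value in candidates:
        if value in seen:
            continue  # same indicator seen earlier in this batch
        seen.add(value)
        if not is_known_indicator(results.get(value, {}), value):
            pending.append(value)
    return pending


# Invented lookup results keyed by indicator value.
lookup_results = {
    "198.51.100.4": {"response": {"Attribute": [{"value": "198.51.100.4"}]}},
    "203.0.113.7": {"response": {"Attribute": []}},
}
attacker_ips = ["203.0.113.7", "198.51.100.4", "203.0.113.7"]
print(indicators_to_add(attacker_ips, lookup_results))
```

Only the values returned by indicators_to_add would then need to be written to MISP; indicators that already exist are validated but not added again.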

Md. Nazmur Sakib

Dec 17, 2025, 11:19:29 PM
to Wazuh | Mailing List

You only need to configure the <integration> block to forward the alerts to the script. The script then forwards the key fields from the alerts to MISP. If there is a detection, the result comes back as a log, and the custom MISP rules in Wazuh trigger an alert for it. Keep in mind that if a field value has no match in the MISP IOC database, there will not be any alert from MISP.


Have you added these custom misp rules to your custom rule file?

Go to Server Management > Rules > Add New Rule file. Name it custom_misp_rules.xml

<group name="misp,">
  <rule id="100620" level="10">
    <decoded_as>json</decoded_as>
    <field name="integration">misp</field>
    <description>Default rule for MISP Events</description>
    <!--options>no_full_log</options-->
  </rule>

  <rule id="100621" level="12">
    <if_sid>100620</if_sid>
    <field name="threat">MISP error</field>
    <description>MISP error</description>
  </rule>

  <rule id="100622" level="12">
    <if_sid>100620</if_sid>
    <field name="ioc.domain">\.+</field>
    <description>MISP - IoC found in Threat Intel - Attribute: domain $(ioc.domain) - Event: $(misp_response.domain.event_info)</description>
  </rule>

  <rule id="100623" level="12">
    <if_sid>100620</if_sid>
    <field name="ioc.sha256">\.+</field>
    <description>MISP - IoC found in Threat Intel - Attribute: hash $(ioc.sha256) - Event: $(misp_response.sha256.event_info)</description>
  </rule>

  <rule id="100624" level="12">
    <if_sid>100620</if_sid>
    <field name="ioc.sha1">\.+</field>
    <description>MISP - IoC found in Threat Intel - Attribute: hash $(ioc.sha1) - Event: $(misp_response.sha1.event_info)</description>
  </rule>
</group>



Follow the document step by step. You can skip the Sysmon configuration and the Sysmon custom rules; the rest is the same.
Did you adjust the script to match the fields in your alerts?

If you still face issues, share the T-Pot sensor alert details and your customized MISP script, both in text format, and mention the field you want to compare with the MISP IOCs. I am not very experienced at coding, but I will review the changes you have made to your script as best I can.
You can find the alerts on the Wazuh manager server in the log file /var/ossec/logs/alerts/alerts.json, for example with: cat /var/ossec/logs/alerts/alerts.json | grep <rule_id>
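
Because grep matches the rule ID anywhere in the line, a small filter on the rule.id field can be more precise. The sketch below assumes alerts.json holds one JSON alert per line; alerts_for_rule is a hypothetical helper name, not part of Wazuh.

```python
import json
from typing import Any, Dict, Iterator


def alerts_for_rule(path: str, rule_id: str) -> Iterator[Dict[str, Any]]:
    """Yield alerts from a JSON-lines file whose rule.id equals rule_id."""
    with open(path, "r", encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            try:
                alert = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip lines that are not complete JSON
            if str(alert.get("rule", {}).get("id")) == str(rule_id):
                yield alert
```

For example, looping over alerts_for_rule("/var/ossec/logs/alerts/alerts.json", "110001") and printing json.dumps(alert.get("data", {}), indent=2) for each result shows exactly which decoded fields are available to the MISP script.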
Looking forward to your update.

3

Dec 19, 2025, 4:21:03 AM
to Wazuh | Mailing List
I managed to make the system work according to this guide:
https://github.com/wazuh/integrations/tree/main/integrations/misp

However, it does not work the way I want, and there are some issues.
My goal is to feed the IoCs from Wazuh to MISP.

My final goal is to build a dashboard in MISP and view
the IPs and other information about attackers within the MISP dashboard.

The above guide is for viewing the MISP response logs in the Wazuh dashboard.
(Wazuh connects and sends the logs to MISP, and MISP replies with a log that
contains either "MISP No threat match" or "MISP Threat match".)

Some notes I would like to mention:

- To make MISP reply to Wazuh, I needed to create an event and add
attributes (and then add my IP manually as the value).

- Also it seems that only Cowrie works with that setup.

- I need to make Wazuh connect to MISP and add the IP addresses as values for the event's
attributes (automatically).
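
One possible way to add an attacker IP to an existing MISP event is a POST to MISP's attribute-add endpoint. The sketch below only builds the request so it can be inspected before anything is sent. It rests on assumptions to check against your MISP version's API documentation: the /attributes/add/<event_id> path, the "ip-src" type and "Network activity" category, the to_ids and comment values, and the helper name build_add_attribute_request are all illustrative choices, and the event ID is a placeholder.

```python
import json
import urllib.request
from typing import Any, Dict


def build_add_attribute_request(
    base_url: str, api_key: str, event_id: int, ip: str
) -> urllib.request.Request:
    """Build (but do not send) a POST that adds a source IP to a MISP event."""
    payload: Dict[str, Any] = {
        "type": "ip-src",                 # assumed attribute type for attacker IPs
        "category": "Network activity",   # assumed category
        "value": ip,
        "to_ids": False,                  # assumption: do not flag for IDS export
        "comment": "Observed by T-Pot via Wazuh",  # illustrative comment
    }
    url = f"{base_url.rstrip('/')}/attributes/add/{event_id}"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": api_key,
            "Accept": "application/json",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder values; nothing is sent over the network here.
request = build_add_attribute_request("https://misp.example/", "API_KEY", 42, "203.0.113.7")
print(request.get_method(), request.full_url)
```

Sending would then be a call such as urllib.request.urlopen(request, timeout=10). Looking the IP up in MISP first and skipping the POST when it already exists keeps the event free of duplicates.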



Below are the necessary configuration files:

custom-misp.py:

#!/var/ossec/framework/python/bin/python3

import os
import sys
import json
import logging
import asyncio
import httpx
from typing import Any, Dict, List, Optional, Tuple
from socket import AF_UNIX, SOCK_DGRAM, socket
from pathlib import Path
from logging.handlers import RotatingFileHandler

# Global error codes for exit statuses
ERR_NO_RESPONSE_MISP = 10
ERR_SOCKET_OPERATION = 11
ERR_INVALID_JSON = 12
ERR_BAD_ARGUMENTS = 13

# Service configuration constants
SERVICE_NAME = "wazuh-misp-integration"
LOG_DIR = Path("/var/log/wazuh-misp")
LOG_FILE = LOG_DIR / "integrations.log"
SOCKET_ADDR = "/var/ossec/queue/sockets/queue"
QUEUE_FILE_PATH = LOG_DIR / "wazuh-retry-queue"
QUEUE_FILE = QUEUE_FILE_PATH / "misp_queue.json"
QUEUE_TMP = QUEUE_FILE.with_suffix(".inprocess")
FAILED_MISP_ALERTS_DIR = LOG_DIR / "misp-failed-enrichment"

# These will be populated from CLI args or options file
MISP_BASE_URL = "https://{IP_Address}"
MISP_API_KEY = "{API Key}"
VERIFY_SSL = False

# Supported IOC keys and their possible field names in Wazuh alerts

SUPPORTED_KEYS = [
    ("ip_src", ["src_ip", "source_ip", "srcip", "SourceIP", "aws.source_ip_address", "client_ip", "clientIP_s", "IPAddress", "originalHost_s", "CallerIPAddress"]),
    ("ip_dst", ["dst_ip", "destination_ip", "dstip", "DestinationIP", "remote_ip", "external_ip"]),
    ("sha1",   ["sha1", "sha1sum", "file_sha1", "ciscoendpoint.file.identity.sha1"]),
    ("sha256", ["sha256", "sha256sum", "file_sha256", "ciscoendpoint.file.identity.sha256"]),
    ("md5",    ["md5", "md5sum", "file_md5", "ciscoendpoint.file.identity.md5"]),
    ("url",    ["url", "source_url", "TargetURL", "download_url", "http_url"]),
    ("domain", ["domain", "hostname", "base_domain", "fqdn", "TargetDestination", "Fqdn_s", "win.eventdata.queryName"]),
]

# -------------------- Logging Setup ---------------------

def create_app_logger() -> logging.Logger:
    """
    Create and configure a plain-text logger for the service.
    Logs go to the console and to a rotating file (LOG_FILE).
    """
    logger = logging.getLogger(SERVICE_NAME)
    logger.setLevel(logging.INFO)

    # Remove existing handlers
    for h in list(logger.handlers):
        logger.removeHandler(h)

    # Plain formatter for console and file
    plain_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")

    # Console handler
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(plain_formatter)
    logger.addHandler(stream_handler)

    # Ensure log directory exists
    log_dir = os.path.dirname(LOG_FILE)
    os.makedirs(log_dir, exist_ok=True)

    # 10 MB per file, 5 backups (total: ~60 MB)
    file_handler = RotatingFileHandler(LOG_FILE, maxBytes=10_000_000, backupCount=5, encoding="utf-8")
    file_handler.setFormatter(plain_formatter)
    logger.addHandler(file_handler)

    logger.propagate = False
    return logger


logger = create_app_logger()

# -------------------- IOC Extraction ---------------------

def extract_all_iocs(alert: Dict[str, Any]) -> Dict[str, Any]:
    """
    Given a Wazuh alert, extract all supported IOCs (IP, hash, URL, domain).
    Returns a mapping of normalized IOC keys to their values.
    """
    def get_nested_value(data: dict, key_path: str) -> Optional[Any]:
        # Traverse nested dict by dot-delimited path
        keys = key_path.split(".")
        val = data
        for key in keys:
            if isinstance(val, dict) and key in val:
                val = val[key]
            else:
                return None
        return val

    data = alert.get("data", {})
    syscheck = alert.get("syscheck", {})
    iocs: Dict[str, Any] = {}

    # Iterate supported key sets; pick first matching candidate
    for out_key, candidates in SUPPORTED_KEYS:
        for candidate in candidates:
            val = get_nested_value(data, candidate)
            # If not found in data, check syscheck for "<candidate>_after"
            if val is None and "." not in candidate:
                val = syscheck.get(f"{candidate}_after")
            if val:
                iocs[out_key] = val
                break
    return iocs

# -------------------- MISP Async Query ---------------------

async def misp_fetch(client: httpx.AsyncClient, value: str, sem: asyncio.Semaphore, misp_base_url: str, misp_api_key: str, verify_ssl: bool) -> Tuple[str, Optional[Dict[str, Any]]]:
    """
    Perform a REST search in MISP for a single IOC value.
    Retries up to 3 times on error or timeout. Returns tuple (value, response_data).
    """
    url = f"{misp_base_url.rstrip('/')}/attributes/restSearch/"
    headers = {
        "Content-Type": "application/json",
        "Authorization": misp_api_key,
        "Accept": "application/json",
    }
    payload = {
        "value": value,
        "returnFormat": "json",
        "includeContext": True,
        "includeEventTags": True,
        "includeAttributeTags": True,
        "includeSightings": True,
    }

    async with sem:
        for attempt in range(1, 4):
            try:
                resp = await client.post(url, headers=headers, json=payload, timeout=10.0)
                if resp.status_code == 200:
                    data = resp.json()
                    logger.info(f"MISP query successful for '{value}', status {resp.status_code}")
                    return value, data
                else:
                    logger.error(f"MISP responded with error status {resp.status_code} for IOC value {value}")
                    try:
                        error_data = resp.json()
                    except json.JSONDecodeError:
                        error_data = {"message": resp.text}
                    return value, {"error": {"status": resp.status_code, "data": error_data}}
            except httpx.TimeoutException:
                logger.warning(f"Timeout on attempt {attempt} for IOC '{value}', retrying...")
            except Exception as e:
                logger.warning(f"Request exception on attempt {attempt} for IOC '{value}': {e}")
            await asyncio.sleep(2**attempt)

        # All retries failed
        logger.error(f"Failed to fetch MISP data for IOC '{value}' after retries")
        return value, None


def save_failed_misp_alert(alert: dict) -> None:
    """
    Save an alert for retry if MISP is unreachable.
    """
    try:
        if not FAILED_MISP_ALERTS_DIR.exists():
            FAILED_MISP_ALERTS_DIR.mkdir(parents=True, exist_ok=True)
            os.chmod(FAILED_MISP_ALERTS_DIR, 0o750)
        alert_id = alert.get("id", "unknown")
        sanitized_alert_id = alert_id.replace(".", "_")
        fname = FAILED_MISP_ALERTS_DIR / f"alert_{sanitized_alert_id}.json"
        with open(fname, "w", encoding="utf-8") as f:
            json.dump(alert, f, separators=(",", ":"))
        logger.warning(f"Alert {alert_id} saved for retry due to MISP unresponsiveness")
    except Exception as e:
        logger.error(f"Failed to store failed alert to retry folder: {e}")


def process_failed_misp_alerts(misp_base_url, misp_api_key, verify_ssl):
    """
    Retry enriching all alerts in the failed alerts directory.
    Successfully reprocessed alerts are deleted.
    """
    def misp_is_reachable(misp_base_url, misp_api_key, verify_ssl):
        """Quickly check if MISP API responds to a simple authenticated request using httpx."""
        try:
            url = f"{misp_base_url.rstrip('/')}/servers/getVersion.json"
            headers = {"Authorization": misp_api_key}
            resp = httpx.get(url, headers=headers, timeout=5.0, verify=verify_ssl)
            return resp.status_code == 200
        except Exception as e:
            logger.warning(f"MISP healthcheck failed: {e}")
            return False

    failed_dir = FAILED_MISP_ALERTS_DIR
    if not failed_dir.exists():
        return

    # Check if MISP is up before retrying
    if not misp_is_reachable(misp_base_url, misp_api_key, verify_ssl):
        logger.warning("Skipping MISP failed alert retries because MISP is still unreachable.")
        return

    for alert_file in list(failed_dir.glob("alert_*.json")):
        try:
            with open(alert_file, "r", encoding="utf-8") as f:
                alert = json.load(f)
            asyncio.run(process_alerts([alert], misp_base_url, misp_api_key, verify_ssl))
            alert_file.unlink()  # Remove file on success
            logger.info(f"Successfully reprocessed failed alert {alert_file.name}, removed from retry queue.")
        except Exception as e:
            logger.error(f"Failed to reprocess {alert_file.name}: {e}")
            # Keep the file for a future retry

# -------------------- Sending enriched event ---------------------

def send_event(msg: Dict[str, Any], agent: Optional[Dict[str, Any]] = None) -> None:
    """
    Send the enriched alert JSON to Wazuh via UNIX datagram socket.
    On failure, queue the event for retry.
    """
    try:
        line = json.dumps(msg, separators=(",", ":"))
        # Format socket message with optional agent context
        if not agent or agent.get("id") == "000":
            string = f"1:misp:{line}"
        else:
            string = f"1:[{agent['id']}] ({agent['name']}) {agent.get('ip','any')}->misp:{line}"

        with socket(AF_UNIX, SOCK_DGRAM) as sock:
            sock.connect(SOCKET_ADDR)
            sock.send(string.encode())
        logger.info("Sent enriched event to Wazuh socket")
    except FileNotFoundError:
        logger.error(f"Socket file not found at {SOCKET_ADDR}, queuing event for retry")
        save_to_queue(msg)
    except Exception as e:
        logger.error(f"Failed to send event: {e}. Event queued for retry.")
        save_to_queue(msg)

def save_to_queue(event: Dict[str, Any]) -> None:
    """
    Append failed events to a local file for later retry.
    """
    try:
        if not QUEUE_FILE_PATH.exists():
            QUEUE_FILE_PATH.mkdir(parents=True, exist_ok=True)
            os.chmod(QUEUE_FILE_PATH, 0o750)
        with open(QUEUE_FILE, "a", encoding="utf-8") as f:
            f.write(json.dumps(event, separators=(",", ":")) + "\n")
        logger.warning("Event saved to retry queue")
    except Exception as e:
        logger.error(f"Failed to write event to retry queue: {e}")

# -------------------- Main Async Alert Processing ---------------------

async def process_alerts(alerts: List[Dict[str, Any]], misp_base_url: str, misp_api_key: str, verify_ssl: bool) -> None:
    """
    Extract IOCs from each alert, de-duplicate values, query MISP concurrently,
    then enrich and send each alert with aggregated IOC results.
    """

    def filtered_alert(alert: dict) -> dict:
        return {
            "id": alert.get("id"),
            "manager.name": alert.get("manager", {}).get("name"),
            "rule.groups": alert.get("rule", {}).get("groups"),
            "rule.id": alert.get("rule", {}).get("id"),
            "rule.level": alert.get("rule", {}).get("level"),
            "timestamp": alert.get("timestamp"),
            "rule.description": alert.get("rule", {}).get("description"),
        }

    if not alerts:
        logger.info("No alerts to process")
        return

    sem = asyncio.Semaphore(10)
    async with httpx.AsyncClient(verify=verify_ssl, timeout=10.0) as session:
        alerts_iocs_list: List[Dict[str, Any]] = []
        all_iocs_values: set = set()

        # Extract IOCs and build unique set
        for alert in alerts:
            iocs = extract_all_iocs(alert)
            alerts_iocs_list.append(iocs)
            all_iocs_values.update(iocs.values())

        # Query MISP for each unique IOC
        tasks = [
            misp_fetch(session, val, sem, misp_base_url, misp_api_key, verify_ssl)
            for val in all_iocs_values
        ]
        results = await asyncio.gather(*tasks)

        # Map IOC value to its MISP response
        global_misp_results = {val: data for val, data in results}

    # Enrich each alert with MISP data
    for alert, iocs in zip(alerts, alerts_iocs_list):
        alert_id = alert.get("id", "unknown")  # Use 'unknown' if 'id' is missing
        if not iocs:
            logger.warning(f"No IOCs extracted from alert (alert_id={alert_id}), skipping enrichment")
            continue

        all_failed = all(global_misp_results.get(val) is None for val in iocs.values())
        if iocs and all_failed:
            save_failed_misp_alert(alert)
            logger.warning(f"MISP unreachable for all IOCs in alert_id={alert_id}. Alert queued for later enrichment.")
            continue

        # Reverse index: value → keys it came from
        value_to_keys: Dict[str, List[str]] = {}
        for key, val in iocs.items():
            value_to_keys.setdefault(val, []).append(key)

        enrichment_iocs: Dict[str, Any] = {}
        result_flags: Dict[str, bool] = {}
        misp_response: Dict[str, Dict[str, Any]] = {}
        misp_error_response: Dict[str, Any] = {}

        # Populate enrichment fields per key
        for value, keys in value_to_keys.items():
            data = global_misp_results.get(value)
            matched = False
            attr_info = None

            if data is None:
                logger.info(f"No response from MISP for IOC value '{value}'")
            elif "error" in data:
                logger.error(f"MISP error for IOC value '{value}': {data['error']}")
                misp_error_response = {
                    "status": data["error"]["status"],
                    "name": data["error"]["data"].get("name", ""),
                    "message": data["error"]["data"].get("message", ""),
                    "url": data["error"]["data"].get("url", "")
                }
            else:
                misp_attrs = data.get("response", {}).get("Attribute")
                if misp_attrs and len(misp_attrs) > 0:
                    attr = misp_attrs[0]
                    event = attr.get("Event", {})
                    if value == attr.get("value"):
                        matched = True
                        attr_info = {
                            "value": attr.get("value"),
                            "comment": attr.get("comment"),
                            "uuid": attr.get("uuid"),
                            "timestamp": attr.get("timestamp"),
                            "event_id": attr.get("event_id"),
                            "event_org_id": event.get("org_id"),
                            "event_info": event.get("info"),
                            "threat_level_id": event.get("threat_level_id"),
                        }

            for key in keys:
                enrichment_iocs[key] = value
                result_flags[f"{key}_misp"] = matched
                misp_response[key] = attr_info if matched else {}

            if matched:
                logger.info(f"MISP match found for IOC value '{value}'")
            elif data and "error" not in data:
                logger.info(f"No MISP match for IOC value '{value}'")

        # Build final enriched event payload
        enrichment = {
            "ioc": enrichment_iocs,
            "result_flags": result_flags,
            "misp_response": misp_response,
            "original_alert": filtered_alert(alert),
        }
        if misp_error_response:
            enrichment["misp_error_response"] = misp_error_response


        enriched_event = {
            **enrichment,
            "integration": "misp",
            "threat": "MISP match" if any(result_flags.values()) else "MISP no match"
        }

        send_event(enriched_event)

def process_queue() -> None:
    """
    Read queued events from disk, attempt to resend them, and handle failures.
    """
    if not QUEUE_FILE.exists():
        return

    try:
        QUEUE_FILE.rename(QUEUE_TMP)
    except Exception as e:
        logger.error(f"Failed to rename queue file: {e}")
        return

    failed: List[str] = []
    with open(QUEUE_TMP, "r", encoding="utf-8") as f:
        for line in f:
            try:
                # Queued entries are already-enriched events (see save_to_queue),
                # so resend them as-is instead of re-running IOC extraction.
                event = json.loads(line)
                send_event(event)
            except Exception as e:
                logger.error(f"Error processing queued event: {e}")
                failed.append(line)

    # Restore any still-failed events, then clean up the temporary file
    if failed:
        try:
            # Append: send_event may already have re-queued events to QUEUE_FILE
            with open(QUEUE_FILE, "a", encoding="utf-8") as f:
                f.writelines(failed)
        except Exception as e:
            logger.error(f"Failed to restore failed events to queue: {e}")
    try:
        QUEUE_TMP.unlink()
    except Exception as e:
        logger.error(f"Failed to delete temporary queue file: {e}")

def main():
    """
    Entry point: parse CLI arguments, load options, process any queued events,
    then read and process incoming alerts.
    """
    global MISP_BASE_URL, MISP_API_KEY, VERIFY_SSL

    try:
        alert_file = sys.argv[1]
        MISP_API_KEY = sys.argv[2]
        MISP_BASE_URL = sys.argv[3]
        options_path = sys.argv[5] if len(sys.argv) > 5 else ""
        VERIFY_SSL = False
        if options_path and os.path.isfile(options_path):
            with open(options_path, "r", encoding="utf-8") as f:
                options = json.load(f)
                VERIFY_SSL = (str(options.get("misp_verify_ssl", "false")).strip().lower() == "true")
    except Exception as e:
        logger.error(f"Failed to parse Wazuh integration arguments: {e}")
        sys.exit(ERR_BAD_ARGUMENTS)

    # First, retry any queued events
    process_queue()

    # Then, retry failed MISP enrichment for alerts (if any)
    process_failed_misp_alerts(MISP_BASE_URL, MISP_API_KEY, VERIFY_SSL)

    # Load alerts from file or stdin
    alerts: List[Dict[str, Any]] = []
    try:
        if alert_file == "-":
            content = sys.stdin.read()
            parsed = json.loads(content)
            alerts = [parsed] if isinstance(parsed, dict) else parsed
        else:
            with open(alert_file, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if line:
                        alerts.append(json.loads(line))
    except Exception as e:
        logger.error(f"Failed to parse alert input: {e}")
        sys.exit(ERR_INVALID_JSON)

    # Process the loaded alerts
    asyncio.run(process_alerts(alerts, MISP_BASE_URL, MISP_API_KEY, VERIFY_SSL))

if __name__ == "__main__":
    main()

local_rules.xml:

<group name="cowrie,">


  <rule id="90000" level="0" >
    <decoded_as>json</decoded_as>
    <field name="eventid">^cowrie</field>
    <description>cowrie messages grouped.</description>
  </rule>

  <rule id="90005" level="3">
    <if_sid>90000</if_sid>
    <match>cowrie.login.failed</match>
    <description>cowrie login failed</description>
    <group>cowrie,</group>
  </rule>

  <rule id="90010" level="3">
    <if_sid>90000</if_sid>
    <match>cowrie.login.success</match>
    <description>cowrie login success</description>
    <group>cowrie,</group>
  </rule>

  <rule id="90015" level="5">
    <if_sid>90000</if_sid>
    <match>cowrie.command.input</match>
    <description>cowrie command input</description>
    <group>cowrie,</group>
  </rule>

  <rule id="90020" level="3">
    <if_sid>90000</if_sid>
    <match>cowrie.client.version</match>
    <description>cowrie client version</description>
    <group>cowrie,</group>
  </rule>


</group>


<group name="dionaea,">
  <rule id="110001" level="8">
    <decoded_as>dionaea-custom</decoded_as>
    <description>Dionaea Honeypot Activity Detected</description>
  </rule>
</group>

<group name="sentrypeer,">
  <rule id="120001" level="8">
    <decoded_as>sentrypeer-custom</decoded_as>
    <description>Sentrypeer Honeypot Activity Detected - Protocol: SIP</description>
  </rule>
</group>

<group name="Tanner,">
  <rule id="130001" level="8">
    <decoded_as>tanner-custom</decoded_as>
    <description>Tanner Honeypot Activity Detected - Protocol: HTTP</description>
  </rule>
</group>

<group name="adbhoney,">
  <rule id="140001" level="8">
    <decoded_as>adbhoney-custom</decoded_as>
    <description>ADBHoney Honeypot Activity Detected - Protocol: ADB</description>
  </rule>
</group>

<group name="mailoney,">
  <rule id="150001" level="8">
    <decoded_as>mailoney-custom</decoded_as>
    <description>Mailoney Honeypot Activity Detected - Service: Email</description>
  </rule>
</group>

<group name="honeytrap,">
  <rule id="160001" level="8">
    <decoded_as>honeytrap-custom</decoded_as>
    <description>Honeytrap Honeypot Activity Detected</description>
  </rule>
</group>

<group name="h0neytr4p,">
  <rule id="170001" level="8">
    <decoded_as>h0neytr4p-custom</decoded_as>
    <description>H0neytr4p Honeypot Activity Detected</description>
  </rule>
</group>

<group name="heralding,">
  <rule id="199001" level="8">
    <decoded_as>heralding-custom</decoded_as>
    <description>Heralding Honeypot Activity Detected</description>
  </rule>
</group>

<group name="miniprint,">
  <rule id="167901" level="8">
    <decoded_as>miniprint-custom</decoded_as>
    <description>Miniprint Honeypot Activity Detected</description>
  </rule>
</group>

<group name="elasticpot,">
  <rule id="167011" level="8">
    <decoded_as>elasticpot-custom</decoded_as>
    <description>Elasticpot Honeypot Activity Detected</description>
  </rule>
</group>

<group name="medpot,">
  <rule id="167411" level="8">
    <decoded_as>medpot-custom</decoded_as>
    <description>Medpot Honeypot Activity Detected</description>
  </rule>
</group>

<group name="ipphoney,">
  <rule id="164411" level="8">
    <decoded_as>ipphoney-custom</decoded_as>
    <description>ipphoney Honeypot Activity Detected</description>
  </rule>
</group>

<group name="redishoneypot,">
  <rule id="166611" level="8">
    <decoded_as>redishoneypot-custom</decoded_as>
    <description>redishoneypot Honeypot Activity Detected</description>
  </rule>
</group>



local_decoder.xml:

<!-- Local Decoders -->

<!-- Modify it at your will. -->
<!-- Copyright (C) 2015, Wazuh Inc. -->

<!--
  - Allowed static fields:
  - location   - where the log came from (only on FTS)
  - srcuser    - extracts the source username
  - dstuser    - extracts the destination (target) username
  - user       - an alias to dstuser (only one of the two can be used)
  - srcip      - source ip
  - dstip      - dst ip
  - srcport    - source port
  - dstport    - destination port
  - protocol   - protocol
  - id         - event id
  - url        - url of the event
  - action     - event action (deny, drop, accept, etc)
  - status     - event status (success, failure, etc)
  - extra_data - Any extra data
-->


<decoder name="dionaea-custom">
  <program_name>docker</program_name>
  <prematch>dionaea</prematch>
</decoder>
<decoder name="dionaea-child">
  <parent>dionaea-custom</parent>
  <regex type="pcre2">"protocol":"([^"]+)","transport":"([^"]+)","type":"([^"]+)","src_ip":"([^"]+)","src_port":"([^"]+)","dst_ip":"([^"]+)","dst_port":"([^"]+)"</regex>
  <order>dionaea.protocol,dionaea.transport,dionaea.event_type,srcip,srcport,dstip,dstport</order>
</decoder>

<decoder name="sentrypeer-custom">
  <program_name>docker</program_name>
  <prematch>sentrypeer</prematch>
</decoder>
<decoder name="sentrypeer-child">
    <parent>sentrypeer-custom</parent>
    <regex type="pcre2">
        "app_name":"([^"]+)",
        "app_version":"([^"]+)",
        "event_timestamp":"([^"]+)",
        "transport_type":"([^"]+)",
        "source_ip":"([^"]+)",
        "destination_ip":"([^"]+)",
        "called_number":"([^"]+)",
        "sip_method":"([^"]+)",
        "sip_user_agent":"([^"]+)"
    </regex>
    <order>
        sentrypeer.app_name,
        sentrypeer.app_version,
        sentrypeer.event_timestamp,
        sentrypeer.transport,
        srcip,
        dstip,
        sentrypeer.called_number,
        sentrypeer.sip_method,
        sentrypeer.user_agent
    </order>
</decoder>


<decoder name="tanner-custom">
    <program_name>docker</program_name>
    <prematch>tanner</prematch>
</decoder>
<decoder name="tanner-child">
    <parent>tanner-custom</parent>
    <regex type="pcre2">"method":"([^"]+)","path":"([^"]+)",.*?"status":"?([0-9]+)"?,.*?"peer":\{"ip":"([^"]+)","port":"([^"]+)"\},.*?"user-agent":"([^"]+)",.*?"detection":\{"name":"([^"]+)"</regex>
    <order>tanner.method, tanner.path, tanner.status, srcip, srcport, tanner.user_agent, tanner.detection_name</order>
</decoder>


<decoder name="adbhoney-custom">
    <program_name>docker</program_name>
    <prematch>adbhoney</prematch>
</decoder>
<decoder name="adbhoney-child">
    <parent>adbhoney-custom</parent>
    <regex type="pcre2">'eventid':\s*'([^']+)',\s*'src_ip':\s*'([^']+)',\s*'src_port':\s*([0-9]+),\s*'dest_ip':\s*'([^']+)',\s*'dest_port':\s*'([^']+)',.*?'session':\s*'([^']+)',\s*'sensor':\s*'([^']+)'</regex>
    <order>adbhoney.eventid, srcip, srcport, dstip, dstport, adbhoney.session, adbhoney.sensor</order>
</decoder>


<decoder name="mailoney-custom">
    <program_name>docker</program_name>
    <prematch>mailoney</prematch>
</decoder>
<decoder name="mailoney-child">
    <parent>mailoney-custom</parent>
    <regex type="pcre2">"timestamp":\s*"([^"]+)",\s*"src_ip":\s*"([^"]+)",\s*"src_port":\s*([0-9]+),\s*"data":\s*"([^\\"]+)</regex>
    <order>mailoney.timestamp, srcip, srcport, mailoney.command</order>
</decoder>


<decoder name="honeytrap-custom">
    <program_name>docker</program_name>
    <prematch>honeytrap</prematch>
</decoder>
<decoder name="honeytrap-child">
    <parent>honeytrap-custom</parent>
    <regex type="pcre2">\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\]\s+\*\s+(\d+)\/tcp\s+.*?from\s+(\d+\.\d+\.\d+\.\d+):(\d+)</regex>
    <order>dstport, srcip, srcport</order>
</decoder>


<decoder name="h0neytr4p-custom">
    <program_name>docker</program_name>
    <prematch>h0neytr4p</prematch>
</decoder>
<decoder name="h0neytr4p-child">
    <parent>h0neytr4p-custom</parent>
    <regex type="pcre2">"protocol":"([^"]+)".*,"dest_port":"(\d+)".*,"request_method":"([^"]+)".*,"request_uri":"([^"]+)".*,"src_ip":"([^"]+)".*,"user-agent":"([^"]+)"</regex>
    <order>protocol, dstport, http_method, url, srcip, useragent</order>
</decoder>

<!--
  <decoder name="conpot-custom">
    <prematch>sensorid|event_type|data_type|src_ip|dst_port</prematch>
  </decoder>

  <decoder name="conpot-child">
    <parent>conpot-custom</parent>
    <regex type="pcre2">"sensorid":"([^"]+)"|"event_type":"([^"]+)"|"data_type":"([^"]+)"|"src_ip":"([0-9\.]+)"|"src_port":([0-9]+)|"dst_ip":"([0-9\.]+)"|"dst_port":([0-9]+)|"oid":"([^"]+)"</regex>
    <order>sensor, event_type, protocol, srcip, srcport, dstip, dstport, oid</order>
  </decoder>
  -->

<decoder name="heralding-custom">
  <program_name>docker</program_name>
  <prematch>heralding</prematch>
</decoder>
<decoder name="heralding-child">
  <parent>heralding-custom</parent>
  <regex type="pcre2">"timestamp":"([^"]+)".*"source_ip":"([^"]+)".*"source_port":([0-9]+).*"destination_ip":"([^"]+)".*"destination_port":([0-9]+).*"protocol":"([^"]+)".*"session_id":"([^"]+)"</regex>
  <order>timestamp, srcip, srcport, dstip, dstport, protocol, id</order>
</decoder>


<decoder name="miniprint-custom">
  <program_name>docker</program_name>
  <prematch>miniprint</prematch>
</decoder>
<decoder name="miniprint-child">
    <parent>miniprint-custom</parent>
    <regex type="pcre2">"timestamp":"([^"]+)".*"src_ip":"([^"]+)".*"dest_port":([0-9]+).*"action":"([^"]+)".*"event":"([^"]+)"(?:.*"job_text":"([^"]+)")?</regex>
    <order>timestamp, srcip, dstport, action, miniprint.event, miniprint.job</order>
</decoder>


<decoder name="elasticpot-custom">
  <program_name>docker</program_name>
  <prematch>elasticpot</prematch>
</decoder>
<decoder name="elasticpot-child">
  <parent>elasticpot-custom</parent>
  <regex type="pcre2">"eventid":"([^"]+)".*"url":"([^"]+)".*"timestamp":"([^"]+)".*"src_ip":"([^"]+)".*"src_port":([0-9]+).*"dst_port":([0-9]+).*"request":"([^"]+)".*"user_agent":"([^"]+)"</regex>
  <order>elasticpot.eventid, url, elasticpot.timestamp, srcip, srcport, dstport, http_method, useragent</order>
</decoder>


<decoder name="medpot-custom">
  <program_name>docker</program_name>
  <prematch>medpot</prematch>
</decoder>
<decoder name="medpot-child">
  <parent>medpot-custom</parent>
  <regex type="pcre2">\{"level":"([^"]+)","message":"([^"]+)","timestamp":"([^"]+)","src_port":"([0-9]+)","src_ip":"([0-9.]+)","data":"([^"]+)"\}</regex>
  <order>medpot.level, medpot.message, medpot.timestamp, srcport, srcip, medpot.payload_b64</order>
</decoder>


<decoder name="ipphoney-custom">
  <program_name>docker</program_name>
  <prematch>ipphoney</prematch>
</decoder>
<decoder name="ipphoney-child">
  <parent>ipphoney-custom</parent>
  <regex type="pcre2">\"eventid\":\"([^\"]+)\",\"timestamp\":\"([^\"]+)\",\"url\":\"([^\"]+)\",\"src_ip\":\"([0-9.]+)\",\"src_port\":([0-9]+),\"dst_port\":([0-9]+),\"sensor\":\"([^\"]+)\",\"request\":\"([A-Z]+)\",\"user_agent\":\"([^\"]+)\",\"dst_ip\":\"([0-9.]+)\"</regex>
  <order>ipphoney.eventid, ipphoney.timestamp, url, srcip, srcport, dstport, ipphoney.sensor, http_method, user_agent, dstip</order>
</decoder>


<decoder name="redishoneypot-custom">
  <program_name>docker</program_name>
  <prematch>redishoneypot</prematch>
</decoder>
<decoder name="redishoneypot-child">
  <parent>redishoneypot-custom</parent>
  <regex type="pcre2">\"action\":\"([^\"]+)\",\"addr\":\"([0-9.]+):([0-9]+)\",\"level\":\"([^\"]+)\",\"time\":\"([^\"]+)\"</regex>
  <order>redis.action, srcip, srcport, loglevel, redis.time</order>
</decoder>
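
A quick way to sanity-check one of these child decoders before loading it into Wazuh is to run its pattern against a captured log line offline. The sketch below does that for the dionaea decoder using Python's `re` module, which is not PCRE2 but should behave the same for patterns this simple (an assumption, not a guarantee). The sample log line is made up for illustration, and the `collapse` and `decode` helpers are hypothetical names, not part of Wazuh. `wazuh-logtest` on the manager remains the authoritative check.

```python
import re

# Dionaea child-decoder pattern and field order, copied from the decoder above.
DIONAEA_REGEX = (
    r'"protocol":"([^"]+)","transport":"([^"]+)","type":"([^"]+)",'
    r'"src_ip":"([^"]+)","src_port":"([^"]+)",'
    r'"dst_ip":"([^"]+)","dst_port":"([^"]+)"'
)
DIONAEA_ORDER = (
    "dionaea.protocol,dionaea.transport,dionaea.event_type,"
    "srcip,srcport,dstip,dstport"
)


def collapse(pattern: str) -> str:
    """Join a pattern written across several indented lines into one line."""
    return "".join(line.strip() for line in pattern.splitlines())


def decode(pattern: str, order: str, log_line: str) -> dict:
    """Return {field: value} for the first match, or {} when nothing matches."""
    match = re.search(collapse(pattern), log_line)
    if match is None:
        return {}
    fields = [name.strip() for name in order.split(",")]
    return dict(zip(fields, match.groups()))


# Hypothetical log line; a real Dionaea event may order or quote fields differently.
SAMPLE = (
    'docker: dionaea {"protocol":"smbd","transport":"tcp","type":"accept",'
    '"src_ip":"203.0.113.7","src_port":"51514",'
    '"dst_ip":"192.0.2.10","dst_port":"445"}'
)

if __name__ == "__main__":
    for field, value in decode(DIONAEA_REGEX, DIONAEA_ORDER, SAMPLE).items():
        print(f"{field} = {value}")
```

An empty result from `decode` means the pattern did not match the line at all, which usually points to a field-order or quoting difference between the pattern and the real log.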



Integration block in ossec.conf for the MISP configuration:

<integration>
 <name>custom-misp.py</name>
 <rule_id>90000,90005,90010,90015,90020,110001,120001,130001,140001,150001,160001,170001,199001,167901,167011,167411,164411,166611</rule_id>
 <hook_url>https://IP</hook_url>
 <api_key>{api_Key}</api_key>
 <alert_format>json</alert_format>
</integration>
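
For context on how this block is consumed: when one of the listed rule IDs fires, Wazuh runs the named script and hands it the alert plus the `api_key` and `hook_url` values. The sketch below is not the actual custom-misp.py; it is a minimal illustration that assumes the usual custom-integration argument order (alert file path, API key, hook URL), that decoded fields such as `srcip` sit under the alert's `data` object, and that the lookup goes to MISP's `/attributes/restSearch` endpoint. The key list and function names are hypothetical, and sending the request, TLS verification, and response handling are left out.

```python
import json
import sys
import urllib.request

# Illustrative subset of alert fields to look up; the real script's list is longer.
IP_SRC_KEYS = ["srcip", "src_ip", "source_ip"]


def extract_source_ip(alert: dict):
    """Return the first source-IP value found under the alert's "data" object."""
    data = alert.get("data", {})
    for key in IP_SRC_KEYS:
        if key in data:
            return data[key]
    return None


def build_misp_search(hook_url: str, api_key: str, value: str) -> urllib.request.Request:
    """Build (but do not send) a MISP attribute search request for one value."""
    body = json.dumps({"returnFormat": "json", "value": value}).encode("utf-8")
    return urllib.request.Request(
        url=hook_url.rstrip("/") + "/attributes/restSearch",
        data=body,
        headers={
            "Authorization": api_key,
            "Accept": "application/json",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def main(argv):
    # Assumed argument order: alert file path, api_key, hook_url.
    alert_path, api_key, hook_url = argv[1], argv[2], argv[3]
    with open(alert_path, encoding="utf-8") as handle:
        alert = json.load(handle)
    value = extract_source_ip(alert)
    if value is None:
        return 0  # nothing to look up in this alert
    request = build_misp_search(hook_url, api_key, value)
    print(request.get_method(), request.full_url)
    return 0


# Only run when the three expected arguments are present.
if __name__ == "__main__" and len(sys.argv) >= 4:
    main(sys.argv)
```

Because the honeypot decoders above map the attacker address to `srcip`, a lookup keyed on that field is the piece that ties the T-Pot alerts to the MISP IOC check.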
