Since I'm using a local Ollama instance, which doesn't require an API key, how should I change my custom-integration Python code? I followed the Wazuh integration configuration guide when writing the script below.
import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM
try:
import requests
except Exception as e:
print("No module 'requests' found. Install: pip install requests")
sys.exit(1)
# Global vars
# Debug logging switch; the script entry point sets it from the command line.
debug_enabled = False
# Directory two levels above this script's resolved location.
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
print(pwd)
json_alert = {}
# Timestamp computed once at load time and reused for every log line.
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")

# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
socket_addr = '{0}/queue/sockets/queue'.format(pwd)
# The URL must be a single-line string literal; splitting it across lines
# leaves the string unterminated.
ollama_url = "http://localhost:11434/api/generate"  # Ollama's API endpoint
def main(args):
    """Load one alert file, query Ollama about it and forward the result.

    Args:
        args: Argument vector. ``args[1]`` is the path of the alert JSON
            file and ``args[2]`` is the Ollama model name.
    """
    debug("# Starting")

    # Read args
    alert_file_location = args[1]
    model_name = args[2]  # ollama model name
    debug("# Model Name")
    debug(model_name)
    debug("# File location")
    debug(alert_file_location)

    # Load alert. Parse JSON object.
    with open(alert_file_location) as alert_file:
        json_alert = json.load(alert_file)
    debug("# Processing alert")
    debug(json_alert)

    # Request ollama info
    msg = request_ollama_info(json_alert, model_name)

    # If positive match, send event to Wazuh Manager.
    # send_event() accepts agent=None, so a missing "agent" key must not
    # abort the run with a KeyError.
    if msg:
        send_event(msg, json_alert.get("agent"))
def debug(msg):
    """Print ``msg`` and append it to the integration log when debugging is on.

    Does nothing unless the module-level ``debug_enabled`` flag is true.
    The message is prefixed with the module-level ``now`` timestamp.

    Args:
        msg: Any object; it is rendered with ``str.format``.
    """
    if not debug_enabled:
        return
    line = "{0}: {1}\n".format(now, msg)
    print(line)
    # The context manager closes the log file even if the write fails.
    with open(log_file, "a") as log:
        log.write(line)
def collect(data):
    """Return the ``(srcip, response)`` pair stored in ``data``.

    Args:
        data: Mapping holding the ``'srcip'`` and ``'response'`` keys.

    Returns:
        A 2-tuple of the source IP and the Ollama response text.

    Raises:
        KeyError: If either key is missing.
    """
    return data['srcip'], data['response']  # ollama response is the second item
def in_database(data, srcip):
    """Tell whether ``data`` carries a non-empty Ollama response.

    Args:
        data: Result of the Ollama query: a dict, or ``None`` when the
            query produced nothing.
        srcip: Unused; kept so existing callers keep working.

    Returns:
        ``True`` if ``data`` has a truthy ``"response"`` entry, else ``False``.
    """
    # A missing result (None) or an empty dict means nothing was found;
    # testing `"response" in None` would raise TypeError.
    if not data:
        return False
    return bool(data.get("response"))  # basic check for a response
def _report_ollama_failure(error):
    """Send an error event describing ``error`` to the manager, then exit with status 0."""
    alert_output = {
        "ollama": {"error": str(error)},
        "integration": "custom-ollama",
    }
    send_event(alert_output)
    sys.exit(0)


def query_ollama(srcip, model_name, timeout=120):
    """Ask the Ollama server for information about ``srcip``.

    Only a ``Content-Type`` header is sent: the request carries no API key
    or Authorization header.

    Args:
        srcip: Source IP address to ask about.
        model_name: Name of the Ollama model to use.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` can block indefinitely.

    Returns:
        ``{"response": <text>, "srcip": srcip}`` on HTTP 200, or ``None``
        for any other successful status code.

    Raises:
        SystemExit: With status 0, after an error event has been sent,
            when the request fails or the reply cannot be parsed.
    """
    headers = {
        'Content-Type': 'application/json',
    }
    json_data = {
        'model': model_name,
        'prompt': f"Give me more data about this IP: {srcip}",
        'stream': False,  # ollama non streaming response.
    }
    response = None
    try:
        response = requests.post(
            ollama_url, headers=headers, json=json_data, timeout=timeout
        )
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
        if response.status_code == 200:
            data = response.json()
            return {"response": data["response"], "srcip": srcip}
        return None
    except requests.exceptions.RequestException as e:
        debug(f"# Error: Ollama encountered an error: {e}")
        _report_ollama_failure(e)
    except (KeyError, json.JSONDecodeError) as e:  # Catch json parsing errors.
        body = response.text if response is not None else 'No Response'
        debug(f"# Error: Ollama response parsing error: {e}, Response: {body}")
        _report_ollama_failure(e)
def request_ollama_info(alert, model_name):
    """Build the enrichment event for ``alert`` from an Ollama query.

    Args:
        alert: Parsed alert dict. Its ``data.srcip`` value is the IP that
            is looked up; ``id``, ``rule.id``, ``rule.description`` and
            ``full_log`` are copied into the output.
        model_name: Name of the Ollama model to query.

    Returns:
        ``0`` when the alert has no source IP. Otherwise a dict with an
        ``"ollama"`` section (``found``, ``source`` and, when a response
        was obtained, ``srcip`` and ``content``) and an ``"integration"``
        tag.

    Raises:
        KeyError: If the alert lacks ``id``, ``rule`` or ``full_log``.
    """
    # If there is no source ip address present in the alert. Exit.
    # Alerts with no "data" section at all are treated the same way
    # instead of raising KeyError.
    event_data = alert.get("data") or {}
    if "srcip" not in event_data:
        return 0
    srcip = event_data["srcip"]

    # Request info using ollama API
    data = query_ollama(srcip, model_name)

    # Create alert
    alert_output = {
        "ollama": {
            "found": 0,
            "source": {
                "alert_id": alert["id"],
                "rule": alert["rule"]["id"],
                "description": alert["rule"]["description"],
                "full_log": alert["full_log"],
                "srcip": srcip,
            },
        },
        "integration": "custom-ollama",
    }

    # Check if ollama has any info about the srcip
    if in_database(data, srcip):
        alert_output["ollama"]["found"] = 1
        # Populate JSON Output object with ollama request
        srcip, content = collect(data)
        alert_output["ollama"]["srcip"] = srcip
        alert_output["ollama"]["content"] = content

    debug(alert_output)
    return alert_output
def send_event(msg, agent=None):
    """Serialise ``msg`` as JSON and send it to the queue socket.

    Args:
        msg: JSON-serialisable event payload.
        agent: Optional agent dict with ``id`` and ``name`` and, optionally,
            ``ip``. When it is missing or its id is ``"000"`` the event is
            sent without an agent header.
    """
    if not agent or agent["id"] == "000":
        string = '1:ollama:{0}'.format(json.dumps(msg))
    else:
        string = '1:[{0}] ({1}) {2}->ollama:{3}'.format(
            agent["id"],
            agent["name"],
            agent["ip"] if "ip" in agent else "any",
            json.dumps(msg),
        )
    debug(string)
    # The context manager closes the socket even if connect/send raises.
    with socket(AF_UNIX, SOCK_DGRAM) as sock:
        sock.connect(socket_addr)
        sock.send(string.encode())
if __name__ == "__main__":
    # Script entry point: validate the argument count, log the invocation,
    # then hand over to main(). Any exception is written to the debug log
    # and re-raised.
    try:
        # Read arguments
        bad_arguments = False
        if len(sys.argv) >= 4:
            # sys.argv[3] is only logged here; main() reads argv[1] and argv[2].
            msg = '{0} {1} {2} {3} {4}'.format(
                now,
                sys.argv[1],
                sys.argv[2],
                sys.argv[3],
                sys.argv[4] if len(sys.argv) > 4 else '',
            )
            debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug')
        else:
            msg = '{0} Wrong arguments'.format(now)
            bad_arguments = True

        # Logging the call; `with` closes the file even if the write fails.
        with open(log_file, 'a') as f:
            f.write(str(msg) + '\n')

        if bad_arguments:
            debug("# Exiting: Bad arguments.")
            sys.exit(1)

        # Main function
        main(sys.argv)
    except Exception as e:
        debug(str(e))
        raise