#!/var/ossec/framework/python/bin/python3
# Copyright (C) 2015-2023, Wazuh Inc.
# Gemini Integration
import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM
try:
import requests
except Exception as e:
print("No module 'requests' found. Install: pip install requests")
sys.exit(1)
# Global vars
debug_enabled = False
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
# Set paths
log_file = '{0}/var/log/syslog'.format(pwd)
socket_addr = '{0}/queue/sockets/queue'.format(pwd)
def main(args):
debug("# Starting")
# Read args
alert_file_location = args[1]
apikey = args[2]
hook_url = args[3]
debug("# API Key")
debug(apikey)
debug("# File location")
debug(alert_file_location)
# Load alert. Parse JSON object.
with open(alert_file_location) as alert_file:
json_alert = json.load(alert_file)
debug("# Processing alert")
debug(json_alert)
# Check for rule level 15
if json_alert["rule"]["level"] == 15:
debug("# Rule level 15 detected")
# Send log to Gemini
msg = request_gemini_info(json_alert, apikey, hook_url)
# If positive match, send event to Wazuh Manager
if msg:
send_event(msg, json_alert["agent"])
def debug(msg):
if debug_enabled:
msg = "{0}: {1}\n".format(now, msg)
print(msg)
with open(log_file, "a") as f:
f.write(str(msg))
def request_gemini_info(alert, apikey, hook_url):
alert_output = {}
# Gemini API request
headers = {
'Authorization': 'Bearer ' + apikey,
'Content-Type': 'application/json',
}
json_data = {
'full_log': alert['full_log'],
'rule': {
'id': alert['rule']['id'],
'description': alert['rule']['description'],
'level': alert['rule']['level']
},
'agent': alert['agent']
}
response = requests.post(hook_url, headers=headers, json=json_data)
if response.status_code == 200:
alert_output = response.json()
debug("# Gemini API response received")
else:
alert_output["gemini"] = {}
alert_output["integration"] = "custom-gemini"
alert_output["gemini"]["error"] = response.status_code
debug("# Error: The Gemini API encountered an error")
return alert_output
def send_event(msg, agent=None):
if not agent or agent["id"] == "000":
string = '1:gemini:{0}'.format(json.dumps(msg))
else:
string = '1:[{0}] ({1}) {2}->gemini:{3}'.format(agent["id"], agent["name"], agent.get("ip", "any"), json.dumps(msg))
debug(string)
sock = socket(AF_UNIX, SOCK_DGRAM)
sock.connect(socket_addr)
sock.send(string.encode())
sock.close()
if __name__ == "__main__":
try:
# Read arguments
bad_arguments = False
if len(sys.argv) >= 4:
msg = '{0} {1} {2} {3}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3])
debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug')
else:
msg = '{0} Wrong arguments'.format(now)
bad_arguments = True
# Logging the call
with open(log_file, 'a') as f:
f.write(str(msg) + '\n')
if bad_arguments:
debug("# Exiting: Bad arguments.")
sys.exit(1)
# Main function
main(sys.argv)
except Exception as e:
debug(str(e))
raise