import requests
import sys
import os
# Base URL of the indicator API and the token used to authenticate against it.
# Both values are deployment-specific and are left empty here; fill them in
# before running the script.
#
# general_url is concatenated directly with relative paths such as "ip/",
# "hash/" and "url/", so it has to end with a trailing slash.
general_url = ""
auth_token = ""

# Headers sent with every API request.
headers = {
    "Authorization": f"Token {auth_token}",
}

# Directory in which the generated list files are written.
output_folder = "/var/ossec/etc/lists"
def error_message(endpoint, error_code, error_message):
    """Print a one-line error report to standard output.

    Args:
        endpoint: Text identifying what failed (callers pass a URL or a
            placeholder string).
        error_code: Status code or placeholder shown after "returned".
        error_message: Description of the failure.
    """
    report = f"ERROR: {endpoint} returned {error_code}: {error_message}"
    print(report)
def details_error(response):
    """Extract a human-readable error description from an HTTP response.

    Prints the response headers and the first 1000 characters of the body,
    then derives the message from the JSON body:

    * dict with an ``errors`` key: the ``detail`` of each entry (or the entry
      itself when it has no ``detail`` or is not a dict), joined by newlines;
    * dict with a ``detail`` key: that value;
    * any other JSON value: its ``str()`` form.

    A body that cannot be decoded as JSON is returned verbatim.

    Args:
        response: Object exposing ``headers``, ``text`` and ``json()``.

    Returns:
        str: The extracted message, or ``"Error parsing response: ..."`` if
        reading the response raised an unexpected exception.
    """
    try:
        print(f"Response headers: {dict(response.headers)}")
        print(f"Response text: {response.text[:1000]}")

        try:
            error_data = response.json()
        except ValueError:
            # Body is not JSON; the raw text is the best description available.
            return response.text

        if not isinstance(error_data, dict):
            return str(error_data)

        if 'errors' in error_data:
            errors = error_data['errors']
            # Tolerate a single error object/string instead of a list.
            if not isinstance(errors, (list, tuple)):
                errors = [errors]
            # Entries may be plain strings, and details may be non-strings;
            # stringify everything so the join cannot fail and hide the error.
            return "\n".join(
                str(error.get('detail', error)) if isinstance(error, dict) else str(error)
                for error in errors
            )

        if 'detail' in error_data:
            return str(error_data['detail'])

        return str(error_data)
    except Exception as e:
        # Last-resort guard: error reporting itself must never raise.
        return f"Error parsing response: {str(e)}"
def update_ipv4_list():
    """Download the IPv4 indicators and rewrite the ``ipv4.cdb`` list file.

    Sends a GET request to ``general_url + "ip/"`` with ``type=IPv4`` and the
    module-level ``headers``. On HTTP 200 the ``value`` field of every
    returned entry is written to ``<output_folder>/ipv4.cdb`` as one
    ``<value>:`` line per entry, replacing the previous contents. Any other
    status code is reported through ``error_message`` and no file is opened.

    Exceptions raised by the request itself (connection errors, timeouts), by
    JSON decoding, or by an entry without a ``value`` key are not caught here
    and propagate to the caller.
    """
    response = requests.get(
        general_url + "ip/",
        params={"type": "IPv4"},
        headers=headers,
        # Without a timeout requests can wait forever on a stalled server.
        timeout=30,
    )

    if response.status_code != 200:
        error_message(f"{general_url}ip/?type=IPv4", response.status_code, details_error(response))
        return

    output_ipv4_file = os.path.join(output_folder, "ipv4.cdb")
    # Build every line before opening the file so that a malformed entry
    # cannot leave a truncated list behind.
    lines = [f"{entry['value']}:\n" for entry in response.json()]

    try:
        with open(output_ipv4_file, "w", encoding="utf-8") as file:
            file.writelines(lines)
        print(f"Successfully wrote IPv4 IPs to {output_ipv4_file}")
    except (OSError, UnicodeError) as e:
        error_message("No endpoint", "No status code", f"Error writing to file: {e}")
def update_ipv6_list():
    """Download the IPv6 indicators and rewrite the ``ipv6.cdb`` list file.

    Sends a GET request to ``general_url + "ip/"`` with ``type=IPv6`` and the
    module-level ``headers``. On HTTP 200 the ``value`` field of every
    returned entry is written to ``<output_folder>/ipv6.cdb`` as one
    ``<value>:`` line per entry, replacing the previous contents. Any other
    status code is reported through ``error_message`` and no file is opened.

    Exceptions raised by the request itself (connection errors, timeouts), by
    JSON decoding, or by an entry without a ``value`` key are not caught here
    and propagate to the caller.
    """
    response = requests.get(
        general_url + "ip/",
        params={"type": "IPv6"},
        headers=headers,
        # Without a timeout requests can wait forever on a stalled server.
        timeout=30,
    )

    if response.status_code != 200:
        error_message(f"{general_url}ip/?type=IPv6", response.status_code, details_error(response))
        return

    output_ipv6_file = os.path.join(output_folder, "ipv6.cdb")
    # NOTE(review): IPv6 addresses contain ':' which is also the terminator
    # written after each key below - confirm against the Wazuh CDB list
    # documentation whether such keys need to be quoted.
    lines = [f"{entry['value']}:\n" for entry in response.json()]

    try:
        with open(output_ipv6_file, "w", encoding="utf-8") as file:
            file.writelines(lines)
        print(f"Successfully wrote IPv6 IPs to {output_ipv6_file}")
    except (OSError, UnicodeError) as e:
        error_message("No endpoint", "No status code", f"Error writing to file: {e}")
def update_hashmd5_list():
    """Download the MD5 file hashes and rewrite the ``hashmd5.cdb`` list file.

    Sends a GET request to ``general_url + "hash/"`` with
    ``type=FileHash-MD5`` and the module-level ``headers``. On HTTP 200 the
    ``value`` field of every returned entry is written to
    ``<output_folder>/hashmd5.cdb`` as one ``<value>:`` line per entry,
    replacing the previous contents. Any other status code is reported
    through ``error_message`` and no file is opened.

    Exceptions raised by the request itself (connection errors, timeouts), by
    JSON decoding, or by an entry without a ``value`` key are not caught here
    and propagate to the caller.
    """
    response = requests.get(
        general_url + "hash/",
        params={"type": "FileHash-MD5"},
        headers=headers,
        # Without a timeout requests can wait forever on a stalled server.
        timeout=30,
    )

    if response.status_code != 200:
        error_message(f"{general_url}hash/?type=FileHash-MD5", response.status_code, details_error(response))
        return

    output_hashmd5_file = os.path.join(output_folder, "hashmd5.cdb")
    # Build every line before opening the file so that a malformed entry
    # cannot leave a truncated list behind.
    lines = [f"{entry['value']}:\n" for entry in response.json()]

    try:
        with open(output_hashmd5_file, "w", encoding="utf-8") as file:
            file.writelines(lines)
        print(f"Successfully wrote MD5 hashes to {output_hashmd5_file}")
    except (OSError, UnicodeError) as e:
        error_message("No endpoint", "No status code", f"Error writing to file: {e}")
def update_hashsha1_list():
    """Download the SHA1 file hashes and rewrite the ``hashsha1.cdb`` list file.

    Sends a GET request to ``general_url + "hash/"`` with
    ``type=FileHash-SHA1`` and the module-level ``headers``. On HTTP 200 the
    ``value`` field of every returned entry is written to
    ``<output_folder>/hashsha1.cdb`` as one ``<value>:`` line per entry,
    replacing the previous contents. Any other status code is reported
    through ``error_message`` and no file is opened.

    Exceptions raised by the request itself (connection errors, timeouts), by
    JSON decoding, or by an entry without a ``value`` key are not caught here
    and propagate to the caller.
    """
    response = requests.get(
        general_url + "hash/",
        params={"type": "FileHash-SHA1"},
        headers=headers,
        # Without a timeout requests can wait forever on a stalled server.
        timeout=30,
    )

    if response.status_code != 200:
        error_message(f"{general_url}hash/?type=FileHash-SHA1", response.status_code, details_error(response))
        return

    output_hashsha1_file = os.path.join(output_folder, "hashsha1.cdb")
    # Build every line before opening the file so that a malformed entry
    # cannot leave a truncated list behind.
    lines = [f"{entry['value']}:\n" for entry in response.json()]

    try:
        with open(output_hashsha1_file, "w", encoding="utf-8") as file:
            file.writelines(lines)
        print(f"Successfully wrote SHA1 hashes to {output_hashsha1_file}")
    except (OSError, UnicodeError) as e:
        error_message("No endpoint", "No status code", f"Error writing to file: {e}")
def update_hashsha256_list():
    """Download the SHA256 file hashes and rewrite ``hashsha256.cdb``.

    Sends a GET request to ``general_url + "hash/"`` with
    ``type=FileHash-SHA256`` and the module-level ``headers``. On HTTP 200
    the ``value`` field of every returned entry is written to
    ``<output_folder>/hashsha256.cdb`` as one ``<value>:`` line per entry,
    replacing the previous contents. Any other status code is reported
    through ``error_message`` and no file is opened.

    Exceptions raised by the request itself (connection errors, timeouts), by
    JSON decoding, or by an entry without a ``value`` key are not caught here
    and propagate to the caller.
    """
    response = requests.get(
        general_url + "hash/",
        params={"type": "FileHash-SHA256"},
        headers=headers,
        # Without a timeout requests can wait forever on a stalled server.
        timeout=30,
    )

    if response.status_code != 200:
        error_message(f"{general_url}hash/?type=FileHash-SHA256", response.status_code, details_error(response))
        return

    output_hashsha256_file = os.path.join(output_folder, "hashsha256.cdb")
    # Build every line before opening the file so that a malformed entry
    # cannot leave a truncated list behind.
    lines = [f"{entry['value']}:\n" for entry in response.json()]

    try:
        with open(output_hashsha256_file, "w", encoding="utf-8") as file:
            file.writelines(lines)
        print(f"Successfully wrote SHA256 hashes to {output_hashsha256_file}")
    except (OSError, UnicodeError) as e:
        error_message("No endpoint", "No status code", f"Error writing to file: {e}")
def update_urls_lists():
    """Download the URL indicators and rewrite the ``urls.cdb`` list file.

    Sends a GET request to ``general_url + "url/"`` with the module-level
    ``headers``. On HTTP 200 the ``value`` field of every returned entry is
    written to ``<output_folder>/urls.cdb`` as one ``<value>:`` line per
    entry, replacing the previous contents. Any other status code is
    reported through ``error_message`` and no file is opened.

    Exceptions raised by the request itself (connection errors, timeouts), by
    JSON decoding, or by an entry without a ``value`` key are not caught here
    and propagate to the caller.
    """
    response = requests.get(
        general_url + "url/",
        headers=headers,
        # Without a timeout requests can wait forever on a stalled server.
        timeout=30,
    )

    if response.status_code != 200:
        error_message(f"{general_url}url/", response.status_code, details_error(response))
        return

    output_urls_file = os.path.join(output_folder, "urls.cdb")
    # NOTE(review): URLs normally contain ':' (e.g. after the scheme), which
    # is also the terminator written after each key below - confirm against
    # the Wazuh CDB list documentation whether such keys need to be quoted.
    lines = [f"{entry['value']}:\n" for entry in response.json()]

    try:
        with open(output_urls_file, "w", encoding="utf-8") as file:
            file.writelines(lines)
        print(f"Successfully wrote URLs to {output_urls_file}")
    except (OSError, UnicodeError) as e:
        error_message("No endpoint", "No status code", f"Error writing to file: {e}")
def run_wazuh_command(command):
    """Execute *command* through the shell and return its standard output.

    Args:
        command: Command line, handed to the shell as a single string.

    Returns:
        The captured stdout as text when the command exits with status 0.
        For any other exit status the code and the captured stderr are
        printed and ``None`` is returned.
    """
    import subprocess

    # shell=True: the string is interpreted by the shell, so only pass
    # trusted, fixed command lines to this function.
    completed = subprocess.run(command, shell=True, capture_output=True, text=True)
    if completed.returncode == 0:
        return completed.stdout

    print(f"Command failed with exit code {completed.returncode}: {completed.stderr}")
    return None
def restart_wazuh_if_needed():
    """Restart Wazuh, if it is running, so the rewritten lists are applied.

    Runs ``wazuh-control status`` through ``run_wazuh_command``. When the
    output contains ``"is running"`` it runs ``wazuh-control restart``.
    Every outcome is reported with ``print``; nothing is returned and no
    exception escapes this function.
    """
    try:
        status = run_wazuh_command("/var/ossec/bin/wazuh-control status")
        if status is None:
            # run_wazuh_command returns None when the command exits non-zero;
            # testing `in None` would raise an unrelated TypeError.
            print("Error checking/restarting Wazuh: status command failed")
            return

        if "is running" not in status:
            print("Wazuh is not running, no need to restart")
            return

        print("Restarting Wazuh to apply list changes...")
        # Only claim success when the restart command actually succeeded.
        if run_wazuh_command("/var/ossec/bin/wazuh-control restart") is None:
            print("Error checking/restarting Wazuh: restart command failed")
        else:
            print("Wazuh restarted successfully")
    except Exception as e:
        print(f"Error checking/restarting Wazuh: {e}")
if __name__ == "__main__":
    # Refresh every list file in order, then restart Wazuh. The first step
    # that raises aborts the remaining ones.
    try:
        steps = (
            update_ipv4_list,
            update_ipv6_list,
            update_hashmd5_list,
            update_hashsha1_list,
            update_hashsha256_list,
            update_urls_lists,
            restart_wazuh_if_needed,
        )
        for step in steps:
            step()
    except Exception as exc:
        print(f"Error during update: {exc}")
        # NOTE(review): the earlier comment at this point said the script
        # should still exit with success so Wazuh startup is not affected,
        # but the exit status used is 1 (failure) - confirm which is intended.
        sys.exit(1)