使用Python编写行情预警脚本(AllTick API + 邮件通知)

8 views
Skip to first unread message

Tempo Lam

unread,
Mar 4, 2025, 2:46:04 AMMar 4
to market-data-api

在金融交易中,及时获取市场行情并在满足特定条件时接收预警通知是非常重要的。本教程将介绍如何使用Python编写一个自动行情预警脚本,该脚本将从AllTick API获取实时外汇数据,并在符合条件时发送邮件提醒。


在开始编写代码之前,我们需要安装一些必要的Python库。

pip install requests smtplib email


获取AllTick实时外汇数据

AllTick API 提供实时的外汇数据,我们可以使用 requests 库进行HTTP请求来获取行情。

import requests
import json

def get_forex_price(symbol, token):
    url = f'https://quote.alltick.io/quote-b-ws-api?token={token}&query='
    query = {
        "trace": "forex_alert",
        "data": {"symbol_list": [{"code": symbol}]}
    }
    response = requests.get(url + json.dumps(query))
    if response.status_code == 200:
        data = response.json()
        return data
    else:
        print("Error fetching data:", response.text)
        return None


设定行情预警条件

我们假设当EUR/USD的汇率突破1.10时,我们希望收到邮件提醒。

def check_alert_condition(forex_data, threshold=1.10):
    if forex_data and "data" in forex_data:
        for item in forex_data["data"]["symbol_list"]:
            if item["code"] == "EUR/USD":
                price = float(item["price"])
                if price > threshold:
                    return True, price
    return False, None


发送邮件通知

当条件满足时,我们将使用SMTP发送邮件通知。

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

def send_email_alert(price):
    sender_email = "your_...@example.com"
    receiver_email = "rece...@example.com"
    password = "your_email_password"
   
    subject = "行情预警通知"
    body = f"EUR/USD 汇率已突破设定阈值,当前价格:{price}"
   
    msg = MIMEMultipart()
    msg["From"] = sender_email
    msg["To"] = receiver_email
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))
   
    try:
        server = smtplib.SMTP("smtp.example.com", 587)
        server.starttls()
        server.login(sender_email, password)
        server.sendmail(sender_email, receiver_email, msg.as_string())
        server.quit()
        print("预警邮件已发送!")
    except Exception as e:
        print("邮件发送失败:", e)


主循环检测行情

我们使用一个循环,每隔一分钟检查一次行情数据。

import time

token = "your_alltick_token"
symbol = "EUR/USD"
while True:
    forex_data = get_forex_price(symbol, token)
    alert, price = check_alert_condition(forex_data)
    if alert:
        send_email_alert(price)
    time.sleep(60)

Reply all
Reply to author
Forward
0 new messages