bqb18wzv的知识库

数据清洗太痛苦?教你搭建一套标准化的多市场行情管道

由bqb18wzv创建,最终由bqb18wzv 被浏览 5 用户

在训练量化模型时,最消磨热情的不是调参,而是数据预处理。A股数据源是一个格式,美股又是另一个格式,外汇更是乱七八糟。最近我在重构我的数据管道(Pipeline),目标是实现:源头统一,逻辑解耦。

遇到的工程难题 之前为了获取全球市场数据,我对接了三四个不同的API。结果就是代码里充满了大量的 if-else 来处理不同交易所的字段命名差异。一旦上游接口变动,整个ETL流程直接报错。

重构思路:统一数据层 我意识到,我需要一个能够屏蔽底层差异的中间层。我的解决方案是引入一个能够输出标准JSON流的API服务(目前测试下来AllTick的格式化做得比较规范),通过它来分发所有市场的数据。

管道搭建实操

  1. 依赖库配置 我们主要用到 requests 做历史数据抓取,websocket 做实时流接入。 (此处请插入代码:3.1 安装依赖)

  2. 标准化输入流 看下面的代码,无论是A股的 000001.SZ 还是美股的 AAPL,通过同一个接口请求,返回的字段结构是完全一致的。这对特征工程来说简直是福音。

    import requests
    
    # 获取A股实时行情示例
    symbol = "000001.SZ"
    url = "wss://quote.alltick.co/quote-stock-b-ws-api"
    response = requests.get(url, params={"symbol": symbol})
    data = response.json()
    print(data)
    
    # 美股示例
    symbol_us = "AAPL.US"
    response_us = requests.get(url, params={"symbol": symbol_us})
    print(response_us.json())
    
    
  3. 实时推理的数据喂入 对于部署在服务器上的实时推理模型,我使用WebSocket保持长连接,将接收到的Tick数据直接喂给模型进行预测。

    import websocket
    import json
    
    def on_message(ws, message):
        print("收到行情:", json.loads(message))
    
    ws = websocket.WebSocketApp("wss://quote.alltick.co/quote-stock-b-ws-api",
                                on_message=on_message)
    ws.run_forever()
    
    
  4. 历史训练集构建 在训练阶段,通过指定时间窗口获取历史K线,一步到位生成Pandas DataFrame所需的数据源。

    # 外汇实时行情
    fx_symbol = "EURUSD"
    fx_url = "wss://quote.alltick.co/quote-b-ws-api?token=yourtoken"
    fx_data = requests.get(fx_url, params={"symbol": fx_symbol}).json()
    print(fx_data)
    
    # 股票历史K线数据
    history_url = "https://quote.alltick.co/quote-stock-b-api/kline?token=yourtoken&query=queryData"
    history_data = requests.get(history_url, params={
        "symbol": "AAPL.US",
        "start": "2023-01-01",
        "end": "2023-12-31"
    }).json()
    print(history_data)
    
    

优化总结 通过这次重构,我把数据接入层的代码量减少了60%。无论你是做传统CTA还是机器学习,一个干净、统一的数据源能让你把精力集中在Alpha挖掘上,而不是在修bug上。



\

{link}