PLUS会员

老韵-机器学习作业

由bqgl97s8创建,最终由bqgl97s8 被浏览 11 用户

感谢万老师,这堂课收获非浅。无论是滚动训练框架,还是scikit-learn等知识的扩展,都对我的量化学习直到了非常大的启迪作用。

坦率讲,对平台和代码都还是半生不熟的情况下,这个作业的过程,就是一个非常高效的学习过程。

1、先试着补全一个线性模型。

首先拟定了五个常见因子,简化起见都是用时间分组下的排序统一量纲。然后询问AI,用什么线性方法较好。

AI推荐用Ridge回归,当然也介绍了一些其他常见线性模型。顺便加深了下线性回归知识。

机器学习——线性回归及选择














为了节省时间,自己编辑了数据查询,其他训练模型等AI为主生成,形成了初版程序:

from bigquant import bigtrader, dai
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler

def initialize(context: bigtrader.IContext):
    from bigtrader.finance.commission import PerOrder

    # 系统已经设置了默认的交易手续费和滑点,要修改手续费可使用如下函数
    context.set_commission(PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
    
    context.sql = """
    select
    date,
    instrument,
    pct_rank_by(date,ln(total_market_cap)) as fac_rank_total_market_cap,
    pct_rank_by(date,turn) as fac_rank_turn,
    pct_rank_by(date,close) as fac_rank_close,
    pct_rank_by(date,(close/m_lag(close,5))) as fac_ret_5,
    pct_rank_by(net_profit_mrq_yoy) as fac_net_profit_yoy_mrq,
    ------(label)------
    pct_rank_by(date,m_lead(close,2)/m_lead(close,1)) as label
    from
    cn_stock_prefactors
    where
    is_risk_warning = 0
    and list_days > 100
    """
    
    context.holding_days = 5
    context.target_hold_count = 10
    context.train_days = 252
    context.retrain = 10
    context.lookback_days = 90  # 新增:用于处理滞后因子的回看天数

    
    context.pred = []
    context.sv = 0
    context.model = None
    context.scaler = None

def get_date_range(end_date_str, days):
    """获取指定天数前的开始日期"""
    date_obj = datetime.strptime(end_date_str, '%Y-%m-%d')
    start_date_obj = date_obj - timedelta(days=days)
    return start_date_obj.strftime('%Y-%m-%d')

def befor_trading(context, data):
    context.sv = context.portfolio.portfolio_value
    current_date = data.current_dt.strftime("%Y-%m-%d")
    date_str = current_date
    print(f"{current_date}, Portfolio Value: {context.sv}")

    #TODO
    #=========定义训练列========
    #HINT  因子列名
    context.features = [
        'fac_rank_total_market_cap',
        'fac_rank_turn', 
        'fac_rank_close',
        'fac_ret_5',
        'fac_net_profit_yoy_mrq'
    ]
    #===========================

    # 训练模型
    if context.trading_day_index % context.retrain == 0:
        try:
            # 计算需要的数据范围(考虑滞后因子)
            train_end_date = date_str
            train_start_date = get_date_range(train_end_date, context.train_days + context.lookback_days)
            
            print(f"训练数据范围: {train_start_date} to {train_end_date}")
            #TODO
            #============机器学习训练模型==========
            #HINT 对因子和label进行划分
            #训练模型
            
            df = dai.query(context.sql, filters={'date': [train_start_date, train_end_date]}).df()
            
            # 过滤数据到实际训练期
            actual_train_start = get_date_range(train_end_date, context.train_days)
            df = df[df['date'] >= actual_train_start]
            
            # 数据预处理
            df_clean = df.dropna()
            
            if len(df_clean) < 50:
                print("训练数据不足,跳过训练")
                return
                
            print(f"训练数据量: {len(df_clean)}")
            
            # 准备特征和标签
            X = df_clean[context.features]
            y = df_clean['label']
            
            # 检查数据是否有足够的变化
            if len(y) > 0 and y.std() < 1e-6:
                print("标签数据无变化,跳过训练")
                return
            
            if len(X) == 0:
                print("特征数据为空,跳过训练")
                return
                
            # 数据标准化
            context.scaler = StandardScaler()
            X_scaled = context.scaler.fit_transform(X)
            
            # 训练Ridge回归模型
            context.model = Ridge(alpha=1.0)
            context.model.fit(X_scaled, y)
            
            print("模型训练完成")
            coefficients = dict(zip(context.features, context.model.coef_))
            print("因子系数:", coefficients)
            
            #============训练结束================
        except Exception as e:
            print(f"训练过程出错: {e}")
            import traceback
            traceback.print_exc()
            return

    # 每隔holding_days天进行预测和交易
    if context.trading_day_index % context.holding_days != 0:
        return
    #TODO
    #============模型预测==================
    # 预测部分
    if context.model is None or context.scaler is None:
        print("模型未训练,跳过预测")
        return

    try:
        # 获取预测数据(考虑滞后因子需要的回看数据)
        pred_end_date = date_str
        pred_start_date = get_date_range(pred_end_date, context.lookback_days)
        
        print(f"预测数据范围: {pred_start_date} to {pred_end_date}")
        
        pred_sql = """
        select
        date,
        instrument,
        pct_rank_by(date,ln(total_market_cap)) as fac_rank_total_market_cap,
        pct_rank_by(date,turn) as fac_rank_turn,
        pct_rank_by(date,close) as fac_rank_close,
        pct_rank_by(date,(close/m_lag(close,5))) as fac_ret_5,
        pct_rank_by(date,net_profit_yoy_mrq) as fac_net_profit_yoy_mrq
        from
        cn_stock_prefactors
        where
        is_risk_warning = 0
        and list_days > 100
        and date >= '{start_date}'
        and date <= '{end_date}'
        """.format(start_date=pred_start_date, end_date=pred_end_date)
        
        pred_df = dai.query(pred_sql).df()
        
        # 只取当前日期的数据进行预测
        pred_df_current = pred_df[pred_df['date'] == current_date].copy()
        pred_df_current = pred_df_current.dropna()
        
        if len(pred_df_current) == 0:
            print(f"预测数据为空,日期: {current_date}")
            print(f"总查询数据量: {len(pred_df)}, 当前日期数据量: {len(pred_df[pred_df['date'] == current_date])}")
            return
            
        # 准备预测数据
        X_pred = pred_df_current[context.features]
        
        # 标准化
        X_pred_scaled = context.scaler.transform(X_pred)
        
        # 预测
        predictions = context.model.predict(X_pred_scaled)
        
        # 添加预测结果到DataFrame
        pred_df_current['score'] = predictions
        
        # 保存预测结果
        context.pred = pred_df_current[['instrument', 'score']].copy()
        print(f"预测完成,共{len(context.pred)}只股票,日期: {current_date}")
        
        #=============================================
    except Exception as e:
        print(f"预测过程出错: {e}")
        import traceback
        traceback.print_exc()
        context.pred = []

def handle_data(context: bigtrader.IContext, data: bigtrader.IBarData):
    import pandas as pd 
    from bigtrader.constant  import Direction 
    from bigtrader.constant  import OrderType
    
    if len(context.pred) < 1:
        return

    df = context.pred

    # 按得分排序,选择得分最高的股票
    buy_list = list(df.sort_values(by='score', ascending=False).head(context.target_hold_count)['instrument'])
    len_ = len(buy_list)
    
    if len_ == 0:
        print("没有可买入的股票")
        return
        
    print(f"买入股票列表数量: {len_}")
    target_hold_instruments = set(buy_list)
        
    # 获取当前已持有股票
    current_hold_instruments = set(context.get_account_positions().keys())

    # 卖出不在目标持有列表中的股票
    for instrument in current_hold_instruments - target_hold_instruments:
        print(f"卖出: {instrument}")
        context.order_target_percent(instrument, 0)
            
    # 买入目标持有列表中的股票
    for instrument in target_hold_instruments - current_hold_instruments:
        print(f"买入: {instrument}, 比例: {1/len_:.4f}")
        context.order_target_percent(instrument, 1/len_)

    context.pred = []

performance = bigtrader.run(
    market=bigtrader.Market.CN_STOCK,
    frequency=bigtrader.Frequency.DAILY,
    start_date='2024-01-01',  
    end_date='2025-08-08',  
    capital_base=3000000,
    initialize=initialize,
    handle_data=handle_data,
    before_trading_start = befor_trading,
    order_price_field_buy='open',
    order_price_field_sell='open',
)

# 渲染绩效报告,展示回测结果
performance.render()

渲染结果:

初测结果

2、探查bigtrader的属性与参数

然后因为作业有要求结果比较等要求,于是探查performance里数据,从api说明里查到性能分析代码:

# 运行回测后获取性能对象
performance = bt.run(...)

# 渲染图表
performance.render('chart')

# 渲染表格
performance.render('table', table_max_rows=50)

# 访问原始性能数据
raw_data = performance.raw_perf
account_performances = performance.account_raw_performances

于是添加代码单元格:

raw_data = performance.raw_perf
print(raw_data)
account_performances = performance.account_raw_performances
print(account_performances)

运行分别输出账户的分日资产情况(.raw_perf)与账户的交易情况列表;

有点好奇,这个bigtrader.run()的属性到底还有哪些呢?

请教了下AI,于是继续添加代码单元格:

"""探索Performance对象的所有属性"""
print("=== Performance对象属性探索 ===")

# 获取所有属性和方法
all_attrs = dir(performance)
print(f"总属性数量: {len(all_attrs)}")

# 分类显示
methods = [attr for attr in all_attrs if callable(getattr(performance, attr))]
properties = [attr for attr in all_attrs if not callable(getattr(performance, attr))]

print(f"\n📝 属性 ({len(properties)} 个):")
for attr in sorted(properties):
    try:
        value = getattr(performance, attr)
        if not callable(value):
            print(f"  {attr}: {type(value).__name__}")
    except:
        print(f"  {attr}: <无法访问>")

print(f"\n🔧 方法 ({len(methods)} 个):")
for method in sorted(methods)[:20]:  # 只显示前20个
    print(f"  {method}()")

if len(methods) > 20:
    print(f"  ... 还有 {len(methods) - 20} 个方法")

运行日志输出如下

  • === Performance对象属性探索 ===
  • 总属性数量: 38
  • 📝 属性 (12 个):
  • _Performance__logger: BoundLoggerLazyProxy
  • annotations: dict
  • dict: dict
  • doc: str
  • module: str
  • weakref: NoneType
  • account_raw_performances: list
  • bar1d: DataFrame
  • frequency: Frequency
  • market: Market
  • raw_perf: DataFrame
  • stats: dict
  • 🔧 方法 (26 个):
  • class()
  • ……

然后继续探查我还不清楚的.stats:

print(performance.stats)

输出:

日志 1 条 ▼

  • {'return_ratio': 99.02, 'annual_return_ratio': 56.36, 'benchmark_ratio': 21.22, 'beta': 1.24, 'alpha': 0.42, 'sharp_ratio': 1.29, 'ir': 0.08, 'return_volatility': 37.96, 'max_drawdown': 35.43, 'win_ratio': 57.45, 'profit_loss_ratio': 1.35}

\n原来这个.stats输出的就是各项测试指标所在的dict.

3、效果数据输出与保存

知道了这些效果数据来源,那么就能构想如何保存数据,以用于对比。

于是,再添加代码单元:

import pickle
import pandas as pd

def save_strategy_simple(strategy_name, performance):
    """简化版策略结果保存函数"""
    
    # 直接使用performance.stats中的指标
    stats = performance.stats
    
    # 准备数据
    result = {
        'strategy_name': strategy_name,
        'stats': stats,  # 直接保存stats
        'returns_data': {
            'dates': performance.raw_perf.index.tolist(),
            'portfolio_values': performance.raw_perf['portfolio_value'].tolist(),
            'returns': performance.raw_perf['portfolio_value'].pct_change().dropna().tolist(),
            'return_dates': performance.raw_perf.index[1:].tolist()  # 对应收益率的日期
        },
        'raw_perf': performance.raw_perf.to_dict(),
        'timestamp': pd.Timestamp.now().isoformat()
    }
    
    # 保存
    filename = f'strategy_{strategy_name.lower().replace(" ", "_")}_result.pkl'
    with open(filename, 'wb') as f:
        pickle.dump(result, f)
    
    print(f"✅ {strategy_name} 策略结果已保存: {filename}")
    print(f"📊 年化收益: {stats.get('annual_return_ratio', 0)/100:.4f}")
    print(f"⭐ 夏普比率: {stats.get('sharp_ratio', 0)/100:.4f}")
    print(f"🛡️ 最大回撤: {stats.get('max_drawdown', 0)/100:.4f}")
    print(f"📈 胜率: {stats.get('win_ratio', 0)/100:.4f}")
    
    return result

# 在每个策略回测结束后调用
# performance = bigtrader.run(...)
save_strategy_simple('ridge', performance)

这样就把相关数据保存到文件了,方便不同模型对比。考虑后续调优方便,我并没有把几个不同模型放在一个策略文件里的想法,所以先保存结果再调用对比就是一个顺理成章的选择。

\

4、继续建立XGboost和随机森林机器学习策略

到这里,线性模型先告一段落,然后学习XGboost和随机森林。

XGboost要览

随机森林要览

然后就简单了,框架不用动,数据准备不用动,就只需要修改加载的模型就OK了。为节省篇幅和看客们时间,这两个策略就先不放上了

\

5、不同模型策略数据对比

两个策略就绪后,再建了个对比程序来对照参数等

# strategy_comparator_fixed.py
import pickle
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

class SimpleStrategyComparator:
    def __init__(self):
        self.results = {}
    
    def load_all_strategies(self):
        """加载所有策略结果"""
        strategy_files = {
            'Ridge Regression': 'strategy_ridge_result.pkl',
            'XGBoost': 'strategy_xgboost_result.pkl',
            'Random': 'strategy_random_result.pkl'
        }
        
        for name, filename in strategy_files.items():
            try:
                with open(filename, 'rb') as f:
                    self.results[name] = pickle.load(f)
                print(f"✅ 加载 {name} 成功")
            except FileNotFoundError:
                print(f"❌ 文件 {filename} 不存在")
            except Exception as e:
                print(f"❌ 加载 {name} 失败: {e}")
    
    def create_metrics_dataframe(self):
###########因为字数限制,超限了,这里就省略近200行##########################

# 使用方法
if __name__ == "__main__":
    comparator = SimpleStrategyComparator()
    comparator.run_complete_analysis()

感谢ai,几百行也能几秒生成。虽然不是太完美,但第二次就能运行成功,还是不错的(第一次时ai程序发生了个百分比处理的错误)。

  • 🚀 开始策略对比分析...

  • ✅ 加载 Ridge Regression 成功

  • ✅ 加载 XGBoost 成功

  • ✅ 加载 Random 成功

  • =====================================================================================================================

  • 📈 策略性能对比表

  • =====================================================================================================================

  • 年化收益 最大回撤 夏普比率 胜率 收益波动率 Beta Alpha 信息比率

  • Strategy

  • Ridge Regression 0.5636 0.3543 1.29 0.5745 0.3796 1.24 0.42 0.08

  • XGBoost 0.5153 0.3823 1.08 0.5145 0.4559 1.44 0.39 0.06

  • Random 0.3001 0.3715 0.75 0.5501 0.4422 1.11 0.23 0.04

  • \

  • ============================================================

  • 🏆 策略排名报告

  • ============================================================

  • 💰 最高年化收益: Ridge Regression (0.5636)

  • ⭐ 最佳夏普比率: Ridge Regression (1.2900)

  • 🛡️ 最低最大回撤: Ridge Regression (0.3543)

  • ⚖️ 收益风险比 (年化收益/最大回撤):

  • Ridge: 1.5907

  • XGBoost: 1.3479

  • Random: 0.8078

  • 🏅 综合排名 (收益风险比 + 夏普比率):

  • \

    1. Ridge: 2.8807
  • \

    1. XGBoost: 2.4279
  • \

    1. Random: 1.5578
  • ✅ 分析完成!

    不同机器学习方案可视化对比(初测,均未进行因子与参数优化)

6、持续优化

后面的事情,就是针对各策略进行优化了。

优化中发现,模型确定后,特征因子的选取对回测效果是决定性的;一些因子加上去,可能会降低收益,增加回撤;另一些呢则可能增加收益,且减少回撤。也有些呢是比较偏一方面,如降低收益但减少回撤。但单因子测试与组合多因子是不同的,不同组合下,有时候负效的因子可能会变成正效,反之亦然。如果把因子分析这个基本功练好,对各特征因子的相关性有足够了解的话,可以事半功倍。

另外,一些模型的训练还是挺耗时间的。在快速测试的时候,我们可以优化参数。如XGboost,一定要加上**n_jobs=-1,**才能全面利用多线程加速,这时要快很多;树也可以从100降到50甚至20去快测(因子少的情况下)。尤其做作业主要是熟悉流程,那么就把训练间隔期设到30甚至60天,都是可以的。

在优化过程中,又想起模型评价及特征因子重要性等指标的保存对比等,探索了下如何在框架下汇总保存结果等,进一步完善了策略。以下是某次测试输出一部分:

  • ✅ XGBoost 策略结果已保存: strategy_xgboost_result_b.pkl

  • 📊 年化收益: 0.5754

  • ⭐ 夏普比率: 1.17

  • 🛡️ 最大回撤: 0.4030

  • 📈 胜率: 0.5650

  • 📈 训练历史摘要:

  • 训练次数: 7

  • 最近3次训练评估:

  • 日期: 2024-12-30

  • 样本数: 869889

  • MSE: 0.082658

  • R²: 0.0070

  • IC: 0.0863

  • 重要因子: fac_ret_5

  • 日期: 2025-04-02

  • 样本数: 856988

  • MSE: 0.082579

  • R²: 0.0076

  • IC: 0.0911

  • 重要因子: fac_ret_5

  • 日期: 2025-07-02

  • 样本数: 867055

  • MSE: 0.082422

  • R²: 0.0093

  • IC: 0.1004

  • 重要因子: fac_ret_5

    \

关于模型评价

我问了下ai,R²在0.0093等水平是不是很低,因为以前做线性回归时,衡量标准是要接近于1.

Ai简单回复了这个问题:

关于R²很低的问题 R² = 0.0093确实很低,但这在金融预测中其实是正常的!原因如下:

  1. 金融市场的本质特征 股票收益率具有很强的随机性 市场效率高,可预测性低 大部分收益率变化是不可预测的噪声
  2. R²在金融预测中的意义 R² ≈ 0.01 (1%) 在金融预测中已经很不错了 R² ≈ 0.05 (5%) 已经是非常好的模型了 R² > 0.1 (10%) 在金融领域算是优秀模型
  3. 为什么IC=0.1004相对更好 IC = 0.1004 表示预测值和实际值的相关系数约为10% 这在金融预测中是一个合理且有用的水平 IC > 0.05 通常就认为模型有预测能力
  4. 模型有效性判断标准 对于金融预测模型,更应该关注:

IC值:> 0.05 就有实用价值 方向准确性:预测涨跌方向的准确率 收益表现:实际回测收益和风险指标

先发到这,后面稍优化点(运行中)如有时间再添加个xgboost代码,先发布再说哈 : )


\

{link}