# 导入函数库 from jqdata import * from jqdata import finance import pandas as pd import numpy as np import math from datetime import date, datetime, timedelta, time import re MA_ADDITIONAL_HISTORY_DAYS = 30 # 计算均线时额外获取的交易日数量,用于支持更长周期 # 均线聚合度阈值(基于MA5/MA10/MA20),用于动态调整单标的保证金上限 MA_COMPACTION_THRESHOLDS = { 'tight': 0.0045, 'balanced': 0.0073, 'loose': 0.0116 } # 顺势交易策略 v001 # 基于均线走势(前提条件)+ K线形态(开盘价差、当天价差)的期货交易策略 # # 核心逻辑: # 1. 开盘时检查均线走势(MA30<=MA20<=MA10<=MA5为多头,反之为空头) # 2. 检查开盘价差是否符合方向要求(多头>=0.5%,空头<=-0.5%) # 3. 14:35和14:55检查当天价差(多头>0,空头<0),满足条件则开仓 # 4. 应用固定止损和动态追踪止盈 # 5. 自动换月移仓 # 设置以便完整打印 DataFrame pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) pd.set_option('display.width', None) pd.set_option('display.max_colwidth', 20) ## 初始化函数,设定基准等等 def initialize(context): # 设定沪深300作为基准 set_benchmark('000300.XSHG') # 开启动态复权模式(真实价格) set_option('use_real_price', True) # 输出内容到日志 log.info('=' * 60) log.info('均线形态交易策略 v001 初始化开始') log.info('策略类型: 均线走势 + K线形态') log.info('=' * 60) ### 期货相关设定 ### # 设定账户为金融账户 set_subportfolios([SubPortfolioConfig(cash=context.portfolio.starting_cash, type='index_futures')]) # 期货类每笔交易时的手续费是: 买入时万分之0.23,卖出时万分之0.23,平今仓为万分之23 set_order_cost(OrderCost(open_commission=0.000023, close_commission=0.000023, close_today_commission=0.0023), type='index_futures') # 设置期货交易的滑点 set_slippage(StepRelatedSlippage(2)) # 初始化全局变量 g.reserve_percentage = 0.2 # 预留资金比例 g.financial_reserve_amount = 180000 # 金融期货预留资金 g.max_margin_per_position = 30000 # 单个标的最大持仓保证金(元) g.base_max_margin_per_position = g.max_margin_per_position # 记录基础保证金上限,便于动态调整 g.ma_compaction_thresholds = MA_COMPACTION_THRESHOLDS.copy() # 均线策略参数 g.ma_periods = [5, 10, 20, 30, 60] # 均线周期(新增MA60) g.ma_cross_periods = [5, 10, 20, 30] # 参与均线穿越判断的均线周期 g.ma_historical_days = 60 + MA_ADDITIONAL_HISTORY_DAYS # 额外增加30天,确保MA60计算稳定 g.ma_open_gap_threshold = 0.001 # 方案1开盘价差阈值(0.2%) g.ma_pattern_lookback_days = 10 # 历史均线模式一致性检查的天数 g.ma_pattern_consistency_threshold = 0.8 # 历史均线模式一致性阈值(80%) g.check_intraday_spread = False # 是否检查日内价差(True: 检查, False: 跳过) g.ma_proximity_min_threshold = 8 # MA5与MA10贴近计数和的最低阈值 g.ma_pattern_extreme_days_threshold = 4 # 极端趋势天数阈值 g.ma_distribution_lookback_days = 5 # MA5分布过滤回溯天数 g.ma_distribution_min_ratio = 0.4 # MA5分布满足比例阈值 g.enable_ma_distribution_filter = True # 是否启用MA5分布过滤 g.ma_cross_threshold = 1 # 均线穿越数量阈值 g.enable_open_gap_filter = True # 是否启用开盘价差过滤 # 均线价差策略方案选择 g.ma_gap_strategy_mode = 3 # 策略模式选择(1: 原方案, 2: 新方案, 3: 方案3) g.ma_open_gap_threshold2 = 0.001 # 方案2开盘价差阈值(0.2%) g.ma_intraday_threshold_scheme2 = 0.005 # 方案2日内变化阈值(0.5%) # 止损止盈策略参数 g.fixed_stop_loss_rate = 0.01 # 固定止损比率(1%) g.ma_offset_ratio_normal = 0.003 # 均线跟踪止盈常规偏移量(0.3%) g.ma_offset_ratio_close = 0.01 # 均线跟踪止盈收盘前偏移量(1%) g.days_for_adjustment = 4 # 持仓天数调整阈值 # 输出策略参数 log.info("均线形态策略参数:") log.info(f" 均线周期: {g.ma_periods}") log.info(f" 均线穿越判定周期: {g.ma_cross_periods}") log.info(f" 策略模式: 方案{g.ma_gap_strategy_mode}") log.info(f" 方案1开盘价差阈值: {g.ma_open_gap_threshold:.1%}") log.info(f" 方案2开盘价差阈值: {g.ma_open_gap_threshold2:.1%}") log.info(f" 方案2日内变化阈值: {g.ma_intraday_threshold_scheme2:.1%}") log.info(f" 历史均线模式检查天数: {g.ma_pattern_lookback_days}天") log.info(f" 历史均线模式一致性阈值: {g.ma_pattern_consistency_threshold:.1%}") log.info(f" 极端趋势天数阈值: {g.ma_pattern_extreme_days_threshold}") log.info(f" 均线贴近计数阈值: {g.ma_proximity_min_threshold}") log.info(f" MA5分布过滤天数: {g.ma_distribution_lookback_days}") log.info(f" MA5分布最低比例: {g.ma_distribution_min_ratio:.0%}") log.info(f" 启用MA5分布过滤: {g.enable_ma_distribution_filter}") log.info(f" 是否检查日内价差: {g.check_intraday_spread}") log.info(f" 均线穿越阈值: {g.ma_cross_threshold}") log.info(f" 是否启用开盘价差过滤: {g.enable_open_gap_filter}") log.info(f" 固定止损: {g.fixed_stop_loss_rate:.1%}") log.info(f" 均线跟踪止盈常规偏移: {g.ma_offset_ratio_normal:.1%}") log.info(f" 均线跟踪止盈收盘前偏移: {g.ma_offset_ratio_close:.1%}") log.info(f" 持仓天数调整阈值: {g.days_for_adjustment}天") log.info(f" 预留资金比例: {g.reserve_percentage:.0%}") log.info(f" 金融期货预留资金: {g.financial_reserve_amount:.0f}") # 期货品种完整配置字典 g.futures_config = { # 贵金属 'AU': {'has_night_session': True, 'margin_rate': {'long': 0.21, 'short': 0.21}, 'multiplier': 1000, 'trading_start_time': '21:00'}, 'AG': {'has_night_session': True, 'margin_rate': {'long': 0.22, 'short': 0.22}, 'multiplier': 15, 'trading_start_time': '21:00'}, # 有色金属 'CU': {'has_night_session': True, 'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'AL': {'has_night_session': True, 'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'ZN': {'has_night_session': True, 'margin_rate': {'long': 0.14, 'short': 0.14}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'PB': {'has_night_session': True, 'margin_rate': {'long': 0.14, 'short': 0.14}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'NI': {'has_night_session': True, 'margin_rate': {'long': 0.16, 'short': 0.16}, 'multiplier': 1, 'trading_start_time': '21:00'}, 'SN': {'has_night_session': True, 'margin_rate': {'long': 0.17, 'short': 0.17}, 'multiplier': 1, 'trading_start_time': '21:00'}, 'SS': {'has_night_session': True, 'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 5, 'trading_start_time': '21:00'}, # 黑色系 'RB': {'has_night_session': True, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'HC': {'has_night_session': True, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'I': {'has_night_session': True, 'margin_rate': {'long': 0.16, 'short': 0.16}, 'multiplier': 100, 'trading_start_time': '21:00'}, 'JM': {'has_night_session': True, 'margin_rate': {'long': 0.17, 'short': 0.17}, 'multiplier': 100, 'trading_start_time': '21:00'}, 'J': {'has_night_session': True, 'margin_rate': {'long': 0.25, 'short': 0.25}, 'multiplier': 60, 'trading_start_time': '21:00'}, # 能源化工 'SP': {'has_night_session': True, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'FU': {'has_night_session': True, 'margin_rate': {'long': 0.16, 'short': 0.16}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'BU': {'has_night_session': True, 'margin_rate': {'long': 0.17, 'short': 0.17}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'RU': {'has_night_session': True, 'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'BR': {'has_night_session': True, 'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'SC': {'has_night_session': True, 'margin_rate': {'long': 0.17, 'short': 0.17}, 'multiplier': 1000, 'trading_start_time': '21:00'}, 'NR': {'has_night_session': True, 'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'LU': {'has_night_session': True, 'margin_rate': {'long': 0.17, 'short': 0.17}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'LC': {'has_night_session': False, 'margin_rate': {'long': 0.16, 'short': 0.16}, 'multiplier': 1, 'trading_start_time': '09:00'}, # 化工 'FG': {'has_night_session': True, 'margin_rate': {'long': 0.16, 'short': 0.16}, 'multiplier': 20, 'trading_start_time': '21:00'}, 'TA': {'has_night_session': True, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'MA': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'SA': {'has_night_session': True, 'margin_rate': {'long': 0.14, 'short': 0.14}, 'multiplier': 20, 'trading_start_time': '21:00'}, 'L': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'V': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'EG': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'PP': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'EB': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'PG': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 20, 'trading_start_time': '21:00'}, 'PX': {'has_night_session': True, 'margin_rate': {'long': 0.1, 'short': 0.1}, 'multiplier': 5, 'trading_start_time': '21:00'}, # 农产品 'RM': {'has_night_session': True, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'OI': {'has_night_session': True, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'CF': {'has_night_session': True, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'SR': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'PF': {'has_night_session': True, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'C': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'CS': {'has_night_session': True, 'margin_rate': {'long': 0.11, 'short': 0.11}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'CY': {'has_night_session': True, 'margin_rate': {'long': 0.11, 'short': 0.11}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'A': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'B': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'M': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'Y': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'P': {'has_night_session': True, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 10, 'trading_start_time': '21:00'}, # 无夜盘品种 'IF': {'has_night_session': False, 'margin_rate': {'long': 0.17, 'short': 0.17}, 'multiplier': 300, 'trading_start_time': '09:30', 'is_financial': True}, 'IH': {'has_night_session': False, 'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 300, 'trading_start_time': '09:30', 'is_financial': True}, 'IC': {'has_night_session': False, 'margin_rate': {'long': 0.17, 'short': 0.17}, 'multiplier': 200, 'trading_start_time': '09:30', 'is_financial': True}, 'IM': {'has_night_session': False, 'margin_rate': {'long': 0.17, 'short': 0.17}, 'multiplier': 200, 'trading_start_time': '09:30', 'is_financial': True}, 'AP': {'has_night_session': False, 'margin_rate': {'long': 0.16, 'short': 0.16}, 'multiplier': 10, 'trading_start_time': '09:00'}, 'CJ': {'has_night_session': False, 'margin_rate': {'long': 0.14, 'short': 0.14}, 'multiplier': 5, 'trading_start_time': '09:00'}, 'PK': {'has_night_session': False, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 5, 'trading_start_time': '09:00'}, 'JD': {'has_night_session': False, 'margin_rate': {'long': 0.11, 'short': 0.11}, 'multiplier': 10, 'trading_start_time': '09:00'}, 'LH': {'has_night_session': False, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 16, 'trading_start_time': '09:00'}, 'T': {'has_night_session': False, 'margin_rate': {'long': 0.03, 'short': 0.03}, 'multiplier': 1000000, 'trading_start_time': '09:30', 'is_financial': True}, 'PS': {'has_night_session': False, 'margin_rate': {'long': 0.16, 'short': 0.16}, 'multiplier': 3, 'trading_start_time': '09:00'}, 'UR': {'has_night_session': False, 'margin_rate': {'long': 0.14, 'short': 0.14}, 'multiplier': 20, 'trading_start_time': '09:00'}, 'MO': {'has_night_session': True, 'margin_rate': {'long': 0.17, 'short': 0.17}, 'multiplier': 100, 'trading_start_time': '21:00'}, # 'LF': {'has_night_session': False, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 1, 'trading_start_time': '09:30'}, 'HO': {'has_night_session': False, 'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 100, 'trading_start_time': '09:30'}, # 'LR': {'has_night_session': True, 'margin_rate': {'long': 0.21, 'short': 0.21}, 'multiplier': 20, 'trading_start_time': '21:00'}, 'LG': {'has_night_session': True, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 90, 'trading_start_time': '21:00'}, # 'FB': {'has_night_session': True, 'margin_rate': {'long': 0.14, 'short': 0.14}, 'multiplier': 10, 'trading_start_time': '21:00'}, # 'PM': {'has_night_session': True, 'margin_rate': {'long': 0.2, 'short': 0.2}, 'multiplier': 50, 'trading_start_time': '21:00'}, 'EC': {'has_night_session': False, 'margin_rate': {'long': 0.23, 'short': 0.23}, 'multiplier': 50, 'trading_start_time': '09:00'}, # 'RR': {'has_night_session': True, 'margin_rate': {'long': 0.11, 'short': 0.11}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'OP': {'has_night_session': False, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 40, 'trading_start_time': '09:00'}, # 'IO': {'has_night_session': True, 'margin_rate': {'long': 0.17, 'short': 0.17}, 'multiplier': 1, 'trading_start_time': '21:00'}, 'BC': {'has_night_session': True, 'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 5, 'trading_start_time': '21:00'}, # 'WH': {'has_night_session': False, 'margin_rate': {'long': 0.2, 'short': 0.2}, 'multiplier': 20, 'trading_start_time': '09:00'}, 'SH': {'has_night_session': True, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 30, 'trading_start_time': '21:00'}, # 'RI': {'has_night_session': False, 'margin_rate': {'long': 0.21, 'short': 0.21}, 'multiplier': 20, 'trading_start_time': '09:00'}, 'TS': {'has_night_session': False, 'margin_rate': {'long': 0.015, 'short': 0.015}, 'multiplier': 2000000, 'trading_start_time': '09:30', 'is_financial': True}, # 'JR': {'has_night_session': False, 'margin_rate': {'long': 0.21, 'short': 0.21}, 'multiplier': 20, 'trading_start_time': '09:00'}, 'AD': {'has_night_session': False, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 10, 'trading_start_time': '09:00'}, # 'BB': {'has_night_session': False, 'margin_rate': {'long': 0.19, 'short': 0.19}, 'multiplier': 500, 'trading_start_time': '09:00'}, 'PL': {'has_night_session': False, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 20, 'trading_start_time': '09:00'}, # 'RS': {'has_night_session': False, 'margin_rate': {'long': 0.26, 'short': 0.26}, 'multiplier': 10, 'trading_start_time': '09:00'}, 'SI': {'has_night_session': False, 'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 5, 'trading_start_time': '09:00'}, # 'ZC': {'has_night_session': True, 'margin_rate': {'long': 0.56, 'short': 0.56}, 'multiplier': 100, 'trading_start_time': '21:00'}, 'SM': {'has_night_session': False, 'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 5, 'trading_start_time': '09:00'}, 'AO': {'has_night_session': True, 'margin_rate': {'long': 0.17, 'short': 0.17}, 'multiplier': 20, 'trading_start_time': '21:00'}, 'TL': {'has_night_session': False, 'margin_rate': {'long': 0.045, 'short': 0.045}, 'multiplier': 1000000, 'trading_start_time': '09:00', 'is_financial': True}, 'SF': {'has_night_session': False, 'margin_rate': {'long': 0.14, 'short': 0.14}, 'multiplier': 5, 'trading_start_time': '09:00'}, # 'WR': {'has_night_session': False, 'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 10, 'trading_start_time': '09:00'}, 'PR': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 15, 'trading_start_time': '21:00'}, 'TF': {'has_night_session': False, 'margin_rate': {'long': 0.022, 'short': 0.022}, 'multiplier': 1000000, 'trading_start_time': '09:00', 'is_financial': True}, # 'VF': {'has_night_session': False, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 1, 'trading_start_time': '09:00'}, 'BZ': {'has_night_session': False, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 30, 'trading_start_time': '09:00'}, } # 策略品种选择策略配置 # 方案1:全品种策略 - 考虑所有配置的期货品种 g.strategy_focus_symbols = ['SC'] # 空列表表示考虑所有品种 # 方案2:精选品种策略 - 只交易流动性较好的特定品种(如需使用请取消下行注释) # g.strategy_focus_symbols = ['RM', 'CJ', 'CY', 'JD', 'L', 'LC', 'SF', 'SI'] log.info(f"品种选择策略: {'全品种策略(覆盖所有配置品种)' if not g.strategy_focus_symbols else '精选品种策略(' + str(len(g.strategy_focus_symbols)) + '个品种)'}") # 交易记录和数据存储 g.trade_history = {} # 持仓记录 {symbol: {'entry_price': xxx, 'direction': xxx, ...}} g.daily_ma_candidates = {} # 通过均线和开盘价差检查的候选品种 {symbol: {'direction': 'long'/'short', 'open_price': xxx, ...}} g.today_trades = [] # 当日交易记录 g.excluded_contracts = {} # 每日排除的合约缓存 {dominant_future: {'reason': 'ma_trend'/'open_gap', 'trading_day': xxx}} g.ma_checked_underlyings = {} # 记录各品种在交易日的均线检查状态 {symbol: trading_day} g.last_ma_trading_day = None # 最近一次均线检查所属交易日 # 夜盘禁止操作标志 g.night_session_blocked = False # 标记是否禁止当晚操作 g.night_session_blocked_trading_day = None # 记录被禁止的交易日 # 定时任务设置 # 夜盘开始(21:05) - 均线和开盘价差检查 run_daily(check_ma_trend_and_open_gap, time='21:05:00', reference_security='IF1808.CCFX') # 日盘开始 - 均线和开盘价差检查 run_daily(check_ma_trend_and_open_gap, time='09:05:00', reference_security='IF1808.CCFX') run_daily(check_ma_trend_and_open_gap, time='09:35:00', reference_security='IF1808.CCFX') # 夜盘开仓和止损止盈检查 run_daily(check_open_and_stop, time='21:05:00', reference_security='IF1808.CCFX') run_daily(check_open_and_stop, time='21:35:00', reference_security='IF1808.CCFX') run_daily(check_open_and_stop, time='22:05:00', reference_security='IF1808.CCFX') run_daily(check_open_and_stop, time='22:35:00', reference_security='IF1808.CCFX') # 日盘开仓和止损止盈检查 run_daily(check_open_and_stop, time='09:05:00', reference_security='IF1808.CCFX') run_daily(check_open_and_stop, time='09:35:00', reference_security='IF1808.CCFX') run_daily(check_open_and_stop, time='10:05:00', reference_security='IF1808.CCFX') run_daily(check_open_and_stop, time='10:35:00', reference_security='IF1808.CCFX') run_daily(check_open_and_stop, time='11:05:00', reference_security='IF1808.CCFX') run_daily(check_open_and_stop, time='11:25:00', reference_security='IF1808.CCFX') run_daily(check_open_and_stop, time='13:35:00', reference_security='IF1808.CCFX') run_daily(check_open_and_stop, time='14:05:00', reference_security='IF1808.CCFX') run_daily(check_open_and_stop, time='14:35:00', reference_security='IF1808.CCFX') run_daily(check_open_and_stop, time='14:55:00', reference_security='IF1808.CCFX') run_daily(check_ma_trailing_reactivation, time='14:55:00', reference_security='IF1808.CCFX') # 收盘后 run_daily(after_market_close, time='15:30:00', reference_security='IF1808.CCFX') log.info('=' * 60) ############################ 主程序执行函数 ################################### def get_current_trading_day(current_dt): """根据当前时间推断对应的期货交易日""" current_date = current_dt.date() current_time = current_dt.time() trade_days = get_trade_days(end_date=current_date, count=1) if trade_days and trade_days[0] == current_date: trading_day = current_date else: next_days = get_trade_days(start_date=current_date, count=1) trading_day = next_days[0] if next_days else current_date if current_time >= time(20, 59): next_trade_days = get_trade_days(start_date=trading_day, count=2) if len(next_trade_days) >= 2: return next_trade_days[1] if len(next_trade_days) == 1: return next_trade_days[0] return trading_day def normalize_trade_day_value(value): """将交易日对象统一转换为 datetime.date""" if isinstance(value, date) and not isinstance(value, datetime): return value if isinstance(value, datetime): return value.date() if hasattr(value, 'to_pydatetime'): return value.to_pydatetime().date() try: return pd.Timestamp(value).date() except Exception: return value def check_ma_trend_and_open_gap(context): """阶段一:开盘时均线走势和开盘价差检查(一天一次)""" log.info("=" * 60) current_trading_day = get_current_trading_day(context.current_dt) log.info(f"执行均线走势和开盘价差检查 - 时间: {context.current_dt}, 交易日: {current_trading_day}") log.info("=" * 60) # 换月移仓检查(在所有部分之前) position_auto_switch(context) # ==================== 第一部分:基础数据获取 ==================== # 步骤1:交易日检查和缓存清理 if g.last_ma_trading_day != current_trading_day: if g.excluded_contracts: log.info(f"交易日切换至 {current_trading_day},清空上一交易日的排除缓存") g.excluded_contracts = {} g.ma_checked_underlyings = {} g.last_ma_trading_day = current_trading_day # 步骤2:获取当前时间和筛选可交易品种 current_time = str(context.current_dt.time())[:5] # HH:MM格式 focus_symbols = g.strategy_focus_symbols if g.strategy_focus_symbols else list(g.futures_config.keys()) tradable_symbols = [] # 根据当前时间确定可交易的时段 # 21:05 -> 仅接受21:00开盘的合约 # 09:05 -> 接受09:00或21:00开盘的合约 # 09:35 -> 接受所有时段(21:00, 09:00, 09:30)的合约 for symbol in focus_symbols: trading_start_time = get_futures_config(symbol, 'trading_start_time', '09:05') should_trade = False if current_time == '21:05': should_trade = trading_start_time.startswith('21:00') elif current_time == '09:05': should_trade = trading_start_time.startswith('21:00') or trading_start_time.startswith('09:00') elif current_time == '09:35': should_trade = True if should_trade: tradable_symbols.append(symbol) if not tradable_symbols: log.info(f"当前时间 {current_time} 无品种开盘,跳过检查") return log.info(f"当前时间 {current_time} 开盘品种: {tradable_symbols}") # 步骤3:对每个品种循环处理 for symbol in tradable_symbols: # 步骤3.1:检查是否已处理过 if g.ma_checked_underlyings.get(symbol) == current_trading_day: log.info(f"{symbol} 已在交易日 {current_trading_day} 完成均线检查,跳过本次执行") continue try: g.ma_checked_underlyings[symbol] = current_trading_day # 步骤3.2:获取主力合约 dominant_future = get_dominant_future(symbol) if not dominant_future: log.info(f"{symbol} 未找到主力合约,跳过") continue # 步骤3.3:检查排除缓存 if dominant_future in g.excluded_contracts: excluded_info = g.excluded_contracts[dominant_future] if excluded_info['trading_day'] == current_trading_day: continue else: # 新的一天,从缓存中移除(会在after_market_close统一清理,这里也做兜底) del g.excluded_contracts[dominant_future] # 步骤3.4:检查是否已有持仓 if check_symbol_prefix_match(dominant_future, context, set(g.trade_history.keys())): log.info(f"{symbol} 已有持仓,跳过") continue # 步骤3.5:获取历史数据和前一交易日数据(合并优化) # 获取历史数据(需要足够计算MA30) historical_data = get_price(dominant_future, end_date=context.current_dt, frequency='1d', fields=['open', 'close', 'high', 'low'], count=g.ma_historical_days) if historical_data is None or len(historical_data) < max(g.ma_periods): log.info(f"{symbol} 历史数据不足,跳过") continue # 获取前一交易日并在历史数据中匹配 previous_trade_days = get_trade_days(end_date=current_trading_day, count=2) previous_trade_days = [normalize_trade_day_value(d) for d in previous_trade_days] previous_trading_day = None if len(previous_trade_days) >= 2: previous_trading_day = previous_trade_days[-2] elif len(previous_trade_days) == 1 and previous_trade_days[0] < current_trading_day: previous_trading_day = previous_trade_days[0] if previous_trading_day is None: log.info(f"{symbol} 无法确定前一交易日,跳过") continue # 在历史数据中匹配前一交易日 historical_dates = historical_data.index.date match_indices = np.where(historical_dates == previous_trading_day)[0] if len(match_indices) == 0: earlier_indices = np.where(historical_dates < previous_trading_day)[0] if len(earlier_indices) == 0: log.info(f"{symbol} 历史数据缺少 {previous_trading_day} 之前的记录,跳过") continue match_indices = [earlier_indices[-1]] # 提取截至前一交易日的数据,并一次性提取所有需要的字段 data_upto_yesterday = historical_data.iloc[:match_indices[-1] + 1] yesterday_data = data_upto_yesterday.iloc[-1] yesterday_close = yesterday_data['close'] yesterday_open = yesterday_data['open'] # 步骤3.6:获取当前价格数据 current_data = get_current_data()[dominant_future] today_open = current_data.day_open # ==================== 第二部分:核心指标计算 ==================== # 步骤4:计算均线相关指标(合并优化) ma_values = calculate_ma_values(data_upto_yesterday, g.ma_periods) ma_proximity_counts = calculate_ma_proximity_counts(data_upto_yesterday, g.ma_periods, g.ma_pattern_lookback_days) ma_compaction = calculate_ma_compaction_from_values(ma_values, periods=[5, 10, 20]) log.info(f"{symbol}({dominant_future}) 均线检查:") log.info(f" 均线贴近统计: {ma_proximity_counts}") if ma_compaction is not None: log.info(f" 均线聚合度(MA5/MA10/MA20): {ma_compaction:.4f}") if ma_compaction > g.ma_compaction_thresholds['loose']: log.info( f" {symbol}({dominant_future}) ✗ 均线聚合度 {ma_compaction:.4f} 超过阈值 " f"{g.ma_compaction_thresholds['loose']:.4f},跳过" ) add_to_excluded_contracts(dominant_future, 'ma_compaction', current_trading_day) continue else: log.info(" 均线聚合度(MA5/MA10/MA20) 无法计算,使用默认值") # 检查均线贴近计数 proximity_sum = ma_proximity_counts.get('MA5', 0) + ma_proximity_counts.get('MA10', 0) if proximity_sum < g.ma_proximity_min_threshold: log.info(f" {symbol}({dominant_future}) ✗ 均线贴近计数不足,MA5+MA10={proximity_sum} < {g.ma_proximity_min_threshold},跳过") add_to_excluded_contracts(dominant_future, 'ma_proximity', current_trading_day) continue # 步骤5:计算极端趋势天数 extreme_above_count, extreme_below_count = calculate_extreme_trend_days( data_upto_yesterday, g.ma_periods, g.ma_pattern_lookback_days ) extreme_total = extreme_above_count + extreme_below_count min_extreme = min(extreme_above_count, extreme_below_count) filter_threshold = max(2, g.ma_pattern_extreme_days_threshold) log.info( f" 极端趋势天数统计: 收盘在所有均线上方 {extreme_above_count} 天, 收盘在所有均线下方 {extreme_below_count} 天, " f"合计 {extreme_total} 天, min(A,B)={min_extreme} (过滤阈值: {filter_threshold})" ) if extreme_above_count > 0 and extreme_below_count > 0 and min_extreme >= filter_threshold: log.info( f" {symbol}({dominant_future}) ✗ 极端趋势多空同时出现且 min(A,B)={min_extreme} ≥ {filter_threshold},跳过" ) add_to_excluded_contracts(dominant_future, 'ma_extreme_trend', current_trading_day) continue # 步骤6:判断均线走势 direction = None if check_ma_pattern(ma_values, 'long'): direction = 'long' elif check_ma_pattern(ma_values, 'short'): direction = 'short' else: add_to_excluded_contracts(dominant_future, 'ma_trend', current_trading_day) continue # 步骤7:检查MA5分布过滤 if g.enable_ma_distribution_filter: distribution_passed, distribution_stats = check_ma5_distribution_filter( data_upto_yesterday, g.ma_distribution_lookback_days, direction, g.ma_distribution_min_ratio ) log.info( f" MA5分布过滤: 方向 {direction}, 有效天数 " f"{distribution_stats['valid_days']}/{distribution_stats['lookback_days']}," f"满足天数 {distribution_stats['qualified_days']}/{distribution_stats['required_days']}" ) if not distribution_passed: insufficiency = distribution_stats['valid_days'] < distribution_stats['lookback_days'] reason = "有效数据不足" if insufficiency else "满足天数不足" log.info( f" {symbol}({dominant_future}) ✗ MA5分布过滤未通过({reason})" ) add_to_excluded_contracts(dominant_future, 'ma5_distribution', current_trading_day) continue # 步骤8:检查历史均线模式一致性 consistency_passed, consistency_ratio = check_historical_ma_pattern_consistency( historical_data, direction, g.ma_pattern_lookback_days, g.ma_pattern_consistency_threshold ) if not consistency_passed: log.info(f" {symbol}({dominant_future}) ✗ 历史均线模式一致性不足 " f"({consistency_ratio:.1%} < {g.ma_pattern_consistency_threshold:.1%}),跳过") add_to_excluded_contracts(dominant_future, 'ma_consistency', current_trading_day) continue else: log.info(f" {symbol}({dominant_future}) ✓ 历史均线模式一致性检查通过 " f"({consistency_ratio:.1%} >= {g.ma_pattern_consistency_threshold:.1%})") # 步骤9:检查开盘价差(可配置开关) if g.enable_open_gap_filter: gap_check_passed = check_open_gap_filter( symbol=symbol, dominant_future=dominant_future, direction=direction, yesterday_close=yesterday_close, today_open=today_open, current_trading_day=current_trading_day ) if not gap_check_passed: continue else: log.info(" 已关闭开盘价差过滤(enable_open_gap_filter=False),跳过该检查") # 步骤10:将通过检查的品种加入候选列表 g.daily_ma_candidates[dominant_future] = { 'symbol': symbol, 'direction': direction, 'open_price': today_open, 'yesterday_close': yesterday_close, 'yesterday_open': yesterday_open, 'ma_values': ma_values, 'ma_compaction': ma_compaction } log.info(f" ✓✓ {symbol} 通过均线和开盘价差检查,加入候选列表") except Exception as e: g.ma_checked_underlyings.pop(symbol, None) log.warning(f"{symbol} 检查时出错: {str(e)}") continue log.info(f"候选列表更新完成,当前候选品种: {list(g.daily_ma_candidates.keys())}") log.info("=" * 60) def check_open_and_stop(context): """统一的开仓和止损止盈检查函数""" # 先检查换月移仓 log.info("=" * 60) current_trading_day = get_current_trading_day(context.current_dt) log.info(f"执行开仓和止损止盈检查 - 时间: {context.current_dt}, 交易日: {current_trading_day}") log.info("=" * 60) log.info(f"先检查换月:") position_auto_switch(context) # 获取当前时间 current_time = str(context.current_dt.time())[:2] # 判断是否为夜盘时间 is_night_session = (current_time in ['21', '22', '23', '00', '01', '02']) # 检查是否禁止当晚操作 if is_night_session and g.night_session_blocked: blocked_trading_day = normalize_trade_day_value(g.night_session_blocked_trading_day) if g.night_session_blocked_trading_day else None current_trading_day_normalized = normalize_trade_day_value(current_trading_day) if blocked_trading_day == current_trading_day_normalized: log.info(f"当晚操作已被禁止(订单状态为'new',无夜盘),跳过所有操作") return # 得到当前未完成订单 orders = get_open_orders() # 循环,撤销订单 if len(orders) == 0: log.debug(f"无未完成订单") else: for _order in orders.values(): log.debug(f"order: {_order}") cancel_order(_order) # 第一步:检查开仓条件 log.info(f"检查开仓条件:") if g.daily_ma_candidates: log.info("=" * 60) log.info(f"执行开仓检查 - 时间: {context.current_dt}, 候选品种数量: {len(g.daily_ma_candidates)}") # 遍历候选品种 candidates_to_remove = [] for dominant_future, candidate_info in g.daily_ma_candidates.items(): try: symbol = candidate_info['symbol'] direction = candidate_info['direction'] open_price = candidate_info['open_price'] yesterday_close = candidate_info.get('yesterday_close') yesterday_open = candidate_info.get('yesterday_open') ma_compaction = candidate_info.get('ma_compaction') # 检查是否已有持仓 if check_symbol_prefix_match(dominant_future, context, set(g.trade_history.keys())): log.info(f"{symbol} 已有持仓,从候选列表移除") candidates_to_remove.append(dominant_future) continue # 获取当前价格 current_data = get_current_data()[dominant_future] current_price = current_data.last_price # 计算当天价差 intraday_diff = current_price - open_price intraday_diff_ratio = intraday_diff / open_price log.info(f"{symbol}({dominant_future}) 开仓条件检查:") log.info(f" 方向: {direction}, 开盘价: {open_price:.2f}, 当前价: {current_price:.2f}, " f"当天价差: {intraday_diff:.2f}, 变化比例: {intraday_diff_ratio:.2%}") margin_limit = adjust_max_margin_per_position(ma_compaction) if margin_limit is None: compaction_str = f"{ma_compaction:.4f}" if ma_compaction is not None else "NA" log.info( f" ✗ 均线聚合度 {compaction_str} 超出允许范围(>{g.ma_compaction_thresholds['loose']:.4f}),跳过开仓" ) candidates_to_remove.append(dominant_future) continue else: log.info(f" 动态单标的保证金上限: {margin_limit:.0f}") # 判断是否满足开仓条件 - 仅检查均线穿越得分 should_open = True log.info(f" 开仓条件简化:仅检查均线穿越得分,跳过策略1,2,3的判断") if should_open: ma_values = candidate_info.get('ma_values') or {} avg_5day_change = calculate_recent_average_change(dominant_future, days=5) entry_snapshot = { 'yesterday_close': yesterday_close, 'today_open': open_price, 'ma_values': ma_values.copy() if ma_values else {}, 'avg_5day_change': avg_5day_change } cross_score, score_details = calculate_ma_cross_score(open_price, current_price, ma_values, direction) # 根据当前时间调整所需的均线穿越得分阈值 current_time_str = str(context.current_dt.time())[:5] # HH:MM格式 required_cross_score = g.ma_cross_threshold if current_time_str != '14:55': # 在14:55以外的时间,需要更高的得分阈值 required_cross_score = g.ma_cross_threshold + 1 log.info(f" 均线穿越得分: {cross_score}, 得分详情: {score_details}, 当前时间: {current_time_str}, 所需阈值: {required_cross_score}") # 检查得分是否满足条件 score_passed = False if cross_score >= required_cross_score: # 如果得分达到阈值,检查特殊情况:只有1分且来自MA5 if cross_score == 1 and required_cross_score == 1: # 检查这一分是否来自MA5 from_ma5_only = len(score_details) == 1 and score_details[0]['period'] == 5 if not from_ma5_only: score_passed = True log.info(f" ✓ 均线穿越得分检查通过:1分且非来自MA5") else: log.info(f" ✗ 均线穿越得分检查未通过:1分且仅来自MA5") else: score_passed = True log.info(f" ✓ 均线穿越得分检查通过") if not score_passed: log.info(f" ✗ 均线穿越得分不足或不符合条件({cross_score} < {required_cross_score} 或 1分来自MA5),跳过开仓") continue if current_time_str == '14:55': positive_cross_periods = {detail['period'] for detail in score_details if detail.get('delta', 0) > 0} if positive_cross_periods == {30}: log.info(" ✗ 尾盘仅穿越MA30,跳过开仓") continue # 执行开仓 log.info(f" 准备开仓: {symbol} {direction}") target_hands, single_hand_margin, single_hand_exceeds_limit = calculate_target_hands( context, dominant_future, direction, max_margin_override=margin_limit ) if target_hands > 0: success = open_position( context, dominant_future, target_hands, direction, single_hand_margin, single_hand_exceeds_limit, f'均线形态开仓', crossed_ma_details=score_details, entry_snapshot=entry_snapshot ) if success: log.info(f" ✓✓ {symbol} 开仓成功,从候选列表移除") candidates_to_remove.append(dominant_future) else: log.warning(f" ✗ {symbol} 开仓失败") else: log.warning(f" ✗ {symbol} 计算目标手数为0,跳过开仓") except Exception as e: log.warning(f"{dominant_future} 处理时出错: {str(e)}") continue finally: adjust_max_margin_per_position(None) # 从候选列表中移除已开仓的品种 for future in candidates_to_remove: if future in g.daily_ma_candidates: del g.daily_ma_candidates[future] log.info(f"剩余候选品种: {list(g.daily_ma_candidates.keys())}") log.info("=" * 60) # 第二步:检查止损止盈 log.info(f"检查止损止盈条件:") subportfolio = context.subportfolios[0] long_positions = list(subportfolio.long_positions.values()) short_positions = list(subportfolio.short_positions.values()) closed_count = 0 skipped_count = 0 for position in long_positions + short_positions: security = position.security underlying_symbol = security.split('.')[0][:-4] # 检查交易时间适配性 has_night_session = get_futures_config(underlying_symbol, 'has_night_session', False) # 如果是夜盘时间,但品种不支持夜盘交易,则跳过 if is_night_session and not has_night_session: skipped_count += 1 continue # 执行止损止盈检查 if check_position_stop_loss_profit(context, position): closed_count += 1 if closed_count > 0: log.info(f"执行了 {closed_count} 次止损止盈") if skipped_count > 0: log.info(f"夜盘时间跳过 {skipped_count} 个日间品种的止损止盈检查") def check_ma_trailing_reactivation(context): """检查是否需要恢复均线跟踪止盈""" subportfolio = context.subportfolios[0] positions = list(subportfolio.long_positions.values()) + list(subportfolio.short_positions.values()) if not positions: return reenabled_count = 0 current_data = get_current_data() for position in positions: security = position.security trade_info = g.trade_history.get(security) if not trade_info or trade_info.get('ma_trailing_enabled', True): continue direction = trade_info['direction'] ma_values = calculate_realtime_ma_values(security, [5]) ma5_value = ma_values.get('ma5') if ma5_value is None or security not in current_data: continue today_price = current_data[security].last_price if direction == 'long' and today_price > ma5_value: trade_info['ma_trailing_enabled'] = True reenabled_count += 1 log.info(f"恢复均线跟踪止盈: {security} {direction}, 当前价 {today_price:.2f} > MA5 {ma5_value:.2f}") elif direction == 'short' and today_price < ma5_value: trade_info['ma_trailing_enabled'] = True reenabled_count += 1 log.info(f"恢复均线跟踪止盈: {security} {direction}, 当前价 {today_price:.2f} < MA5 {ma5_value:.2f}") if reenabled_count > 0: log.info(f"恢复均线跟踪止盈持仓数量: {reenabled_count}") def check_position_stop_loss_profit(context, position): """检查单个持仓的止损止盈""" log.info(f"检查持仓: {position.security}") security = position.security if security not in g.trade_history: return False trade_info = g.trade_history[security] direction = trade_info['direction'] entry_price = trade_info['entry_price'] entry_time = trade_info['entry_time'] entry_trading_day = trade_info.get('entry_trading_day') if entry_trading_day is None: entry_trading_day = get_current_trading_day(entry_time) trade_info['entry_trading_day'] = entry_trading_day if entry_trading_day is not None: entry_trading_day = normalize_trade_day_value(entry_trading_day) current_trading_day = normalize_trade_day_value(get_current_trading_day(context.current_dt)) current_price = position.price # 计算当前盈亏比率 if direction == 'long': profit_rate = (current_price - entry_price) / entry_price else: profit_rate = (entry_price - current_price) / entry_price # 检查固定止损 log.info("=" * 60) log.info(f"检查固定止损:") log.info("=" * 60) if profit_rate <= -g.fixed_stop_loss_rate: log.info(f"{security} {direction} 触发固定止损 {g.fixed_stop_loss_rate:.3%}, 当前亏损率: {profit_rate:.3%}, " f"成本价: {entry_price:.2f}, 当前价格: {current_price:.2f}") close_position(context, security, direction) return True else: log.debug(f"{security} {direction} 未触发固定止损 {g.fixed_stop_loss_rate:.3%}, 当前亏损率: {profit_rate:.3%}, " f"成本价: {entry_price:.2f}, 当前价格: {current_price:.2f}") if entry_trading_day is not None and entry_trading_day == current_trading_day: log.info(f"{security} 建仓交易日内跳过动态止盈检查") return False # 检查是否启用均线跟踪止盈 log.info("=" * 60) log.info(f"检查是否启用均线跟踪止盈:") log.info("=" * 60) if not trade_info.get('ma_trailing_enabled', True): log.debug(f"{security} {direction} 未启用均线跟踪止盈") return False # 检查均线跟踪止盈 # 获取持仓天数 entry_date = entry_time.date() current_date = context.current_dt.date() all_trade_days = get_all_trade_days() holding_days = sum((entry_date <= d <= current_date) for d in all_trade_days) # 计算变化率 today_price = get_current_data()[security].last_price avg_daily_change_rate = calculate_average_daily_change_rate(security) historical_data = attribute_history(security, 1, '1d', ['close']) yesterday_close = historical_data['close'].iloc[-1] today_change_rate = abs((today_price - yesterday_close) / yesterday_close) # 根据时间判断使用的偏移量 current_time = context.current_dt.time() target_time = datetime.strptime('14:55:00', '%H:%M:%S').time() if current_time > target_time: offset_ratio = g.ma_offset_ratio_close log.debug(f"当前时间是:{current_time},使用偏移量: {offset_ratio:.3%}") else: offset_ratio = g.ma_offset_ratio_normal log.debug(f"当前时间是:{current_time},使用偏移量: {offset_ratio:.3%}") # 选择止损均线 close_line = None if today_change_rate >= 1.5 * avg_daily_change_rate: close_line = 'ma5' # 波动剧烈时用短周期 elif holding_days <= g.days_for_adjustment: close_line = 'ma5' # 持仓初期用短周期 else: close_line = 'ma5' if today_change_rate >= 1.2 * avg_daily_change_rate else 'ma10' # 计算实时均线值 ma_values = calculate_realtime_ma_values(security, [5, 10]) ma_value = ma_values[close_line] # 应用偏移量 if direction == 'long': adjusted_ma_value = ma_value * (1 - offset_ratio) else: adjusted_ma_value = ma_value * (1 + offset_ratio) # 判断是否触发均线止损 if (direction == 'long' and today_price < adjusted_ma_value) or \ (direction == 'short' and today_price > adjusted_ma_value): log.info(f"触发均线跟踪止盈 {security} {direction}, 止损均线: {close_line}, " f"均线值: {ma_value:.2f}, 调整后: {adjusted_ma_value:.2f}, " f"当前价: {today_price:.2f}, 持仓天数: {holding_days}") close_position(context, security, direction) return True else: log.debug(f"未触发均线跟踪止盈 {security} {direction}, 止损均线: {close_line}, " f"均线值: {ma_value:.2f}, 调整后: {adjusted_ma_value:.2f}, " f"当前价: {today_price:.2f}, 持仓天数: {holding_days}") return False ############################ 核心辅助函数 ################################### def calculate_ma_values(data, periods): """计算均线值 Args: data: DataFrame,包含'close'列的历史数据(最后一行是最新的数据) periods: list,均线周期列表,如[5, 10, 20, 30] Returns: dict: {'MA5': value, 'MA10': value, 'MA20': value, 'MA30': value} 返回最后一行(最新日期)的各周期均线值 """ ma_values = {} for period in periods: if len(data) >= period: # 计算最后period天的均线值 ma_values[f'MA{period}'] = data['close'].iloc[-period:].mean() else: ma_values[f'MA{period}'] = None return ma_values def calculate_ma_cross_score(open_price, current_price, ma_values, direction): """根据开盘价与当前价统计多周期均线穿越得分 返回: (总得分, 得分详情列表) 得分详情: [{'period': 5, 'delta': 1}, {'period': 10, 'delta': 1}, ...] """ if not ma_values: return 0, [] assert direction in ('long', 'short') score = 0 score_details = [] periods = getattr(g, 'ma_cross_periods', g.ma_periods) for period in periods: key = f'MA{period}' ma_value = ma_values.get(key) if ma_value is None: continue cross_up = open_price < ma_value and current_price > ma_value cross_down = open_price > ma_value and current_price < ma_value if not (cross_up or cross_down): continue if direction == 'long': delta = 1 if cross_up else -1 else: delta = -1 if cross_up else 1 score += delta score_details.append({'period': period, 'delta': delta}) log.debug( f" 均线穿越[{key}] - 开盘 {open_price:.2f}, 当前 {current_price:.2f}, " f"均线 {ma_value:.2f}, 方向 {direction}, 增量 {delta}, 当前得分 {score}" ) return score, score_details def calculate_ma_proximity_counts(data, periods, lookback_days): """统计近 lookback_days 天收盘价贴近各均线的次数""" proximity_counts = {f'MA{period}': 0 for period in periods} if len(data) < lookback_days: return proximity_counts closes = data['close'].iloc[-lookback_days:] ma_series = { period: data['close'].rolling(window=period).mean().iloc[-lookback_days:] for period in periods } for idx, close_price in enumerate(closes): min_diff = None closest_period = None for period in periods: ma_value = ma_series[period].iloc[idx] if pd.isna(ma_value): continue diff = abs(close_price - ma_value) if min_diff is None or diff < min_diff: min_diff = diff closest_period = period if closest_period is not None: proximity_counts[f'MA{closest_period}'] += 1 return proximity_counts def calculate_extreme_trend_days(data, periods, lookback_days): """统计过去 lookback_days 天收盘价相对所有均线的极端趋势天数""" if len(data) < lookback_days: return 0, 0 recent_closes = data['close'].iloc[-lookback_days:] ma_series = { period: data['close'].rolling(window=period).mean().iloc[-lookback_days:] for period in periods } above_count = 0 below_count = 0 for idx, close_price in enumerate(recent_closes): ma_values = [] valid = True for period in periods: ma_value = ma_series[period].iloc[idx] if pd.isna(ma_value): valid = False break ma_values.append(ma_value) if not valid or not ma_values: continue if all(close_price > ma_value for ma_value in ma_values): above_count += 1 elif all(close_price < ma_value for ma_value in ma_values): below_count += 1 return above_count, below_count def check_ma5_distribution_filter(data, lookback_days, direction, min_ratio): """检查近 lookback_days 天收盘价相对于MA5的分布情况""" stats = { 'lookback_days': lookback_days, 'valid_days': 0, 'qualified_days': 0, 'required_days': max(0, math.ceil(lookback_days * min_ratio)) } if lookback_days <= 0: return True, stats if len(data) < max(lookback_days, 5): return False, stats recent_closes = data['close'].iloc[-lookback_days:] ma5_series = data['close'].rolling(window=5).mean().iloc[-lookback_days:] for close_price, ma5_value in zip(recent_closes, ma5_series): log.debug(f"close_price: {close_price}, ma5_value: {ma5_value}") if pd.isna(ma5_value): continue stats['valid_days'] += 1 if direction == 'long' and close_price < ma5_value: stats['qualified_days'] += 1 elif direction == 'short' and close_price > ma5_value: stats['qualified_days'] += 1 if stats['valid_days'] < lookback_days: return False, stats return stats['qualified_days'] >= stats['required_days'], stats def check_open_gap_filter(symbol, dominant_future, direction, yesterday_close, today_open, current_trading_day): """开盘价差过滤辅助函数 根据当前策略模式(`g.ma_gap_strategy_mode`)和对应阈值检查开盘价差是否符合方向要求。 Args: symbol: 品种代码(如 'AO') dominant_future: 主力合约代码(如 'AO2502.XSGE') direction: 方向,'long' 或 'short' yesterday_close: 前一交易日收盘价 today_open: 当日开盘价 current_trading_day: 当前期货交易日 Returns: bool: True 表示通过开盘价差过滤,False 表示未通过(并已加入排除缓存) """ open_gap_ratio = (today_open - yesterday_close) / yesterday_close log.info( f" 开盘价差检查: 昨收 {yesterday_close:.2f}, 今开 {today_open:.2f}, " f"价差比例 {open_gap_ratio:.2%}" ) gap_check_passed = False if g.ma_gap_strategy_mode == 1: # 方案1:多头检查上跳,空头检查下跳 if direction == 'long' and open_gap_ratio >= g.ma_open_gap_threshold: log.info( f" {symbol}({dominant_future}) ✓ 方案1多头开盘价差检查通过 " f"({open_gap_ratio:.2%} >= {g.ma_open_gap_threshold:.2%})" ) gap_check_passed = True elif direction == 'short' and open_gap_ratio <= -g.ma_open_gap_threshold: log.info( f" {symbol}({dominant_future}) ✓ 方案1空头开盘价差检查通过 " f"({open_gap_ratio:.2%} <= {-g.ma_open_gap_threshold:.2%})" ) gap_check_passed = True elif g.ma_gap_strategy_mode == 2 or g.ma_gap_strategy_mode == 3: # 方案2和方案3:多头检查下跳,空头检查上跳 if direction == 'long' and open_gap_ratio <= -g.ma_open_gap_threshold2: log.info( f" {symbol}({dominant_future}) ✓ 方案{g.ma_gap_strategy_mode}多头开盘价差检查通过 " f"({open_gap_ratio:.2%} <= {-g.ma_open_gap_threshold2:.2%})" ) gap_check_passed = True elif direction == 'short' and open_gap_ratio >= g.ma_open_gap_threshold2: log.info( f" {symbol}({dominant_future}) ✓ 方案{g.ma_gap_strategy_mode}空头开盘价差检查通过 " f"({open_gap_ratio:.2%} >= {g.ma_open_gap_threshold2:.2%})" ) gap_check_passed = True if not gap_check_passed: add_to_excluded_contracts(dominant_future, 'open_gap', current_trading_day) return False return True def check_ma_pattern(ma_values, direction): """检查均线排列模式是否符合方向要求 Args: ma_values: dict,包含MA5, MA10, MA20, MA30的均线值 direction: str,'long'或'short' Returns: bool: 是否符合均线排列要求 """ ma5 = ma_values['MA5'] ma10 = ma_values['MA10'] ma20 = ma_values['MA20'] ma30 = ma_values['MA30'] if direction == 'long': # 多头模式:MA30 <= MA20 <= MA10 <= MA5 或 MA30 <= MA20 <= MA5 <= MA10 # 或者:MA20 <= MA30 <= MA10 <= MA5 或 MA20 <= MA30 <= MA5 <= MA10 pattern1 = (ma30 <= ma20 <= ma10 <= ma5) pattern2 = (ma30 <= ma20 <= ma5 <= ma10) pattern3 = (ma20 <= ma30 <= ma10 <= ma5) pattern4 = (ma20 <= ma30 <= ma5 <= ma10) return pattern1 or pattern2 or pattern3 or pattern4 elif direction == 'short': # 空头模式:MA10 <= MA5 <= MA20 <= MA30 或 MA5 <= MA10 <= MA20 <= MA30 # 或者:MA10 <= MA5 <= MA30 <= MA20 或 MA5 <= MA10 <= MA30 <= MA20 pattern1 = (ma10 <= ma5 <= ma20 <= ma30) pattern2 = (ma5 <= ma10 <= ma20 <= ma30) pattern3 = (ma10 <= ma5 <= ma30 <= ma20) pattern4 = (ma5 <= ma10 <= ma30 <= ma20) return pattern1 or pattern2 or pattern3 or pattern4 else: return False def check_historical_ma_pattern_consistency(historical_data, direction, lookback_days, consistency_threshold): """检查历史均线模式的一致性 Args: historical_data: DataFrame,包含足够天数的历史数据 direction: str,'long'或'short' lookback_days: int,检查过去多少天 consistency_threshold: float,一致性阈值(0-1之间) Returns: tuple: (bool, float) - (是否通过一致性检查, 实际一致性比例) """ if len(historical_data) < max(g.ma_periods) + lookback_days: # 历史数据不足 return False, 0.0 match_count = 0 total_count = lookback_days # log.debug(f"历史均线模式一致性检查: {direction}, 检查过去{lookback_days}天的数据") # log.debug(f"历史数据: {historical_data}") # 检查过去lookback_days天的均线模式 for i in range(lookback_days): # 获取倒数第(i+1)天的数据(i=0时是昨天,i=1时是前天,依此类推) end_idx = -(i + 1) # 获取这一天的具体日期 date = historical_data.index[end_idx].date() # 获取到该天(包括该天)为止的所有数据 if i == 0: data_slice = historical_data else: data_slice = historical_data.iloc[:-i] # 计算该天的均线值 # log.debug(f"对于倒数第{i+1}天,end_idx: {end_idx},日期: {date},计算均线值: {data_slice}") ma_values = calculate_ma_values(data_slice, g.ma_periods) # log.debug(f"end_idx: {end_idx},日期: {date},倒数第{i+1}天的均线值: {ma_values}") # 检查是否符合模式 if check_ma_pattern(ma_values, direction): match_count += 1 # log.debug(f"日期: {date},对于倒数第{i+1}天,历史均线模式一致性检查: {direction} 符合模式") # else: # log.debug(f"日期: {date},对于倒数第{i+1}天,历史均线模式一致性检查: {direction} 不符合模式") consistency_ratio = match_count / total_count passed = consistency_ratio >= consistency_threshold return passed, consistency_ratio ############################ 交易执行函数 ################################### def open_position(context, security, target_hands, direction, single_hand_margin, single_hand_exceeds_limit, reason='', crossed_ma_details=None, entry_snapshot=None): """开仓并可选地在成交后校验保证金 Args: single_hand_exceeds_limit: bool,当单手保证金超过最大限制时为True """ try: # 记录交易前的可用资金 cash_before = context.portfolio.available_cash # 根据单手保证金情况选择下单方式 use_value_order = (not single_hand_exceeds_limit) and getattr(g, 'max_margin_per_position', 0) > 0 if use_value_order: target_value = g.max_margin_per_position order = order_target_value(security, target_value, side=direction) log.debug(f"使用order_target_value下单: {security} {direction} 目标价值 {target_value}") else: order = order_target(security, target_hands, side=direction) log.debug(f"使用order_target按手数下单: {security} {direction} 目标手数 {target_hands}") log.debug(f"order: {order}") # 检查订单状态,如果为'new'说明当晚没有夜盘 if order is not None: order_status = str(order.status).lower() if order_status == 'new': # 取消订单 cancel_order(order) current_trading_day = get_current_trading_day(context.current_dt) g.night_session_blocked = True g.night_session_blocked_trading_day = current_trading_day log.warning(f"订单状态为'new',说明{current_trading_day}当晚没有夜盘,已取消订单: {security} {direction} {target_hands}手,并禁止当晚所有操作") return False if order is not None and order.filled > 0: # 记录交易后的可用资金 cash_after = context.portfolio.available_cash # 计算实际资金变化 cash_change = cash_before - cash_after # 计算保证金变化 margin_change = single_hand_margin * target_hands # 获取订单价格和数量 order_price = order.avg_cost if order.avg_cost else order.price order_amount = order.filled # 记录当日交易 underlying_symbol = security.split('.')[0][:-4] g.today_trades.append({ 'security': security, # 合约代码 'underlying_symbol': underlying_symbol, # 标的代码 'direction': direction, # 方向 'order_amount': order_amount, # 订单数量 'order_price': order_price, # 订单价格 'cash_change': cash_change, # 资金变化 'margin_change': margin_change, # 保证金 'time': context.current_dt # 时间 }) # 记录交易信息 entry_trading_day = get_current_trading_day(context.current_dt) # 处理穿越均线信息 crossed_ma_lines = [] if crossed_ma_details: crossed_ma_lines = [f"MA{detail['period']}" for detail in crossed_ma_details if detail.get('delta', 0) > 0] snapshot_copy = None if entry_snapshot: snapshot_copy = { 'yesterday_close': entry_snapshot.get('yesterday_close'), 'today_open': entry_snapshot.get('today_open'), 'ma_values': (entry_snapshot.get('ma_values') or {}).copy(), 'avg_5day_change': entry_snapshot.get('avg_5day_change') } g.trade_history[security] = { 'entry_price': order_price, 'target_hands': target_hands, 'actual_hands': order_amount, 'actual_margin': margin_change, 'direction': direction, 'entry_time': context.current_dt, 'entry_trading_day': entry_trading_day, 'crossed_ma_lines': crossed_ma_lines, # 记录穿越的均线 'entry_snapshot': snapshot_copy } ma_trailing_enabled = True ma_values_at_entry = calculate_realtime_ma_values(security, [5]) ma5_value = ma_values_at_entry.get('ma5') if ma5_value is not None: if direction == 'long' and order_price < ma5_value: ma_trailing_enabled = False log.info(f"禁用均线跟踪止盈: {security} {direction}, 开仓价 {order_price:.2f} < MA5 {ma5_value:.2f}") elif direction == 'short' and order_price > ma5_value: ma_trailing_enabled = False log.info(f"禁用均线跟踪止盈: {security} {direction}, 开仓价 {order_price:.2f} > MA5 {ma5_value:.2f}") g.trade_history[security]['ma_trailing_enabled'] = ma_trailing_enabled crossed_ma_str = ', '.join(crossed_ma_lines) if crossed_ma_lines else '无' log.info(f"开仓成功: {security} {direction} {order_amount}手 @{order_price:.2f}, " f"保证金: {margin_change:.0f}, 资金变化: {cash_change:.0f}, 原因: {reason}, " f"穿越均线: {crossed_ma_str}") return True except Exception as e: log.warning(f"开仓失败 {security}: {str(e)}") return False def close_position(context, security, direction): """平仓""" try: # 使用order_target平仓到0手 order = order_target(security, 0, side=direction) if order is not None and order.filled > 0: underlying_symbol = security.split('.')[0][:-4] # 记录当日交易(平仓) g.today_trades.append({ 'security': security, 'underlying_symbol': underlying_symbol, 'direction': direction, 'order_amount': -order.filled, 'order_price': order.avg_cost if order.avg_cost else order.price, 'cash_change': 0, 'time': context.current_dt }) log.info(f"平仓成功: {underlying_symbol} {direction} {order.filled}手") # 从交易历史中移除 if security in g.trade_history: del g.trade_history[security] log.debug(f"从交易历史中移除: {security}") else: log.info(f"平仓失败: {security} {direction} {order.filled}手") return True except Exception as e: log.warning(f"平仓失败 {security}: {str(e)}") return False ############################ 辅助函数 ################################### def get_futures_config(underlying_symbol, config_key=None, default_value=None): """获取期货品种配置信息的辅助函数""" if underlying_symbol not in g.futures_config: if config_key and default_value is not None: return default_value return {} if config_key is None: return g.futures_config[underlying_symbol] return g.futures_config[underlying_symbol].get(config_key, default_value) def get_margin_rate(underlying_symbol, direction, default_rate=0.10): """获取保证金比例的辅助函数""" return g.futures_config.get(underlying_symbol, {}).get('margin_rate', {}).get(direction, default_rate) def get_multiplier(underlying_symbol, default_multiplier=10): """获取合约乘数的辅助函数""" return g.futures_config.get(underlying_symbol, {}).get('multiplier', default_multiplier) def is_financial_underlying(underlying_symbol): """判断标的是否属于金融期货""" if not underlying_symbol: return False return bool(get_futures_config(underlying_symbol, 'is_financial', False)) def has_financial_positions(context): """检查当前持仓中是否包含金融期货""" subportfolios = getattr(context, 'subportfolios', []) if not subportfolios: return False subportfolio = subportfolios[0] positions = list(getattr(subportfolio, 'long_positions', {}).values()) + \ list(getattr(subportfolio, 'short_positions', {}).values()) for position in positions: security = getattr(position, 'security', '') if not security: continue underlying = security.split('.')[0] if not underlying: continue symbol = underlying[:-4] if len(underlying) > 4 else underlying if is_financial_underlying(symbol): return True return False def add_to_excluded_contracts(dominant_future, reason, current_trading_day): """将合约添加到排除缓存""" g.excluded_contracts[dominant_future] = { 'reason': reason, 'trading_day': current_trading_day } def has_reached_trading_start(current_dt, trading_start_time_str, has_night_session=False): """判断当前是否已到达合约允许交易的起始时间""" if not trading_start_time_str: return True try: hour, minute = [int(part) for part in trading_start_time_str.split(':')[:2]] except Exception: return True start_time = time(hour, minute) current_time = current_dt.time() if has_night_session: if current_time >= start_time: return True if current_time < time(12, 0): return True if time(8, 30) <= current_time <= time(15, 30): return True return False if current_time < start_time: return False if current_time >= time(20, 0): return False return True def calculate_target_hands(context, security, direction, max_margin_override=None): """计算目标开仓手数 Returns: tuple: (target_hands, single_hand_margin, single_hand_exceeds_limit) """ current_price = get_current_data()[security].last_price underlying_symbol = security.split('.')[0][:-4] # 使用保证金比例 margin_rate = get_margin_rate(underlying_symbol, direction) multiplier = get_multiplier(underlying_symbol) # 计算单手保证金 single_hand_margin = current_price * multiplier * margin_rate log.debug(f"计算单手保证金: {current_price:.2f} * {multiplier:.2f} * {margin_rate:.2f} = {single_hand_margin:.2f}") # 还要考虑可用资金限制,先计算账户总资金(可用资金 + 已占用保证金) total_value = context.portfolio.total_value reserved_cash = total_value * g.reserve_percentage # 按比例预留的现金 available_cash = max(total_value - reserved_cash, 0) # log.debug(f"账户总资金: {total_value:.0f}, 预留现金: {reserved_cash:.0f}, 可用资金: {available_cash:.0f}") # 如果当前没有持有金融期货,则额外预留指定金额,避免被其他品种占用 if not has_financial_positions(context): reserve_amount = getattr(g, 'financial_reserve_amount', 0) log.debug(f"金融期货预留资金: {reserve_amount:.0f}") if reserve_amount > 0: available_cash = max(available_cash - reserve_amount, 0) log.debug(f"金融期货预留资金扣除后可用资金: {available_cash:.0f}") # 根据单个标的最大持仓保证金限制计算开仓数量 max_margin = max_margin_override if max_margin_override is not None else g.max_margin_per_position single_hand_exceeds_limit = single_hand_margin > max_margin if not single_hand_exceeds_limit: # 如果单手保证金不超过最大限制,计算最大可开仓手数 max_hands = int(max_margin / single_hand_margin) max_hands_by_cash = int(available_cash / single_hand_margin) # 取两者较小值 actual_hands = min(max_hands, max_hands_by_cash) # 确保至少开1手 actual_hands = max(1, actual_hands) log.info(f"单手保证金: {single_hand_margin:.0f}, 目标开仓手数: {actual_hands}") return actual_hands, single_hand_margin, single_hand_exceeds_limit else: # 如果单手保证金超过最大限制,默认开仓1手 actual_hands = 1 log.info(f"单手保证金: {single_hand_margin:.0f} 超过最大限制: {max_margin}, 默认开仓1手") return actual_hands, single_hand_margin, single_hand_exceeds_limit def check_symbol_prefix_match(symbol, context, hold_symbols): """检查是否有相似的持仓品种""" log.debug(f"检查持仓") symbol_prefix = symbol[:-9] long_positions = context.subportfolios[0].long_positions short_positions = context.subportfolios[0].short_positions log.debug(f"long_positions: {long_positions}, short_positions: {short_positions}") for hold_symbol in hold_symbols: hold_symbol_prefix = hold_symbol[:-9] if len(hold_symbol) > 9 else hold_symbol if symbol_prefix == hold_symbol_prefix: return True return False def calculate_average_daily_change_rate(security, days=30): """计算日均变化率""" historical_data = attribute_history(security, days + 1, '1d', ['close']) daily_change_rates = abs(historical_data['close'].pct_change()).iloc[1:] return daily_change_rates.mean() def calculate_realtime_ma_values(security, ma_periods): """计算包含当前价格的实时均线值""" if not ma_periods: return {} max_period = max(ma_periods) history_days = max_period + MA_ADDITIONAL_HISTORY_DAYS historical_data = attribute_history(security, history_days, '1d', ['close']) today_price = get_current_data()[security].last_price close_prices = historical_data['close'].tolist() + [today_price] ma_values = {} for period in ma_periods: if len(close_prices) >= period: ma_values[f'ma{period}'] = sum(close_prices[-period:]) / period else: ma_values[f'ma{period}'] = None return ma_values def calculate_recent_average_change(security, days=5): """计算最近days天日间收盘涨幅的平均值""" if days <= 0: return None try: history = attribute_history(security, days + 1, '1d', ['close']) except Exception as e: log.warning(f"{security} 获取历史数据失败(均值涨幅计算): {str(e)}") return None closes = history['close'] if len(closes) < days + 1: return None pct_changes = closes.pct_change().dropna().abs() # 取绝对值,更关心涨跌幅度而不是方向 if len(pct_changes) < days: return None return pct_changes.iloc[-days:].mean() def calculate_ma_compaction_from_values(ma_values, periods=(5, 10, 20)): """基于给定的均线数值计算聚合度(标准差/均值)""" if not ma_values: return None values = [] for period in periods: key = f'MA{period}' value = ma_values.get(key) if value is None: return None values.append(value) mean_val = np.mean(values) if mean_val == 0: return None std_val = np.std(values, ddof=0) return std_val / mean_val def determine_margin_limit_from_compaction(compaction_value): """根据均线聚合度返回对应的最大持仓保证金上限""" base_limit = getattr(g, 'base_max_margin_per_position', getattr(g, 'max_margin_per_position', 30000)) if compaction_value is None: return base_limit thresholds = getattr(g, 'ma_compaction_thresholds', MA_COMPACTION_THRESHOLDS) tight_threshold = thresholds.get('tight', MA_COMPACTION_THRESHOLDS['tight']) balanced_threshold = thresholds.get('balanced', MA_COMPACTION_THRESHOLDS['balanced']) loose_threshold = thresholds.get('loose', MA_COMPACTION_THRESHOLDS['loose']) if compaction_value <= tight_threshold: return 40000 if compaction_value <= balanced_threshold: return 30000 if compaction_value <= loose_threshold: return 20000 return None def adjust_max_margin_per_position(compaction_value): """根据聚合度动态调整 g.max_margin_per_position,并返回新的上限""" margin_limit = determine_margin_limit_from_compaction(compaction_value) if margin_limit is None: return None g.max_margin_per_position = margin_limit return margin_limit def format_entry_details(entry_snapshot): """格式化记录到CSV的价格、均线及扩展信息""" if not entry_snapshot: return '' def fmt(value): return f"{value:.2f}" if value is not None else "NA" ma_values = entry_snapshot.get('ma_values') or {} snapshot_parts = [ f"prev_close:{fmt(entry_snapshot.get('yesterday_close'))}", f"open:{fmt(entry_snapshot.get('today_open'))}" ] for period in [5, 10, 20, 30, 60]: key_upper = f"MA{period}" key_lower = key_upper.lower() value = ma_values.get(key_upper) if value is None: value = ma_values.get(key_lower) snapshot_parts.append(f"{key_upper}:{fmt(value)}") avg_change = entry_snapshot.get('avg_5day_change') if avg_change is not None: snapshot_parts.append(f"AVG5:{avg_change:.4%}") else: snapshot_parts.append("AVG5:NA") return '|'.join(snapshot_parts) def save_today_new_positions_to_csv(context): """将当天新增的持仓记录追加保存到CSV文件""" log.info(f"保存当天新增的持仓记录到CSV文件") if not g.trade_history: return current_trading_day = normalize_trade_day_value(get_current_trading_day(context.current_dt)) # 筛选当天新开仓的记录 today_new_positions = [] for security, trade_info in g.trade_history.items(): entry_trading_day = normalize_trade_day_value(trade_info.get('entry_trading_day')) if entry_trading_day == current_trading_day: underlying_symbol = security.split('.')[0][:-4] crossed_ma_lines = trade_info.get('crossed_ma_lines', []) # 使用分号分隔多条均线,避免CSV格式问题 crossed_ma_str = ';'.join(crossed_ma_lines) if crossed_ma_lines else '' details = format_entry_details(trade_info.get('entry_snapshot')) today_new_positions.append({ 'security': security, 'underlying_symbol': underlying_symbol, 'direction': trade_info['direction'], 'entry_price': trade_info['entry_price'], 'actual_hands': trade_info['actual_hands'], 'actual_margin': trade_info['actual_margin'], 'entry_time': trade_info['entry_time'].strftime('%H:%M:%S'), # 只保留时间 'entry_trading_day': str(entry_trading_day), 'crossed_ma_lines': crossed_ma_str, 'details': details }) # 如果没有当天新增的记录,直接返回 if not today_new_positions: log.debug("当天无新增持仓记录,跳过保存") return try: filename = 'trade_history.csv' # 尝试读取现有文件 existing_content = '' file_exists = False try: existing_content_bytes = read_file(filename) if existing_content_bytes: existing_content = existing_content_bytes.decode('utf-8') file_exists = True except: pass # 准备CSV内容 csv_lines = [] # 如果文件不存在,添加表头 if not file_exists: header = 'security,underlying_symbol,direction,entry_price,actual_hands,actual_margin,entry_time,entry_trading_day,crossed_ma_lines,details' csv_lines.append(header) # 添加新记录 for pos in today_new_positions: line = f"{pos['security']},{pos['underlying_symbol']},{pos['direction']},{pos['entry_price']:.2f},{pos['actual_hands']},{pos['actual_margin']:.2f},{pos['entry_time']},{pos['entry_trading_day']},{pos['crossed_ma_lines']},{pos['details']}" csv_lines.append(line) # 合并内容:现有内容 + 新内容 new_content = '\n'.join(csv_lines) if file_exists: final_content = existing_content.rstrip('\n') + '\n' + new_content else: final_content = new_content # 写入文件 - 将字符串编码为bytes以符合JoinQuant API要求 write_file(filename, final_content.encode('utf-8')) log.info(f"已追加{len(today_new_positions)}条当天新增持仓记录到文件: {filename}") except Exception as e: log.warning(f"保存持仓记录到CSV文件时出错: {str(e)}") def sync_trade_history_with_positions(context): """同步g.trade_history与实际持仓,清理已平仓但记录未删除的持仓""" if not g.trade_history: return subportfolio = context.subportfolios[0] actual_positions = set(subportfolio.long_positions.keys()) | set(subportfolio.short_positions.keys()) # 找出g.trade_history中有记录但实际已平仓的合约 stale_records = [] for security in g.trade_history.keys(): if security not in actual_positions: stale_records.append(security) # 清理这些过期记录 if stale_records: log.info("=" * 60) log.info("发现持仓记录与实际持仓不同步,进行清理:") for security in stale_records: trade_info = g.trade_history[security] underlying_symbol = security.split('.')[0][:-4] log.info(f" 清理过期记录: {underlying_symbol}({security}) {trade_info['direction']}, " f"成本价: {trade_info['entry_price']:.2f}, " f"入场时间: {trade_info['entry_time']}") del g.trade_history[security] log.info(f"共清理 {len(stale_records)} 条过期持仓记录") log.info("=" * 60) def after_market_close(context): """收盘后运行函数""" log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time()))) # 保存当天新增的持仓记录到CSV文件 save_today_new_positions_to_csv(context) # 同步检查:清理g.trade_history中已平仓但记录未删除的持仓 sync_trade_history_with_positions(context) # 清空候选列表(每天重新检查) g.daily_ma_candidates = {} # 清空排除缓存(每天重新检查) excluded_count = len(g.excluded_contracts) if excluded_count > 0: log.info(f"清空排除缓存,共 {excluded_count} 个合约") g.excluded_contracts = {} # 重置夜盘禁止操作标志 if g.night_session_blocked: log.info(f"重置夜盘禁止操作标志") g.night_session_blocked = False g.night_session_blocked_trading_day = None # 只有当天有交易时才打印统计信息 if g.today_trades: print_daily_trading_summary(context) # 清空当日交易记录 g.today_trades = [] log.info('##############################################################') def print_daily_trading_summary(context): """打印当日交易汇总""" if not g.today_trades: return log.info("\n=== 当日交易汇总 ===") total_margin = 0 for trade in g.today_trades: if trade['order_amount'] > 0: # 开仓 log.info(f"开仓 {trade['underlying_symbol']} {trade['direction']} {trade['order_amount']}手 " f"价格:{trade['order_price']:.2f} 保证金:{trade['cash_change']:.0f}") total_margin += trade['cash_change'] else: # 平仓 log.info(f"平仓 {trade['underlying_symbol']} {trade['direction']} {abs(trade['order_amount'])}手 " f"价格:{trade['order_price']:.2f}") log.info(f"当日保证金占用: {total_margin:.0f}") log.info("==================\n") ########################## 自动移仓换月函数 ################################# def position_auto_switch(context, pindex=0, switch_func=None, callback=None): """期货自动移仓换月""" import re subportfolio = context.subportfolios[pindex] symbols = set(subportfolio.long_positions.keys()) | set(subportfolio.short_positions.keys()) switch_result = [] for symbol in symbols: match = re.match(r"(?P[A-Z]{1,})", symbol) if not match: raise ValueError("未知期货标的: {}".format(symbol)) else: underlying_symbol = match.groupdict()["underlying_symbol"] trading_start = get_futures_config(underlying_symbol, 'trading_start_time', None) has_night_session = get_futures_config(underlying_symbol, 'has_night_session', False) # log.debug(f"移仓换月: {symbol}, 交易开始时间: {trading_start}, 夜盘: {has_night_session}") if trading_start and not has_reached_trading_start(context.current_dt, trading_start, has_night_session): # log.info("{} 当前时间 {} 未到达交易开始时间 {} (夜盘:{} ),跳过移仓".format( # symbol, # context.current_dt.strftime('%H:%M:%S'), # trading_start, # has_night_session # )) continue dominant = get_dominant_future(underlying_symbol) cur = get_current_data() symbol_last_price = cur[symbol].last_price dominant_last_price = cur[dominant].last_price if dominant > symbol: for positions_ in (subportfolio.long_positions, subportfolio.short_positions): if symbol not in positions_.keys(): continue else : p = positions_[symbol] if switch_func is not None: switch_func(context, pindex, p, dominant) else: amount = p.total_amount # 跌停不能开空和平多,涨停不能开多和平空 if p.side == "long": symbol_low_limit = cur[symbol].low_limit dominant_high_limit = cur[dominant].high_limit if symbol_last_price <= symbol_low_limit: log.warning("标的{}跌停,无法平仓。移仓换月取消。".format(symbol)) continue elif dominant_last_price >= dominant_high_limit: log.warning("标的{}涨停,无法开仓。移仓换月取消。".format(dominant)) continue else: log.info("进行移仓换月: ({0},long) -> ({1},long)".format(symbol, dominant)) order_old = order_target(symbol, 0, side='long') if order_old != None and order_old.filled > 0: order_new = order_target(dominant, amount, side='long') if order_new != None and order_new.filled > 0: switch_result.append({"before": symbol, "after": dominant, "side": "long"}) # 换月成功,更新交易记录 if symbol in g.trade_history: # 复制旧的交易记录作为基础 old_entry_price = g.trade_history[symbol]['entry_price'] g.trade_history[dominant] = g.trade_history[symbol].copy() # 更新成本价为新合约的实际开仓价 new_entry_price = None if order_new.avg_cost and order_new.avg_cost > 0: new_entry_price = order_new.avg_cost g.trade_history[dominant]['entry_price'] = order_new.avg_cost elif order_new.price and order_new.price > 0: new_entry_price = order_new.price g.trade_history[dominant]['entry_price'] = order_new.price else: # 如果订单价格无效,使用当前价格作为成本价 new_entry_price = dominant_last_price g.trade_history[dominant]['entry_price'] = dominant_last_price # 更新入场时间 g.trade_history[dominant]['entry_time'] = context.current_dt # 更新入场交易日 g.trade_history[dominant]['entry_trading_day'] = get_current_trading_day(context.current_dt) # 删除旧合约的交易记录 del g.trade_history[symbol] log.info(f"移仓换月成本价更新: {symbol} -> {dominant}, " f"旧成本价: {old_entry_price:.2f}, 新成本价: {new_entry_price:.2f}") else: log.warning("标的{}交易失败,无法开仓。移仓换月失败。".format(dominant)) if p.side == "short": symbol_high_limit = cur[symbol].high_limit dominant_low_limit = cur[dominant].low_limit if symbol_last_price >= symbol_high_limit: log.warning("标的{}涨停,无法平仓。移仓换月取消。".format(symbol)) continue elif dominant_last_price <= dominant_low_limit: log.warning("标的{}跌停,无法开仓。移仓换月取消。".format(dominant)) continue else: log.info("进行移仓换月: ({0},short) -> ({1},short)".format(symbol, dominant)) order_old = order_target(symbol, 0, side='short') if order_old != None and order_old.filled > 0: order_new = order_target(dominant, amount, side='short') if order_new != None and order_new.filled > 0: switch_result.append({"before": symbol, "after": dominant, "side": "short"}) # 换月成功,更新交易记录 if symbol in g.trade_history: # 复制旧的交易记录作为基础 old_entry_price = g.trade_history[symbol]['entry_price'] g.trade_history[dominant] = g.trade_history[symbol].copy() # 更新成本价为新合约的实际开仓价 new_entry_price = None if order_new.avg_cost and order_new.avg_cost > 0: new_entry_price = order_new.avg_cost g.trade_history[dominant]['entry_price'] = order_new.avg_cost elif order_new.price and order_new.price > 0: new_entry_price = order_new.price g.trade_history[dominant]['entry_price'] = order_new.price else: # 如果订单价格无效,使用当前价格作为成本价 new_entry_price = dominant_last_price g.trade_history[dominant]['entry_price'] = dominant_last_price # 更新入场时间 g.trade_history[dominant]['entry_time'] = context.current_dt # 更新入场交易日 g.trade_history[dominant]['entry_trading_day'] = get_current_trading_day(context.current_dt) # 删除旧合约的交易记录 del g.trade_history[symbol] log.info(f"移仓换月成本价更新: {symbol} -> {dominant}, " f"旧成本价: {old_entry_price:.2f}, 新成本价: {new_entry_price:.2f}") else: log.warning("标的{}交易失败,无法开仓。移仓换月失败。".format(dominant)) if callback: callback(context, pindex, p, dominant) return switch_result