# 导入函数库 from jqdata import * from jqdata import finance import pandas as pd import numpy as np from datetime import date, datetime, timedelta, time import re # 均线形态交易策略 v001 # 基于均线走势(前提条件)+ K线形态(开盘价差、当天价差)的期货交易策略 # # 核心逻辑: # 1. 开盘时检查均线走势(MA30<=MA20<=MA10<=MA5为多头,反之为空头) # 2. 检查开盘价差是否符合方向要求(多头>=0.5%,空头<=-0.5%) # 3. 14:35和14:55检查当天价差(多头>0,空头<0),满足条件则开仓 # 4. 应用固定止损和动态追踪止盈 # 5. 自动换月移仓 # 设置以便完整打印 DataFrame pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) pd.set_option('display.width', None) pd.set_option('display.max_colwidth', 20) ## 初始化函数,设定基准等等 def initialize(context): # 设定沪深300作为基准 set_benchmark('000300.XSHG') # 开启动态复权模式(真实价格) set_option('use_real_price', True) # 输出内容到日志 log.info('=' * 60) log.info('均线形态交易策略 v001 初始化开始') log.info('策略类型: 均线走势 + K线形态') log.info('=' * 60) ### 期货相关设定 ### # 设定账户为金融账户 set_subportfolios([SubPortfolioConfig(cash=context.portfolio.starting_cash, type='index_futures')]) # 期货类每笔交易时的手续费是: 买入时万分之0.23,卖出时万分之0.23,平今仓为万分之23 set_order_cost(OrderCost(open_commission=0.000023, close_commission=0.000023, close_today_commission=0.0023), type='index_futures') # 设置期货交易的滑点 set_slippage(StepRelatedSlippage(2)) # 初始化全局变量 g.usage_percentage = 0.8 # 最大资金使用比例 g.max_margin_per_position = 20000 # 单个标的最大持仓保证金(元) # 均线策略参数 g.ma_periods = [5, 10, 20, 30] # 均线周期 g.ma_historical_days = 60 # 获取历史数据天数(确保足够计算MA30) g.ma_open_gap_threshold = 0.002 # 方案1开盘价差阈值(0.2%) g.ma_pattern_lookback_days = 10 # 历史均线模式一致性检查的天数 g.ma_pattern_consistency_threshold = 0.8 # 历史均线模式一致性阈值(80%) g.check_intraday_spread = False # 是否检查日内价差(True: 检查, False: 跳过) g.ma_proximity_min_threshold = 8 # MA5与MA10贴近计数和的最低阈值 # 均线价差策略方案选择 g.ma_gap_strategy_mode = 2 # 策略模式选择(1: 原方案, 2: 新方案) g.ma_open_gap_threshold2 = 0.002 # 方案2开盘价差阈值(0.2%) g.ma_intraday_threshold_scheme2 = 0.005 # 方案2日内变化阈值(0.5%) # 止损止盈策略参数 g.fixed_stop_loss_rate = 0.01 # 固定止损比率(1%) g.ma_offset_ratio_normal = 0.003 # 均线跟踪止盈常规偏移量(0.3%) g.ma_offset_ratio_close = 0.01 # 均线跟踪止盈收盘前偏移量(1%) g.days_for_adjustment = 4 # 持仓天数调整阈值 # 输出策略参数 log.info("均线形态策略参数:") log.info(f" 均线周期: {g.ma_periods}") log.info(f" 策略模式: 方案{g.ma_gap_strategy_mode}") log.info(f" 方案1开盘价差阈值: {g.ma_open_gap_threshold:.1%}") log.info(f" 方案2开盘价差阈值: {g.ma_open_gap_threshold2:.1%}") log.info(f" 方案2日内变化阈值: {g.ma_intraday_threshold_scheme2:.1%}") log.info(f" 历史均线模式检查天数: {g.ma_pattern_lookback_days}天") log.info(f" 历史均线模式一致性阈值: {g.ma_pattern_consistency_threshold:.1%}") log.info(f" 均线贴近计数阈值: {g.ma_proximity_min_threshold}") log.info(f" 是否检查日内价差: {g.check_intraday_spread}") log.info(f" 固定止损: {g.fixed_stop_loss_rate:.1%}") log.info(f" 均线跟踪止盈常规偏移: {g.ma_offset_ratio_normal:.1%}") log.info(f" 均线跟踪止盈收盘前偏移: {g.ma_offset_ratio_close:.1%}") log.info(f" 持仓天数调整阈值: {g.days_for_adjustment}天") # 期货品种完整配置字典 g.futures_config = { # 贵金属 'AU': {'has_night_session': True, 'margin_rate': {'long': 0.14, 'short': 0.14}, 'multiplier': 1000, 'trading_start_time': '21:00'}, 'AG': {'has_night_session': True, 'margin_rate': {'long': 0.14, 'short': 0.14}, 'multiplier': 15, 'trading_start_time': '21:00'}, # 有色金属 'CU': {'has_night_session': True, 'margin_rate': {'long': 0.09, 'short': 0.09}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'AL': {'has_night_session': True, 'margin_rate': {'long': 0.09, 'short': 0.09}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'ZN': {'has_night_session': True, 'margin_rate': {'long': 0.09, 'short': 0.09}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'PB': {'has_night_session': True, 'margin_rate': {'long': 0.09, 'short': 0.09}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'NI': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 1, 'trading_start_time': '21:00'}, 'SN': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 1, 'trading_start_time': '21:00'}, 'SS': {'has_night_session': True, 'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 5, 'trading_start_time': '21:00'}, # 黑色系 'RB': {'has_night_session': True, 'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'HC': {'has_night_session': True, 'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'I': {'has_night_session': True, 'margin_rate': {'long': 0.1, 'short': 0.1}, 'multiplier': 100, 'trading_start_time': '21:00'}, 'JM': {'has_night_session': True, 'margin_rate': {'long': 0.22, 'short': 0.22}, 'multiplier': 100, 'trading_start_time': '21:00'}, 'J': {'has_night_session': True, 'margin_rate': {'long': 0.22, 'short': 0.22}, 'multiplier': 60, 'trading_start_time': '21:00'}, # 能源化工 'SP': {'has_night_session': True, 'margin_rate': {'long': 0.1, 'short': 0.1}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'FU': {'has_night_session': True, 'margin_rate': {'long': 0.08, 'short': 0.08}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'BU': {'has_night_session': True, 'margin_rate': {'long': 0.04, 'short': 0.04}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'RU': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'BR': {'has_night_session': True, 'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'SC': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 1000, 'trading_start_time': '21:00'}, 'NR': {'has_night_session': True, 'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'LU': {'has_night_session': True, 'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'LC': {'has_night_session': False, 'margin_rate': {'long': 0.1, 'short': 0.1}, 'multiplier': 1, 'trading_start_time': '09:00'}, # 化工 'FG': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 20, 'trading_start_time': '21:00'}, 'TA': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'MA': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'SA': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 20, 'trading_start_time': '21:00'}, 'L': {'has_night_session': True, 'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'V': {'has_night_session': True, 'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'EG': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'PP': {'has_night_session': True, 'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'EB': {'has_night_session': True, 'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'PG': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 20, 'trading_start_time': '21:00'}, # 农产品 'RM': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'OI': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'CF': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'SR': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'PF': {'has_night_session': True, 'margin_rate': {'long': 0.1, 'short': 0.1}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'C': {'has_night_session': True, 'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'CS': {'has_night_session': True, 'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'CY': {'has_night_session': True, 'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'A': {'has_night_session': True, 'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'B': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'M': {'has_night_session': True, 'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'Y': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'P': {'has_night_session': True, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, # 无夜盘品种 'IF': {'has_night_session': False, 'margin_rate': {'long': 0.08, 'short': 0.08}, 'multiplier': 300, 'trading_start_time': '09:30'}, 'IH': {'has_night_session': False, 'margin_rate': {'long': 0.08, 'short': 0.08}, 'multiplier': 300, 'trading_start_time': '09:30'}, 'IC': {'has_night_session': False, 'margin_rate': {'long': 0.08, 'short': 0.08}, 'multiplier': 200, 'trading_start_time': '09:30'}, 'IM': {'has_night_session': False, 'margin_rate': {'long': 0.08, 'short': 0.08}, 'multiplier': 200, 'trading_start_time': '09:30'}, 'AP': {'has_night_session': False, 'margin_rate': {'long': 0.08, 'short': 0.08}, 'multiplier': 10, 'trading_start_time': '09:00'}, 'CJ': {'has_night_session': False, 'margin_rate': {'long': 0.09, 'short': 0.09}, 'multiplier': 5, 'trading_start_time': '09:00'}, 'PK': {'has_night_session': False, 'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 5, 'trading_start_time': '09:00'}, 'JD': {'has_night_session': False, 'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '09:00'}, 'LH': {'has_night_session': False, 'margin_rate': {'long': 0.1, 'short': 0.1}, 'multiplier': 16, 'trading_start_time': '09:00'} } # 策略品种选择策略配置 # 方案1:全品种策略 - 考虑所有配置的期货品种 g.strategy_focus_symbols = [] # 空列表表示考虑所有品种 # 方案2:精选品种策略 - 只交易流动性较好的特定品种(如需使用请取消下行注释) # g.strategy_focus_symbols = ['RM', 'CJ', 'CY', 'JD', 'L', 'LC', 'SF', 'SI'] log.info(f"品种选择策略: {'全品种策略(覆盖所有配置品种)' if not g.strategy_focus_symbols else '精选品种策略(' + str(len(g.strategy_focus_symbols)) + '个品种)'}") # 交易记录和数据存储 g.trade_history = {} # 持仓记录 {symbol: {'entry_price': xxx, 'direction': xxx, ...}} g.daily_ma_candidates = {} # 通过均线和开盘价差检查的候选品种 {symbol: {'direction': 'long'/'short', 'open_price': xxx, ...}} g.today_trades = [] # 当日交易记录 g.excluded_contracts = {} # 每日排除的合约缓存 {dominant_future: {'reason': 'ma_trend'/'open_gap', 'trading_day': xxx}} g.ma_checked_underlyings = {} # 记录各品种在交易日的均线检查状态 {symbol: trading_day} g.last_ma_trading_day = None # 最近一次均线检查所属交易日 # 定时任务设置 # 夜盘开始(21:05) - 均线和开盘价差检查 run_daily(check_ma_trend_and_open_gap, time='21:05:00', reference_security='IF1808.CCFX') # 日盘开始 - 均线和开盘价差检查 run_daily(check_ma_trend_and_open_gap, time='09:05:00', reference_security='IF1808.CCFX') run_daily(check_ma_trend_and_open_gap, time='09:35:00', reference_security='IF1808.CCFX') # 盘中价差检查和开仓(14:35和14:55) run_daily(check_intraday_price_diff, time='14:35:00', reference_security='IF1808.CCFX') run_daily(check_intraday_price_diff, time='14:55:00', reference_security='IF1808.CCFX') # 夜盘止损止盈检查 run_daily(check_stop_loss_profit, time='21:05:00', reference_security='IF1808.CCFX') run_daily(check_stop_loss_profit, time='21:35:00', reference_security='IF1808.CCFX') run_daily(check_stop_loss_profit, time='22:05:00', reference_security='IF1808.CCFX') run_daily(check_stop_loss_profit, time='22:35:00', reference_security='IF1808.CCFX') # 日盘止损止盈检查 run_daily(check_stop_loss_profit, time='09:05:00', reference_security='IF1808.CCFX') run_daily(check_stop_loss_profit, time='09:35:00', reference_security='IF1808.CCFX') run_daily(check_stop_loss_profit, time='10:05:00', reference_security='IF1808.CCFX') run_daily(check_stop_loss_profit, time='10:35:00', reference_security='IF1808.CCFX') run_daily(check_stop_loss_profit, time='11:05:00', reference_security='IF1808.CCFX') run_daily(check_stop_loss_profit, time='11:25:00', reference_security='IF1808.CCFX') run_daily(check_stop_loss_profit, time='13:35:00', reference_security='IF1808.CCFX') run_daily(check_stop_loss_profit, time='14:05:00', reference_security='IF1808.CCFX') run_daily(check_stop_loss_profit, time='14:35:00', reference_security='IF1808.CCFX') run_daily(check_stop_loss_profit, time='14:55:00', reference_security='IF1808.CCFX') # 收盘后 run_daily(after_market_close, time='15:30:00', reference_security='IF1808.CCFX') log.info('=' * 60) ############################ 主程序执行函数 ################################### def get_current_trading_day(current_dt): """根据当前时间推断对应的期货交易日""" current_date = current_dt.date() current_time = current_dt.time() trade_days = get_trade_days(end_date=current_date, count=1) if trade_days and trade_days[0] == current_date: trading_day = current_date else: next_days = get_trade_days(start_date=current_date, count=1) trading_day = next_days[0] if next_days else current_date if current_time >= time(20, 59): next_trade_days = get_trade_days(start_date=trading_day, count=2) if len(next_trade_days) >= 2: return next_trade_days[1] if len(next_trade_days) == 1: return next_trade_days[0] return trading_day def normalize_trade_day_value(value): """将交易日对象统一转换为 datetime.date""" if isinstance(value, date) and not isinstance(value, datetime): return value if isinstance(value, datetime): return value.date() if hasattr(value, 'to_pydatetime'): return value.to_pydatetime().date() try: return pd.Timestamp(value).date() except Exception: return value def check_ma_trend_and_open_gap(context): """阶段一:开盘时均线走势和开盘价差检查(一天一次)""" log.info("=" * 60) current_trading_day = get_current_trading_day(context.current_dt) log.info(f"执行均线走势和开盘价差检查 - 时间: {context.current_dt}, 交易日: {current_trading_day}") log.info("=" * 60) # 先检查换月移仓 position_auto_switch(context) # 检查是否进入新交易日,必要时清空缓存 if g.last_ma_trading_day != current_trading_day: if g.excluded_contracts: log.info(f"交易日切换至 {current_trading_day},清空上一交易日的排除缓存") g.excluded_contracts = {} g.ma_checked_underlyings = {} g.last_ma_trading_day = current_trading_day # 获取当前时间 current_time = str(context.current_dt.time())[:5] # HH:MM格式 # 筛选可交易品种(根据交易开始时间判断) focus_symbols = g.strategy_focus_symbols if g.strategy_focus_symbols else list(g.futures_config.keys()) tradable_symbols = [] # 根据当前时间确定可交易的时段 # 21:05 -> 仅接受21:00开盘的合约 # 09:05 -> 接受09:00或21:00开盘的合约 # 09:35 -> 接受所有时段(21:00, 09:00, 09:30)的合约 for symbol in focus_symbols: trading_start_time = get_futures_config(symbol, 'trading_start_time', '09:05') should_trade = False if current_time == '21:05': # 夜盘开盘:仅接受21:00开盘的品种 should_trade = trading_start_time.startswith('21:00') elif current_time == '09:05': # 日盘早盘:接受21:00和09:00开盘的品种 should_trade = trading_start_time.startswith('21:00') or trading_start_time.startswith('09:00') elif current_time == '09:35': # 日盘晚开:接受所有品种(21:00, 09:00, 09:30) should_trade = True if should_trade: tradable_symbols.append(symbol) if not tradable_symbols: log.info(f"当前时间 {current_time} 无品种开盘,跳过检查") return log.info(f"当前时间 {current_time} 开盘品种: {tradable_symbols}") # 对每个品种执行均线和开盘价差检查 for symbol in tradable_symbols: if g.ma_checked_underlyings.get(symbol) == current_trading_day: log.info(f"{symbol} 已在交易日 {current_trading_day} 完成均线检查,跳过本次执行") continue try: g.ma_checked_underlyings[symbol] = current_trading_day # 获取主力合约 dominant_future = get_dominant_future(symbol) # log.debug(f"{symbol} 主力合约: {dominant_future}") if not dominant_future: log.info(f"{symbol} 未找到主力合约,跳过") continue # 检查是否在排除缓存中(当日已检查过但不符合条件) if dominant_future in g.excluded_contracts: excluded_info = g.excluded_contracts[dominant_future] if excluded_info['trading_day'] == current_trading_day: # log.debug(f"{symbol} 在排除缓存中(原因: {excluded_info['reason']}),跳过") continue else: # 新的一天,从缓存中移除(会在after_market_close统一清理,这里也做兜底) del g.excluded_contracts[dominant_future] # 检查是否已有持仓 if check_symbol_prefix_match(dominant_future, set(g.trade_history.keys())): log.info(f"{symbol} 已有持仓,跳过") continue # 获取历史数据(需要足够计算MA30) # 使用get_price获取数据,可以正确处理夜盘品种 # 注意:historical_data最后一行是昨天的数据,不包含今天的数据 historical_data = get_price(dominant_future, end_date=context.current_dt, frequency='1d', fields=['open', 'close', 'high', 'low'], count=g.ma_historical_days) if historical_data is None or len(historical_data) < max(g.ma_periods): log.info(f"{symbol} 历史数据不足,跳过") continue previous_trade_days = get_trade_days(end_date=current_trading_day, count=2) previous_trade_days = [normalize_trade_day_value(d) for d in previous_trade_days] previous_trading_day = None if len(previous_trade_days) >= 2: previous_trading_day = previous_trade_days[-2] elif len(previous_trade_days) == 1 and previous_trade_days[0] < current_trading_day: previous_trading_day = previous_trade_days[0] if previous_trading_day is None: log.info(f"{symbol} 无法确定前一交易日,跳过") continue historical_dates = historical_data.index.date match_indices = np.where(historical_dates == previous_trading_day)[0] if len(match_indices) == 0: earlier_indices = np.where(historical_dates < previous_trading_day)[0] if len(earlier_indices) == 0: log.info(f"{symbol} 历史数据缺少 {previous_trading_day} 之前的记录,跳过") continue match_indices = [earlier_indices[-1]] data_upto_yesterday = historical_data.iloc[:match_indices[-1] + 1] # log.debug(f"data_upto_yesterday: {data_upto_yesterday}") yesterday_data = data_upto_yesterday.iloc[-1] yesterday_close = yesterday_data['close'] # 获取今天的开盘价(使用get_current_data API) current_data = get_current_data()[dominant_future] today_open = current_data.day_open # log.info(f" 历史数据时间范围: {historical_data.index[0]} 至 {historical_data.index[-1]}") # 计算昨天的均线值(使用截至前一交易日的数据) ma_values = calculate_ma_values(data_upto_yesterday, g.ma_periods) ma_proximity_counts = calculate_ma_proximity_counts(data_upto_yesterday, g.ma_periods, g.ma_pattern_lookback_days) log.info(f"{symbol}({dominant_future}) 均线检查:") # log.debug(f"yesterday_data: {yesterday_data}") # log.info(f" 昨收: {yesterday_close:.2f}, 今开: {today_open:.2f}") # log.info(f" 昨日均线 - MA5: {ma_values['MA5']:.2f}, MA10: {ma_values['MA10']:.2f}, " # f"MA20: {ma_values['MA20']:.2f}, MA30: {ma_values['MA30']:.2f}") log.info(f" 均线贴近统计: {ma_proximity_counts}") proximity_sum = ma_proximity_counts.get('MA5', 0) + ma_proximity_counts.get('MA10', 0) if proximity_sum < g.ma_proximity_min_threshold: log.info(f" {symbol}({dominant_future}) ✗ 均线贴近计数不足,MA5+MA10={proximity_sum} < {g.ma_proximity_min_threshold},跳过") g.excluded_contracts[dominant_future] = { 'reason': 'ma_proximity', 'trading_day': current_trading_day } continue # 判断均线走势(使用新的灵活模式检查) direction = None if check_ma_pattern(ma_values, 'long'): direction = 'long' # log.info(f" {symbol}({dominant_future}) 均线走势判断: 多头排列") elif check_ma_pattern(ma_values, 'short'): direction = 'short' # log.info(f" {symbol}({dominant_future}) 均线走势判断: 空头排列") else: # log.info(f" 均线走势判断: 不符合多头或空头排列,跳过") # 将不符合条件的合约加入排除缓存 g.excluded_contracts[dominant_future] = { 'reason': 'ma_trend', 'trading_day': current_trading_day } continue # 检查历史均线模式一致性 consistency_passed, consistency_ratio = check_historical_ma_pattern_consistency( historical_data, direction, g.ma_pattern_lookback_days, g.ma_pattern_consistency_threshold ) if not consistency_passed: log.info(f" {symbol}({dominant_future}) ✗ 历史均线模式一致性不足 " f"({consistency_ratio:.1%} < {g.ma_pattern_consistency_threshold:.1%}),跳过") g.excluded_contracts[dominant_future] = { 'reason': 'ma_consistency', 'trading_day': current_trading_day } continue else: log.info(f" {symbol}({dominant_future}) ✓ 历史均线模式一致性检查通过 " f"({consistency_ratio:.1%} >= {g.ma_pattern_consistency_threshold:.1%})") # 计算开盘价差比例 open_gap_ratio = (today_open - yesterday_close) / yesterday_close log.info(f" 开盘价差检查: 昨收 {yesterday_close:.2f}, 今开 {today_open:.2f}, " f"价差比例 {open_gap_ratio:.2%}") # 检查开盘价差是否符合方向要求 gap_check_passed = False if g.ma_gap_strategy_mode == 1: # 方案1:多头检查上跳,空头检查下跳 if direction == 'long' and open_gap_ratio >= g.ma_open_gap_threshold: log.info(f" {symbol}({dominant_future}) ✓ 方案1多头开盘价差检查通过 ({open_gap_ratio:.2%} >= {g.ma_open_gap_threshold:.2%})") gap_check_passed = True elif direction == 'short' and open_gap_ratio <= -g.ma_open_gap_threshold: log.info(f" {symbol}({dominant_future}) ✓ 方案1空头开盘价差检查通过 ({open_gap_ratio:.2%} <= {-g.ma_open_gap_threshold:.2%})") gap_check_passed = True elif g.ma_gap_strategy_mode == 2: # 方案2:多头检查下跳,空头检查上跳 if direction == 'long' and open_gap_ratio <= -g.ma_open_gap_threshold2: log.info(f" {symbol}({dominant_future}) ✓ 方案2多头开盘价差检查通过 ({open_gap_ratio:.2%} <= {-g.ma_open_gap_threshold2:.2%})") gap_check_passed = True elif direction == 'short' and open_gap_ratio >= g.ma_open_gap_threshold2: log.info(f" {symbol}({dominant_future}) ✓ 方案2空头开盘价差检查通过 ({open_gap_ratio:.2%} >= {g.ma_open_gap_threshold2:.2%})") gap_check_passed = True if not gap_check_passed: # log.info(f" ✗ 开盘价差不符合方案{g.ma_gap_strategy_mode} {direction}方向要求,跳过") # 将不符合条件的合约加入排除缓存 g.excluded_contracts[dominant_future] = { 'reason': 'open_gap', 'trading_day': current_trading_day } continue # 将通过检查的品种加入候选列表 g.daily_ma_candidates[dominant_future] = { 'symbol': symbol, 'direction': direction, 'open_price': today_open, 'yesterday_close': yesterday_close, 'ma_values': ma_values } log.info(f" ✓✓ {symbol} 通过均线和开盘价差检查,加入候选列表") except Exception as e: g.ma_checked_underlyings.pop(symbol, None) log.warning(f"{symbol} 检查时出错: {str(e)}") continue log.info(f"候选列表更新完成,当前候选品种: {list(g.daily_ma_candidates.keys())}") log.info("=" * 60) def check_intraday_price_diff(context): """阶段二:盘中价差检查和开仓(14:35和14:55)""" log.info("=" * 60) log.info(f"执行当天价差检查和开仓逻辑 - 时间: {context.current_dt}") log.info("=" * 60) # 先检查换月移仓 position_auto_switch(context) if not g.daily_ma_candidates: log.info("当前无候选品种,跳过") return log.info(f"候选品种数量: {len(g.daily_ma_candidates)}") # 遍历候选品种 candidates_to_remove = [] for dominant_future, candidate_info in g.daily_ma_candidates.items(): try: symbol = candidate_info['symbol'] direction = candidate_info['direction'] open_price = candidate_info['open_price'] # 再次检查是否已有持仓 if check_symbol_prefix_match(dominant_future, set(g.trade_history.keys())): log.info(f"{symbol} 已有持仓,从候选列表移除") candidates_to_remove.append(dominant_future) continue # 获取当前价格 current_data = get_current_data()[dominant_future] current_price = current_data.last_price # 计算当天价差 intraday_diff = current_price - open_price intraday_diff_ratio = intraday_diff / open_price # 计算相对变化比例 log.info(f"{symbol}({dominant_future}) 当天价差检查:") log.info(f" 方向: {direction}, 开盘价: {open_price:.2f}, 当前价: {current_price:.2f}, " f"当天价差: {intraday_diff:.2f}, 变化比例: {intraday_diff_ratio:.2%}") # 判断是否满足开仓条件 should_open = False if g.ma_gap_strategy_mode == 1: # 方案1:根据参数决定是否检查日内价差 if not g.check_intraday_spread: # 跳过日内价差检查,直接允许开仓 log.info(f" 方案1跳过日内价差检查(check_intraday_spread=False)") should_open = True elif direction == 'long' and intraday_diff > 0: log.info(f" ✓ 方案1多头当天价差检查通过 ({intraday_diff:.2f} > 0)") should_open = True elif direction == 'short' and intraday_diff < 0: log.info(f" ✓ 方案1空头当天价差检查通过 ({intraday_diff:.2f} < 0)") should_open = True else: log.info(f" ✗ 方案1当天价差不符合{direction}方向要求") elif g.ma_gap_strategy_mode == 2: # 方案2:强制检查日内变化,使用专用阈值 if direction == 'long' and intraday_diff_ratio >= g.ma_intraday_threshold_scheme2: log.info(f" ✓ 方案2多头日内变化检查通过 ({intraday_diff_ratio:.2%} >= {g.ma_intraday_threshold_scheme2:.2%})") should_open = True elif direction == 'short' and intraday_diff_ratio <= -g.ma_intraday_threshold_scheme2: log.info(f" ✓ 方案2空头日内变化检查通过 ({intraday_diff_ratio:.2%} <= {-g.ma_intraday_threshold_scheme2:.2%})") should_open = True else: log.info(f" ✗ 方案2日内变化不符合{direction}方向要求(阈值: ±{g.ma_intraday_threshold_scheme2:.2%})") if should_open: # 执行开仓 log.info(f" 准备开仓: {symbol} {direction}") target_hands = calculate_target_hands(context, dominant_future, direction) if target_hands > 0: success = open_position(context, dominant_future, target_hands, direction, f'均线形态开仓') if success: log.info(f" ✓✓ {symbol} 开仓成功,从候选列表移除") candidates_to_remove.append(dominant_future) else: log.warning(f" ✗ {symbol} 开仓失败") else: log.warning(f" ✗ {symbol} 计算目标手数为0,跳过开仓") except Exception as e: log.warning(f"{dominant_future} 处理时出错: {str(e)}") continue # 从候选列表中移除已开仓的品种 for future in candidates_to_remove: if future in g.daily_ma_candidates: del g.daily_ma_candidates[future] log.info(f"剩余候选品种: {list(g.daily_ma_candidates.keys())}") log.info("=" * 60) def check_stop_loss_profit(context): """阶段三:止损止盈检查(所有时间点)""" # 先检查换月移仓 position_auto_switch(context) # 获取当前时间 current_time = str(context.current_dt.time())[:2] # 判断是否为夜盘时间 is_night_session = (current_time in ['21', '22', '23', '00', '01', '02']) # 遍历所有持仓进行止损止盈检查 subportfolio = context.subportfolios[0] long_positions = list(subportfolio.long_positions.values()) short_positions = list(subportfolio.short_positions.values()) closed_count = 0 skipped_count = 0 for position in long_positions + short_positions: security = position.security underlying_symbol = security.split('.')[0][:-4] # 检查交易时间适配性 has_night_session = get_futures_config(underlying_symbol, 'has_night_session', False) # 如果是夜盘时间,但品种不支持夜盘交易,则跳过 if is_night_session and not has_night_session: skipped_count += 1 continue # 执行止损止盈检查 if check_position_stop_loss_profit(context, position): closed_count += 1 if closed_count > 0: log.info(f"执行了 {closed_count} 次止损止盈") if skipped_count > 0: log.info(f"夜盘时间跳过 {skipped_count} 个日间品种的止损止盈检查") def check_position_stop_loss_profit(context, position): """检查单个持仓的止损止盈""" security = position.security if security not in g.trade_history: return False trade_info = g.trade_history[security] direction = trade_info['direction'] entry_price = trade_info['entry_price'] entry_time = trade_info['entry_time'] current_price = position.price # 计算当前盈亏比率 if direction == 'long': profit_rate = (current_price - entry_price) / entry_price else: profit_rate = (entry_price - current_price) / entry_price # 检查固定止损 if profit_rate <= -g.fixed_stop_loss_rate: log.info(f"触发固定止损 {security} {direction}, 当前亏损率: {profit_rate:.3%}, " f"成本价: {entry_price:.2f}, 当前价格: {current_price:.2f}") close_position(context, security, direction) return True # 检查是否启用均线跟踪止盈 if not trade_info.get('ma_trailing_enabled', True): return False # 检查均线跟踪止盈 # 获取持仓天数 entry_date = entry_time.date() current_date = context.current_dt.date() all_trade_days = get_all_trade_days() holding_days = sum((entry_date <= d <= current_date) for d in all_trade_days) # 计算变化率 today_price = get_current_data()[security].last_price avg_daily_change_rate = calculate_average_daily_change_rate(security) historical_data = attribute_history(security, 1, '1d', ['close']) yesterday_close = historical_data['close'].iloc[-1] today_change_rate = abs((today_price - yesterday_close) / yesterday_close) # 根据时间判断使用的偏移量 current_time = context.current_dt.time() target_time = datetime.strptime('14:55:00', '%H:%M:%S').time() if current_time > target_time: offset_ratio = g.ma_offset_ratio_close else: offset_ratio = g.ma_offset_ratio_normal # 选择止损均线 close_line = None if today_change_rate >= 1.5 * avg_daily_change_rate: close_line = 'ma5' # 波动剧烈时用短周期 elif holding_days <= g.days_for_adjustment: close_line = 'ma5' # 持仓初期用短周期 else: close_line = 'ma5' if today_change_rate >= 1.2 * avg_daily_change_rate else 'ma10' # 计算实时均线值 ma_values = calculate_realtime_ma_values(security, [5, 10]) ma_value = ma_values[close_line] # 应用偏移量 if direction == 'long': adjusted_ma_value = ma_value * (1 - offset_ratio) else: adjusted_ma_value = ma_value * (1 + offset_ratio) # 判断是否触发均线止损 if (direction == 'long' and today_price < adjusted_ma_value) or \ (direction == 'short' and today_price > adjusted_ma_value): log.info(f"触发均线跟踪止盈 {security} {direction}, 止损均线: {close_line}, " f"均线值: {ma_value:.2f}, 调整后: {adjusted_ma_value:.2f}, " f"当前价: {today_price:.2f}, 持仓天数: {holding_days}") close_position(context, security, direction) return True return False ############################ 核心辅助函数 ################################### def calculate_ma_values(data, periods): """计算均线值 Args: data: DataFrame,包含'close'列的历史数据(最后一行是最新的数据) periods: list,均线周期列表,如[5, 10, 20, 30] Returns: dict: {'MA5': value, 'MA10': value, 'MA20': value, 'MA30': value} 返回最后一行(最新日期)的各周期均线值 """ ma_values = {} for period in periods: if len(data) >= period: # 计算最后period天的均线值 ma_values[f'MA{period}'] = data['close'].iloc[-period:].mean() else: ma_values[f'MA{period}'] = None return ma_values def calculate_ma_proximity_counts(data, periods, lookback_days): """统计近 lookback_days 天收盘价贴近各均线的次数""" proximity_counts = {f'MA{period}': 0 for period in periods} if len(data) < lookback_days: return proximity_counts closes = data['close'].iloc[-lookback_days:] ma_series = { period: data['close'].rolling(window=period).mean().iloc[-lookback_days:] for period in periods } for idx, close_price in enumerate(closes): min_diff = None closest_period = None for period in periods: ma_value = ma_series[period].iloc[idx] if pd.isna(ma_value): continue diff = abs(close_price - ma_value) if min_diff is None or diff < min_diff: min_diff = diff closest_period = period if closest_period is not None: proximity_counts[f'MA{closest_period}'] += 1 return proximity_counts def check_ma_pattern(ma_values, direction): """检查均线排列模式是否符合方向要求 Args: ma_values: dict,包含MA5, MA10, MA20, MA30的均线值 direction: str,'long'或'short' Returns: bool: 是否符合均线排列要求 """ ma5 = ma_values['MA5'] ma10 = ma_values['MA10'] ma20 = ma_values['MA20'] ma30 = ma_values['MA30'] if direction == 'long': # 多头模式:MA30 <= MA20 <= MA10 <= MA5 或 MA30 <= MA20 <= MA5 <= MA10 pattern1 = (ma30 <= ma20 <= ma10 <= ma5) pattern2 = (ma30 <= ma20 <= ma5 <= ma10) return pattern1 or pattern2 elif direction == 'short': # 空头模式:MA10 <= MA5 <= MA20 <= MA30 或 MA5 <= MA10 <= MA20 <= MA30 pattern1 = (ma10 <= ma5 <= ma20 <= ma30) pattern2 = (ma5 <= ma10 <= ma20 <= ma30) return pattern1 or pattern2 else: return False def check_historical_ma_pattern_consistency(historical_data, direction, lookback_days, consistency_threshold): """检查历史均线模式的一致性 Args: historical_data: DataFrame,包含足够天数的历史数据 direction: str,'long'或'short' lookback_days: int,检查过去多少天 consistency_threshold: float,一致性阈值(0-1之间) Returns: tuple: (bool, float) - (是否通过一致性检查, 实际一致性比例) """ if len(historical_data) < max(g.ma_periods) + lookback_days: # 历史数据不足 return False, 0.0 match_count = 0 total_count = lookback_days # 检查过去lookback_days天的均线模式 for i in range(lookback_days): # 获取倒数第(i+1)天的数据(i=0时是昨天,i=1时是前天,依此类推) end_idx = -(i + 1) if end_idx == -1: data_slice = historical_data else: data_slice = historical_data.iloc[:end_idx] # 计算该天的均线值 ma_values = calculate_ma_values(data_slice, g.ma_periods) # 检查是否符合模式 if check_ma_pattern(ma_values, direction): match_count += 1 consistency_ratio = match_count / total_count passed = consistency_ratio >= consistency_threshold return passed, consistency_ratio ############################ 交易执行函数 ################################### def open_position(context, security, target_hands, direction, reason=''): """开仓""" try: # 记录交易前的可用资金 cash_before = context.portfolio.available_cash # 使用order_target按手数开仓 order = order_target(security, target_hands, side=direction) if order is not None and order.filled > 0: # 记录交易后的可用资金 cash_after = context.portfolio.available_cash # 计算实际资金变化 cash_change = cash_before - cash_after # 获取订单价格和数量 order_price = order.avg_cost if order.avg_cost else order.price order_amount = order.filled # 记录当日交易 underlying_symbol = security.split('.')[0][:-4] g.today_trades.append({ 'security': security, 'underlying_symbol': underlying_symbol, 'direction': direction, 'order_amount': order_amount, 'order_price': order_price, 'cash_change': cash_change, 'time': context.current_dt }) # 记录交易信息 g.trade_history[security] = { 'entry_price': order_price, 'target_hands': target_hands, 'actual_hands': order_amount, 'actual_margin': cash_change, 'direction': direction, 'entry_time': context.current_dt } ma_trailing_enabled = True if direction == 'long': ma_values_at_entry = calculate_realtime_ma_values(security, [5]) ma5_value = ma_values_at_entry.get('ma5') if ma5_value is not None and order_price < ma5_value: ma_trailing_enabled = False log.info(f"禁用均线跟踪止盈: {security} {direction}, 开仓价 {order_price:.2f} < MA5 {ma5_value:.2f}") g.trade_history[security]['ma_trailing_enabled'] = ma_trailing_enabled log.info(f"开仓成功: {security} {direction} {order_amount}手 @{order_price:.2f}, " f"保证金: {cash_change:.0f}, 原因: {reason}") return True except Exception as e: log.warning(f"开仓失败 {security}: {str(e)}") return False def close_position(context, security, direction): """平仓""" try: # 使用order_target平仓到0手 order = order_target(security, 0, side=direction) if order is not None and order.filled > 0: underlying_symbol = security.split('.')[0][:-4] # 记录当日交易(平仓) g.today_trades.append({ 'security': security, 'underlying_symbol': underlying_symbol, 'direction': direction, 'order_amount': -order.filled, 'order_price': order.avg_cost if order.avg_cost else order.price, 'cash_change': 0, 'time': context.current_dt }) log.info(f"平仓成功: {underlying_symbol} {direction} {order.filled}手") # 从交易历史中移除 if security in g.trade_history: del g.trade_history[security] return True except Exception as e: log.warning(f"平仓失败 {security}: {str(e)}") return False ############################ 辅助函数 ################################### def get_futures_config(underlying_symbol, config_key=None, default_value=None): """获取期货品种配置信息的辅助函数""" if underlying_symbol not in g.futures_config: if config_key and default_value is not None: return default_value return {} if config_key is None: return g.futures_config[underlying_symbol] return g.futures_config[underlying_symbol].get(config_key, default_value) def get_margin_rate(underlying_symbol, direction, default_rate=0.10): """获取保证金比例的辅助函数""" return g.futures_config.get(underlying_symbol, {}).get('margin_rate', {}).get(direction, default_rate) def get_multiplier(underlying_symbol, default_multiplier=10): """获取合约乘数的辅助函数""" return g.futures_config.get(underlying_symbol, {}).get('multiplier', default_multiplier) def calculate_target_hands(context, security, direction): """计算目标开仓手数""" current_price = get_current_data()[security].last_price underlying_symbol = security.split('.')[0][:-4] # 使用保证金比例 margin_rate = get_margin_rate(underlying_symbol, direction) multiplier = get_multiplier(underlying_symbol) # 计算单手保证金 single_hand_margin = current_price * multiplier * margin_rate # 还要考虑可用资金限制 available_cash = context.portfolio.available_cash * g.usage_percentage # 根据单个标的最大持仓保证金限制计算开仓数量 max_margin = g.max_margin_per_position if single_hand_margin <= max_margin: # 如果单手保证金不超过最大限制,计算最大可开仓手数 max_hands = int(max_margin / single_hand_margin) max_hands_by_cash = int(available_cash / single_hand_margin) # 取两者较小值 actual_hands = min(max_hands, max_hands_by_cash) # 确保至少开1手 actual_hands = max(1, actual_hands) log.info(f"单手保证金: {single_hand_margin:.0f}, 目标开仓手数: {actual_hands}") return actual_hands else: # 如果单手保证金超过最大限制,默认开仓1手 actual_hands = 1 log.info(f"单手保证金: {single_hand_margin:.0f} 超过最大限制: {max_margin}, 默认开仓1手") return actual_hands def check_symbol_prefix_match(symbol, hold_symbols): """检查是否有相似的持仓品种""" symbol_prefix = symbol[:-9] for hold_symbol in hold_symbols: hold_symbol_prefix = hold_symbol[:-9] if len(hold_symbol) > 9 else hold_symbol if symbol_prefix == hold_symbol_prefix: return True return False def calculate_average_daily_change_rate(security, days=30): """计算日均变化率""" historical_data = attribute_history(security, days + 1, '1d', ['close']) daily_change_rates = abs(historical_data['close'].pct_change()).iloc[1:] return daily_change_rates.mean() def calculate_realtime_ma_values(security, ma_periods): """计算包含当前价格的实时均线值""" historical_data = attribute_history(security, max(ma_periods), '1d', ['close']) today_price = get_current_data()[security].last_price close_prices = historical_data['close'].tolist() + [today_price] ma_values = {f'ma{period}': sum(close_prices[-period:]) / period for period in ma_periods} return ma_values def after_market_close(context): """收盘后运行函数""" log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time()))) # 清空候选列表(每天重新检查) g.daily_ma_candidates = {} # 清空排除缓存(每天重新检查) excluded_count = len(g.excluded_contracts) if excluded_count > 0: log.info(f"清空排除缓存,共 {excluded_count} 个合约") g.excluded_contracts = {} # 只有当天有交易时才打印统计信息 if g.today_trades: print_daily_trading_summary(context) # 清空当日交易记录 g.today_trades = [] log.info('##############################################################') def print_daily_trading_summary(context): """打印当日交易汇总""" if not g.today_trades: return log.info("\n=== 当日交易汇总 ===") total_margin = 0 for trade in g.today_trades: if trade['order_amount'] > 0: # 开仓 log.info(f"开仓 {trade['underlying_symbol']} {trade['direction']} {trade['order_amount']}手 " f"价格:{trade['order_price']:.2f} 保证金:{trade['cash_change']:.0f}") total_margin += trade['cash_change'] else: # 平仓 log.info(f"平仓 {trade['underlying_symbol']} {trade['direction']} {abs(trade['order_amount'])}手 " f"价格:{trade['order_price']:.2f}") log.info(f"当日保证金占用: {total_margin:.0f}") log.info("==================\n") ########################## 自动移仓换月函数 ################################# def position_auto_switch(context, pindex=0, switch_func=None, callback=None): """期货自动移仓换月""" import re subportfolio = context.subportfolios[pindex] symbols = set(subportfolio.long_positions.keys()) | set(subportfolio.short_positions.keys()) switch_result = [] for symbol in symbols: match = re.match(r"(?P[A-Z]{1,})", symbol) if not match: raise ValueError("未知期货标的: {}".format(symbol)) else: dominant = get_dominant_future(match.groupdict()["underlying_symbol"]) cur = get_current_data() symbol_last_price = cur[symbol].last_price dominant_last_price = cur[dominant].last_price if dominant > symbol: for positions_ in (subportfolio.long_positions, subportfolio.short_positions): if symbol not in positions_.keys(): continue else : p = positions_[symbol] if switch_func is not None: switch_func(context, pindex, p, dominant) else: amount = p.total_amount # 跌停不能开空和平多,涨停不能开多和平空 if p.side == "long": symbol_low_limit = cur[symbol].low_limit dominant_high_limit = cur[dominant].high_limit if symbol_last_price <= symbol_low_limit: log.warning("标的{}跌停,无法平仓。移仓换月取消。".format(symbol)) continue elif dominant_last_price >= dominant_high_limit: log.warning("标的{}涨停,无法开仓。移仓换月取消。".format(dominant)) continue else: log.info("进行移仓换月: ({0},long) -> ({1},long)".format(symbol, dominant)) order_old = order_target(symbol, 0, side='long') if order_old != None and order_old.filled > 0: order_new = order_target(dominant, amount, side='long') if order_new != None and order_new.filled > 0: switch_result.append({"before": symbol, "after": dominant, "side": "long"}) # 换月成功,更新交易记录 if symbol in g.trade_history: g.trade_history[dominant] = g.trade_history[symbol] del g.trade_history[symbol] else: log.warning("标的{}交易失败,无法开仓。移仓换月失败。".format(dominant)) if p.side == "short": symbol_high_limit = cur[symbol].high_limit dominant_low_limit = cur[dominant].low_limit if symbol_last_price >= symbol_high_limit: log.warning("标的{}涨停,无法平仓。移仓换月取消。".format(symbol)) continue elif dominant_last_price <= dominant_low_limit: log.warning("标的{}跌停,无法开仓。移仓换月取消。".format(dominant)) continue else: log.info("进行移仓换月: ({0},short) -> ({1},short)".format(symbol, dominant)) order_old = order_target(symbol, 0, side='short') if order_old != None and order_old.filled > 0: order_new = order_target(dominant, amount, side='short') if order_new != None and order_new.filled > 0: switch_result.append({"before": symbol, "after": dominant, "side": "short"}) # 换月成功,更新交易记录 if symbol in g.trade_history: g.trade_history[dominant] = g.trade_history[symbol] del g.trade_history[symbol] else: log.warning("标的{}交易失败,无法开仓。移仓换月失败。".format(dominant)) if callback: callback(context, pindex, p, dominant) return switch_result