# 导入函数库 from jqdata import * from jqdata import finance import pandas as pd import numpy as np from datetime import date, datetime, timedelta, time import re # 期货左侧交易策略(带网格和对冲)v001 # 基于网格交易理念,结合左侧底仓建仓、网格交易和空头对冲三个策略组件 # # 交易逻辑分离: # 1. 建仓逻辑:在开盘时执行(夜盘品种21:05,日盘品种09:05) # - 底仓左侧多头:档位价格>=当前价用市价单,否则挂最高两档限价单 # - 网格多头:目标价格>=当前价用市价单,否则挂最高两档限价单 # - 空头对冲:永远使用限价单 # 2. 平仓逻辑:在所有时间点执行(check_stop_profit_loss函数),包括止盈和止损 # 设置以便完整打印 DataFrame pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) pd.set_option('display.width', None) pd.set_option('display.max_colwidth', 20) ## 初始化函数,设定基准等等 def initialize(context): # 设定沪深300作为基准 set_benchmark('000300.XSHG') # 开启动态复权模式(真实价格) set_option('use_real_price', True) # 输出内容到日志 log.info('期货左侧交易策略(带网格和对冲)初始化开始') ### 期货相关设定 ### # 设定账户为金融账户 set_subportfolios([SubPortfolioConfig(cash=context.portfolio.starting_cash, type='index_futures')]) # 期货类每笔交易时的手续费是: 买入时万分之0.23,卖出时万分之0.23,平今仓为万分之23 set_order_cost(OrderCost(open_commission=0.000023, close_commission=0.000023, close_today_commission=0.0023), type='index_futures') # 设置期货交易的滑点 set_slippage(StepRelatedSlippage(2)) # ==================== 策略组件开关配置 ==================== g.enable_grid_long = True # 是否启用网格多头策略组件 g.enable_hedge_short = True # 是否启用空头对冲策略组件 # 注意:底仓左侧多头始终启用 # ==================== 资金管理配置 ==================== g.usage_percentage = 0.8 # 最大资金使用比例 g.max_margin_per_position = 20000 # 单个标的最大持仓保证金(元) # ==================== 交易品种配置 ==================== # 期货品种完整配置字典 g.futures_config = { # 贵金属(夜盘品种) 'AU': {'margin_rate': {'long': 0.14, 'short': 0.14}, 'multiplier': 1000, 'trading_start_time': '21:00'}, 'AG': {'margin_rate': {'long': 0.14, 'short': 0.14}, 'multiplier': 15, 'trading_start_time': '21:00'}, # 有色金属(夜盘品种) 'CU': {'margin_rate': {'long': 0.09, 'short': 0.09}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'AL': {'margin_rate': {'long': 0.09, 'short': 0.09}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'ZN': {'margin_rate': {'long': 0.09, 'short': 0.09}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'PB': {'margin_rate': {'long': 0.09, 'short': 0.09}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'NI': {'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 1, 'trading_start_time': '21:00'}, 'SN': {'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 1, 'trading_start_time': '21:00'}, 'SS': {'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 5, 'trading_start_time': '21:00'}, # 黑色系(夜盘品种) 'RB': {'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'HC': {'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'I': {'margin_rate': {'long': 0.1, 'short': 0.1}, 'multiplier': 100, 'trading_start_time': '21:00'}, 'JM': {'margin_rate': {'long': 0.22, 'short': 0.22}, 'multiplier': 100, 'trading_start_time': '21:00'}, 'J': {'margin_rate': {'long': 0.22, 'short': 0.22}, 'multiplier': 60, 'trading_start_time': '21:00'}, # 能源化工(夜盘品种) 'SP': {'margin_rate': {'long': 0.1, 'short': 0.1}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'FU': {'margin_rate': {'long': 0.08, 'short': 0.08}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'BU': {'margin_rate': {'long': 0.04, 'short': 0.04}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'RU': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'BR': {'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'SC': {'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 1000, 'trading_start_time': '21:00'}, 'NR': {'margin_rate': {'long': 0.13, 'short': 0.13}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'LU': {'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'LC': {'margin_rate': {'long': 0.1, 'short': 0.1}, 'multiplier': 1, 'trading_start_time': '09:00'}, # 化工(夜盘品种) 'FG': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 20, 'trading_start_time': '21:00'}, 'TA': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'MA': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'SA': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 20, 'trading_start_time': '21:00'}, 'L': {'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'V': {'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'EG': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'PP': {'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'EB': {'margin_rate': {'long': 0.12, 'short': 0.12}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'PG': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 20, 'trading_start_time': '21:00'}, # 农产品(夜盘品种) 'RM': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'OI': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'CF': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'SR': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'PF': {'margin_rate': {'long': 0.1, 'short': 0.1}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'C': {'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'CS': {'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'CY': {'margin_rate': {'long': 0.15, 'short': 0.15}, 'multiplier': 5, 'trading_start_time': '21:00'}, 'A': {'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'B': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'M': {'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'Y': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, 'P': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 10, 'trading_start_time': '21:00'}, # 股指期货(日盘品种,9:30开始交易) 'IF': {'margin_rate': {'long': 0.08, 'short': 0.08}, 'multiplier': 300, 'trading_start_time': '09:30'}, 'IH': {'margin_rate': {'long': 0.08, 'short': 0.08}, 'multiplier': 300, 'trading_start_time': '09:30'}, 'IC': {'margin_rate': {'long': 0.08, 'short': 0.08}, 'multiplier': 200, 'trading_start_time': '09:30'}, 'IM': {'margin_rate': {'long': 0.08, 'short': 0.08}, 'multiplier': 200, 'trading_start_time': '09:30'}, 'TL': {'margin_rate': {'long': 0.01, 'short': 0.01}, 'multiplier': 10000, 'trading_start_time': '09:30'}, # 其他日盘品种(9:00开始交易) 'AP': {'margin_rate': {'long': 0.08, 'short': 0.08}, 'multiplier': 10, 'trading_start_time': '09:00'}, 'CJ': {'margin_rate': {'long': 0.09, 'short': 0.09}, 'multiplier': 5, 'trading_start_time': '09:00'}, 'PK': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 5, 'trading_start_time': '09:00'}, 'JD': {'margin_rate': {'long': 0.07, 'short': 0.07}, 'multiplier': 10, 'trading_start_time': '09:00'}, 'LH': {'margin_rate': {'long': 0.1, 'short': 0.1}, 'multiplier': 16, 'trading_start_time': '09:00'}, 'UR': {'margin_rate': {'long': 0.05, 'short': 0.05}, 'multiplier': 20, 'trading_start_time': '09:00'} } # 策略品种选择配置 g.strategy_focus_symbols = ['LH'] # 交易品种列表 # ==================== 底仓左侧多头配置 ==================== # 价格-数量网格:价格越低,买入手数越多 g.base_position_grid = { 'TL': {118: 1, 117: 1, 116: 1, 115: 1, 114: 2, 113: 2}, 'SA': {1400: 4, 1300: 6, 1200: 8, 1100: 12, 1000: 14, 900: 16}, 'M': {2800: 4, 2750: 6, 2700: 8, 2650: 12, 2600: 14, 2550: 16}, 'UR': {1750: 4, 1700: 6, 1650: 8, 1600: 12, 1550: 14, 1500: 16}, 'LH': {13000: 1, 12500: 1, 12000: 1, 11500: 1, 11000: 2}, } # 底仓退出价格(止盈) g.base_position_exit_price = { 'TL': 121, 'SA': 1550, 'M': 3800, 'UR': 2400, 'LH': 14600, } # ==================== 网格多头配置 ==================== g.grid_config = { 'TL': { 'start_price': 118, # 网格开始价格 'grid_size': 1, # 网格大小 'quantity_per_grid': 1, # 每网格数量 'exit_grid_size': 1 # 退出网格大小 }, 'SA': { 'start_price': 1250, # 开始价格 'grid_size': 50, # 网格大小 'quantity_per_grid': 5, # 每网格数量 'exit_grid_size': 50 # 退出网格大小 }, 'M': { 'start_price': 2800, 'grid_size': 100, 'quantity_per_grid': 10, 'exit_grid_size': 100 }, 'UR': { 'start_price': 1800, 'grid_size': 50, 'quantity_per_grid': 10, 'exit_grid_size': 50 }, 'LH': { 'start_price': 13500, 'grid_size': 500, 'quantity_per_grid': 1, 'exit_grid_size': 500 }, } # ==================== 空头对冲配置 ==================== g.hedge_stoploss_pct = 0.02 # 止损百分比(1%) g.hedge_level1_recovery_pct = 0.30 # 一级止盈回升百分比(30%) g.hedge_level2_recovery_pct = 0.60 # 二级止盈回升百分比(60%) g.hedge_cost_area_pct = 0.02 # 成本区域百分比(2%) # ==================== 全局变量初始化 ==================== # 持仓跟踪 g.base_positions = {} # 底仓持仓 {symbol: {level_key: {'entry_price': price, 'quantity': qty, 'entry_date': date}}} g.grid_positions = {} # 网格持仓 {symbol: [{'entry_price': price, 'quantity': qty, 'target_price': target_price, 'entry_date': date}]} g.hedge_positions = {} # 对冲持仓 {symbol: {level_key: {'entry_price': price, 'quantity': qty, 'entry_date': date}}} # 底仓待成交限价单跟踪(问题2修复) g.base_pending_orders = {} # {symbol: {level_key: {'order_id': xxx, 'price': xxx, 'quantity': xxx, 'submit_time': xxx}}} # 网格状态跟踪 g.grid_buy_levels = {} # 网格已建仓目标价集合 {symbol: set()} # 对冲状态跟踪 g.hedge_state = {} # {symbol: {'max_profit': 0, 'max_profit_price': 0, ...}} # 订单策略信息记录 g.order_strategy_info = {} # {order_id: {'strategy_type': '策略类型', 'target_price': 价格}} # ==================== 定时任务设置 ==================== # 日盘止损止盈检查(每个时间点) run_daily(check_open_positions, time='09:35:00', reference_security='IF1808.CCFX') run_daily(check_stop_profit_loss, time='10:05:00', reference_security='IF1808.CCFX') run_daily(check_stop_profit_loss, time='10:35:00', reference_security='IF1808.CCFX') run_daily(check_stop_profit_loss, time='11:05:00', reference_security='IF1808.CCFX') run_daily(check_stop_profit_loss, time='13:35:00', reference_security='IF1808.CCFX') run_daily(check_stop_profit_loss, time='14:05:00', reference_security='IF1808.CCFX') run_daily(check_stop_profit_loss, time='14:35:00', reference_security='IF1808.CCFX') run_daily(check_stop_profit_loss, time='14:55:00', reference_security='IF1808.CCFX') # 建仓逻辑(开盘时执行:夜盘21:05,日盘09:05) run_daily(check_open_positions, time='21:05:00', reference_security='IF1808.CCFX') # 夜盘品种 run_daily(check_open_positions, time='09:05:00', reference_security='IF1808.CCFX') # 日盘品种 # 夜盘止损止盈检查(仅对有夜盘的品种) run_daily(check_stop_profit_loss, time='21:05:00', reference_security='IF1808.CCFX') run_daily(check_stop_profit_loss, time='21:35:00', reference_security='IF1808.CCFX') run_daily(check_stop_profit_loss, time='22:05:00', reference_security='IF1808.CCFX') # 收盘后 run_daily(after_market_close, time='15:30:00', reference_security='IF1808.CCFX') log.info("=" * 50) log.info("策略组件配置:") log.info(f" 底仓左侧多头: 始终启用") log.info(f" 网格多头策略: {'启用' if g.enable_grid_long else '禁用'}") log.info(f" 空头对冲策略: {'启用' if g.enable_hedge_short else '禁用'}") log.info(f"交易品种: {g.strategy_focus_symbols}") log.info("=" * 50) ############################ 主程序执行函数 ################################### def check_open_positions(context): """建仓检查函数(仅在14:35执行)""" log.info("=" * 60) log.info(f"执行建仓检查逻辑 - 时间: {context.current_dt}") log.info("=" * 60) # 先检查换月移仓 switch_success, switch_failed = position_auto_switch(context) if switch_failed: log.info("检测到移仓换月失败,跳过后续建仓检查") for failed in switch_failed: log.info(f"换月失败详情: {failed}") return # 获取当前时间 current_time = str(context.current_dt.time())[:2] # 筛选可交易品种(根据交易开始时间判断是否为夜盘品种) tradable_symbols = [] for symbol in g.strategy_focus_symbols: is_night_symbol = is_night_session_symbol(symbol) if current_time in ('21', '22'): # 夜盘时间:只考虑夜盘品种 if is_night_symbol: tradable_symbols.append(symbol) else: # 日盘时间:所有品种都可以交易 tradable_symbols.append(symbol) if not tradable_symbols: log.info("当前时段无可交易品种") return log.info(f"可交易品种: {tradable_symbols}") # 对每个品种执行建仓检查 for symbol in tradable_symbols: try: # 获取主力合约 dominant_future = get_dominant_future(symbol) if not dominant_future: log.warning(f"{symbol} 未找到主力合约") continue # 获取当前价格 current_data = get_current_data()[dominant_future] current_price = current_data.last_price log.info(f"建仓检查 - 品种: {symbol}, 主力合约: {dominant_future}, 当前价格: {current_price}") # 策略组件1: 底仓左侧多头建仓(始终启用) check_base_position_open(context, symbol, dominant_future, current_price) # 策略组件2: 网格多头建仓(可配置) if g.enable_grid_long: check_grid_trading_open(context, symbol, dominant_future, current_price) # 策略组件3: 空头对冲建仓(可配置) if g.enable_hedge_short: check_hedge_position_open(context, symbol, dominant_future, current_price) except Exception as e: log.warning(f"建仓检查处理{symbol}时出错: {str(e)}") continue def check_stop_profit_loss(context): """止损止盈检查函数(所有时间点执行)""" # log.info("=" * 60) # log.info(f"执行止损止盈检查 - 时间: {context.current_dt}") # log.info("=" * 60) # 先检查换月移仓 switch_success, switch_failed = position_auto_switch(context) if switch_failed: log.info("检测到移仓换月失败,跳过后续止盈止损检查") for failed in switch_failed: log.info(f"换月失败详情: {failed}") return # 获取当前时间 current_dt = context.current_dt current_time = current_dt.time() current_time_str = current_dt.strftime("%H:%M:%S") in_night_session = current_time >= time(21, 0) or current_time < time(2, 30) in_day_session = time(9, 0) <= current_time <= time(15, 0) # 筛选可交易品种(根据交易开始时间判断是否为夜盘品种) tradable_symbols = [] for symbol in g.strategy_focus_symbols: is_night_symbol = is_night_session_symbol(symbol) if not is_night_symbol: if in_night_session: log.info(f"{symbol} 为日盘合约,当前夜间时段 {current_time_str} 跳过止损止盈检查") continue if not in_day_session: log.info(f"{symbol} 为日盘合约,当前非交易时段 {current_time_str} 跳过止损止盈检查") continue else: if in_night_session: log.info(f"{symbol} 夜盘合约在夜间时段 {current_time_str} 执行止损止盈检查") elif not in_day_session: log.info(f"{symbol} 夜盘合约当前非活跃时段 {current_time_str} 仍执行止损止盈检查以覆盖全天风险") tradable_symbols.append(symbol) if not tradable_symbols: return # 对每个品种执行止损止盈检查 for symbol in tradable_symbols: try: # 获取主力合约 dominant_future = get_dominant_future(symbol) if not dominant_future: continue # 获取当前价格 current_data = get_current_data()[dominant_future] current_price = current_data.last_price # 策略组件1: 底仓左侧多头止盈(始终启用) check_base_position_close(context, symbol, dominant_future, current_price) # 策略组件2: 网格多头止盈(可配置) if g.enable_grid_long: check_grid_trading_close(context, symbol, dominant_future, current_price) check_grid_trading_open(context, symbol, dominant_future, current_price) # 策略组件3: 空头对冲止损止盈(可配置) if g.enable_hedge_short: check_hedge_position_close(context, symbol, dominant_future, current_price) check_hedge_position_open(context, symbol, dominant_future, current_price) except Exception as e: log.warning(f"止损止盈检查处理{symbol}时出错: {str(e)}") continue ############################ 策略组件1: 底仓左侧多头 ################################### def update_base_pending_orders(context, symbol, dominant_future): """更新底仓待成交限价单状态 检查待成交限价单的状态: - 如果订单已完全成交,移到 g.base_positions - 如果订单被取消或失败,从 g.base_pending_orders 中删除 - 如果订单部分成交或仍在挂单,保持在 g.base_pending_orders 中 Args: context: 上下文对象 symbol: 品种代码 dominant_future: 主力合约 """ if dominant_future not in g.base_pending_orders: return if not g.base_pending_orders[dominant_future]: return # 获取所有订单 try: all_orders = get_orders() except Exception as e: log.warning(f"更新待成交限价单失败: 无法获取订单信息 - {str(e)}") return levels_to_remove = [] levels_to_add_to_positions = [] for level_key, pending_info in g.base_pending_orders[dominant_future].items(): order_id = pending_info.get('order_id') if not order_id or order_id not in all_orders: # 订单不存在,可能已被系统清理,从待成交列表中移除 log.info(f"底仓限价单 {level_key} 订单 {order_id} 不存在于订单列表,移除待成交记录") levels_to_remove.append(level_key) continue order_obj = all_orders[order_id] status = str(order_obj.status).lower() filled_amount = getattr(order_obj, 'filled', 0) or 0 order_amount = getattr(order_obj, 'amount', 0) or 0 # 检查订单状态 if status in ('filled', 'held'): # 订单已完全成交 actual_entry_price = pending_info['price'] # 限价单成交价等于限价 levels_to_add_to_positions.append((level_key, { 'entry_price': actual_entry_price, 'quantity': pending_info['quantity'], 'entry_date': pending_info['submit_time'] })) levels_to_remove.append(level_key) log.info(f"底仓限价单 {level_key} 已成交,订单 {order_id},移至持仓记录") elif status in ('canceled', 'rejected'): # 订单已取消或被拒绝 levels_to_remove.append(level_key) log.info(f"底仓限价单 {level_key} 已{status},订单 {order_id},从待成交列表移除") elif status in ('open', 'pending', 'new', 'part_filled'): # 订单仍在等待成交或部分成交,保持在待成交列表 log.info(f"底仓限价单 {level_key} 状态 {status},已成交 {filled_amount}/{order_amount} 手,继续等待") else: # 未知状态,记录日志 log.warning(f"底仓限价单 {level_key} 订单 {order_id} 状态未知: {status}") # 将已成交的订单移到持仓记录 for level_key, position_info in levels_to_add_to_positions: g.base_positions[dominant_future][level_key] = position_info # 从待成交列表中移除 for level_key in levels_to_remove: del g.base_pending_orders[dominant_future][level_key] def check_base_position_open(context, symbol, dominant_future, current_price): """检查底仓左侧多头建仓 逻辑: 1. 遍历所有未建仓的价格档位 2. 如果档位价格 >= 当前价格:使用市价单立即成交 3. 如果档位价格 < 当前价格:记录到待挂限价单列表 4. 从待挂限价单列表中选择最高的两个档位挂限价单 """ # 时间检查:判断是否已达到该品种的交易开始时间 if not is_trading_time_reached(context, symbol): log.info(f"底仓建仓跳过: {symbol} 未达到交易开始时间") return # 检查是否有底仓配置 if symbol not in g.base_position_grid: return grid = g.base_position_grid[symbol] # 初始化持仓记录 if dominant_future not in g.base_positions: g.base_positions[dominant_future] = {} # 初始化待成交限价单记录(问题2修复) if dominant_future not in g.base_pending_orders: g.base_pending_orders[dominant_future] = {} # 问题2修复:检查待成交限价单的状态,将已成交的订单移到持仓记录 update_base_pending_orders(context, symbol, dominant_future) log.info(f"底仓建仓检查: {symbol} 当前价格 {current_price}") log.info(f"底仓持仓: {g.base_positions[dominant_future]}") log.info(f"底仓待成交限价单: {g.base_pending_orders[dominant_future]}") # 收集未建仓的档位 market_order_levels = [] # 需要市价单的档位 limit_order_levels = [] # 需要限价单的档位 for price_level, quantity in sorted(grid.items(), reverse=True): level_key = f"{price_level}" # 跳过已建仓的档位 if level_key in g.base_positions[dominant_future]: continue # 问题2修复:跳过已有待成交限价单的档位 if level_key in g.base_pending_orders[dominant_future]: log.info(f" 档位 {price_level} 已有待成交限价单,跳过") continue # 问题1修复:过滤价格过低的档位,避免无效挂单 # 当档位价格 < 当前价格的95%时,跳过该档位(相当于当前价格 > 档位价格 * 1.05) if price_level < current_price * 0.95: log.info(f" 档位 {price_level} 过低(< 当前价 {current_price} * 0.95 = {current_price * 0.95:.2f}),跳过挂单") continue # 判断使用市价单还是限价单 if price_level >= current_price: # 档位价格 >= 当前价格,使用市价单 market_order_levels.append((price_level, quantity)) log.info(f" 档位 {price_level}({quantity}手)>= 当前价 {current_price},将使用市价单") else: # 档位价格 < 当前价格,加入限价单候选 limit_order_levels.append((price_level, quantity)) # 执行市价单 for price_level, quantity in market_order_levels: level_key = f"{price_level}" strategy_info = {'strategy_type': '基础左侧', 'target_price': price_level} _, success = open_position(context, dominant_future, quantity, 'long', f'底仓市价建仓(目标档位{price_level})', strategy_info=strategy_info) if success: # 获取实际成交价格(从订单记录中获取) actual_price = current_price # 市价单按当前价记录 g.base_positions[dominant_future][level_key] = { 'entry_price': actual_price, 'quantity': quantity, 'entry_date': context.current_dt } log.info(f"底仓市价单成交: {symbol} 目标档位 {price_level}, 实际价格 {actual_price}, 数量 {quantity}") # 执行限价单(固定挂两个档位) if limit_order_levels: # 选择价格最高的两个档位 top_two_levels = sorted(limit_order_levels, reverse=True)[:2] log.info(f" 准备挂限价单,候选档位: {[p for p, q in limit_order_levels]}, 选择最高两个: {[p for p, q in top_two_levels]}") for price_level, quantity in top_two_levels: level_key = f"{price_level}" strategy_info = {'strategy_type': '基础左侧', 'target_price': price_level} order_id, success = open_position(context, dominant_future, quantity, 'long', f'底仓限价建仓@{price_level}', limit_price=price_level, strategy_info=strategy_info) if success: # 问题2修复:限价单提交成功后,记录到待成交列表,而不是直接记录到持仓 g.base_pending_orders[dominant_future][level_key] = { 'order_id': order_id, 'price': price_level, 'quantity': quantity, 'submit_time': context.current_dt } log.info(f"底仓限价单挂单: {symbol} 档位 {price_level}, 数量 {quantity}, order_id={order_id}, 待成交确认") def check_base_position_close(context, symbol, dominant_future, current_price): """检查底仓左侧多头平仓(止盈)""" # 时间检查:判断是否已达到该品种的交易开始时间 if not is_trading_time_reached(context, symbol): log.info(f"底仓止盈跳过: {symbol} 未达到交易开始时间") return # 检查是否有底仓配置 if symbol not in g.base_position_exit_price: return exit_price = g.base_position_exit_price.get(symbol) # 检查退出条件(止盈) if exit_price and current_price >= exit_price: # 检查是否有持仓需要平仓 subportfolio = context.subportfolios[0] if dominant_future in subportfolio.long_positions: position = subportfolio.long_positions[dominant_future] if position.total_amount > 0: log.info(f"底仓触发止盈: {symbol} 当前价格 {current_price} >= 退出价格 {exit_price}") close_position(context, dominant_future, position.total_amount, 'long', '底仓止盈') if dominant_future in g.base_positions: g.base_positions[dominant_future] = {} return ############################ 策略组件2: 网格多头策略 ################################### def check_grid_trading_open(context, symbol, dominant_future, current_price): """检查网格多头建仓 逻辑: 1. 找出所有应该买入但未买入的网格层级 2. 对于每个层级的目标价格: - 如果目标价格 >= 当前价格:使用市价单 - 如果目标价格 < 当前价格:记录到待挂限价单列表 3. 从待挂限价单列表中选择价格最高的两个层级挂限价单 """ # 时间检查:判断是否已达到该品种的交易开始时间 if not is_trading_time_reached(context, symbol): log.info(f"网格建仓跳过: {symbol} 未达到交易开始时间") return # 检查是否有网格配置 if symbol not in g.grid_config: return config = g.grid_config[symbol] start_price = config['start_price'] grid_size = config['grid_size'] quantity_per_grid = config['quantity_per_grid'] # 初始化网格持仓记录 if dominant_future not in g.grid_positions: g.grid_positions[dominant_future] = [] if dominant_future not in g.grid_buy_levels: g.grid_buy_levels[dominant_future] = set() # 只在价格低于起始价格时才执行网格交易 if current_price >= start_price: return log.info(f"网格建仓检查: {symbol} 当前价格 {current_price}, 起始价格 {start_price}") # 计算当前价格对应的最低网格档位数量 max_grid_index = int((start_price - current_price) / grid_size) # 收集需要建仓的网格档位 market_order_grids = [] # 需要市价单的网格 limit_order_grids = [] # 需要限价单的网格 # 遍历所有应该触发的网格档位(从0到max_grid_index) for grid_index in range(max_grid_index + 1): # 计算该档位的目标买入价格 target_price = start_price - grid_index * grid_size # 跳过已经买入的目标价 if target_price in g.grid_buy_levels[dominant_future]: continue grid_info = (grid_index, target_price) # 判断使用市价单还是限价单 if target_price >= current_price: # 目标价格 >= 当前价格,使用市价单 market_order_grids.append(grid_info) log.info(f" 网格目标价 {target_price} >= 当前价 {current_price},将使用市价单 (档位索引 {grid_index})") else: # 目标价格 < 当前价格,加入限价单候选 limit_order_grids.append(grid_info) # 执行市价单 for _, target_price in market_order_grids: strategy_info = {'strategy_type': '网格', 'target_price': target_price} _, success = open_position(context, dominant_future, quantity_per_grid, 'long', f'网格市价买入(目标价{target_price})', strategy_info=strategy_info) if success: # 市价单按实际成交价记录(这里用当前价近似) g.grid_positions[dominant_future].append({ 'entry_price': current_price, 'quantity': quantity_per_grid, 'target_price': target_price, 'entry_date': context.current_dt }) g.grid_buy_levels[dominant_future].add(target_price) log.info(f"网格市价单成交: {symbol} 目标价 {target_price}, 实际价 {current_price}, 数量 {quantity_per_grid}") # 执行限价单(固定挂两个层级) if limit_order_grids: # 选择价格最高的两个目标价(档位数越小价格越高) top_two_grids = sorted(limit_order_grids, key=lambda x: x[1], reverse=True)[:2] log.info(f" 准备挂限价单,候选目标价: {[(l, p) for l, p in limit_order_grids]}, 选择最高两个: {[(l, p) for l, p in top_two_grids]}") for _, target_price in top_two_grids: strategy_info = {'strategy_type': '网格', 'target_price': target_price} _, success = open_position(context, dominant_future, quantity_per_grid, 'long', f'网格限价买入@目标价{target_price}', limit_price=target_price, strategy_info=strategy_info) if success: g.grid_positions[dominant_future].append({ 'entry_price': target_price, # 限价单按委托价记录 'quantity': quantity_per_grid, 'target_price': target_price, 'entry_date': context.current_dt }) g.grid_buy_levels[dominant_future].add(target_price) log.info(f"网格限价单挂单: {symbol} 目标价 {target_price}, 数量 {quantity_per_grid}") def check_grid_trading_close(context, symbol, dominant_future, current_price): """检查网格多头平仓(止盈)""" # 时间检查:判断是否已达到该品种的交易开始时间 if not is_trading_time_reached(context, symbol): log.info(f"网格止盈跳过: {symbol} 未达到交易开始时间") return # 检查是否有网格配置 if symbol not in g.grid_config: return config = g.grid_config[symbol] exit_grid_size = config['exit_grid_size'] # 初始化网格持仓记录 if dominant_future not in g.grid_positions: g.grid_positions[dominant_future] = [] if dominant_future not in g.grid_buy_levels: g.grid_buy_levels[dominant_future] = set() # 检查卖出条件:对每个持仓检查是否达到退出条件 positions_to_remove = [] for i, position in enumerate(g.grid_positions[dominant_future]): target_price = position['target_price'] exit_price = target_price + exit_grid_size if current_price >= exit_price: # 执行网格卖出 success = close_position(context, dominant_future, position['quantity'], 'long', f'网格卖出@目标价{target_price}') if success: profit_per_unit = current_price - position['entry_price'] log.info(f"网格卖出: {symbol} 目标价 {target_price}, 计划退出价 {exit_price}, " f"买入价 {position['entry_price']}, 卖出价 {current_price}, " f"盈利 {profit_per_unit * position['quantity'] * get_multiplier(symbol)}") positions_to_remove.append(i) # 移除该网格目标价标记,允许重新买入 if target_price in g.grid_buy_levels[dominant_future]: g.grid_buy_levels[dominant_future].remove(target_price) # 移除已平仓的网格持仓记录 for i in reversed(positions_to_remove): g.grid_positions[dominant_future].pop(i) ############################ 策略组件3: 空头对冲策略 ################################### def check_hedge_position_open(context, symbol, dominant_future, current_price): """检查空头对冲建仓 对冲逻辑: 1. 对冲数量只对应基础左侧持仓,不包括网格持仓 2. 对冲永远使用限价单,价格为各档位的目标价格 3. 针对每个基础左侧档位分别建立对应的空头对冲单 4. **关键约束**:只对今天开立的左侧持仓建立对冲 """ # 时间检查:判断是否已达到该品种的交易开始时间 if not is_trading_time_reached(context, symbol): log.info(f"对冲建仓跳过: {symbol} 未达到交易开始时间") return # 初始化对冲状态 if dominant_future not in g.hedge_state: g.hedge_state[dominant_future] = { 'max_profit': 0, 'max_profit_price': 0, 'profit_stage': 0 # 0: 未盈利, 1: 一级盈利, 2: 二级盈利 } # 初始化对冲持仓记录 if dominant_future not in g.hedge_positions: g.hedge_positions[dominant_future] = {} # 检查是否有底仓持仓(作为建立对冲的条件) if dominant_future not in g.base_positions or not g.base_positions[dominant_future]: log.info(f"对冲建仓检查: {symbol} 当前无基础左侧持仓,无需对冲") return # **关键检查**:筛选今天开立的左侧持仓 today_date = context.current_dt.date() today_opened_positions = {} for level_key, position_info in g.base_positions[dominant_future].items(): entry_date = position_info.get('entry_date') if entry_date is None: log.warning(f"对冲建仓检查: {symbol} 档位 {level_key} 缺少entry_date,跳过") continue # 将entry_date转换为date对象进行比较 if hasattr(entry_date, 'date'): entry_date_obj = entry_date.date() elif hasattr(entry_date, 'to_pydatetime'): entry_date_obj = entry_date.to_pydatetime().date() else: entry_date_obj = entry_date # 只记录今天开立的持仓 if entry_date_obj == today_date: today_opened_positions[level_key] = position_info log.info(f"对冲建仓检查: {symbol} 档位 {level_key} 今天开立,可建立对冲") else: log.info(f"对冲建仓检查: {symbol} 档位 {level_key} 开立日期 {entry_date_obj} ≠ 今天 {today_date},跳过对冲") # **关键判断**:如果今天没有开立任何左侧持仓,完全跳过对冲逻辑 if not today_opened_positions: # log.info(f"对冲建仓检查: {symbol} 今天未开立任何左侧持仓,跳过所有对冲建仓逻辑") return log.info(f"对冲建仓检查: {symbol} 当前价格 {current_price}, 今天开立的左侧持仓档位: {list(today_opened_positions.keys())}") # 收集对应基础左侧订单的成交情况,仅在订单成交后才建立对冲 level_fill_status = {} order_strategy_info_map = getattr(g, 'order_strategy_info', {}) if hasattr(g, 'order_strategy_info') else {} try: all_orders = get_orders() except Exception as e: log.warning(f"对冲建仓检查: {symbol} 获取订单信息失败,等待下次检查。错误: {str(e)}") all_orders = {} if all_orders: for order_id, order_obj in all_orders.items(): if order_id not in order_strategy_info_map: continue strategy_info = order_strategy_info_map[order_id] if strategy_info.get('strategy_type') != '基础左侧': continue if getattr(order_obj, 'security', None) != dominant_future: continue target_price_key = f"{strategy_info.get('target_price')}" if target_price_key not in today_opened_positions: continue status_value = getattr(order_obj, 'status', '') status_str = str(status_value) status_lower = status_str.lower() filled_amount = getattr(order_obj, 'filled', 0) or 0 order_amount = getattr(order_obj, 'amount', 0) or 0 info = level_fill_status.setdefault(target_price_key, { 'filled': 0, 'amount': 0, 'pending': False, 'waiting': False, 'statuses': set() }) info['filled'] += filled_amount info['amount'] += order_amount info['statuses'].add(status_str) if status_lower in ('open', 'part_filled', 'pending', 'submitted'): info['pending'] = True if status_lower in ('open', 'submitted') and filled_amount == 0: info['waiting'] = True # 只针对今天开立的基础左侧档位建立对应的对冲单 for level_key, position_info in today_opened_positions.items(): # 跳过已经建立对冲的档位 if level_key in g.hedge_positions[dominant_future]: log.info(f" 对冲档位 {level_key}: 已存在对冲持仓,跳过") continue fill_info = level_fill_status.get(level_key) if not fill_info: log.info(f" 对冲档位 {level_key}: 底仓订单未找到成交记录,等待成交后再建立对冲") continue required_quantity = position_info['quantity'] filled_quantity = fill_info.get('filled', 0) status_list = sorted(fill_info.get('statuses')) if fill_info.get('waiting', False): log.info(f" 对冲档位 {level_key}: 底仓限价单仍在挂单等待,状态集合 {status_list}") continue if filled_quantity < required_quantity: # log.info(f" 对冲档位 {level_key}: 底仓订单成交量不足 (成交 {filled_quantity} 手 / 目标 {required_quantity} 手,状态集合 {status_list}),跳过对冲") continue target_price = level_key # 目标价格本质就是对冲档位 quantity = position_info['quantity'] log.info(f" 对冲档位 {level_key}: 目标价格 {target_price}, 数量 {quantity}手(对应今天开立的左侧持仓)") # 建立空头对冲(永远使用限价单,价格为档位目标价) strategy_info = {'strategy_type': '对冲', 'target_price': target_price} _, success = open_position(context, dominant_future, quantity, 'short', f'建立空头对冲@{target_price}', limit_price=target_price, strategy_info=strategy_info) if success: try: entry_price_value = float(target_price) except (TypeError, ValueError): log.error(f"对冲建仓记录失败: 档位 {level_key} 目标价无法转换为浮点数 -> {target_price}") entry_price_value = target_price g.hedge_positions[dominant_future][level_key] = { 'entry_price': entry_price_value, 'quantity': quantity, 'entry_date': context.current_dt } log.info(f"对冲限价单挂单: {symbol} 档位 {level_key}, 价格 {target_price}, 数量 {quantity}手") def check_hedge_position_close(context, symbol, dominant_future, current_price): """检查空头对冲平仓(止损止盈) 对冲平仓逻辑: 1. 分别检查每个档位的对冲持仓 2. 每个档位独立计算盈亏和止损止盈条件 """ # 时间检查:判断是否已达到该品种的交易开始时间 if not is_trading_time_reached(context, symbol): log.info(f"对冲止损止盈跳过: {symbol} 未达到交易开始时间") return # 检查是否已有对冲持仓记录 if dominant_future not in g.hedge_positions or not g.hedge_positions[dominant_future]: return # 检查实际空头持仓 subportfolio = context.subportfolios[0] has_hedge_position = dominant_future in subportfolio.short_positions and \ subportfolio.short_positions[dominant_future].total_amount > 0 if not has_hedge_position: return # 问题1修复:获取当前日期,用于过滤当日新开仓的对冲头寸 today_date = context.current_dt.date() # 遍历每个档位的对冲持仓 levels_to_remove = [] for level_key, hedge_info in g.hedge_positions[dominant_future].items(): # 问题1修复:检查对冲头寸的开仓日期 entry_date = hedge_info.get('entry_date') if entry_date is not None: # 将entry_date转换为date对象进行比较 if hasattr(entry_date, 'date'): entry_date_obj = entry_date.date() elif hasattr(entry_date, 'to_pydatetime'): entry_date_obj = entry_date.to_pydatetime().date() else: entry_date_obj = entry_date # 如果是当日新开仓的对冲头寸,跳过止损止盈检查 if entry_date_obj == today_date: log.info(f"对冲止损止盈检查: {symbol} 档位 {level_key} 于今日开仓,跳过当日止损止盈检查") continue try: entry_price = float(hedge_info['entry_price']) except (TypeError, ValueError): log.error(f"对冲止损止盈计算失败: 档位 {level_key} 入场价无法转换为浮点数 -> {hedge_info.get('entry_price')}") continue quantity = hedge_info['quantity'] # 计算当前盈亏率(空头:入场价格 > 当前价格为盈利) profit_rate = (entry_price - current_price) / entry_price # 更新最大盈利 if profit_rate > g.hedge_state[dominant_future]['max_profit']: g.hedge_state[dominant_future]['max_profit'] = profit_rate g.hedge_state[dominant_future]['max_profit_price'] = current_price # 更新盈利阶段 if profit_rate >= g.hedge_level2_recovery_pct: g.hedge_state[dominant_future]['profit_stage'] = 2 elif profit_rate >= g.hedge_level1_recovery_pct: g.hedge_state[dominant_future]['profit_stage'] = 1 # 检查止损条件 if profit_rate <= -g.hedge_stoploss_pct: log.info(f"对冲触发止损: {symbol} 档位 {level_key}, 入场价 {entry_price}, 亏损率 {profit_rate:.2%}") close_position(context, dominant_future, quantity, 'short', f'对冲止损@档位{level_key}') levels_to_remove.append(level_key) continue # 检查成本区域止盈条件 if abs(profit_rate) <= g.hedge_cost_area_pct: profit_stage = g.hedge_state[dominant_future]['profit_stage'] if profit_stage >= 1: # 曾经达到过一级或二级盈利 log.info(f"对冲触发成本区域止盈: {symbol} 档位 {level_key}, 当前盈利 {profit_rate:.2%} " f"在成本区域内,曾达到盈利阶段 {profit_stage}") close_position(context, dominant_future, quantity, 'short', f'对冲成本区域止盈@档位{level_key}') levels_to_remove.append(level_key) continue # 移除已平仓的档位 for level_key in levels_to_remove: del g.hedge_positions[dominant_future][level_key] # 如果所有对冲持仓都已平仓,重置对冲状态 if not g.hedge_positions[dominant_future]: g.hedge_state[dominant_future] = {'max_profit': 0, 'max_profit_price': 0, 'profit_stage': 0} ############################ 交易执行函数 ################################### def open_position(context, security, quantity, direction, reason='', limit_price=None, strategy_info=None): """开仓 Args: context: 上下文对象 security: 合约代码 quantity: 开仓数量 direction: 方向 ('long' 或 'short') reason: 开仓原因 limit_price: 限价单价格,None表示使用市价单 strategy_info: 策略信息字典 {'strategy_type': '策略类型', 'target_price': 目标价格} Returns: tuple: (order_id, success) - 订单ID和是否成功 注:订单提交后filled=0,实际成交信息需要在收盘后通过get_trades()获取 """ try: order_type = "限价单" if limit_price is not None else "市价单" price_info = f"@{limit_price}" if limit_price is not None else "" log.info(f"提交开仓订单: {security} {direction} {quantity}手 {order_type}{price_info} - {reason}") # 使用order按手数开仓(避免order_target取消之前的未完成订单) # 根据是否有限价选择订单类型 if limit_price is not None: order_obj = order(security, quantity, LimitOrderStyle(limit_price), side=direction) else: order_obj = order(security, quantity, side=direction) if order_obj is not None: log.info(f"订单已提交: order_id={order_obj.order_id}, 状态={order_obj.status}, " f"已成交={order_obj.filled}/{quantity}手") # 保存策略信息到订单记录中,用于收盘后的汇总显示 if strategy_info: if not hasattr(g, 'order_strategy_info'): g.order_strategy_info = {} g.order_strategy_info[order_obj.order_id] = strategy_info log.info(f"订单策略信息已记录: {strategy_info}") return order_obj.order_id, True else: log.warning(f"订单提交失败: {security} {direction} {quantity}手") return None, False except Exception as e: log.warning(f"开仓异常 {security}: {str(e)}") return None, False def close_position(context, security, quantity, direction, reason=''): """平仓 注:平仓使用order_target是安全的,因为通常不会有多个平仓订单同时进行 订单提交后filled=0,实际成交信息需要在收盘后通过get_trades()获取 """ try: log.info(f"提交平仓订单: {security} {direction} {quantity}手 - {reason}") # 使用order_target平仓 subportfolio = context.subportfolios[0] current_position = 0 if direction == 'long' and security in subportfolio.long_positions: current_position = subportfolio.long_positions[security].total_amount elif direction == 'short' and security in subportfolio.short_positions: current_position = subportfolio.short_positions[security].total_amount target_quantity = max(0, current_position - quantity) order_obj = order_target(security, target_quantity, side=direction) if order_obj is not None: log.info(f"平仓订单已提交: order_id={order_obj.order_id}, 状态={order_obj.status}, " f"已成交={order_obj.filled}/{quantity}手") return True else: log.warning(f"平仓订单提交失败: {security} {direction} {quantity}手") return False except Exception as e: log.warning(f"平仓异常 {security}: {str(e)}") return False ############################ 辅助函数 ################################### def get_futures_config(underlying_symbol, config_key=None, default_value=None): """获取期货品种配置信息的辅助函数""" if underlying_symbol not in g.futures_config: if config_key and default_value is not None: return default_value return {} if config_key is None: return g.futures_config[underlying_symbol] return g.futures_config[underlying_symbol].get(config_key, default_value) def is_night_session_symbol(symbol): """判断品种是否为夜盘品种 Args: symbol: 期货品种代码(如'IF'、'AU'等) Returns: bool: True表示夜盘品种,False表示日盘品种 """ trading_start_time = get_futures_config(symbol, 'trading_start_time', '09:00') # 解析配置的时间 try: start_hour, _ = map(int, trading_start_time.split(':')) # 如果开始时间 >= 21:00,则为夜盘品种 return start_hour >= 21 except (ValueError, AttributeError): log.warning(f"{symbol} 交易开始时间配置格式错误: {trading_start_time},默认判断为日盘品种") return False def is_trading_time_reached(context, symbol): """判断当前时间是否已达到该品种的交易开始时间 Args: context: 上下文对象 symbol: 期货品种代码(如'IF'、'AU'等) Returns: bool: True表示已达到交易时间,False表示未达到 """ # 获取配置的交易开始时间 trading_start_time = get_futures_config(symbol, 'trading_start_time', '09:05') # 解析配置的时间(格式为 "HH:MM") try: start_hour, start_minute = map(int, trading_start_time.split(':')) configured_start_time = time(start_hour, start_minute) except (ValueError, AttributeError): log.warning(f"{symbol} 交易开始时间配置格式错误: {trading_start_time},默认使用09:05") configured_start_time = time(9, 5) # 获取当前时间 current_time = context.current_dt.time() # 判断逻辑: # 1. 如果配置时间是夜盘时间(>=21:00),且当前时间也在夜盘时段(>=21:00 或 <03:00),直接比较 # 2. 如果配置时间是日盘时间(<21:00),且当前时间也在日盘时段(09:00-15:00),直接比较 # 3. 其他情况返回False(例如日盘品种在夜盘时段) is_night_start = configured_start_time >= time(21, 0) is_current_night = current_time >= time(21, 0) or current_time < time(3, 0) is_current_day = time(9, 0) <= current_time <= time(15, 0) if is_night_start: # 配置为夜盘开始时间 if is_current_night: # 当前也在夜盘时段 if current_time >= time(21, 0): # 当前在21:00之后 result = current_time >= configured_start_time else: # 当前在凌晨(已经过了配置的开始时间) result = True # log.debug(f"[时间检查] {symbol} 配置夜盘开始 {trading_start_time}, 当前时间 {current_time.strftime('%H:%M:%S')}, 判断结果: {result}") return result elif is_current_day: # 夜盘品种在日盘时段,允许交易 # log.debug(f"[时间检查] {symbol} 夜盘品种在日盘时段 {current_time.strftime('%H:%M:%S')}, 允许交易") return True else: # 其他时段不允许交易 # log.debug(f"[时间检查] {symbol} 夜盘品种在非交易时段 {current_time.strftime('%H:%M:%S')}, 不允许交易") return False else: # 配置为日盘开始时间 if is_current_night: # 日盘品种在夜盘时段,不允许交易 # log.debug(f"[时间检查] {symbol} 日盘品种在夜盘时段 {current_time.strftime('%H:%M:%S')}, 不允许交易") return False elif is_current_day: # 当前在日盘时段,比较时间 result = current_time >= configured_start_time # log.debug(f"[时间检查] {symbol} 配置日盘开始 {trading_start_time}, 当前时间 {current_time.strftime('%H:%M:%S')}, 判断结果: {result}") return result else: # 其他时段(如午休、收盘后)不允许建仓,但允许止盈止损 # log.debug(f"[时间检查] {symbol} 日盘品种在非开盘时段 {current_time.strftime('%H:%M:%S')}, 不允许交易") return False def get_margin_rate(underlying_symbol, direction, default_rate=0.10): """获取保证金比例的辅助函数""" return g.futures_config.get(underlying_symbol, {}).get('margin_rate', {}).get(direction, default_rate) def get_multiplier(underlying_symbol, default_multiplier=10): """获取合约乘数的辅助函数""" return g.futures_config.get(underlying_symbol, {}).get('multiplier', default_multiplier) def after_market_close(context): """收盘后运行函数 使用get_trades()获取当日所有成交记录 使用get_open_orders()获取所有未成交订单 """ log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time()))) # 获取当日所有成交记录 trades = get_trades() # 获取当日所有未完成订单 open_orders = get_open_orders() # 打印交易汇总 print_daily_trading_summary(context, trades, open_orders) log.info('##############################################################') def print_daily_trading_summary(context, trades, open_orders): """打印当日交易汇总 Args: context: 上下文对象 trades: 当日成交记录字典 {trade_id: UserTrade对象} open_orders: 未完成订单字典 {order_id: Order对象} 注:UserTrade对象只有基本属性(trade_id, order_id, time, amount, price) 需要通过order_id关联订单来获取完整信息 """ log.info("=" * 60) log.info("当日交易汇总") log.info("=" * 60) current_data = {} try: current_data = get_current_data() except Exception as e: log.warning(f"获取当前行情数据失败: {str(e)}") current_data = {} def format_datetime_value(dt_value): if hasattr(dt_value, 'to_pydatetime'): dt_value = dt_value.to_pydatetime() if isinstance(dt_value, datetime): return dt_value.strftime('%Y-%m-%d %H:%M:%S') if isinstance(dt_value, date): return datetime.combine(dt_value, datetime.min.time()).strftime('%Y-%m-%d %H:%M:%S') return str(dt_value) if dt_value is not None else '' # 统计成交记录 if trades: log.info(f"\n【成交记录】共 {len(trades)} 笔") log.info("-" * 60) # 获取所有订单(包括已完成和未完成的)用于关联成交记录 all_orders = get_orders() for trade_id, trade in trades.items(): # 从trade获取基本信息 security = None # 尝试通过order_id获取订单信息 order_info = None if hasattr(trade, 'order_id') and trade.order_id in all_orders: order_info = all_orders[trade.order_id] security = order_info.security # 如果无法获取security,跳过这条记录 trade_time = getattr(trade, 'time', None) trade_datetime_str = format_datetime_value(trade_time) if not security: log.info(f"成交记录 trade_id={trade_id}: {trade.amount}手 @{trade.price:.2f} " f"成交日期时间:{trade_datetime_str} (无法获取详细信息)") continue underlying_symbol = security.split('.')[0][:-4] dominant_code = None if hasattr(g, 'dominant_contracts'): dominant_code = g.dominant_contracts.get(underlying_symbol) display_symbol = dominant_code if dominant_code else security.split('.')[0] # 从订单获取action和side信息 if order_info: action_str = "开仓" if order_info.action == "open" else "平仓" side_str = "多" if order_info.side == "long" else "空" # 获取策略信息 strategy_str = "" if hasattr(g, 'order_strategy_info') and trade.order_id in g.order_strategy_info: strategy_info = g.order_strategy_info[trade.order_id] strategy_type = strategy_info.get('strategy_type', '未知') target_price = strategy_info.get('target_price', 0) strategy_str = f", 策略:{strategy_type}, 策略对应价格:{target_price}" # 获取当前价格 try: current_price = current_data[security].last_price except Exception: current_price = 0 log.info(f"{action_str}{side_str}: {display_symbol}, " f"数量:{trade.amount}手, 成交价:{trade.price:.2f}{strategy_str}, " f"标的价格:{current_price:.2f}, " f"成交日期时间:{trade_datetime_str}") else: log.info(f"成交: {underlying_symbol} {trade.amount}手 @{trade.price:.2f} " f"成交日期时间:{trade_datetime_str}") log.info("-" * 60) else: log.info("\n【成交记录】无") # 统计未完成订单 if open_orders: log.info(f"\n【未完成订单】共 {len(open_orders)} 个") log.info("-" * 60) for order_id, order_obj in open_orders.items(): dominant_code = None try: underlying_symbol = order_obj.security.split('.')[0][:-4] full_symbol = order_obj.security.split('.')[0] if hasattr(g, 'dominant_contracts'): dominant_code = g.dominant_contracts.get(underlying_symbol) except Exception: underlying_symbol = order_obj.security display_symbol = dominant_code if dominant_code else full_symbol action_str = "开仓" if order_obj.action == "open" else "平仓" side_str = "多" if order_obj.side == "long" else "空" # 判断订单类型 order_type = "市价单" limit_price_str = "" if hasattr(order_obj, 'style') and order_obj.style: if hasattr(order_obj.style, 'limit_price') and order_obj.style.limit_price > 0: order_type = "限价单" limit_price_str = f" @{order_obj.style.limit_price:.2f}" log.info(f"{action_str}{side_str}: {display_symbol} " f"{order_obj.amount}手 {order_type}{limit_price_str} " f"状态:{order_obj.status}") log.info("-" * 60) else: log.info("\n【未完成订单】无") # 打印持仓信息 log.info("\n【当前持仓】") log.info("-" * 60) subportfolio = context.subportfolios[0] holdings_logged = False for security, position in subportfolio.long_positions.items(): if position.total_amount <= 0: continue underlying_symbol = security.split('.')[0][:-4] full_symbol = security.split('.')[0] dominant_code = None if hasattr(g, 'dominant_contracts'): dominant_code = g.dominant_contracts.get(underlying_symbol) display_symbol = dominant_code if dominant_code else full_symbol try: current_price = current_data[security].last_price except Exception: current_price = 0 symbol_logged = False base_entries = g.base_positions.get(security, {}) if hasattr(g, 'base_positions') else {} for level_key, entry_info in sorted(base_entries.items()): quantity = entry_info.get('quantity', 0) if quantity <= 0: continue entry_price = float(entry_info.get('entry_price', 0)) entry_date = entry_info.get('entry_date') log.info(f"标的种类: {display_symbol}, 数量:{quantity}手, 成交价:{entry_price:.2f}, " f"策略:基础左侧, 策略对应价格:{level_key}, 当前标的价格:{current_price:.2f}, " f"成交日期时间: {format_datetime_value(entry_date)}") symbol_logged = True holdings_logged = True grid_entries = g.grid_positions.get(security, []) if hasattr(g, 'grid_positions') else [] for entry_info in sorted(grid_entries, key=lambda x: x.get('target_price', 0), reverse=True): quantity = entry_info.get('quantity', 0) if quantity <= 0: continue entry_price = float(entry_info.get('entry_price', 0)) target_price = entry_info.get('target_price', 0) entry_date = entry_info.get('entry_date') log.info(f"标的种类: {display_symbol}, 数量:{quantity}手, 成交价:{entry_price:.2f}, " f"策略:网格, 策略对应价格:{target_price}, 当前标的价格:{current_price:.2f}, " f"成交日期时间: {format_datetime_value(entry_date)}") symbol_logged = True holdings_logged = True if not symbol_logged: log.info(f"标的种类: {display_symbol}, 数量:{position.total_amount}手, 成交价:{position.avg_cost:.2f}, " f"策略:未分类, 策略对应价格:--, 当前标的价格:{current_price:.2f}, 成交日期时间: --") holdings_logged = True for security, position in subportfolio.short_positions.items(): if position.total_amount <= 0: continue underlying_symbol = security.split('.')[0][:-4] dominant_code = None if hasattr(g, 'dominant_contracts'): dominant_code = g.dominant_contracts.get(underlying_symbol) display_symbol = dominant_code if dominant_code else full_symbol try: current_price = current_data[security].last_price except Exception: current_price = 0 symbol_logged = False hedge_entries = g.hedge_positions.get(security, {}) if hasattr(g, 'hedge_positions') else {} for level_key, entry_info in sorted(hedge_entries.items()): quantity = entry_info.get('quantity', 0) if quantity <= 0: continue entry_price = float(entry_info.get('entry_price', 0)) entry_date = entry_info.get('entry_date') log.info(f"标的种类: {display_symbol}, 数量:{quantity}手, 成交价:{entry_price:.2f}, " f"策略:对冲, 策略对应价格:{level_key}, 当前标的价格:{current_price:.2f}, " f"成交日期时间: {format_datetime_value(entry_date)}") symbol_logged = True holdings_logged = True if not symbol_logged: log.info(f"标的种类: {display_symbol}, 数量:{position.total_amount}手, 成交价:{position.avg_cost:.2f}, " f"策略:未分类(空), 策略对应价格:--, 当前标的价格:{current_price:.2f}, 成交日期时间: --") holdings_logged = True if not holdings_logged: log.info("无持仓") log.info("-" * 60) log.info(f"账户总资产: {context.portfolio.total_value:.2f}") log.info(f"可用资金: {context.portfolio.available_cash:.2f}") log.info("=" * 60) ########################## 自动移仓换月函数 ################################# def position_auto_switch(context, pindex=0, switch_func=None, callback=None): """ 期货自动移仓换月。默认使用市价单进行开平仓。 """ import re subportfolio = context.subportfolios[pindex] symbols = set(subportfolio.long_positions.keys()) | set(subportfolio.short_positions.keys()) switch_result = [] switch_failed = [] for symbol in symbols: match = re.match(r"(?P[A-Z]{1,})", symbol) if not match: raise ValueError("未知期货标的: {}".format(symbol)) else: underlying_symbol = match.groupdict()["underlying_symbol"] # 时间检查:判断是否已达到该品种的交易开始时间 if not is_trading_time_reached(context, underlying_symbol): log.info(f"换月操作跳过: {underlying_symbol} 未达到交易开始时间") continue dominant = get_dominant_future(underlying_symbol) cur = get_current_data() symbol_last_price = cur[symbol].last_price dominant_last_price = cur[dominant].last_price # log.debug(f'检查换月中,当前持仓合约: {symbol}, 当前主力合约: {dominant}') if dominant > symbol: for positions_ in (subportfolio.long_positions, subportfolio.short_positions): if symbol not in positions_.keys(): continue else : p = positions_[symbol] if switch_func is not None: switch_func(context, pindex, p, dominant) else: amount = p.total_amount # 跌停不能开空和平多,涨停不能开多和平空。 if p.side == "long": symbol_low_limit = cur[symbol].low_limit dominant_high_limit = cur[dominant].high_limit if symbol_last_price <= symbol_low_limit: log.warning("标的{}跌停,无法平仓。移仓换月取消。".format(symbol)) switch_failed.append({"underlying": match.groupdict()["underlying_symbol"], "before": symbol, "after": dominant, "side": p.side, "reason": "limit_close_failed"}) break elif dominant_last_price >= dominant_high_limit: log.warning("标的{}涨停,无法开仓。移仓换月取消。".format(dominant)) switch_failed.append({"underlying": match.groupdict()["underlying_symbol"], "before": symbol, "after": dominant, "side": p.side, "reason": "limit_open_failed"}) break else: log.info("进行移仓换月: ({0},long) -> ({1},long)".format(symbol, dominant)) order_old = order_target(symbol, 0, side='long') if order_old != None and order_old.filled > 0: order_new = order_target(dominant, amount, side='long') if order_new != None and order_new.filled > 0: switch_result.append({"before": symbol, "after": dominant, "side": "long"}) # 换月成功,更新持仓记录 update_positions_after_switch(symbol, dominant, 'long') else: log.warning("标的{}交易失败,无法开仓。移仓换月失败。".format(dominant)) switch_failed.append({"underlying": match.groupdict()["underlying_symbol"], "before": symbol, "after": dominant, "side": p.side, "reason": "open_order_failed"}) break else: log.info("标的{}平仓委托未成交,移仓换月失败,等待下次重试。".format(symbol)) switch_failed.append({"underlying": match.groupdict()["underlying_symbol"], "before": symbol, "after": dominant, "side": p.side, "reason": "close_order_not_filled"}) break if switch_failed: break if p.side == "short": symbol_high_limit = cur[symbol].high_limit dominant_low_limit = cur[dominant].low_limit if symbol_last_price >= symbol_high_limit: log.warning("标的{}涨停,无法平仓。移仓换月取消。".format(symbol)) switch_failed.append({"underlying": match.groupdict()["underlying_symbol"], "before": symbol, "after": dominant, "side": p.side, "reason": "limit_close_failed"}) break elif dominant_last_price <= dominant_low_limit: log.warning("标的{}跌停,无法开仓。移仓换月取消。".format(dominant)) switch_failed.append({"underlying": match.groupdict()["underlying_symbol"], "before": symbol, "after": dominant, "side": p.side, "reason": "limit_open_failed"}) break else: log.info("进行移仓换月: ({0},short) -> ({1},short)".format(symbol, dominant)) order_old = order_target(symbol, 0, side='short') if order_old != None and order_old.filled > 0: order_new = order_target(dominant, amount, side='short') if order_new != None and order_new.filled > 0: switch_result.append({"before": symbol, "after": dominant, "side": "short"}) # 换月成功,更新持仓记录 update_positions_after_switch(symbol, dominant, 'short') else: log.warning("标的{}交易失败,无法开仓。移仓换月失败。".format(dominant)) switch_failed.append({"underlying": match.groupdict()["underlying_symbol"], "before": symbol, "after": dominant, "side": p.side, "reason": "open_order_failed"}) break else: log.info("标的{}平仓委托未成交,移仓换月失败,等待下次重试。".format(symbol)) switch_failed.append({"underlying": match.groupdict()["underlying_symbol"], "before": symbol, "after": dominant, "side": p.side, "reason": "close_order_not_filled"}) break if callback: callback(context, pindex, p, dominant) if switch_failed: break if switch_failed: break if switch_failed: break return switch_result, switch_failed def update_positions_after_switch(old_symbol, new_symbol, side): """合约切换后更新持仓记录""" # 更新底仓持仓记录 if old_symbol in g.base_positions: g.base_positions[new_symbol] = g.base_positions[old_symbol] del g.base_positions[old_symbol] log.info(f"底仓持仓记录更新: {old_symbol} -> {new_symbol}") # 更新底仓待成交限价单记录(问题2修复) if old_symbol in g.base_pending_orders: g.base_pending_orders[new_symbol] = g.base_pending_orders[old_symbol] del g.base_pending_orders[old_symbol] log.info(f"底仓待成交限价单记录更新: {old_symbol} -> {new_symbol}") # 更新网格持仓记录 if old_symbol in g.grid_positions: g.grid_positions[new_symbol] = g.grid_positions[old_symbol] del g.grid_positions[old_symbol] log.info(f"网格持仓记录更新: {old_symbol} -> {new_symbol}") if old_symbol in g.grid_buy_levels: g.grid_buy_levels[new_symbol] = g.grid_buy_levels[old_symbol] del g.grid_buy_levels[old_symbol] # 更新对冲持仓记录 if old_symbol in g.hedge_positions: g.hedge_positions[new_symbol] = g.hedge_positions[old_symbol] del g.hedge_positions[old_symbol] log.info(f"对冲持仓记录更新: {old_symbol} -> {new_symbol}") if old_symbol in g.hedge_state: g.hedge_state[new_symbol] = g.hedge_state[old_symbol] del g.hedge_state[old_symbol]