""" 期货网格交易研究分析工具(带主力合约切换) 研究期货网格交易策略在不同配置下的表现,支持多种交易场景的对比分析 核心功能包括主力合约自动切换、强制平仓和重新建仓逻辑 本程序实现完整的网格交易分析流程: 1. 主力合约监控与切换 - 自动检测主力合约变化并处理切换 2. 合约选择逻辑 - 基于主力合约选择算法 3. 基础头寸交易 - 价格-数量网格配置,支持合约切换时重新建仓 4. 网格交易策略 - 限价订单网格买入卖出,合约切换时根据价格条件重新建仓 5. 网格对冲策略 - 带止损的增强网格交易,合约切换时根据价格条件重新建仓 6. 统计分析对比 - 四种交易场景性能分析 主要特点: - 主力合约自动监控:每日检测主力合约变化 - 强制平仓机制:合约切换时立即平掉旧合约所有头寸 - 智能重新建仓:根据价格条件在新合约中重新建立头寸 - 标准期货盈亏计算:使用正确的合约倍数和期货盈亏公式 - 最终持仓结算:分析期结束时对所有未平仓头寸进行市值计价 - 完整交易记录:记录所有交易包括合约切换引起的强制平仓 期货盈亏计算公式: - 多头:(出场价格 - 入场价格) × 合约倍数 × 数量 - 空头:(入场价格 - 出场价格) × 合约倍数 × 数量 注:程序支持多个核心商品同时分析,生成详细的交易记录和统计报告 作者: jukuan研究团队 日期: 2025-09 适用平台: 聚宽在线研究平台 """ import pandas as pd import numpy as np from jqdata import * import datetime import warnings warnings.filterwarnings('ignore') # ===================================================================================== # 分析配置参数 - 集中配置部分 # ===================================================================================== class GridTradingConfig: """期货网格交易分析配置参数""" # ==================== 时间范围设置 ==================== START_DATE = datetime.datetime(2024, 11, 7) # 分析开始日期 END_DATE = datetime.datetime(2025, 9, 19) # 分析结束日期(修正为9月19日避免数据缺失) # ==================== 期货合约倍数配置 ==================== FUTURES_MULTIPLIER = { # 贵金属 'AU': 1000, # 黄金 'AG': 15, # 白银 # 有色金属 'CU': 5, 'AL': 5, 'ZN': 5, 'PB': 5, 'NI': 1, 'SN': 1, 'SS': 5, # 黑色系 'RB': 10, 'HC': 10, 'I': 100, 'JM': 100, 'J': 60, # 能源化工 'SP': 10, 'FU': 10, 'BU': 10, 'RU': 10, 'BR': 5, 'SC': 1000, 'NR': 10, 'LU': 10, 'LC': 1, # 化工 'FG': 20, 'TA': 5, 'MA': 10, 'SA': 20, 'L': 5, 'V': 5, 'EG': 10, 'PP': 5, 'EB': 5, 'PG': 20, # 农产品 'RM': 10, 'OI': 10, 'CF': 5, 'SR': 10, 'PF': 5, 'C': 10, 'CS': 10, 'CY': 5, 'A': 10, 'B': 10, 'M': 10, 'Y': 10, 'P': 10, # 股指期货 'IF': 300, 'IH': 300, 'IC': 200, 'IM': 200, # 其他 'AP': 10, 'CJ': 5, 'PK': 5, 'JD': 10, 'LH': 16 } # ==================== 核心商品配置 ==================== CORE_COMMODITIES = { 'SA': ['SA2509.XZCE', 'SA2601.XZCE'], # 纯碱 'M': ['M2507.XDCE', 'M2605.XDCE'], # 豆粕 # 'MA': ['MA2501.XZCE', 'MA2505.XZCE'], # 甲醇 # 'AU': ['AU2412.XSGE', 'AU2502.XSGE'], # 黄金 # 'AG': ['AG2412.XSGE', 'AG2502.XSGE'], # 白银 # 'RU': ['RU2501.XSGE', 'RU2505.XSGE'], # 橡胶 } # ==================== 合约切换配置 ==================== REQUIRED_TRADING_DAYS = 30 # 合约切换前需要的最少有效交易日数 # ==================== 基础头寸交易配置 ==================== BASE_POSITION_GRID = { 'SA': {1400: 4, 1300: 6, 1200: 8, 1100: 12, 1000: 14, 900: 16}, 'M': {2800: 4, 2750: 6, 2700: 8, 2650: 12, 2600: 14, 2550: 16}, # 'MA': {2500: 8, 2400: 8, 2300: 8, 2200: 8, 2100: 8, 2000: 8}, # 'AU': {520: 2, 510: 2, 500: 2, 490: 2, 480: 2, 470: 2}, # 'AG': {6500: 10, 6000: 10, 5500: 10, 5000: 10, 4500: 10, 4000: 10}, # 'RU': {18000: 5, 17000: 5, 16000: 5, 15000: 5, 14000: 5, 13000: 5} } # 统一退出价格(无止损) BASE_POSITION_EXIT_PRICE = { 'SA': 1500, 'M': 3800, # 'MA': 2600, # 'AU': 530, # 'AG': 7000, # 'RU': 19000 } # ==================== 网格交易配置 ==================== GRID_TRADING_CONFIG = { 'SA': { 'start_price': 1250, # 开始价格 'grid_size': 50, # 网格大小 'quantity_per_grid': 5, # 每网格数量 'exit_grid_size': 50 # 退出网格大小 }, 'M': { 'start_price': 2800, 'grid_size': 100, 'quantity_per_grid': 10, 'exit_grid_size': 100 }, # 'MA': { # 'start_price': 2300, # 'grid_size': 50, # 'quantity_per_grid': 4, # 'exit_grid_size': 50 # }, # 'AU': { # 'start_price': 480, # 'grid_size': 10, # 'quantity_per_grid': 1, # 'exit_grid_size': 10 # }, # 'AG': { # 'start_price': 5500, # 'grid_size': 200, # 'quantity_per_grid': 8, # 'exit_grid_size': 200 # }, # 'RU': { # 'start_price': 15000, # 'grid_size': 500, # 'quantity_per_grid': 3, # 'exit_grid_size': 500 # } } # ==================== 网格对冲策略配置 ==================== GRID_HEDGE_CONFIG = { 'stop_loss_percentage': 0.02, # 2%止损百分比 'enable_hedge_strategy': True # 是否启用网格对冲策略 } # ==================== 输出设置 ==================== OUTPUT_ENCODING = 'utf-8-sig' # 输出文件编码格式 VERBOSE_LOGGING = True # 是否打印详细日志 @classmethod def print_config(cls): """打印当前配置信息""" print("=== 期货网格交易分析配置 ===") print(f"分析时间范围: {cls.START_DATE.strftime('%Y-%m-%d')} 至 {cls.END_DATE.strftime('%Y-%m-%d')}") print(f"核心商品数量: {len(cls.CORE_COMMODITIES)}") print("核心商品列表:") for commodity, contracts in cls.CORE_COMMODITIES.items(): print(f" {commodity}: {contracts}") print(f"网格对冲策略: {'启用' if cls.GRID_HEDGE_CONFIG['enable_hedge_strategy'] else '禁用'}") print(f"止损百分比: {cls.GRID_HEDGE_CONFIG['stop_loss_percentage']*100}%") print(f"详细日志: {'开启' if cls.VERBOSE_LOGGING else '关闭'}") print("=" * 50) class FutureGridTradingAnalyzer: """期货网格交易分析器""" def __init__(self, config=None): """初始化分析器""" if config is None: config = GridTradingConfig self.config = config self.start_date = config.START_DATE self.end_date = config.END_DATE self.core_commodities = config.CORE_COMMODITIES self.base_position_grid = config.BASE_POSITION_GRID self.base_position_exit_price = config.BASE_POSITION_EXIT_PRICE self.grid_trading_config = config.GRID_TRADING_CONFIG self.grid_hedge_config = config.GRID_HEDGE_CONFIG self.verbose_logging = config.VERBOSE_LOGGING self.output_encoding = config.OUTPUT_ENCODING # 存储结果的字典 self.selected_contracts = {} # 选中的合约 self.price_data = {} # 价格数据 self.dominant_contract_history = {} # 主力合约历史变化 self.active_positions = { # 当前活跃头寸跟踪 'base_position': {}, 'grid_trading': {}, 'grid_hedge': {} } self.trading_results = { # 四种交易场景的结果 'base_position': [], 'grid_trading': [], 'grid_hedge': [], 'combined': [] } if self.verbose_logging: print("初始化期货网格交易分析器") print(f"核心商品: {list(self.core_commodities.keys())}") print(f"分析期间: {self.start_date.strftime('%Y-%m-%d')} - {self.end_date.strftime('%Y-%m-%d')}") def select_contracts(self): """ 合约选择逻辑 1. 首先获取商品的主导合约 2. 如果主导合约在可用列表中,选择它 3. 如果主导合约不在列表中,选择未来到期日期最近且晚于主导合约的合约 """ if self.verbose_logging: print("\n=== 步骤1: 合约选择逻辑 ===") for commodity, available_contracts in self.core_commodities.items(): if self.verbose_logging: print(f"\n处理商品: {commodity}") print(f"可用合约: {available_contracts}") try: # 获取商品的主导合约 dominant_contract = get_dominant_future(commodity, self.start_date.date()) if self.verbose_logging: print(f"主导合约: {dominant_contract}") if dominant_contract in available_contracts: # 主导合约在可用列表中,直接选择 selected_contract = dominant_contract if self.verbose_logging: print(f"选择主导合约: {selected_contract}") else: # 主导合约不在列表中,选择最近的未来合约 selected_contract = self._select_nearest_future_contract( commodity, dominant_contract, available_contracts ) if self.verbose_logging: print(f"选择最近的未来合约: {selected_contract}") self.selected_contracts[commodity] = selected_contract except Exception as e: if self.verbose_logging: print(f"获取{commodity}主导合约失败: {str(e)}") # 默认选择第一个可用合约 self.selected_contracts[commodity] = available_contracts[0] if self.verbose_logging: print(f"默认选择第一个合约: {available_contracts[0]}") if self.verbose_logging: print(f"\n合约选择完成,共选择{len(self.selected_contracts)}个合约") for commodity, contract in self.selected_contracts.items(): print(f" {commodity}: {contract}") return self.selected_contracts def _select_nearest_future_contract(self, commodity, dominant_contract, available_contracts): """选择最近的未来到期合约""" if not dominant_contract: return available_contracts[0] # 解析主导合约的到期月份 try: # 提取合约代码中的月份信息 (例如 SA2507 -> 2507) dominant_year_month = dominant_contract.split('.')[0][-4:] # 取最后4位 dominant_year = int(dominant_year_month[:2]) + 2000 # 假设是21世纪 dominant_month = int(dominant_year_month[2:]) except: return available_contracts[0] # 找到最近的未来合约 best_contract = available_contracts[0] best_diff = float('inf') for contract in available_contracts: try: contract_year_month = contract.split('.')[0][-4:] contract_year = int(contract_year_month[:2]) + 2000 contract_month = int(contract_year_month[2:]) # 计算月份差异 contract_total_months = contract_year * 12 + contract_month dominant_total_months = dominant_year * 12 + dominant_month # 只选择晚于主导合约的合约 if contract_total_months > dominant_total_months: diff = contract_total_months - dominant_total_months if diff < best_diff: best_diff = diff best_contract = contract except: continue return best_contract def _get_futures_multiplier(self, commodity): """获取期货合约倍数""" return self.config.FUTURES_MULTIPLIER.get(commodity, 10) # 默认倍数为10 def _calculate_futures_pnl(self, entry_price, exit_price, quantity, commodity, is_long=True): """ 计算期货盈亏 参数: entry_price: 入场价格 exit_price: 出场价格 quantity: 数量(手数) commodity: 商品代码 is_long: 是否多头,True为多头,False为空头 返回: 实际盈亏金额 """ multiplier = self._get_futures_multiplier(commodity) if is_long: # 多头:(出场价格 - 入场价格) × 合约倍数 × 数量 pnl = (exit_price - entry_price) * multiplier * quantity else: # 空头:(入场价格 - 出场价格) × 合约倍数 × 数量 pnl = (entry_price - exit_price) * multiplier * quantity return pnl def build_dominant_contract_history(self): """ 构建主力合约历史变化记录 为每个商品在整个分析期间构建主力合约变化的时间序列 只有当合约真正发生变化时才记录为合约切换 """ if self.verbose_logging: print("\n=== 步骤2:构建主力合约历史变化记录 ===") for commodity in self.core_commodities.keys(): if self.verbose_logging: print(f"构建 {commodity} 主力合约历史...") contract_history = [] current_date = self.start_date.date() end_date = self.end_date.date() current_selected_contract = None # 跟踪选择的合约而不是主力合约 while current_date <= end_date: # 跳过非交易日 if current_date.weekday() >= 5: # 周六周日 current_date += datetime.timedelta(days=1) continue try: # 获取当日主力合约 dominant_contract = get_dominant_future(commodity, current_date) # print(f"日期: {current_date}, 主力合约: {dominant_contract}") selected_contract = self._match_to_available_contract(commodity, dominant_contract) # 只有当选择的合约真正发生变化时才记录 if selected_contract != current_selected_contract: contract_history.append({ 'date': current_date, 'dominant_contract': dominant_contract, 'selected_contract': selected_contract, 'is_initial': current_selected_contract is None # 标记是否为初始合约 }) if self.verbose_logging: if current_selected_contract is None: print(f" {current_date}: 初始合约设置为 {selected_contract}") else: print(f" {current_date}: 合约切换 {current_selected_contract} -> {selected_contract}") current_selected_contract = selected_contract except Exception as e: if self.verbose_logging: print(f" 获取 {current_date} 的主力合约时出错: {str(e)}") current_date += datetime.timedelta(days=1) self.dominant_contract_history[commodity] = contract_history if self.verbose_logging: total_changes = sum(len(history) for history in self.dominant_contract_history.values()) actual_switches = sum( sum(1 for change in history if not change.get('is_initial', False)) for history in self.dominant_contract_history.values() ) initial_setups = total_changes - actual_switches print(f"主力合约历史构建完成,共 {total_changes} 次记录({initial_setups} 次初始设置,{actual_switches} 次真实切换)") return self.dominant_contract_history def _match_to_available_contract(self, commodity, dominant_contract): """将主力合约匹配到可用合约列表""" available_contracts = self.core_commodities.get(commodity, []) if dominant_contract in available_contracts: return dominant_contract else: return self._select_nearest_future_contract(commodity, dominant_contract, available_contracts) def collect_price_data(self): """收集所有可能用到的合约价格数据(优化日期范围)""" if self.verbose_logging: print("\n=== 步骤3: 收集价格数据(优化日期范围) ===") # 清除之前的调整建议 if hasattr(self, 'adjustment_suggestions'): self.adjustment_suggestions = [] # 为每个商品创建数据存储结构 for commodity in self.core_commodities.keys(): print(f'收集{commodity}的价格数据:') self.price_data[commodity] = {} # 根据主力合约历史确定每个合约的数据获取范围 contract_date_ranges = self._determine_contract_date_ranges(commodity) for contract, date_range in contract_date_ranges.items(): start_date, end_date = date_range if self.verbose_logging: print(f"获取 {contract} 价格数据...") print(f" 优化日期范围: {start_date} 至 {end_date}") try: # 获取价格数据(使用优化的日期范围) data = get_price( contract, start_date=start_date, end_date=end_date, frequency='daily', fields=['open', 'close', 'high', 'low', 'volume'], skip_paused=False, panel=False ) if data is not None and len(data) > 0: self.price_data[commodity][contract] = data # 检查这个数据里有多少条空值数据 empty_data = data[data.isna().any(axis=1)] # 检查有效交易日数据并收集调整建议 adjustment_info = self._check_thirty_day_trading_data(commodity, contract, data, start_date, end_date) if adjustment_info and adjustment_info.get('needs_adjustment'): # 暂存调整建议,稍后统一处理 if not hasattr(self, 'adjustment_suggestions'): self.adjustment_suggestions = [] self.adjustment_suggestions.append(adjustment_info) if self.verbose_logging: print(f" ✅ 成功获取{len(data)}条数据记录") print(f" 空值数据: {len(empty_data)}条") print(f" 价格范围: {data['low'].min():.2f} - {data['high'].max():.2f}") print(f" 数据日期范围: {data.index[0].date()} 至 {data.index[-1].date()}") else: if self.verbose_logging: print(f" ⚠️ 未获取到{contract}的数据") # 如果优化日期范围没有数据,尝试使用更宽泛的日期范围 if self.verbose_logging: print(f" 尝试使用更宽泛的日期范围获取数据...") try: fallback_data = get_price( contract, start_date=self.start_date, end_date=self.end_date, frequency='daily', fields=['open', 'close', 'high', 'low', 'volume'], skip_paused=False, panel=False ) if fallback_data is not None and len(fallback_data) > 0: self.price_data[commodity][contract] = fallback_data # 检查有效交易日数据并收集调整建议(回退方案) adjustment_info = self._check_thirty_day_trading_data(commodity, contract, fallback_data, self.start_date, self.end_date) if adjustment_info and adjustment_info.get('needs_adjustment'): if not hasattr(self, 'adjustment_suggestions'): self.adjustment_suggestions = [] self.adjustment_suggestions.append(adjustment_info) if self.verbose_logging: print(f" ✅ 回退方案成功获取{len(fallback_data)}条数据记录") print(f" 数据日期范围: {fallback_data.index[0].date()} 至 {fallback_data.index[-1].date()}") else: if self.verbose_logging: print(f" ❌ 回退方案也未获取到{contract}的数据") except Exception as fallback_e: if self.verbose_logging: print(f" ❌ 回退方案出错: {str(fallback_e)}") except Exception as e: if self.verbose_logging: print(f" ❌ 获取{contract}数据时出错: {str(e)}") continue # 处理动态调整建议 if hasattr(self, 'adjustment_suggestions') and self.adjustment_suggestions: self._apply_dynamic_adjustments() if self.verbose_logging: total_contracts = sum(len(contracts) for contracts in self.price_data.values()) print(f"价格数据收集完成,共{total_contracts}个合约") return self.price_data def _determine_contract_date_ranges(self, commodity): """ 根据主力合约历史确定每个合约的最优数据获取日期范围 """ contract_ranges = {} if commodity not in self.dominant_contract_history: # 如果没有主力合约历史,使用全范围 for contract in self.core_commodities[commodity]: contract_ranges[contract] = (self.start_date, self.end_date) return contract_ranges contract_history = self.dominant_contract_history[commodity] # 分析每个合约的活跃期间 for contract in self.core_commodities[commodity]: contract_start = self.start_date contract_end = self.end_date # 查找该合约在主力合约历史中的使用时间段 for i, history_record in enumerate(contract_history): if history_record['selected_contract'] == contract: # 该合约开始使用的日期 if history_record.get('is_initial', False): # 初始设置的合约,从分析开始日期或历史记录日期开始 contract_start = max(self.start_date.date(), history_record['date']) else: # 切换到的合约,从切换日期开始 contract_start = history_record['date'] # 查找该合约结束使用的日期 for j in range(i + 1, len(contract_history)): next_record = contract_history[j] if next_record['selected_contract'] != contract: # 找到下一次切换,该合约在此日期结束使用 contract_end = next_record['date'] break else: # 该合约一直使用到分析结束 contract_end = self.end_date.date() break # 转换为datetime格式并添加缓冲区 if isinstance(contract_start, datetime.date): contract_start = datetime.datetime.combine(contract_start, datetime.time.min) if isinstance(contract_end, datetime.date): contract_end = datetime.datetime.combine(contract_end, datetime.time.max) # 添加缓冲期(前后各3天)以确保数据充足 buffer_days = 3 contract_start_buffered = contract_start - datetime.timedelta(days=buffer_days) contract_end_buffered = contract_end + datetime.timedelta(days=buffer_days) # 确保不超出总体分析范围 contract_start_final = max(contract_start_buffered, self.start_date) contract_end_final = min(contract_end_buffered, self.end_date) contract_ranges[contract] = (contract_start_final, contract_end_final) if self.verbose_logging: print(f" {contract}: {contract_start_final.date()} 至 {contract_end_final.date()}") return contract_ranges def _check_thirty_day_trading_data(self, commodity, contract, data, start_date, end_date): """ 检查合约是否有足够的有效交易日数据并进行动态调整 返回调整建议信息 """ if data is None or len(data) == 0: print(f" ⚠️ {contract}: 无价格数据") return None required_days = self.config.REQUIRED_TRADING_DAYS # 检查空值数据 empty_data = data[data.isna().any(axis=1)] empty_count = len(empty_data) # 过滤出非空的收盘价数据 valid_close_data = data['close'].dropna() valid_count = len(valid_close_data) print(f" 📊 {contract}: 有效收盘价数据共{valid_count}天") adjustment_info = { 'contract': contract, 'commodity': commodity, 'empty_count': empty_count, 'valid_count': valid_count, 'required_days': required_days, 'needs_adjustment': False, 'suggested_switch_date': None } # 检查是否有空值数据且需要调整 if empty_count > 0: print(f" ⚠️ {contract}: 检测到{empty_count}条空值数据") if valid_count >= required_days: # 找到第N个有效收盘价的日期 nth_date = valid_close_data.index[required_days - 1] # 索引从0开始 nth_price = valid_close_data.iloc[required_days - 1] print(f" 📍 {contract}: 第{required_days}个有效收盘价日期为{nth_date.date()},价格{nth_price:.2f}") # 检查当前切换日期是否需要调整 if commodity in self.dominant_contract_history: for history_record in self.dominant_contract_history[commodity]: if (history_record['selected_contract'] == contract and not history_record.get('is_initial', False)): current_switch_date = history_record['date'] # 转换日期格式进行比较 if isinstance(current_switch_date, datetime.date): current_switch_datetime = datetime.datetime.combine(current_switch_date, datetime.time.min) else: current_switch_datetime = current_switch_date if nth_date > current_switch_datetime: print(f" ❌ {contract}: 切换日期过早(当前:{current_switch_date}),建议调整至{nth_date.date()}") adjustment_info.update({ 'needs_adjustment': True, 'suggested_switch_date': nth_date.date(), 'current_switch_date': current_switch_date }) else: print(f" ✅ {contract}: 切换日期{current_switch_date}合理,在第{required_days}个有效交易日之后") break else: print(f" ❌ {contract}: 有效交易日不足{required_days}天(仅{valid_count}天),不符合切换要求") adjustment_info['needs_adjustment'] = True else: # 没有空值数据,检查是否有足够的交易日 if valid_count >= required_days: nth_date = valid_close_data.index[required_days - 1] nth_price = valid_close_data.iloc[required_days - 1] print(f" ✅ {contract}: 第{required_days}个有效收盘价日期为{nth_date.date()},价格{nth_price:.2f}") else: print(f" ❌ {contract}: 有效交易日不足{required_days}天(仅{valid_count}天)") return adjustment_info def _apply_dynamic_adjustments(self): """应用动态调整建议,更新合约切换日期并重新获取数据""" if self.verbose_logging: print(f"\n=== 应用动态调整建议(共{len(self.adjustment_suggestions)}个) ===") adjustments_applied = [] for suggestion in self.adjustment_suggestions: if suggestion.get('suggested_switch_date'): commodity = suggestion['commodity'] contract = suggestion['contract'] new_switch_date = suggestion['suggested_switch_date'] print(f"📅 调整{commodity}的{contract}切换日期至{new_switch_date}") # 更新合约历史 if self._update_contract_switch_date(commodity, contract, new_switch_date): adjustments_applied.append(suggestion) # 如果有调整,重新获取相关的价格数据 if adjustments_applied: print(f"✅ 完成{len(adjustments_applied)}个调整,重新获取相关价格数据") self._refresh_price_data_for_adjustments(adjustments_applied) def _update_contract_switch_date(self, commodity, contract, new_switch_date): """更新指定合约的切换日期""" if commodity not in self.dominant_contract_history: return False # 查找并更新对应的历史记录 for history_record in self.dominant_contract_history[commodity]: if (history_record['selected_contract'] == contract and not history_record.get('is_initial', False)): old_date = history_record['date'] history_record['date'] = new_switch_date print(f" 📝 {contract}: 切换日期从{old_date}更新为{new_switch_date}") return True return False def _refresh_price_data_for_adjustments(self, adjustments): """为调整的合约重新获取价格数据""" affected_commodities = set() for adjustment in adjustments: commodity = adjustment['commodity'] affected_commodities.add(commodity) for commodity in affected_commodities: print(f"🔄 重新获取{commodity}的价格数据...") # 重新计算日期范围 contract_date_ranges = self._determine_contract_date_ranges(commodity) # 重新获取每个合约的数据 for contract, date_range in contract_date_ranges.items(): start_date, end_date = date_range try: # 获取价格数据(使用新的日期范围) data = get_price( contract, start_date=start_date, end_date=end_date, frequency='daily', fields=['open', 'close', 'high', 'low', 'volume'], skip_paused=False, panel=False ) if data is not None and len(data) > 0: self.price_data[commodity][contract] = data # 检查调整后的数据 empty_data = data[data.isna().any(axis=1)] empty_count = len(empty_data) print(f" ✅ {contract}: 重新获取{len(data)}条数据记录,空值{empty_count}条") if empty_count == 0: print(f" 🎉 {contract}: 空值数据已消除") except Exception as e: print(f" ❌ 重新获取{contract}数据时出错: {str(e)}") def simulate_with_contract_switching(self): """ 模拟带有主力合约切换逻辑的交易 """ if self.verbose_logging: print("\n=== 步骤3: 带合约切换的交易模拟 ===") # 按日期顺序处理所有交易日 current_date = self.start_date.date() end_date = self.end_date.date() while current_date <= end_date: # 跳过非交易日 if current_date.weekday() >= 5: current_date += datetime.timedelta(days=1) continue # 检查每个商品的主力合约切换 for commodity in self.core_commodities.keys(): self._check_and_handle_contract_switch(commodity, current_date) # 处理正常的交易逻辑 self._process_daily_trading(current_date) current_date += datetime.timedelta(days=1) # 在交易循环结束后,计算所有未平仓头寸的最终盈亏 self._calculate_final_positions_pnl() if self.verbose_logging: print("带合约切换的交易模拟完成") def _calculate_final_positions_pnl(self): """ 计算分析期结束时所有未平仓头寸的最终盈亏 将这些盈亏作为最终交易记录加入结果中 """ if self.verbose_logging: print("\n=== 计算最终持仓盈亏 ===") final_date = self.end_date.date() final_pnl_records = [] # 添加诊断信息 if self.verbose_logging: print(f"分析结束日期: {final_date}") print(f"活跃头寸概览:") for strategy_name in ['base_position', 'grid_trading', 'grid_hedge']: strategy_positions = self.active_positions.get(strategy_name, {}) total_positions = 0 open_positions = 0 for commodity, positions in strategy_positions.items(): commodity_total = len(positions) commodity_open = sum(1 for p in positions.values() if p['status'] == 'open') total_positions += commodity_total open_positions += commodity_open if commodity_total > 0: print(f" {strategy_name} - {commodity}: 总计 {commodity_total} 个头寸, 未平仓 {commodity_open} 个") # 详细列出所有头寸信息 print(f" 详细头寸列表:") for pos_id, pos_info in positions.items(): status = pos_info.get('status', 'Unknown') entry_price = pos_info.get('entry_price', 'N/A') contract = pos_info.get('contract', 'N/A') entry_date = pos_info.get('entry_date', 'N/A') quantity = pos_info.get('quantity', 'N/A') print(f" {pos_id}: 状态={status}, 合约={contract}, 开仓价格={entry_price}, 日期={entry_date}, 数量={quantity}") print(f" {strategy_name} 策略总计: {open_positions}/{total_positions} 个未平仓头寸") # 验证头寸计数的准确性 actual_count = len(positions) open_count_verify = len([p for p in positions.values() if p.get('status') == 'open']) if actual_count != commodity_total or open_count_verify != commodity_open: print(f" ⚠️ 计数不匹配!实际头寸数: {actual_count}, 预期: {commodity_total}; 实际未平仓: {open_count_verify}, 预期: {commodity_open}") # 检查是否有重复的开仓价格(同一合约同一状态) open_positions_by_price = {} for pos_id, pos_info in positions.items(): if pos_info.get('status') == 'open': price = pos_info.get('entry_price') contract = pos_info.get('contract') key = f"{contract}_{price}" if key not in open_positions_by_price: open_positions_by_price[key] = [] open_positions_by_price[key].append(pos_id) # for key, pos_ids in open_positions_by_price.items(): # if len(pos_ids) > 1: # print(f" ⚠️ 发现重复的未平仓头寸: {key} -> {pos_ids}") print(f" {strategy_name} 策略总计: {open_positions}/{total_positions} 个未平仓头寸") for strategy_name in ['base_position', 'grid_trading', 'grid_hedge']: strategy_positions = self.active_positions.get(strategy_name, {}) for commodity, positions in strategy_positions.items(): # 获取当前合约和最终价格 current_contract = self._get_current_contract(commodity, final_date) if not current_contract: if self.verbose_logging: print(f" 警告: 无法确定 {commodity} 在 {final_date} 的当前合约") continue final_price = self._get_price_on_date(commodity, current_contract, final_date, 'close') if final_price is None: if self.verbose_logging: print(f" 警告: 无法获取 {commodity} {current_contract} 在 {final_date} 的价格") continue if self.verbose_logging and len(positions) > 0: print(f" {commodity} {strategy_name}: 当前合约 {current_contract}, 结算价格 {final_price:.2f}") for position_id, position in positions.items(): if self.verbose_logging: print(f" 检查头寸 {position_id}: 状态={position['status']}, 合约={position['contract']}, 开仓价格={position.get('entry_price', 'N/A')}") if position['status'] == 'open' and position['contract'] == current_contract: if self.verbose_logging: print(f" 匹配头寸进行结算: {position_id}") print(f" 头寸详情: 开仓日期={position.get('entry_date', 'N/A')}, 开仓价格={position['entry_price']}, 数量={position.get('quantity', 'N/A')}") # 计算最终盈亏 if strategy_name == 'grid_hedge': # 网格对冲是做空 profit_loss = self._calculate_futures_pnl( position['entry_price'], final_price, position['quantity'], commodity, is_long=False ) else: # 基础头寸和网格交易都是做多 profit_loss = self._calculate_futures_pnl( position['entry_price'], final_price, position['quantity'], commodity, is_long=True ) profit_loss_pct = (final_price - position['entry_price']) / position['entry_price'] if strategy_name == 'grid_hedge': profit_loss_pct = (position['entry_price'] - final_price) / position['entry_price'] # 计算持有天数 entry_date = datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date() days_held = (final_date - entry_date).days # 创建最终持仓盈亏记录 final_record = { 'commodity': commodity, 'contract': current_contract, 'strategy': strategy_name, 'entry_date': position['entry_date'], 'exit_date': final_date.strftime('%Y-%m-%d'), 'entry_price': position['entry_price'], 'exit_price': final_price, 'quantity': position['quantity'], 'profit_loss': profit_loss, 'profit_loss_pct': profit_loss_pct, 'days_held': days_held, 'exit_reason': 'final_settlement' } if self.verbose_logging: print(f" 创建最终结算记录: 头寸ID={position_id}, 开仓价格={position['entry_price']}, 结算价格={final_price:.2f}") final_pnl_records.append(final_record) # 将头寸标记为已平仓 self.active_positions[strategy_name][commodity][position_id]['status'] = 'final_settled' if self.verbose_logging: print(f" {commodity} {strategy_name} 最终结算: {position['entry_price']} -> {final_price:.2f}, 盈亏: {profit_loss:.2f}") # 将最终盈亏记录添加到交易结果中 for record in final_pnl_records: strategy_name = record['strategy'] self.trading_results[strategy_name].append(record) # 更新组合策略结果 combined_final_records = [] for record in final_pnl_records: if record['strategy'] in ['grid_trading', 'grid_hedge']: combined_record = record.copy() combined_record['strategy'] = f"combined_{record['strategy']}" combined_final_records.append(combined_record) self.trading_results['combined'].extend(combined_final_records) if self.verbose_logging: total_final_records = len(final_pnl_records) total_final_pnl = sum(record['profit_loss'] for record in final_pnl_records) print(f"最终持仓结算完成,共 {total_final_records} 个头寸,总未实现盈亏: {total_final_pnl:.2f}") # 显示所有最终结算记录的详情 if final_pnl_records: print(f"最终结算记录详情:") for i, record in enumerate(final_pnl_records, 1): print(f" {i}. {record['commodity']} {record['strategy']}: {record['entry_price']} -> {record['exit_price']:.2f}, 盈亏: {record['profit_loss']:.2f}, 合约: {record['contract']}") def _check_and_handle_contract_switch(self, commodity, current_date): """ 检查并处理主力合约切换 只有真正的合约切换才会触发平仓和重新建仓,初始设置不会 """ if commodity not in self.dominant_contract_history: return # 检查当天是否有合约变化 contract_changes = self.dominant_contract_history[commodity] for change in contract_changes: if change['date'] == current_date: # 检查是否为初始合约设置 if change.get('is_initial', False): # 初始合约设置,不需要平仓和重新建仓,只需要启动正常交易逻辑 if self.verbose_logging: print(f"\n{current_date}: {commodity} 初始合约设置为 {change['selected_contract']}") return # 真正的合约切换 old_contract = self._get_current_contract(commodity, current_date - datetime.timedelta(days=1)) new_contract = change['selected_contract'] if self.verbose_logging: print(f"\n{current_date}: {commodity} 合约切换 {old_contract} -> {new_contract}") # 平掉旧合约的所有头寸 self._close_all_positions_on_switch(commodity, old_contract, current_date) # 在新合约中重新建仓 self._reestablish_positions_in_new_contract(commodity, new_contract, current_date) break def _get_current_contract(self, commodity, date): """获取指定日期的当前合约""" if commodity not in self.dominant_contract_history: return None contract_changes = self.dominant_contract_history[commodity] current_contract = None for change in contract_changes: if change['date'] <= date: current_contract = change['selected_contract'] else: break return current_contract def _close_all_positions_on_switch(self, commodity, old_contract, switch_date): """ 在合约切换时平掉旧合约的所有头寸 """ if self.verbose_logging: print(f" 平掉 {old_contract} 的所有头寸") # 获取当日收盘价 close_price = self._get_price_on_date(commodity, old_contract, switch_date, 'close') if close_price is None: if self.verbose_logging: print(f" 无法获取 {switch_date} 的价格数据,跳过平仓") return # 平掉基础头寸交易的头寸 if commodity in self.active_positions['base_position']: positions = self.active_positions['base_position'][commodity].copy() for position_id, position in positions.items(): if position['contract'] == old_contract and position['status'] == 'open': # 使用正确的期货盈亏计算公式(基础头寸都是多头) profit_loss = self._calculate_futures_pnl( position['entry_price'], close_price, position['quantity'], commodity, is_long=True ) profit_loss_pct = (close_price - position['entry_price']) / position['entry_price'] trade_record = { 'commodity': commodity, 'contract': old_contract, 'strategy': 'base_position', 'entry_date': position['entry_date'], 'exit_date': switch_date.strftime('%Y-%m-%d'), 'entry_price': position['entry_price'], 'exit_price': close_price, 'quantity': position['quantity'], 'profit_loss': profit_loss, 'profit_loss_pct': profit_loss_pct, 'days_held': (switch_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days, 'exit_reason': 'contract_switch' } self.trading_results['base_position'].append(trade_record) self.active_positions['base_position'][commodity][position_id]['status'] = 'closed' self.active_positions['base_position'][commodity][position_id]['close_reason'] = 'contract_switch' if self.verbose_logging: print(f" 基础头寸平仓: {position['entry_price']} -> {close_price:.2f}, 盈亏: {profit_loss:.2f}") # 平掉网格交易的头寸 if commodity in self.active_positions['grid_trading']: positions = self.active_positions['grid_trading'][commodity].copy() for position_id, position in positions.items(): if position['contract'] == old_contract and position['status'] == 'open': # 使用正确的期货盈亏计算公式(网格交易都是多头) profit_loss = self._calculate_futures_pnl( position['entry_price'], close_price, position['quantity'], commodity, is_long=True ) profit_loss_pct = (close_price - position['entry_price']) / position['entry_price'] trade_record = { 'commodity': commodity, 'contract': old_contract, 'strategy': 'grid_trading', 'entry_date': position['entry_date'], 'exit_date': switch_date.strftime('%Y-%m-%d'), 'entry_price': position['entry_price'], 'exit_price': close_price, 'quantity': position['quantity'], 'profit_loss': profit_loss, 'profit_loss_pct': profit_loss_pct, 'days_held': (switch_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days, 'exit_reason': 'contract_switch' } self.trading_results['grid_trading'].append(trade_record) self.active_positions['grid_trading'][commodity][position_id]['status'] = 'closed' self.active_positions['grid_trading'][commodity][position_id]['close_reason'] = 'contract_switch' if self.verbose_logging: print(f" 网格头寸平仓: {position['entry_price']} -> {close_price:.2f}, 盈亏: {profit_loss:.2f}") # 平掉网格对冲的头寸 if commodity in self.active_positions['grid_hedge']: positions = self.active_positions['grid_hedge'][commodity].copy() for position_id, position in positions.items(): if position['contract'] == old_contract and position['status'] == 'open': # 使用正确的期货盈亏计算公式(网格对冲是做空) profit_loss = self._calculate_futures_pnl( position['entry_price'], close_price, position['quantity'], commodity, is_long=False ) profit_loss_pct = (position['entry_price'] - close_price) / position['entry_price'] trade_record = { 'commodity': commodity, 'contract': old_contract, 'strategy': 'grid_hedge', 'entry_date': position['entry_date'], 'exit_date': switch_date.strftime('%Y-%m-%d'), 'entry_price': position['entry_price'], 'exit_price': close_price, 'quantity': position['quantity'], 'profit_loss': profit_loss, 'profit_loss_pct': profit_loss_pct, 'days_held': (switch_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days, 'exit_reason': 'contract_switch' } self.trading_results['grid_hedge'].append(trade_record) self.active_positions['grid_hedge'][commodity][position_id]['status'] = 'closed' self.active_positions['grid_hedge'][commodity][position_id]['close_reason'] = 'contract_switch' if self.verbose_logging: print(f" 对冲头寸平仓: {position['entry_price']} -> {close_price:.2f}, 盈亏: {profit_loss:.2f}") def _reestablish_positions_in_new_contract(self, commodity, new_contract, switch_date): """ 在新合约中重新建仓 """ if self.verbose_logging: print(f" 在 {new_contract} 中重新建仓") # 获取当日收盘价 close_price = self._get_price_on_date(commodity, new_contract, switch_date, 'close') if close_price is None: if self.verbose_logging: print(f" 无法获取 {switch_date} 的价格数据,跳过重新建仓") return # 基础头寸交易重新建仓 self._reestablish_base_positions(commodity, new_contract, close_price, switch_date) # 网格交易重新建仓 self._reestablish_grid_positions(commodity, new_contract, close_price, switch_date) # 网格对冲重新建仓 self._reestablish_hedge_positions(commodity, new_contract, close_price, switch_date) def _reestablish_base_positions(self, commodity, new_contract, close_price, switch_date): """重新建立基础头寸""" if commodity not in self.base_position_grid: return # 获取之前被平掉的基础头寸信息(按价格水平记录) closed_positions = {} # price_level -> quantity if commodity in self.active_positions['base_position']: for position in self.active_positions['base_position'][commodity].values(): if position['status'] == 'closed' and 'contract_switch' in position.get('close_reason', ''): # 只处理因合约切换而平掉的头寸 original_price = position.get('original_price_level', position['entry_price']) if original_price not in closed_positions: closed_positions[original_price] = 0 closed_positions[original_price] += position['quantity'] if self.verbose_logging: print(f" 发现需重建的基础头寸: {original_price}水平 {position['quantity']}手 (原合约: {position['contract']})") # 根据当前价格和原始价格水平重建头寸 price_grid = self.base_position_grid[commodity] reestablish_count = 0 for target_price, configured_quantity in price_grid.items(): # 只有当目标价格大于等于当前价格时才重建头寸 # 这确保了只重建"应该持有"的价格水平头寸 if target_price >= close_price: # 检查是否有该价格水平的平仓头寸需要重建 if target_price in closed_positions: quantity_to_reestablish = closed_positions[target_price] if self.verbose_logging: print(f" 重建条件检查: {target_price} >= {close_price:.2f} ✓ (重建原有平仓头寸)") else: # 当前价格低于目标价格,应该建立该价格水平的头寸 quantity_to_reestablish = configured_quantity if self.verbose_logging: print(f" 重建条件检查: {target_price} >= {close_price:.2f} ✓ (建立新头寸)") else: # 当前价格高于目标价格,不重建 if self.verbose_logging: print(f" 重建条件检查: {target_price} >= {close_price:.2f} ✗ (跳过重建)") continue if quantity_to_reestablish > 0: position_id = f"{commodity}_{new_contract}_{switch_date}_base_reestablish_{target_price}" if commodity not in self.active_positions['base_position']: self.active_positions['base_position'][commodity] = {} self.active_positions['base_position'][commodity][position_id] = { 'contract': new_contract, 'entry_date': switch_date.strftime('%Y-%m-%d'), 'entry_price': close_price, # 实际成交价格 'original_price_level': target_price, # 原始价格水平 'quantity': quantity_to_reestablish, 'status': 'open', 'exit_target': self.base_position_exit_price.get(commodity) } if self.verbose_logging: print(f" 创建重建头寸: {position_id}") print(f" 实际成交价格: {close_price}, 原始价格水平: {target_price}, 数量: {quantity_to_reestablish}") reestablish_count += quantity_to_reestablish if self.verbose_logging: print(f" 重建基础头寸 {target_price}水平: {quantity_to_reestablish} 手 @ {close_price:.2f}") if reestablish_count > 0 and self.verbose_logging: print(f" 基础头寸重建完成,总计: {reestablish_count} 手") def _reestablish_grid_positions(self, commodity, new_contract, close_price, switch_date): """重新建立网格交易头寸""" if commodity not in self.grid_trading_config: return config = self.grid_trading_config[commodity] grid_size = config['grid_size'] quantity_per_grid = config['quantity_per_grid'] exit_grid_size = config['exit_grid_size'] # 获取之前的网格头寸信息 previous_grid_levels = set() if commodity in self.active_positions['grid_trading']: for position in self.active_positions['grid_trading'][commodity].values(): if position['status'] == 'closed' and 'contract_switch' in position.get('close_reason', ''): # 只处理因合约切换而平掉的头寸 previous_grid_levels.add(position['entry_price']) if self.verbose_logging: print(f" 发现需重建的网格头寸: {position['entry_price']}水平 {position['quantity']}手") # 仅在原始网格开仓价格大于等于当前价格时重新建仓 # 这确保了只重建"应该持有"的网格水平头寸 reestablish_count = 0 for grid_level in previous_grid_levels: if grid_level >= close_price: if self.verbose_logging: print(f" 网格重建条件检查: {grid_level} >= {close_price:.2f} ✓ (重建网格头寸)") position_id = f"{commodity}_{new_contract}_{switch_date}_grid_{grid_level}" if commodity not in self.active_positions['grid_trading']: self.active_positions['grid_trading'][commodity] = {} self.active_positions['grid_trading'][commodity][position_id] = { 'contract': new_contract, 'entry_date': switch_date.strftime('%Y-%m-%d'), 'entry_price': close_price, 'original_grid_level': grid_level, 'quantity': quantity_per_grid, 'status': 'open', 'exit_target': grid_level + exit_grid_size # 保持原始退出价格 } reestablish_count += 1 else: if self.verbose_logging: print(f" 网格重建条件检查: {grid_level} >= {close_price:.2f} ✗ (跳过重建)") continue # 同时检查是否需要开立新的网格头寸(价格更低的情况) start_price = config['start_price'] current_level = start_price while current_level > close_price: current_level -= grid_size if current_level not in previous_grid_levels and current_level > 0: # 这是一个新的网格水平 position_id = f"{commodity}_{new_contract}_{switch_date}_grid_new_{current_level}" if commodity not in self.active_positions['grid_trading']: self.active_positions['grid_trading'][commodity] = {} self.active_positions['grid_trading'][commodity][position_id] = { 'contract': new_contract, 'entry_date': switch_date.strftime('%Y-%m-%d'), 'entry_price': close_price, 'original_grid_level': current_level, 'quantity': quantity_per_grid, 'status': 'open', 'exit_target': current_level + exit_grid_size } reestablish_count += 1 if self.verbose_logging and reestablish_count > 0: print(f" 重建网格头寸: {reestablish_count} 个网格 @ {close_price:.2f}") def _reestablish_hedge_positions(self, commodity, new_contract, close_price, switch_date): """重新建立网格对冲头寸""" if not self.grid_hedge_config['enable_hedge_strategy'] or commodity not in self.grid_trading_config: return config = self.grid_trading_config[commodity] grid_size = config['grid_size'] quantity_per_grid = config['quantity_per_grid'] stop_loss_pct = self.grid_hedge_config['stop_loss_percentage'] # 获取之前的对冲头寸信息 previous_hedge_levels = set() if commodity in self.active_positions['grid_hedge']: for position in self.active_positions['grid_hedge'][commodity].values(): if position['status'] == 'closed' and 'contract_switch' in position.get('close_reason', ''): # 只处理因合约切换而平掉的头寸 previous_hedge_levels.add(position['entry_price']) if self.verbose_logging: print(f" 发现需重建的对冲头寸: {position['entry_price']}水平 {position['quantity']}手") # 仅在当前价格高于原始对冲开仓价格时重新建仓 # 对冲是做空策略,只有价格高于对冲水平时才适合重建做空头寸 reestablish_count = 0 for hedge_level in previous_hedge_levels: if close_price > hedge_level: if self.verbose_logging: print(f" 对冲重建条件检查: {close_price:.2f} > {hedge_level} ✓ (重建对冲头寸)") position_id = f"{commodity}_{new_contract}_{switch_date}_hedge_{hedge_level}" if commodity not in self.active_positions['grid_hedge']: self.active_positions['grid_hedge'][commodity] = {} self.active_positions['grid_hedge'][commodity][position_id] = { 'contract': new_contract, 'entry_date': switch_date.strftime('%Y-%m-%d'), 'entry_price': close_price, 'original_grid_level': hedge_level, 'quantity': quantity_per_grid, 'status': 'open', 'profit_target': hedge_level - grid_size, # 保持原始退出价格 'stop_loss': close_price * (1 + stop_loss_pct) } reestablish_count += 1 else: if self.verbose_logging: print(f" 对冲重建条件检查: {close_price:.2f} > {hedge_level} ✗ (跳过重建)") continue if self.verbose_logging and reestablish_count > 0: print(f" 重建对冲头寸: {reestablish_count} 个网格 @ {close_price:.2f}") def _get_price_on_date(self, commodity, contract, date, price_type='close'): """获取指定日期和合约的价格(增强NaN问题诊断)""" if commodity not in self.price_data or contract not in self.price_data[commodity]: if self.verbose_logging: print(f" ❌ 价格数据不存在: {commodity} -> {contract}") return None price_data = self.price_data[commodity][contract] # 找到日期对应的价格 target_date = date if isinstance(date, datetime.date) else date.date() for idx, row in price_data.iterrows(): if idx.date() == target_date: price_value = row[price_type] if self.verbose_logging: print(f'{price_type}的价格是: {price_value}') # 如果价格为NaN,进行详细诊断 if pd.isna(price_value): self._diagnose_nan_price_issue(commodity, contract, target_date, price_type, row) return None else: return price_value # 如果没有找到精确日期,尝试查找最近的交易日 if self.verbose_logging: print(f" ⚠️ 未找到 {contract} 在 {target_date} 的数据,尝试查找最近交易日...") return self._get_nearest_trading_day_price(commodity, contract, target_date, price_type) def _diagnose_nan_price_issue(self, commodity, contract, date, price_type, row): """诊断NaN价格问题的根本原因""" if self.verbose_logging: print(f" 🔍 NaN价格问题诊断: {commodity} {contract} {date}") print(f" 目标价格类型: {price_type}") print(f" 该日所有价格数据: 开盘={row['open']}, 收盘={row['close']}, 最高={row['high']}, 最低={row['low']}, 成交量={row['volume']}") # 检查是否所有价格都是NaN price_fields = ['open', 'close', 'high', 'low'] nan_fields = [field for field in price_fields if pd.isna(row[field])] valid_fields = [field for field in price_fields if not pd.isna(row[field])] if len(nan_fields) == len(price_fields): print(f" ❌ 所有价格字段都为NaN - 可能该合约在此日期未开始交易") else: print(f" ⚠️ 部分价格字段为NaN: {nan_fields}") print(f" ✅ 有效价格字段: {valid_fields}") # 如果有有效价格,尝试使用替代方案 if valid_fields: fallback_price = row[valid_fields[0]] print(f" 💡 建议使用替代价格: {valid_fields[0]} = {fallback_price}") # 检查成交量是否为0或NaN if pd.isna(row['volume']) or row['volume'] == 0: print(f" ⚠️ 成交量异常: {row['volume']} - 可能该合约在此日期无交易活动") # 检查是否是合约刚上市的情况 price_data = self.price_data[commodity][contract] first_valid_date = None for idx, data_row in price_data.iterrows(): if not pd.isna(data_row['close']): first_valid_date = idx.date() break if first_valid_date and date < first_valid_date: print(f" 🔍 合约首次有效交易日: {first_valid_date} (查询日期 {date} 早于首次交易日)") print(f" 💡 建议: 合约 {contract} 在 {date} 可能尚未开始交易") # 提供解决建议 print(f" 📋 解决建议:") print(f" 1. 检查合约 {contract} 的上市日期") print(f" 2. 验证合约代码是否正确") print(f" 3. 考虑调整合约切换日期") if valid_fields: print(f" 4. 临时使用替代价格: {valid_fields[0]} = {row[valid_fields[0]]}") def _get_nearest_trading_day_price(self, commodity, contract, target_date, price_type): """获取最近交易日的价格""" price_data = self.price_data[commodity][contract] # 查找最近的交易日(前后5天范围内) search_range = 5 for offset in range(1, search_range + 1): # 先查找之后的日期 future_date = target_date + datetime.timedelta(days=offset) for idx, row in price_data.iterrows(): if idx.date() == future_date: price_value = row[price_type] if not pd.isna(price_value): if self.verbose_logging: print(f" ✅ 使用后续交易日 {future_date} 的价格: {price_value}") return price_value break # 再查找之前的日期 past_date = target_date - datetime.timedelta(days=offset) for idx, row in price_data.iterrows(): if idx.date() == past_date: price_value = row[price_type] if not pd.isna(price_value): if self.verbose_logging: print(f" ✅ 使用前期交易日 {past_date} 的价格: {price_value}") return price_value break if self.verbose_logging: print(f" ❌ 在 {search_range} 天范围内未找到有效的 {price_type} 价格") return None def _process_daily_trading(self, current_date): """处理每日的正常交易逻辑""" for commodity in self.core_commodities.keys(): current_contract = self._get_current_contract(commodity, current_date) if not current_contract: continue # 获取当日价格数据 daily_prices = self._get_daily_prices(commodity, current_contract, current_date) if not daily_prices: continue # 检查基础头寸入场和退出机会 self._check_base_position_trading(commodity, current_contract, current_date, daily_prices) # 检查网格交易入场和退出机会 self._check_grid_trading(commodity, current_contract, current_date, daily_prices) # 检查网格对冲入场和退出机会 self._check_grid_hedge_trading(commodity, current_contract, current_date, daily_prices) def _get_daily_prices(self, commodity, contract, date): """获取指定日期的价格数据""" if commodity not in self.price_data or contract not in self.price_data[commodity]: return None price_data = self.price_data[commodity][contract] for idx, row in price_data.iterrows(): if idx.date() == date: return { 'open': row['open'], 'close': row['close'], 'high': row['high'], 'low': row['low'], 'volume': row['volume'] } return None def _check_base_position_trading(self, commodity, contract, current_date, daily_prices): """检查基础头寸交易机会""" if commodity not in self.base_position_grid: return # 检查入场机会 price_grid = self.base_position_grid[commodity] for entry_price, quantity in price_grid.items(): # 检查是否已经有这个价格水平的头寸(包括重建的头寸) position_exists = False if commodity in self.active_positions['base_position']: for position in self.active_positions['base_position'][commodity].values(): if (position['contract'] == contract and position['status'] == 'open'): # 检查原始价格水平或入场价格 position_price_level = position.get('original_price_level', position['entry_price']) if position_price_level == entry_price: position_exists = True break if not position_exists and daily_prices['low'] <= entry_price <= daily_prices['high']: # 建立头寸 position_id = f"{commodity}_{contract}_{current_date}_base_{entry_price}" if commodity not in self.active_positions['base_position']: self.active_positions['base_position'][commodity] = {} self.active_positions['base_position'][commodity][position_id] = { 'contract': contract, 'entry_date': current_date.strftime('%Y-%m-%d'), 'entry_price': entry_price, 'original_price_level': entry_price, # 记录原始价格水平 'quantity': quantity, 'status': 'open', 'exit_target': self.base_position_exit_price.get(commodity) } if self.verbose_logging: print(f" {current_date}: {commodity} 基础头寸入场 @ {entry_price}, 数量: {quantity},当天最低价格为{daily_prices['low']},最高价格为{daily_prices['high']}") # print(f" 创建头寸: {position_id}") # print(f" 当日价格范围: 最低 {daily_prices['low']}, 最高 {daily_prices['high']}") # 检查退出机会 exit_target = self.base_position_exit_price.get(commodity) if exit_target and commodity in self.active_positions['base_position']: positions_to_close = [] for position_id, position in self.active_positions['base_position'][commodity].items(): if position['contract'] == contract and position['status'] == 'open' and daily_prices['high'] >= exit_target: positions_to_close.append(position_id) for position_id in positions_to_close: position = self.active_positions['base_position'][commodity][position_id] exit_price = min(exit_target, daily_prices['high']) # 使用正确的期货盈亏计算公式(基础头寸都是多头) profit_loss = self._calculate_futures_pnl( position['entry_price'], exit_price, position['quantity'], commodity, is_long=True ) profit_loss_pct = (exit_price - position['entry_price']) / position['entry_price'] trade_record = { 'commodity': commodity, 'contract': contract, 'strategy': 'base_position', 'entry_date': position['entry_date'], 'exit_date': current_date.strftime('%Y-%m-%d'), 'entry_price': position['entry_price'], 'exit_price': exit_price, 'quantity': position['quantity'], 'profit_loss': profit_loss, 'profit_loss_pct': profit_loss_pct, 'days_held': (current_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days, 'exit_reason': 'target_reached' } self.trading_results['base_position'].append(trade_record) self.active_positions['base_position'][commodity][position_id]['status'] = 'closed' if self.verbose_logging: print(f" {current_date}: {commodity} 基础头寸退出 {position['entry_price']} -> {exit_price:.2f}, 盈亏: {profit_loss:.2f}") def _check_grid_trading(self, commodity, contract, current_date, daily_prices): """检查网格交易机会""" if commodity not in self.grid_trading_config: return config = self.grid_trading_config[commodity] start_price = config['start_price'] grid_size = config['grid_size'] quantity_per_grid = config['quantity_per_grid'] exit_grid_size = config['exit_grid_size'] # 检查入场机会 current_level = start_price while current_level > daily_prices['low'] and current_level > 0: position_exists = False if commodity in self.active_positions['grid_trading']: for position in self.active_positions['grid_trading'][commodity].values(): if (position['contract'] == contract and position.get('original_grid_level', position['entry_price']) == current_level and position['status'] == 'open'): position_exists = True break if not position_exists and daily_prices['low'] <= current_level <= daily_prices['high']: position_id = f"{commodity}_{contract}_{current_date}_grid_{current_level}" if commodity not in self.active_positions['grid_trading']: self.active_positions['grid_trading'][commodity] = {} self.active_positions['grid_trading'][commodity][position_id] = { 'contract': contract, 'entry_date': current_date.strftime('%Y-%m-%d'), 'entry_price': current_level, 'original_grid_level': current_level, 'quantity': quantity_per_grid, 'status': 'open', 'exit_target': current_level + exit_grid_size } if self.verbose_logging: print(f" {current_date}: {commodity} 网格入场 @ {current_level},数量:{quantity_per_grid},当天最低价为{daily_prices['low']},最高价为{daily_prices['high']}") current_level -= grid_size # 检查退出机会 if commodity in self.active_positions['grid_trading']: positions_to_close = [] for position_id, position in self.active_positions['grid_trading'][commodity].items(): if position['contract'] == contract and position['status'] == 'open': if daily_prices['high'] >= position['exit_target']: positions_to_close.append(position_id) for position_id in positions_to_close: position = self.active_positions['grid_trading'][commodity][position_id] exit_price = position['exit_target'] # 使用正确的期货盈亏计算公式(网格交易都是多头) profit_loss = self._calculate_futures_pnl( position['entry_price'], exit_price, position['quantity'], commodity, is_long=True ) profit_loss_pct = (exit_price - position['entry_price']) / position['entry_price'] trade_record = { 'commodity': commodity, 'contract': contract, 'strategy': 'grid_trading', 'entry_date': position['entry_date'], 'exit_date': current_date.strftime('%Y-%m-%d'), 'entry_price': position['entry_price'], 'exit_price': exit_price, 'quantity': position['quantity'], 'profit_loss': profit_loss, 'profit_loss_pct': profit_loss_pct, 'days_held': (current_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days, 'exit_reason': 'target_reached' } self.trading_results['grid_trading'].append(trade_record) self.active_positions['grid_trading'][commodity][position_id]['status'] = 'closed' if self.verbose_logging: print(f" {current_date}: {commodity} 网格退出 {position['entry_price']} -> {exit_price:.2f}, 盈亏: {profit_loss:.2f} ({profit_loss_pct:.2%})") def _check_grid_hedge_trading(self, commodity, contract, current_date, daily_prices): """检查网格对冲交易机会""" if not self.grid_hedge_config['enable_hedge_strategy'] or commodity not in self.grid_trading_config: return config = self.grid_trading_config[commodity] start_price = config['start_price'] grid_size = config['grid_size'] quantity_per_grid = config['quantity_per_grid'] stop_loss_pct = self.grid_hedge_config['stop_loss_percentage'] # 检查入场机会(对冲是在start_price之上) current_level = start_price + grid_size while current_level < daily_prices['high']: position_exists = False if commodity in self.active_positions['grid_hedge']: for position in self.active_positions['grid_hedge'][commodity].values(): if (position['contract'] == contract and position.get('original_grid_level', position['entry_price']) == current_level and position['status'] == 'open'): position_exists = True break if not position_exists and daily_prices['low'] <= current_level <= daily_prices['high']: position_id = f"{commodity}_{contract}_{current_date}_hedge_{current_level}" if commodity not in self.active_positions['grid_hedge']: self.active_positions['grid_hedge'][commodity] = {} self.active_positions['grid_hedge'][commodity][position_id] = { 'contract': contract, 'entry_date': current_date.strftime('%Y-%m-%d'), 'entry_price': current_level, 'original_grid_level': current_level, 'quantity': quantity_per_grid, 'status': 'open', 'profit_target': current_level - grid_size, 'stop_loss': current_level * (1 + stop_loss_pct) } if self.verbose_logging: print(f" {current_date}: {commodity} 对冲入场 @ {current_level},数量:{quantity_per_grid},当天最低价为{daily_prices['low']},最高价为{daily_prices['high']}") current_level += grid_size # 检查退出机会 if commodity in self.active_positions['grid_hedge']: positions_to_close = [] for position_id, position in self.active_positions['grid_hedge'][commodity].items(): if position['contract'] == contract and position['status'] == 'open': if daily_prices['low'] <= position['profit_target']: positions_to_close.append((position_id, position['profit_target'], 'profit_target')) elif daily_prices['high'] >= position['stop_loss']: positions_to_close.append((position_id, position['stop_loss'], 'stop_loss')) for position_id, exit_price, exit_reason in positions_to_close: position = self.active_positions['grid_hedge'][commodity][position_id] # 使用正确的期货盈亏计算公式(网格对冲是做空) profit_loss = self._calculate_futures_pnl( position['entry_price'], exit_price, position['quantity'], commodity, is_long=False ) profit_loss_pct = (position['entry_price'] - exit_price) / position['entry_price'] trade_record = { 'commodity': commodity, 'contract': contract, 'strategy': 'grid_hedge', 'entry_date': position['entry_date'], 'exit_date': current_date.strftime('%Y-%m-%d'), 'entry_price': position['entry_price'], 'exit_price': exit_price, 'quantity': position['quantity'], 'profit_loss': profit_loss, 'profit_loss_pct': profit_loss_pct, 'days_held': (current_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days, 'exit_reason': exit_reason } self.trading_results['grid_hedge'].append(trade_record) self.active_positions['grid_hedge'][commodity][position_id]['status'] = 'closed' if self.verbose_logging: print(f" {current_date}: {commodity} 对冲退出 {position['entry_price']} -> {exit_price:.2f} ({exit_reason}),数量:{position['quantity']},当天最低价为{daily_prices['low']},最高价为{daily_prices['high']}") def simulate_base_position_trading(self): """ 模拟基础头寸交易 为每种商品配置价格-数量网格,以指定价格水平和数量开立多头头寸 所有头寸使用统一的退出价格(无止损) """ if self.verbose_logging: print("\n=== 步骤3: 基础头寸交易模拟 ===") base_position_results = [] for commodity in self.price_data.keys(): if commodity not in self.base_position_grid: continue price_grid = self.base_position_grid[commodity] exit_price = self.base_position_exit_price[commodity] price_data = self.price_data[commodity]['data'] contract = self.price_data[commodity]['contract'] if self.verbose_logging: print(f"\n分析 {commodity} ({contract}) 基础头寸交易") print(f"价格网格: {price_grid}") print(f"退出价格: {exit_price}") # 遍历每个价格水平 for entry_price, quantity in price_grid.items(): # 查找触发入场的日期 entry_dates = [] for date, row in price_data.iterrows(): if row['low'] <= entry_price <= row['high']: entry_dates.append(date) if not entry_dates: continue # 使用第一个触发日期作为入场点 entry_date = entry_dates[0] # 查找退出点 exit_date = None exit_price_actual = exit_price # 在入场后查找价格达到退出价格的日期 for date, row in price_data.iterrows(): if date > entry_date and row['high'] >= exit_price: exit_date = date exit_price_actual = min(exit_price, row['high']) break # 如果没有达到退出价格,使用最后一日的收盘价退出 if exit_date is None: exit_date = price_data.index[-1] exit_price_actual = price_data.iloc[-1]['close'] # 计算盈亏 profit_loss = (exit_price_actual - entry_price) * quantity profit_loss_pct = (exit_price_actual - entry_price) / entry_price trade_record = { 'commodity': commodity, 'contract': contract, 'strategy': 'base_position', 'entry_date': entry_date.strftime('%Y-%m-%d'), 'exit_date': exit_date.strftime('%Y-%m-%d'), 'entry_price': entry_price, 'exit_price': exit_price_actual, 'quantity': quantity, 'profit_loss': profit_loss, 'profit_loss_pct': profit_loss_pct, 'days_held': (exit_date - entry_date).days } base_position_results.append(trade_record) if self.verbose_logging: print(f" 入场: {entry_date.strftime('%Y-%m-%d')} @ {entry_price}, 数量: {quantity}") print(f" 出场: {exit_date.strftime('%Y-%m-%d')} @ {exit_price_actual:.2f}") print(f" 盈亏: {profit_loss:.2f} ({profit_loss_pct:.2%})") self.trading_results['base_position'] = base_position_results if self.verbose_logging: print(f"\n基础头寸交易模拟完成,共{len(base_position_results)}笔交易") return base_position_results def simulate_grid_trading(self): """ 模拟网格交易策略 从start_price开始,每次价格下降grid_size时买入quantity_per_grid 当价格从入场价格上涨exit_grid_size时退出 """ if self.verbose_logging: print("\n=== 步骤4: 网格交易策略模拟 ===") grid_trading_results = [] for commodity in self.price_data.keys(): if commodity not in self.grid_trading_config: continue config = self.grid_trading_config[commodity] start_price = config['start_price'] grid_size = config['grid_size'] quantity_per_grid = config['quantity_per_grid'] exit_grid_size = config['exit_grid_size'] price_data = self.price_data[commodity]['data'] contract = self.price_data[commodity]['contract'] if self.verbose_logging: print(f"\n分析 {commodity} ({contract}) 网格交易") print(f"起始价格: {start_price}, 网格大小: {grid_size}") print(f"每网格数量: {quantity_per_grid}, 退出网格大小: {exit_grid_size}") # 生成网格价格水平 grid_levels = [] current_level = start_price min_price = price_data['low'].min() while current_level > min_price: grid_levels.append(current_level) current_level -= grid_size # 模拟每个网格水平的交易 for entry_price in grid_levels: exit_price = entry_price + exit_grid_size # 查找入场机会 entry_date = None for date, row in price_data.iterrows(): if row['low'] <= entry_price <= row['high']: entry_date = date break if entry_date is None: continue # 查找退出机会 exit_date = None exit_price_actual = exit_price for date, row in price_data.iterrows(): if date > entry_date and row['high'] >= exit_price: exit_date = date exit_price_actual = exit_price break # 如果没有达到退出价格,使用最后一日收盘价 if exit_date is None: exit_date = price_data.index[-1] exit_price_actual = price_data.iloc[-1]['close'] # 计算盈亏 profit_loss = (exit_price_actual - entry_price) * quantity_per_grid profit_loss_pct = (exit_price_actual - entry_price) / entry_price trade_record = { 'commodity': commodity, 'contract': contract, 'strategy': 'grid_trading', 'entry_date': entry_date.strftime('%Y-%m-%d'), 'exit_date': exit_date.strftime('%Y-%m-%d'), 'entry_price': entry_price, 'exit_price': exit_price_actual, 'quantity': quantity_per_grid, 'profit_loss': profit_loss, 'profit_loss_pct': profit_loss_pct, 'days_held': (exit_date - entry_date).days } grid_trading_results.append(trade_record) if self.verbose_logging: print(f" 网格 {entry_price}: {entry_date.strftime('%Y-%m-%d')} -> {exit_date.strftime('%Y-%m-%d')}") print(f" 盈亏: {profit_loss:.2f} ({profit_loss_pct:.2%})") self.trading_results['grid_trading'] = grid_trading_results if self.verbose_logging: print(f"\n网格交易模拟完成,共{len(grid_trading_results)}笔交易") return grid_trading_results def simulate_grid_hedge_strategy(self): """ 模拟网格对冲策略 类似于网格交易,但有额外的止损百分比 """ if not self.grid_hedge_config['enable_hedge_strategy']: if self.verbose_logging: print("\n网格对冲策略已禁用,跳过模拟") return [] if self.verbose_logging: print("\n=== 步骤5: 网格对冲策略模拟 ===") grid_hedge_results = [] stop_loss_pct = self.grid_hedge_config['stop_loss_percentage'] for commodity in self.price_data.keys(): if commodity not in self.grid_trading_config: continue config = self.grid_trading_config[commodity] start_price = config['start_price'] grid_size = config['grid_size'] quantity_per_grid = config['quantity_per_grid'] exit_grid_size = config['exit_grid_size'] price_data = self.price_data[commodity]['data'] contract = self.price_data[commodity]['contract'] if self.verbose_logging: print(f"\n分析 {commodity} ({contract}) 网格对冲策略") print(f"起始价格: {start_price}, 网格大小: {grid_size}") print(f"止损百分比: {stop_loss_pct:.1%}") # 生成网格价格水平(对冲策略:在start_price + grid_size入场) grid_levels = [] current_level = start_price + grid_size max_price = price_data['high'].max() while current_level < max_price: grid_levels.append(current_level) current_level += grid_size # 模拟每个网格水平的对冲交易 for entry_price in grid_levels: profit_target = entry_price - grid_size # 获利了结 stop_loss = entry_price * (1 + stop_loss_pct) # 止损 # 查找入场机会 entry_date = None for date, row in price_data.iterrows(): if row['low'] <= entry_price <= row['high']: entry_date = date break if entry_date is None: continue # 查找退出机会(获利或止损) exit_date = None exit_price_actual = None exit_reason = 'hold' for date, row in price_data.iterrows(): if date <= entry_date: continue # 检查止损 if row['high'] >= stop_loss: exit_date = date exit_price_actual = stop_loss exit_reason = 'stop_loss' break # 检查获利了结 if row['low'] <= profit_target: exit_date = date exit_price_actual = profit_target exit_reason = 'profit_target' break # 如果没有触发退出条件,使用最后一日收盘价 if exit_date is None: exit_date = price_data.index[-1] exit_price_actual = price_data.iloc[-1]['close'] exit_reason = 'end_of_data' # 计算盈亏(做空,所以是entry_price - exit_price) profit_loss = (entry_price - exit_price_actual) * quantity_per_grid profit_loss_pct = (entry_price - exit_price_actual) / entry_price trade_record = { 'commodity': commodity, 'contract': contract, 'strategy': 'grid_hedge', 'entry_date': entry_date.strftime('%Y-%m-%d'), 'exit_date': exit_date.strftime('%Y-%m-%d'), 'entry_price': entry_price, 'exit_price': exit_price_actual, 'quantity': quantity_per_grid, 'profit_loss': profit_loss, 'profit_loss_pct': profit_loss_pct, 'days_held': (exit_date - entry_date).days, 'exit_reason': exit_reason } grid_hedge_results.append(trade_record) if self.verbose_logging: print(f" 对冲网格 {entry_price}: {entry_date.strftime('%Y-%m-%d')} -> {exit_date.strftime('%Y-%m-%d')}") print(f" 退出原因: {exit_reason}, 盈亏: {profit_loss:.2f} ({profit_loss_pct:.2%})") self.trading_results['grid_hedge'] = grid_hedge_results if self.verbose_logging: print(f"\n网格对冲策略模拟完成,共{len(grid_hedge_results)}笔交易") return grid_hedge_results def simulate_combined_strategy(self): """ 模拟组合策略:网格交易 + 网格对冲策略 """ if self.verbose_logging: print("\n=== 步骤5: 组合策略模拟 ===") # 简单将网格交易和网格对冲的结果合并 grid_results = self.trading_results['grid_trading'] hedge_results = self.trading_results['grid_hedge'] combined_results = [] # 添加网格交易结果 for record in grid_results: combined_record = record.copy() combined_record['strategy'] = 'combined_grid' combined_results.append(combined_record) # 添加网格对冲结果 for record in hedge_results: combined_record = record.copy() combined_record['strategy'] = 'combined_hedge' combined_results.append(combined_record) self.trading_results['combined'] = combined_results if self.verbose_logging: print(f"组合策略模拟完成,共{len(combined_results)}笔交易") print(f" 其中网格交易: {len(grid_results)}笔") print(f" 其中网格对冲: {len(hedge_results)}笔") return combined_results def calculate_performance_statistics(self): """ 计算多品种多级聚合的性能统计 包括品种-策略级、品种级、策略级和总体级统计 """ if self.verbose_logging: print("\n=== 步骤6: 多级性能统计分析 ===") # 多级统计结构 performance_stats = { 'by_commodity_strategy': {}, # 品种-策略级统计 'by_commodity': {}, # 品种级汇总 'by_strategy': {}, # 策略级汇总 'overall': {} # 总体汇总 } if self.verbose_logging: print("\n--- 第一级:品种-策略级统计 ---") # 第一步:计算品种-策略级统计 for strategy_name, results in self.trading_results.items(): if strategy_name not in performance_stats['by_commodity_strategy']: performance_stats['by_commodity_strategy'][strategy_name] = {} # 按品种分组交易结果 commodity_results = {} for result in results: commodity = result['commodity'] if commodity not in commodity_results: commodity_results[commodity] = [] commodity_results[commodity].append(result) # 为每个品种计算统计 for commodity in self.core_commodities.keys(): comm_results = commodity_results.get(commodity, []) stats = self._calculate_single_strategy_stats(strategy_name, comm_results, commodity) performance_stats['by_commodity_strategy'][strategy_name][commodity] = stats if self.verbose_logging: print(f"\n{commodity}-{strategy_name} 策略统计:") self._print_strategy_stats(stats) if self.verbose_logging: print("\n--- 第二级:品种级汇总统计 ---") # 第二步:计算品种级汇总统计 for commodity in self.core_commodities.keys(): commodity_stats = self._calculate_commodity_summary(commodity, performance_stats['by_commodity_strategy']) performance_stats['by_commodity'][commodity] = commodity_stats if self.verbose_logging: print(f"\n{commodity} 品种汇总统计:") self._print_strategy_stats(commodity_stats) if self.verbose_logging: print("\n--- 第三级:策略级汇总统计 ---") # 第三步:计算策略级汇总统计 for strategy_name in self.trading_results.keys(): strategy_stats = self._calculate_strategy_summary(strategy_name, performance_stats['by_commodity_strategy']) performance_stats['by_strategy'][strategy_name] = strategy_stats if self.verbose_logging: print(f"\n{strategy_name} 策略汇总统计:") self._print_strategy_stats(strategy_stats) if self.verbose_logging: print("\n--- 第四级:整体汇总统计 ---") # 第四步:计算总体统计 overall_stats = self._calculate_overall_summary(performance_stats['by_strategy']) performance_stats['overall'] = overall_stats if self.verbose_logging: print(f"\n整体汇总统计:") self._print_strategy_stats(overall_stats) return performance_stats def _calculate_single_strategy_stats(self, strategy_name, results, commodity): """计算单个策略在特定品种下的统计数据""" # 计算已平仓交易的盈亏 closed_profit_loss = sum(r['profit_loss'] for r in results) if results else 0.0 # 计算未平仓头寸的未实现盈亏(特定品种) unrealized_profit_loss = self._calculate_unrealized_pnl_for_commodity(strategy_name, commodity) # 总盈亏 = 已实现盈亏 + 未实现盈亏 total_profit_loss = closed_profit_loss + unrealized_profit_loss if not results and unrealized_profit_loss == 0: return { 'total_trades': 0, 'open_positions': 0, 'profitable_trades': 0, 'losing_trades': 0, 'win_rate': 0.0, 'closed_profit_loss': 0.0, 'unrealized_profit_loss': 0.0, 'total_profit_loss': 0.0, 'avg_profit_loss': 0.0, 'avg_profit_loss_pct': 0.0, 'max_profit': 0.0, 'max_loss': 0.0, 'avg_holding_days': 0.0, 'profit_factor': 0.0 } # 基本统计 total_trades = len(results) open_positions = self._count_open_positions_for_commodity(strategy_name, commodity) profitable_trades = sum(1 for r in results if r['profit_loss'] > 0) losing_trades = sum(1 for r in results if r['profit_loss'] < 0) win_rate = profitable_trades / total_trades if total_trades > 0 else 0 # 平均盈亏(基于已平仓交易) avg_profit_loss = closed_profit_loss / total_trades if total_trades > 0 else 0 avg_profit_loss_pct = sum(r['profit_loss_pct'] for r in results) / total_trades if total_trades > 0 else 0 # 最大盈亏(基于已平仓交易) profit_losses = [r['profit_loss'] for r in results] max_profit = max(profit_losses) if profit_losses else 0 max_loss = min(profit_losses) if profit_losses else 0 # 平均持有天数 avg_holding_days = sum(r['days_held'] for r in results) / total_trades if total_trades > 0 else 0 # 盈亏比(基于已平仓交易) total_profits = sum(r['profit_loss'] for r in results if r['profit_loss'] > 0) total_losses = abs(sum(r['profit_loss'] for r in results if r['profit_loss'] < 0)) profit_factor = total_profits / total_losses if total_losses > 0 else float('inf') if total_profits > 0 else 0 return { 'total_trades': total_trades, 'open_positions': open_positions, 'profitable_trades': profitable_trades, 'losing_trades': losing_trades, 'win_rate': win_rate, 'closed_profit_loss': closed_profit_loss, 'unrealized_profit_loss': unrealized_profit_loss, 'total_profit_loss': total_profit_loss, 'avg_profit_loss': avg_profit_loss, 'avg_profit_loss_pct': avg_profit_loss_pct, 'max_profit': max_profit, 'max_loss': max_loss, 'avg_holding_days': avg_holding_days, 'profit_factor': profit_factor } def _print_strategy_stats(self, stats): """打印策略统计信息""" print(f" 已平仓交易: {stats['total_trades']}") print(f" 未平仓头寸: {stats['open_positions']}") print(f" 盈利交易: {stats['profitable_trades']}") print(f" 亏损交易: {stats['losing_trades']}") print(f" 胜率: {stats['win_rate']:.2%}") print(f" 已实现盈亏: {stats['closed_profit_loss']:.2f}") print(f" 未实现盈亏: {stats['unrealized_profit_loss']:.2f}") print(f" 总盈亏: {stats['total_profit_loss']:.2f}") print(f" 平均盈亏: {stats['avg_profit_loss']:.2f}") print(f" 平均盈亏率: {stats['avg_profit_loss_pct']:.2%}") print(f" 最大盈利: {stats['max_profit']:.2f}") print(f" 最大亏损: {stats['max_loss']:.2f}") print(f" 平均持有天数: {stats['avg_holding_days']:.1f}") profit_factor_str = f"{stats['profit_factor']:.2f}" if stats['profit_factor'] != float('inf') else "∞" print(f" 盈亏比: {profit_factor_str}") def _calculate_commodity_summary(self, commodity, by_commodity_strategy): """计算品种级汇总统计""" total_stats = { 'total_trades': 0, 'open_positions': 0, 'profitable_trades': 0, 'losing_trades': 0, 'closed_profit_loss': 0.0, 'unrealized_profit_loss': 0.0, 'total_profit_loss': 0.0, 'max_profit': 0.0, 'max_loss': 0.0, 'total_holding_days': 0.0, 'total_profits': 0.0, 'total_losses': 0.0, 'all_pct': [] } for strategy_name, commodity_stats in by_commodity_strategy.items(): if commodity in commodity_stats: stats = commodity_stats[commodity] total_stats['total_trades'] += stats['total_trades'] total_stats['open_positions'] += stats['open_positions'] total_stats['profitable_trades'] += stats['profitable_trades'] total_stats['losing_trades'] += stats['losing_trades'] total_stats['closed_profit_loss'] += stats['closed_profit_loss'] total_stats['unrealized_profit_loss'] += stats['unrealized_profit_loss'] total_stats['total_profit_loss'] += stats['total_profit_loss'] total_stats['max_profit'] = max(total_stats['max_profit'], stats['max_profit']) total_stats['max_loss'] = min(total_stats['max_loss'], stats['max_loss']) total_stats['total_holding_days'] += stats['avg_holding_days'] * stats['total_trades'] if stats['profit_factor'] != float('inf'): profits = stats['closed_profit_loss'] if stats['closed_profit_loss'] > 0 else 0 losses = abs(stats['closed_profit_loss']) if stats['closed_profit_loss'] < 0 else 0 total_stats['total_profits'] += profits total_stats['total_losses'] += losses # 收集所有百分比数据 if stats['total_trades'] > 0: total_stats['all_pct'].extend([stats['avg_profit_loss_pct']] * stats['total_trades']) # 计算汇总指标 win_rate = total_stats['profitable_trades'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0 avg_profit_loss = total_stats['closed_profit_loss'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0 avg_profit_loss_pct = sum(total_stats['all_pct']) / len(total_stats['all_pct']) if total_stats['all_pct'] else 0 avg_holding_days = total_stats['total_holding_days'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0 profit_factor = total_stats['total_profits'] / total_stats['total_losses'] if total_stats['total_losses'] > 0 else float('inf') if total_stats['total_profits'] > 0 else 0 return { 'total_trades': total_stats['total_trades'], 'open_positions': total_stats['open_positions'], 'profitable_trades': total_stats['profitable_trades'], 'losing_trades': total_stats['losing_trades'], 'win_rate': win_rate, 'closed_profit_loss': total_stats['closed_profit_loss'], 'unrealized_profit_loss': total_stats['unrealized_profit_loss'], 'total_profit_loss': total_stats['total_profit_loss'], 'avg_profit_loss': avg_profit_loss, 'avg_profit_loss_pct': avg_profit_loss_pct, 'max_profit': total_stats['max_profit'], 'max_loss': total_stats['max_loss'], 'avg_holding_days': avg_holding_days, 'profit_factor': profit_factor } def _calculate_strategy_summary(self, strategy_name, by_commodity_strategy): """计算策略级汇总统计""" if strategy_name not in by_commodity_strategy: return self._get_empty_stats() strategy_stats = by_commodity_strategy[strategy_name] total_stats = { 'total_trades': 0, 'open_positions': 0, 'profitable_trades': 0, 'losing_trades': 0, 'closed_profit_loss': 0.0, 'unrealized_profit_loss': 0.0, 'total_profit_loss': 0.0, 'max_profit': 0.0, 'max_loss': 0.0, 'total_holding_days': 0.0, 'total_profits': 0.0, 'total_losses': 0.0, 'all_pct': [] } for commodity, stats in strategy_stats.items(): total_stats['total_trades'] += stats['total_trades'] total_stats['open_positions'] += stats['open_positions'] total_stats['profitable_trades'] += stats['profitable_trades'] total_stats['losing_trades'] += stats['losing_trades'] total_stats['closed_profit_loss'] += stats['closed_profit_loss'] total_stats['unrealized_profit_loss'] += stats['unrealized_profit_loss'] total_stats['total_profit_loss'] += stats['total_profit_loss'] total_stats['max_profit'] = max(total_stats['max_profit'], stats['max_profit']) total_stats['max_loss'] = min(total_stats['max_loss'], stats['max_loss']) total_stats['total_holding_days'] += stats['avg_holding_days'] * stats['total_trades'] if stats['profit_factor'] != float('inf'): profits = stats['closed_profit_loss'] if stats['closed_profit_loss'] > 0 else 0 losses = abs(stats['closed_profit_loss']) if stats['closed_profit_loss'] < 0 else 0 total_stats['total_profits'] += profits total_stats['total_losses'] += losses # 收集所有百分比数据 if stats['total_trades'] > 0: total_stats['all_pct'].extend([stats['avg_profit_loss_pct']] * stats['total_trades']) # 计算汇总指标 win_rate = total_stats['profitable_trades'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0 avg_profit_loss = total_stats['closed_profit_loss'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0 avg_profit_loss_pct = sum(total_stats['all_pct']) / len(total_stats['all_pct']) if total_stats['all_pct'] else 0 avg_holding_days = total_stats['total_holding_days'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0 profit_factor = total_stats['total_profits'] / total_stats['total_losses'] if total_stats['total_losses'] > 0 else float('inf') if total_stats['total_profits'] > 0 else 0 return { 'total_trades': total_stats['total_trades'], 'open_positions': total_stats['open_positions'], 'profitable_trades': total_stats['profitable_trades'], 'losing_trades': total_stats['losing_trades'], 'win_rate': win_rate, 'closed_profit_loss': total_stats['closed_profit_loss'], 'unrealized_profit_loss': total_stats['unrealized_profit_loss'], 'total_profit_loss': total_stats['total_profit_loss'], 'avg_profit_loss': avg_profit_loss, 'avg_profit_loss_pct': avg_profit_loss_pct, 'max_profit': total_stats['max_profit'], 'max_loss': total_stats['max_loss'], 'avg_holding_days': avg_holding_days, 'profit_factor': profit_factor } def _calculate_overall_summary(self, by_strategy): """计算总体汇总统计""" total_stats = { 'total_trades': 0, 'open_positions': 0, 'profitable_trades': 0, 'losing_trades': 0, 'closed_profit_loss': 0.0, 'unrealized_profit_loss': 0.0, 'total_profit_loss': 0.0, 'max_profit': 0.0, 'max_loss': 0.0, 'total_holding_days': 0.0, 'total_profits': 0.0, 'total_losses': 0.0, 'all_pct': [] } for strategy_name, stats in by_strategy.items(): total_stats['total_trades'] += stats['total_trades'] total_stats['open_positions'] += stats['open_positions'] total_stats['profitable_trades'] += stats['profitable_trades'] total_stats['losing_trades'] += stats['losing_trades'] total_stats['closed_profit_loss'] += stats['closed_profit_loss'] total_stats['unrealized_profit_loss'] += stats['unrealized_profit_loss'] total_stats['total_profit_loss'] += stats['total_profit_loss'] total_stats['max_profit'] = max(total_stats['max_profit'], stats['max_profit']) total_stats['max_loss'] = min(total_stats['max_loss'], stats['max_loss']) total_stats['total_holding_days'] += stats['avg_holding_days'] * stats['total_trades'] if stats['profit_factor'] != float('inf'): profits = stats['closed_profit_loss'] if stats['closed_profit_loss'] > 0 else 0 losses = abs(stats['closed_profit_loss']) if stats['closed_profit_loss'] < 0 else 0 total_stats['total_profits'] += profits total_stats['total_losses'] += losses # 收集所有百分比数据 if stats['total_trades'] > 0: total_stats['all_pct'].extend([stats['avg_profit_loss_pct']] * stats['total_trades']) # 计算汇总指标 win_rate = total_stats['profitable_trades'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0 avg_profit_loss = total_stats['closed_profit_loss'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0 avg_profit_loss_pct = sum(total_stats['all_pct']) / len(total_stats['all_pct']) if total_stats['all_pct'] else 0 avg_holding_days = total_stats['total_holding_days'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0 profit_factor = total_stats['total_profits'] / total_stats['total_losses'] if total_stats['total_losses'] > 0 else float('inf') if total_stats['total_profits'] > 0 else 0 return { 'total_trades': total_stats['total_trades'], 'open_positions': total_stats['open_positions'], 'profitable_trades': total_stats['profitable_trades'], 'losing_trades': total_stats['losing_trades'], 'win_rate': win_rate, 'closed_profit_loss': total_stats['closed_profit_loss'], 'unrealized_profit_loss': total_stats['unrealized_profit_loss'], 'total_profit_loss': total_stats['total_profit_loss'], 'avg_profit_loss': avg_profit_loss, 'avg_profit_loss_pct': avg_profit_loss_pct, 'max_profit': total_stats['max_profit'], 'max_loss': total_stats['max_loss'], 'avg_holding_days': avg_holding_days, 'profit_factor': profit_factor } def _get_empty_stats(self): """返回空的统计数据""" return { 'total_trades': 0, 'open_positions': 0, 'profitable_trades': 0, 'losing_trades': 0, 'win_rate': 0.0, 'closed_profit_loss': 0.0, 'unrealized_profit_loss': 0.0, 'total_profit_loss': 0.0, 'avg_profit_loss': 0.0, 'avg_profit_loss_pct': 0.0, 'max_profit': 0.0, 'max_loss': 0.0, 'avg_holding_days': 0.0, 'profit_factor': 0.0 } def _calculate_unrealized_pnl_for_commodity(self, strategy_name, commodity): """计算特定品种未平仓头寸的未实现盈亏""" if strategy_name == 'combined': return (self._calculate_unrealized_pnl_for_commodity('grid_trading', commodity) + self._calculate_unrealized_pnl_for_commodity('grid_hedge', commodity)) unrealized_pnl = 0.0 strategy_positions = self.active_positions.get(strategy_name, {}) if commodity in strategy_positions: current_contract = self._get_current_contract(commodity, self.end_date.date()) if not current_contract: return 0.0 end_price = self._get_price_on_date(commodity, current_contract, self.end_date.date(), 'close') if end_price is None: return 0.0 positions = strategy_positions[commodity] for position_id, position in positions.items(): if position['status'] == 'open' and position['contract'] == current_contract: if strategy_name == 'grid_hedge': pnl = self._calculate_futures_pnl( position['entry_price'], end_price, position['quantity'], commodity, is_long=False ) else: pnl = self._calculate_futures_pnl( position['entry_price'], end_price, position['quantity'], commodity, is_long=True ) unrealized_pnl += pnl return unrealized_pnl def _count_open_positions_for_commodity(self, strategy_name, commodity): """计算特定品种的未平仓头寸数量""" if strategy_name == 'combined': return (self._count_open_positions_for_commodity('grid_trading', commodity) + self._count_open_positions_for_commodity('grid_hedge', commodity)) count = 0 strategy_positions = self.active_positions.get(strategy_name, {}) if commodity in strategy_positions: positions = strategy_positions[commodity] for position_id, position in positions.items(): if position['status'] == 'open': count += 1 return count def _calculate_unrealized_pnl(self, strategy_name): """ 计算未平仓头寸的未实现盈亏 """ unrealized_pnl = 0.0 if strategy_name == 'combined': # 组合策略的未实现盈亏是网格交易和网格对冲的总和 return (self._calculate_unrealized_pnl('grid_trading') + self._calculate_unrealized_pnl('grid_hedge')) # 获取策略对应的头寸字典 strategy_positions = self.active_positions.get(strategy_name, {}) for commodity, positions in strategy_positions.items(): # 获取当前合约和最新价格 current_contract = self._get_current_contract(commodity, self.end_date.date()) if not current_contract: continue # 获取结束日期的收盘价 end_price = self._get_price_on_date(commodity, current_contract, self.end_date.date(), 'close') if end_price is None: continue for position_id, position in positions.items(): if position['status'] == 'open' and position['contract'] == current_contract: # 使用正确的期货盈亏计算公式 if strategy_name == 'grid_hedge': # 网格对冲是做空 pnl = self._calculate_futures_pnl( position['entry_price'], end_price, position['quantity'], commodity, is_long=False ) else: # 基础头寸和网格交易都是做多 pnl = self._calculate_futures_pnl( position['entry_price'], end_price, position['quantity'], commodity, is_long=True ) unrealized_pnl += pnl return unrealized_pnl def _count_open_positions(self, strategy_name): """ 计算未平仓头寸数量 """ if strategy_name == 'combined': return (self._count_open_positions('grid_trading') + self._count_open_positions('grid_hedge')) count = 0 strategy_positions = self.active_positions.get(strategy_name, {}) for commodity, positions in strategy_positions.items(): for position_id, position in positions.items(): if position['status'] == 'open': count += 1 return count def generate_comparison_report(self, performance_stats): """ 生成多级聚合的对比报告 包括品种-策略级、品种级、策略级和总体级报告 """ if self.verbose_logging: print("\n=== 多级聚合对比分析报告 ===") strategies = ['base_position', 'grid_trading', 'grid_hedge', 'combined'] strategy_names = { 'base_position': '基础头寸交易', 'grid_trading': '网格交易', 'grid_hedge': '网格对冲策略', 'combined': '组合策略' } all_comparison_data = { 'by_commodity_strategy': [], 'by_commodity': [], 'by_strategy': [], 'overall': [] } # 1. 品种-策略级对比报告 if self.verbose_logging: print("\n--- 品种-策略级对比 ---") by_comm_strategy_data = [] for strategy in strategies: for commodity in self.core_commodities.keys(): stats = performance_stats['by_commodity_strategy'].get(strategy, {}).get(commodity, {}) if stats.get('total_trades', 0) > 0 or stats.get('open_positions', 0) > 0: by_comm_strategy_data.append({ '品种': commodity, '策略': strategy_names[strategy], '已平仓': stats.get('total_trades', 0), '未平仓': stats.get('open_positions', 0), '胜率': f"{stats.get('win_rate', 0):.2%}", '已实现': f"{stats.get('closed_profit_loss', 0):.2f}", '未实现': f"{stats.get('unrealized_profit_loss', 0):.2f}", '总盈亏': f"{stats.get('total_profit_loss', 0):.2f}", '平均盈亏': f"{stats.get('avg_profit_loss', 0):.2f}", '最大盈利': f"{stats.get('max_profit', 0):.2f}", '最大亏损': f"{stats.get('max_loss', 0):.2f}", '平均天数': f"{stats.get('avg_holding_days', 0):.1f}", '盈亏比': f"{stats.get('profit_factor', 0):.2f}" if stats.get('profit_factor', 0) != float('inf') else "∞" }) if by_comm_strategy_data and self.verbose_logging: df_comm_strategy = pd.DataFrame(by_comm_strategy_data) print(df_comm_strategy.to_string(index=False)) all_comparison_data['by_commodity_strategy'] = by_comm_strategy_data # 2. 品种级汇总对比报告 if self.verbose_logging: print("\n--- 品种级汇总对比 ---") by_commodity_data = [] for commodity in self.core_commodities.keys(): stats = performance_stats['by_commodity'].get(commodity, {}) if stats.get('total_trades', 0) > 0 or stats.get('open_positions', 0) > 0: by_commodity_data.append({ '品种': commodity, '已平仓': stats.get('total_trades', 0), '未平仓': stats.get('open_positions', 0), '胜率': f"{stats.get('win_rate', 0):.2%}", '已实现': f"{stats.get('closed_profit_loss', 0):.2f}", '未实现': f"{stats.get('unrealized_profit_loss', 0):.2f}", '总盈亏': f"{stats.get('total_profit_loss', 0):.2f}", '平均盈亏': f"{stats.get('avg_profit_loss', 0):.2f}", '最大盈利': f"{stats.get('max_profit', 0):.2f}", '最大亏损': f"{stats.get('max_loss', 0):.2f}", '平均天数': f"{stats.get('avg_holding_days', 0):.1f}", '盈亏比': f"{stats.get('profit_factor', 0):.2f}" if stats.get('profit_factor', 0) != float('inf') else "∞" }) if by_commodity_data and self.verbose_logging: df_commodity = pd.DataFrame(by_commodity_data) print(df_commodity.to_string(index=False)) all_comparison_data['by_commodity'] = by_commodity_data # 3. 策略级汇总对比报告 if self.verbose_logging: print("\n--- 策略级汇总对比 ---") by_strategy_data = [] for strategy in strategies: stats = performance_stats['by_strategy'].get(strategy, {}) by_strategy_data.append({ '策略': strategy_names[strategy], '已平仓': stats.get('total_trades', 0), '未平仓': stats.get('open_positions', 0), '胜率': f"{stats.get('win_rate', 0):.2%}", '已实现': f"{stats.get('closed_profit_loss', 0):.2f}", '未实现': f"{stats.get('unrealized_profit_loss', 0):.2f}", '总盈亏': f"{stats.get('total_profit_loss', 0):.2f}", '平均盈亏': f"{stats.get('avg_profit_loss', 0):.2f}", '最大盈利': f"{stats.get('max_profit', 0):.2f}", '最大亏损': f"{stats.get('max_loss', 0):.2f}", '平均天数': f"{stats.get('avg_holding_days', 0):.1f}", '盈亏比': f"{stats.get('profit_factor', 0):.2f}" if stats.get('profit_factor', 0) != float('inf') else "∞" }) if by_strategy_data and self.verbose_logging: df_strategy = pd.DataFrame(by_strategy_data) print(df_strategy.to_string(index=False)) all_comparison_data['by_strategy'] = by_strategy_data # 4. 总体汇总报告 if self.verbose_logging: print("\n--- 整体汇总 ---") overall_data = [] stats = performance_stats['overall'] overall_data.append({ '项目': '整体表现', '已平仓': stats.get('total_trades', 0), '未平仓': stats.get('open_positions', 0), '胜率': f"{stats.get('win_rate', 0):.2%}", '已实现': f"{stats.get('closed_profit_loss', 0):.2f}", '未实现': f"{stats.get('unrealized_profit_loss', 0):.2f}", '总盈亏': f"{stats.get('total_profit_loss', 0):.2f}", '平均盈亏': f"{stats.get('avg_profit_loss', 0):.2f}", '最大盈利': f"{stats.get('max_profit', 0):.2f}", '最大亏损': f"{stats.get('max_loss', 0):.2f}", '平均天数': f"{stats.get('avg_holding_days', 0):.1f}", '盈亏比': f"{stats.get('profit_factor', 0):.2f}" if stats.get('profit_factor', 0) != float('inf') else "∞" }) if overall_data and self.verbose_logging: df_overall = pd.DataFrame(overall_data) print(df_overall.to_string(index=False)) all_comparison_data['overall'] = overall_data return all_comparison_data def generate_csv_output(self, performance_stats): """ 生成CSV输出文件 """ if self.verbose_logging: print("\n=== 步骤8: 生成CSV输出文件 ===") timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') # 1. 生成交易记录CSV all_trades = [] for strategy_name, trades in self.trading_results.items(): all_trades.extend(trades) if all_trades: df_trades = pd.DataFrame(all_trades) # 添加列顺序,确保合约切换相关字段在前面 column_order = ['commodity', 'contract', 'strategy', 'entry_date', 'exit_date', 'entry_price', 'exit_price', 'quantity', 'profit_loss', 'profit_loss_pct', 'days_held', 'exit_reason'] # 重新排列DataFrame的列顺序 existing_columns = [col for col in column_order if col in df_trades.columns] other_columns = [col for col in df_trades.columns if col not in column_order] df_trades = df_trades[existing_columns + other_columns] trades_filename = f'grid_trading_records_{timestamp}.csv' df_trades.to_csv(trades_filename, index=False, encoding=self.output_encoding) if self.verbose_logging: print(f"交易记录已保存至: {trades_filename}") # 2. 生成多级性能统计CSV csv_files = [] # 2.1 品种-策略级统计CSV by_comm_strategy_data = [] for strategy, commodity_data in performance_stats['by_commodity_strategy'].items(): for commodity, stats in commodity_data.items(): stats_record = { '品种': commodity, '策略': strategy, **stats } by_comm_strategy_data.append(stats_record) if by_comm_strategy_data: df_comm_strategy = pd.DataFrame(by_comm_strategy_data) comm_strategy_filename = f'grid_trading_by_commodity_strategy_{timestamp}.csv' df_comm_strategy.to_csv(comm_strategy_filename, index=False, encoding=self.output_encoding) csv_files.append(comm_strategy_filename) if self.verbose_logging: print(f"品种-策略级统计已保存至: {comm_strategy_filename}") # 2.2 品种级汇总统计CSV by_commodity_data = [] for commodity, stats in performance_stats['by_commodity'].items(): stats_record = { '品种': commodity, **stats } by_commodity_data.append(stats_record) if by_commodity_data: df_commodity = pd.DataFrame(by_commodity_data) commodity_filename = f'grid_trading_by_commodity_{timestamp}.csv' df_commodity.to_csv(commodity_filename, index=False, encoding=self.output_encoding) csv_files.append(commodity_filename) if self.verbose_logging: print(f"品种级汇总统计已保存至: {commodity_filename}") # 2.3 策略级汇总统计CSV by_strategy_data = [] for strategy, stats in performance_stats['by_strategy'].items(): stats_record = { '策略': strategy, **stats } by_strategy_data.append(stats_record) if by_strategy_data: df_strategy = pd.DataFrame(by_strategy_data) strategy_filename = f'grid_trading_by_strategy_{timestamp}.csv' df_strategy.to_csv(strategy_filename, index=False, encoding=self.output_encoding) csv_files.append(strategy_filename) if self.verbose_logging: print(f"策略级汇总统计已保存至: {strategy_filename}") # 2.4 整体汇总统计CSV overall_data = [{ '项目': '整体汇总', **performance_stats['overall'] }] if overall_data: df_overall = pd.DataFrame(overall_data) overall_filename = f'grid_trading_overall_{timestamp}.csv' df_overall.to_csv(overall_filename, index=False, encoding=self.output_encoding) csv_files.append(overall_filename) if self.verbose_logging: print(f"整体汇总统计已保存至: {overall_filename}") return trades_filename if all_trades else None, csv_files def run_complete_analysis(self): """执行完整的网格交易分析流程(带主力合约切换)""" if self.verbose_logging: print("开始执行期货网格交易分析(带主力合约切换)") print("=" * 60) try: # 步骤1: 合约选择 self.select_contracts() if not self.selected_contracts: if self.verbose_logging: print("未选择到有效合约,分析终止") return None # 步骤2: 构建主力合约历史变化 self.build_dominant_contract_history() if not self.dominant_contract_history: if self.verbose_logging: print("未获取到主力合约历史,分析终止") return None # 步骤3: 收集价格数据 self.collect_price_data() if not self.price_data: if self.verbose_logging: print("未获取到有效价格数据,分析终止") return None # 步骤4: 带合约切换的交易模拟 self.simulate_with_contract_switching() # 步骤5: 组合策略模拟 self.simulate_combined_strategy() # 步骤6: 性能统计分析 performance_stats = self.calculate_performance_statistics() # 步骤7: 生成对比报告 comparison_report = self.generate_comparison_report(performance_stats) # 步骤8: 生成CSV输出 # trades_file, stats_files = self.generate_csv_output(performance_stats) # 分析汇总 total_commodities = len(self.selected_contracts) total_trades = sum(len(trades) for trades in self.trading_results.values()) contract_switches = sum(len(history) for history in self.dominant_contract_history.values()) if self.verbose_logging: print("\n" + "=" * 60) print("分析完成汇总:") print(f"分析商品数: {total_commodities}") print(f"合约切换次数: {contract_switches}") print(f"总交易笔数: {total_trades}") # print(f"交易记录文件: {trades_file}") # print(f"性能统计文件: {stats_file}") return { 'selected_contracts': self.selected_contracts, 'dominant_contract_history': self.dominant_contract_history, 'price_data': self.price_data, 'trading_results': self.trading_results, 'performance_stats': performance_stats, 'comparison_report': comparison_report, # 'output_files': { # 'trades_file': trades_file, # 'stats_files': stats_files # }, 'summary': { 'total_commodities': total_commodities, 'contract_switches': contract_switches, 'total_trades': total_trades } } except Exception as e: if self.verbose_logging: print(f"分析过程中出现错误: {str(e)}") import traceback traceback.print_exc() return None # ===================================================================================== # 主程序入口 # ===================================================================================== def run_grid_trading_analysis(config=None): """运行期货网格交易分析""" if config is None: config = GridTradingConfig # 打印配置信息 config.print_config() # 创建分析器并运行 analyzer = FutureGridTradingAnalyzer(config) results = analyzer.run_complete_analysis() return results # 执行分析 if __name__ == "__main__": print("期货网格交易研究分析工具(带主力合约切换)") print("研究期货网格交易策略在不同配置下的表现") print("核心功能包括主力合约自动切换、强制平仓和重新建仓逻辑") print("") print("支持四种交易场景的对比分析:") print(" 1. 基础头寸交易 - 价格-数量网格配置") print(" 2. 网格交易策略 - 限价订单网格买入卖出") print(" 3. 网格对冲策略 - 带止损的增强网格交易") print(" 4. 组合策略 - 网格交易+网格对冲组合") print("") print("主要特点:") print(" - 主力合约自动监控:每日检测主力合约变化") print(" - 强制平仓机制:合约切换时立即平掉旧合约所有头寸") print(" - 智能重新建仓:根据价格条件在新合约中重新建立头寸") print(" - 完整交易记录:记录所有交易包括合约切换引起的强制平仓") print("") print("适用于聚宽在线研究平台") results = run_grid_trading_analysis() if results: print("\n✅ 分析执行成功!") summary = results['summary'] print(f"📊 结果摘要:") print(f" - 分析商品数: {summary['total_commodities']}") print(f" - 合约切换次数: {summary['contract_switches']}") print(f" - 总交易笔数: {summary['total_trades']}") print(f" - 整体总盈亏: {results['performance_stats']['overall']['total_profit_loss']:.2f}") print(f" - 整体胜率: {results['performance_stats']['overall']['win_rate']:.2%}") print(f" - 未平仓头寸: {results['performance_stats']['overall']['open_positions']}") print(f"📂 输出文件:") print(f" - 交易记录文件: {results['output_files']['trades_file']}") for i, stats_file in enumerate(results['output_files']['stats_files'], 1): print(f" - 统计文件{i}: {stats_file}") print(f"\n📈 多级汇总:") # 品种级汇总简要显示 print(f" 品种表现:") for commodity in ['SA', 'M']: comm_stats = results['performance_stats']['by_commodity'].get(commodity, {}) if comm_stats.get('total_trades', 0) > 0 or comm_stats.get('open_positions', 0) > 0: print(f" {commodity}: 交易{comm_stats.get('total_trades', 0)}笔, 盈亏{comm_stats.get('total_profit_loss', 0):.2f}, 胜率{comm_stats.get('win_rate', 0):.2%}") # 策略级汇总简要显示 print(f" 策略表现:") strategy_names = { 'base_position': '基础头寸', 'grid_trading': '网格交易', 'grid_hedge': '网格对冲', 'combined': '组合策略' } for strategy, name in strategy_names.items(): strat_stats = results['performance_stats']['by_strategy'].get(strategy, {}) print(f" {name}: 交易{strat_stats.get('total_trades', 0)}笔, 盈亏{strat_stats.get('total_profit_loss', 0):.2f}, 胜率{strat_stats.get('win_rate', 0):.2%}") else: print("\n❌ 分析执行失败,请检查错误信息")