""" 期货合约5日移动平均线偏离分析 研究期货合约在大幅偏离其5天移动平均线(K5)后是否出现重大反转走势 本程序实现完整的分析流程: 1. 数据结构设置 - 获取主力期货合约数据 2. 数据收集 - 扩展时间范围获取OHLC数据 3. 距离计算 - 计算价格相对K5的上下偏离距离 4. 阈值确定 - 使用百分位数作为极端偏差阈值 5. 信号检测与分析 - 识别极端偏差并分析后续走势 6. 结果输出 - 生成Range_1偏差统计和信号分析结果CSV 注:程序使用动态获取的主力合约进行分析,确保分析结果基于真实的交易活动 作者: jukuan研究团队 日期: 2025-09 适用平台: 聚宽在线研究平台 """ import pandas as pd import numpy as np from jqdata import * import datetime import warnings warnings.filterwarnings('ignore') # ===================================================================================== # 分析配置参数 - 集中配置部分 # ===================================================================================== class AnalysisConfig: """期货5日移动平均线偏离分析配置参数""" # ==================== 时间范围设置 ==================== ANALYSIS_START_DATE = datetime.datetime(2025, 8, 1) # 分析开始日期 ANALYSIS_END_DATE = datetime.datetime(2025, 8, 20) # 分析结束日期 # ==================== 时间扩展参数 ==================== HISTORICAL_DAYS = 365 # 历史分析期:优势期前天数 FORWARD_DAYS = 20 # 正向分析期:优势期后天数 FORWARD_MONITOR_DAYS = 10 # 前向监控窗口:交易日数 # ==================== 偏差阈值设置 ==================== DEVIATION_PERCENTILE = 90 # 偏差阈值百分位数(范围:0-100) # ==================== 分析规模控制 ==================== MAX_ANALYSIS_CONTRACTS = 10 # 最大分析合约数量限制 # ==================== 移动平均线参数 ==================== MA_PERIOD = 5 # K线移动平均线周期(天数) # ==================== 输出设置 ==================== OUTPUT_ENCODING = 'utf-8-sig' # 输出文件编码格式 VERBOSE_LOGGING = True # 是否打印详细日志 @classmethod def print_config(cls): """打印当前配置信息""" print("=== 期货5日移动平均线偏离分析配置 ===") print(f"分析时间范围: {cls.ANALYSIS_START_DATE.strftime('%Y-%m-%d')} 至 {cls.ANALYSIS_END_DATE.strftime('%Y-%m-%d')}") print(f"历史分析期: {cls.HISTORICAL_DAYS}天") print(f"前向分析期: {cls.FORWARD_DAYS}天") print(f"前向监控窗口: {cls.FORWARD_MONITOR_DAYS}个交易日") print(f"偏差阈值百分位数: {cls.DEVIATION_PERCENTILE}%") print(f"最大分析合约数: {cls.MAX_ANALYSIS_CONTRACTS}") print(f"合约类型: 主力合约") print(f"移动平均线周期: {cls.MA_PERIOD}日") print(f"详细日志: {'开启' if cls.VERBOSE_LOGGING else '关闭'}") print("=" * 50) class FutureMA5DeviationAnalyzer: """期货5日移动平均线偏离分析器""" def __init__(self, config=None): """初始化分析器""" # 使用配置类参数 if config is None: config = AnalysisConfig self.config = config self.historical_days = config.HISTORICAL_DAYS self.forward_days = config.FORWARD_DAYS self.deviation_percentile = config.DEVIATION_PERCENTILE self.forward_monitor_days = config.FORWARD_MONITOR_DAYS self.analysis_start_date = config.ANALYSIS_START_DATE self.analysis_end_date = config.ANALYSIS_END_DATE self.max_analysis_contracts = config.MAX_ANALYSIS_CONTRACTS self.ma_period = config.MA_PERIOD self.output_encoding = config.OUTPUT_ENCODING self.verbose_logging = config.VERBOSE_LOGGING # 期货品种与交易所的映射关系 self.exchange_map = { # 大连商品交易所 (XDCE) 'A': 'XDCE', 'B': 'XDCE', 'C': 'XDCE', 'CS': 'XDCE', 'FB': 'XDCE', 'I': 'XDCE', 'J': 'XDCE', 'JD': 'XDCE', 'JM': 'XDCE', 'L': 'XDCE', 'M': 'XDCE', 'P': 'XDCE', 'PP': 'XDCE', 'PG': 'XDCE', 'RR': 'XDCE', 'V': 'XDCE', 'Y': 'XDCE', 'EB': 'XDCE', 'EG': 'XDCE', 'LH': 'XDCE', # 郑州商品交易所 (XZCE) 'AP': 'XZCE', 'CF': 'XZCE', 'CY': 'XZCE', 'FG': 'XZCE', 'JR': 'XZCE', 'LR': 'XZCE', 'MA': 'XZCE', 'OI': 'XZCE', 'PM': 'XZCE', 'RI': 'XZCE', 'RM': 'XZCE', 'RS': 'XZCE', 'SF': 'XZCE', 'SM': 'XZCE', 'SR': 'XZCE', 'TA': 'XZCE', 'WH': 'XZCE', 'ZC': 'XZCE', 'CJ': 'XZCE', 'ME': 'XZCE', 'PF': 'XZCE', 'PK': 'XZCE', 'RO': 'XZCE', 'SA': 'XZCE', 'TC': 'XZCE', 'UR': 'XZCE', 'WS': 'XZCE', 'WT': 'XZCE', # 上海期货交易所 (XSGE) 'AG': 'XSGE', 'AL': 'XSGE', 'AU': 'XSGE', 'BU': 'XSGE', 'CU': 'XSGE', 'FU': 'XSGE', 'HC': 'XSGE', 'NI': 'XSGE', 'PB': 'XSGE', 'RB': 'XSGE', 'RU': 'XSGE', 'SN': 'XSGE', 'SP': 'XSGE', 'SS': 'XSGE', 'WR': 'XSGE', 'ZN': 'XSGE', # 上海国际能源交易中心 (XINE) 'BC': 'XINE', 'LU': 'XINE', 'NR': 'XINE', 'SC': 'XINE', # 中金所 (CCFX) 'IC': 'CCFX', 'IF': 'CCFX', 'IH': 'CCFX', 'T': 'CCFX', 'TF': 'CCFX', 'TS': 'CCFX', # 广期所 (GFEX) 'SI': 'GFEX', 'LC': 'GFEX', 'PS': 'GFEX' } # 存储结果的字典 self.contract_data = {} # 主合约数据结构 self.signal_results = {} # 信号检测结果 if self.verbose_logging: print("初始化期货5日移动平均线偏离分析器") print(f"配置参数 - 历史分析期: {self.historical_days}天, 前向分析期: {self.forward_days}天") print(f"偏差阈值: {self.deviation_percentile}分位数, 监控窗口: {self.forward_monitor_days}个交易日") print(f"最大分析合约数: {self.max_analysis_contracts}, 移动平均线周期: {self.ma_period}日") print(f"合约类型: 主力合约") def _is_trading_day(self, check_date): """ 检查是否为交易日 参数: check_date (date): 要检查的日期 返回: bool: 是否为交易日 """ try: # 使用聚宽API检查交易日 trade_days = get_trade_days(end_date=check_date, count=1) if len(trade_days) > 0: return trade_days[0].date() == check_date return False except: # 如果API调用失败,简单判断是否为工作日 return check_date.weekday() < 5 def _get_symbol_dominant_history(self, symbol, start_date, end_date): """ 获取单个标的的主力合约历史变化 参数: symbol (str): 标的编码 start_date (date): 开始日期 end_date (date): 结束日期 返回: list: 主力合约记录列表 """ results = [] current_date = start_date current_dominant = None period_start = None if self.verbose_logging: print(f" 获取 {symbol} 主力合约历史...") # 按日遍历时间范围 while current_date <= end_date: # 跳过非交易日 if not self._is_trading_day(current_date): current_date += datetime.timedelta(days=1) continue try: # 获取当日主力合约 dominant_contract = get_dominant_future(symbol, current_date) if dominant_contract is None: current_date += datetime.timedelta(days=1) continue # 如果是新的主力合约 if dominant_contract != current_dominant: # 如果不是第一个合约,结束上一个合约的记录 if current_dominant is not None and period_start is not None: results.append({ 'symbol': symbol, 'dominant_contract': current_dominant, 'start_date': period_start, 'end_date': current_date - datetime.timedelta(days=1) }) # 开始新合约的记录 current_dominant = dominant_contract period_start = current_date if self.verbose_logging: print(f" {current_date}: 主力合约变更为 {dominant_contract}") except Exception as e: if self.verbose_logging: print(f" 获取 {current_date} 的主力合约时出错: {str(e)}") current_date += datetime.timedelta(days=1) # 处理最后一个合约 if current_dominant is not None and period_start is not None: results.append({ 'symbol': symbol, 'dominant_contract': current_dominant, 'start_date': period_start, 'end_date': end_date }) return results def get_dominant_contracts_history(self, symbol_list, start_date, end_date): """ 获取指定时间范围内多个标的的主力合约历史变化 参数: symbol_list (list): 标的编码列表 start_date (date): 开始日期 end_date (date): 结束日期 返回: pandas.DataFrame: 包含主力合约变化历史的数据框 """ if self.verbose_logging: print(f"获取主力合约数据...") print(f"标的列表: {symbol_list}") print(f"时间范围: {start_date} 至 {end_date}") # 存储所有结果的列表 all_results = [] # 逐个处理每个标的 for symbol in symbol_list: if self.verbose_logging: print(f"\n处理标的: {symbol}") try: # 获取该标的在指定时间范围内的主力合约历史 symbol_results = self._get_symbol_dominant_history(symbol, start_date, end_date) all_results.extend(symbol_results) except Exception as e: if self.verbose_logging: print(f"处理标的 {symbol} 时发生错误: {str(e)}") continue # 将结果转换为DataFrame if all_results: result_df = pd.DataFrame(all_results) result_df = result_df.sort_values(['symbol', 'start_date']).reset_index(drop=True) if self.verbose_logging: print(f"\n成功获取 {len(result_df)} 条主力合约记录") return result_df else: if self.verbose_logging: print("\n未获取到任何主力合约数据") return pd.DataFrame(columns=['symbol', 'dominant_contract', 'start_date', 'end_date']) def build_contract_structure(self): """ 构建期货主力合约数据结构 返回结构:Key=完整合约代码, Value=[start_date, end_date] """ if self.verbose_logging: print("\n=== 步骤1: 构建期货主力合约数据结构 ===") # 分析时间范围 analysis_start = self.analysis_start_date analysis_end = self.analysis_end_date if self.verbose_logging: print(f"分析时间范围: {analysis_start.strftime('%Y-%m-%d')} 至 {analysis_end.strftime('%Y-%m-%d')}") print(f"合约类型: 主力合约") # 构建主力合约数据结构 contract_dict = self._build_dominant_contract_structure(analysis_start, analysis_end) self.contract_data = contract_dict if self.verbose_logging: print(f"构建完成,共{len(contract_dict)}个期货合约") return contract_dict def _build_dominant_contract_structure(self, analysis_start, analysis_end): """构建基于主力合约的数据结构""" if self.verbose_logging: print("使用主力合约获取逻辑...") # 获取要分析的期货品种列表 symbol_list = list(self.exchange_map.keys())[:self.max_analysis_contracts] # 获取主力合约历史数据 dominant_df = self.get_dominant_contracts_history( symbol_list, analysis_start.date(), analysis_end.date() ) contract_dict = {} if not dominant_df.empty: # 将DataFrame转换为字典结构 for _, row in dominant_df.iterrows(): contract_code = row['dominant_contract'] start_date = row['start_date'] if isinstance(row['start_date'], datetime.date) else row['start_date'].date() end_date = row['end_date'] if isinstance(row['end_date'], datetime.date) else row['end_date'].date() contract_dict[contract_code] = [start_date, end_date] if self.verbose_logging: print(f" 添加主力合约: {contract_code} ({start_date} 至 {end_date})") return contract_dict def collect_extended_data(self): """ 数据收集:为每个合约收集扩展时间范围的数据 扩展期限:从(start_date-365天)到(end_date+100天) """ if self.verbose_logging: print("\n=== 步骤2: 收集扩展时间范围的数据 ===") extended_data = {} for contract_code, period in self.contract_data.items(): start_date_1, end_date_1 = period # 计算扩展期限 start_date_2 = start_date_1 - datetime.timedelta(days=self.historical_days) end_date_2 = end_date_1 + datetime.timedelta(days=self.forward_days) if self.verbose_logging: print(f"获取 {contract_code} 数据") print(f" 原始期间: {start_date_1} 至 {end_date_1}") print(f" 扩展期间: {start_date_2} 至 {end_date_2}") try: # 获取日线数据 data = get_price( contract_code, start_date=start_date_2, end_date=end_date_2, frequency='daily', fields=['open', 'close', 'high', 'low', 'volume'], skip_paused=False, panel=False ) if data is not None and len(data) > 0: # 计算移动平均线 data['K5'] = data['close'].rolling(window=self.ma_period).mean() extended_data[contract_code] = { 'data': data, 'original_start': start_date_1, 'original_end': end_date_1, 'extended_start': start_date_2, 'extended_end': end_date_2 } if self.verbose_logging: print(f" 成功获取{len(data)}条数据记录") else: if self.verbose_logging: print(f" 未获取到数据") except Exception as e: if self.verbose_logging: print(f" 获取数据失败: {str(e)}") continue if self.verbose_logging: print(f"数据收集完成,共{len(extended_data)}个合约有效数据") return extended_data def calculate_k5_distances(self, extended_data): """ 计算距离:每个交易日价格相对K5的上下偏离距离 """ if self.verbose_logging: print("\n=== 步骤3: 计算K5偏离距离 ===") distance_data = {} for contract_code, contract_info in extended_data.items(): data = contract_info['data'] if self.verbose_logging: print(f"计算 {contract_code} 的K5距离") # 初始化距离列 data['upper_distance'] = np.nan data['lower_distance'] = np.nan # 计算每日的上下偏离距离 for idx, row in data.iterrows(): if pd.isna(row['K5']): # K5不可用则跳过 continue K5 = row['K5'] high = row['high'] low = row['low'] # 计算K5与高低价的差值 k5_minus_high = K5 - high k5_minus_low = K5 - low if k5_minus_high > 0 and k5_minus_low > 0: # 高低都低于K5 data.loc[idx, 'upper_distance'] = np.nan data.loc[idx, 'lower_distance'] = abs(K5 - low) elif k5_minus_high < 0 and k5_minus_low < 0: # 高低都在K5以上 data.loc[idx, 'upper_distance'] = abs(K5 - high) data.loc[idx, 'lower_distance'] = np.nan elif k5_minus_high > 0 and k5_minus_low < 0: # 高低于K5,低高于K5(K5介于高与低之间) data.loc[idx, 'upper_distance'] = abs(K5 - high) data.loc[idx, 'lower_distance'] = abs(K5 - low) # 处理零情况 if abs(k5_minus_high) < 1e-6: # high == K5 data.loc[idx, 'upper_distance'] = 0.0 if abs(k5_minus_low) < 1e-6: # low == K5 data.loc[idx, 'lower_distance'] = 0.0 distance_data[contract_code] = contract_info # 统计有效距离数据 valid_upper = data['upper_distance'].notna().sum() valid_lower = data['lower_distance'].notna().sum() if self.verbose_logging: print(f" 上偏离距离有效数据: {valid_upper}条, 下偏离距离有效数据: {valid_lower}条") if self.verbose_logging: print("K5距离计算完成") return distance_data def determine_thresholds(self, distance_data): """ 阈值确定:使用Range_1数据计算百分位数阈值 Range_1: start_date_2 <= date < start_date_1 """ if self.verbose_logging: print(f"\n=== 步骤4: 确定{self.deviation_percentile}分位数阈值 ===") threshold_data = {} for contract_code, contract_info in distance_data.items(): data = contract_info['data'] start_date_1 = contract_info['original_start'] start_date_2 = contract_info['extended_start'] if self.verbose_logging: print(f"计算 {contract_code} 的阈值") print(f" Range_1期间: {start_date_2} 至 {start_date_1}") # 筛选Range_1数据 range_1_mask = (data.index.date >= start_date_2) & (data.index.date < start_date_1) range_1_data = data[range_1_mask] if len(range_1_data) == 0: if self.verbose_logging: print(f" Range_1无有效数据,跳过") continue # 计算上下偏离距离的分布和阈值 upper_distances = range_1_data['upper_distance'].dropna() lower_distances = range_1_data['lower_distance'].dropna() # 百分位数阈值 percentile_value = self.deviation_percentile / 100.0 upper_threshold = upper_distances.quantile(percentile_value) if len(upper_distances) > 0 else np.nan lower_threshold = lower_distances.quantile(percentile_value) if len(lower_distances) > 0 else np.nan threshold_data[contract_code] = { 'contract_info': contract_info, 'upper_threshold': upper_threshold, 'lower_threshold': lower_threshold, 'range_1_stats': { 'total_days': len(range_1_data), 'upper_valid_days': len(upper_distances), 'lower_valid_days': len(lower_distances), 'upward_deviation_days': len(lower_distances[lower_distances >= lower_threshold]) if not pd.isna(lower_threshold) else 0, 'downward_deviation_days': len(upper_distances[upper_distances >= upper_threshold]) if not pd.isna(upper_threshold) else 0 } } if self.verbose_logging: print(f" 上偏离{self.deviation_percentile}分位数阈值: {upper_threshold:.4f}" if not pd.isna(upper_threshold) else f" 上偏离阈值: 无效") print(f" 下偏离{self.deviation_percentile}分位数阈值: {lower_threshold:.4f}" if not pd.isna(lower_threshold) else f" 下偏离阈值: 无效") if self.verbose_logging: print("阈值确定完成") return threshold_data def detect_signals_and_analyze(self, threshold_data): """ 信号检测与分析:在Range_2期间扫描极端偏差并分析后续走势 Range_2: start_date_1 <= date <= end_date_2 """ if self.verbose_logging: print("\n=== 步骤5: 信号检测与前瞻性分析 ===") signal_dict = {} for contract_code, threshold_info in threshold_data.items(): contract_info = threshold_info['contract_info'] data = contract_info['data'] start_date_1 = contract_info['original_start'] end_date_2 = contract_info['extended_end'] upper_threshold = threshold_info['upper_threshold'] lower_threshold = threshold_info['lower_threshold'] if self.verbose_logging: print(f"\n分析 {contract_code} 的信号") print(f" Range_2期间: {start_date_1} 至 {end_date_2}") # 筛选Range_2数据 range_2_mask = (data.index.date >= start_date_1) & (data.index.date <= end_date_2) range_2_data = data[range_2_mask] contract_signals = {} # 扫描极端偏差天数 for idx, (signal_date, row) in enumerate(range_2_data.iterrows()): upper_distance = row['upper_distance'] lower_distance = row['lower_distance'] # 检查是否触发阈值 trigger_upper = not pd.isna(upper_distance) and not pd.isna(upper_threshold) and upper_distance >= upper_threshold trigger_lower = not pd.isna(lower_distance) and not pd.isna(lower_threshold) and lower_distance >= lower_threshold if trigger_upper or trigger_lower: # 确定偏离方向和参考价格 if trigger_lower: # lower_distance触发表示向上偏离(价格在K5之上) deviation_direction = "up" deviation_distance = lower_distance reference_category = "low" reference_price = row['low'] else: # upper_distance触发表示向下偏离(价格在K5之下) deviation_direction = "down" deviation_distance = upper_distance reference_category = "high" reference_price = row['high'] if self.verbose_logging: print(f" 发现信号: {signal_date.strftime('%Y-%m-%d')} {deviation_direction}偏离 距离{deviation_distance:.4f}") # 前瞻性分析 forward_analysis = self._analyze_forward_performance( range_2_data, idx, deviation_direction, reference_price, self.forward_monitor_days ) # 计算最大利润和风险 max_profit_info = self._find_max_profit(forward_analysis) max_risk_info = self._find_max_risk(forward_analysis) contract_signals[signal_date] = { "deviation_direction": deviation_direction, "deviation_distance": float(deviation_distance), "reference_category": reference_category, "reference_price": float(reference_price), "forward_analysis": forward_analysis, "max_profit": max_profit_info, "max_risk": max_risk_info } if contract_signals: signal_dict[contract_code] = contract_signals if self.verbose_logging: print(f" 检测到{len(contract_signals)}个有效信号") else: if self.verbose_logging: print(f" 未检测到有效信号") if self.verbose_logging: print(f"\n信号检测完成,共{sum(len(signals) for signals in signal_dict.values())}个信号") self.signal_results = signal_dict return signal_dict def _analyze_forward_performance(self, data, signal_idx, deviation_direction, reference_price, monitor_days): """分析信号后续N个交易日的表现""" forward_analysis = {} for i in range(1, monitor_days + 1): future_idx = signal_idx + i if future_idx < len(data): future_row = data.iloc[future_idx] future_date = data.index[future_idx] # 根据偏离方向选择对应价格 if deviation_direction == "up": corresponding_price = future_row['low'] corresponding_return = (reference_price - corresponding_price) / reference_price reverse_return = (future_row['high'] - reference_price) / reference_price else: # down corresponding_price = future_row['high'] corresponding_return = (corresponding_price - reference_price) / reference_price reverse_return = (reference_price - future_row['low']) / reference_price forward_analysis[future_date] = { "date": future_date.strftime('%Y-%m-%d'), "corresponding_price": float(corresponding_price), "corresponding_return": float(corresponding_return), "reverse_category": "high" if deviation_direction == "up" else "low", "reverse_return": float(reverse_return) } return forward_analysis def _find_max_profit(self, forward_analysis): """找到最大利润日期和收益率""" if not forward_analysis: return {"date": None, "return": 0.0} max_return = max(day_data["corresponding_return"] for day_data in forward_analysis.values()) max_date = None for date, day_data in forward_analysis.items(): if day_data["corresponding_return"] == max_return: max_date = date.strftime('%Y-%m-%d') break return {"date": max_date, "return": float(max_return)} def _find_max_risk(self, forward_analysis): """找到最大风险日期和收益率""" if not forward_analysis: return {"date": None, "return": 0.0} max_risk = max(day_data["reverse_return"] for day_data in forward_analysis.values()) max_risk_date = None for date, day_data in forward_analysis.items(): if day_data["reverse_return"] == max_risk: max_risk_date = date.strftime('%Y-%m-%d') break return {"date": max_risk_date, "return": float(max_risk)} def generate_csv_outputs(self, threshold_data, signal_dict): """生成两个CSV文件输出""" if self.verbose_logging: print("\n=== 步骤6: 生成CSV输出文件 ===") timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') # CSV 1: Range_1偏差统计 self._generate_range1_stats_csv(threshold_data, timestamp) # CSV 2: 信号分析结果 self._generate_signal_analysis_csv(signal_dict, timestamp) if self.verbose_logging: print("CSV文件生成完成") def _generate_range1_stats_csv(self, threshold_data, timestamp): """生成Range_1偏差统计CSV""" stats_data = [] for contract_code, threshold_info in threshold_data.items(): stats = threshold_info['range_1_stats'] total_days = stats['total_days'] upward_ratio = stats['upward_deviation_days'] / total_days if total_days > 0 else 0 downward_ratio = stats['downward_deviation_days'] / total_days if total_days > 0 else 0 stats_data.append({ 'contract_code': contract_code, 'upward_deviation_ratio': round(upward_ratio, 4), 'downward_deviation_ratio': round(downward_ratio, 4) }) if stats_data: stats_df = pd.DataFrame(stats_data) filename = f'range1_deviation_stats_{timestamp}.csv' stats_df.to_csv(filename, index=False, encoding=self.output_encoding) if self.verbose_logging: print(f"Range_1偏差统计保存至: {filename}") def _generate_signal_analysis_csv(self, signal_dict, timestamp): """生成信号分析结果CSV""" signal_data = [] for contract_code, contract_signals in signal_dict.items(): for signal_date, signal_info in contract_signals.items(): signal_data.append({ 'contract_code': contract_code, 'signal_date': signal_date.strftime('%Y-%m-%d'), 'deviation_direction': signal_info['deviation_direction'], 'max_profit_date': signal_info['max_profit']['date'], 'max_profit_return': round(signal_info['max_profit']['return'], 4), 'max_risk_date': signal_info['max_risk']['date'], 'max_risk_return': round(signal_info['max_risk']['return'], 4) }) if signal_data: signal_df = pd.DataFrame(signal_data) filename = f'signal_analysis_results_{timestamp}.csv' signal_df.to_csv(filename, index=False, encoding=self.output_encoding) if self.verbose_logging: print(f"信号分析结果保存至: {filename}") def run_complete_analysis(self): """执行完整的分析流程""" if self.verbose_logging: print("开始执行期货5日移动平均线偏离分析") print("=" * 60) try: # 步骤1: 构建合约数据结构 contract_structure = self.build_contract_structure() # 步骤2: 收集扩展数据 extended_data = self.collect_extended_data() if not extended_data: if self.verbose_logging: print("未获取到有效数据,分析终止") return # 步骤3: 计算K5距离 distance_data = self.calculate_k5_distances(extended_data) # 步骤4: 确定阈值 threshold_data = self.determine_thresholds(distance_data) if not threshold_data: if self.verbose_logging: print("未能确定有效阈值,分析终止") return # 步骤5: 信号检测与分析 signal_dict = self.detect_signals_and_analyze(threshold_data) # 步骤6: 生成输出文件 self.generate_csv_outputs(threshold_data, signal_dict) # 分析汇总 total_contracts = len(contract_structure) valid_contracts = len(threshold_data) total_signals = sum(len(signals) for signals in signal_dict.values()) if self.verbose_logging: print("\n" + "=" * 60) print("分析完成汇总:") print(f"总合约数: {total_contracts}") print(f"有效合约数: {valid_contracts}") print(f"检测信号数: {total_signals}") return { 'contract_structure': contract_structure, 'threshold_data': threshold_data, 'signal_results': signal_dict, 'summary': { 'total_contracts': total_contracts, 'valid_contracts': valid_contracts, 'total_signals': total_signals } } except Exception as e: if self.verbose_logging: print(f"分析过程中出现错误: {str(e)}") import traceback traceback.print_exc() return None # ===================================================================================== # 主程序入口 # ===================================================================================== def run_ma5_deviation_analysis(config=None): """运行期货5日移动平均线偏离分析""" if config is None: config = AnalysisConfig # 打印配置信息 config.print_config() # 创建分析器并运行 analyzer = FutureMA5DeviationAnalyzer(config) results = analyzer.run_complete_analysis() return results # 执行分析 if __name__ == "__main__": print("期货5日移动平均线偏离分析工具") print("研究期货合约在大幅偏离K5后的反转走势规律") print("使用动态获取的主力合约进行精准分析") print("适用于聚宽在线研究平台") results = run_ma5_deviation_analysis() if results: print("\n✅ 分析执行成功!") summary = results['summary'] print(f"📊 结果摘要:") print(f" - 分析合约数: {summary['total_contracts']}") print(f" - 有效数据合约: {summary['valid_contracts']}") print(f" - 检测信号总数: {summary['total_signals']}") else: print("\n❌ 分析执行失败,请检查错误信息")