Просмотр исходного кода

新增期货合约5日移动平均线大幅偏离后是否反弹的分析工具,包含完整的分析流程和配置参数,支持数据收集、距离计算、阈值确定及信号检测与分析。同时新增VSCode配置文件以支持Python环境管理。

maxfeng 1 месяц назад
Родитель
Сommit
723bc03a37
2 измененных файлов с 826 добавлено и 0 удалено
  1. 5 0
      .vscode/settings.json
  2. 821 0
      Lib/research/future_ma5_deviation_analysis.py

+ 5 - 0
.vscode/settings.json

@@ -0,0 +1,5 @@
+{
+    "python-envs.defaultEnvManager": "ms-python.python:conda",
+    "python-envs.defaultPackageManager": "ms-python.python:conda",
+    "python-envs.pythonProjects": []
+}

+ 821 - 0
Lib/research/future_ma5_deviation_analysis.py

@@ -0,0 +1,821 @@
+"""
+期货合约5日移动平均线偏离分析
+研究期货合约在大幅偏离其5天移动平均线(K5)后是否出现重大反转走势
+
+本程序实现完整的分析流程:
+1. 数据结构设置 - 获取主力期货合约数据
+2. 数据收集 - 扩展时间范围获取OHLC数据
+3. 距离计算 - 计算价格相对K5的上下偏离距离
+4. 阈值确定 - 使用百分位数作为极端偏差阈值
+5. 信号检测与分析 - 识别极端偏差并分析后续走势
+6. 结果输出 - 生成Range_1偏差统计和信号分析结果CSV
+
+注:程序使用动态获取的主力合约进行分析,确保分析结果基于真实的交易活动
+
+作者: jukuan研究团队
+日期: 2025-09
+适用平台: 聚宽在线研究平台
+"""
+
+import pandas as pd
+import numpy as np
+from jqdata import *
+import datetime
+import warnings
+warnings.filterwarnings('ignore')
+
+# =====================================================================================
+# 分析配置参数 - 集中配置部分
+# =====================================================================================
+
+class AnalysisConfig:
+    """期货5日移动平均线偏离分析配置参数"""
+    
+    # ==================== 时间范围设置 ====================
+    ANALYSIS_START_DATE = datetime.datetime(2025, 8, 1)  # 分析开始日期
+    ANALYSIS_END_DATE = datetime.datetime(2025, 8, 20)   # 分析结束日期
+    
+    # ==================== 时间扩展参数 ====================
+    HISTORICAL_DAYS = 365      # 历史分析期:优势期前天数
+    FORWARD_DAYS = 20         # 正向分析期:优势期后天数
+    FORWARD_MONITOR_DAYS = 10  # 前向监控窗口:交易日数
+    
+    # ==================== 偏差阈值设置 ====================
+    DEVIATION_PERCENTILE = 90  # 偏差阈值百分位数(范围:0-100)
+    
+    # ==================== 分析规模控制 ====================
+    MAX_ANALYSIS_CONTRACTS = 10  # 最大分析合约数量限制
+    
+    
+    # ==================== 移动平均线参数 ====================
+    MA_PERIOD = 5  # K线移动平均线周期(天数)
+    
+    # ==================== 输出设置 ====================
+    OUTPUT_ENCODING = 'utf-8-sig'  # 输出文件编码格式
+    VERBOSE_LOGGING = True          # 是否打印详细日志
+    
+    @classmethod
+    def print_config(cls):
+        """打印当前配置信息"""
+        print("=== 期货5日移动平均线偏离分析配置 ===")
+        print(f"分析时间范围: {cls.ANALYSIS_START_DATE.strftime('%Y-%m-%d')} 至 {cls.ANALYSIS_END_DATE.strftime('%Y-%m-%d')}")
+        print(f"历史分析期: {cls.HISTORICAL_DAYS}天")
+        print(f"前向分析期: {cls.FORWARD_DAYS}天")
+        print(f"前向监控窗口: {cls.FORWARD_MONITOR_DAYS}个交易日")
+        print(f"偏差阈值百分位数: {cls.DEVIATION_PERCENTILE}%")
+        print(f"最大分析合约数: {cls.MAX_ANALYSIS_CONTRACTS}")
+        print(f"合约类型: 主力合约")
+        print(f"移动平均线周期: {cls.MA_PERIOD}日")
+        print(f"详细日志: {'开启' if cls.VERBOSE_LOGGING else '关闭'}")
+        print("=" * 50)
+
+class FutureMA5DeviationAnalyzer:
+    """期货5日移动平均线偏离分析器"""
+    
+    def __init__(self, config=None):
+        """初始化分析器"""
+        # 使用配置类参数
+        if config is None:
+            config = AnalysisConfig
+        
+        self.config = config
+        self.historical_days = config.HISTORICAL_DAYS
+        self.forward_days = config.FORWARD_DAYS  
+        self.deviation_percentile = config.DEVIATION_PERCENTILE
+        self.forward_monitor_days = config.FORWARD_MONITOR_DAYS
+        self.analysis_start_date = config.ANALYSIS_START_DATE
+        self.analysis_end_date = config.ANALYSIS_END_DATE
+        self.max_analysis_contracts = config.MAX_ANALYSIS_CONTRACTS
+        self.ma_period = config.MA_PERIOD
+        self.output_encoding = config.OUTPUT_ENCODING
+        self.verbose_logging = config.VERBOSE_LOGGING
+        
+        # 期货品种与交易所的映射关系
+        self.exchange_map = {
+            # 大连商品交易所 (XDCE)
+            'A': 'XDCE', 'B': 'XDCE', 'C': 'XDCE', 'CS': 'XDCE', 'FB': 'XDCE', 
+            'I': 'XDCE', 'J': 'XDCE', 'JD': 'XDCE', 'JM': 'XDCE', 'L': 'XDCE',
+            'M': 'XDCE', 'P': 'XDCE', 'PP': 'XDCE', 'PG': 'XDCE', 'RR': 'XDCE',
+            'V': 'XDCE', 'Y': 'XDCE', 'EB': 'XDCE', 'EG': 'XDCE', 'LH': 'XDCE',
+            
+            # 郑州商品交易所 (XZCE)
+            'AP': 'XZCE', 'CF': 'XZCE', 'CY': 'XZCE', 'FG': 'XZCE', 'JR': 'XZCE',
+            'LR': 'XZCE', 'MA': 'XZCE', 'OI': 'XZCE', 'PM': 'XZCE', 'RI': 'XZCE',
+            'RM': 'XZCE', 'RS': 'XZCE', 'SF': 'XZCE', 'SM': 'XZCE', 'SR': 'XZCE',
+            'TA': 'XZCE', 'WH': 'XZCE', 'ZC': 'XZCE', 'CJ': 'XZCE', 
+            'ME': 'XZCE', 'PF': 'XZCE', 'PK': 'XZCE', 'RO': 'XZCE', 'SA': 'XZCE',
+            'TC': 'XZCE', 'UR': 'XZCE', 'WS': 'XZCE', 'WT': 'XZCE',
+            
+            # 上海期货交易所 (XSGE)
+            'AG': 'XSGE', 'AL': 'XSGE', 'AU': 'XSGE', 'BU': 'XSGE', 'CU': 'XSGE',
+            'FU': 'XSGE', 'HC': 'XSGE', 'NI': 'XSGE', 'PB': 'XSGE', 'RB': 'XSGE',
+            'RU': 'XSGE', 'SN': 'XSGE', 'SP': 'XSGE', 'SS': 'XSGE', 'WR': 'XSGE',
+            'ZN': 'XSGE',
+            
+            # 上海国际能源交易中心 (XINE)
+            'BC': 'XINE', 'LU': 'XINE', 'NR': 'XINE', 'SC': 'XINE',
+            
+            # 中金所 (CCFX)
+            'IC': 'CCFX', 'IF': 'CCFX', 'IH': 'CCFX', 'T': 'CCFX', 'TF': 'CCFX', 'TS': 'CCFX',
+            
+            # 广期所 (GFEX)
+            'SI': 'GFEX', 'LC': 'GFEX', 'PS': 'GFEX'
+        }
+        
+        # 存储结果的字典
+        self.contract_data = {}  # 主合约数据结构
+        self.signal_results = {}  # 信号检测结果
+        
+        if self.verbose_logging:
+            print("初始化期货5日移动平均线偏离分析器")
+            print(f"配置参数 - 历史分析期: {self.historical_days}天, 前向分析期: {self.forward_days}天")
+            print(f"偏差阈值: {self.deviation_percentile}分位数, 监控窗口: {self.forward_monitor_days}个交易日")
+            print(f"最大分析合约数: {self.max_analysis_contracts}, 移动平均线周期: {self.ma_period}日")
+            print(f"合约类型: 主力合约")
+    
+    def _is_trading_day(self, check_date):
+        """
+        检查是否为交易日
+        
+        参数:
+            check_date (date): 要检查的日期
+        
+        返回:
+            bool: 是否为交易日
+        """
+        try:
+            # 使用聚宽API检查交易日
+            trade_days = get_trade_days(end_date=check_date, count=1)
+            if len(trade_days) > 0:
+                return trade_days[0].date() == check_date
+            return False
+        except:
+            # 如果API调用失败,简单判断是否为工作日
+            return check_date.weekday() < 5
+    
+    def _get_symbol_dominant_history(self, symbol, start_date, end_date):
+        """
+        获取单个标的的主力合约历史变化
+        
+        参数:
+            symbol (str): 标的编码
+            start_date (date): 开始日期
+            end_date (date): 结束日期
+        
+        返回:
+            list: 主力合约记录列表
+        """
+        results = []
+        current_date = start_date
+        current_dominant = None
+        period_start = None
+        
+        if self.verbose_logging:
+            print(f"  获取 {symbol} 主力合约历史...")
+        
+        # 按日遍历时间范围
+        while current_date <= end_date:
+            
+            # 跳过非交易日
+            if not self._is_trading_day(current_date):
+                current_date += datetime.timedelta(days=1)
+                continue
+            
+            try:
+                # 获取当日主力合约
+                dominant_contract = get_dominant_future(symbol, current_date)
+                
+                if dominant_contract is None:
+                    current_date += datetime.timedelta(days=1)
+                    continue
+                    
+                # 如果是新的主力合约
+                if dominant_contract != current_dominant:
+                    
+                    # 如果不是第一个合约,结束上一个合约的记录
+                    if current_dominant is not None and period_start is not None:
+                        results.append({
+                            'symbol': symbol,
+                            'dominant_contract': current_dominant,
+                            'start_date': period_start,
+                            'end_date': current_date - datetime.timedelta(days=1)
+                        })
+                    
+                    # 开始新合约的记录
+                    current_dominant = dominant_contract
+                    period_start = current_date
+                    
+                    if self.verbose_logging:
+                        print(f"    {current_date}: 主力合约变更为 {dominant_contract}")
+                
+            except Exception as e:
+                if self.verbose_logging:
+                    print(f"    获取 {current_date} 的主力合约时出错: {str(e)}")
+            
+            current_date += datetime.timedelta(days=1)
+        
+        # 处理最后一个合约
+        if current_dominant is not None and period_start is not None:
+            results.append({
+                'symbol': symbol,
+                'dominant_contract': current_dominant,
+                'start_date': period_start,
+                'end_date': end_date
+            })
+        
+        return results
+    
+    def get_dominant_contracts_history(self, symbol_list, start_date, end_date):
+        """
+        获取指定时间范围内多个标的的主力合约历史变化
+        
+        参数:
+            symbol_list (list): 标的编码列表
+            start_date (date): 开始日期
+            end_date (date): 结束日期
+        
+        返回:
+            pandas.DataFrame: 包含主力合约变化历史的数据框
+        """
+        if self.verbose_logging:
+            print(f"获取主力合约数据...")
+            print(f"标的列表: {symbol_list}")
+            print(f"时间范围: {start_date} 至 {end_date}")
+        
+        # 存储所有结果的列表
+        all_results = []
+        
+        # 逐个处理每个标的
+        for symbol in symbol_list:
+            if self.verbose_logging:
+                print(f"\n处理标的: {symbol}")
+            
+            try:
+                # 获取该标的在指定时间范围内的主力合约历史
+                symbol_results = self._get_symbol_dominant_history(symbol, start_date, end_date)
+                all_results.extend(symbol_results)
+                
+            except Exception as e:
+                if self.verbose_logging:
+                    print(f"处理标的 {symbol} 时发生错误: {str(e)}")
+                continue
+        
+        # 将结果转换为DataFrame
+        if all_results:
+            result_df = pd.DataFrame(all_results)
+            result_df = result_df.sort_values(['symbol', 'start_date']).reset_index(drop=True)
+            
+            if self.verbose_logging:
+                print(f"\n成功获取 {len(result_df)} 条主力合约记录")
+            return result_df
+        else:
+            if self.verbose_logging:
+                print("\n未获取到任何主力合约数据")
+            return pd.DataFrame(columns=['symbol', 'dominant_contract', 'start_date', 'end_date'])
+
+    def build_contract_structure(self):
+        """
+        构建期货主力合约数据结构
+        返回结构:Key=完整合约代码, Value=[start_date, end_date]
+        """
+        if self.verbose_logging:
+            print("\n=== 步骤1: 构建期货主力合约数据结构 ===")
+        
+        # 分析时间范围
+        analysis_start = self.analysis_start_date
+        analysis_end = self.analysis_end_date
+        
+        if self.verbose_logging:
+            print(f"分析时间范围: {analysis_start.strftime('%Y-%m-%d')} 至 {analysis_end.strftime('%Y-%m-%d')}")
+            print(f"合约类型: 主力合约")
+        
+        # 构建主力合约数据结构
+        contract_dict = self._build_dominant_contract_structure(analysis_start, analysis_end)
+        
+        self.contract_data = contract_dict
+        if self.verbose_logging:
+            print(f"构建完成,共{len(contract_dict)}个期货合约")
+        return contract_dict
+    
+    def _build_dominant_contract_structure(self, analysis_start, analysis_end):
+        """构建基于主力合约的数据结构"""
+        if self.verbose_logging:
+            print("使用主力合约获取逻辑...")
+        
+        # 获取要分析的期货品种列表
+        symbol_list = list(self.exchange_map.keys())[:self.max_analysis_contracts]
+        
+        # 获取主力合约历史数据
+        dominant_df = self.get_dominant_contracts_history(
+            symbol_list, 
+            analysis_start.date(), 
+            analysis_end.date()
+        )
+        
+        contract_dict = {}
+        
+        if not dominant_df.empty:
+            # 将DataFrame转换为字典结构
+            for _, row in dominant_df.iterrows():
+                contract_code = row['dominant_contract']
+                start_date = row['start_date'] if isinstance(row['start_date'], datetime.date) else row['start_date'].date()
+                end_date = row['end_date'] if isinstance(row['end_date'], datetime.date) else row['end_date'].date()
+                
+                contract_dict[contract_code] = [start_date, end_date]
+                
+                if self.verbose_logging:
+                    print(f"  添加主力合约: {contract_code} ({start_date} 至 {end_date})")
+        
+        return contract_dict
+    
+    def collect_extended_data(self):
+        """
+        数据收集:为每个合约收集扩展时间范围的数据
+        扩展期限:从(start_date-365天)到(end_date+100天)
+        """
+        if self.verbose_logging:
+            print("\n=== 步骤2: 收集扩展时间范围的数据 ===")
+        
+        extended_data = {}
+        
+        for contract_code, period in self.contract_data.items():
+            start_date_1, end_date_1 = period
+            
+            # 计算扩展期限
+            start_date_2 = start_date_1 - datetime.timedelta(days=self.historical_days)
+            end_date_2 = end_date_1 + datetime.timedelta(days=self.forward_days)
+            
+            if self.verbose_logging:
+                print(f"获取 {contract_code} 数据")
+                print(f"  原始期间: {start_date_1} 至 {end_date_1}")
+                print(f"  扩展期间: {start_date_2} 至 {end_date_2}")
+            
+            try:
+                # 获取日线数据
+                data = get_price(
+                    contract_code,
+                    start_date=start_date_2,
+                    end_date=end_date_2,
+                    frequency='daily',
+                    fields=['open', 'close', 'high', 'low', 'volume'],
+                    skip_paused=False,
+                    panel=False
+                )
+                
+                if data is not None and len(data) > 0:
+                    # 计算移动平均线
+                    data['K5'] = data['close'].rolling(window=self.ma_period).mean()
+                    
+                    extended_data[contract_code] = {
+                        'data': data,
+                        'original_start': start_date_1,
+                        'original_end': end_date_1,
+                        'extended_start': start_date_2,
+                        'extended_end': end_date_2
+                    }
+                    
+                    if self.verbose_logging:
+                        print(f"  成功获取{len(data)}条数据记录")
+                else:
+                    if self.verbose_logging:
+                        print(f"  未获取到数据")
+                    
+            except Exception as e:
+                if self.verbose_logging:
+                    print(f"  获取数据失败: {str(e)}")
+                continue
+        
+        if self.verbose_logging:
+            print(f"数据收集完成,共{len(extended_data)}个合约有效数据")
+        return extended_data
+    
+    def calculate_k5_distances(self, extended_data):
+        """
+        计算距离:每个交易日价格相对K5的上下偏离距离
+        """
+        if self.verbose_logging:
+            print("\n=== 步骤3: 计算K5偏离距离 ===")
+        
+        distance_data = {}
+        
+        for contract_code, contract_info in extended_data.items():
+            data = contract_info['data']
+            if self.verbose_logging:
+                print(f"计算 {contract_code} 的K5距离")
+            
+            # 初始化距离列
+            data['upper_distance'] = np.nan
+            data['lower_distance'] = np.nan
+            
+            # 计算每日的上下偏离距离
+            for idx, row in data.iterrows():
+                if pd.isna(row['K5']):  # K5不可用则跳过
+                    continue
+                
+                K5 = row['K5']
+                high = row['high']
+                low = row['low']
+                
+                # 计算K5与高低价的差值
+                k5_minus_high = K5 - high
+                k5_minus_low = K5 - low
+                
+                if k5_minus_high > 0 and k5_minus_low > 0:
+                    # 高低都低于K5
+                    data.loc[idx, 'upper_distance'] = np.nan
+                    data.loc[idx, 'lower_distance'] = abs(K5 - low)
+                    
+                elif k5_minus_high < 0 and k5_minus_low < 0:
+                    # 高低都在K5以上
+                    data.loc[idx, 'upper_distance'] = abs(K5 - high)
+                    data.loc[idx, 'lower_distance'] = np.nan
+                    
+                elif k5_minus_high > 0 and k5_minus_low < 0:
+                    # 高低于K5,低高于K5(K5介于高与低之间)
+                    data.loc[idx, 'upper_distance'] = abs(K5 - high)
+                    data.loc[idx, 'lower_distance'] = abs(K5 - low)
+                
+                # 处理零情况
+                if abs(k5_minus_high) < 1e-6:  # high == K5
+                    data.loc[idx, 'upper_distance'] = 0.0
+                if abs(k5_minus_low) < 1e-6:   # low == K5
+                    data.loc[idx, 'lower_distance'] = 0.0
+            
+            distance_data[contract_code] = contract_info
+            
+            # 统计有效距离数据
+            valid_upper = data['upper_distance'].notna().sum()
+            valid_lower = data['lower_distance'].notna().sum()
+            if self.verbose_logging:
+                print(f"  上偏离距离有效数据: {valid_upper}条, 下偏离距离有效数据: {valid_lower}条")
+        
+        if self.verbose_logging:
+            print("K5距离计算完成")
+        return distance_data
+    
+    def determine_thresholds(self, distance_data):
+        """
+        阈值确定:使用Range_1数据计算百分位数阈值
+        Range_1: start_date_2 <= date < start_date_1
+        """
+        if self.verbose_logging:
+            print(f"\n=== 步骤4: 确定{self.deviation_percentile}分位数阈值 ===")
+        
+        threshold_data = {}
+        
+        for contract_code, contract_info in distance_data.items():
+            data = contract_info['data']
+            start_date_1 = contract_info['original_start']
+            start_date_2 = contract_info['extended_start']
+            
+            if self.verbose_logging:
+                print(f"计算 {contract_code} 的阈值")
+                print(f"  Range_1期间: {start_date_2} 至 {start_date_1}")
+            
+            # 筛选Range_1数据
+            range_1_mask = (data.index.date >= start_date_2) & (data.index.date < start_date_1)
+            range_1_data = data[range_1_mask]
+            
+            if len(range_1_data) == 0:
+                if self.verbose_logging:
+                    print(f"  Range_1无有效数据,跳过")
+                continue
+            
+            # 计算上下偏离距离的分布和阈值
+            upper_distances = range_1_data['upper_distance'].dropna()
+            lower_distances = range_1_data['lower_distance'].dropna()
+            
+            # 百分位数阈值
+            percentile_value = self.deviation_percentile / 100.0
+            upper_threshold = upper_distances.quantile(percentile_value) if len(upper_distances) > 0 else np.nan
+            lower_threshold = lower_distances.quantile(percentile_value) if len(lower_distances) > 0 else np.nan
+            
+            threshold_data[contract_code] = {
+                'contract_info': contract_info,
+                'upper_threshold': upper_threshold,
+                'lower_threshold': lower_threshold,
+                'range_1_stats': {
+                    'total_days': len(range_1_data),
+                    'upper_valid_days': len(upper_distances),
+                    'lower_valid_days': len(lower_distances),
+                    'upward_deviation_days': len(lower_distances[lower_distances >= lower_threshold]) if not pd.isna(lower_threshold) else 0,
+                    'downward_deviation_days': len(upper_distances[upper_distances >= upper_threshold]) if not pd.isna(upper_threshold) else 0
+                }
+            }
+            
+            if self.verbose_logging:
+                print(f"  上偏离{self.deviation_percentile}分位数阈值: {upper_threshold:.4f}" if not pd.isna(upper_threshold) else f"  上偏离阈值: 无效")
+                print(f"  下偏离{self.deviation_percentile}分位数阈值: {lower_threshold:.4f}" if not pd.isna(lower_threshold) else f"  下偏离阈值: 无效")
+            
+        if self.verbose_logging:
+            print("阈值确定完成")
+        return threshold_data
+    
+    def detect_signals_and_analyze(self, threshold_data):
+        """
+        信号检测与分析:在Range_2期间扫描极端偏差并分析后续走势
+        Range_2: start_date_1 <= date <= end_date_2
+        """
+        if self.verbose_logging:
+            print("\n=== 步骤5: 信号检测与前瞻性分析 ===")
+        
+        signal_dict = {}
+        
+        for contract_code, threshold_info in threshold_data.items():
+            contract_info = threshold_info['contract_info']
+            data = contract_info['data']
+            start_date_1 = contract_info['original_start']
+            end_date_2 = contract_info['extended_end']
+            
+            upper_threshold = threshold_info['upper_threshold']
+            lower_threshold = threshold_info['lower_threshold']
+            
+            if self.verbose_logging:
+                print(f"\n分析 {contract_code} 的信号")
+                print(f"  Range_2期间: {start_date_1} 至 {end_date_2}")
+            
+            # 筛选Range_2数据
+            range_2_mask = (data.index.date >= start_date_1) & (data.index.date <= end_date_2)
+            range_2_data = data[range_2_mask]
+            
+            contract_signals = {}
+            
+            # 扫描极端偏差天数
+            for idx, (signal_date, row) in enumerate(range_2_data.iterrows()):
+                upper_distance = row['upper_distance']
+                lower_distance = row['lower_distance']
+                
+                # 检查是否触发阈值
+                trigger_upper = not pd.isna(upper_distance) and not pd.isna(upper_threshold) and upper_distance >= upper_threshold
+                trigger_lower = not pd.isna(lower_distance) and not pd.isna(lower_threshold) and lower_distance >= lower_threshold
+                
+                if trigger_upper or trigger_lower:
+                    # 确定偏离方向和参考价格
+                    if trigger_lower:  # lower_distance触发表示向上偏离(价格在K5之上)
+                        deviation_direction = "up"
+                        deviation_distance = lower_distance
+                        reference_category = "low"
+                        reference_price = row['low']
+                    else:  # upper_distance触发表示向下偏离(价格在K5之下)
+                        deviation_direction = "down"
+                        deviation_distance = upper_distance
+                        reference_category = "high"
+                        reference_price = row['high']
+                    
+                    if self.verbose_logging:
+                        print(f"  发现信号: {signal_date.strftime('%Y-%m-%d')} {deviation_direction}偏离 距离{deviation_distance:.4f}")
+                    
+                    # 前瞻性分析
+                    forward_analysis = self._analyze_forward_performance(
+                        range_2_data, idx, deviation_direction, reference_price, self.forward_monitor_days
+                    )
+                    
+                    # 计算最大利润和风险
+                    max_profit_info = self._find_max_profit(forward_analysis)
+                    max_risk_info = self._find_max_risk(forward_analysis)
+                    
+                    contract_signals[signal_date] = {
+                        "deviation_direction": deviation_direction,
+                        "deviation_distance": float(deviation_distance),
+                        "reference_category": reference_category,
+                        "reference_price": float(reference_price),
+                        "forward_analysis": forward_analysis,
+                        "max_profit": max_profit_info,
+                        "max_risk": max_risk_info
+                    }
+            
+            if contract_signals:
+                signal_dict[contract_code] = contract_signals
+                if self.verbose_logging:
+                    print(f"  检测到{len(contract_signals)}个有效信号")
+            else:
+                if self.verbose_logging:
+                    print(f"  未检测到有效信号")
+        
+        if self.verbose_logging:
+            print(f"\n信号检测完成,共{sum(len(signals) for signals in signal_dict.values())}个信号")
+        self.signal_results = signal_dict
+        return signal_dict
+    
+    def _analyze_forward_performance(self, data, signal_idx, deviation_direction, reference_price, monitor_days):
+        """分析信号后续N个交易日的表现"""
+        forward_analysis = {}
+        
+        for i in range(1, monitor_days + 1):
+            future_idx = signal_idx + i
+            if future_idx < len(data):
+                future_row = data.iloc[future_idx]
+                future_date = data.index[future_idx]
+                
+                # 根据偏离方向选择对应价格
+                if deviation_direction == "up":
+                    corresponding_price = future_row['low']
+                    corresponding_return = (reference_price - corresponding_price) / reference_price
+                    reverse_return = (future_row['high'] - reference_price) / reference_price
+                else:  # down
+                    corresponding_price = future_row['high']
+                    corresponding_return = (corresponding_price - reference_price) / reference_price
+                    reverse_return = (reference_price - future_row['low']) / reference_price
+                
+                forward_analysis[future_date] = {
+                    "date": future_date.strftime('%Y-%m-%d'),
+                    "corresponding_price": float(corresponding_price),
+                    "corresponding_return": float(corresponding_return),
+                    "reverse_category": "high" if deviation_direction == "up" else "low",
+                    "reverse_return": float(reverse_return)
+                }
+        
+        return forward_analysis
+    
+    def _find_max_profit(self, forward_analysis):
+        """找到最大利润日期和收益率"""
+        if not forward_analysis:
+            return {"date": None, "return": 0.0}
+        
+        max_return = max(day_data["corresponding_return"] for day_data in forward_analysis.values())
+        max_date = None
+        
+        for date, day_data in forward_analysis.items():
+            if day_data["corresponding_return"] == max_return:
+                max_date = date.strftime('%Y-%m-%d')
+                break
+        
+        return {"date": max_date, "return": float(max_return)}
+    
+    def _find_max_risk(self, forward_analysis):
+        """找到最大风险日期和收益率"""
+        if not forward_analysis:
+            return {"date": None, "return": 0.0}
+        
+        max_risk = max(day_data["reverse_return"] for day_data in forward_analysis.values())
+        max_risk_date = None
+        
+        for date, day_data in forward_analysis.items():
+            if day_data["reverse_return"] == max_risk:
+                max_risk_date = date.strftime('%Y-%m-%d')
+                break
+        
+        return {"date": max_risk_date, "return": float(max_risk)}
+    
+    def generate_csv_outputs(self, threshold_data, signal_dict):
+        """生成两个CSV文件输出"""
+        if self.verbose_logging:
+            print("\n=== 步骤6: 生成CSV输出文件 ===")
+        
+        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
+        
+        # CSV 1: Range_1偏差统计
+        self._generate_range1_stats_csv(threshold_data, timestamp)
+        
+        # CSV 2: 信号分析结果
+        self._generate_signal_analysis_csv(signal_dict, timestamp)
+        
+        if self.verbose_logging:
+            print("CSV文件生成完成")
+    
+    def _generate_range1_stats_csv(self, threshold_data, timestamp):
+        """生成Range_1偏差统计CSV"""
+        stats_data = []
+        
+        for contract_code, threshold_info in threshold_data.items():
+            stats = threshold_info['range_1_stats']
+            total_days = stats['total_days']
+            
+            upward_ratio = stats['upward_deviation_days'] / total_days if total_days > 0 else 0
+            downward_ratio = stats['downward_deviation_days'] / total_days if total_days > 0 else 0
+            
+            stats_data.append({
+                'contract_code': contract_code,
+                'upward_deviation_ratio': round(upward_ratio, 4),
+                'downward_deviation_ratio': round(downward_ratio, 4)
+            })
+        
+        if stats_data:
+            stats_df = pd.DataFrame(stats_data)
+            filename = f'range1_deviation_stats_{timestamp}.csv'
+            stats_df.to_csv(filename, index=False, encoding=self.output_encoding)
+            if self.verbose_logging:
+                print(f"Range_1偏差统计保存至: {filename}")
+    
+    def _generate_signal_analysis_csv(self, signal_dict, timestamp):
+        """生成信号分析结果CSV"""
+        signal_data = []
+        
+        for contract_code, contract_signals in signal_dict.items():
+            for signal_date, signal_info in contract_signals.items():
+                signal_data.append({
+                    'contract_code': contract_code,
+                    'signal_date': signal_date.strftime('%Y-%m-%d'),
+                    'deviation_direction': signal_info['deviation_direction'],
+                    'max_profit_date': signal_info['max_profit']['date'],
+                    'max_profit_return': round(signal_info['max_profit']['return'], 4),
+                    'max_risk_date': signal_info['max_risk']['date'],
+                    'max_risk_return': round(signal_info['max_risk']['return'], 4)
+                })
+        
+        if signal_data:
+            signal_df = pd.DataFrame(signal_data)
+            filename = f'signal_analysis_results_{timestamp}.csv'
+            signal_df.to_csv(filename, index=False, encoding=self.output_encoding)
+            if self.verbose_logging:
+                print(f"信号分析结果保存至: {filename}")
+    
+    def run_complete_analysis(self):
+        """执行完整的分析流程"""
+        if self.verbose_logging:
+            print("开始执行期货5日移动平均线偏离分析")
+            print("=" * 60)
+        
+        try:
+            # 步骤1: 构建合约数据结构
+            contract_structure = self.build_contract_structure()
+            
+            # 步骤2: 收集扩展数据
+            extended_data = self.collect_extended_data()
+            if not extended_data:
+                if self.verbose_logging:
+                    print("未获取到有效数据,分析终止")
+                return
+            
+            # 步骤3: 计算K5距离
+            distance_data = self.calculate_k5_distances(extended_data)
+            
+            # 步骤4: 确定阈值
+            threshold_data = self.determine_thresholds(distance_data)
+            if not threshold_data:
+                if self.verbose_logging:
+                    print("未能确定有效阈值,分析终止")
+                return
+            
+            # 步骤5: 信号检测与分析
+            signal_dict = self.detect_signals_and_analyze(threshold_data)
+            
+            # 步骤6: 生成输出文件
+            self.generate_csv_outputs(threshold_data, signal_dict)
+            
+            # 分析汇总
+            total_contracts = len(contract_structure)
+            valid_contracts = len(threshold_data)
+            total_signals = sum(len(signals) for signals in signal_dict.values())
+            
+            if self.verbose_logging:
+                print("\n" + "=" * 60)
+                print("分析完成汇总:")
+                print(f"总合约数: {total_contracts}")
+                print(f"有效合约数: {valid_contracts}")
+                print(f"检测信号数: {total_signals}")
+            
+            return {
+                'contract_structure': contract_structure,
+                'threshold_data': threshold_data,
+                'signal_results': signal_dict,
+                'summary': {
+                    'total_contracts': total_contracts,
+                    'valid_contracts': valid_contracts,
+                    'total_signals': total_signals
+                }
+            }
+            
+        except Exception as e:
+            if self.verbose_logging:
+                print(f"分析过程中出现错误: {str(e)}")
+                import traceback
+                traceback.print_exc()
+            return None
+
+
+# =====================================================================================
+# 主程序入口
+# =====================================================================================
+
+def run_ma5_deviation_analysis(config=None):
+    """运行期货5日移动平均线偏离分析"""
+    if config is None:
+        config = AnalysisConfig
+    
+    # 打印配置信息
+    config.print_config()
+    
+    # 创建分析器并运行
+    analyzer = FutureMA5DeviationAnalyzer(config)
+    results = analyzer.run_complete_analysis()
+    return results
+
+# 执行分析
+if __name__ == "__main__":
+    print("期货5日移动平均线偏离分析工具")
+    print("研究期货合约在大幅偏离K5后的反转走势规律")
+    print("使用动态获取的主力合约进行精准分析")
+    print("适用于聚宽在线研究平台")
+    
+    results = run_ma5_deviation_analysis()
+    
+    if results:
+        print("\n✅ 分析执行成功!")
+        summary = results['summary']
+        print(f"📊 结果摘要:")
+        print(f"   - 分析合约数: {summary['total_contracts']}")
+        print(f"   - 有效数据合约: {summary['valid_contracts']}")
+        print(f"   - 检测信号总数: {summary['total_signals']}")
+    else:
+        print("\n❌ 分析执行失败,请检查错误信息")