| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821 |
- """
- 期货合约5日移动平均线偏离分析
- 研究期货合约在大幅偏离其5天移动平均线(K5)后是否出现重大反转走势
- 本程序实现完整的分析流程:
- 1. 数据结构设置 - 获取主力期货合约数据
- 2. 数据收集 - 扩展时间范围获取OHLC数据
- 3. 距离计算 - 计算价格相对K5的上下偏离距离
- 4. 阈值确定 - 使用百分位数作为极端偏差阈值
- 5. 信号检测与分析 - 识别极端偏差并分析后续走势
- 6. 结果输出 - 生成Range_1偏差统计和信号分析结果CSV
- 注:程序使用动态获取的主力合约进行分析,确保分析结果基于真实的交易活动
- 作者: jukuan研究团队
- 日期: 2025-09
- 适用平台: 聚宽在线研究平台
- """
- import pandas as pd
- import numpy as np
- from jqdata import *
- import datetime
- import warnings
- warnings.filterwarnings('ignore')
- # =====================================================================================
- # 分析配置参数 - 集中配置部分
- # =====================================================================================
- class AnalysisConfig:
- """期货5日移动平均线偏离分析配置参数"""
-
- # ==================== 时间范围设置 ====================
- ANALYSIS_START_DATE = datetime.datetime(2025, 8, 1) # 分析开始日期
- ANALYSIS_END_DATE = datetime.datetime(2025, 8, 20) # 分析结束日期
-
- # ==================== 时间扩展参数 ====================
- HISTORICAL_DAYS = 365 # 历史分析期:优势期前天数
- FORWARD_DAYS = 20 # 正向分析期:优势期后天数
- FORWARD_MONITOR_DAYS = 10 # 前向监控窗口:交易日数
-
- # ==================== 偏差阈值设置 ====================
- DEVIATION_PERCENTILE = 90 # 偏差阈值百分位数(范围:0-100)
-
- # ==================== 分析规模控制 ====================
- MAX_ANALYSIS_CONTRACTS = 10 # 最大分析合约数量限制
-
-
- # ==================== 移动平均线参数 ====================
- MA_PERIOD = 5 # K线移动平均线周期(天数)
-
- # ==================== 输出设置 ====================
- OUTPUT_ENCODING = 'utf-8-sig' # 输出文件编码格式
- VERBOSE_LOGGING = True # 是否打印详细日志
-
- @classmethod
- def print_config(cls):
- """打印当前配置信息"""
- print("=== 期货5日移动平均线偏离分析配置 ===")
- print(f"分析时间范围: {cls.ANALYSIS_START_DATE.strftime('%Y-%m-%d')} 至 {cls.ANALYSIS_END_DATE.strftime('%Y-%m-%d')}")
- print(f"历史分析期: {cls.HISTORICAL_DAYS}天")
- print(f"前向分析期: {cls.FORWARD_DAYS}天")
- print(f"前向监控窗口: {cls.FORWARD_MONITOR_DAYS}个交易日")
- print(f"偏差阈值百分位数: {cls.DEVIATION_PERCENTILE}%")
- print(f"最大分析合约数: {cls.MAX_ANALYSIS_CONTRACTS}")
- print(f"合约类型: 主力合约")
- print(f"移动平均线周期: {cls.MA_PERIOD}日")
- print(f"详细日志: {'开启' if cls.VERBOSE_LOGGING else '关闭'}")
- print("=" * 50)
- class FutureMA5DeviationAnalyzer:
- """期货5日移动平均线偏离分析器"""
-
- def __init__(self, config=None):
- """初始化分析器"""
- # 使用配置类参数
- if config is None:
- config = AnalysisConfig
-
- self.config = config
- self.historical_days = config.HISTORICAL_DAYS
- self.forward_days = config.FORWARD_DAYS
- self.deviation_percentile = config.DEVIATION_PERCENTILE
- self.forward_monitor_days = config.FORWARD_MONITOR_DAYS
- self.analysis_start_date = config.ANALYSIS_START_DATE
- self.analysis_end_date = config.ANALYSIS_END_DATE
- self.max_analysis_contracts = config.MAX_ANALYSIS_CONTRACTS
- self.ma_period = config.MA_PERIOD
- self.output_encoding = config.OUTPUT_ENCODING
- self.verbose_logging = config.VERBOSE_LOGGING
-
- # 期货品种与交易所的映射关系
- self.exchange_map = {
- # 大连商品交易所 (XDCE)
- 'A': 'XDCE', 'B': 'XDCE', 'C': 'XDCE', 'CS': 'XDCE', 'FB': 'XDCE',
- 'I': 'XDCE', 'J': 'XDCE', 'JD': 'XDCE', 'JM': 'XDCE', 'L': 'XDCE',
- 'M': 'XDCE', 'P': 'XDCE', 'PP': 'XDCE', 'PG': 'XDCE', 'RR': 'XDCE',
- 'V': 'XDCE', 'Y': 'XDCE', 'EB': 'XDCE', 'EG': 'XDCE', 'LH': 'XDCE',
-
- # 郑州商品交易所 (XZCE)
- 'AP': 'XZCE', 'CF': 'XZCE', 'CY': 'XZCE', 'FG': 'XZCE', 'JR': 'XZCE',
- 'LR': 'XZCE', 'MA': 'XZCE', 'OI': 'XZCE', 'PM': 'XZCE', 'RI': 'XZCE',
- 'RM': 'XZCE', 'RS': 'XZCE', 'SF': 'XZCE', 'SM': 'XZCE', 'SR': 'XZCE',
- 'TA': 'XZCE', 'WH': 'XZCE', 'ZC': 'XZCE', 'CJ': 'XZCE',
- 'ME': 'XZCE', 'PF': 'XZCE', 'PK': 'XZCE', 'RO': 'XZCE', 'SA': 'XZCE',
- 'TC': 'XZCE', 'UR': 'XZCE', 'WS': 'XZCE', 'WT': 'XZCE',
-
- # 上海期货交易所 (XSGE)
- 'AG': 'XSGE', 'AL': 'XSGE', 'AU': 'XSGE', 'BU': 'XSGE', 'CU': 'XSGE',
- 'FU': 'XSGE', 'HC': 'XSGE', 'NI': 'XSGE', 'PB': 'XSGE', 'RB': 'XSGE',
- 'RU': 'XSGE', 'SN': 'XSGE', 'SP': 'XSGE', 'SS': 'XSGE', 'WR': 'XSGE',
- 'ZN': 'XSGE',
-
- # 上海国际能源交易中心 (XINE)
- 'BC': 'XINE', 'LU': 'XINE', 'NR': 'XINE', 'SC': 'XINE',
-
- # 中金所 (CCFX)
- 'IC': 'CCFX', 'IF': 'CCFX', 'IH': 'CCFX', 'T': 'CCFX', 'TF': 'CCFX', 'TS': 'CCFX',
-
- # 广期所 (GFEX)
- 'SI': 'GFEX', 'LC': 'GFEX', 'PS': 'GFEX'
- }
-
- # 存储结果的字典
- self.contract_data = {} # 主合约数据结构
- self.signal_results = {} # 信号检测结果
-
- if self.verbose_logging:
- print("初始化期货5日移动平均线偏离分析器")
- print(f"配置参数 - 历史分析期: {self.historical_days}天, 前向分析期: {self.forward_days}天")
- print(f"偏差阈值: {self.deviation_percentile}分位数, 监控窗口: {self.forward_monitor_days}个交易日")
- print(f"最大分析合约数: {self.max_analysis_contracts}, 移动平均线周期: {self.ma_period}日")
- print(f"合约类型: 主力合约")
-
- def _is_trading_day(self, check_date):
- """
- 检查是否为交易日
-
- 参数:
- check_date (date): 要检查的日期
-
- 返回:
- bool: 是否为交易日
- """
- try:
- # 使用聚宽API检查交易日
- trade_days = get_trade_days(end_date=check_date, count=1)
- if len(trade_days) > 0:
- return trade_days[0].date() == check_date
- return False
- except:
- # 如果API调用失败,简单判断是否为工作日
- return check_date.weekday() < 5
-
- def _get_symbol_dominant_history(self, symbol, start_date, end_date):
- """
- 获取单个标的的主力合约历史变化
-
- 参数:
- symbol (str): 标的编码
- start_date (date): 开始日期
- end_date (date): 结束日期
-
- 返回:
- list: 主力合约记录列表
- """
- results = []
- current_date = start_date
- current_dominant = None
- period_start = None
-
- if self.verbose_logging:
- print(f" 获取 {symbol} 主力合约历史...")
-
- # 按日遍历时间范围
- while current_date <= end_date:
-
- # 跳过非交易日
- if not self._is_trading_day(current_date):
- current_date += datetime.timedelta(days=1)
- continue
-
- try:
- # 获取当日主力合约
- dominant_contract = get_dominant_future(symbol, current_date)
-
- if dominant_contract is None:
- current_date += datetime.timedelta(days=1)
- continue
-
- # 如果是新的主力合约
- if dominant_contract != current_dominant:
-
- # 如果不是第一个合约,结束上一个合约的记录
- if current_dominant is not None and period_start is not None:
- results.append({
- 'symbol': symbol,
- 'dominant_contract': current_dominant,
- 'start_date': period_start,
- 'end_date': current_date - datetime.timedelta(days=1)
- })
-
- # 开始新合约的记录
- current_dominant = dominant_contract
- period_start = current_date
-
- if self.verbose_logging:
- print(f" {current_date}: 主力合约变更为 {dominant_contract}")
-
- except Exception as e:
- if self.verbose_logging:
- print(f" 获取 {current_date} 的主力合约时出错: {str(e)}")
-
- current_date += datetime.timedelta(days=1)
-
- # 处理最后一个合约
- if current_dominant is not None and period_start is not None:
- results.append({
- 'symbol': symbol,
- 'dominant_contract': current_dominant,
- 'start_date': period_start,
- 'end_date': end_date
- })
-
- return results
-
- def get_dominant_contracts_history(self, symbol_list, start_date, end_date):
- """
- 获取指定时间范围内多个标的的主力合约历史变化
-
- 参数:
- symbol_list (list): 标的编码列表
- start_date (date): 开始日期
- end_date (date): 结束日期
-
- 返回:
- pandas.DataFrame: 包含主力合约变化历史的数据框
- """
- if self.verbose_logging:
- print(f"获取主力合约数据...")
- print(f"标的列表: {symbol_list}")
- print(f"时间范围: {start_date} 至 {end_date}")
-
- # 存储所有结果的列表
- all_results = []
-
- # 逐个处理每个标的
- for symbol in symbol_list:
- if self.verbose_logging:
- print(f"\n处理标的: {symbol}")
-
- try:
- # 获取该标的在指定时间范围内的主力合约历史
- symbol_results = self._get_symbol_dominant_history(symbol, start_date, end_date)
- all_results.extend(symbol_results)
-
- except Exception as e:
- if self.verbose_logging:
- print(f"处理标的 {symbol} 时发生错误: {str(e)}")
- continue
-
- # 将结果转换为DataFrame
- if all_results:
- result_df = pd.DataFrame(all_results)
- result_df = result_df.sort_values(['symbol', 'start_date']).reset_index(drop=True)
-
- if self.verbose_logging:
- print(f"\n成功获取 {len(result_df)} 条主力合约记录")
- return result_df
- else:
- if self.verbose_logging:
- print("\n未获取到任何主力合约数据")
- return pd.DataFrame(columns=['symbol', 'dominant_contract', 'start_date', 'end_date'])
- def build_contract_structure(self):
- """
- 构建期货主力合约数据结构
- 返回结构:Key=完整合约代码, Value=[start_date, end_date]
- """
- if self.verbose_logging:
- print("\n=== 步骤1: 构建期货主力合约数据结构 ===")
-
- # 分析时间范围
- analysis_start = self.analysis_start_date
- analysis_end = self.analysis_end_date
-
- if self.verbose_logging:
- print(f"分析时间范围: {analysis_start.strftime('%Y-%m-%d')} 至 {analysis_end.strftime('%Y-%m-%d')}")
- print(f"合约类型: 主力合约")
-
- # 构建主力合约数据结构
- contract_dict = self._build_dominant_contract_structure(analysis_start, analysis_end)
-
- self.contract_data = contract_dict
- if self.verbose_logging:
- print(f"构建完成,共{len(contract_dict)}个期货合约")
- return contract_dict
-
- def _build_dominant_contract_structure(self, analysis_start, analysis_end):
- """构建基于主力合约的数据结构"""
- if self.verbose_logging:
- print("使用主力合约获取逻辑...")
-
- # 获取要分析的期货品种列表
- symbol_list = list(self.exchange_map.keys())[:self.max_analysis_contracts]
-
- # 获取主力合约历史数据
- dominant_df = self.get_dominant_contracts_history(
- symbol_list,
- analysis_start.date(),
- analysis_end.date()
- )
-
- contract_dict = {}
-
- if not dominant_df.empty:
- # 将DataFrame转换为字典结构
- for _, row in dominant_df.iterrows():
- contract_code = row['dominant_contract']
- start_date = row['start_date'] if isinstance(row['start_date'], datetime.date) else row['start_date'].date()
- end_date = row['end_date'] if isinstance(row['end_date'], datetime.date) else row['end_date'].date()
-
- contract_dict[contract_code] = [start_date, end_date]
-
- if self.verbose_logging:
- print(f" 添加主力合约: {contract_code} ({start_date} 至 {end_date})")
-
- return contract_dict
-
- def collect_extended_data(self):
- """
- 数据收集:为每个合约收集扩展时间范围的数据
- 扩展期限:从(start_date-365天)到(end_date+100天)
- """
- if self.verbose_logging:
- print("\n=== 步骤2: 收集扩展时间范围的数据 ===")
-
- extended_data = {}
-
- for contract_code, period in self.contract_data.items():
- start_date_1, end_date_1 = period
-
- # 计算扩展期限
- start_date_2 = start_date_1 - datetime.timedelta(days=self.historical_days)
- end_date_2 = end_date_1 + datetime.timedelta(days=self.forward_days)
-
- if self.verbose_logging:
- print(f"获取 {contract_code} 数据")
- print(f" 原始期间: {start_date_1} 至 {end_date_1}")
- print(f" 扩展期间: {start_date_2} 至 {end_date_2}")
-
- try:
- # 获取日线数据
- data = get_price(
- contract_code,
- start_date=start_date_2,
- end_date=end_date_2,
- frequency='daily',
- fields=['open', 'close', 'high', 'low', 'volume'],
- skip_paused=False,
- panel=False
- )
-
- if data is not None and len(data) > 0:
- # 计算移动平均线
- data['K5'] = data['close'].rolling(window=self.ma_period).mean()
-
- extended_data[contract_code] = {
- 'data': data,
- 'original_start': start_date_1,
- 'original_end': end_date_1,
- 'extended_start': start_date_2,
- 'extended_end': end_date_2
- }
-
- if self.verbose_logging:
- print(f" 成功获取{len(data)}条数据记录")
- else:
- if self.verbose_logging:
- print(f" 未获取到数据")
-
- except Exception as e:
- if self.verbose_logging:
- print(f" 获取数据失败: {str(e)}")
- continue
-
- if self.verbose_logging:
- print(f"数据收集完成,共{len(extended_data)}个合约有效数据")
- return extended_data
-
- def calculate_k5_distances(self, extended_data):
- """
- 计算距离:每个交易日价格相对K5的上下偏离距离
- """
- if self.verbose_logging:
- print("\n=== 步骤3: 计算K5偏离距离 ===")
-
- distance_data = {}
-
- for contract_code, contract_info in extended_data.items():
- data = contract_info['data']
- if self.verbose_logging:
- print(f"计算 {contract_code} 的K5距离")
-
- # 初始化距离列
- data['upper_distance'] = np.nan
- data['lower_distance'] = np.nan
-
- # 计算每日的上下偏离距离
- for idx, row in data.iterrows():
- if pd.isna(row['K5']): # K5不可用则跳过
- continue
-
- K5 = row['K5']
- high = row['high']
- low = row['low']
-
- # 计算K5与高低价的差值
- k5_minus_high = K5 - high
- k5_minus_low = K5 - low
-
- if k5_minus_high > 0 and k5_minus_low > 0:
- # 高低都低于K5
- data.loc[idx, 'upper_distance'] = np.nan
- data.loc[idx, 'lower_distance'] = abs(K5 - low)
-
- elif k5_minus_high < 0 and k5_minus_low < 0:
- # 高低都在K5以上
- data.loc[idx, 'upper_distance'] = abs(K5 - high)
- data.loc[idx, 'lower_distance'] = np.nan
-
- elif k5_minus_high > 0 and k5_minus_low < 0:
- # 高低于K5,低高于K5(K5介于高与低之间)
- data.loc[idx, 'upper_distance'] = abs(K5 - high)
- data.loc[idx, 'lower_distance'] = abs(K5 - low)
-
- # 处理零情况
- if abs(k5_minus_high) < 1e-6: # high == K5
- data.loc[idx, 'upper_distance'] = 0.0
- if abs(k5_minus_low) < 1e-6: # low == K5
- data.loc[idx, 'lower_distance'] = 0.0
-
- distance_data[contract_code] = contract_info
-
- # 统计有效距离数据
- valid_upper = data['upper_distance'].notna().sum()
- valid_lower = data['lower_distance'].notna().sum()
- if self.verbose_logging:
- print(f" 上偏离距离有效数据: {valid_upper}条, 下偏离距离有效数据: {valid_lower}条")
-
- if self.verbose_logging:
- print("K5距离计算完成")
- return distance_data
-
- def determine_thresholds(self, distance_data):
- """
- 阈值确定:使用Range_1数据计算百分位数阈值
- Range_1: start_date_2 <= date < start_date_1
- """
- if self.verbose_logging:
- print(f"\n=== 步骤4: 确定{self.deviation_percentile}分位数阈值 ===")
-
- threshold_data = {}
-
- for contract_code, contract_info in distance_data.items():
- data = contract_info['data']
- start_date_1 = contract_info['original_start']
- start_date_2 = contract_info['extended_start']
-
- if self.verbose_logging:
- print(f"计算 {contract_code} 的阈值")
- print(f" Range_1期间: {start_date_2} 至 {start_date_1}")
-
- # 筛选Range_1数据
- range_1_mask = (data.index.date >= start_date_2) & (data.index.date < start_date_1)
- range_1_data = data[range_1_mask]
-
- if len(range_1_data) == 0:
- if self.verbose_logging:
- print(f" Range_1无有效数据,跳过")
- continue
-
- # 计算上下偏离距离的分布和阈值
- upper_distances = range_1_data['upper_distance'].dropna()
- lower_distances = range_1_data['lower_distance'].dropna()
-
- # 百分位数阈值
- percentile_value = self.deviation_percentile / 100.0
- upper_threshold = upper_distances.quantile(percentile_value) if len(upper_distances) > 0 else np.nan
- lower_threshold = lower_distances.quantile(percentile_value) if len(lower_distances) > 0 else np.nan
-
- threshold_data[contract_code] = {
- 'contract_info': contract_info,
- 'upper_threshold': upper_threshold,
- 'lower_threshold': lower_threshold,
- 'range_1_stats': {
- 'total_days': len(range_1_data),
- 'upper_valid_days': len(upper_distances),
- 'lower_valid_days': len(lower_distances),
- 'upward_deviation_days': len(lower_distances[lower_distances >= lower_threshold]) if not pd.isna(lower_threshold) else 0,
- 'downward_deviation_days': len(upper_distances[upper_distances >= upper_threshold]) if not pd.isna(upper_threshold) else 0
- }
- }
-
- if self.verbose_logging:
- print(f" 上偏离{self.deviation_percentile}分位数阈值: {upper_threshold:.4f}" if not pd.isna(upper_threshold) else f" 上偏离阈值: 无效")
- print(f" 下偏离{self.deviation_percentile}分位数阈值: {lower_threshold:.4f}" if not pd.isna(lower_threshold) else f" 下偏离阈值: 无效")
-
- if self.verbose_logging:
- print("阈值确定完成")
- return threshold_data
-
- def detect_signals_and_analyze(self, threshold_data):
- """
- 信号检测与分析:在Range_2期间扫描极端偏差并分析后续走势
- Range_2: start_date_1 <= date <= end_date_2
- """
- if self.verbose_logging:
- print("\n=== 步骤5: 信号检测与前瞻性分析 ===")
-
- signal_dict = {}
-
- for contract_code, threshold_info in threshold_data.items():
- contract_info = threshold_info['contract_info']
- data = contract_info['data']
- start_date_1 = contract_info['original_start']
- end_date_2 = contract_info['extended_end']
-
- upper_threshold = threshold_info['upper_threshold']
- lower_threshold = threshold_info['lower_threshold']
-
- if self.verbose_logging:
- print(f"\n分析 {contract_code} 的信号")
- print(f" Range_2期间: {start_date_1} 至 {end_date_2}")
-
- # 筛选Range_2数据
- range_2_mask = (data.index.date >= start_date_1) & (data.index.date <= end_date_2)
- range_2_data = data[range_2_mask]
-
- contract_signals = {}
-
- # 扫描极端偏差天数
- for idx, (signal_date, row) in enumerate(range_2_data.iterrows()):
- upper_distance = row['upper_distance']
- lower_distance = row['lower_distance']
-
- # 检查是否触发阈值
- trigger_upper = not pd.isna(upper_distance) and not pd.isna(upper_threshold) and upper_distance >= upper_threshold
- trigger_lower = not pd.isna(lower_distance) and not pd.isna(lower_threshold) and lower_distance >= lower_threshold
-
- if trigger_upper or trigger_lower:
- # 确定偏离方向和参考价格
- if trigger_lower: # lower_distance触发表示向上偏离(价格在K5之上)
- deviation_direction = "up"
- deviation_distance = lower_distance
- reference_category = "low"
- reference_price = row['low']
- else: # upper_distance触发表示向下偏离(价格在K5之下)
- deviation_direction = "down"
- deviation_distance = upper_distance
- reference_category = "high"
- reference_price = row['high']
-
- if self.verbose_logging:
- print(f" 发现信号: {signal_date.strftime('%Y-%m-%d')} {deviation_direction}偏离 距离{deviation_distance:.4f}")
-
- # 前瞻性分析
- forward_analysis = self._analyze_forward_performance(
- range_2_data, idx, deviation_direction, reference_price, self.forward_monitor_days
- )
-
- # 计算最大利润和风险
- max_profit_info = self._find_max_profit(forward_analysis)
- max_risk_info = self._find_max_risk(forward_analysis)
-
- contract_signals[signal_date] = {
- "deviation_direction": deviation_direction,
- "deviation_distance": float(deviation_distance),
- "reference_category": reference_category,
- "reference_price": float(reference_price),
- "forward_analysis": forward_analysis,
- "max_profit": max_profit_info,
- "max_risk": max_risk_info
- }
-
- if contract_signals:
- signal_dict[contract_code] = contract_signals
- if self.verbose_logging:
- print(f" 检测到{len(contract_signals)}个有效信号")
- else:
- if self.verbose_logging:
- print(f" 未检测到有效信号")
-
- if self.verbose_logging:
- print(f"\n信号检测完成,共{sum(len(signals) for signals in signal_dict.values())}个信号")
- self.signal_results = signal_dict
- return signal_dict
-
- def _analyze_forward_performance(self, data, signal_idx, deviation_direction, reference_price, monitor_days):
- """分析信号后续N个交易日的表现"""
- forward_analysis = {}
-
- for i in range(1, monitor_days + 1):
- future_idx = signal_idx + i
- if future_idx < len(data):
- future_row = data.iloc[future_idx]
- future_date = data.index[future_idx]
-
- # 根据偏离方向选择对应价格
- if deviation_direction == "up":
- corresponding_price = future_row['low']
- corresponding_return = (reference_price - corresponding_price) / reference_price
- reverse_return = (future_row['high'] - reference_price) / reference_price
- else: # down
- corresponding_price = future_row['high']
- corresponding_return = (corresponding_price - reference_price) / reference_price
- reverse_return = (reference_price - future_row['low']) / reference_price
-
- forward_analysis[future_date] = {
- "date": future_date.strftime('%Y-%m-%d'),
- "corresponding_price": float(corresponding_price),
- "corresponding_return": float(corresponding_return),
- "reverse_category": "high" if deviation_direction == "up" else "low",
- "reverse_return": float(reverse_return)
- }
-
- return forward_analysis
-
- def _find_max_profit(self, forward_analysis):
- """找到最大利润日期和收益率"""
- if not forward_analysis:
- return {"date": None, "return": 0.0}
-
- max_return = max(day_data["corresponding_return"] for day_data in forward_analysis.values())
- max_date = None
-
- for date, day_data in forward_analysis.items():
- if day_data["corresponding_return"] == max_return:
- max_date = date.strftime('%Y-%m-%d')
- break
-
- return {"date": max_date, "return": float(max_return)}
-
- def _find_max_risk(self, forward_analysis):
- """找到最大风险日期和收益率"""
- if not forward_analysis:
- return {"date": None, "return": 0.0}
-
- max_risk = max(day_data["reverse_return"] for day_data in forward_analysis.values())
- max_risk_date = None
-
- for date, day_data in forward_analysis.items():
- if day_data["reverse_return"] == max_risk:
- max_risk_date = date.strftime('%Y-%m-%d')
- break
-
- return {"date": max_risk_date, "return": float(max_risk)}
-
- def generate_csv_outputs(self, threshold_data, signal_dict):
- """生成两个CSV文件输出"""
- if self.verbose_logging:
- print("\n=== 步骤6: 生成CSV输出文件 ===")
-
- timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
-
- # CSV 1: Range_1偏差统计
- self._generate_range1_stats_csv(threshold_data, timestamp)
-
- # CSV 2: 信号分析结果
- self._generate_signal_analysis_csv(signal_dict, timestamp)
-
- if self.verbose_logging:
- print("CSV文件生成完成")
-
- def _generate_range1_stats_csv(self, threshold_data, timestamp):
- """生成Range_1偏差统计CSV"""
- stats_data = []
-
- for contract_code, threshold_info in threshold_data.items():
- stats = threshold_info['range_1_stats']
- total_days = stats['total_days']
-
- upward_ratio = stats['upward_deviation_days'] / total_days if total_days > 0 else 0
- downward_ratio = stats['downward_deviation_days'] / total_days if total_days > 0 else 0
-
- stats_data.append({
- 'contract_code': contract_code,
- 'upward_deviation_ratio': round(upward_ratio, 4),
- 'downward_deviation_ratio': round(downward_ratio, 4)
- })
-
- if stats_data:
- stats_df = pd.DataFrame(stats_data)
- filename = f'range1_deviation_stats_{timestamp}.csv'
- stats_df.to_csv(filename, index=False, encoding=self.output_encoding)
- if self.verbose_logging:
- print(f"Range_1偏差统计保存至: {filename}")
-
- def _generate_signal_analysis_csv(self, signal_dict, timestamp):
- """生成信号分析结果CSV"""
- signal_data = []
-
- for contract_code, contract_signals in signal_dict.items():
- for signal_date, signal_info in contract_signals.items():
- signal_data.append({
- 'contract_code': contract_code,
- 'signal_date': signal_date.strftime('%Y-%m-%d'),
- 'deviation_direction': signal_info['deviation_direction'],
- 'max_profit_date': signal_info['max_profit']['date'],
- 'max_profit_return': round(signal_info['max_profit']['return'], 4),
- 'max_risk_date': signal_info['max_risk']['date'],
- 'max_risk_return': round(signal_info['max_risk']['return'], 4)
- })
-
- if signal_data:
- signal_df = pd.DataFrame(signal_data)
- filename = f'signal_analysis_results_{timestamp}.csv'
- signal_df.to_csv(filename, index=False, encoding=self.output_encoding)
- if self.verbose_logging:
- print(f"信号分析结果保存至: {filename}")
-
- def run_complete_analysis(self):
- """执行完整的分析流程"""
- if self.verbose_logging:
- print("开始执行期货5日移动平均线偏离分析")
- print("=" * 60)
-
- try:
- # 步骤1: 构建合约数据结构
- contract_structure = self.build_contract_structure()
-
- # 步骤2: 收集扩展数据
- extended_data = self.collect_extended_data()
- if not extended_data:
- if self.verbose_logging:
- print("未获取到有效数据,分析终止")
- return
-
- # 步骤3: 计算K5距离
- distance_data = self.calculate_k5_distances(extended_data)
-
- # 步骤4: 确定阈值
- threshold_data = self.determine_thresholds(distance_data)
- if not threshold_data:
- if self.verbose_logging:
- print("未能确定有效阈值,分析终止")
- return
-
- # 步骤5: 信号检测与分析
- signal_dict = self.detect_signals_and_analyze(threshold_data)
-
- # 步骤6: 生成输出文件
- self.generate_csv_outputs(threshold_data, signal_dict)
-
- # 分析汇总
- total_contracts = len(contract_structure)
- valid_contracts = len(threshold_data)
- total_signals = sum(len(signals) for signals in signal_dict.values())
-
- if self.verbose_logging:
- print("\n" + "=" * 60)
- print("分析完成汇总:")
- print(f"总合约数: {total_contracts}")
- print(f"有效合约数: {valid_contracts}")
- print(f"检测信号数: {total_signals}")
-
- return {
- 'contract_structure': contract_structure,
- 'threshold_data': threshold_data,
- 'signal_results': signal_dict,
- 'summary': {
- 'total_contracts': total_contracts,
- 'valid_contracts': valid_contracts,
- 'total_signals': total_signals
- }
- }
-
- except Exception as e:
- if self.verbose_logging:
- print(f"分析过程中出现错误: {str(e)}")
- import traceback
- traceback.print_exc()
- return None
- # =====================================================================================
- # 主程序入口
- # =====================================================================================
- def run_ma5_deviation_analysis(config=None):
- """运行期货5日移动平均线偏离分析"""
- if config is None:
- config = AnalysisConfig
-
- # 打印配置信息
- config.print_config()
-
- # 创建分析器并运行
- analyzer = FutureMA5DeviationAnalyzer(config)
- results = analyzer.run_complete_analysis()
- return results
- # 执行分析
- if __name__ == "__main__":
- print("期货5日移动平均线偏离分析工具")
- print("研究期货合约在大幅偏离K5后的反转走势规律")
- print("使用动态获取的主力合约进行精准分析")
- print("适用于聚宽在线研究平台")
-
- results = run_ma5_deviation_analysis()
-
- if results:
- print("\n✅ 分析执行成功!")
- summary = results['summary']
- print(f"📊 结果摘要:")
- print(f" - 分析合约数: {summary['total_contracts']}")
- print(f" - 有效数据合约: {summary['valid_contracts']}")
- print(f" - 检测信号总数: {summary['total_signals']}")
- else:
- print("\n❌ 分析执行失败,请检查错误信息")
|