future_ma5_deviation_analysis.py 35 KB


  1. """
  2. 期货合约5日移动平均线偏离分析
  3. 研究期货合约在大幅偏离其5天移动平均线(K5)后是否出现重大反转走势
  4. 本程序实现完整的分析流程:
  5. 1. 数据结构设置 - 获取主力期货合约数据
  6. 2. 数据收集 - 扩展时间范围获取OHLC数据
  7. 3. 距离计算 - 计算价格相对K5的上下偏离距离
  8. 4. 阈值确定 - 使用百分位数作为极端偏差阈值
  9. 5. 信号检测与分析 - 识别极端偏差并分析后续走势
  10. 6. 结果输出 - 生成Range_1偏差统计和信号分析结果CSV
  11. 注:程序使用动态获取的主力合约进行分析,确保分析结果基于真实的交易活动
  12. 作者: jukuan研究团队
  13. 日期: 2025-09
  14. 适用平台: 聚宽在线研究平台
  15. """
  16. import pandas as pd
  17. import numpy as np
  18. from jqdata import *
  19. import datetime
  20. import warnings
  21. warnings.filterwarnings('ignore')
  22. # =====================================================================================
  23. # 分析配置参数 - 集中配置部分
  24. # =====================================================================================
  25. class AnalysisConfig:
  26. """期货5日移动平均线偏离分析配置参数"""
  27. # ==================== 时间范围设置 ====================
  28. ANALYSIS_START_DATE = datetime.datetime(2025, 8, 1) # 分析开始日期
  29. ANALYSIS_END_DATE = datetime.datetime(2025, 8, 20) # 分析结束日期
  30. # ==================== 时间扩展参数 ====================
  31. HISTORICAL_DAYS = 365 # 历史分析期:优势期前天数
  32. FORWARD_DAYS = 20 # 正向分析期:优势期后天数
  33. FORWARD_MONITOR_DAYS = 10 # 前向监控窗口:交易日数
  34. # ==================== 偏差阈值设置 ====================
  35. DEVIATION_PERCENTILE = 90 # 偏差阈值百分位数(范围:0-100)
  36. # ==================== 分析规模控制 ====================
  37. MAX_ANALYSIS_CONTRACTS = 10 # 最大分析合约数量限制
  38. # ==================== 移动平均线参数 ====================
  39. MA_PERIOD = 5 # K线移动平均线周期(天数)
  40. # ==================== 输出设置 ====================
  41. OUTPUT_ENCODING = 'utf-8-sig' # 输出文件编码格式
  42. VERBOSE_LOGGING = True # 是否打印详细日志
  43. @classmethod
  44. def print_config(cls):
  45. """打印当前配置信息"""
  46. print("=== 期货5日移动平均线偏离分析配置 ===")
  47. print(f"分析时间范围: {cls.ANALYSIS_START_DATE.strftime('%Y-%m-%d')} 至 {cls.ANALYSIS_END_DATE.strftime('%Y-%m-%d')}")
  48. print(f"历史分析期: {cls.HISTORICAL_DAYS}天")
  49. print(f"前向分析期: {cls.FORWARD_DAYS}天")
  50. print(f"前向监控窗口: {cls.FORWARD_MONITOR_DAYS}个交易日")
  51. print(f"偏差阈值百分位数: {cls.DEVIATION_PERCENTILE}%")
  52. print(f"最大分析合约数: {cls.MAX_ANALYSIS_CONTRACTS}")
  53. print(f"合约类型: 主力合约")
  54. print(f"移动平均线周期: {cls.MA_PERIOD}日")
  55. print(f"详细日志: {'开启' if cls.VERBOSE_LOGGING else '关闭'}")
  56. print("=" * 50)
  57. class FutureMA5DeviationAnalyzer:
  58. """期货5日移动平均线偏离分析器"""
  59. def __init__(self, config=None):
  60. """初始化分析器"""
  61. # 使用配置类参数
  62. if config is None:
  63. config = AnalysisConfig
  64. self.config = config
  65. self.historical_days = config.HISTORICAL_DAYS
  66. self.forward_days = config.FORWARD_DAYS
  67. self.deviation_percentile = config.DEVIATION_PERCENTILE
  68. self.forward_monitor_days = config.FORWARD_MONITOR_DAYS
  69. self.analysis_start_date = config.ANALYSIS_START_DATE
  70. self.analysis_end_date = config.ANALYSIS_END_DATE
  71. self.max_analysis_contracts = config.MAX_ANALYSIS_CONTRACTS
  72. self.ma_period = config.MA_PERIOD
  73. self.output_encoding = config.OUTPUT_ENCODING
  74. self.verbose_logging = config.VERBOSE_LOGGING
  75. # 期货品种与交易所的映射关系
  76. self.exchange_map = {
  77. # 大连商品交易所 (XDCE)
  78. 'A': 'XDCE', 'B': 'XDCE', 'C': 'XDCE', 'CS': 'XDCE', 'FB': 'XDCE',
  79. 'I': 'XDCE', 'J': 'XDCE', 'JD': 'XDCE', 'JM': 'XDCE', 'L': 'XDCE',
  80. 'M': 'XDCE', 'P': 'XDCE', 'PP': 'XDCE', 'PG': 'XDCE', 'RR': 'XDCE',
  81. 'V': 'XDCE', 'Y': 'XDCE', 'EB': 'XDCE', 'EG': 'XDCE', 'LH': 'XDCE',
  82. # 郑州商品交易所 (XZCE)
  83. 'AP': 'XZCE', 'CF': 'XZCE', 'CY': 'XZCE', 'FG': 'XZCE', 'JR': 'XZCE',
  84. 'LR': 'XZCE', 'MA': 'XZCE', 'OI': 'XZCE', 'PM': 'XZCE', 'RI': 'XZCE',
  85. 'RM': 'XZCE', 'RS': 'XZCE', 'SF': 'XZCE', 'SM': 'XZCE', 'SR': 'XZCE',
  86. 'TA': 'XZCE', 'WH': 'XZCE', 'ZC': 'XZCE', 'CJ': 'XZCE',
  87. 'ME': 'XZCE', 'PF': 'XZCE', 'PK': 'XZCE', 'RO': 'XZCE', 'SA': 'XZCE',
  88. 'TC': 'XZCE', 'UR': 'XZCE', 'WS': 'XZCE', 'WT': 'XZCE',
  89. # 上海期货交易所 (XSGE)
  90. 'AG': 'XSGE', 'AL': 'XSGE', 'AU': 'XSGE', 'BU': 'XSGE', 'CU': 'XSGE',
  91. 'FU': 'XSGE', 'HC': 'XSGE', 'NI': 'XSGE', 'PB': 'XSGE', 'RB': 'XSGE',
  92. 'RU': 'XSGE', 'SN': 'XSGE', 'SP': 'XSGE', 'SS': 'XSGE', 'WR': 'XSGE',
  93. 'ZN': 'XSGE',
  94. # 上海国际能源交易中心 (XINE)
  95. 'BC': 'XINE', 'LU': 'XINE', 'NR': 'XINE', 'SC': 'XINE',
  96. # 中金所 (CCFX)
  97. 'IC': 'CCFX', 'IF': 'CCFX', 'IH': 'CCFX', 'T': 'CCFX', 'TF': 'CCFX', 'TS': 'CCFX',
  98. # 广期所 (GFEX)
  99. 'SI': 'GFEX', 'LC': 'GFEX', 'PS': 'GFEX'
  100. }
  101. # 存储结果的字典
  102. self.contract_data = {} # 主合约数据结构
  103. self.signal_results = {} # 信号检测结果
  104. if self.verbose_logging:
  105. print("初始化期货5日移动平均线偏离分析器")
  106. print(f"配置参数 - 历史分析期: {self.historical_days}天, 前向分析期: {self.forward_days}天")
  107. print(f"偏差阈值: {self.deviation_percentile}分位数, 监控窗口: {self.forward_monitor_days}个交易日")
  108. print(f"最大分析合约数: {self.max_analysis_contracts}, 移动平均线周期: {self.ma_period}日")
  109. print(f"合约类型: 主力合约")
  110. def _is_trading_day(self, check_date):
  111. """
  112. 检查是否为交易日
  113. 参数:
  114. check_date (date): 要检查的日期
  115. 返回:
  116. bool: 是否为交易日
  117. """
  118. try:
  119. # 使用聚宽API检查交易日
  120. trade_days = get_trade_days(end_date=check_date, count=1)
  121. if len(trade_days) > 0:
  122. return trade_days[0].date() == check_date
  123. return False
  124. except:
  125. # 如果API调用失败,简单判断是否为工作日
  126. return check_date.weekday() < 5
  127. def _get_symbol_dominant_history(self, symbol, start_date, end_date):
  128. """
  129. 获取单个标的的主力合约历史变化
  130. 参数:
  131. symbol (str): 标的编码
  132. start_date (date): 开始日期
  133. end_date (date): 结束日期
  134. 返回:
  135. list: 主力合约记录列表
  136. """
  137. results = []
  138. current_date = start_date
  139. current_dominant = None
  140. period_start = None
  141. if self.verbose_logging:
  142. print(f" 获取 {symbol} 主力合约历史...")
  143. # 按日遍历时间范围
  144. while current_date <= end_date:
  145. # 跳过非交易日
  146. if not self._is_trading_day(current_date):
  147. current_date += datetime.timedelta(days=1)
  148. continue
  149. try:
  150. # 获取当日主力合约
  151. dominant_contract = get_dominant_future(symbol, current_date)
  152. if dominant_contract is None:
  153. current_date += datetime.timedelta(days=1)
  154. continue
  155. # 如果是新的主力合约
  156. if dominant_contract != current_dominant:
  157. # 如果不是第一个合约,结束上一个合约的记录
  158. if current_dominant is not None and period_start is not None:
  159. results.append({
  160. 'symbol': symbol,
  161. 'dominant_contract': current_dominant,
  162. 'start_date': period_start,
  163. 'end_date': current_date - datetime.timedelta(days=1)
  164. })
  165. # 开始新合约的记录
  166. current_dominant = dominant_contract
  167. period_start = current_date
  168. if self.verbose_logging:
  169. print(f" {current_date}: 主力合约变更为 {dominant_contract}")
  170. except Exception as e:
  171. if self.verbose_logging:
  172. print(f" 获取 {current_date} 的主力合约时出错: {str(e)}")
  173. current_date += datetime.timedelta(days=1)
  174. # 处理最后一个合约
  175. if current_dominant is not None and period_start is not None:
  176. results.append({
  177. 'symbol': symbol,
  178. 'dominant_contract': current_dominant,
  179. 'start_date': period_start,
  180. 'end_date': end_date
  181. })
  182. return results
  183. def get_dominant_contracts_history(self, symbol_list, start_date, end_date):
  184. """
  185. 获取指定时间范围内多个标的的主力合约历史变化
  186. 参数:
  187. symbol_list (list): 标的编码列表
  188. start_date (date): 开始日期
  189. end_date (date): 结束日期
  190. 返回:
  191. pandas.DataFrame: 包含主力合约变化历史的数据框
  192. """
  193. if self.verbose_logging:
  194. print(f"获取主力合约数据...")
  195. print(f"标的列表: {symbol_list}")
  196. print(f"时间范围: {start_date} 至 {end_date}")
  197. # 存储所有结果的列表
  198. all_results = []
  199. # 逐个处理每个标的
  200. for symbol in symbol_list:
  201. if self.verbose_logging:
  202. print(f"\n处理标的: {symbol}")
  203. try:
  204. # 获取该标的在指定时间范围内的主力合约历史
  205. symbol_results = self._get_symbol_dominant_history(symbol, start_date, end_date)
  206. all_results.extend(symbol_results)
  207. except Exception as e:
  208. if self.verbose_logging:
  209. print(f"处理标的 {symbol} 时发生错误: {str(e)}")
  210. continue
  211. # 将结果转换为DataFrame
  212. if all_results:
  213. result_df = pd.DataFrame(all_results)
  214. result_df = result_df.sort_values(['symbol', 'start_date']).reset_index(drop=True)
  215. if self.verbose_logging:
  216. print(f"\n成功获取 {len(result_df)} 条主力合约记录")
  217. return result_df
  218. else:
  219. if self.verbose_logging:
  220. print("\n未获取到任何主力合约数据")
  221. return pd.DataFrame(columns=['symbol', 'dominant_contract', 'start_date', 'end_date'])
  222. def build_contract_structure(self):
  223. """
  224. 构建期货主力合约数据结构
  225. 返回结构:Key=完整合约代码, Value=[start_date, end_date]
  226. """
  227. if self.verbose_logging:
  228. print("\n=== 步骤1: 构建期货主力合约数据结构 ===")
  229. # 分析时间范围
  230. analysis_start = self.analysis_start_date
  231. analysis_end = self.analysis_end_date
  232. if self.verbose_logging:
  233. print(f"分析时间范围: {analysis_start.strftime('%Y-%m-%d')} 至 {analysis_end.strftime('%Y-%m-%d')}")
  234. print(f"合约类型: 主力合约")
  235. # 构建主力合约数据结构
  236. contract_dict = self._build_dominant_contract_structure(analysis_start, analysis_end)
  237. self.contract_data = contract_dict
  238. if self.verbose_logging:
  239. print(f"构建完成,共{len(contract_dict)}个期货合约")
  240. return contract_dict
  241. def _build_dominant_contract_structure(self, analysis_start, analysis_end):
  242. """构建基于主力合约的数据结构"""
  243. if self.verbose_logging:
  244. print("使用主力合约获取逻辑...")
  245. # 获取要分析的期货品种列表
  246. symbol_list = list(self.exchange_map.keys())[:self.max_analysis_contracts]
  247. # 获取主力合约历史数据
  248. dominant_df = self.get_dominant_contracts_history(
  249. symbol_list,
  250. analysis_start.date(),
  251. analysis_end.date()
  252. )
  253. contract_dict = {}
  254. if not dominant_df.empty:
  255. # 将DataFrame转换为字典结构
  256. for _, row in dominant_df.iterrows():
  257. contract_code = row['dominant_contract']
  258. start_date = row['start_date'] if isinstance(row['start_date'], datetime.date) else row['start_date'].date()
  259. end_date = row['end_date'] if isinstance(row['end_date'], datetime.date) else row['end_date'].date()
  260. contract_dict[contract_code] = [start_date, end_date]
  261. if self.verbose_logging:
  262. print(f" 添加主力合约: {contract_code} ({start_date} 至 {end_date})")
  263. return contract_dict
  264. def collect_extended_data(self):
  265. """
  266. 数据收集:为每个合约收集扩展时间范围的数据
  267. 扩展期限:从(start_date-365天)到(end_date+100天)
  268. """
  269. if self.verbose_logging:
  270. print("\n=== 步骤2: 收集扩展时间范围的数据 ===")
  271. extended_data = {}
  272. for contract_code, period in self.contract_data.items():
  273. start_date_1, end_date_1 = period
  274. # 计算扩展期限
  275. start_date_2 = start_date_1 - datetime.timedelta(days=self.historical_days)
  276. end_date_2 = end_date_1 + datetime.timedelta(days=self.forward_days)
  277. if self.verbose_logging:
  278. print(f"获取 {contract_code} 数据")
  279. print(f" 原始期间: {start_date_1} 至 {end_date_1}")
  280. print(f" 扩展期间: {start_date_2} 至 {end_date_2}")
  281. try:
  282. # 获取日线数据
  283. data = get_price(
  284. contract_code,
  285. start_date=start_date_2,
  286. end_date=end_date_2,
  287. frequency='daily',
  288. fields=['open', 'close', 'high', 'low', 'volume'],
  289. skip_paused=False,
  290. panel=False
  291. )
  292. if data is not None and len(data) > 0:
  293. # 计算移动平均线
  294. data['K5'] = data['close'].rolling(window=self.ma_period).mean()
  295. extended_data[contract_code] = {
  296. 'data': data,
  297. 'original_start': start_date_1,
  298. 'original_end': end_date_1,
  299. 'extended_start': start_date_2,
  300. 'extended_end': end_date_2
  301. }
  302. if self.verbose_logging:
  303. print(f" 成功获取{len(data)}条数据记录")
  304. else:
  305. if self.verbose_logging:
  306. print(f" 未获取到数据")
  307. except Exception as e:
  308. if self.verbose_logging:
  309. print(f" 获取数据失败: {str(e)}")
  310. continue
  311. if self.verbose_logging:
  312. print(f"数据收集完成,共{len(extended_data)}个合约有效数据")
  313. return extended_data
  314. def calculate_k5_distances(self, extended_data):
  315. """
  316. 计算距离:每个交易日价格相对K5的上下偏离距离
  317. """
  318. if self.verbose_logging:
  319. print("\n=== 步骤3: 计算K5偏离距离 ===")
  320. distance_data = {}
  321. for contract_code, contract_info in extended_data.items():
  322. data = contract_info['data']
  323. if self.verbose_logging:
  324. print(f"计算 {contract_code} 的K5距离")
  325. # 初始化距离列
  326. data['upper_distance'] = np.nan
  327. data['lower_distance'] = np.nan
  328. # 计算每日的上下偏离距离
  329. for idx, row in data.iterrows():
  330. if pd.isna(row['K5']): # K5不可用则跳过
  331. continue
  332. K5 = row['K5']
  333. high = row['high']
  334. low = row['low']
  335. # 计算K5与高低价的差值
  336. k5_minus_high = K5 - high
  337. k5_minus_low = K5 - low
  338. if k5_minus_high > 0 and k5_minus_low > 0:
  339. # 高低都低于K5
  340. data.loc[idx, 'upper_distance'] = np.nan
  341. data.loc[idx, 'lower_distance'] = abs(K5 - low)
  342. elif k5_minus_high < 0 and k5_minus_low < 0:
  343. # 高低都在K5以上
  344. data.loc[idx, 'upper_distance'] = abs(K5 - high)
  345. data.loc[idx, 'lower_distance'] = np.nan
  346. elif k5_minus_high > 0 and k5_minus_low < 0:
  347. # 高低于K5,低高于K5(K5介于高与低之间)
  348. data.loc[idx, 'upper_distance'] = abs(K5 - high)
  349. data.loc[idx, 'lower_distance'] = abs(K5 - low)
  350. # 处理零情况
  351. if abs(k5_minus_high) < 1e-6: # high == K5
  352. data.loc[idx, 'upper_distance'] = 0.0
  353. if abs(k5_minus_low) < 1e-6: # low == K5
  354. data.loc[idx, 'lower_distance'] = 0.0
  355. distance_data[contract_code] = contract_info
  356. # 统计有效距离数据
  357. valid_upper = data['upper_distance'].notna().sum()
  358. valid_lower = data['lower_distance'].notna().sum()
  359. if self.verbose_logging:
  360. print(f" 上偏离距离有效数据: {valid_upper}条, 下偏离距离有效数据: {valid_lower}条")
  361. if self.verbose_logging:
  362. print("K5距离计算完成")
  363. return distance_data
  364. def determine_thresholds(self, distance_data):
  365. """
  366. 阈值确定:使用Range_1数据计算百分位数阈值
  367. Range_1: start_date_2 <= date < start_date_1
  368. """
  369. if self.verbose_logging:
  370. print(f"\n=== 步骤4: 确定{self.deviation_percentile}分位数阈值 ===")
  371. threshold_data = {}
  372. for contract_code, contract_info in distance_data.items():
  373. data = contract_info['data']
  374. start_date_1 = contract_info['original_start']
  375. start_date_2 = contract_info['extended_start']
  376. if self.verbose_logging:
  377. print(f"计算 {contract_code} 的阈值")
  378. print(f" Range_1期间: {start_date_2} 至 {start_date_1}")
  379. # 筛选Range_1数据
  380. range_1_mask = (data.index.date >= start_date_2) & (data.index.date < start_date_1)
  381. range_1_data = data[range_1_mask]
  382. if len(range_1_data) == 0:
  383. if self.verbose_logging:
  384. print(f" Range_1无有效数据,跳过")
  385. continue
  386. # 计算上下偏离距离的分布和阈值
  387. upper_distances = range_1_data['upper_distance'].dropna()
  388. lower_distances = range_1_data['lower_distance'].dropna()
  389. # 百分位数阈值
  390. percentile_value = self.deviation_percentile / 100.0
  391. upper_threshold = upper_distances.quantile(percentile_value) if len(upper_distances) > 0 else np.nan
  392. lower_threshold = lower_distances.quantile(percentile_value) if len(lower_distances) > 0 else np.nan
  393. threshold_data[contract_code] = {
  394. 'contract_info': contract_info,
  395. 'upper_threshold': upper_threshold,
  396. 'lower_threshold': lower_threshold,
  397. 'range_1_stats': {
  398. 'total_days': len(range_1_data),
  399. 'upper_valid_days': len(upper_distances),
  400. 'lower_valid_days': len(lower_distances),
  401. 'upward_deviation_days': len(lower_distances[lower_distances >= lower_threshold]) if not pd.isna(lower_threshold) else 0,
  402. 'downward_deviation_days': len(upper_distances[upper_distances >= upper_threshold]) if not pd.isna(upper_threshold) else 0
  403. }
  404. }
  405. if self.verbose_logging:
  406. print(f" 上偏离{self.deviation_percentile}分位数阈值: {upper_threshold:.4f}" if not pd.isna(upper_threshold) else f" 上偏离阈值: 无效")
  407. print(f" 下偏离{self.deviation_percentile}分位数阈值: {lower_threshold:.4f}" if not pd.isna(lower_threshold) else f" 下偏离阈值: 无效")
  408. if self.verbose_logging:
  409. print("阈值确定完成")
  410. return threshold_data
  411. def detect_signals_and_analyze(self, threshold_data):
  412. """
  413. 信号检测与分析:在Range_2期间扫描极端偏差并分析后续走势
  414. Range_2: start_date_1 <= date <= end_date_2
  415. """
  416. if self.verbose_logging:
  417. print("\n=== 步骤5: 信号检测与前瞻性分析 ===")
  418. signal_dict = {}
  419. for contract_code, threshold_info in threshold_data.items():
  420. contract_info = threshold_info['contract_info']
  421. data = contract_info['data']
  422. start_date_1 = contract_info['original_start']
  423. end_date_2 = contract_info['extended_end']
  424. upper_threshold = threshold_info['upper_threshold']
  425. lower_threshold = threshold_info['lower_threshold']
  426. if self.verbose_logging:
  427. print(f"\n分析 {contract_code} 的信号")
  428. print(f" Range_2期间: {start_date_1} 至 {end_date_2}")
  429. # 筛选Range_2数据
  430. range_2_mask = (data.index.date >= start_date_1) & (data.index.date <= end_date_2)
  431. range_2_data = data[range_2_mask]
  432. contract_signals = {}
  433. # 扫描极端偏差天数
  434. for idx, (signal_date, row) in enumerate(range_2_data.iterrows()):
  435. upper_distance = row['upper_distance']
  436. lower_distance = row['lower_distance']
  437. # 检查是否触发阈值
  438. trigger_upper = not pd.isna(upper_distance) and not pd.isna(upper_threshold) and upper_distance >= upper_threshold
  439. trigger_lower = not pd.isna(lower_distance) and not pd.isna(lower_threshold) and lower_distance >= lower_threshold
  440. if trigger_upper or trigger_lower:
  441. # 确定偏离方向和参考价格
  442. if trigger_lower: # lower_distance触发表示向上偏离(价格在K5之上)
  443. deviation_direction = "up"
  444. deviation_distance = lower_distance
  445. reference_category = "low"
  446. reference_price = row['low']
  447. else: # upper_distance触发表示向下偏离(价格在K5之下)
  448. deviation_direction = "down"
  449. deviation_distance = upper_distance
  450. reference_category = "high"
  451. reference_price = row['high']
  452. if self.verbose_logging:
  453. print(f" 发现信号: {signal_date.strftime('%Y-%m-%d')} {deviation_direction}偏离 距离{deviation_distance:.4f}")
  454. # 前瞻性分析
  455. forward_analysis = self._analyze_forward_performance(
  456. range_2_data, idx, deviation_direction, reference_price, self.forward_monitor_days
  457. )
  458. # 计算最大利润和风险
  459. max_profit_info = self._find_max_profit(forward_analysis)
  460. max_risk_info = self._find_max_risk(forward_analysis)
  461. contract_signals[signal_date] = {
  462. "deviation_direction": deviation_direction,
  463. "deviation_distance": float(deviation_distance),
  464. "reference_category": reference_category,
  465. "reference_price": float(reference_price),
  466. "forward_analysis": forward_analysis,
  467. "max_profit": max_profit_info,
  468. "max_risk": max_risk_info
  469. }
  470. if contract_signals:
  471. signal_dict[contract_code] = contract_signals
  472. if self.verbose_logging:
  473. print(f" 检测到{len(contract_signals)}个有效信号")
  474. else:
  475. if self.verbose_logging:
  476. print(f" 未检测到有效信号")
  477. if self.verbose_logging:
  478. print(f"\n信号检测完成,共{sum(len(signals) for signals in signal_dict.values())}个信号")
  479. self.signal_results = signal_dict
  480. return signal_dict
  481. def _analyze_forward_performance(self, data, signal_idx, deviation_direction, reference_price, monitor_days):
  482. """分析信号后续N个交易日的表现"""
  483. forward_analysis = {}
  484. for i in range(1, monitor_days + 1):
  485. future_idx = signal_idx + i
  486. if future_idx < len(data):
  487. future_row = data.iloc[future_idx]
  488. future_date = data.index[future_idx]
  489. # 根据偏离方向选择对应价格
  490. if deviation_direction == "up":
  491. corresponding_price = future_row['low']
  492. corresponding_return = (reference_price - corresponding_price) / reference_price
  493. reverse_return = (future_row['high'] - reference_price) / reference_price
  494. else: # down
  495. corresponding_price = future_row['high']
  496. corresponding_return = (corresponding_price - reference_price) / reference_price
  497. reverse_return = (reference_price - future_row['low']) / reference_price
  498. forward_analysis[future_date] = {
  499. "date": future_date.strftime('%Y-%m-%d'),
  500. "corresponding_price": float(corresponding_price),
  501. "corresponding_return": float(corresponding_return),
  502. "reverse_category": "high" if deviation_direction == "up" else "low",
  503. "reverse_return": float(reverse_return)
  504. }
  505. return forward_analysis
  506. def _find_max_profit(self, forward_analysis):
  507. """找到最大利润日期和收益率"""
  508. if not forward_analysis:
  509. return {"date": None, "return": 0.0}
  510. max_return = max(day_data["corresponding_return"] for day_data in forward_analysis.values())
  511. max_date = None
  512. for date, day_data in forward_analysis.items():
  513. if day_data["corresponding_return"] == max_return:
  514. max_date = date.strftime('%Y-%m-%d')
  515. break
  516. return {"date": max_date, "return": float(max_return)}
  517. def _find_max_risk(self, forward_analysis):
  518. """找到最大风险日期和收益率"""
  519. if not forward_analysis:
  520. return {"date": None, "return": 0.0}
  521. max_risk = max(day_data["reverse_return"] for day_data in forward_analysis.values())
  522. max_risk_date = None
  523. for date, day_data in forward_analysis.items():
  524. if day_data["reverse_return"] == max_risk:
  525. max_risk_date = date.strftime('%Y-%m-%d')
  526. break
  527. return {"date": max_risk_date, "return": float(max_risk)}
  528. def generate_csv_outputs(self, threshold_data, signal_dict):
  529. """生成两个CSV文件输出"""
  530. if self.verbose_logging:
  531. print("\n=== 步骤6: 生成CSV输出文件 ===")
  532. timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
  533. # CSV 1: Range_1偏差统计
  534. self._generate_range1_stats_csv(threshold_data, timestamp)
  535. # CSV 2: 信号分析结果
  536. self._generate_signal_analysis_csv(signal_dict, timestamp)
  537. if self.verbose_logging:
  538. print("CSV文件生成完成")
  539. def _generate_range1_stats_csv(self, threshold_data, timestamp):
  540. """生成Range_1偏差统计CSV"""
  541. stats_data = []
  542. for contract_code, threshold_info in threshold_data.items():
  543. stats = threshold_info['range_1_stats']
  544. total_days = stats['total_days']
  545. upward_ratio = stats['upward_deviation_days'] / total_days if total_days > 0 else 0
  546. downward_ratio = stats['downward_deviation_days'] / total_days if total_days > 0 else 0
  547. stats_data.append({
  548. 'contract_code': contract_code,
  549. 'upward_deviation_ratio': round(upward_ratio, 4),
  550. 'downward_deviation_ratio': round(downward_ratio, 4)
  551. })
  552. if stats_data:
  553. stats_df = pd.DataFrame(stats_data)
  554. filename = f'range1_deviation_stats_{timestamp}.csv'
  555. stats_df.to_csv(filename, index=False, encoding=self.output_encoding)
  556. if self.verbose_logging:
  557. print(f"Range_1偏差统计保存至: {filename}")
  558. def _generate_signal_analysis_csv(self, signal_dict, timestamp):
  559. """生成信号分析结果CSV"""
  560. signal_data = []
  561. for contract_code, contract_signals in signal_dict.items():
  562. for signal_date, signal_info in contract_signals.items():
  563. signal_data.append({
  564. 'contract_code': contract_code,
  565. 'signal_date': signal_date.strftime('%Y-%m-%d'),
  566. 'deviation_direction': signal_info['deviation_direction'],
  567. 'max_profit_date': signal_info['max_profit']['date'],
  568. 'max_profit_return': round(signal_info['max_profit']['return'], 4),
  569. 'max_risk_date': signal_info['max_risk']['date'],
  570. 'max_risk_return': round(signal_info['max_risk']['return'], 4)
  571. })
  572. if signal_data:
  573. signal_df = pd.DataFrame(signal_data)
  574. filename = f'signal_analysis_results_{timestamp}.csv'
  575. signal_df.to_csv(filename, index=False, encoding=self.output_encoding)
  576. if self.verbose_logging:
  577. print(f"信号分析结果保存至: {filename}")
  578. def run_complete_analysis(self):
  579. """执行完整的分析流程"""
  580. if self.verbose_logging:
  581. print("开始执行期货5日移动平均线偏离分析")
  582. print("=" * 60)
  583. try:
  584. # 步骤1: 构建合约数据结构
  585. contract_structure = self.build_contract_structure()
  586. # 步骤2: 收集扩展数据
  587. extended_data = self.collect_extended_data()
  588. if not extended_data:
  589. if self.verbose_logging:
  590. print("未获取到有效数据,分析终止")
  591. return
  592. # 步骤3: 计算K5距离
  593. distance_data = self.calculate_k5_distances(extended_data)
  594. # 步骤4: 确定阈值
  595. threshold_data = self.determine_thresholds(distance_data)
  596. if not threshold_data:
  597. if self.verbose_logging:
  598. print("未能确定有效阈值,分析终止")
  599. return
  600. # 步骤5: 信号检测与分析
  601. signal_dict = self.detect_signals_and_analyze(threshold_data)
  602. # 步骤6: 生成输出文件
  603. self.generate_csv_outputs(threshold_data, signal_dict)
  604. # 分析汇总
  605. total_contracts = len(contract_structure)
  606. valid_contracts = len(threshold_data)
  607. total_signals = sum(len(signals) for signals in signal_dict.values())
  608. if self.verbose_logging:
  609. print("\n" + "=" * 60)
  610. print("分析完成汇总:")
  611. print(f"总合约数: {total_contracts}")
  612. print(f"有效合约数: {valid_contracts}")
  613. print(f"检测信号数: {total_signals}")
  614. return {
  615. 'contract_structure': contract_structure,
  616. 'threshold_data': threshold_data,
  617. 'signal_results': signal_dict,
  618. 'summary': {
  619. 'total_contracts': total_contracts,
  620. 'valid_contracts': valid_contracts,
  621. 'total_signals': total_signals
  622. }
  623. }
  624. except Exception as e:
  625. if self.verbose_logging:
  626. print(f"分析过程中出现错误: {str(e)}")
  627. import traceback
  628. traceback.print_exc()
  629. return None
  630. # =====================================================================================
  631. # 主程序入口
  632. # =====================================================================================
  633. def run_ma5_deviation_analysis(config=None):
  634. """运行期货5日移动平均线偏离分析"""
  635. if config is None:
  636. config = AnalysisConfig
  637. # 打印配置信息
  638. config.print_config()
  639. # 创建分析器并运行
  640. analyzer = FutureMA5DeviationAnalyzer(config)
  641. results = analyzer.run_complete_analysis()
  642. return results
  643. # 执行分析
  644. if __name__ == "__main__":
  645. print("期货5日移动平均线偏离分析工具")
  646. print("研究期货合约在大幅偏离K5后的反转走势规律")
  647. print("使用动态获取的主力合约进行精准分析")
  648. print("适用于聚宽在线研究平台")
  649. results = run_ma5_deviation_analysis()
  650. if results:
  651. print("\n✅ 分析执行成功!")
  652. summary = results['summary']
  653. print(f"📊 结果摘要:")
  654. print(f" - 分析合约数: {summary['total_contracts']}")
  655. print(f" - 有效数据合约: {summary['valid_contracts']}")
  656. print(f" - 检测信号总数: {summary['total_signals']}")
  657. else:
  658. print("\n❌ 分析执行失败,请检查错误信息")