||
- """
- 期货市场技术形态规律分析 - 综合版本
- 研究期货品种在大幅下跌后出现特定K线形态的规律,并分析后续走势
- 本文件包含:
- - 分析配置参数
- - 主要分析类FuturePatternAnalyzer
- - 基于合约切换日志的周线数据获取功能
- - 辅助工具函数和数据导出功能
- - 完整的分析流程
- 主要功能:
- 1. get_contract_weekly_data() - 基于合约切换日志获取周线数据的正式方法
- 2. get_weekly_data_by_contract_log() - 核心数据获取逻辑,直接使用get_bars获取原生周线数据
- 3. 简化的get_weekly_kline_data() - 直接使用8888加权合约,避免主力合约切换复杂性
- 4. validate_manual_patterns() - 手动模式验证系统,支持指定日期的模式验证
- 5. run_manual_pattern_validation() - 便捷的手动验证入口函数
- 6. _display_full_candlestick_charts() - 模式识别失败时的完整K线图显示
- 数据结构简化:
- - 移除了主力合约获取和切换逻辑
- - 使用单级字典结构:键为8888合约代码,值为日期范围
- - 所有数据检索均使用8888加权合约,简化了代码复杂性
- 验证模式:
- - 自动搜索模式:使用identify_target_pattern()自动搜索符合条件的技术形态
- - 手动验证模式:使用validate_manual_patterns()验证指定日期是否符合模式条件
- - 两种模式使用相同的验证逻辑和标准,但工作流程独立
- 作者: jukuan研究团队
- 日期: 2025-09
- 适用平台: 聚宽在线研究平台
- """
- import pandas as pd
- import numpy as np
- from jqdata import *
- import datetime
- import matplotlib.pyplot as plt
- import matplotlib.patches as patches
- from matplotlib.dates import DateFormatter
- import warnings
- warnings.filterwarnings('ignore')
- # 中文字体设置
- plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']
- plt.rcParams['axes.unicode_minus'] = False
- # =====================================================================================
- # 第一部分:分析配置参数
- # 用户可根据需要调整以下参数来控制分析行为
- # =====================================================================================
- class AnalysisConfig:
- """分析配置类 - 集中管理所有分析参数"""
-
- # ==================== 分析时间设置 ====================
- START_DATE = '2022-01-01' # 分析起始日期 (建议至少2年数据)
- END_DATE = '2025-08-31' # 分析结束日期
-
- # ==================== 筛选条件设置 ====================
- MIN_DRAWDOWN_THRESHOLD = 0.30 # 最小回撤阈值 (0.3 = 30%)
-
- # K线形态识别条件
- KLINE_CONDITIONS = {
- 'max_open_close_diff': 0.01, # 开盘价与收盘价最大差异 (0.01 = 1%)
- 'min_high_open_gain': 0.03, # 最高价相对开盘价的最小涨幅 (0.03 = 3%)
- }
-
- # ==================== 分析参数设置 ====================
- ANALYSIS_EXTEND_DAYS = 30 # 扩展分析天数 (在下跌期间前后各扩展的天数)
- FOLLOW_UP_WEEKS = 10 # 后续跟踪周数
- SUCCESS_RETURN_THRESHOLD = 0.05 # 成功率计算的收益率阈值 (0.05 = 5%)
-
- # ==================== 输出设置 ====================
- GENERATE_CHARTS = True # 是否生成K线图
- MAX_CHART_COUNT = 10 # 最大图表生成数量 (避免生成过多图片)
- CHART_OUTPUT_DIR = './' # 图表保存目录
- CSV_ENCODING = 'utf-8-sig' # CSV文件编码
-
- # ==================== 期货品种过滤设置 ====================
- # 需要排除的期货品种 (可根据需要调整)
- EXCLUDED_FUTURES = [
- 'IF9999.CCFX', #
- 'LU8888.XINE', #
- 'NR8888.XINE', #
- 'LG8888.XDCE', # 原木 - 新上市品种数据较少
- ]
-
- # 重点关注的期货品种 (如果设置,将只分析这些品种)
- FOCUS_FUTURES = [] # 空列表表示分析所有品种
-
- # ==================== 分析规模控制 ====================
- MAX_ANALYSIS_FUTURES = 5 # 最大分析期货品种数 (-1表示分析所有品种)
- VERBOSE_LOGGING = True # 是否打印详细日志
- SHOW_PROGRESS = True # 是否在分析过程中显示进度
-
- # ==================== 数据质量控制 ====================
- MIN_DATA_LENGTH = 100 # 最小数据长度要求 (日线数据点数)
- MIN_WEEKLY_DATA = 20 # 最小周线数据要求
- MAX_MISSING_DATA_RATIO = 0.1 # 最大缺失数据比例
-
- @classmethod
- def get_config_dict(cls):
- """获取分析配置字典"""
- return {
- 'time_range': {
- 'start_date': cls.START_DATE,
- 'end_date': cls.END_DATE,
- },
- 'filter_conditions': {
- 'min_drawdown': cls.MIN_DRAWDOWN_THRESHOLD,
- 'kline_conditions': cls.KLINE_CONDITIONS,
- },
- 'analysis_params': {
- 'extend_days': cls.ANALYSIS_EXTEND_DAYS,
- 'follow_up_weeks': cls.FOLLOW_UP_WEEKS,
- 'success_threshold': cls.SUCCESS_RETURN_THRESHOLD,
- },
- 'output_settings': {
- 'generate_charts': cls.GENERATE_CHARTS,
- 'max_chart_count': cls.MAX_CHART_COUNT,
- 'chart_dir': cls.CHART_OUTPUT_DIR,
- 'csv_encoding': cls.CSV_ENCODING,
- },
- 'future_filter': {
- 'excluded': cls.EXCLUDED_FUTURES,
- 'focus_only': cls.FOCUS_FUTURES,
- },
- 'analysis_control': {
- 'max_futures': cls.MAX_ANALYSIS_FUTURES,
- 'verbose': cls.VERBOSE_LOGGING,
- 'show_progress': cls.SHOW_PROGRESS,
- },
- 'data_quality': {
- 'min_data_length': cls.MIN_DATA_LENGTH,
- 'min_weekly_data': cls.MIN_WEEKLY_DATA,
- 'max_missing_ratio': cls.MAX_MISSING_DATA_RATIO,
- }
- }
-
- @classmethod
- def print_config(cls):
- """打印当前配置信息"""
- print("=== 期货技术形态分析配置 ===")
- print(f"分析时间范围: {cls.START_DATE} 至 {cls.END_DATE}")
- print(f"最小回撤阈值: {cls.MIN_DRAWDOWN_THRESHOLD*100:.0f}%")
- print(f"开收差异阈值: {cls.KLINE_CONDITIONS['max_open_close_diff']*100:.0f}%")
- print(f"最高涨幅阈值: {cls.KLINE_CONDITIONS['min_high_open_gain']*100:.0f}%")
- print(f"后续跟踪周数: {cls.FOLLOW_UP_WEEKS}")
- print(f"成功率阈值: {cls.SUCCESS_RETURN_THRESHOLD*100:.0f}%")
-
- if cls.EXCLUDED_FUTURES:
- print(f"排除品种数: {len(cls.EXCLUDED_FUTURES)}")
- if cls.FOCUS_FUTURES:
- print(f"重点品种数: {len(cls.FOCUS_FUTURES)}")
-
- print(f"生成图表: {'是' if cls.GENERATE_CHARTS else '否'}")
- print(f"最大分析品种数: {'所有' if cls.MAX_ANALYSIS_FUTURES == -1 else cls.MAX_ANALYSIS_FUTURES}")
- # =====================================================================================
- # 第二部分:期货技术形态分析器主类
- # 实现完整的技术形态识别和分析功能
- # =====================================================================================
- class FuturePatternAnalyzer:
- """期货技术形态分析器 - 主要分析类"""
-
- def __init__(self, start_date=None, end_date=None, config=None):
- """
- 初始化分析器
-
- 参数:
- - start_date: 分析开始日期,默认使用配置
- - end_date: 分析结束日期,默认使用配置
- - config: 自定义配置,默认使用AnalysisConfig
- """
- # 使用提供的配置或默认配置
- if config is None:
- config = AnalysisConfig.get_config_dict()
-
- self.start_date = pd.to_datetime(start_date or config['time_range']['start_date'])
- self.end_date = pd.to_datetime(end_date or config['time_range']['end_date'])
- self.config = config
- self.future_codes = {} # 期货代码映射
-
- # 期货品种与交易所的映射关系
- self.exchange_map = {
- # 大连商品交易所 (XDCE)
- 'A': 'XDCE', 'B': 'XDCE', 'C': 'XDCE', 'CS': 'XDCE', 'FB': 'XDCE',
- 'I': 'XDCE', 'J': 'XDCE', 'JD': 'XDCE', 'JM': 'XDCE', 'L': 'XDCE',
- 'M': 'XDCE', 'P': 'XDCE', 'PP': 'XDCE', 'PG': 'XDCE', 'RR': 'XDCE',
- 'V': 'XDCE', 'Y': 'XDCE', 'EB': 'XDCE', 'EG': 'XDCE', 'LH': 'XDCE',
-
- # 郑州商品交易所 (XZCE)
- 'AP': 'XZCE', 'CF': 'XZCE', 'CY': 'XZCE', 'FG': 'XZCE', 'JR': 'XZCE',
- 'LR': 'XZCE', 'MA': 'XZCE', 'OI': 'XZCE', 'PM': 'XZCE', 'RI': 'XZCE',
- 'RM': 'XZCE', 'RS': 'XZCE', 'SF': 'XZCE', 'SM': 'XZCE', 'SR': 'XZCE',
- 'TA': 'XZCE', 'WH': 'XZCE', 'ZC': 'XZCE', 'CJ': 'XZCE',
- 'ME': 'XZCE', 'PF': 'XZCE', 'PK': 'XZCE', 'RO': 'XZCE', 'SA': 'XZCE',
- 'TC': 'XZCE', 'UR': 'XZCE', 'WS': 'XZCE', 'WT': 'XZCE',
-
- # 上海期货交易所 (XSGE)
- 'AG': 'XSGE', 'AL': 'XSGE', 'AU': 'XSGE', 'BU': 'XSGE', 'CU': 'XSGE',
- 'FU': 'XSGE', 'HC': 'XSGE', 'NI': 'XSGE', 'PB': 'XSGE', 'RB': 'XSGE',
- 'RU': 'XSGE', 'SN': 'XSGE', 'SP': 'XSGE', 'SS': 'XSGE', 'WR': 'XSGE',
- 'ZN': 'XSGE',
-
- # 上海国际能源交易中心 (XINE)
- 'BC': 'XINE', 'LU': 'XINE', 'NR': 'XINE', 'SC': 'XINE',
-
- # 中金所 (CCFX)
- 'IC': 'CCFX', 'IF': 'CCFX', 'IH': 'CCFX', 'T': 'CCFX', 'TF': 'CCFX', 'TS': 'CCFX',
-
- # 广期所 (GFEX)
- 'SI': 'GFEX', 'LC': 'GFEX', 'PS': 'GFEX'
- }
-
- self._log(f"初始化分析器 - 分析期间: {self.start_date.strftime('%Y-%m-%d')} 至 {self.end_date.strftime('%Y-%m-%d')}")
-
- def _log(self, message):
- """记录日志"""
- if self.config['analysis_control']['verbose']:
- print(message)
-
- def build_future_codes(self):
- """构建期货品种基础数据"""
- self._log("\n=== 步骤1: 构建期货品种基础数据 ===")
-
- # 构建8888主连合约代码
- for code, exchange in self.exchange_map.items():
- future_code = f"{code}8888.{exchange}"
-
- # 应用过滤器
- if future_code in self.config['future_filter']['excluded']:
- continue
-
- # 如果设置了重点关注品种,只添加重点品种
- if (self.config['future_filter']['focus_only'] and
- future_code not in self.config['future_filter']['focus_only']):
- continue
-
- self.future_codes[code] = future_code
-
- self._log(f"构建完成,共{len(self.future_codes)}个期货品种")
- if self.config['analysis_control']['verbose']:
- self._log("示例品种:")
- for i, (code, full_code) in enumerate(list(self.future_codes.items())[:5]):
- self._log(f" {code} -> {full_code}")
-
- return self.future_codes
-
- def get_future_price_data(self, future_code, start_date, end_date):
- """获取期货价格数据"""
- try:
- # print(f"获取{future_code}数据: {start_date} 至 {end_date}")
- data = get_price(
- future_code,
- start_date=start_date,
- end_date=end_date,
- frequency='daily',
- fields=['open', 'close', 'high', 'low', 'volume'],
- skip_paused=False,
- panel=False
- )
- # print(f"获取的最后一条数据: {data.iloc[-1]}")
- return data
- except Exception as e:
- if self.config['analysis_control']['verbose']:
- print(f" 获取{future_code}数据失败: {str(e)}")
- return None
-
- def calculate_drawdown(self, data):
- """计算最大回撤"""
- if data is None or len(data) == 0:
- return None, None, None, None
-
- # 计算累计最高价
- data['cum_max'] = data['high'].expanding().max()
-
- # 计算回撤
- data['drawdown'] = (data['low'] - data['cum_max']) / data['cum_max']
-
- # 找到最大回撤
- max_drawdown = data['drawdown'].min() # 最大回撤是负值,所以用min
- max_drawdown_idx = data['drawdown'].idxmin()
-
- # 找到最大回撤对应的高点
- max_high_before_dd = data.loc[:max_drawdown_idx, 'cum_max'].iloc[-1]
- high_point_idx = data[data['high'] == max_high_before_dd].index[0]
-
- return abs(max_drawdown), high_point_idx, max_drawdown_idx, data
-
- def find_major_decline_futures(self, min_drawdown=None):
- """筛选大幅下跌的期货品种"""
- if min_drawdown is None:
- min_drawdown = self.config['filter_conditions']['min_drawdown']
-
- self._log(f"\n=== 步骤2: 筛选跌幅>{min_drawdown*100}%的期货品种 ===")
-
- decline_futures = []
- total_count = len(self.future_codes)
- max_futures = self.config['analysis_control']['max_futures']
- analyzed_count = 0
-
- for code, future_code in self.future_codes.items():
- analyzed_count += 1
-
- # 最大品种数限制
- if max_futures > 0 and analyzed_count > max_futures:
- break
-
- if self.config['analysis_control']['show_progress']:
- display_total = min(total_count, max_futures) if max_futures > 0 else total_count
- print(f"分析 {code}({future_code}) [{analyzed_count}/{display_total}]", end=" ")
-
- # 获取价格数据
- price_data = self.get_future_price_data(future_code, self.start_date, self.end_date)
-
- if price_data is None or len(price_data) < self.config['data_quality']['min_data_length']:
- if self.config['analysis_control']['show_progress']:
- print("- 数据不足")
- continue
-
- # 计算最大回撤
- max_drawdown, high_date, low_date, data_with_dd = self.calculate_drawdown(price_data)
-
- if max_drawdown is None or max_drawdown < min_drawdown:
- if self.config['analysis_control']['show_progress']:
- print(f"- 最大回撤{max_drawdown:.1%}不符合条件")
- continue
-
- # 记录符合条件的品种
- decline_info = {
- '品种代码': code,
- '合约代码': future_code,
- '最大回撤': max_drawdown,
- '高点日期': high_date,
- '低点日期': low_date,
- '高点价格': data_with_dd.loc[high_date, 'high'],
- '低点价格': data_with_dd.loc[low_date, 'low'],
- '下跌天数': (low_date - high_date).days
- }
- decline_futures.append(decline_info)
-
- if self.config['analysis_control']['show_progress']:
- print(f"✓ 最大回撤{max_drawdown:.1%}, 下跌期间{high_date.strftime('%Y-%m-%d')}至{low_date.strftime('%Y-%m-%d')}")
-
- self._log(f"\n找到{len(decline_futures)}个符合条件的期货品种")
- return decline_futures
-
- def get_simplified_contract_mapping(self, decline_futures):
- """获取8888加权合约映射 - 简化版本,直接使用加权合约"""
- self._log("\n=== 步骤3: 创建8888加权合约映射 ===")
-
- contract_mapping = {}
-
- for future_info in decline_futures:
- code = future_info['品种代码']
- high_date = future_info['高点日期']
- low_date = future_info['低点日期']
-
- # 扩展日期范围前后30天
- extend_days = self.config['analysis_params']['extend_days']
- start_date = high_date - pd.Timedelta(days=extend_days)
- end_date = low_date + pd.Timedelta(days=extend_days)
-
- # 构建8888加权合约代码
- contract_8888 = self.future_codes.get(code)
-
- if contract_8888:
- # 直接使用8888合约,时间范围为分析期间
- contract_mapping[contract_8888] = [start_date, end_date]
-
- self._log(f" ✓ {code}: 使用8888加权合约 {contract_8888}")
- self._log(f" 时间范围: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")
- else:
- self._log(f" ⚠️ {code}: 未找到对应的8888合约代码")
-
- self._log(f"完成8888合约映射,共{len(contract_mapping)}个合约")
- return contract_mapping
-
-
- def get_weekly_kline_data(self, contract_mapping):
- """获取周线K线数据 - 简化版本,直接使用8888加权合约"""
- self._log("\n=== 步骤4: 获取周线K线数据 ===")
-
- weekly_data = {}
-
- for contract_code, time_range in contract_mapping.items():
- start_date, end_date = time_range
-
- self._log(f"获取{contract_code}周线数据 ({start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')})")
-
- try:
- # 计算需要获取的周数,确保充足的数据覆盖
- time_diff = end_date - start_date
- estimated_weeks = int(time_diff.days / 7) + 20 # 额外增加20周缓冲
- estimated_weeks = max(estimated_weeks, 40) # 最少40周
-
- self._log(f" 计算获取{estimated_weeks}周数据")
-
- # 直接使用get_bars API获取原生周线数据,避免日线转换的精度损失
- weekly_bars = get_bars(
- security=contract_code,
- count=estimated_weeks,
- unit='1w', # 直接获取周线数据
- fields=['date', 'open', 'high', 'low', 'close', 'volume'],
- include_now=False,
- end_dt=end_date,
- fq_ref_date=None,
- df=True
- )
-
- if weekly_bars is not None and len(weekly_bars) > 0:
- # 确保日期类型一致,避免 'Cannot compare type Timestamp with type date' 错误
- # get_bars返回的date列可能是date类型,需统一转换为pandas Timestamp
- weekly_bars['date'] = pd.to_datetime(weekly_bars['date'])
- start_date = pd.to_datetime(start_date)
- end_date = pd.to_datetime(end_date)
-
- # 筛选在指定时间范围内的数据
- mask = (weekly_bars['date'] >= start_date) & (weekly_bars['date'] <= end_date)
- filtered_weekly_data = weekly_bars[mask].copy()
-
- if len(filtered_weekly_data) > 0:
- # 设置日期为索引
- filtered_weekly_data.set_index('date', inplace=True)
-
- # 从合约代码中提取品种代码(用于后续识别)
- symbol_code = contract_code.replace('8888.', '').split('.')[0]
-
- weekly_data[symbol_code] = {
- 'data': filtered_weekly_data,
- 'contract_code': contract_code,
- 'time_range': time_range
- }
- self._log(f" ✓ {symbol_code}({contract_code}): 获取{len(filtered_weekly_data)}根周K线")
- else:
- self._log(f" ⚠️ {contract_code}: 筛选后周线数据不足")
- else:
- self._log(f" ⚠️ {contract_code}: 未获取到周线数据")
-
- except Exception as e:
- self._log(f" ❌ 获取{contract_code}周线数据失败: {str(e)}")
- continue
-
- self._log(f"完成周线数据获取,共{len(weekly_data)}个品种")
- return weekly_data
-
- def get_weekly_data_by_contract_log(self, contract_switch_log, extend_weeks=10):
- """
- 基于合约切换日志获取每个合约的周线数据
-
- 参数:
- - contract_switch_log: 合约切换日志,格式为列表,每个元素包含:
- {'contract': '合约代码', 'start_date': '开始日期', 'end_date': '结束日期'}
- - extend_weeks: 延长周数,默认10周
-
- 返回:
- - dict: 每个合约的周线数据,键为合约代码,值为数据和时间范围信息
- """
- self._log(f"\n=== 基于合约切换日志获取周线数据 ===")
- self._log(f"延长周数: {extend_weeks}周")
-
- weekly_data_by_contract = {}
-
- for contract_info in contract_switch_log:
- contract_code = contract_info.get('contract')
- start_date = contract_info.get('start_date')
- end_date = contract_info.get('end_date')
-
- if not all([contract_code, start_date, end_date]):
- self._log(f"⚠️ 合约信息不完整,跳过: {contract_info}")
- continue
-
- # 转换日期格式
- if isinstance(start_date, str):
- start_date = pd.to_datetime(start_date)
- if isinstance(end_date, str):
- end_date = pd.to_datetime(end_date)
-
- # 计算延长后的时间范围
- extended_start_date = start_date - pd.Timedelta(weeks=extend_weeks)
- extended_end_date = end_date + pd.Timedelta(weeks=extend_weeks)
-
- self._log(f"处理合约: {contract_code}")
- self._log(f" 原始时间范围: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")
- self._log(f" 延长时间范围: {extended_start_date.strftime('%Y-%m-%d')} 至 {extended_end_date.strftime('%Y-%m-%d')}")
-
- try:
- # 计算需要获取的周数,采用更稳健的方式
- time_diff = extended_end_date - extended_start_date
- estimated_weeks = int(time_diff.days / 7) + 20 # 额外增加20周作为充足缓冲
-
- # 确保最少获取60周数据(约1.2年)以提供充分的历史背景
- estimated_weeks = max(estimated_weeks, 60)
-
- self._log(f" 计算获取{estimated_weeks}周数据 (时间跨度{time_diff.days}天)")
-
- # 直接使用get_bars API获取原生周线数据,避免日线转换的精度损失
- weekly_bars = get_bars(
- security=contract_code,
- count=estimated_weeks, # 扩展的周数以确保完整覆盖
- unit='1w', # 直接获取周线数据,避免转换精度损失
- fields=['date', 'open', 'high', 'low', 'close', 'volume'],
- include_now=False, # 不包含当前未完成的周期
- end_dt=extended_end_date,
- fq_ref_date=None, # 不复权,保持原始价格
- df=True # 返回DataFrame格式便于处理
- )
-
- if weekly_bars is not None and len(weekly_bars) > 0:
- # 确保日期类型一致,避免 'Cannot compare type Timestamp with type date' 错误
- # get_bars返回的date列可能是date类型,需统一转换为pandas Timestamp
- weekly_bars['date'] = pd.to_datetime(weekly_bars['date'])
- extended_start_date = pd.to_datetime(extended_start_date)
- extended_end_date = pd.to_datetime(extended_end_date)
-
- # 筛选在延长时间范围内的数据
- mask = (weekly_bars['date'] >= extended_start_date) & (weekly_bars['date'] <= extended_end_date)
- filtered_data = weekly_bars[mask].copy()
-
- if len(filtered_data) > 0:
- # 设置日期为索引
- filtered_data.set_index('date', inplace=True)
-
- # 存储数据
- weekly_data_by_contract[contract_code] = {
- 'data': filtered_data,
- 'original_start': start_date,
- 'original_end': end_date,
- 'extended_start': extended_start_date,
- 'extended_end': extended_end_date,
- 'total_weeks': len(filtered_data)
- }
-
- self._log(f" ✓ 获取到{len(filtered_data)}根周K线数据")
- self._log(f" 数据时间范围: {filtered_data.index[0].strftime('%Y-%m-%d')} 至 {filtered_data.index[-1].strftime('%Y-%m-%d')}")
-
- # 添加数据质量检查
- original_start_ts = pd.to_datetime(start_date)
- original_end_ts = pd.to_datetime(end_date)
- original_weeks = int((original_end_ts - original_start_ts).days / 7) + 1
- expected_total_weeks = original_weeks + 2 * extend_weeks
- if len(filtered_data) < expected_total_weeks * 0.7: # 如果数据不足预期的70%
- self._log(f" ⚠️ 数据可能不完整,预期约{expected_total_weeks}周,实际{len(filtered_data)}周")
- else:
- self._log(f" ⚠️ 筛选后无有效数据")
-
- else:
- self._log(f" ⚠️ 未获取到周线数据")
-
- except Exception as e:
- self._log(f" ❌ 获取{contract_code}周线数据失败: {str(e)}")
- continue
-
- self._log(f"\n成功获取{len(weekly_data_by_contract)}个合约的周线数据")
- return weekly_data_by_contract
-
- def identify_target_pattern(self, weekly_data, decline_futures):
- """
- 识别目标K线形态
-
- 当未识别到任何目标形态时,会自动显示完整的烛台图(K线图)作为数据可视化备选方案。
- 图表包含完整的OHLC数据,直接显示不保存到本地存储。
- """
- self._log("\n=== 步骤5: 识别目标K线形态 ===")
-
- pattern_results = []
- kline_conditions = self.config['filter_conditions']['kline_conditions']
-
- # 创建品种代码到下跌信息的映射
- decline_info_map = {future['品种代码']: future for future in decline_futures}
-
- for code, data_info in weekly_data.items():
- data = data_info['data']
- contract_code = data_info['contract_code'] # 8888加权合约代码
-
- # 从decline_futures中获取下跌时间信息
- if code not in decline_info_map:
- continue
-
- decline_info = decline_info_map[code]
- decline_start = decline_info['高点日期']
- decline_end = decline_info['低点日期']
- extend_days = self.config['analysis_params']['extend_days']
-
- self._log(f"分析{code}的K线形态 (使用{contract_code})")
-
- for idx, (date, row) in enumerate(data.iterrows()):
- open_price = row['open']
- close_price = row['close']
- high_price = row['high']
- low_price = row['low']
- volume = row['volume']
-
- # 检查是否在下跌期间或附近
- date_in_range = (date >= decline_start - pd.Timedelta(days=extend_days) and
- date <= decline_end + pd.Timedelta(days=extend_days))
-
- if not date_in_range:
- continue
-
- # 使用统一的核心验证逻辑
- validation_result = self._core_pattern_validation(date, row, kline_conditions)
-
- if validation_result['validation_passed']:
- pattern_info = {
- '品种代码': code,
- '日期': date,
- '开盘价': validation_result['ohlc_data']['open'],
- '收盘价': validation_result['ohlc_data']['close'],
- '最高价': validation_result['ohlc_data']['high'],
- '最低价': validation_result['ohlc_data']['low'],
- '成交量': validation_result['ohlc_data']['volume'],
- '开收差异率': validation_result['indicators']['price_diff_pct']['actual_value'],
- '最高涨幅率': validation_result['indicators']['high_gain_pct']['actual_value'],
- '下跌开始日期': decline_start,
- '下跌结束日期': decline_end,
- '数据索引': idx,
- '使用合约': contract_code # 8888加权合约
- }
- pattern_results.append(pattern_info)
-
- price_diff_pct = validation_result['indicators']['price_diff_pct']['actual_value'] / 100
- high_gain_pct = validation_result['indicators']['high_gain_pct']['actual_value'] / 100
- self._log(f" ✓ 找到形态: {date.strftime('%Y-%m-%d')} 开收差{price_diff_pct:.1%} 最高涨{high_gain_pct:.1%} 合约{contract_code}")
-
- self._log(f"\n共识别出{len(pattern_results)}个目标K线形态")
-
- # 如果未识别到任何模式,显示完整的烛台图
- if len(pattern_results) == 0:
- self._log("\n⚠️ 未识别到任何目标形态,显示完整数据集的K线图")
- self._display_full_candlestick_charts(weekly_data)
-
- return pattern_results
-
- def _display_full_candlestick_charts(self, weekly_data):
- """显示完整的烛台图(K线图)"""
- self._log("=== 显示完整数据集的K线图 ===")
-
- for code, data_info in weekly_data.items():
- data = data_info['data']
- contract_code = data_info['contract_code']
-
- if data is None or len(data) == 0:
- self._log(f"⚠️ {code}: 无可用数据")
- continue
-
- self._log(f"绘制{code}({contract_code})的完整K线图,共{len(data)}根K线")
-
- try:
- # 创建图表
- fig, ax = plt.subplots(figsize=(16, 10))
-
- # 准备数据用于绘制烛台图
- dates = data.index
- opens = data['open']
- highs = data['high']
- lows = data['low']
- closes = data['close']
- volumes = data['volume']
-
- # 绘制K线图
- for i in range(len(data)):
- date = dates[i]
- open_price = opens.iloc[i]
- high_price = highs.iloc[i]
- low_price = lows.iloc[i]
- close_price = closes.iloc[i]
-
- # K线颜色:红涨绿跌
- color = 'red' if close_price > open_price else 'green'
- edge_color = 'darkred' if close_price > open_price else 'darkgreen'
-
- # 绘制影线(最高价到最低价的竖线)
- ax.plot([i, i], [low_price, high_price], color='black', linewidth=1)
-
- # 绘制实体(开盘价到收盘价的矩形)
- body_height = abs(close_price - open_price)
- bottom = min(open_price, close_price)
-
- # 使用矩形绘制K线实体
- rect = patches.Rectangle((i-0.4, bottom), 0.8, body_height,
- linewidth=1, edgecolor=edge_color,
- facecolor=color, alpha=0.8)
- ax.add_patch(rect)
-
- # 设置图表标题和标签
- ax.set_title(f'{code} ({contract_code}) 完整周线K线图\n'
- f'数据期间: {dates[0].strftime("%Y-%m-%d")} 至 {dates[-1].strftime("%Y-%m-%d")} '
- f'(共{len(data)}根K线)',
- fontsize=14, fontweight='bold', pad=20)
-
- ax.set_xlabel('时间', fontsize=12)
- ax.set_ylabel('价格', fontsize=12)
- ax.grid(True, alpha=0.3)
-
- # 设置x轴标签
- # 选择合适的标签间隔,避免过于密集
- step = max(1, len(data) // 10) # 显示约10个时间标签
- tick_positions = range(0, len(data), step)
- tick_labels = [dates[i].strftime('%Y-%m-%d') for i in tick_positions]
-
- ax.set_xticks(tick_positions)
- ax.set_xticklabels(tick_labels, rotation=45, ha='right')
-
- # 添加统计信息
- max_price = highs.max()
- min_price = lows.min()
- latest_close = closes.iloc[-1]
- total_change = (latest_close - closes.iloc[0]) / closes.iloc[0] * 100
-
- # 在图表上添加统计文本
- stats_text = (f'最高价: {max_price:.2f}\n'
- f'最低价: {min_price:.2f}\n'
- f'最新收盘: {latest_close:.2f}\n'
- f'总涨跌幅: {total_change:+.2f}%')
-
- ax.text(0.02, 0.98, stats_text, transform=ax.transAxes,
- verticalalignment='top', bbox=dict(boxstyle='round',
- facecolor='wheat', alpha=0.8), fontsize=10)
-
- # 调整布局并显示
- plt.tight_layout()
- plt.show()
-
- self._log(f"✓ 已显示{code}的完整K线图")
-
- except Exception as e:
- self._log(f"❌ 绘制{code}K线图时出错: {str(e)}")
- continue
-
- self._log("完成所有品种的K线图显示")
-
- def _core_pattern_validation(self, target_date, kline_data_row, kline_conditions):
- """
- 统一的核心模式验证逻辑
-
- 此函数包含两种模式共同的验证逻辑,消除代码重复。
- 无论是自动模式遍历所有K线,还是验证模式检查特定日期,
- 都使用相同的验证标准和计算方法。
-
- 参数:
- - target_date: 目标日期 (pandas Timestamp)
- - kline_data_row: 单根K线数据 (pandas Series,包含open、high、low、close、volume)
- - kline_conditions: 验证条件配置字典
-
- 返回:
- - dict: 包含验证结果和详细诊断信息的字典
- """
-
- # 提取OHLC数据
- open_price = kline_data_row['open']
- close_price = kline_data_row['close']
- high_price = kline_data_row['high']
- low_price = kline_data_row['low']
- volume = kline_data_row['volume']
-
- # 计算模式指标
- price_diff_pct = abs(close_price - open_price) / open_price
- high_gain_pct = (high_price - open_price) / open_price
-
- # 验证条件A: 开盘价与收盘价差异
- condition_a = price_diff_pct <= kline_conditions['max_open_close_diff']
- condition_a_threshold = kline_conditions['max_open_close_diff']
-
- # 验证条件B: 最高价相对开盘价涨幅
- condition_b = high_gain_pct >= kline_conditions['min_high_open_gain']
- condition_b_threshold = kline_conditions['min_high_open_gain']
-
- # 整体验证结果
- validation_passed = condition_a and condition_b
-
- # 构建详细的验证结果
- result = {
- 'date': target_date,
- 'validation_passed': validation_passed,
- 'ohlc_data': {
- 'open': round(open_price, 2),
- 'high': round(high_price, 2),
- 'low': round(low_price, 2),
- 'close': round(close_price, 2),
- 'volume': volume
- },
- 'indicators': {
- 'price_diff_pct': {
- 'actual_value': round(price_diff_pct * 100, 2),
- 'threshold': round(condition_a_threshold * 100, 2),
- 'unit': '%',
- 'condition': '≤',
- 'passed': condition_a,
- 'description': '开盘价与收盘价差异率'
- },
- 'high_gain_pct': {
- 'actual_value': round(high_gain_pct * 100, 2),
- 'threshold': round(condition_b_threshold * 100, 2),
- 'unit': '%',
- 'condition': '≥',
- 'passed': condition_b,
- 'description': '最高价相对开盘价涨幅'
- }
- }
- }
-
- # 生成失败原因
- if not validation_passed:
- failed_conditions = []
- if not condition_a:
- failed_conditions.append(f"开收差异率{price_diff_pct*100:.2f}% > 阈值{condition_a_threshold*100:.1f}%")
- if not condition_b:
- failed_conditions.append(f"最高涨幅{high_gain_pct*100:.2f}% < 阈值{condition_b_threshold*100:.1f}%")
-
- result['failure_reason'] = "; ".join(failed_conditions)
-
- return result
-
- def validate_manual_patterns(self, manual_config, weekly_data):
- """
- 手动验证模式功能
-
- 参数:
- - manual_config: 手动配置字典,格式:
- {'B':['2022-07-22','2024-08-23'], 'A':['2023-01-06','2023-06-09']}
- - weekly_data: 周线数据字典
-
- 返回:
- - dict: 验证结果,包含成功和失败的详细信息
- """
- self._log("\n=== 手动模式验证 ===")
- self._log(f"配置的模式类型数: {len(manual_config)}")
-
- validation_results = {
- 'successful_validations': [],
- 'failed_validations': [],
- 'summary': {}
- }
-
- kline_conditions = self.config['filter_conditions']['kline_conditions']
-
- for pattern_type, date_list in manual_config.items():
- self._log(f"\n处理模式类型 '{pattern_type}': {len(date_list)} 个日期")
-
- type_results = {
- 'pattern_type': pattern_type,
- 'total_dates': len(date_list),
- 'successful_count': 0,
- 'failed_count': 0,
- 'validations': []
- }
-
- for date_str in date_list:
- try:
- # 转换日期格式
- target_date = pd.to_datetime(date_str)
- self._log(f" 验证日期: {target_date.strftime('%Y-%m-%d')}")
-
- # 验证这个日期
- validation_result = self._validate_single_date(
- target_date, pattern_type, weekly_data, kline_conditions
- )
-
- type_results['validations'].append(validation_result)
-
- if validation_result['validation_passed']:
- type_results['successful_count'] += 1
- validation_results['successful_validations'].append(validation_result)
- self._log(f" ✓ 验证通过")
- else:
- type_results['failed_count'] += 1
- validation_results['failed_validations'].append(validation_result)
- self._log(f" ❌ 验证失败: {validation_result['failure_reason']}")
-
- except Exception as e:
- error_result = {
- 'pattern_type': pattern_type,
- 'target_date': date_str,
- 'validation_passed': False,
- 'failure_reason': f'日期处理错误: {str(e)}',
- 'error': True
- }
- type_results['validations'].append(error_result)
- type_results['failed_count'] += 1
- validation_results['failed_validations'].append(error_result)
- self._log(f" ❌ 日期处理错误: {str(e)}")
-
- validation_results['summary'][pattern_type] = type_results
-
- # 打印总体统计
- total_validations = sum(result['total_dates'] for result in validation_results['summary'].values())
- total_successful = len(validation_results['successful_validations'])
- total_failed = len(validation_results['failed_validations'])
-
- self._log(f"\n=== 验证结果汇总 ===")
- self._log(f"总验证数: {total_validations}")
- self._log(f"成功验证: {total_successful}")
- self._log(f"失败验证: {total_failed}")
- self._log(f"成功率: {(total_successful/total_validations*100) if total_validations > 0 else 0:.1f}%")
-
- # 显示失败详情
- if validation_results['failed_validations']:
- self._display_validation_failures(validation_results['failed_validations'])
-
- return validation_results
-
- def _validate_single_date(self, target_date, pattern_type, weekly_data, kline_conditions):
- """验证单个日期的模式 - 使用统一的核心验证逻辑"""
-
- validation_result = {
- 'pattern_type': pattern_type,
- 'target_date': target_date.strftime('%Y-%m-%d'),
- 'validation_passed': False,
- 'failure_reason': '',
- 'diagnostics': {}
- }
-
- # 查找包含目标日期的数据
- found_data = None
-
- for code, data_info in weekly_data.items():
- data = data_info['data']
- contract_code = data_info['contract_code']
-
- # 检查目标日期是否在数据范围内
- if target_date >= data.index[0] and target_date <= data.index[-1]:
- # 找到最接近的周线数据
- closest_idx = data.index.get_indexer([target_date], method='nearest')[0]
- if closest_idx >= 0 and closest_idx < len(data):
- found_data = data.iloc[closest_idx]
- validation_result['contract_code'] = contract_code
- validation_result['closest_date'] = data.index[closest_idx].strftime('%Y-%m-%d')
- break
-
- if found_data is None:
- validation_result['failure_reason'] = '未找到匹配的数据'
- return validation_result
-
- # 使用统一的核心验证逻辑
- core_result = self._core_pattern_validation(target_date, found_data, kline_conditions)
-
- # 整合核心结果到验证结果
- validation_result['validation_passed'] = core_result['validation_passed']
- validation_result['diagnostics']['ohlc_data'] = core_result['ohlc_data']
- validation_result['diagnostics']['indicators'] = core_result['indicators']
-
- if 'failure_reason' in core_result:
- validation_result['failure_reason'] = core_result['failure_reason']
-
- return validation_result
-
- def _display_validation_failures(self, failed_validations):
- """显示详细的验证失败信息"""
- self._log("\n=== 详细失败诊断 ===")
-
- for i, failure in enumerate(failed_validations, 1):
- if failure.get('error'):
- continue # 跳过错误情况
-
- self._log(f"\n失败 #{i}: 模式类型 '{failure['pattern_type']}' - 日期 {failure['target_date']}")
-
- if 'contract_code' in failure:
- self._log(f" 合约: {failure['contract_code']}")
- self._log(f" 最近日期: {failure['closest_date']}")
-
- # 显示OHLC数据
- if 'ohlc_data' in failure['diagnostics']:
- ohlc = failure['diagnostics']['ohlc_data']
- self._log(f" OHLC数据:")
- self._log(f" 开盘: {ohlc['open']:>8}")
- self._log(f" 最高: {ohlc['high']:>8}")
- self._log(f" 最低: {ohlc['low']:>8}")
- self._log(f" 收盘: {ohlc['close']:>8}")
- self._log(f" 成交量: {ohlc['volume']}")
-
- # 显示指标验证详情
- if 'indicators' in failure['diagnostics']:
- self._log(f" 指标验证:")
- for indicator_name, indicator_data in failure['diagnostics']['indicators'].items():
- status = "✓" if indicator_data['passed'] else "❌"
- self._log(f" {status} {indicator_data['description']}:")
- self._log(f" 实际值: {indicator_data['actual_value']}{indicator_data['unit']}")
- self._log(f" 阈值: {indicator_data['condition']} {indicator_data['threshold']}{indicator_data['unit']}")
- self._log(f" 结果: {'通过' if indicator_data['passed'] else '失败'}")
-
- self._log(f" 失败原因: {failure['failure_reason']}")
-
- def analyze_future_performance(self, pattern_results, weekly_data):
- """分析后续走势 - 逐周详细分析"""
- self._log("\n=== 步骤6: 分析后续10周逐周表现 ===")
-
- performance_results = []
- follow_up_weeks = self.config['analysis_params']['follow_up_weeks']
-
- for pattern in pattern_results:
- code = pattern['品种代码']
- pattern_date = pattern['日期']
- pattern_idx = pattern['数据索引']
- pattern_close_price = pattern['收盘价']
-
- if code not in weekly_data:
- continue
-
- data = weekly_data[code]['data']
-
- # 基础信息
- future_performance = {
- '品种代码': code,
- '形态日期': pattern_date,
- '形态收盘价': pattern_close_price
- }
-
- # 详细的逐周分析
- print(f"\n{code} ({pattern_date.strftime('%Y-%m-%d')}) 后续10周详细分析:")
- print("-" * 80)
- print(f"{'周次':<4} {'日期':<12} {'开盘':<8} {'收盘':<8} {'最高':<8} {'最低':<8} "
- f"{'周涨跌幅':<10} {'最大跌幅':<10}")
- print("-" * 80)
-
- # 逐周分析
- for weeks in range(1, follow_up_weeks + 1):
- future_idx = pattern_idx + weeks
-
- if future_idx < len(data):
- future_row = data.iloc[future_idx]
- week_date = data.index[future_idx]
-
- week_open = future_row['open']
- week_close = future_row['close']
- week_high = future_row['high']
- week_low = future_row['low']
-
- # a) 每周的涨跌幅:收盘价相对于开盘价的涨跌幅度
- weekly_change_pct = (week_close - week_open) / week_open * 100
-
- # b) 每周的最大跌幅:开盘价到当周最低价的最大跌幅
- max_decline_pct = (week_low - week_open) / week_open * 100
-
- # 存储详细数据
- future_performance[f'第{weeks}周_日期'] = week_date.strftime('%Y-%m-%d')
- future_performance[f'第{weeks}周_开盘价'] = round(week_open, 2)
- future_performance[f'第{weeks}周_收盘价'] = round(week_close, 2)
- future_performance[f'第{weeks}周_最高价'] = round(week_high, 2)
- future_performance[f'第{weeks}周_最低价'] = round(week_low, 2)
- future_performance[f'第{weeks}周_涨跌幅'] = round(weekly_change_pct, 2)
- future_performance[f'第{weeks}周_最大跌幅'] = round(max_decline_pct, 2)
-
- # 打印逐周详细信息
- print(f"{weeks:>2} {week_date.strftime('%Y-%m-%d'):<12} "
- f"{week_open:<8.2f} {week_close:<8.2f} {week_high:<8.2f} {week_low:<8.2f} "
- f"{weekly_change_pct:>+8.2f}% {max_decline_pct:>+8.2f}%")
-
- else:
- # 数据不足的情况
- for field in ['日期', '开盘价', '收盘价', '最高价', '最低价', '涨跌幅', '最大跌幅']:
- future_performance[f'第{weeks}周_{field}'] = None
- print(f"{weeks:>2} {'无数据':<12} {'--':<8} {'--':<8} {'--':<8} {'--':<8} "
- f"{'--':<10} {'--':<10}")
-
- print("-" * 80)
-
- # 计算统计指标
- weekly_changes = [future_performance[f'第{w}周_涨跌幅']
- for w in range(1, follow_up_weeks + 1)
- if future_performance[f'第{w}周_涨跌幅'] is not None]
-
- max_declines = [future_performance[f'第{w}周_最大跌幅']
- for w in range(1, follow_up_weeks + 1)
- if future_performance[f'第{w}周_最大跌幅'] is not None]
-
- if weekly_changes:
- avg_weekly_change = sum(weekly_changes) / len(weekly_changes)
- max_weekly_gain = max(weekly_changes)
- max_weekly_loss = min(weekly_changes)
-
- future_performance['统计_平均周涨跌幅'] = round(avg_weekly_change, 2)
- future_performance['统计_最大周涨幅'] = round(max_weekly_gain, 2)
- future_performance['统计_最大周跌幅'] = round(max_weekly_loss, 2)
-
- print(f"统计汇总:")
- print(f" 平均周涨跌幅: {avg_weekly_change:+.2f}%")
- print(f" 最大周涨幅: {max_weekly_gain:+.2f}%")
- print(f" 最大周跌幅: {max_weekly_loss:+.2f}%")
-
- if max_declines:
- avg_max_decline = sum(max_declines) / len(max_declines)
- worst_decline = min(max_declines)
-
- future_performance['统计_平均最大跌幅'] = round(avg_max_decline, 2)
- future_performance['统计_最差跌幅'] = round(worst_decline, 2)
-
- print(f" 平均最大跌幅: {avg_max_decline:+.2f}%")
- print(f" 最差单周跌幅: {worst_decline:+.2f}%")
-
- performance_results.append(future_performance)
- print("=" * 80)
-
- return performance_results
-
- def create_visualizations(self, pattern_results, weekly_data):
- """创建K线图可视化"""
- if not self.config['output_settings']['generate_charts']:
- return
-
- self._log("\n=== 步骤7: 生成K线图可视化 ===")
-
- max_charts = min(len(pattern_results), self.config['output_settings']['max_chart_count'])
-
- for i, pattern in enumerate(pattern_results[:max_charts]):
- code = pattern['品种代码']
- pattern_date = pattern['日期']
- pattern_idx = pattern['数据索引']
-
- if code not in weekly_data:
- continue
-
- data = weekly_data[code]['data']
-
- # 准备绘图数据(形态前后各5周)
- start_idx = max(0, pattern_idx - 5)
- end_idx = min(len(data), pattern_idx + 11)
- plot_data = data.iloc[start_idx:end_idx]
-
- # 创建K线图
- fig, ax = plt.subplots(figsize=(12, 8))
-
- # 绘制K线
- for j, (date, row) in enumerate(plot_data.iterrows()):
- open_price = row['open']
- close_price = row['close']
- high_price = row['high']
- low_price = row['low']
-
- # K线颜色
- color = 'red' if close_price > open_price else 'green'
-
- # 绘制影线
- ax.plot([j, j], [low_price, high_price], color='black', linewidth=1)
-
- # 绘制实体
- body_height = abs(close_price - open_price)
- if close_price > open_price:
- rect = patches.Rectangle((j-0.3, open_price), 0.6, body_height,
- linewidth=1, edgecolor='red', facecolor='red', alpha=0.7)
- else:
- rect = patches.Rectangle((j-0.3, close_price), 0.6, body_height,
- linewidth=1, edgecolor='green', facecolor='green', alpha=0.7)
- ax.add_patch(rect)
-
- # 标记目标形态
- if date == pattern_date:
- ax.scatter(j, high_price + (high_price - low_price) * 0.1,
- color='yellow', s=100, marker='*', zorder=5)
- ax.text(j, high_price + (high_price - low_price) * 0.15,
- '目标形态', ha='center', fontsize=10, color='red', fontweight='bold')
-
- # 设置图表
- ax.set_title(f'{code} 周线K线图 - 目标形态分析\n形态日期: {pattern_date.strftime("%Y-%m-%d")}',
- fontsize=14, fontweight='bold')
- ax.set_xlabel('时间', fontsize=12)
- ax.set_ylabel('价格', fontsize=12)
- ax.grid(True, alpha=0.3)
-
- # 设置x轴标签
- dates = [date.strftime('%m-%d') for date in plot_data.index]
- ax.set_xticks(range(len(dates)))
- ax.set_xticklabels(dates, rotation=45)
-
- # 显示图表
- plt.tight_layout()
- plt.show() # 直接在界面中显示图表
- self._log(f"显示图表: {code} {pattern_date.strftime('%Y-%m-%d')} K线形态分析")
-
- def generate_summary_report(self, decline_futures, pattern_results, performance_results):
- """生成整体规律总结报告"""
- self._log("\n=== 步骤8: 生成统计报告 ===")
-
- # 基础统计
- total_decline_futures = len(decline_futures)
- total_patterns = len(pattern_results)
- total_analyzed = len(performance_results)
-
- print(f"\n=== 期货技术形态规律分析报告 ===")
- print(f"分析期间: {self.start_date.strftime('%Y-%m-%d')} 至 {self.end_date.strftime('%Y-%m-%d')}")
- print(f"总分析期货品种数: {len(self.future_codes)}")
- print(f"大幅下跌品种数: {total_decline_futures}")
- print(f"识别出的目标形态数: {total_patterns}")
- print(f"完成后续走势分析数: {total_analyzed}")
-
- if total_analyzed == 0:
- print("无有效数据进行统计分析")
- return {}
-
- # 收益率统计
- print(f"\n=== 后续走势统计分析 ===")
-
- # 提取所有统计指标
- avg_weekly_changes = [r['统计_平均周涨跌幅'] for r in performance_results if '统计_平均周涨跌幅' in r]
- max_weekly_gains = [r['统计_最大周涨幅'] for r in performance_results if '统计_最大周涨幅' in r]
- max_weekly_losses = [r['统计_最大周跌幅'] for r in performance_results if '统计_最大周跌幅' in r]
- avg_max_declines = [r['统计_平均最大跌幅'] for r in performance_results if '统计_平均最大跌幅' in r]
-
- if avg_weekly_changes:
- print(f"平均周涨跌幅统计:")
- print(f" 平均值: {np.mean(avg_weekly_changes):.2f}%")
- print(f" 中位数: {np.median(avg_weekly_changes):.2f}%")
- print(f" 最大值: {max(avg_weekly_changes):.2f}%")
- print(f" 最小值: {min(avg_weekly_changes):.2f}%")
-
- positive_weeks = sum(1 for r in avg_weekly_changes if r > 0)
- print(f" 正收益形态比例: {positive_weeks}/{len(avg_weekly_changes)} ({positive_weeks/len(avg_weekly_changes)*100:.1f}%)")
-
- if max_weekly_gains:
- print(f"\n最大周涨幅统计:")
- print(f" 平均值: {np.mean(max_weekly_gains):.2f}%")
- print(f" 中位数: {np.median(max_weekly_gains):.2f}%")
- print(f" 最大值: {max(max_weekly_gains):.2f}%")
- print(f" 最小值: {min(max_weekly_gains):.2f}%")
-
- if max_weekly_losses:
- print(f"\n最大周跌幅统计:")
- print(f" 平均值: {np.mean(max_weekly_losses):.2f}%")
- print(f" 中位数: {np.median(max_weekly_losses):.2f}%")
- print(f" 最大值: {max(max_weekly_losses):.2f}%")
- print(f" 最小值: {min(max_weekly_losses):.2f}%")
-
- if avg_max_declines:
- print(f"\n平均最大跌幅统计:")
- print(f" 平均值: {np.mean(avg_max_declines):.2f}%")
- print(f" 中位数: {np.median(avg_max_declines):.2f}%")
- print(f" 最大值: {max(avg_max_declines):.2f}%")
- print(f" 最小值: {min(avg_max_declines):.2f}%")
-
- # 成功率统计 - 基于平均周涨跌幅是否为正
- positive_performance_count = sum(1 for r in avg_weekly_changes if r > 0) if avg_weekly_changes else 0
- success_rate = positive_performance_count / len(avg_weekly_changes) * 100 if avg_weekly_changes else 0
-
- # 基于最大周涨幅的成功率统计
- success_threshold = self.config['analysis_params']['success_threshold'] * 100
- high_gain_count = sum(1 for r in max_weekly_gains if r > success_threshold) if max_weekly_gains else 0
- high_gain_rate = high_gain_count / len(max_weekly_gains) * 100 if max_weekly_gains else 0
-
- print(f"\n=== 交易成功率分析 ===")
- print(f"正收益形态数量: {positive_performance_count}/{len(avg_weekly_changes) if avg_weekly_changes else 0}")
- print(f"正收益成功率: {success_rate:.1f}%")
- print(f"")
- print(f"高收益标准: 最大周涨幅 > {success_threshold}%")
- print(f"高收益形态数量: {high_gain_count}/{len(max_weekly_gains) if max_weekly_gains else 0}")
- print(f"高收益成功率: {high_gain_rate:.1f}%")
-
- return {
- 'total_futures': len(self.future_codes),
- 'decline_futures': total_decline_futures,
- 'pattern_count': total_patterns,
- 'analyzed_count': total_analyzed,
- 'avg_weekly_change': np.mean(avg_weekly_changes) if avg_weekly_changes else 0,
- 'avg_max_gain': np.mean(max_weekly_gains) if max_weekly_gains else 0,
- 'avg_max_loss': np.mean(max_weekly_losses) if max_weekly_losses else 0,
- 'avg_max_decline': np.mean(avg_max_declines) if avg_max_declines else 0,
- 'success_rate': success_rate,
- 'high_gain_rate': high_gain_rate,
- 'positive_count': positive_performance_count,
- 'high_gain_count': high_gain_count
- }
-
- def save_results(self, decline_futures, pattern_results, performance_results, summary_stats):
- """保存结果到CSV文件"""
- self._log("\n=== 保存分析结果 ===")
-
- timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
- encoding = self.config['output_settings']['csv_encoding']
-
- # 保存大幅下跌期货品种
- if decline_futures:
- decline_df = pd.DataFrame(decline_futures)
- decline_file = f'future_major_decline_{timestamp}.csv'
- decline_df.to_csv(decline_file, index=False, encoding=encoding)
- self._log(f"大幅下跌期货保存至: {decline_file}")
-
- # 保存识别的K线形态
- if pattern_results:
- pattern_df = pd.DataFrame(pattern_results)
- pattern_file = f'future_kline_patterns_{timestamp}.csv'
- pattern_df.to_csv(pattern_file, index=False, encoding=encoding)
- self._log(f"K线形态数据保存至: {pattern_file}")
-
- # 保存后续走势分析
- if performance_results:
- performance_df = pd.DataFrame(performance_results)
- performance_file = f'future_pattern_performance_{timestamp}.csv'
- performance_df.to_csv(performance_file, index=False, encoding=encoding)
- self._log(f"后续走势分析保存至: {performance_file}")
-
- # 保存汇总报告
- if summary_stats:
- summary_df = pd.DataFrame([summary_stats])
- summary_file = f'future_pattern_summary_{timestamp}.csv'
- summary_df.to_csv(summary_file, index=False, encoding=encoding)
- self._log(f"汇总报告保存至: {summary_file}")
-
- def run_analysis(self, mode=1, manual_config=None):
- """
- 统一的分析运行入口
-
- 参数:
- - mode: 执行模式
- 1 = 原始/自动模式:自动搜索符合条件的技术形态
- 2 = 验证模式:验证指定日期是否符合模式条件
- - manual_config: 手动配置字典,仅在mode=2时使用
-
- 返回:
- - dict: 分析结果
- """
-
- if mode == 1:
- return self._run_original_mode()
- elif mode == 2:
- if manual_config is None:
- raise ValueError("验证模式(mode=2)需要提供manual_config参数")
- return self._run_validation_mode(manual_config)
- else:
- raise ValueError("mode参数必须为1(原始模式)或2(验证模式)")
-
- def _run_original_mode(self):
- """原始/自动模式:自动搜索符合条件的技术形态"""
- print("开始期货技术形态规律分析... [原始模式]")
-
- # 构建期货品种基础数据
- self.build_future_codes()
-
- # 筛选大幅下跌的期货品种
- decline_futures = self.find_major_decline_futures()
-
- if not decline_futures:
- print("未找到符合条件的大幅下跌期货品种")
- return {}
-
- # 创建8888加权合约映射
- contract_mapping = self.get_simplified_contract_mapping(decline_futures)
-
- if not contract_mapping:
- print("未能创建8888合约映射")
- return {}
- else:
- print(f"创建8888合约映射成功,共{len(contract_mapping)}个合约")
- for contract_code, time_range in contract_mapping.items():
- start_date, end_date = time_range
- print(f" {contract_code}: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")
-
- # 获取周线K线数据
- weekly_data = self.get_weekly_kline_data(contract_mapping)
-
- if not weekly_data:
- print("未能获取周线数据")
- return {}
-
- # 识别目标K线形态
- pattern_results = self.identify_target_pattern(weekly_data, decline_futures)
-
- if not pattern_results:
- print("未识别出目标K线形态")
- return {}
-
- # 分析后续走势
- performance_results = self.analyze_future_performance(pattern_results, weekly_data)
-
- # 创建可视化
- self.create_visualizations(pattern_results, weekly_data)
-
- # 生成汇总报告
- summary_stats = self.generate_summary_report(decline_futures, pattern_results, performance_results)
-
- print(f"\n=== 分析完成 ===")
- print(f"共分析了{len(decline_futures)}个大幅下跌期货品种")
- print(f"识别出{len(pattern_results)}个目标K线形态")
- print(f"完成{len(performance_results)}个形态的后续走势分析")
-
- return {
- 'mode': 1,
- 'decline_futures': decline_futures,
- 'pattern_results': pattern_results,
- 'performance_results': performance_results,
- 'summary_stats': summary_stats
- }
-
- def _run_validation_mode(self, manual_config):
- """验证模式:验证指定日期是否符合模式条件"""
- print("开始期货技术形态验证分析... [验证模式]")
-
- # 构建期货品种基础数据(仅构建需要的品种)
- self.build_future_codes()
-
- # 从配置中提取需要验证的品种代码
- config_codes = list(manual_config.keys())
- print(f"配置的验证目标: {config_codes}")
-
- # 为每个配置的品种创建数据获取范围
- sample_futures = []
-
- for code in config_codes:
- if code in self.future_codes:
- contract_8888 = self.future_codes[code]
- date_list = manual_config[code]
-
- # 基于配置的日期计算数据获取范围
- config_dates = [pd.to_datetime(date_str) for date_str in date_list]
- earliest_date = min(config_dates)
- latest_date = max(config_dates)
-
- # 为了确保有足够的数据进行验证,在日期范围前后各扩展一些时间
- buffer_days = 90 # 3个月缓冲
- start_date = earliest_date - pd.Timedelta(days=buffer_days)
- end_date = latest_date + pd.Timedelta(days=buffer_days)
-
- sample_futures.append({
- '品种代码': code,
- '合约代码': contract_8888,
- '高点日期': start_date,
- '低点日期': end_date
- })
-
- print(f" ✓ {code} ({contract_8888}): {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")
- else:
- print(f" ⚠️ 未找到品种 '{code}' 对应的合约代码")
-
- print(f"实际处理的品种数: {len(sample_futures)}")
-
- if not sample_futures:
- print("❌ 配置中的品种代码都无法找到对应的合约")
- return {}
-
- # 创建简化的合约映射
- contract_mapping = self.get_simplified_contract_mapping(sample_futures)
-
- if not contract_mapping:
- print("❌ 无法创建合约映射")
- return {}
-
- # 获取周线数据
- weekly_data = self.get_weekly_kline_data(contract_mapping)
-
- if not weekly_data:
- print("❌ 无法获取周线数据")
- return {}
-
- print(f"✓ 成功获取 {len(weekly_data)} 个品种的数据")
-
- # 执行手动验证
- validation_results = self.validate_manual_patterns(manual_config, weekly_data)
- validation_results['mode'] = 2
-
- return validation_results
- # =====================================================================================
- # 第三部分:主程序运行函数
- # 提供统一的程序入口和运行选项
- # =====================================================================================
- def run_pattern_analysis(mode=1, manual_config=None):
- """
- 统一的期货技术形态分析入口
-
- 参数:
- - mode: 执行模式
- 1 = 原始/自动模式:自动搜索符合条件的技术形态
- 2 = 验证模式:验证指定日期是否符合模式条件
- - manual_config: 手动配置字典,仅在mode=2时使用
-
- 返回:
- - dict: 分析结果
- """
- print("期货技术形态规律分析工具")
- print("="*60)
-
- # 打印配置信息
- AnalysisConfig.print_config()
- print("="*60)
-
- mode_name = "原始模式" if mode == 1 else "验证模式"
- print(f"开始分析... [{mode_name}]")
-
- # 创建分析器并运行
- analyzer = FuturePatternAnalyzer(
- AnalysisConfig.START_DATE,
- AnalysisConfig.END_DATE
- )
-
- results = analyzer.run_analysis(mode=mode, manual_config=manual_config)
- return results
-
- # =====================================================================================
- # 第四部分:辅助工具函数
- # 用于处理合约切换日志和数据导出
- # =====================================================================================
- def parse_contract_switch_log_from_text(log_text):
- """
- 从文本格式的合约切换日志中解析合约信息
-
- 参数:
- - log_text: 日志文本,格式示例:
- "A2203.XDCE:2022年1月26日至2022年2月22日
- A2207.XDCE:2022年2月23日至2022年6月21日"
-
- 返回:
- - list: 标准化的合约切换日志
- """
- import re
-
- contracts = []
- lines = log_text.strip().split('\n')
-
- for line in lines:
- line = line.strip()
- if not line:
- continue
-
- # 正则匹配合约代码和日期范围
- pattern = r'([A-Z0-9]+\.[A-Z]+):(\d{4})年(\d{1,2})月(\d{1,2})日至(\d{4})年(\d{1,2})月(\d{1,2})日'
- match = re.search(pattern, line)
-
- if match:
- contract_code = match.group(1)
- start_year = int(match.group(2))
- start_month = int(match.group(3))
- start_day = int(match.group(4))
- end_year = int(match.group(5))
- end_month = int(match.group(6))
- end_day = int(match.group(7))
-
- contracts.append({
- 'contract': contract_code,
- 'start_date': f'{start_year}-{start_month:02d}-{start_day:02d}',
- 'end_date': f'{end_year}-{end_month:02d}-{end_day:02d}'
- })
-
- return contracts
- def export_contract_weekly_data(weekly_data_dict, filename_prefix='contract_weekly_data'):
- """
- 导出合约周线数据到CSV文件
-
- 参数:
- - weekly_data_dict: get_weekly_data_by_contract_log返回的数据字典
- - filename_prefix: 文件名前缀
-
- 注意:导出的数据是直接从get_bars API获取的原生周线数据,
- 相比日线转换的数据具有更高的精度和准确性
- """
- import datetime
-
- timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
-
- for contract_code, contract_data in weekly_data_dict.items():
- # 清理文件名中的特殊字符
- safe_contract_name = contract_code.replace('.', '_')
- filename = f'{filename_prefix}_{safe_contract_name}_{timestamp}.csv'
-
- # 准备数据
- df = contract_data['data'].copy()
- df['contract'] = contract_code
- df['original_start'] = contract_data['original_start'].strftime('%Y-%m-%d')
- df['original_end'] = contract_data['original_end'].strftime('%Y-%m-%d')
-
- # 保存到CSV
- df.to_csv(filename, encoding='utf-8-sig')
- print(f"合约 {contract_code} 的周线数据已保存至: {filename}")
- def run_manual_pattern_validation(manual_config, analyzer=None):
- """
- 运行手动模式验证的便捷函数
-
- 此函数只会处理manual_config中明确指定的品种和时间范围,
- 不会查询额外的品种数据。现在使用统一的分析框架。
-
- 参数:
- - manual_config: 手动配置字典,格式:
- {'A':['2023-01-06','2023-06-09'], 'B':['2022-07-22','2024-08-23']}
- 键: 期货品种代码(如'A'、'B'、'RB'等)
- 值: 需要验证的日期列表('YYYY-MM-DD'格式)
- - analyzer: 可选的分析器实例,如果为None则创建新实例
-
- 返回:
- - dict: 验证结果,包含成功和失败的详细信息
-
- 注意:
- - 系统只会获取配置中指定品种的数据
- - 数据获取范围基于每个品种配置的日期范围自动计算
- - 不会处理未在配置中明确指定的品种
- - 现在使用统一的核心验证逻辑,确保与原始模式的一致性
- """
- print("\n=== 手动模式验证工具 ===")
- print("此功能用于验证指定日期是否符合预定义的技术形态模式")
- print("使用统一的验证逻辑,与原始模式保持完全一致")
-
- # 创建分析器实例(如果没有提供)
- if analyzer is None:
- analyzer = FuturePatternAnalyzer()
-
- # 使用统一的分析入口 - 验证模式
- validation_results = analyzer.run_analysis(mode=2, manual_config=manual_config)
-
- return validation_results
- def demo_manual_validation():
- """演示手动模式验证功能"""
- print("\n=== 手动模式验证演示 ===")
-
- # 示例配置
- demo_config = {
- 'A': ['2023-01-06', '2023-06-09', '2024-07-26'],
- 'B': ['2022-07-22', '2024-08-23', '2025-01-03']
- }
-
- print("使用演示配置:")
- for pattern_type, dates in demo_config.items():
- print(f" 模式 '{pattern_type}': {len(dates)} 个日期")
- for date in dates:
- print(f" - {date}")
-
- try:
- # 运行验证
- results = run_manual_pattern_validation(demo_config)
-
- if results:
- print("\n✅ 验证完成")
- return results
- else:
- print("\n⚠️ 验证未能完成")
- return {}
-
- except Exception as e:
- print(f"\n❌ 验证过程中出错: {str(e)}")
- import traceback
- traceback.print_exc()
- return {}
- def get_contract_weekly_data(contract_switch_log, extend_weeks=10, export_data=False):
- """
- 基于合约切换日志获取周线数据的正式方法
-
- 参数:
- - contract_switch_log: 合约切换日志,可以是文本格式或列表格式
- - extend_weeks: 延长周数,默认10周
- - export_data: 是否导出数据到CSV文件,默认False
-
- 返回:
- - dict: 每个合约的周线数据
- """
- print("\n=== 基于合约切换日志获取周线数据 ===")
-
- # 如果是文本格式,先解析
- if isinstance(contract_switch_log, str):
- print("解析合约切换日志:")
- contracts = parse_contract_switch_log_from_text(contract_switch_log)
- for contract in contracts:
- print(f" {contract['contract']}: {contract['start_date']} 至 {contract['end_date']}")
- else:
- contracts = contract_switch_log
- print(f"处理{len(contracts)}个合约的切换日志")
-
- print(f"\n创建分析器并获取周线数据 (延长{extend_weeks}周):")
- analyzer = FuturePatternAnalyzer()
-
- try:
- # 获取周线数据
- weekly_data = analyzer.get_weekly_data_by_contract_log(contracts, extend_weeks=extend_weeks)
-
- if weekly_data:
- print(" ✓ 获取成功!")
- for contract_code, data_info in weekly_data.items():
- print(f" {contract_code}: {data_info['total_weeks']}周数据")
- print(f" 原始范围: {data_info['original_start'].strftime('%Y-%m-%d')} 至 {data_info['original_end'].strftime('%Y-%m-%d')}")
- print(f" 延长范围: {data_info['extended_start'].strftime('%Y-%m-%d')} 至 {data_info['extended_end'].strftime('%Y-%m-%d')}")
-
- # 导出数据
- if export_data:
- export_contract_weekly_data(weekly_data)
-
- return weekly_data
- else:
- print(" ⚠️ 未获取到数据")
- return {}
-
- except Exception as e:
- print(f" ❌ 获取失败: {str(e)}")
- import traceback
- traceback.print_exc()
- return {}
- # =====================================================================================
- # 第五部分:程序主入口 - 自动运行分析
- # =====================================================================================
- print("期货市场技术形态规律分析工具")
- print("本工具专门研究期货品种在大幅下跌后出现特定K线形态的规律")
- print("适用于聚宽在线研究平台")
- print("="*80)
- # =====================================================================================
- # 基于合约切换日志的周线数据获取功能使用说明
- # =====================================================================================
- #
- # 使用示例1:处理文本格式的合约切换日志
- # sample_log = """
- # A2203.XDCE:2022年1月26日至2022年2月22日
- # A2207.XDCE:2022年2月23日至2022年6月21日
- # A2209.XDCE:2022年6月22日至2022年8月30日
- # """
- # weekly_data = get_contract_weekly_data(sample_log, extend_weeks=10, export_data=True)
- #
- # 使用示例2:处理列表格式的合约切换日志
- # contract_list = [
- # {'contract': 'A2203.XDCE', 'start_date': '2022-01-26', 'end_date': '2022-02-22'},
- # {'contract': 'A2207.XDCE', 'start_date': '2022-02-23', 'end_date': '2022-06-21'}
- # ]
- # weekly_data = get_contract_weekly_data(contract_list, extend_weeks=15, export_data=False)
- #
- # 新增功能1:模式识别失败时的完整K线图显示
- # 当identify_target_pattern函数未识别到任何目标形态时,系统会自动:
- # 1. 显示所有可用品种的完整烛台图
- # 2. 包含OHLC数据的专业K线图表现
- # 3. 添加统计信息(最高价、最低价、总涨跌幅等)
- # 4. 直接在界面中显示,不保存到本地文件
- #
- # 新增功能2:手动模式验证系统
- # 使用示例3:手动验证指定日期的模式匹配
- # 现在使用统一的入口函数,通过mode参数控制执行模式
- manual_config = {
- 'A': ['2023-01-06', '2023-06-09', '2024-07-26'],
- 'B': ['2022-07-22', '2024-08-23', '2025-01-03']
- }
- # 方法1: 使用统一入口函数
- validation_results = run_pattern_analysis(mode=2, manual_config=manual_config)
- # 方法2: 或者使用便捷函数(内部调用统一入口)
- # validation_results = run_manual_pattern_validation(manual_config)
- # 或运行演示: demo_manual_validation()
- #
- # 重构后的统一模式控制系统:
- # 1. mode=1(原始模式):自动搜索符合条件的技术形态,遍历所有期货品种
- # 2. mode=2(验证模式):验证指定日期是否符合模式条件,只处理配置品种
- # 3. 统一的核心验证逻辑:_core_pattern_validation()消除了代码重复
- # 4. 独立的执行路径:两种模式不再共享步骤1,完全独立执行
- # 5. 一致的验证标准:两种模式使用完全相同的模式识别条件和计算方法
- #
- # 数据获取特点:
- # 1. 直接使用get_bars API获取原生周线数据,避免日线转换的精度损失
- # 2. 自动延长时间范围(每个合约前后各延长指定周数)
- # 3. 确保获取充足的历史数据(最少60周作为背景)
- # 4. 支持数据导出到CSV文件便于后续分析
- # 5. 简化结构:直接使用8888加权合约,无需处理主力合约切换
- #
- # 简化后的工作流程:
- # 1. 识别符合条件的大幅下跌期货品种
- # 2. 为每个品种创建8888合约映射(单级字典结构)
- # 3. 直接从8888合约获取周线数据,避免主力合约复杂性
- # 4. 进行技术形态识别和后续走势分析
- # 5. 当未识别到目标形态时,自动显示完整的烛台图(K线图)
- # =====================================================================================
- # 直接运行分析
- # print("\n开始运行完整分析...")
- # try:
- # results = run_pattern_analysis()
- # except Exception as e:
- # print(f"运行过程中出现错误: {str(e)}")
- # import traceback
- # traceback.print_exc()
- # results = None
- # if results:
- # print("\n✅ 分析完成!")
- # if isinstance(results, dict) and 'summary_stats' in results:
- # summary = results['summary_stats']
- # print(f"\n📊 快速统计:")
- # print(f" - 识别到技术形态: {summary.get('pattern_count', 0)}个")
- # print(f" - 正收益成功率: {summary.get('success_rate', 0):.1f}%")
- # print(f" - 高收益成功率: {summary.get('high_gain_rate', 0):.1f}%")
- # print(f" - 平均周涨跌幅: {summary.get('avg_weekly_change', 0):+.2f}%")
- # else:
- # print("\n❌ 分析未能完成,请检查错误信息。")
|