""" 期货市场技术形态规律分析 - 综合版本 研究期货品种在大幅下跌后出现特定K线形态的规律,并分析后续走势 本文件包含: - 分析配置参数 - 主要分析类FuturePatternAnalyzer - 基于合约切换日志的周线数据获取功能 - 辅助工具函数和数据导出功能 - 完整的分析流程 主要功能: 1. get_contract_weekly_data() - 基于合约切换日志获取周线数据的正式方法 2. get_weekly_data_by_contract_log() - 核心数据获取逻辑,直接使用get_bars获取原生周线数据 3. 简化的get_weekly_kline_data() - 直接使用8888加权合约,避免主力合约切换复杂性 4. validate_manual_patterns() - 手动模式验证系统,支持指定日期的模式验证 5. run_manual_pattern_validation() - 便捷的手动验证入口函数 6. _display_full_candlestick_charts() - 模式识别失败时的完整K线图显示 数据结构简化: - 移除了主力合约获取和切换逻辑 - 使用单级字典结构:键为8888合约代码,值为日期范围 - 所有数据检索均使用8888加权合约,简化了代码复杂性 验证模式: - 自动搜索模式:使用identify_target_pattern()自动搜索符合条件的技术形态 - 手动验证模式:使用validate_manual_patterns()验证指定日期是否符合模式条件 - 两种模式使用相同的验证逻辑和标准,但工作流程独立 作者: jukuan研究团队 日期: 2025-09 适用平台: 聚宽在线研究平台 """ import pandas as pd import numpy as np from jqdata import * import datetime import matplotlib.pyplot as plt import matplotlib.patches as patches from matplotlib.dates import DateFormatter import warnings warnings.filterwarnings('ignore') # 中文字体设置 plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans'] plt.rcParams['axes.unicode_minus'] = False # ===================================================================================== # 第一部分:分析配置参数 # 用户可根据需要调整以下参数来控制分析行为 # ===================================================================================== class AnalysisConfig: """分析配置类 - 集中管理所有分析参数""" # ==================== 分析时间设置 ==================== START_DATE = '2022-01-01' # 分析起始日期 (建议至少2年数据) END_DATE = '2025-08-31' # 分析结束日期 # ==================== 筛选条件设置 ==================== MIN_DRAWDOWN_THRESHOLD = 0.30 # 最小回撤阈值 (0.3 = 30%) # K线形态识别条件 KLINE_CONDITIONS = { 'max_open_close_diff': 0.01, # 开盘价与收盘价最大差异 (0.01 = 1%) 'min_high_open_gain': 0.03, # 最高价相对开盘价的最小涨幅 (0.03 = 3%) } # ==================== 分析参数设置 ==================== ANALYSIS_EXTEND_DAYS = 30 # 扩展分析天数 (在下跌期间前后各扩展的天数) FOLLOW_UP_WEEKS = 10 # 后续跟踪周数 SUCCESS_RETURN_THRESHOLD = 0.05 # 成功率计算的收益率阈值 (0.05 = 5%) # ==================== 输出设置 ==================== GENERATE_CHARTS = True # 是否生成K线图 MAX_CHART_COUNT = 10 # 最大图表生成数量 (避免生成过多图片) CHART_OUTPUT_DIR = './' # 图表保存目录 CSV_ENCODING = 'utf-8-sig' # CSV文件编码 # ==================== 期货品种过滤设置 ==================== # 需要排除的期货品种 (可根据需要调整) EXCLUDED_FUTURES = [ 'IF9999.CCFX', # 'LU8888.XINE', # 'NR8888.XINE', # 'LG8888.XDCE', # 原木 - 新上市品种数据较少 ] # 重点关注的期货品种 (如果设置,将只分析这些品种) FOCUS_FUTURES = [] # 空列表表示分析所有品种 # ==================== 分析规模控制 ==================== MAX_ANALYSIS_FUTURES = 5 # 最大分析期货品种数 (-1表示分析所有品种) VERBOSE_LOGGING = True # 是否打印详细日志 SHOW_PROGRESS = True # 是否在分析过程中显示进度 # ==================== 数据质量控制 ==================== MIN_DATA_LENGTH = 100 # 最小数据长度要求 (日线数据点数) MIN_WEEKLY_DATA = 20 # 最小周线数据要求 MAX_MISSING_DATA_RATIO = 0.1 # 最大缺失数据比例 @classmethod def get_config_dict(cls): """获取分析配置字典""" return { 'time_range': { 'start_date': cls.START_DATE, 'end_date': cls.END_DATE, }, 'filter_conditions': { 'min_drawdown': cls.MIN_DRAWDOWN_THRESHOLD, 'kline_conditions': cls.KLINE_CONDITIONS, }, 'analysis_params': { 'extend_days': cls.ANALYSIS_EXTEND_DAYS, 'follow_up_weeks': cls.FOLLOW_UP_WEEKS, 'success_threshold': cls.SUCCESS_RETURN_THRESHOLD, }, 'output_settings': { 'generate_charts': cls.GENERATE_CHARTS, 'max_chart_count': cls.MAX_CHART_COUNT, 'chart_dir': cls.CHART_OUTPUT_DIR, 'csv_encoding': cls.CSV_ENCODING, }, 'future_filter': { 'excluded': cls.EXCLUDED_FUTURES, 'focus_only': cls.FOCUS_FUTURES, }, 'analysis_control': { 'max_futures': cls.MAX_ANALYSIS_FUTURES, 'verbose': cls.VERBOSE_LOGGING, 'show_progress': cls.SHOW_PROGRESS, }, 'data_quality': { 'min_data_length': cls.MIN_DATA_LENGTH, 'min_weekly_data': cls.MIN_WEEKLY_DATA, 'max_missing_ratio': cls.MAX_MISSING_DATA_RATIO, } } @classmethod def print_config(cls): """打印当前配置信息""" print("=== 期货技术形态分析配置 ===") print(f"分析时间范围: {cls.START_DATE} 至 {cls.END_DATE}") print(f"最小回撤阈值: {cls.MIN_DRAWDOWN_THRESHOLD*100:.0f}%") print(f"开收差异阈值: {cls.KLINE_CONDITIONS['max_open_close_diff']*100:.0f}%") print(f"最高涨幅阈值: {cls.KLINE_CONDITIONS['min_high_open_gain']*100:.0f}%") print(f"后续跟踪周数: {cls.FOLLOW_UP_WEEKS}") print(f"成功率阈值: {cls.SUCCESS_RETURN_THRESHOLD*100:.0f}%") if cls.EXCLUDED_FUTURES: print(f"排除品种数: {len(cls.EXCLUDED_FUTURES)}") if cls.FOCUS_FUTURES: print(f"重点品种数: {len(cls.FOCUS_FUTURES)}") print(f"生成图表: {'是' if cls.GENERATE_CHARTS else '否'}") print(f"最大分析品种数: {'所有' if cls.MAX_ANALYSIS_FUTURES == -1 else cls.MAX_ANALYSIS_FUTURES}") # ===================================================================================== # 第二部分:期货技术形态分析器主类 # 实现完整的技术形态识别和分析功能 # ===================================================================================== class FuturePatternAnalyzer: """期货技术形态分析器 - 主要分析类""" def __init__(self, start_date=None, end_date=None, config=None): """ 初始化分析器 参数: - start_date: 分析开始日期,默认使用配置 - end_date: 分析结束日期,默认使用配置 - config: 自定义配置,默认使用AnalysisConfig """ # 使用提供的配置或默认配置 if config is None: config = AnalysisConfig.get_config_dict() self.start_date = pd.to_datetime(start_date or config['time_range']['start_date']) self.end_date = pd.to_datetime(end_date or config['time_range']['end_date']) self.config = config self.future_codes = {} # 期货代码映射 # 期货品种与交易所的映射关系 self.exchange_map = { # 大连商品交易所 (XDCE) 'A': 'XDCE', 'B': 'XDCE', 'C': 'XDCE', 'CS': 'XDCE', 'FB': 'XDCE', 'I': 'XDCE', 'J': 'XDCE', 'JD': 'XDCE', 'JM': 'XDCE', 'L': 'XDCE', 'M': 'XDCE', 'P': 'XDCE', 'PP': 'XDCE', 'PG': 'XDCE', 'RR': 'XDCE', 'V': 'XDCE', 'Y': 'XDCE', 'EB': 'XDCE', 'EG': 'XDCE', 'LH': 'XDCE', # 郑州商品交易所 (XZCE) 'AP': 'XZCE', 'CF': 'XZCE', 'CY': 'XZCE', 'FG': 'XZCE', 'JR': 'XZCE', 'LR': 'XZCE', 'MA': 'XZCE', 'OI': 'XZCE', 'PM': 'XZCE', 'RI': 'XZCE', 'RM': 'XZCE', 'RS': 'XZCE', 'SF': 'XZCE', 'SM': 'XZCE', 'SR': 'XZCE', 'TA': 'XZCE', 'WH': 'XZCE', 'ZC': 'XZCE', 'CJ': 'XZCE', 'ME': 'XZCE', 'PF': 'XZCE', 'PK': 'XZCE', 'RO': 'XZCE', 'SA': 'XZCE', 'TC': 'XZCE', 'UR': 'XZCE', 'WS': 'XZCE', 'WT': 'XZCE', # 上海期货交易所 (XSGE) 'AG': 'XSGE', 'AL': 'XSGE', 'AU': 'XSGE', 'BU': 'XSGE', 'CU': 'XSGE', 'FU': 'XSGE', 'HC': 'XSGE', 'NI': 'XSGE', 'PB': 'XSGE', 'RB': 'XSGE', 'RU': 'XSGE', 'SN': 'XSGE', 'SP': 'XSGE', 'SS': 'XSGE', 'WR': 'XSGE', 'ZN': 'XSGE', # 上海国际能源交易中心 (XINE) 'BC': 'XINE', 'LU': 'XINE', 'NR': 'XINE', 'SC': 'XINE', # 中金所 (CCFX) 'IC': 'CCFX', 'IF': 'CCFX', 'IH': 'CCFX', 'T': 'CCFX', 'TF': 'CCFX', 'TS': 'CCFX', # 广期所 (GFEX) 'SI': 'GFEX', 'LC': 'GFEX', 'PS': 'GFEX' } self._log(f"初始化分析器 - 分析期间: {self.start_date.strftime('%Y-%m-%d')} 至 {self.end_date.strftime('%Y-%m-%d')}") def _log(self, message): """记录日志""" if self.config['analysis_control']['verbose']: print(message) def build_future_codes(self): """构建期货品种基础数据""" self._log("\n=== 步骤1: 构建期货品种基础数据 ===") # 构建8888主连合约代码 for code, exchange in self.exchange_map.items(): future_code = f"{code}8888.{exchange}" # 应用过滤器 if future_code in self.config['future_filter']['excluded']: continue # 如果设置了重点关注品种,只添加重点品种 if (self.config['future_filter']['focus_only'] and future_code not in self.config['future_filter']['focus_only']): continue self.future_codes[code] = future_code self._log(f"构建完成,共{len(self.future_codes)}个期货品种") if self.config['analysis_control']['verbose']: self._log("示例品种:") for i, (code, full_code) in enumerate(list(self.future_codes.items())[:5]): self._log(f" {code} -> {full_code}") return self.future_codes def get_future_price_data(self, future_code, start_date, end_date): """获取期货价格数据""" try: # print(f"获取{future_code}数据: {start_date} 至 {end_date}") data = get_price( future_code, start_date=start_date, end_date=end_date, frequency='daily', fields=['open', 'close', 'high', 'low', 'volume'], skip_paused=False, panel=False ) # print(f"获取的最后一条数据: {data.iloc[-1]}") return data except Exception as e: if self.config['analysis_control']['verbose']: print(f" 获取{future_code}数据失败: {str(e)}") return None def calculate_drawdown(self, data): """计算最大回撤""" if data is None or len(data) == 0: return None, None, None, None # 计算累计最高价 data['cum_max'] = data['high'].expanding().max() # 计算回撤 data['drawdown'] = (data['low'] - data['cum_max']) / data['cum_max'] # 找到最大回撤 max_drawdown = data['drawdown'].min() # 最大回撤是负值,所以用min max_drawdown_idx = data['drawdown'].idxmin() # 找到最大回撤对应的高点 max_high_before_dd = data.loc[:max_drawdown_idx, 'cum_max'].iloc[-1] high_point_idx = data[data['high'] == max_high_before_dd].index[0] return abs(max_drawdown), high_point_idx, max_drawdown_idx, data def find_major_decline_futures(self, min_drawdown=None): """筛选大幅下跌的期货品种""" if min_drawdown is None: min_drawdown = self.config['filter_conditions']['min_drawdown'] self._log(f"\n=== 步骤2: 筛选跌幅>{min_drawdown*100}%的期货品种 ===") decline_futures = [] total_count = len(self.future_codes) max_futures = self.config['analysis_control']['max_futures'] analyzed_count = 0 for code, future_code in self.future_codes.items(): analyzed_count += 1 # 最大品种数限制 if max_futures > 0 and analyzed_count > max_futures: break if self.config['analysis_control']['show_progress']: display_total = min(total_count, max_futures) if max_futures > 0 else total_count print(f"分析 {code}({future_code}) [{analyzed_count}/{display_total}]", end=" ") # 获取价格数据 price_data = self.get_future_price_data(future_code, self.start_date, self.end_date) if price_data is None or len(price_data) < self.config['data_quality']['min_data_length']: if self.config['analysis_control']['show_progress']: print("- 数据不足") continue # 计算最大回撤 max_drawdown, high_date, low_date, data_with_dd = self.calculate_drawdown(price_data) if max_drawdown is None or max_drawdown < min_drawdown: if self.config['analysis_control']['show_progress']: print(f"- 最大回撤{max_drawdown:.1%}不符合条件") continue # 记录符合条件的品种 decline_info = { '品种代码': code, '合约代码': future_code, '最大回撤': max_drawdown, '高点日期': high_date, '低点日期': low_date, '高点价格': data_with_dd.loc[high_date, 'high'], '低点价格': data_with_dd.loc[low_date, 'low'], '下跌天数': (low_date - high_date).days } decline_futures.append(decline_info) if self.config['analysis_control']['show_progress']: print(f"✓ 最大回撤{max_drawdown:.1%}, 下跌期间{high_date.strftime('%Y-%m-%d')}至{low_date.strftime('%Y-%m-%d')}") self._log(f"\n找到{len(decline_futures)}个符合条件的期货品种") return decline_futures def get_simplified_contract_mapping(self, decline_futures): """获取8888加权合约映射 - 简化版本,直接使用加权合约""" self._log("\n=== 步骤3: 创建8888加权合约映射 ===") contract_mapping = {} for future_info in decline_futures: code = future_info['品种代码'] high_date = future_info['高点日期'] low_date = future_info['低点日期'] # 扩展日期范围前后30天 extend_days = self.config['analysis_params']['extend_days'] start_date = high_date - pd.Timedelta(days=extend_days) end_date = low_date + pd.Timedelta(days=extend_days) # 构建8888加权合约代码 contract_8888 = self.future_codes.get(code) if contract_8888: # 直接使用8888合约,时间范围为分析期间 contract_mapping[contract_8888] = [start_date, end_date] self._log(f" ✓ {code}: 使用8888加权合约 {contract_8888}") self._log(f" 时间范围: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}") else: self._log(f" ⚠️ {code}: 未找到对应的8888合约代码") self._log(f"完成8888合约映射,共{len(contract_mapping)}个合约") return contract_mapping def get_weekly_kline_data(self, contract_mapping): """获取周线K线数据 - 简化版本,直接使用8888加权合约""" self._log("\n=== 步骤4: 获取周线K线数据 ===") weekly_data = {} for contract_code, time_range in contract_mapping.items(): start_date, end_date = time_range self._log(f"获取{contract_code}周线数据 ({start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')})") try: # 计算需要获取的周数,确保充足的数据覆盖 time_diff = end_date - start_date estimated_weeks = int(time_diff.days / 7) + 20 # 额外增加20周缓冲 estimated_weeks = max(estimated_weeks, 40) # 最少40周 self._log(f" 计算获取{estimated_weeks}周数据") # 直接使用get_bars API获取原生周线数据,避免日线转换的精度损失 weekly_bars = get_bars( security=contract_code, count=estimated_weeks, unit='1w', # 直接获取周线数据 fields=['date', 'open', 'high', 'low', 'close', 'volume'], include_now=False, end_dt=end_date, fq_ref_date=None, df=True ) if weekly_bars is not None and len(weekly_bars) > 0: # 确保日期类型一致,避免 'Cannot compare type Timestamp with type date' 错误 # get_bars返回的date列可能是date类型,需统一转换为pandas Timestamp weekly_bars['date'] = pd.to_datetime(weekly_bars['date']) start_date = pd.to_datetime(start_date) end_date = pd.to_datetime(end_date) # 筛选在指定时间范围内的数据 mask = (weekly_bars['date'] >= start_date) & (weekly_bars['date'] <= end_date) filtered_weekly_data = weekly_bars[mask].copy() if len(filtered_weekly_data) > 0: # 设置日期为索引 filtered_weekly_data.set_index('date', inplace=True) # 从合约代码中提取品种代码(用于后续识别) symbol_code = contract_code.replace('8888.', '').split('.')[0] weekly_data[symbol_code] = { 'data': filtered_weekly_data, 'contract_code': contract_code, 'time_range': time_range } self._log(f" ✓ {symbol_code}({contract_code}): 获取{len(filtered_weekly_data)}根周K线") else: self._log(f" ⚠️ {contract_code}: 筛选后周线数据不足") else: self._log(f" ⚠️ {contract_code}: 未获取到周线数据") except Exception as e: self._log(f" ❌ 获取{contract_code}周线数据失败: {str(e)}") continue self._log(f"完成周线数据获取,共{len(weekly_data)}个品种") return weekly_data def get_weekly_data_by_contract_log(self, contract_switch_log, extend_weeks=10): """ 基于合约切换日志获取每个合约的周线数据 参数: - contract_switch_log: 合约切换日志,格式为列表,每个元素包含: {'contract': '合约代码', 'start_date': '开始日期', 'end_date': '结束日期'} - extend_weeks: 延长周数,默认10周 返回: - dict: 每个合约的周线数据,键为合约代码,值为数据和时间范围信息 """ self._log(f"\n=== 基于合约切换日志获取周线数据 ===") self._log(f"延长周数: {extend_weeks}周") weekly_data_by_contract = {} for contract_info in contract_switch_log: contract_code = contract_info.get('contract') start_date = contract_info.get('start_date') end_date = contract_info.get('end_date') if not all([contract_code, start_date, end_date]): self._log(f"⚠️ 合约信息不完整,跳过: {contract_info}") continue # 转换日期格式 if isinstance(start_date, str): start_date = pd.to_datetime(start_date) if isinstance(end_date, str): end_date = pd.to_datetime(end_date) # 计算延长后的时间范围 extended_start_date = start_date - pd.Timedelta(weeks=extend_weeks) extended_end_date = end_date + pd.Timedelta(weeks=extend_weeks) self._log(f"处理合约: {contract_code}") self._log(f" 原始时间范围: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}") self._log(f" 延长时间范围: {extended_start_date.strftime('%Y-%m-%d')} 至 {extended_end_date.strftime('%Y-%m-%d')}") try: # 计算需要获取的周数,采用更稳健的方式 time_diff = extended_end_date - extended_start_date estimated_weeks = int(time_diff.days / 7) + 20 # 额外增加20周作为充足缓冲 # 确保最少获取60周数据(约1.2年)以提供充分的历史背景 estimated_weeks = max(estimated_weeks, 60) self._log(f" 计算获取{estimated_weeks}周数据 (时间跨度{time_diff.days}天)") # 直接使用get_bars API获取原生周线数据,避免日线转换的精度损失 weekly_bars = get_bars( security=contract_code, count=estimated_weeks, # 扩展的周数以确保完整覆盖 unit='1w', # 直接获取周线数据,避免转换精度损失 fields=['date', 'open', 'high', 'low', 'close', 'volume'], include_now=False, # 不包含当前未完成的周期 end_dt=extended_end_date, fq_ref_date=None, # 不复权,保持原始价格 df=True # 返回DataFrame格式便于处理 ) if weekly_bars is not None and len(weekly_bars) > 0: # 确保日期类型一致,避免 'Cannot compare type Timestamp with type date' 错误 # get_bars返回的date列可能是date类型,需统一转换为pandas Timestamp weekly_bars['date'] = pd.to_datetime(weekly_bars['date']) extended_start_date = pd.to_datetime(extended_start_date) extended_end_date = pd.to_datetime(extended_end_date) # 筛选在延长时间范围内的数据 mask = (weekly_bars['date'] >= extended_start_date) & (weekly_bars['date'] <= extended_end_date) filtered_data = weekly_bars[mask].copy() if len(filtered_data) > 0: # 设置日期为索引 filtered_data.set_index('date', inplace=True) # 存储数据 weekly_data_by_contract[contract_code] = { 'data': filtered_data, 'original_start': start_date, 'original_end': end_date, 'extended_start': extended_start_date, 'extended_end': extended_end_date, 'total_weeks': len(filtered_data) } self._log(f" ✓ 获取到{len(filtered_data)}根周K线数据") self._log(f" 数据时间范围: {filtered_data.index[0].strftime('%Y-%m-%d')} 至 {filtered_data.index[-1].strftime('%Y-%m-%d')}") # 添加数据质量检查 original_start_ts = pd.to_datetime(start_date) original_end_ts = pd.to_datetime(end_date) original_weeks = int((original_end_ts - original_start_ts).days / 7) + 1 expected_total_weeks = original_weeks + 2 * extend_weeks if len(filtered_data) < expected_total_weeks * 0.7: # 如果数据不足预期的70% self._log(f" ⚠️ 数据可能不完整,预期约{expected_total_weeks}周,实际{len(filtered_data)}周") else: self._log(f" ⚠️ 筛选后无有效数据") else: self._log(f" ⚠️ 未获取到周线数据") except Exception as e: self._log(f" ❌ 获取{contract_code}周线数据失败: {str(e)}") continue self._log(f"\n成功获取{len(weekly_data_by_contract)}个合约的周线数据") return weekly_data_by_contract def identify_target_pattern(self, weekly_data, decline_futures): """ 识别目标K线形态 当未识别到任何目标形态时,会自动显示完整的烛台图(K线图)作为数据可视化备选方案。 图表包含完整的OHLC数据,直接显示不保存到本地存储。 """ self._log("\n=== 步骤5: 识别目标K线形态 ===") pattern_results = [] kline_conditions = self.config['filter_conditions']['kline_conditions'] # 创建品种代码到下跌信息的映射 decline_info_map = {future['品种代码']: future for future in decline_futures} for code, data_info in weekly_data.items(): data = data_info['data'] contract_code = data_info['contract_code'] # 8888加权合约代码 # 从decline_futures中获取下跌时间信息 if code not in decline_info_map: continue decline_info = decline_info_map[code] decline_start = decline_info['高点日期'] decline_end = decline_info['低点日期'] extend_days = self.config['analysis_params']['extend_days'] self._log(f"分析{code}的K线形态 (使用{contract_code})") for idx, (date, row) in enumerate(data.iterrows()): open_price = row['open'] close_price = row['close'] high_price = row['high'] low_price = row['low'] volume = row['volume'] # 检查是否在下跌期间或附近 date_in_range = (date >= decline_start - pd.Timedelta(days=extend_days) and date <= decline_end + pd.Timedelta(days=extend_days)) if not date_in_range: continue # 使用统一的核心验证逻辑 validation_result = self._core_pattern_validation(date, row, kline_conditions) if validation_result['validation_passed']: pattern_info = { '品种代码': code, '日期': date, '开盘价': validation_result['ohlc_data']['open'], '收盘价': validation_result['ohlc_data']['close'], '最高价': validation_result['ohlc_data']['high'], '最低价': validation_result['ohlc_data']['low'], '成交量': validation_result['ohlc_data']['volume'], '开收差异率': validation_result['indicators']['price_diff_pct']['actual_value'], '最高涨幅率': validation_result['indicators']['high_gain_pct']['actual_value'], '下跌开始日期': decline_start, '下跌结束日期': decline_end, '数据索引': idx, '使用合约': contract_code # 8888加权合约 } pattern_results.append(pattern_info) price_diff_pct = validation_result['indicators']['price_diff_pct']['actual_value'] / 100 high_gain_pct = validation_result['indicators']['high_gain_pct']['actual_value'] / 100 self._log(f" ✓ 找到形态: {date.strftime('%Y-%m-%d')} 开收差{price_diff_pct:.1%} 最高涨{high_gain_pct:.1%} 合约{contract_code}") self._log(f"\n共识别出{len(pattern_results)}个目标K线形态") # 如果未识别到任何模式,显示完整的烛台图 if len(pattern_results) == 0: self._log("\n⚠️ 未识别到任何目标形态,显示完整数据集的K线图") self._display_full_candlestick_charts(weekly_data) return pattern_results def _display_full_candlestick_charts(self, weekly_data): """显示完整的烛台图(K线图)""" self._log("=== 显示完整数据集的K线图 ===") for code, data_info in weekly_data.items(): data = data_info['data'] contract_code = data_info['contract_code'] if data is None or len(data) == 0: self._log(f"⚠️ {code}: 无可用数据") continue self._log(f"绘制{code}({contract_code})的完整K线图,共{len(data)}根K线") try: # 创建图表 fig, ax = plt.subplots(figsize=(16, 10)) # 准备数据用于绘制烛台图 dates = data.index opens = data['open'] highs = data['high'] lows = data['low'] closes = data['close'] volumes = data['volume'] # 绘制K线图 for i in range(len(data)): date = dates[i] open_price = opens.iloc[i] high_price = highs.iloc[i] low_price = lows.iloc[i] close_price = closes.iloc[i] # K线颜色:红涨绿跌 color = 'red' if close_price > open_price else 'green' edge_color = 'darkred' if close_price > open_price else 'darkgreen' # 绘制影线(最高价到最低价的竖线) ax.plot([i, i], [low_price, high_price], color='black', linewidth=1) # 绘制实体(开盘价到收盘价的矩形) body_height = abs(close_price - open_price) bottom = min(open_price, close_price) # 使用矩形绘制K线实体 rect = patches.Rectangle((i-0.4, bottom), 0.8, body_height, linewidth=1, edgecolor=edge_color, facecolor=color, alpha=0.8) ax.add_patch(rect) # 设置图表标题和标签 ax.set_title(f'{code} ({contract_code}) 完整周线K线图\n' f'数据期间: {dates[0].strftime("%Y-%m-%d")} 至 {dates[-1].strftime("%Y-%m-%d")} ' f'(共{len(data)}根K线)', fontsize=14, fontweight='bold', pad=20) ax.set_xlabel('时间', fontsize=12) ax.set_ylabel('价格', fontsize=12) ax.grid(True, alpha=0.3) # 设置x轴标签 # 选择合适的标签间隔,避免过于密集 step = max(1, len(data) // 10) # 显示约10个时间标签 tick_positions = range(0, len(data), step) tick_labels = [dates[i].strftime('%Y-%m-%d') for i in tick_positions] ax.set_xticks(tick_positions) ax.set_xticklabels(tick_labels, rotation=45, ha='right') # 添加统计信息 max_price = highs.max() min_price = lows.min() latest_close = closes.iloc[-1] total_change = (latest_close - closes.iloc[0]) / closes.iloc[0] * 100 # 在图表上添加统计文本 stats_text = (f'最高价: {max_price:.2f}\n' f'最低价: {min_price:.2f}\n' f'最新收盘: {latest_close:.2f}\n' f'总涨跌幅: {total_change:+.2f}%') ax.text(0.02, 0.98, stats_text, transform=ax.transAxes, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8), fontsize=10) # 调整布局并显示 plt.tight_layout() plt.show() self._log(f"✓ 已显示{code}的完整K线图") except Exception as e: self._log(f"❌ 绘制{code}K线图时出错: {str(e)}") continue self._log("完成所有品种的K线图显示") def _core_pattern_validation(self, target_date, kline_data_row, kline_conditions): """ 统一的核心模式验证逻辑 此函数包含两种模式共同的验证逻辑,消除代码重复。 无论是自动模式遍历所有K线,还是验证模式检查特定日期, 都使用相同的验证标准和计算方法。 参数: - target_date: 目标日期 (pandas Timestamp) - kline_data_row: 单根K线数据 (pandas Series,包含open、high、low、close、volume) - kline_conditions: 验证条件配置字典 返回: - dict: 包含验证结果和详细诊断信息的字典 """ # 提取OHLC数据 open_price = kline_data_row['open'] close_price = kline_data_row['close'] high_price = kline_data_row['high'] low_price = kline_data_row['low'] volume = kline_data_row['volume'] # 计算模式指标 price_diff_pct = abs(close_price - open_price) / open_price high_gain_pct = (high_price - open_price) / open_price # 验证条件A: 开盘价与收盘价差异 condition_a = price_diff_pct <= kline_conditions['max_open_close_diff'] condition_a_threshold = kline_conditions['max_open_close_diff'] # 验证条件B: 最高价相对开盘价涨幅 condition_b = high_gain_pct >= kline_conditions['min_high_open_gain'] condition_b_threshold = kline_conditions['min_high_open_gain'] # 整体验证结果 validation_passed = condition_a and condition_b # 构建详细的验证结果 result = { 'date': target_date, 'validation_passed': validation_passed, 'ohlc_data': { 'open': round(open_price, 2), 'high': round(high_price, 2), 'low': round(low_price, 2), 'close': round(close_price, 2), 'volume': volume }, 'indicators': { 'price_diff_pct': { 'actual_value': round(price_diff_pct * 100, 2), 'threshold': round(condition_a_threshold * 100, 2), 'unit': '%', 'condition': '≤', 'passed': condition_a, 'description': '开盘价与收盘价差异率' }, 'high_gain_pct': { 'actual_value': round(high_gain_pct * 100, 2), 'threshold': round(condition_b_threshold * 100, 2), 'unit': '%', 'condition': '≥', 'passed': condition_b, 'description': '最高价相对开盘价涨幅' } } } # 生成失败原因 if not validation_passed: failed_conditions = [] if not condition_a: failed_conditions.append(f"开收差异率{price_diff_pct*100:.2f}% > 阈值{condition_a_threshold*100:.1f}%") if not condition_b: failed_conditions.append(f"最高涨幅{high_gain_pct*100:.2f}% < 阈值{condition_b_threshold*100:.1f}%") result['failure_reason'] = "; ".join(failed_conditions) return result def validate_manual_patterns(self, manual_config, weekly_data): """ 手动验证模式功能 参数: - manual_config: 手动配置字典,格式: {'B':['2022-07-22','2024-08-23'], 'A':['2023-01-06','2023-06-09']} - weekly_data: 周线数据字典 返回: - dict: 验证结果,包含成功和失败的详细信息 """ self._log("\n=== 手动模式验证 ===") self._log(f"配置的模式类型数: {len(manual_config)}") validation_results = { 'successful_validations': [], 'failed_validations': [], 'summary': {} } kline_conditions = self.config['filter_conditions']['kline_conditions'] for pattern_type, date_list in manual_config.items(): self._log(f"\n处理模式类型 '{pattern_type}': {len(date_list)} 个日期") type_results = { 'pattern_type': pattern_type, 'total_dates': len(date_list), 'successful_count': 0, 'failed_count': 0, 'validations': [] } for date_str in date_list: try: # 转换日期格式 target_date = pd.to_datetime(date_str) self._log(f" 验证日期: {target_date.strftime('%Y-%m-%d')}") # 验证这个日期 validation_result = self._validate_single_date( target_date, pattern_type, weekly_data, kline_conditions ) type_results['validations'].append(validation_result) if validation_result['validation_passed']: type_results['successful_count'] += 1 validation_results['successful_validations'].append(validation_result) self._log(f" ✓ 验证通过") else: type_results['failed_count'] += 1 validation_results['failed_validations'].append(validation_result) self._log(f" ❌ 验证失败: {validation_result['failure_reason']}") except Exception as e: error_result = { 'pattern_type': pattern_type, 'target_date': date_str, 'validation_passed': False, 'failure_reason': f'日期处理错误: {str(e)}', 'error': True } type_results['validations'].append(error_result) type_results['failed_count'] += 1 validation_results['failed_validations'].append(error_result) self._log(f" ❌ 日期处理错误: {str(e)}") validation_results['summary'][pattern_type] = type_results # 打印总体统计 total_validations = sum(result['total_dates'] for result in validation_results['summary'].values()) total_successful = len(validation_results['successful_validations']) total_failed = len(validation_results['failed_validations']) self._log(f"\n=== 验证结果汇总 ===") self._log(f"总验证数: {total_validations}") self._log(f"成功验证: {total_successful}") self._log(f"失败验证: {total_failed}") self._log(f"成功率: {(total_successful/total_validations*100) if total_validations > 0 else 0:.1f}%") # 显示失败详情 if validation_results['failed_validations']: self._display_validation_failures(validation_results['failed_validations']) return validation_results def _validate_single_date(self, target_date, pattern_type, weekly_data, kline_conditions): """验证单个日期的模式 - 使用统一的核心验证逻辑""" validation_result = { 'pattern_type': pattern_type, 'target_date': target_date.strftime('%Y-%m-%d'), 'validation_passed': False, 'failure_reason': '', 'diagnostics': {} } # 查找包含目标日期的数据 found_data = None for code, data_info in weekly_data.items(): data = data_info['data'] contract_code = data_info['contract_code'] # 检查目标日期是否在数据范围内 if target_date >= data.index[0] and target_date <= data.index[-1]: # 找到最接近的周线数据 closest_idx = data.index.get_indexer([target_date], method='nearest')[0] if closest_idx >= 0 and closest_idx < len(data): found_data = data.iloc[closest_idx] validation_result['contract_code'] = contract_code validation_result['closest_date'] = data.index[closest_idx].strftime('%Y-%m-%d') break if found_data is None: validation_result['failure_reason'] = '未找到匹配的数据' return validation_result # 使用统一的核心验证逻辑 core_result = self._core_pattern_validation(target_date, found_data, kline_conditions) # 整合核心结果到验证结果 validation_result['validation_passed'] = core_result['validation_passed'] validation_result['diagnostics']['ohlc_data'] = core_result['ohlc_data'] validation_result['diagnostics']['indicators'] = core_result['indicators'] if 'failure_reason' in core_result: validation_result['failure_reason'] = core_result['failure_reason'] return validation_result def _display_validation_failures(self, failed_validations): """显示详细的验证失败信息""" self._log("\n=== 详细失败诊断 ===") for i, failure in enumerate(failed_validations, 1): if failure.get('error'): continue # 跳过错误情况 self._log(f"\n失败 #{i}: 模式类型 '{failure['pattern_type']}' - 日期 {failure['target_date']}") if 'contract_code' in failure: self._log(f" 合约: {failure['contract_code']}") self._log(f" 最近日期: {failure['closest_date']}") # 显示OHLC数据 if 'ohlc_data' in failure['diagnostics']: ohlc = failure['diagnostics']['ohlc_data'] self._log(f" OHLC数据:") self._log(f" 开盘: {ohlc['open']:>8}") self._log(f" 最高: {ohlc['high']:>8}") self._log(f" 最低: {ohlc['low']:>8}") self._log(f" 收盘: {ohlc['close']:>8}") self._log(f" 成交量: {ohlc['volume']}") # 显示指标验证详情 if 'indicators' in failure['diagnostics']: self._log(f" 指标验证:") for indicator_name, indicator_data in failure['diagnostics']['indicators'].items(): status = "✓" if indicator_data['passed'] else "❌" self._log(f" {status} {indicator_data['description']}:") self._log(f" 实际值: {indicator_data['actual_value']}{indicator_data['unit']}") self._log(f" 阈值: {indicator_data['condition']} {indicator_data['threshold']}{indicator_data['unit']}") self._log(f" 结果: {'通过' if indicator_data['passed'] else '失败'}") self._log(f" 失败原因: {failure['failure_reason']}") def analyze_future_performance(self, pattern_results, weekly_data): """分析后续走势 - 逐周详细分析""" self._log("\n=== 步骤6: 分析后续10周逐周表现 ===") performance_results = [] follow_up_weeks = self.config['analysis_params']['follow_up_weeks'] for pattern in pattern_results: code = pattern['品种代码'] pattern_date = pattern['日期'] pattern_idx = pattern['数据索引'] pattern_close_price = pattern['收盘价'] if code not in weekly_data: continue data = weekly_data[code]['data'] # 基础信息 future_performance = { '品种代码': code, '形态日期': pattern_date, '形态收盘价': pattern_close_price } # 详细的逐周分析 print(f"\n{code} ({pattern_date.strftime('%Y-%m-%d')}) 后续10周详细分析:") print("-" * 80) print(f"{'周次':<4} {'日期':<12} {'开盘':<8} {'收盘':<8} {'最高':<8} {'最低':<8} " f"{'周涨跌幅':<10} {'最大跌幅':<10}") print("-" * 80) # 逐周分析 for weeks in range(1, follow_up_weeks + 1): future_idx = pattern_idx + weeks if future_idx < len(data): future_row = data.iloc[future_idx] week_date = data.index[future_idx] week_open = future_row['open'] week_close = future_row['close'] week_high = future_row['high'] week_low = future_row['low'] # a) 每周的涨跌幅:收盘价相对于开盘价的涨跌幅度 weekly_change_pct = (week_close - week_open) / week_open * 100 # b) 每周的最大跌幅:开盘价到当周最低价的最大跌幅 max_decline_pct = (week_low - week_open) / week_open * 100 # 存储详细数据 future_performance[f'第{weeks}周_日期'] = week_date.strftime('%Y-%m-%d') future_performance[f'第{weeks}周_开盘价'] = round(week_open, 2) future_performance[f'第{weeks}周_收盘价'] = round(week_close, 2) future_performance[f'第{weeks}周_最高价'] = round(week_high, 2) future_performance[f'第{weeks}周_最低价'] = round(week_low, 2) future_performance[f'第{weeks}周_涨跌幅'] = round(weekly_change_pct, 2) future_performance[f'第{weeks}周_最大跌幅'] = round(max_decline_pct, 2) # 打印逐周详细信息 print(f"{weeks:>2} {week_date.strftime('%Y-%m-%d'):<12} " f"{week_open:<8.2f} {week_close:<8.2f} {week_high:<8.2f} {week_low:<8.2f} " f"{weekly_change_pct:>+8.2f}% {max_decline_pct:>+8.2f}%") else: # 数据不足的情况 for field in ['日期', '开盘价', '收盘价', '最高价', '最低价', '涨跌幅', '最大跌幅']: future_performance[f'第{weeks}周_{field}'] = None print(f"{weeks:>2} {'无数据':<12} {'--':<8} {'--':<8} {'--':<8} {'--':<8} " f"{'--':<10} {'--':<10}") print("-" * 80) # 计算统计指标 weekly_changes = [future_performance[f'第{w}周_涨跌幅'] for w in range(1, follow_up_weeks + 1) if future_performance[f'第{w}周_涨跌幅'] is not None] max_declines = [future_performance[f'第{w}周_最大跌幅'] for w in range(1, follow_up_weeks + 1) if future_performance[f'第{w}周_最大跌幅'] is not None] if weekly_changes: avg_weekly_change = sum(weekly_changes) / len(weekly_changes) max_weekly_gain = max(weekly_changes) max_weekly_loss = min(weekly_changes) future_performance['统计_平均周涨跌幅'] = round(avg_weekly_change, 2) future_performance['统计_最大周涨幅'] = round(max_weekly_gain, 2) future_performance['统计_最大周跌幅'] = round(max_weekly_loss, 2) print(f"统计汇总:") print(f" 平均周涨跌幅: {avg_weekly_change:+.2f}%") print(f" 最大周涨幅: {max_weekly_gain:+.2f}%") print(f" 最大周跌幅: {max_weekly_loss:+.2f}%") if max_declines: avg_max_decline = sum(max_declines) / len(max_declines) worst_decline = min(max_declines) future_performance['统计_平均最大跌幅'] = round(avg_max_decline, 2) future_performance['统计_最差跌幅'] = round(worst_decline, 2) print(f" 平均最大跌幅: {avg_max_decline:+.2f}%") print(f" 最差单周跌幅: {worst_decline:+.2f}%") performance_results.append(future_performance) print("=" * 80) return performance_results def create_visualizations(self, pattern_results, weekly_data): """创建K线图可视化""" if not self.config['output_settings']['generate_charts']: return self._log("\n=== 步骤7: 生成K线图可视化 ===") max_charts = min(len(pattern_results), self.config['output_settings']['max_chart_count']) for i, pattern in enumerate(pattern_results[:max_charts]): code = pattern['品种代码'] pattern_date = pattern['日期'] pattern_idx = pattern['数据索引'] if code not in weekly_data: continue data = weekly_data[code]['data'] # 准备绘图数据(形态前后各5周) start_idx = max(0, pattern_idx - 5) end_idx = min(len(data), pattern_idx + 11) plot_data = data.iloc[start_idx:end_idx] # 创建K线图 fig, ax = plt.subplots(figsize=(12, 8)) # 绘制K线 for j, (date, row) in enumerate(plot_data.iterrows()): open_price = row['open'] close_price = row['close'] high_price = row['high'] low_price = row['low'] # K线颜色 color = 'red' if close_price > open_price else 'green' # 绘制影线 ax.plot([j, j], [low_price, high_price], color='black', linewidth=1) # 绘制实体 body_height = abs(close_price - open_price) if close_price > open_price: rect = patches.Rectangle((j-0.3, open_price), 0.6, body_height, linewidth=1, edgecolor='red', facecolor='red', alpha=0.7) else: rect = patches.Rectangle((j-0.3, close_price), 0.6, body_height, linewidth=1, edgecolor='green', facecolor='green', alpha=0.7) ax.add_patch(rect) # 标记目标形态 if date == pattern_date: ax.scatter(j, high_price + (high_price - low_price) * 0.1, color='yellow', s=100, marker='*', zorder=5) ax.text(j, high_price + (high_price - low_price) * 0.15, '目标形态', ha='center', fontsize=10, color='red', fontweight='bold') # 设置图表 ax.set_title(f'{code} 周线K线图 - 目标形态分析\n形态日期: {pattern_date.strftime("%Y-%m-%d")}', fontsize=14, fontweight='bold') ax.set_xlabel('时间', fontsize=12) ax.set_ylabel('价格', fontsize=12) ax.grid(True, alpha=0.3) # 设置x轴标签 dates = [date.strftime('%m-%d') for date in plot_data.index] ax.set_xticks(range(len(dates))) ax.set_xticklabels(dates, rotation=45) # 显示图表 plt.tight_layout() plt.show() # 直接在界面中显示图表 self._log(f"显示图表: {code} {pattern_date.strftime('%Y-%m-%d')} K线形态分析") def generate_summary_report(self, decline_futures, pattern_results, performance_results): """生成整体规律总结报告""" self._log("\n=== 步骤8: 生成统计报告 ===") # 基础统计 total_decline_futures = len(decline_futures) total_patterns = len(pattern_results) total_analyzed = len(performance_results) print(f"\n=== 期货技术形态规律分析报告 ===") print(f"分析期间: {self.start_date.strftime('%Y-%m-%d')} 至 {self.end_date.strftime('%Y-%m-%d')}") print(f"总分析期货品种数: {len(self.future_codes)}") print(f"大幅下跌品种数: {total_decline_futures}") print(f"识别出的目标形态数: {total_patterns}") print(f"完成后续走势分析数: {total_analyzed}") if total_analyzed == 0: print("无有效数据进行统计分析") return {} # 收益率统计 print(f"\n=== 后续走势统计分析 ===") # 提取所有统计指标 avg_weekly_changes = [r['统计_平均周涨跌幅'] for r in performance_results if '统计_平均周涨跌幅' in r] max_weekly_gains = [r['统计_最大周涨幅'] for r in performance_results if '统计_最大周涨幅' in r] max_weekly_losses = [r['统计_最大周跌幅'] for r in performance_results if '统计_最大周跌幅' in r] avg_max_declines = [r['统计_平均最大跌幅'] for r in performance_results if '统计_平均最大跌幅' in r] if avg_weekly_changes: print(f"平均周涨跌幅统计:") print(f" 平均值: {np.mean(avg_weekly_changes):.2f}%") print(f" 中位数: {np.median(avg_weekly_changes):.2f}%") print(f" 最大值: {max(avg_weekly_changes):.2f}%") print(f" 最小值: {min(avg_weekly_changes):.2f}%") positive_weeks = sum(1 for r in avg_weekly_changes if r > 0) print(f" 正收益形态比例: {positive_weeks}/{len(avg_weekly_changes)} ({positive_weeks/len(avg_weekly_changes)*100:.1f}%)") if max_weekly_gains: print(f"\n最大周涨幅统计:") print(f" 平均值: {np.mean(max_weekly_gains):.2f}%") print(f" 中位数: {np.median(max_weekly_gains):.2f}%") print(f" 最大值: {max(max_weekly_gains):.2f}%") print(f" 最小值: {min(max_weekly_gains):.2f}%") if max_weekly_losses: print(f"\n最大周跌幅统计:") print(f" 平均值: {np.mean(max_weekly_losses):.2f}%") print(f" 中位数: {np.median(max_weekly_losses):.2f}%") print(f" 最大值: {max(max_weekly_losses):.2f}%") print(f" 最小值: {min(max_weekly_losses):.2f}%") if avg_max_declines: print(f"\n平均最大跌幅统计:") print(f" 平均值: {np.mean(avg_max_declines):.2f}%") print(f" 中位数: {np.median(avg_max_declines):.2f}%") print(f" 最大值: {max(avg_max_declines):.2f}%") print(f" 最小值: {min(avg_max_declines):.2f}%") # 成功率统计 - 基于平均周涨跌幅是否为正 positive_performance_count = sum(1 for r in avg_weekly_changes if r > 0) if avg_weekly_changes else 0 success_rate = positive_performance_count / len(avg_weekly_changes) * 100 if avg_weekly_changes else 0 # 基于最大周涨幅的成功率统计 success_threshold = self.config['analysis_params']['success_threshold'] * 100 high_gain_count = sum(1 for r in max_weekly_gains if r > success_threshold) if max_weekly_gains else 0 high_gain_rate = high_gain_count / len(max_weekly_gains) * 100 if max_weekly_gains else 0 print(f"\n=== 交易成功率分析 ===") print(f"正收益形态数量: {positive_performance_count}/{len(avg_weekly_changes) if avg_weekly_changes else 0}") print(f"正收益成功率: {success_rate:.1f}%") print(f"") print(f"高收益标准: 最大周涨幅 > {success_threshold}%") print(f"高收益形态数量: {high_gain_count}/{len(max_weekly_gains) if max_weekly_gains else 0}") print(f"高收益成功率: {high_gain_rate:.1f}%") return { 'total_futures': len(self.future_codes), 'decline_futures': total_decline_futures, 'pattern_count': total_patterns, 'analyzed_count': total_analyzed, 'avg_weekly_change': np.mean(avg_weekly_changes) if avg_weekly_changes else 0, 'avg_max_gain': np.mean(max_weekly_gains) if max_weekly_gains else 0, 'avg_max_loss': np.mean(max_weekly_losses) if max_weekly_losses else 0, 'avg_max_decline': np.mean(avg_max_declines) if avg_max_declines else 0, 'success_rate': success_rate, 'high_gain_rate': high_gain_rate, 'positive_count': positive_performance_count, 'high_gain_count': high_gain_count } def save_results(self, decline_futures, pattern_results, performance_results, summary_stats): """保存结果到CSV文件""" self._log("\n=== 保存分析结果 ===") timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') encoding = self.config['output_settings']['csv_encoding'] # 保存大幅下跌期货品种 if decline_futures: decline_df = pd.DataFrame(decline_futures) decline_file = f'future_major_decline_{timestamp}.csv' decline_df.to_csv(decline_file, index=False, encoding=encoding) self._log(f"大幅下跌期货保存至: {decline_file}") # 保存识别的K线形态 if pattern_results: pattern_df = pd.DataFrame(pattern_results) pattern_file = f'future_kline_patterns_{timestamp}.csv' pattern_df.to_csv(pattern_file, index=False, encoding=encoding) self._log(f"K线形态数据保存至: {pattern_file}") # 保存后续走势分析 if performance_results: performance_df = pd.DataFrame(performance_results) performance_file = f'future_pattern_performance_{timestamp}.csv' performance_df.to_csv(performance_file, index=False, encoding=encoding) self._log(f"后续走势分析保存至: {performance_file}") # 保存汇总报告 if summary_stats: summary_df = pd.DataFrame([summary_stats]) summary_file = f'future_pattern_summary_{timestamp}.csv' summary_df.to_csv(summary_file, index=False, encoding=encoding) self._log(f"汇总报告保存至: {summary_file}") def run_analysis(self, mode=1, manual_config=None): """ 统一的分析运行入口 参数: - mode: 执行模式 1 = 原始/自动模式:自动搜索符合条件的技术形态 2 = 验证模式:验证指定日期是否符合模式条件 - manual_config: 手动配置字典,仅在mode=2时使用 返回: - dict: 分析结果 """ if mode == 1: return self._run_original_mode() elif mode == 2: if manual_config is None: raise ValueError("验证模式(mode=2)需要提供manual_config参数") return self._run_validation_mode(manual_config) else: raise ValueError("mode参数必须为1(原始模式)或2(验证模式)") def _run_original_mode(self): """原始/自动模式:自动搜索符合条件的技术形态""" print("开始期货技术形态规律分析... [原始模式]") # 构建期货品种基础数据 self.build_future_codes() # 筛选大幅下跌的期货品种 decline_futures = self.find_major_decline_futures() if not decline_futures: print("未找到符合条件的大幅下跌期货品种") return {} # 创建8888加权合约映射 contract_mapping = self.get_simplified_contract_mapping(decline_futures) if not contract_mapping: print("未能创建8888合约映射") return {} else: print(f"创建8888合约映射成功,共{len(contract_mapping)}个合约") for contract_code, time_range in contract_mapping.items(): start_date, end_date = time_range print(f" {contract_code}: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}") # 获取周线K线数据 weekly_data = self.get_weekly_kline_data(contract_mapping) if not weekly_data: print("未能获取周线数据") return {} # 识别目标K线形态 pattern_results = self.identify_target_pattern(weekly_data, decline_futures) if not pattern_results: print("未识别出目标K线形态") return {} # 分析后续走势 performance_results = self.analyze_future_performance(pattern_results, weekly_data) # 创建可视化 self.create_visualizations(pattern_results, weekly_data) # 生成汇总报告 summary_stats = self.generate_summary_report(decline_futures, pattern_results, performance_results) print(f"\n=== 分析完成 ===") print(f"共分析了{len(decline_futures)}个大幅下跌期货品种") print(f"识别出{len(pattern_results)}个目标K线形态") print(f"完成{len(performance_results)}个形态的后续走势分析") return { 'mode': 1, 'decline_futures': decline_futures, 'pattern_results': pattern_results, 'performance_results': performance_results, 'summary_stats': summary_stats } def _run_validation_mode(self, manual_config): """验证模式:验证指定日期是否符合模式条件""" print("开始期货技术形态验证分析... [验证模式]") # 构建期货品种基础数据(仅构建需要的品种) self.build_future_codes() # 从配置中提取需要验证的品种代码 config_codes = list(manual_config.keys()) print(f"配置的验证目标: {config_codes}") # 为每个配置的品种创建数据获取范围 sample_futures = [] for code in config_codes: if code in self.future_codes: contract_8888 = self.future_codes[code] date_list = manual_config[code] # 基于配置的日期计算数据获取范围 config_dates = [pd.to_datetime(date_str) for date_str in date_list] earliest_date = min(config_dates) latest_date = max(config_dates) # 为了确保有足够的数据进行验证,在日期范围前后各扩展一些时间 buffer_days = 90 # 3个月缓冲 start_date = earliest_date - pd.Timedelta(days=buffer_days) end_date = latest_date + pd.Timedelta(days=buffer_days) sample_futures.append({ '品种代码': code, '合约代码': contract_8888, '高点日期': start_date, '低点日期': end_date }) print(f" ✓ {code} ({contract_8888}): {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}") else: print(f" ⚠️ 未找到品种 '{code}' 对应的合约代码") print(f"实际处理的品种数: {len(sample_futures)}") if not sample_futures: print("❌ 配置中的品种代码都无法找到对应的合约") return {} # 创建简化的合约映射 contract_mapping = self.get_simplified_contract_mapping(sample_futures) if not contract_mapping: print("❌ 无法创建合约映射") return {} # 获取周线数据 weekly_data = self.get_weekly_kline_data(contract_mapping) if not weekly_data: print("❌ 无法获取周线数据") return {} print(f"✓ 成功获取 {len(weekly_data)} 个品种的数据") # 执行手动验证 validation_results = self.validate_manual_patterns(manual_config, weekly_data) validation_results['mode'] = 2 return validation_results # ===================================================================================== # 第三部分:主程序运行函数 # 提供统一的程序入口和运行选项 # ===================================================================================== def run_pattern_analysis(mode=1, manual_config=None): """ 统一的期货技术形态分析入口 参数: - mode: 执行模式 1 = 原始/自动模式:自动搜索符合条件的技术形态 2 = 验证模式:验证指定日期是否符合模式条件 - manual_config: 手动配置字典,仅在mode=2时使用 返回: - dict: 分析结果 """ print("期货技术形态规律分析工具") print("="*60) # 打印配置信息 AnalysisConfig.print_config() print("="*60) mode_name = "原始模式" if mode == 1 else "验证模式" print(f"开始分析... [{mode_name}]") # 创建分析器并运行 analyzer = FuturePatternAnalyzer( AnalysisConfig.START_DATE, AnalysisConfig.END_DATE ) results = analyzer.run_analysis(mode=mode, manual_config=manual_config) return results # ===================================================================================== # 第四部分:辅助工具函数 # 用于处理合约切换日志和数据导出 # ===================================================================================== def parse_contract_switch_log_from_text(log_text): """ 从文本格式的合约切换日志中解析合约信息 参数: - log_text: 日志文本,格式示例: "A2203.XDCE:2022年1月26日至2022年2月22日 A2207.XDCE:2022年2月23日至2022年6月21日" 返回: - list: 标准化的合约切换日志 """ import re contracts = [] lines = log_text.strip().split('\n') for line in lines: line = line.strip() if not line: continue # 正则匹配合约代码和日期范围 pattern = r'([A-Z0-9]+\.[A-Z]+):(\d{4})年(\d{1,2})月(\d{1,2})日至(\d{4})年(\d{1,2})月(\d{1,2})日' match = re.search(pattern, line) if match: contract_code = match.group(1) start_year = int(match.group(2)) start_month = int(match.group(3)) start_day = int(match.group(4)) end_year = int(match.group(5)) end_month = int(match.group(6)) end_day = int(match.group(7)) contracts.append({ 'contract': contract_code, 'start_date': f'{start_year}-{start_month:02d}-{start_day:02d}', 'end_date': f'{end_year}-{end_month:02d}-{end_day:02d}' }) return contracts def export_contract_weekly_data(weekly_data_dict, filename_prefix='contract_weekly_data'): """ 导出合约周线数据到CSV文件 参数: - weekly_data_dict: get_weekly_data_by_contract_log返回的数据字典 - filename_prefix: 文件名前缀 注意:导出的数据是直接从get_bars API获取的原生周线数据, 相比日线转换的数据具有更高的精度和准确性 """ import datetime timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') for contract_code, contract_data in weekly_data_dict.items(): # 清理文件名中的特殊字符 safe_contract_name = contract_code.replace('.', '_') filename = f'{filename_prefix}_{safe_contract_name}_{timestamp}.csv' # 准备数据 df = contract_data['data'].copy() df['contract'] = contract_code df['original_start'] = contract_data['original_start'].strftime('%Y-%m-%d') df['original_end'] = contract_data['original_end'].strftime('%Y-%m-%d') # 保存到CSV df.to_csv(filename, encoding='utf-8-sig') print(f"合约 {contract_code} 的周线数据已保存至: {filename}") def run_manual_pattern_validation(manual_config, analyzer=None): """ 运行手动模式验证的便捷函数 此函数只会处理manual_config中明确指定的品种和时间范围, 不会查询额外的品种数据。现在使用统一的分析框架。 参数: - manual_config: 手动配置字典,格式: {'A':['2023-01-06','2023-06-09'], 'B':['2022-07-22','2024-08-23']} 键: 期货品种代码(如'A'、'B'、'RB'等) 值: 需要验证的日期列表('YYYY-MM-DD'格式) - analyzer: 可选的分析器实例,如果为None则创建新实例 返回: - dict: 验证结果,包含成功和失败的详细信息 注意: - 系统只会获取配置中指定品种的数据 - 数据获取范围基于每个品种配置的日期范围自动计算 - 不会处理未在配置中明确指定的品种 - 现在使用统一的核心验证逻辑,确保与原始模式的一致性 """ print("\n=== 手动模式验证工具 ===") print("此功能用于验证指定日期是否符合预定义的技术形态模式") print("使用统一的验证逻辑,与原始模式保持完全一致") # 创建分析器实例(如果没有提供) if analyzer is None: analyzer = FuturePatternAnalyzer() # 使用统一的分析入口 - 验证模式 validation_results = analyzer.run_analysis(mode=2, manual_config=manual_config) return validation_results def demo_manual_validation(): """演示手动模式验证功能""" print("\n=== 手动模式验证演示 ===") # 示例配置 demo_config = { 'A': ['2023-01-06', '2023-06-09', '2024-07-26'], 'B': ['2022-07-22', '2024-08-23', '2025-01-03'] } print("使用演示配置:") for pattern_type, dates in demo_config.items(): print(f" 模式 '{pattern_type}': {len(dates)} 个日期") for date in dates: print(f" - {date}") try: # 运行验证 results = run_manual_pattern_validation(demo_config) if results: print("\n✅ 验证完成") return results else: print("\n⚠️ 验证未能完成") return {} except Exception as e: print(f"\n❌ 验证过程中出错: {str(e)}") import traceback traceback.print_exc() return {} def get_contract_weekly_data(contract_switch_log, extend_weeks=10, export_data=False): """ 基于合约切换日志获取周线数据的正式方法 参数: - contract_switch_log: 合约切换日志,可以是文本格式或列表格式 - extend_weeks: 延长周数,默认10周 - export_data: 是否导出数据到CSV文件,默认False 返回: - dict: 每个合约的周线数据 """ print("\n=== 基于合约切换日志获取周线数据 ===") # 如果是文本格式,先解析 if isinstance(contract_switch_log, str): print("解析合约切换日志:") contracts = parse_contract_switch_log_from_text(contract_switch_log) for contract in contracts: print(f" {contract['contract']}: {contract['start_date']} 至 {contract['end_date']}") else: contracts = contract_switch_log print(f"处理{len(contracts)}个合约的切换日志") print(f"\n创建分析器并获取周线数据 (延长{extend_weeks}周):") analyzer = FuturePatternAnalyzer() try: # 获取周线数据 weekly_data = analyzer.get_weekly_data_by_contract_log(contracts, extend_weeks=extend_weeks) if weekly_data: print(" ✓ 获取成功!") for contract_code, data_info in weekly_data.items(): print(f" {contract_code}: {data_info['total_weeks']}周数据") print(f" 原始范围: {data_info['original_start'].strftime('%Y-%m-%d')} 至 {data_info['original_end'].strftime('%Y-%m-%d')}") print(f" 延长范围: {data_info['extended_start'].strftime('%Y-%m-%d')} 至 {data_info['extended_end'].strftime('%Y-%m-%d')}") # 导出数据 if export_data: export_contract_weekly_data(weekly_data) return weekly_data else: print(" ⚠️ 未获取到数据") return {} except Exception as e: print(f" ❌ 获取失败: {str(e)}") import traceback traceback.print_exc() return {} # ===================================================================================== # 第五部分:程序主入口 - 自动运行分析 # ===================================================================================== print("期货市场技术形态规律分析工具") print("本工具专门研究期货品种在大幅下跌后出现特定K线形态的规律") print("适用于聚宽在线研究平台") print("="*80) # ===================================================================================== # 基于合约切换日志的周线数据获取功能使用说明 # ===================================================================================== # # 使用示例1:处理文本格式的合约切换日志 # sample_log = """ # A2203.XDCE:2022年1月26日至2022年2月22日 # A2207.XDCE:2022年2月23日至2022年6月21日 # A2209.XDCE:2022年6月22日至2022年8月30日 # """ # weekly_data = get_contract_weekly_data(sample_log, extend_weeks=10, export_data=True) # # 使用示例2:处理列表格式的合约切换日志 # contract_list = [ # {'contract': 'A2203.XDCE', 'start_date': '2022-01-26', 'end_date': '2022-02-22'}, # {'contract': 'A2207.XDCE', 'start_date': '2022-02-23', 'end_date': '2022-06-21'} # ] # weekly_data = get_contract_weekly_data(contract_list, extend_weeks=15, export_data=False) # # 新增功能1:模式识别失败时的完整K线图显示 # 当identify_target_pattern函数未识别到任何目标形态时,系统会自动: # 1. 显示所有可用品种的完整烛台图 # 2. 包含OHLC数据的专业K线图表现 # 3. 添加统计信息(最高价、最低价、总涨跌幅等) # 4. 直接在界面中显示,不保存到本地文件 # # 新增功能2:手动模式验证系统 # 使用示例3:手动验证指定日期的模式匹配 # 现在使用统一的入口函数,通过mode参数控制执行模式 manual_config = { 'A': ['2023-01-06', '2023-06-09', '2024-07-26'], 'B': ['2022-07-22', '2024-08-23', '2025-01-03'] } # 方法1: 使用统一入口函数 validation_results = run_pattern_analysis(mode=2, manual_config=manual_config) # 方法2: 或者使用便捷函数(内部调用统一入口) # validation_results = run_manual_pattern_validation(manual_config) # 或运行演示: demo_manual_validation() # # 重构后的统一模式控制系统: # 1. mode=1(原始模式):自动搜索符合条件的技术形态,遍历所有期货品种 # 2. mode=2(验证模式):验证指定日期是否符合模式条件,只处理配置品种 # 3. 统一的核心验证逻辑:_core_pattern_validation()消除了代码重复 # 4. 独立的执行路径:两种模式不再共享步骤1,完全独立执行 # 5. 一致的验证标准:两种模式使用完全相同的模式识别条件和计算方法 # # 数据获取特点: # 1. 直接使用get_bars API获取原生周线数据,避免日线转换的精度损失 # 2. 自动延长时间范围(每个合约前后各延长指定周数) # 3. 确保获取充足的历史数据(最少60周作为背景) # 4. 支持数据导出到CSV文件便于后续分析 # 5. 简化结构:直接使用8888加权合约,无需处理主力合约切换 # # 简化后的工作流程: # 1. 识别符合条件的大幅下跌期货品种 # 2. 为每个品种创建8888合约映射(单级字典结构) # 3. 直接从8888合约获取周线数据,避免主力合约复杂性 # 4. 进行技术形态识别和后续走势分析 # 5. 当未识别到目标形态时,自动显示完整的烛台图(K线图) # ===================================================================================== # 直接运行分析 # print("\n开始运行完整分析...") # try: # results = run_pattern_analysis() # except Exception as e: # print(f"运行过程中出现错误: {str(e)}") # import traceback # traceback.print_exc() # results = None # if results: # print("\n✅ 分析完成!") # if isinstance(results, dict) and 'summary_stats' in results: # summary = results['summary_stats'] # print(f"\n📊 快速统计:") # print(f" - 识别到技术形态: {summary.get('pattern_count', 0)}个") # print(f" - 正收益成功率: {summary.get('success_rate', 0):.1f}%") # print(f" - 高收益成功率: {summary.get('high_gain_rate', 0):.1f}%") # print(f" - 平均周涨跌幅: {summary.get('avg_weekly_change', 0):+.2f}%") # else: # print("\n❌ 分析未能完成,请检查错误信息。")