""" 期货烛台影线形态分析 研究期货合约中带有明显影线的K线对未来价格走势的预测能力 本程序实现完整的分析流程: 1. 数据结构设置 - 获取主力期货合约数据 2. 数据收集 - 扩展时间范围获取OHLC数据 3. 实体长度计算 - 计算历史平均实体长度作为阈值 4. 影线形态识别 - 识别满足条件的K线形态 5. 未来表现分析 - 分析影线方向与价格走势的关系 6. 结果输出 - 生成影线形态分析结果CSV 注:程序使用动态获取的主力合约进行分析,确保分析结果基于真实的交易活动 作者: jukuan研究团队 日期: 2025-09 适用平台: 聚宽在线研究平台 """ import pandas as pd import numpy as np from jqdata import * import datetime import warnings warnings.filterwarnings('ignore') # ===================================================================================== # 分析配置参数 - 集中配置部分 # ===================================================================================== class CandlestickHatchConfig: """期货烛台影线形态分析配置参数""" # ==================== 时间范围设置 ==================== ANALYSIS_START_DATE = datetime.datetime(2025, 8, 1) # 分析开始日期 ANALYSIS_END_DATE = datetime.datetime(2025, 8, 20) # 分析结束日期 # ==================== 时间扩展参数 ==================== HISTORICAL_DAYS = 365 # 历史数据回溯期:优势期前天数 FORWARD_DAYS = 15 # 正向分析期:优势期后天数 FUTURE_MONITOR_DAYS = 15 # 未来表现跟踪期:交易日数 # ==================== 影线形态参数 ==================== HATCH_TO_BODY_RATIO = 1.2 # 影线与实体长度比率阈值 OPPOSITE_HATCH_RATIO = 0.5 # 相反方向影线与实体长度比率阈值 # ==================== 形态验证阈值 ==================== MAX_REVENUE_THRESHOLD = 0.02 # 最大盈利阈值 MAX_LOSS_THRESHOLD = 0.01 # 最大亏损阈值 # ==================== 分析规模控制 ==================== MAX_ANALYSIS_CONTRACTS = -1 # 最大分析合约数量限制 # ==================== 输出设置 ==================== OUTPUT_ENCODING = 'utf-8-sig' # 输出文件编码格式 VERBOSE_LOGGING = True # 是否打印详细日志 # ==================== 交易仓位管理 ==================== INITIAL_TRADE_AMOUNT = 20000 # 单笔交易初始金额 LOSS_RATE = 0.01 # 亏损交易的亏损率(可配置) PROFIT_DRAWDOWN_RATE = 0.02 # 盈利交易的回撤率(可配置) @classmethod def print_config(cls): """打印当前配置信息""" print("=== 期货烛台影线形态分析配置 ===") print(f"分析时间范围: {cls.ANALYSIS_START_DATE.strftime('%Y-%m-%d')} 至 {cls.ANALYSIS_END_DATE.strftime('%Y-%m-%d')}") print(f"历史数据回溯期: {cls.HISTORICAL_DAYS}天") print(f"前向分析期: {cls.FORWARD_DAYS}天") print(f"未来表现跟踪期: {cls.FUTURE_MONITOR_DAYS}个交易日") print(f"影线与实体长度比率阈值: {cls.HATCH_TO_BODY_RATIO}") print(f"相反方向影线比率阈值: {cls.OPPOSITE_HATCH_RATIO}") print(f"最大盈利阈值: {cls.MAX_REVENUE_THRESHOLD}") print(f"最大亏损阈值: {cls.MAX_LOSS_THRESHOLD}") print(f"最大分析合约数: {cls.MAX_ANALYSIS_CONTRACTS}") print(f"合约类型: 主力合约") print(f"详细日志: {'开启' if cls.VERBOSE_LOGGING else '关闭'}") print(f"初始交易金额: {cls.INITIAL_TRADE_AMOUNT}") print(f"亏损交易亏损率: {cls.LOSS_RATE}") print(f"盈利交易回撤率: {cls.PROFIT_DRAWDOWN_RATE}") print("=" * 50) class FutureCandlestickHatchAnalyzer: """期货烛台影线形态分析器""" def __init__(self, config=None): """初始化分析器""" # 使用配置类参数 if config is None: config = CandlestickHatchConfig self.config = config self.historical_days = config.HISTORICAL_DAYS self.forward_days = config.FORWARD_DAYS self.future_monitor_days = config.FUTURE_MONITOR_DAYS self.hatch_to_body_ratio = config.HATCH_TO_BODY_RATIO self.opposite_hatch_ratio = config.OPPOSITE_HATCH_RATIO self.max_revenue_threshold = config.MAX_REVENUE_THRESHOLD self.max_loss_threshold = config.MAX_LOSS_THRESHOLD self.analysis_start_date = config.ANALYSIS_START_DATE self.analysis_end_date = config.ANALYSIS_END_DATE self.max_analysis_contracts = config.MAX_ANALYSIS_CONTRACTS self.output_encoding = config.OUTPUT_ENCODING self.verbose_logging = config.VERBOSE_LOGGING # 交易仓位管理参数 self.initial_trade_amount = config.INITIAL_TRADE_AMOUNT self.loss_rate = config.LOSS_RATE self.profit_drawdown_rate = config.PROFIT_DRAWDOWN_RATE # 期货品种与交易所的映射关系 self.exchange_map = { # 大连商品交易所 (XDCE) 'A': 'XDCE', 'B': 'XDCE', 'C': 'XDCE', 'CS': 'XDCE', 'I': 'XDCE', 'J': 'XDCE', 'JD': 'XDCE', 'JM': 'XDCE', 'L': 'XDCE', 'M': 'XDCE', 'P': 'XDCE', 'PP': 'XDCE', 'PG': 'XDCE', 'RR': 'XDCE', 'V': 'XDCE', 'Y': 'XDCE', 'EB': 'XDCE', 'EG': 'XDCE', 'LH': 'XDCE', # 郑州商品交易所 (XZCE) 'AP': 'XZCE', 'CF': 'XZCE', 'CY': 'XZCE', 'FG': 'XZCE', 'JR': 'XZCE', 'LR': 'XZCE', 'MA': 'XZCE', 'OI': 'XZCE', 'PM': 'XZCE', 'RI': 'XZCE', 'RM': 'XZCE', 'RS': 'XZCE', 'SF': 'XZCE', 'SM': 'XZCE', 'SR': 'XZCE', 'TA': 'XZCE', 'WH': 'XZCE', 'ZC': 'XZCE', 'CJ': 'XZCE', 'ME': 'XZCE', 'PF': 'XZCE', 'PK': 'XZCE', 'RO': 'XZCE', 'SA': 'XZCE', 'TC': 'XZCE', 'UR': 'XZCE', 'WS': 'XZCE', 'WT': 'XZCE', # 上海期货交易所 (XSGE) 'AG': 'XSGE', 'AL': 'XSGE', 'AU': 'XSGE', 'BU': 'XSGE', 'CU': 'XSGE', 'FU': 'XSGE', 'HC': 'XSGE', 'NI': 'XSGE', 'PB': 'XSGE', 'RB': 'XSGE', 'RU': 'XSGE', 'SN': 'XSGE', 'SP': 'XSGE', 'SS': 'XSGE', 'WR': 'XSGE', 'ZN': 'XSGE', # 上海国际能源交易中心 (XINE) 'BC': 'XINE', 'LU': 'XINE', 'NR': 'XINE', 'SC': 'XINE', # 中金所 (CCFX) 'IC': 'CCFX', 'IF': 'CCFX', 'IH': 'CCFX', 'T': 'CCFX', 'TF': 'CCFX', 'TS': 'CCFX', # 广期所 (GFEX) 'SI': 'GFEX', 'LC': 'GFEX', 'PS': 'GFEX' } # 存储结果的字典 self.contract_data = {} # 主合约数据结构 - dict1 self.hatch_patterns = {} # 影线形态数据 - dict2 # 投资组合管理 self.portfolio_results = [] # 存储所有交易结果 if self.verbose_logging: print("初始化期货烛台影线形态分析器") print(f"配置参数 - 历史数据回溯期: {self.historical_days}天, 前向分析期: {self.forward_days}天") print(f"影线与实体长度比率阈值: {self.hatch_to_body_ratio}, 相反影线比率阈值: {self.opposite_hatch_ratio}") print(f"最大盈利阈值: {self.max_revenue_threshold}, 最大亏损阈值: {self.max_loss_threshold}") print(f"监控窗口: {self.future_monitor_days}个交易日") print(f"最大分析合约数: {self.max_analysis_contracts}") print(f"合约类型: 主力合约") def _is_trading_day(self, check_date): """ 检查是否为交易日 参数: check_date (date): 要检查的日期 返回: bool: 是否为交易日 """ try: # 使用聚宽API检查交易日 trade_days = get_trade_days(end_date=check_date, count=1) if len(trade_days) > 0: return trade_days[0].date() == check_date return False except: # 如果API调用失败,简单判断是否为工作日 return check_date.weekday() < 5 def _get_symbol_dominant_history(self, symbol, start_date, end_date): """ 获取单个标的的主力合约历史变化 参数: symbol (str): 标的编码 start_date (date): 开始日期 end_date (date): 结束日期 返回: list: 主力合约记录列表 """ results = [] current_date = start_date current_dominant = None period_start = None # if self.verbose_logging: # print(f" 获取 {symbol} 主力合约历史...") # 按日遍历时间范围 while current_date <= end_date: # 跳过非交易日 if not self._is_trading_day(current_date): current_date += datetime.timedelta(days=1) continue try: # 获取当日主力合约 dominant_contract = get_dominant_future(symbol, current_date) if dominant_contract is None: current_date += datetime.timedelta(days=1) continue # 如果是新的主力合约 if dominant_contract != current_dominant: # 如果不是第一个合约,结束上一个合约的记录 if current_dominant is not None and period_start is not None: results.append({ 'symbol': symbol, 'dominant_contract': current_dominant, 'start_date': period_start, 'end_date': current_date - datetime.timedelta(days=1) }) # 开始新合约的记录 current_dominant = dominant_contract period_start = current_date # if self.verbose_logging: # print(f" {current_date}: 主力合约变更为 {dominant_contract}") except Exception as e: if self.verbose_logging: print(f" 获取 {current_date} 的主力合约时出错: {str(e)}") current_date += datetime.timedelta(days=1) # 处理最后一个合约 if current_dominant is not None and period_start is not None: results.append({ 'symbol': symbol, 'dominant_contract': current_dominant, 'start_date': period_start, 'end_date': end_date }) return results def get_dominant_contracts_history(self, symbol_list, start_date, end_date): """ 获取指定时间范围内多个标的的主力合约历史变化 参数: symbol_list (list): 标的编码列表 start_date (date): 开始日期 end_date (date): 结束日期 返回: pandas.DataFrame: 包含主力合约变化历史的数据框 """ # if self.verbose_logging: # print(f"获取主力合约数据...") # print(f"标的列表: {symbol_list}") # print(f"时间范围: {start_date} 至 {end_date}") # 存储所有结果的列表 all_results = [] # 逐个处理每个标的 for symbol in symbol_list: # if self.verbose_logging: # print(f"\n处理标的: {symbol}") try: # 获取该标的在指定时间范围内的主力合约历史 symbol_results = self._get_symbol_dominant_history(symbol, start_date, end_date) all_results.extend(symbol_results) except Exception as e: if self.verbose_logging: print(f"处理标的 {symbol} 时发生错误: {str(e)}") continue # 将结果转换为DataFrame if all_results: result_df = pd.DataFrame(all_results) result_df = result_df.sort_values(['symbol', 'start_date']).reset_index(drop=True) if self.verbose_logging: print(f"\n成功获取 {len(result_df)} 条主力合约记录") return result_df else: if self.verbose_logging: print("\n未获取到任何主力合约数据") return pd.DataFrame(columns=['symbol', 'dominant_contract', 'start_date', 'end_date']) def build_contract_structure(self): """ 构建期货主力合约数据结构 (dict1) 返回结构:Key=完整合约代码, Value=[start_date, end_date] """ if self.verbose_logging: print("\n=== 步骤1: 构建期货主力合约数据结构 ===") # 分析时间范围 analysis_start = self.analysis_start_date analysis_end = self.analysis_end_date if self.verbose_logging: print(f"分析时间范围: {analysis_start.strftime('%Y-%m-%d')} 至 {analysis_end.strftime('%Y-%m-%d')}") print(f"合约类型: 主力合约") # 构建主力合约数据结构 contract_dict = self._build_dominant_contract_structure(analysis_start, analysis_end) self.contract_data = contract_dict if self.verbose_logging: print(f"构建完成,共{len(contract_dict)}个期货合约") return contract_dict def _build_dominant_contract_structure(self, analysis_start, analysis_end): """构建基于主力合约的数据结构""" if self.verbose_logging: print("使用主力合约获取逻辑...") # 获取要分析的期货品种列表 if self.max_analysis_contracts > 0: symbol_list = list(self.exchange_map.keys())[:self.max_analysis_contracts] else: symbol_list = list(self.exchange_map.keys()) # 获取主力合约历史数据 dominant_df = self.get_dominant_contracts_history( symbol_list, analysis_start.date(), analysis_end.date() ) contract_dict = {} if not dominant_df.empty: # 将DataFrame转换为字典结构 for _, row in dominant_df.iterrows(): contract_code = row['dominant_contract'] start_date = row['start_date'] if isinstance(row['start_date'], datetime.date) else row['start_date'].date() end_date = row['end_date'] if isinstance(row['end_date'], datetime.date) else row['end_date'].date() contract_dict[contract_code] = [start_date, end_date] # if self.verbose_logging: # print(f" 添加主力合约: {contract_code} ({start_date} 至 {end_date})") return contract_dict def collect_extended_data(self): """ 数据收集:为每个合约收集扩展时间范围的数据 扩展期限:从(start_date-365天)到(end_date+15天) """ if self.verbose_logging: print("\n=== 步骤2: 收集扩展时间范围的数据 ===") extended_data = {} for contract_code, period in self.contract_data.items(): start_date, end_date = period # 计算扩展期限 start_date_2 = start_date - datetime.timedelta(days=self.historical_days) end_date_2 = end_date + datetime.timedelta(days=self.forward_days) # if self.verbose_logging: # print(f"获取 {contract_code} 数据") # print(f" 原始期间: {start_date} 至 {end_date}") # print(f" 扩展期间: {start_date_2} 至 {end_date_2}") try: # 获取日线数据 data = get_price( contract_code, start_date=start_date_2, end_date=end_date_2, frequency='daily', fields=['open', 'close', 'high', 'low', 'volume'], skip_paused=False, panel=False ) if data is not None and len(data) > 0: extended_data[contract_code] = { 'data': data, 'original_start': start_date, 'original_end': end_date, 'extended_start': start_date_2, 'extended_end': end_date_2 } # if self.verbose_logging: # print(f" 成功获取{len(data)}条数据记录") # else: # if self.verbose_logging: # print(f" 未获取到数据") except Exception as e: if self.verbose_logging: print(f" 获取数据失败: {str(e)}") continue if self.verbose_logging: print(f"数据收集完成,共{len(extended_data)}个合约有效数据") return extended_data def calculate_body_thresholds(self, extended_data): """ 计算实体长度阈值:使用历史数据计算平均实体长度 """ if self.verbose_logging: print("\n=== 步骤3: 计算实体长度阈值 ===") threshold_data = {} for contract_code, contract_info in extended_data.items(): data = contract_info['data'] start_date = contract_info['original_start'] start_date_2 = contract_info['extended_start'] # if self.verbose_logging: # print(f"计算 {contract_code} 的实体长度阈值") # print(f" 历史期间: {start_date_2} 至 {start_date}") # 筛选历史数据:date >= start_date_2 且 date < start_date historical_mask = (data.index.date >= start_date_2) & (data.index.date < start_date) historical_data = data[historical_mask] if len(historical_data) == 0: if self.verbose_logging: print(f" 历史数据无效,跳过") continue # 计算实体长度 historical_data = historical_data.copy() historical_data['body_length'] = abs(historical_data['close'] - historical_data['open']) # 计算平均实体长度作为阈值 body_threshold = historical_data['body_length'].mean() threshold_data[contract_code] = { 'contract_info': contract_info, 'body_threshold': body_threshold, 'historical_days': len(historical_data) } # if self.verbose_logging: # print(f" 实体长度阈值: {body_threshold:.4f} (基于{len(historical_data)}个历史交易日)") if self.verbose_logging: print("实体长度阈值计算完成") return threshold_data def identify_hatch_patterns(self, threshold_data): """ 影线形态识别:扫描符合条件的K线 条件:实体长度 >= 阈值 且 影线长度 >= 1.2 × 实体长度 """ if self.verbose_logging: print(f"\n=== 步骤4: 识别影线形态 ===") pattern_dict = {} # dict2结构 for contract_code, threshold_info in threshold_data.items(): contract_info = threshold_info['contract_info'] data = contract_info['data'] body_threshold = threshold_info['body_threshold'] start_date = contract_info['original_start'] # 使用原始开始日期 end_date = contract_info['original_end'] # if self.verbose_logging: # print(f"分析 {contract_code} 的影线形态") # print(f" 扫描期间: {start_date} 至 {end_date}") # print(f" 实体长度阈值: {body_threshold:.4f}") # 筛选扫描数据:从start_date到end_date(含) scan_mask = (data.index.date >= start_date) & (data.index.date <= end_date) scan_data = data[scan_mask] contract_patterns = {} pattern_count = 0 for idx, (signal_date, row) in enumerate(scan_data.iterrows()): # 计算K线各项数值 open_price = row['open'] close_price = row['close'] high_price = row['high'] low_price = row['low'] # 计算实体长度 body_length = abs(close_price - open_price) # 检查实体长度是否满足阈值 if body_length < body_threshold: continue # 计算影线长度 upper_hatch = high_price - max(open_price, close_price) lower_hatch = min(open_price, close_price) - low_price # 检查影线条件 hatch_direction = None hatch_length = 0 # 检查上影线条件:上影线长度符合要求 且 下影线长度小于实体长度的一半 if (upper_hatch >= self.hatch_to_body_ratio * body_length and lower_hatch < self.opposite_hatch_ratio * body_length): hatch_direction = 'up' hatch_length = upper_hatch # 检查下影线条件:下影线长度符合要求 且 上影线长度小于实体长度的一半 elif (lower_hatch >= self.hatch_to_body_ratio * body_length and upper_hatch < self.opposite_hatch_ratio * body_length): hatch_direction = 'down' hatch_length = lower_hatch # 如果满足影线条件,记录形态 if hatch_direction is not None: pattern_count += 1 # dict3结构 pattern_info = { 'date': signal_date.strftime('%Y-%m-%d'), 'high': float(high_price), 'low': float(low_price), 'open': float(open_price), 'close': float(close_price), # 标准收盘价 'body_len': float(body_length), 'hatch_dire': hatch_direction, 'hatch_len': float(hatch_length) } contract_patterns[signal_date] = pattern_info # if self.verbose_logging: # opposite_hatch_len = lower_hatch if hatch_direction == 'up' else upper_hatch # print(f" 发现形态: {signal_date.strftime('%Y-%m-%d')} {hatch_direction}影线 实体:{body_length:.4f} 影线:{hatch_length:.4f} 相反影线:{opposite_hatch_len:.4f}") if contract_patterns: pattern_dict[contract_code] = contract_patterns # if self.verbose_logging: # print(f" 检测到{pattern_count}个有效影线形态") # else: # if self.verbose_logging: # print(f" 未检测到有效影线形态") # if self.verbose_logging: # total_patterns = sum(len(patterns) for patterns in pattern_dict.values()) # print(f"\n影线形态识别完成,共{total_patterns}个形态") self.hatch_patterns = pattern_dict return pattern_dict def analyze_future_performance(self, threshold_data, pattern_dict): """ 未来表现分析:分析影线形态对未来价格走势的预测能力 交易方向:上影线做空,下影线做多 """ if self.verbose_logging: print(f"\n=== 步骤5: 分析未来表现 ===") enhanced_pattern_dict = {} for contract_code, contract_patterns in pattern_dict.items(): # 获取对应的数据信息 contract_info = threshold_data[contract_code]['contract_info'] data = contract_info['data'] end_date_2 = contract_info['extended_end'] if self.verbose_logging: print(f"\n分析 {contract_code} 的未来表现") print(f" 监控窗口: {self.future_monitor_days}个交易日") enhanced_patterns = {} for signal_date, pattern_info in contract_patterns.items(): hatch_direction = pattern_info['hatch_dire'] standard_close = pattern_info['close'] # 标准收盘价 print(f"形态日期:{signal_date.strftime('%Y-%m-%d')}, 标准收盘价: {standard_close}") # 在数据中找到信号日期的位置 signal_idx = None for idx, (date, row) in enumerate(data.iterrows()): if date.date() == signal_date.date(): signal_idx = idx break if signal_idx is None: if self.verbose_logging: print(f" 未找到信号日期 {signal_date.strftime('%Y-%m-%d')} 在数据中") continue # 分析未来N天的表现 future_analysis = self._analyze_future_n_days( data, signal_idx, hatch_direction, standard_close, self.future_monitor_days, end_date_2 ) # 计算最大收益和最大风险(原逻辑:与影线方向相反交易) max_revenue_info, min_revenue_info = self._calculate_max_revenue_risk(future_analysis, hatch_direction, standard_close) # 计算最大收益和最大风险(新逻辑:与影线方向相同交易) max_revenue_info_2, min_revenue_info_2 = self._calculate_max_revenue_risk_2(future_analysis, hatch_direction, standard_close) # if self.verbose_logging: # print(f" 原逻辑最大收益: {max_revenue_info['revenue']:.4f} ({max_revenue_info['date']})") # print(f" 原逻辑最大亏损: {min_revenue_info['revenue']:.4f} ({min_revenue_info['date']})") # print(f" 新逻辑最大收益: {max_revenue_info_2['revenue']:.4f} ({max_revenue_info_2['date']})") # print(f" 新逻辑最大亏损: {min_revenue_info_2['revenue']:.4f} ({min_revenue_info_2['date']})") # 计算conclusion字段(增强版,包含日损失检查) conclusion = self._calculate_conclusion(max_revenue_info, min_revenue_info, future_analysis, hatch_direction, standard_close) # 计算conclusion2字段(新逻辑:与影线方向相同交易) conclusion_2 = self._calculate_conclusion_2(max_revenue_info_2, min_revenue_info_2, future_analysis, hatch_direction, standard_close) # 计算新增字段 body_direction, direction_consistency = self._calculate_direction_fields(pattern_info, hatch_direction) # 计算影线与实体长度比率 hatch_to_body_ratio = pattern_info['hatch_len'] / pattern_info['body_len'] if pattern_info['body_len'] > 0 else 0 # 计算移动平均线 ma_values = self._calculate_moving_averages(data, signal_idx) # 计算基于收盘价与5K移动平均线关系的价格方向 price_direction = self._calculate_price_direction(standard_close, ma_values['ma5']) # 计算趋势强度 trend_strength = self._calculate_trend_strength(ma_values) # 动态仓位计算 remaining_amount = self._calculate_dynamic_position( conclusion, max_revenue_info['revenue'], self.initial_trade_amount ) # 记录到投资组合(包含两种交易逻辑) self._add_portfolio_trade( contract_code, signal_date.strftime('%Y-%m-%d'), conclusion, conclusion_2, max_revenue_info['revenue'], max_revenue_info_2['revenue'], self.initial_trade_amount, remaining_amount ) # 增强pattern_info enhanced_pattern = pattern_info.copy() enhanced_pattern.update({ 'max_revenue_date': max_revenue_info['date'], 'max_revenue': max_revenue_info['revenue'], 'min_revenue_date': min_revenue_info['date'], 'min_revenue': min_revenue_info['revenue'], 'max_revenue_date2': max_revenue_info_2['date'], 'max_revenue2': max_revenue_info_2['revenue'], 'min_revenue_date2': min_revenue_info_2['date'], 'min_revenue2': min_revenue_info_2['revenue'], 'conclusion': conclusion, 'conclusion2': conclusion_2, 'body_direction': body_direction, 'direction_consistency': direction_consistency, 'price_direction': price_direction, # 新增:基于收盘价与5K移动平均线关系的价格方向 'trade_amount': float(self.initial_trade_amount), 'remaining_amount': float(remaining_amount), 'profit_loss': float(remaining_amount - self.initial_trade_amount), 'hatch_body_ratio': float(hatch_to_body_ratio), 'ma5': ma_values['ma5'], 'ma10': ma_values['ma10'], 'ma20': ma_values['ma20'], 'ma30': ma_values['ma30'], 'trend_strength': trend_strength }) enhanced_patterns[signal_date] = enhanced_pattern if self.verbose_logging: print(f" {signal_date.strftime('%Y-%m-%d')} {hatch_direction}影线:") print(f" 原逻辑 - 最大收益: {max_revenue_info['revenue']:.4f} ({max_revenue_info['date']})") print(f" 原逻辑 - 最大亏损: {min_revenue_info['revenue']:.4f} ({min_revenue_info['date']})") print(f" 原逻辑 - 结论: {conclusion}") print(f" 新逻辑 - 最大收益: {max_revenue_info_2['revenue']:.4f} ({max_revenue_info_2['date']})") print(f" 新逻辑 - 最大亏损: {min_revenue_info_2['revenue']:.4f} ({min_revenue_info_2['date']})") print(f" 新逻辑 - 结论: {conclusion_2}") print(f" 实体方向: {body_direction}") print(f" 方向一致性: {direction_consistency}") print(f" 价格方向: {price_direction} (收盘价vs5K线)") if enhanced_patterns: enhanced_pattern_dict[contract_code] = enhanced_patterns if self.verbose_logging: print(f" 完成{len(enhanced_patterns)}个形态的未来表现分析") if self.verbose_logging: total_patterns = sum(len(patterns) for patterns in enhanced_pattern_dict.values()) print(f"\n未来表现分析完成,共{total_patterns}个形态") return enhanced_pattern_dict def _analyze_future_n_days(self, data, signal_idx, hatch_direction, standard_close, monitor_days, end_date_2): """分析信号后未来N天的价格表现""" future_prices = [] data_list = list(data.iterrows()) for i in range(1, monitor_days + 1): future_idx = signal_idx + i if future_idx < len(data_list): future_date, future_row = data_list[future_idx] # 检查是否超出扩展结束日期 if future_date.date() > end_date_2: break future_prices.append({ 'date': future_date.strftime('%Y-%m-%d'), 'high': float(future_row['high']), 'low': float(future_row['low']) }) else: break return future_prices def _calculate_max_revenue_risk(self, future_analysis, hatch_direction, standard_close): """计算最大收益和最大风险(原逻辑:与影线方向相反交易)""" if not future_analysis: return {'date': None, 'revenue': 0.0}, {'date': None, 'revenue': 0.0} max_revenue = float('-inf') min_revenue = float('inf') max_revenue_date = None min_revenue_date = None for day_data in future_analysis: date = day_data['date'] high = day_data['high'] low = day_data['low'] if hatch_direction == 'up': # 上影线做空:收益 = (标准收盘价 - 最低价) / 标准收盘价,风险 = (标准收盘价 - 最高价) / 标准收盘价 revenue = (standard_close - low) / standard_close risk = (standard_close - high) / standard_close else: # 下影线做多:收益 = (最高价 - 标准收盘价) / 标准收盘价,风险 = (最低价 - 标准收盘价) / 标准收盘价 revenue = (high - standard_close) / standard_close risk = (low - standard_close) / standard_close # 更新最大收益 if revenue > max_revenue: max_revenue = round(revenue, 4) max_revenue_date = date # 更新最大风险(最大亏损,即最小收益的相反数) if risk < min_revenue: min_revenue = round(risk, 4) min_revenue_date = date return {'date': max_revenue_date, 'revenue': float(max_revenue)}, {'date': min_revenue_date, 'revenue': float(min_revenue)} def _calculate_max_revenue_risk_2(self, future_analysis, hatch_direction, standard_close): """计算最大收益和最大风险(新逻辑:与影线方向相同交易)""" if not future_analysis: return {'date': None, 'revenue': 0.0}, {'date': None, 'revenue': 0.0} max_revenue_2 = float('-inf') min_revenue_2 = float('inf') max_revenue_date_2 = None min_revenue_date_2 = None for day_data in future_analysis: date = day_data['date'] high = day_data['high'] low = day_data['low'] if hatch_direction == 'up': # 上影线做多:收益 = (最高价 - 标准收盘价) / 标准收盘价,风险 = (最低价 - 标准收盘价) / 标准收盘价 revenue_2 = (high - standard_close) / standard_close risk_2 = (low - standard_close) / standard_close else: # 下影线做空:收益 = (标准收盘价 - 最低价) / 标准收盘价,风险 = (标准收盘价 - 最高价) / 标准收盘价 revenue_2 = (standard_close - low) / standard_close risk_2 = (standard_close - high) / standard_close # 更新最大收益 if revenue_2 > max_revenue_2: max_revenue_2 = round(revenue_2, 4) max_revenue_date_2 = date # 更新最大风险(最大亏损,即最小收益的相反数) if risk_2 < min_revenue_2: min_revenue_2 = round(risk_2, 4) min_revenue_date_2 = date return {'date': max_revenue_date_2, 'revenue': float(max_revenue_2)}, {'date': min_revenue_date_2, 'revenue': float(min_revenue_2)} def _calculate_conclusion(self, max_revenue_info, min_revenue_info, future_analysis, hatch_direction, standard_close): """ 计算形态的结论:是否为有效的预测信号(增强版) 逻辑: - 若满足以下条件则返回 "true": 1. (最大盈利日期 < 最大亏损日期)且(最大盈利 > max_revenue_threshold) 2. 或者(最大盈利日期 > 最大亏损日期)且(最大盈利 > max_revenue_threshold)且(最大亏损 < max_loss_threshold) - 增强条件:如果在最大收益日期之前的任何一天,损失超过了MAX_LOSS_THRESHOLD,则返回 "false" - 否则返回 "false" """ max_revenue_date = max_revenue_info['date'] max_revenue = max_revenue_info['revenue'] min_revenue_date = min_revenue_info['date'] min_revenue = min_revenue_info['revenue'] # 如果日期为空,返回false if max_revenue_date is None or min_revenue_date is None: return "false" # 转换日期字符串为datetime对象进行比较 try: from datetime import datetime max_revenue_dt = datetime.strptime(max_revenue_date, '%Y-%m-%d') min_revenue_dt = datetime.strptime(min_revenue_date, '%Y-%m-%d') except: return "false" # 增强检查:检查最大收益日期之前是否有任何一天的损失超过阈值 if future_analysis and self._check_early_excessive_loss(future_analysis, max_revenue_date, hatch_direction, standard_close): return "false" # 条件1:(最大盈利日期 < 最大亏损日期)且(最大盈利 > 阈值) condition1 = (max_revenue_dt < min_revenue_dt) and (max_revenue > self.max_revenue_threshold) # 条件2:(最大盈利日期 > 最大亏损日期)且(最大盈利 > 阈值)且(最大亏损 < 阈值) condition2 = (max_revenue_dt > min_revenue_dt) and (max_revenue > self.max_revenue_threshold) and (abs(min_revenue) < self.max_loss_threshold) return "true" if (condition1 or condition2) else "false" def _check_early_excessive_loss(self, future_analysis, max_revenue_date, hatch_direction, standard_close): """ 检查在最大收益日期之前是否有任何一天的损失超过阈值(原逻辑:与影线方向相反交易) 参数: future_analysis: 未来价格分析数据 max_revenue_date: 最大收益日期 hatch_direction: 影线方向 standard_close: 标准收盘价 返回: bool: True表示存在早期过度损失 """ from datetime import datetime try: max_revenue_dt = datetime.strptime(max_revenue_date, '%Y-%m-%d') except: return False for day_data in future_analysis: day_date_str = day_data['date'] try: day_dt = datetime.strptime(day_date_str, '%Y-%m-%d') # 只检查最大收益日期之前的日期 if day_dt >= max_revenue_dt: continue high = day_data['high'] low = day_data['low'] if hatch_direction == 'up': # 上影线做空:亏损 = (标准收盘价 - 最高价) / 标准收盘价,当价格上涨时为负值(亏损) loss_value = (standard_close - high) / standard_close else: # 下影线做多:亏损 = (最低价 - 标准收盘价) / 标准收盘价,当价格下跌时为负值(亏损) loss_value = (low - standard_close) / standard_close # 检查亏损是否超过阈值:亏损为负值,阈值为正值,所以检查 abs(亏损) > 阈值 if loss_value < 0 and abs(loss_value) > self.max_loss_threshold: return True except: continue return False def _calculate_conclusion_2(self, max_revenue_info_2, min_revenue_info_2, future_analysis, hatch_direction, standard_close): """ 计算形态的结论2:是否为有效的预测信号(新逻辑:与影线方向相同交易) 逻辑: - 若满足以下条件则返回 "true": 1. (最大盈利日期 < 最大亏损日期)且(最大盈利 > max_revenue_threshold) 2. 或者(最大盈利日期 > 最大亏损日期)且(最大盈利 > max_revenue_threshold)且(最大亏损 < max_loss_threshold) - 增强条件:如果在最大收益日期之前的任何一天,损失超过了MAX_LOSS_THRESHOLD,则返回 "false" - 否则返回 "false" """ max_revenue_date_2 = max_revenue_info_2['date'] max_revenue_2 = max_revenue_info_2['revenue'] min_revenue_date_2 = min_revenue_info_2['date'] min_revenue_2 = min_revenue_info_2['revenue'] # 如果日期为空,返回false if max_revenue_date_2 is None or min_revenue_date_2 is None: return "false" # 转换日期字符串为datetime对象进行比较 try: from datetime import datetime max_revenue_dt_2 = datetime.strptime(max_revenue_date_2, '%Y-%m-%d') min_revenue_dt_2 = datetime.strptime(min_revenue_date_2, '%Y-%m-%d') except: return "false" # 增强检查:检查最大收益日期之前是否有任何一天的损失超过阈值(新逻辑) if future_analysis and self._check_early_excessive_loss_2(future_analysis, max_revenue_date_2, hatch_direction, standard_close): return "false" # 条件1:(最大盈利日期 < 最大亏损日期)且(最大盈利 > 阈值) condition1 = (max_revenue_dt_2 < min_revenue_dt_2) and (max_revenue_2 > self.max_revenue_threshold) # 条件2:(最大盈利日期 > 最大亏损日期)且(最大盈利 > 阈值)且(最大亏损 < 阈值) condition2 = (max_revenue_dt_2 > min_revenue_dt_2) and (max_revenue_2 > self.max_revenue_threshold) and (abs(min_revenue_2) < self.max_loss_threshold) return "true" if (condition1 or condition2) else "false" def _check_early_excessive_loss_2(self, future_analysis, max_revenue_date_2, hatch_direction, standard_close): """ 检查在最大收益日期之前是否有任何一天的损失超过阈值(新逻辑:与影线方向相同交易) 参数: future_analysis: 未来价格分析数据 max_revenue_date_2: 最大收益日期 hatch_direction: 影线方向 standard_close: 标准收盘价 返回: bool: True表示存在早期过度损失 """ from datetime import datetime try: max_revenue_dt_2 = datetime.strptime(max_revenue_date_2, '%Y-%m-%d') except: return False for day_data in future_analysis: day_date_str = day_data['date'] try: day_dt = datetime.strptime(day_date_str, '%Y-%m-%d') # 只检查最大收益日期之前的日期 if day_dt >= max_revenue_dt_2: continue high = day_data['high'] low = day_data['low'] if hatch_direction == 'up': # 上影线做多:亏损 = (最低价 - 标准收盘价) / 标准收盘价,当价格下跌时为负值(亏损) loss_value = (low - standard_close) / standard_close else: # 下影线做空:亏损 = (标准收盘价 - 最高价) / 标准收盘价,当价格上涨时为负值(亏损) loss_value = (standard_close - high) / standard_close # 检查亏损是否超过阈值:亏损为负值,阈值为正值,所以检查 abs(亏损) > 阈值 if loss_value < 0 and abs(loss_value) > self.max_loss_threshold: return True except: continue return False def _calculate_direction_fields(self, pattern_info, hatch_direction): """ 计算body_direction和direction_consistency字段 参数: pattern_info: K线形态信息 hatch_direction: 影线方向 ('up' 或 'down') 返回: tuple: (body_direction, direction_consistency) """ open_price = pattern_info['open'] close_price = pattern_info['close'] # 计算body_direction:实体方向 if close_price > open_price: body_direction = 'up' elif close_price < open_price: body_direction = 'down' else: body_direction = 'flat' # 十字星情况 # 计算direction_consistency:检查实体方向与影线方向是否相反 if body_direction == 'flat': direction_consistency = 'false' # 十字星情况无法判断一致性 elif (body_direction == 'up' and hatch_direction == 'down') or \ (body_direction == 'down' and hatch_direction == 'up'): direction_consistency = 'true' # 相反,符合预期 else: direction_consistency = 'false' # 不相反 return body_direction, direction_consistency def _calculate_price_direction(self, close_price, ma5_value): """ 计算基于收盘价与5K移动平均线关系的价格方向 参数: close_price: 形态形成日的收盘价 ma5_value: 5日移动平均线价格 返回: str: 价格方向分类 ("上涨" 或 "下跌") """ if ma5_value is None: return "数据不足" # 如果MA5数据不足,返回数据不足 if close_price >= ma5_value: return "上涨" else: return "下跌" def _calculate_dynamic_position(self, conclusion, max_revenue, trade_amount): """ 计算动态仓位调整后的剩余资金 参数: conclusion: 交易结论 ('true' 为正确, 'false' 为错误) max_revenue: 最大盈利率 trade_amount: 当前交易金额 返回: float: 调整后的剩余资金 """ if conclusion == "false": # 亏损交易(错误结果) remaining_amount = trade_amount * (1 - self.loss_rate) if self.verbose_logging: print(f" 亏损交易,剩余资金: {remaining_amount:.2f} (亏损率: {self.loss_rate})") else: # 盈利交易(正确结果) remaining_amount = trade_amount * (1 + max_revenue - self.profit_drawdown_rate) if self.verbose_logging: print(f" 盈利交易,剩余资金: {remaining_amount:.2f} (最大利润: {max_revenue:.4f}, 回撤率: {self.profit_drawdown_rate})") return remaining_amount def _add_portfolio_trade(self, contract_code, date, conclusion, conclusion_2, max_revenue, max_revenue_2, trade_amount, remaining_amount): """ 添加交易记录到投资组合(支持两种交易逻辑) 参数: contract_code: 合约代码 date: 交易日期 conclusion: 原逻辑交易结论 conclusion_2: 新逻辑交易结论 max_revenue: 原逻辑最大盈利率 max_revenue_2: 新逻辑最大盈利率 trade_amount: 交易金额 remaining_amount: 剩余金额(基于原逻辑计算) """ # 计算新逻辑的剩余金额 remaining_amount_2 = self._calculate_dynamic_position( conclusion_2, max_revenue_2, trade_amount ) trade_record = { 'contract_code': contract_code, 'date': date, 'conclusion': conclusion, 'conclusion2': conclusion_2, 'max_revenue': max_revenue, 'max_revenue2': max_revenue_2, 'trade_amount': trade_amount, 'remaining_amount': remaining_amount, 'remaining_amount2': remaining_amount_2, 'profit_loss': remaining_amount - trade_amount, 'profit_loss2': remaining_amount_2 - trade_amount } self.portfolio_results.append(trade_record) def _calculate_moving_averages(self, data, signal_idx): """ 计算形态形成日的移动平均线 参数: data: 价格数据DataFrame signal_idx: 信号日期在数据中的索引 返回: dict: 包含各移动平均线的字典 """ if signal_idx < 29: # 至少需要30天数据来计算30日移动平均 return {'ma5': None, 'ma10': None, 'ma20': None, 'ma30': None} # 获取到信号日期为止的收盘价数据 data_list = list(data.iterrows()) closes = [] for i in range(max(0, signal_idx - 29), signal_idx + 1): # 取30天数据 if i < len(data_list): _, row = data_list[i] closes.append(row['close']) if len(closes) < 30: return {'ma5': None, 'ma10': None, 'ma20': None, 'ma30': None} # 计算移动平均线(使用最后的N天数据) ma5 = sum(closes[-5:]) / 5 if len(closes) >= 5 else None ma10 = sum(closes[-10:]) / 10 if len(closes) >= 10 else None ma20 = sum(closes[-20:]) / 20 if len(closes) >= 20 else None ma30 = sum(closes[-30:]) / 30 if len(closes) >= 30 else None return { 'ma5': round(ma5, 2) if ma5 is not None else None, 'ma10': round(ma10, 2) if ma10 is not None else None, 'ma20': round(ma20, 2) if ma20 is not None else None, 'ma30': round(ma30, 2) if ma30 is not None else None } def _calculate_trend_strength(self, ma_values): """ 计算趋势强度评估 参数: ma_values: 移动平均线字典 返回: str: 趋势强度分类 """ ma5 = ma_values['ma5'] ma10 = ma_values['ma10'] ma20 = ma_values['ma20'] ma30 = ma_values['ma30'] # 如果有任何移动平均线为空,返回"数据不足" if None in [ma5, ma10, ma20, ma30]: return "数据不足" # 进行6项趋势比较 comparisons = [ ma5 >= ma10, # 5K≥10K ma10 >= ma20, # 10K≥20K ma20 >= ma30, # 20K≥30K ma5 >= ma20, # 5K≥20K ma10 >= ma30, # 10K≥30K ma5 >= ma30 # 5K≥30K ] # 计算上涨趋势的数量 uptrend_count = sum(comparisons) # 根据上涨趋势数量进行分类 if uptrend_count == 6 or uptrend_count == 0: return "强趋势" elif uptrend_count == 3: return "震荡" elif uptrend_count == 5 or uptrend_count == 1: return "中趋势" elif uptrend_count == 4 or uptrend_count == 2: return "弱趋势" else: return "未知" # 理论上不应该到这里 def calculate_portfolio_summary(self): """ 计算投资组合整体表现汇总(支持两种交易逻辑对比) 返回: dict: 投资组合汇总信息 """ if not self.portfolio_results: return { 'total_trades': 0, 'logic1': { 'successful_trades': 0, 'failed_trades': 0, 'success_rate': 0.0, 'total_profit_loss': 0.0, 'final_capital': self.initial_trade_amount, 'total_return': 0.0 }, 'logic2': { 'successful_trades': 0, 'failed_trades': 0, 'success_rate': 0.0, 'total_profit_loss': 0.0, 'final_capital': self.initial_trade_amount, 'total_return': 0.0 } } total_trades = len(self.portfolio_results) initial_capital = total_trades * self.initial_trade_amount # 原逻辑(与影线方向相反交易)统计 successful_trades_1 = sum(1 for trade in self.portfolio_results if trade['conclusion'] == 'true') failed_trades_1 = total_trades - successful_trades_1 success_rate_1 = (successful_trades_1 / total_trades) * 100 if total_trades > 0 else 0 total_profit_loss_1 = sum(trade['profit_loss'] for trade in self.portfolio_results) final_capital_1 = initial_capital + total_profit_loss_1 total_return_1 = (total_profit_loss_1 / initial_capital * 100) if initial_capital > 0 else 0 # 新逻辑(与影线方向相同交易)统计 successful_trades_2 = sum(1 for trade in self.portfolio_results if trade['conclusion2'] == 'true') failed_trades_2 = total_trades - successful_trades_2 success_rate_2 = (successful_trades_2 / total_trades) * 100 if total_trades > 0 else 0 total_profit_loss_2 = sum(trade['profit_loss2'] for trade in self.portfolio_results) final_capital_2 = initial_capital + total_profit_loss_2 total_return_2 = (total_profit_loss_2 / initial_capital * 100) if initial_capital > 0 else 0 portfolio_summary = { 'total_trades': total_trades, 'initial_capital': initial_capital, 'logic1': { 'name': '原逻辑(与影线方向相反交易)', 'successful_trades': successful_trades_1, 'failed_trades': failed_trades_1, 'success_rate': round(success_rate_1, 2), 'total_profit_loss': round(total_profit_loss_1, 2), 'final_capital': round(final_capital_1, 2), 'total_return': round(total_return_1, 2) }, 'logic2': { 'name': '新逻辑(与影线方向相同交易)', 'successful_trades': successful_trades_2, 'failed_trades': failed_trades_2, 'success_rate': round(success_rate_2, 2), 'total_profit_loss': round(total_profit_loss_2, 2), 'final_capital': round(final_capital_2, 2), 'total_return': round(total_return_2, 2) } } if self.verbose_logging: print(f"\n=== 投资组合双向交易逻辑对比分析 ===") print(f"总交易次数: {total_trades}") print(f"初始资本: {initial_capital:.2f}") print(f"\n--- 原逻辑(与影线方向相反交易)---") print(f"成功交易: {successful_trades_1}") print(f"失败交易: {failed_trades_1}") print(f"成功率: {success_rate_1:.2f}%") print(f"总盈亏: {total_profit_loss_1:.2f}") print(f"最终资本: {final_capital_1:.2f}") print(f"总回报率: {total_return_1:.2f}%") print(f"\n--- 新逻辑(与影线方向相同交易)---") print(f"成功交易: {successful_trades_2}") print(f"失败交易: {failed_trades_2}") print(f"成功率: {success_rate_2:.2f}%") print(f"总盈亏: {total_profit_loss_2:.2f}") print(f"最终资本: {final_capital_2:.2f}") print(f"总回报率: {total_return_2:.2f}%") print(f"\n--- 逻辑对比 ---") print(f"成功率差异: {success_rate_2 - success_rate_1:.2f}% (新逻辑 - 原逻辑)") print(f"回报率差异: {total_return_2 - total_return_1:.2f}% (新逻辑 - 原逻辑)") return portfolio_summary def generate_csv_output(self, enhanced_pattern_dict): """ 生成CSV输出文件 """ if self.verbose_logging: print("\n=== 步骤6: 生成CSV输出文件 ===") # 准备CSV数据(按指定顺序排列字段) csv_data = [] for contract_code, patterns in enhanced_pattern_dict.items(): for signal_date, pattern_info in patterns.items(): csv_data.append({ # 按更新的字段顺序:contract_code, date, conclusion, conclusion2, body_direction, direction_consistency, price_direction, body_len, hatch_len, hatch/body, close, open, high, low, max_revenue, max_revenue_date, min_revenue, min_revenue_date, max_revenue2, max_revenue_date2, min_revenue2, min_revenue_date2, profit_loss, remaining_amount, trade_amount 'contract_code': contract_code, 'date': pattern_info['date'], 'conclusion': pattern_info['conclusion'], 'conclusion2': pattern_info['conclusion2'], 'body_direction': pattern_info['body_direction'], 'direction_consistency': pattern_info['direction_consistency'], 'price_direction': pattern_info['price_direction'], # 新增:基于收盘价与5K移动平均线关系的价格方向 'body_len': pattern_info['body_len'], 'hatch_len': pattern_info['hatch_len'], 'hatch/body': pattern_info['hatch_body_ratio'], 'close': pattern_info['close'], 'open': pattern_info['open'], 'high': pattern_info['high'], 'low': pattern_info['low'], 'max_revenue': pattern_info['max_revenue'], 'max_revenue_date': pattern_info['max_revenue_date'], 'min_revenue': pattern_info['min_revenue'], 'min_revenue_date': pattern_info['min_revenue_date'], 'max_revenue2': pattern_info['max_revenue2'], 'max_revenue_date2': pattern_info['max_revenue_date2'], 'min_revenue2': pattern_info['min_revenue2'], 'min_revenue_date2': pattern_info['min_revenue_date2'], 'profit_loss': pattern_info['profit_loss'], 'remaining_amount': pattern_info['remaining_amount'], 'trade_amount': pattern_info['trade_amount'] }) if csv_data: # 创建DataFrame并保存为CSV,严格按照用户指定的列顺序 df = pd.DataFrame(csv_data) # 定义更新的确切列顺序(包含新的price_direction字段) column_order = [ 'contract_code', 'date', 'conclusion', 'conclusion2', 'body_direction', 'direction_consistency', 'price_direction', 'body_len', 'hatch_len', 'hatch/body', 'close', 'open', 'high', 'low', 'max_revenue', 'max_revenue_date', 'min_revenue', 'min_revenue_date', 'max_revenue2', 'max_revenue_date2', 'min_revenue2', 'min_revenue_date2', 'profit_loss', 'remaining_amount', 'trade_amount' ] # 重新排列DataFrame的列顺序 df = df[column_order] timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') filename = f'future_candlestick_hatch_analysis_{timestamp}.csv' df.to_csv(filename, index=False, encoding=self.output_encoding) if self.verbose_logging: print(f"分析结果已保存至: {filename}") print(f"总计 {len(csv_data)} 条记录") return filename else: if self.verbose_logging: print("没有数据可以输出") return None def run_complete_analysis(self): """执行完整的分析流程""" if self.verbose_logging: print("开始执行期货烛台影线形态分析") print("=" * 60) try: # 步骤1: 构建合约数据结构 contract_structure = self.build_contract_structure() if not contract_structure: if self.verbose_logging: print("未获取到有效合约,分析终止") return None # 步骤2: 收集扩展数据 extended_data = self.collect_extended_data() if not extended_data: if self.verbose_logging: print("未获取到有效数据,分析终止") return None # 步骤3: 计算实体长度阈值 threshold_data = self.calculate_body_thresholds(extended_data) if not threshold_data: if self.verbose_logging: print("未能计算有效阈值,分析终止") return None # 步骤4: 识别影线形态 pattern_dict = self.identify_hatch_patterns(threshold_data) if not pattern_dict: if self.verbose_logging: print("未检测到有效影线形态,分析终止") return None # 步骤5: 分析未来表现 enhanced_pattern_dict = self.analyze_future_performance(threshold_data, pattern_dict) # 步骤6: 生成CSV输出 output_filename = self.generate_csv_output(enhanced_pattern_dict) # 步骤7: 计算投资组合汇总 portfolio_summary = self.calculate_portfolio_summary() # 分析汇总 total_contracts = len(contract_structure) valid_contracts = len(threshold_data) total_patterns = sum(len(patterns) for patterns in enhanced_pattern_dict.values()) # 计算模式有效性指标 effective_patterns = 0 for contract_patterns in enhanced_pattern_dict.values(): for pattern_info in contract_patterns.values(): if pattern_info.get('conclusion') == 'true': effective_patterns += 1 effective_rate = (effective_patterns / total_patterns * 100) if total_patterns > 0 else 0 if self.verbose_logging: print("\n" + "=" * 60) print("分析完成汇总:") print(f"总合约数: {total_contracts}") print(f"有效合约数: {valid_contracts}") print(f"检测形态数: {total_patterns}") print(f"有效形态数: {effective_patterns}") print(f"有效率: {effective_rate:.1f}%") print(f"输出文件: {output_filename}") return { 'contract_structure': contract_structure, 'threshold_data': threshold_data, 'pattern_results': enhanced_pattern_dict, 'output_filename': output_filename, 'portfolio_summary': portfolio_summary, 'summary': { 'total_contracts': total_contracts, 'valid_contracts': valid_contracts, 'total_patterns': total_patterns, 'effective_patterns': effective_patterns, 'effective_rate': effective_rate } } except Exception as e: if self.verbose_logging: print(f"分析过程中出现错误: {str(e)}") import traceback traceback.print_exc() return None # ===================================================================================== # 主程序入口 # ===================================================================================== def run_candlestick_hatch_analysis(config=None): """运行期货烛台影线形态分析""" if config is None: config = CandlestickHatchConfig # 打印配置信息 config.print_config() # 创建分析器并运行 analyzer = FutureCandlestickHatchAnalyzer(config) results = analyzer.run_complete_analysis() return results # 执行分析 if __name__ == "__main__": print("期货烛台影线形态双向交易分析工具") print("研究带有明显影线的K线对未来价格走势的预测能力") print("双向交易逻辑对比测试:") print(" 原逻辑:上影线做空,下影线做多(与影线方向相反交易)") print(" 新逻辑:上影线做多,下影线做空(与影线方向相同交易)") print("适用于聚宽在线研究平台") results = run_candlestick_hatch_analysis() if results: print("\n✅ 分析执行成功!") summary = results['summary'] portfolio = results['portfolio_summary'] print(f"📊 结果摘要:") print(f" - 分析合约数: {summary['total_contracts']}") print(f" - 有效数据合约: {summary['valid_contracts']}") print(f" - 检测形态总数: {summary['total_patterns']}") print(f" - 有效形态数: {summary['effective_patterns']}") print(f" - 有效率: {summary['effective_rate']:.1f}%") print(f" - 输出文件: {results['output_filename']}") print(f"\n💰 双向交易逻辑对比分析:") print(f" - 总交易次数: {portfolio['total_trades']}") print(f" - 初始资本: {portfolio['initial_capital']:.2f}") print(f"\n 📈 原逻辑(与影线方向相反交易):") print(f" 成功率: {portfolio['logic1']['success_rate']:.2f}%") print(f" 总盈亏: {portfolio['logic1']['total_profit_loss']:.2f}") print(f" 总回报率: {portfolio['logic1']['total_return']:.2f}%") print(f"\n 📉 新逻辑(与影线方向相同交易):") print(f" 成功率: {portfolio['logic2']['success_rate']:.2f}%") print(f" 总盈亏: {portfolio['logic2']['total_profit_loss']:.2f}") print(f" 总回报率: {portfolio['logic2']['total_return']:.2f}%") print(f"\n 🔍 逻辑对比:") success_diff = portfolio['logic2']['success_rate'] - portfolio['logic1']['success_rate'] return_diff = portfolio['logic2']['total_return'] - portfolio['logic1']['total_return'] print(f" 成功率差异: {success_diff:+.2f}% (新逻辑 - 原逻辑)") print(f" 回报率差异: {return_diff:+.2f}% (新逻辑 - 原逻辑)") else: print("\n❌ 分析执行失败,请检查错误信息")