| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481 |
- """
- 期货烛台影线形态分析
- 研究期货合约中带有明显影线的K线对未来价格走势的预测能力
- 本程序实现完整的分析流程:
- 1. 数据结构设置 - 获取主力期货合约数据
- 2. 数据收集 - 扩展时间范围获取OHLC数据
- 3. 实体长度计算 - 计算历史平均实体长度作为阈值
- 4. 影线形态识别 - 识别满足条件的K线形态
- 5. 未来表现分析 - 分析影线方向与价格走势的关系
- 6. 结果输出 - 生成影线形态分析结果CSV
- 注:程序使用动态获取的主力合约进行分析,确保分析结果基于真实的交易活动
- 作者: jukuan研究团队
- 日期: 2025-09
- 适用平台: 聚宽在线研究平台
- """
- import pandas as pd
- import numpy as np
- from jqdata import *
- import datetime
- import warnings
- warnings.filterwarnings('ignore')
- # =====================================================================================
- # 分析配置参数 - 集中配置部分
- # =====================================================================================
- class CandlestickHatchConfig:
- """期货烛台影线形态分析配置参数"""
-
- # ==================== 时间范围设置 ====================
- ANALYSIS_START_DATE = datetime.datetime(2025, 8, 1) # 分析开始日期
- ANALYSIS_END_DATE = datetime.datetime(2025, 8, 20) # 分析结束日期
-
- # ==================== 时间扩展参数 ====================
- HISTORICAL_DAYS = 365 # 历史数据回溯期:优势期前天数
- FORWARD_DAYS = 15 # 正向分析期:优势期后天数
- FUTURE_MONITOR_DAYS = 15 # 未来表现跟踪期:交易日数
-
- # ==================== 影线形态参数 ====================
- HATCH_TO_BODY_RATIO = 1.2 # 影线与实体长度比率阈值
- OPPOSITE_HATCH_RATIO = 0.5 # 相反方向影线与实体长度比率阈值
-
- # ==================== 形态验证阈值 ====================
- MAX_REVENUE_THRESHOLD = 0.02 # 最大盈利阈值
- MAX_LOSS_THRESHOLD = 0.01 # 最大亏损阈值
-
- # ==================== 分析规模控制 ====================
- MAX_ANALYSIS_CONTRACTS = -1 # 最大分析合约数量限制
-
- # ==================== 输出设置 ====================
- OUTPUT_ENCODING = 'utf-8-sig' # 输出文件编码格式
- VERBOSE_LOGGING = True # 是否打印详细日志
-
- # ==================== 交易仓位管理 ====================
- INITIAL_TRADE_AMOUNT = 20000 # 单笔交易初始金额
- LOSS_RATE = 0.01 # 亏损交易的亏损率(可配置)
- PROFIT_DRAWDOWN_RATE = 0.02 # 盈利交易的回撤率(可配置)
-
- @classmethod
- def print_config(cls):
- """打印当前配置信息"""
- print("=== 期货烛台影线形态分析配置 ===")
- print(f"分析时间范围: {cls.ANALYSIS_START_DATE.strftime('%Y-%m-%d')} 至 {cls.ANALYSIS_END_DATE.strftime('%Y-%m-%d')}")
- print(f"历史数据回溯期: {cls.HISTORICAL_DAYS}天")
- print(f"前向分析期: {cls.FORWARD_DAYS}天")
- print(f"未来表现跟踪期: {cls.FUTURE_MONITOR_DAYS}个交易日")
- print(f"影线与实体长度比率阈值: {cls.HATCH_TO_BODY_RATIO}")
- print(f"相反方向影线比率阈值: {cls.OPPOSITE_HATCH_RATIO}")
- print(f"最大盈利阈值: {cls.MAX_REVENUE_THRESHOLD}")
- print(f"最大亏损阈值: {cls.MAX_LOSS_THRESHOLD}")
- print(f"最大分析合约数: {cls.MAX_ANALYSIS_CONTRACTS}")
- print(f"合约类型: 主力合约")
- print(f"详细日志: {'开启' if cls.VERBOSE_LOGGING else '关闭'}")
- print(f"初始交易金额: {cls.INITIAL_TRADE_AMOUNT}")
- print(f"亏损交易亏损率: {cls.LOSS_RATE}")
- print(f"盈利交易回撤率: {cls.PROFIT_DRAWDOWN_RATE}")
- print("=" * 50)
- class FutureCandlestickHatchAnalyzer:
- """期货烛台影线形态分析器"""
-
- def __init__(self, config=None):
- """初始化分析器"""
- # 使用配置类参数
- if config is None:
- config = CandlestickHatchConfig
-
- self.config = config
- self.historical_days = config.HISTORICAL_DAYS
- self.forward_days = config.FORWARD_DAYS
- self.future_monitor_days = config.FUTURE_MONITOR_DAYS
- self.hatch_to_body_ratio = config.HATCH_TO_BODY_RATIO
- self.opposite_hatch_ratio = config.OPPOSITE_HATCH_RATIO
- self.max_revenue_threshold = config.MAX_REVENUE_THRESHOLD
- self.max_loss_threshold = config.MAX_LOSS_THRESHOLD
- self.analysis_start_date = config.ANALYSIS_START_DATE
- self.analysis_end_date = config.ANALYSIS_END_DATE
- self.max_analysis_contracts = config.MAX_ANALYSIS_CONTRACTS
- self.output_encoding = config.OUTPUT_ENCODING
- self.verbose_logging = config.VERBOSE_LOGGING
-
- # 交易仓位管理参数
- self.initial_trade_amount = config.INITIAL_TRADE_AMOUNT
- self.loss_rate = config.LOSS_RATE
- self.profit_drawdown_rate = config.PROFIT_DRAWDOWN_RATE
-
- # 期货品种与交易所的映射关系
- self.exchange_map = {
- # 大连商品交易所 (XDCE)
- 'A': 'XDCE', 'B': 'XDCE', 'C': 'XDCE', 'CS': 'XDCE',
- 'I': 'XDCE', 'J': 'XDCE', 'JD': 'XDCE', 'JM': 'XDCE', 'L': 'XDCE',
- 'M': 'XDCE', 'P': 'XDCE', 'PP': 'XDCE', 'PG': 'XDCE', 'RR': 'XDCE',
- 'V': 'XDCE', 'Y': 'XDCE', 'EB': 'XDCE', 'EG': 'XDCE', 'LH': 'XDCE',
-
- # 郑州商品交易所 (XZCE)
- 'AP': 'XZCE', 'CF': 'XZCE', 'CY': 'XZCE', 'FG': 'XZCE', 'JR': 'XZCE',
- 'LR': 'XZCE', 'MA': 'XZCE', 'OI': 'XZCE', 'PM': 'XZCE', 'RI': 'XZCE',
- 'RM': 'XZCE', 'RS': 'XZCE', 'SF': 'XZCE', 'SM': 'XZCE', 'SR': 'XZCE',
- 'TA': 'XZCE', 'WH': 'XZCE', 'ZC': 'XZCE', 'CJ': 'XZCE',
- 'ME': 'XZCE', 'PF': 'XZCE', 'PK': 'XZCE', 'RO': 'XZCE', 'SA': 'XZCE',
- 'TC': 'XZCE', 'UR': 'XZCE', 'WS': 'XZCE', 'WT': 'XZCE',
-
- # 上海期货交易所 (XSGE)
- 'AG': 'XSGE', 'AL': 'XSGE', 'AU': 'XSGE', 'BU': 'XSGE', 'CU': 'XSGE',
- 'FU': 'XSGE', 'HC': 'XSGE', 'NI': 'XSGE', 'PB': 'XSGE', 'RB': 'XSGE',
- 'RU': 'XSGE', 'SN': 'XSGE', 'SP': 'XSGE', 'SS': 'XSGE', 'WR': 'XSGE',
- 'ZN': 'XSGE',
-
- # 上海国际能源交易中心 (XINE)
- 'BC': 'XINE', 'LU': 'XINE', 'NR': 'XINE', 'SC': 'XINE',
-
- # 中金所 (CCFX)
- 'IC': 'CCFX', 'IF': 'CCFX', 'IH': 'CCFX', 'T': 'CCFX', 'TF': 'CCFX', 'TS': 'CCFX',
-
- # 广期所 (GFEX)
- 'SI': 'GFEX', 'LC': 'GFEX', 'PS': 'GFEX'
- }
-
- # 存储结果的字典
- self.contract_data = {} # 主合约数据结构 - dict1
- self.hatch_patterns = {} # 影线形态数据 - dict2
-
- # 投资组合管理
- self.portfolio_results = [] # 存储所有交易结果
-
- if self.verbose_logging:
- print("初始化期货烛台影线形态分析器")
- print(f"配置参数 - 历史数据回溯期: {self.historical_days}天, 前向分析期: {self.forward_days}天")
- print(f"影线与实体长度比率阈值: {self.hatch_to_body_ratio}, 相反影线比率阈值: {self.opposite_hatch_ratio}")
- print(f"最大盈利阈值: {self.max_revenue_threshold}, 最大亏损阈值: {self.max_loss_threshold}")
- print(f"监控窗口: {self.future_monitor_days}个交易日")
- print(f"最大分析合约数: {self.max_analysis_contracts}")
- print(f"合约类型: 主力合约")
-
- def _is_trading_day(self, check_date):
- """
- 检查是否为交易日
-
- 参数:
- check_date (date): 要检查的日期
-
- 返回:
- bool: 是否为交易日
- """
- try:
- # 使用聚宽API检查交易日
- trade_days = get_trade_days(end_date=check_date, count=1)
- if len(trade_days) > 0:
- return trade_days[0].date() == check_date
- return False
- except:
- # 如果API调用失败,简单判断是否为工作日
- return check_date.weekday() < 5
-
- def _get_symbol_dominant_history(self, symbol, start_date, end_date):
- """
- 获取单个标的的主力合约历史变化
-
- 参数:
- symbol (str): 标的编码
- start_date (date): 开始日期
- end_date (date): 结束日期
-
- 返回:
- list: 主力合约记录列表
- """
- results = []
- current_date = start_date
- current_dominant = None
- period_start = None
-
- # if self.verbose_logging:
- # print(f" 获取 {symbol} 主力合约历史...")
-
- # 按日遍历时间范围
- while current_date <= end_date:
-
- # 跳过非交易日
- if not self._is_trading_day(current_date):
- current_date += datetime.timedelta(days=1)
- continue
-
- try:
- # 获取当日主力合约
- dominant_contract = get_dominant_future(symbol, current_date)
-
- if dominant_contract is None:
- current_date += datetime.timedelta(days=1)
- continue
-
- # 如果是新的主力合约
- if dominant_contract != current_dominant:
-
- # 如果不是第一个合约,结束上一个合约的记录
- if current_dominant is not None and period_start is not None:
- results.append({
- 'symbol': symbol,
- 'dominant_contract': current_dominant,
- 'start_date': period_start,
- 'end_date': current_date - datetime.timedelta(days=1)
- })
-
- # 开始新合约的记录
- current_dominant = dominant_contract
- period_start = current_date
-
- # if self.verbose_logging:
- # print(f" {current_date}: 主力合约变更为 {dominant_contract}")
-
- except Exception as e:
- if self.verbose_logging:
- print(f" 获取 {current_date} 的主力合约时出错: {str(e)}")
-
- current_date += datetime.timedelta(days=1)
-
- # 处理最后一个合约
- if current_dominant is not None and period_start is not None:
- results.append({
- 'symbol': symbol,
- 'dominant_contract': current_dominant,
- 'start_date': period_start,
- 'end_date': end_date
- })
-
- return results
-
- def get_dominant_contracts_history(self, symbol_list, start_date, end_date):
- """
- 获取指定时间范围内多个标的的主力合约历史变化
-
- 参数:
- symbol_list (list): 标的编码列表
- start_date (date): 开始日期
- end_date (date): 结束日期
-
- 返回:
- pandas.DataFrame: 包含主力合约变化历史的数据框
- """
- # if self.verbose_logging:
- # print(f"获取主力合约数据...")
- # print(f"标的列表: {symbol_list}")
- # print(f"时间范围: {start_date} 至 {end_date}")
-
- # 存储所有结果的列表
- all_results = []
-
- # 逐个处理每个标的
- for symbol in symbol_list:
- # if self.verbose_logging:
- # print(f"\n处理标的: {symbol}")
-
- try:
- # 获取该标的在指定时间范围内的主力合约历史
- symbol_results = self._get_symbol_dominant_history(symbol, start_date, end_date)
- all_results.extend(symbol_results)
-
- except Exception as e:
- if self.verbose_logging:
- print(f"处理标的 {symbol} 时发生错误: {str(e)}")
- continue
-
- # 将结果转换为DataFrame
- if all_results:
- result_df = pd.DataFrame(all_results)
- result_df = result_df.sort_values(['symbol', 'start_date']).reset_index(drop=True)
-
- if self.verbose_logging:
- print(f"\n成功获取 {len(result_df)} 条主力合约记录")
- return result_df
- else:
- if self.verbose_logging:
- print("\n未获取到任何主力合约数据")
- return pd.DataFrame(columns=['symbol', 'dominant_contract', 'start_date', 'end_date'])
- def build_contract_structure(self):
- """
- 构建期货主力合约数据结构 (dict1)
- 返回结构:Key=完整合约代码, Value=[start_date, end_date]
- """
- if self.verbose_logging:
- print("\n=== 步骤1: 构建期货主力合约数据结构 ===")
-
- # 分析时间范围
- analysis_start = self.analysis_start_date
- analysis_end = self.analysis_end_date
-
- if self.verbose_logging:
- print(f"分析时间范围: {analysis_start.strftime('%Y-%m-%d')} 至 {analysis_end.strftime('%Y-%m-%d')}")
- print(f"合约类型: 主力合约")
-
- # 构建主力合约数据结构
- contract_dict = self._build_dominant_contract_structure(analysis_start, analysis_end)
-
- self.contract_data = contract_dict
- if self.verbose_logging:
- print(f"构建完成,共{len(contract_dict)}个期货合约")
- return contract_dict
-
- def _build_dominant_contract_structure(self, analysis_start, analysis_end):
- """构建基于主力合约的数据结构"""
- if self.verbose_logging:
- print("使用主力合约获取逻辑...")
-
- # 获取要分析的期货品种列表
- if self.max_analysis_contracts > 0:
- symbol_list = list(self.exchange_map.keys())[:self.max_analysis_contracts]
- else:
- symbol_list = list(self.exchange_map.keys())
-
- # 获取主力合约历史数据
- dominant_df = self.get_dominant_contracts_history(
- symbol_list,
- analysis_start.date(),
- analysis_end.date()
- )
-
- contract_dict = {}
-
- if not dominant_df.empty:
- # 将DataFrame转换为字典结构
- for _, row in dominant_df.iterrows():
- contract_code = row['dominant_contract']
- start_date = row['start_date'] if isinstance(row['start_date'], datetime.date) else row['start_date'].date()
- end_date = row['end_date'] if isinstance(row['end_date'], datetime.date) else row['end_date'].date()
-
- contract_dict[contract_code] = [start_date, end_date]
-
- # if self.verbose_logging:
- # print(f" 添加主力合约: {contract_code} ({start_date} 至 {end_date})")
-
- return contract_dict
-
- def collect_extended_data(self):
- """
- 数据收集:为每个合约收集扩展时间范围的数据
- 扩展期限:从(start_date-365天)到(end_date+15天)
- """
- if self.verbose_logging:
- print("\n=== 步骤2: 收集扩展时间范围的数据 ===")
-
- extended_data = {}
-
- for contract_code, period in self.contract_data.items():
- start_date, end_date = period
-
- # 计算扩展期限
- start_date_2 = start_date - datetime.timedelta(days=self.historical_days)
- end_date_2 = end_date + datetime.timedelta(days=self.forward_days)
-
- # if self.verbose_logging:
- # print(f"获取 {contract_code} 数据")
- # print(f" 原始期间: {start_date} 至 {end_date}")
- # print(f" 扩展期间: {start_date_2} 至 {end_date_2}")
-
- try:
- # 获取日线数据
- data = get_price(
- contract_code,
- start_date=start_date_2,
- end_date=end_date_2,
- frequency='daily',
- fields=['open', 'close', 'high', 'low', 'volume'],
- skip_paused=False,
- panel=False
- )
-
- if data is not None and len(data) > 0:
- extended_data[contract_code] = {
- 'data': data,
- 'original_start': start_date,
- 'original_end': end_date,
- 'extended_start': start_date_2,
- 'extended_end': end_date_2
- }
-
- # if self.verbose_logging:
- # print(f" 成功获取{len(data)}条数据记录")
- # else:
- # if self.verbose_logging:
- # print(f" 未获取到数据")
-
- except Exception as e:
- if self.verbose_logging:
- print(f" 获取数据失败: {str(e)}")
- continue
-
- if self.verbose_logging:
- print(f"数据收集完成,共{len(extended_data)}个合约有效数据")
- return extended_data
-
- def calculate_body_thresholds(self, extended_data):
- """
- 计算实体长度阈值:使用历史数据计算平均实体长度
- """
- if self.verbose_logging:
- print("\n=== 步骤3: 计算实体长度阈值 ===")
-
- threshold_data = {}
-
- for contract_code, contract_info in extended_data.items():
- data = contract_info['data']
- start_date = contract_info['original_start']
- start_date_2 = contract_info['extended_start']
-
- # if self.verbose_logging:
- # print(f"计算 {contract_code} 的实体长度阈值")
- # print(f" 历史期间: {start_date_2} 至 {start_date}")
-
- # 筛选历史数据:date >= start_date_2 且 date < start_date
- historical_mask = (data.index.date >= start_date_2) & (data.index.date < start_date)
- historical_data = data[historical_mask]
-
- if len(historical_data) == 0:
- if self.verbose_logging:
- print(f" 历史数据无效,跳过")
- continue
-
- # 计算实体长度
- historical_data = historical_data.copy()
- historical_data['body_length'] = abs(historical_data['close'] - historical_data['open'])
-
- # 计算平均实体长度作为阈值
- body_threshold = historical_data['body_length'].mean()
-
- threshold_data[contract_code] = {
- 'contract_info': contract_info,
- 'body_threshold': body_threshold,
- 'historical_days': len(historical_data)
- }
-
- # if self.verbose_logging:
- # print(f" 实体长度阈值: {body_threshold:.4f} (基于{len(historical_data)}个历史交易日)")
-
- if self.verbose_logging:
- print("实体长度阈值计算完成")
- return threshold_data
-
- def identify_hatch_patterns(self, threshold_data):
- """
- 影线形态识别:扫描符合条件的K线
- 条件:实体长度 >= 阈值 且 影线长度 >= 1.2 × 实体长度
- """
- if self.verbose_logging:
- print(f"\n=== 步骤4: 识别影线形态 ===")
-
- pattern_dict = {} # dict2结构
-
- for contract_code, threshold_info in threshold_data.items():
- contract_info = threshold_info['contract_info']
- data = contract_info['data']
- body_threshold = threshold_info['body_threshold']
- start_date = contract_info['original_start'] # 使用原始开始日期
- end_date = contract_info['original_end']
-
- # if self.verbose_logging:
- # print(f"分析 {contract_code} 的影线形态")
- # print(f" 扫描期间: {start_date} 至 {end_date}")
- # print(f" 实体长度阈值: {body_threshold:.4f}")
-
- # 筛选扫描数据:从start_date到end_date(含)
- scan_mask = (data.index.date >= start_date) & (data.index.date <= end_date)
- scan_data = data[scan_mask]
-
- contract_patterns = {}
- pattern_count = 0
-
- for idx, (signal_date, row) in enumerate(scan_data.iterrows()):
- # 计算K线各项数值
- open_price = row['open']
- close_price = row['close']
- high_price = row['high']
- low_price = row['low']
-
- # 计算实体长度
- body_length = abs(close_price - open_price)
-
- # 检查实体长度是否满足阈值
- if body_length < body_threshold:
- continue
-
- # 计算影线长度
- upper_hatch = high_price - max(open_price, close_price)
- lower_hatch = min(open_price, close_price) - low_price
-
- # 检查影线条件
- hatch_direction = None
- hatch_length = 0
-
- # 检查上影线条件:上影线长度符合要求 且 下影线长度小于实体长度的一半
- if (upper_hatch >= self.hatch_to_body_ratio * body_length and
- lower_hatch < self.opposite_hatch_ratio * body_length):
- hatch_direction = 'up'
- hatch_length = upper_hatch
- # 检查下影线条件:下影线长度符合要求 且 上影线长度小于实体长度的一半
- elif (lower_hatch >= self.hatch_to_body_ratio * body_length and
- upper_hatch < self.opposite_hatch_ratio * body_length):
- hatch_direction = 'down'
- hatch_length = lower_hatch
-
- # 如果满足影线条件,记录形态
- if hatch_direction is not None:
- pattern_count += 1
-
- # dict3结构
- pattern_info = {
- 'date': signal_date.strftime('%Y-%m-%d'),
- 'high': float(high_price),
- 'low': float(low_price),
- 'open': float(open_price),
- 'close': float(close_price), # 标准收盘价
- 'body_len': float(body_length),
- 'hatch_dire': hatch_direction,
- 'hatch_len': float(hatch_length)
- }
-
- contract_patterns[signal_date] = pattern_info
-
- # if self.verbose_logging:
- # opposite_hatch_len = lower_hatch if hatch_direction == 'up' else upper_hatch
- # print(f" 发现形态: {signal_date.strftime('%Y-%m-%d')} {hatch_direction}影线 实体:{body_length:.4f} 影线:{hatch_length:.4f} 相反影线:{opposite_hatch_len:.4f}")
-
- if contract_patterns:
- pattern_dict[contract_code] = contract_patterns
- # if self.verbose_logging:
- # print(f" 检测到{pattern_count}个有效影线形态")
- # else:
- # if self.verbose_logging:
- # print(f" 未检测到有效影线形态")
-
- # if self.verbose_logging:
- # total_patterns = sum(len(patterns) for patterns in pattern_dict.values())
- # print(f"\n影线形态识别完成,共{total_patterns}个形态")
-
- self.hatch_patterns = pattern_dict
- return pattern_dict
-
- def analyze_future_performance(self, threshold_data, pattern_dict):
- """
- 未来表现分析:分析影线形态对未来价格走势的预测能力
- 交易方向:上影线做空,下影线做多
- """
- if self.verbose_logging:
- print(f"\n=== 步骤5: 分析未来表现 ===")
-
- enhanced_pattern_dict = {}
-
- for contract_code, contract_patterns in pattern_dict.items():
- # 获取对应的数据信息
- contract_info = threshold_data[contract_code]['contract_info']
- data = contract_info['data']
- end_date_2 = contract_info['extended_end']
-
- if self.verbose_logging:
- print(f"\n分析 {contract_code} 的未来表现")
- print(f" 监控窗口: {self.future_monitor_days}个交易日")
-
- enhanced_patterns = {}
-
- for signal_date, pattern_info in contract_patterns.items():
- hatch_direction = pattern_info['hatch_dire']
- standard_close = pattern_info['close'] # 标准收盘价
- print(f"形态日期:{signal_date.strftime('%Y-%m-%d')}, 标准收盘价: {standard_close}")
-
- # 在数据中找到信号日期的位置
- signal_idx = None
- for idx, (date, row) in enumerate(data.iterrows()):
- if date.date() == signal_date.date():
- signal_idx = idx
- break
-
- if signal_idx is None:
- if self.verbose_logging:
- print(f" 未找到信号日期 {signal_date.strftime('%Y-%m-%d')} 在数据中")
- continue
-
- # 分析未来N天的表现
- future_analysis = self._analyze_future_n_days(
- data, signal_idx, hatch_direction, standard_close,
- self.future_monitor_days, end_date_2
- )
-
- # 计算最大收益和最大风险(原逻辑:与影线方向相反交易)
- max_revenue_info, min_revenue_info = self._calculate_max_revenue_risk(future_analysis, hatch_direction, standard_close)
-
- # 计算最大收益和最大风险(新逻辑:与影线方向相同交易)
- max_revenue_info_2, min_revenue_info_2 = self._calculate_max_revenue_risk_2(future_analysis, hatch_direction, standard_close)
-
- # if self.verbose_logging:
- # print(f" 原逻辑最大收益: {max_revenue_info['revenue']:.4f} ({max_revenue_info['date']})")
- # print(f" 原逻辑最大亏损: {min_revenue_info['revenue']:.4f} ({min_revenue_info['date']})")
- # print(f" 新逻辑最大收益: {max_revenue_info_2['revenue']:.4f} ({max_revenue_info_2['date']})")
- # print(f" 新逻辑最大亏损: {min_revenue_info_2['revenue']:.4f} ({min_revenue_info_2['date']})")
- # 计算conclusion字段(增强版,包含日损失检查)
- conclusion = self._calculate_conclusion(max_revenue_info, min_revenue_info, future_analysis, hatch_direction, standard_close)
-
- # 计算conclusion2字段(新逻辑:与影线方向相同交易)
- conclusion_2 = self._calculate_conclusion_2(max_revenue_info_2, min_revenue_info_2, future_analysis, hatch_direction, standard_close)
-
- # 计算新增字段
- body_direction, direction_consistency = self._calculate_direction_fields(pattern_info, hatch_direction)
-
- # 计算影线与实体长度比率
- hatch_to_body_ratio = pattern_info['hatch_len'] / pattern_info['body_len'] if pattern_info['body_len'] > 0 else 0
-
- # 计算移动平均线
- ma_values = self._calculate_moving_averages(data, signal_idx)
-
- # 计算基于收盘价与5K移动平均线关系的价格方向
- price_direction = self._calculate_price_direction(standard_close, ma_values['ma5'])
-
- # 计算趋势强度
- trend_strength = self._calculate_trend_strength(ma_values)
-
- # 动态仓位计算
- remaining_amount = self._calculate_dynamic_position(
- conclusion, max_revenue_info['revenue'], self.initial_trade_amount
- )
-
- # 记录到投资组合(包含两种交易逻辑)
- self._add_portfolio_trade(
- contract_code,
- signal_date.strftime('%Y-%m-%d'),
- conclusion,
- conclusion_2,
- max_revenue_info['revenue'],
- max_revenue_info_2['revenue'],
- self.initial_trade_amount,
- remaining_amount
- )
-
- # 增强pattern_info
- enhanced_pattern = pattern_info.copy()
- enhanced_pattern.update({
- 'max_revenue_date': max_revenue_info['date'],
- 'max_revenue': max_revenue_info['revenue'],
- 'min_revenue_date': min_revenue_info['date'],
- 'min_revenue': min_revenue_info['revenue'],
- 'max_revenue_date2': max_revenue_info_2['date'],
- 'max_revenue2': max_revenue_info_2['revenue'],
- 'min_revenue_date2': min_revenue_info_2['date'],
- 'min_revenue2': min_revenue_info_2['revenue'],
- 'conclusion': conclusion,
- 'conclusion2': conclusion_2,
- 'body_direction': body_direction,
- 'direction_consistency': direction_consistency,
- 'price_direction': price_direction, # 新增:基于收盘价与5K移动平均线关系的价格方向
- 'trade_amount': float(self.initial_trade_amount),
- 'remaining_amount': float(remaining_amount),
- 'profit_loss': float(remaining_amount - self.initial_trade_amount),
- 'hatch_body_ratio': float(hatch_to_body_ratio),
- 'ma5': ma_values['ma5'],
- 'ma10': ma_values['ma10'],
- 'ma20': ma_values['ma20'],
- 'ma30': ma_values['ma30'],
- 'trend_strength': trend_strength
- })
-
- enhanced_patterns[signal_date] = enhanced_pattern
-
- if self.verbose_logging:
- print(f" {signal_date.strftime('%Y-%m-%d')} {hatch_direction}影线:")
- print(f" 原逻辑 - 最大收益: {max_revenue_info['revenue']:.4f} ({max_revenue_info['date']})")
- print(f" 原逻辑 - 最大亏损: {min_revenue_info['revenue']:.4f} ({min_revenue_info['date']})")
- print(f" 原逻辑 - 结论: {conclusion}")
- print(f" 新逻辑 - 最大收益: {max_revenue_info_2['revenue']:.4f} ({max_revenue_info_2['date']})")
- print(f" 新逻辑 - 最大亏损: {min_revenue_info_2['revenue']:.4f} ({min_revenue_info_2['date']})")
- print(f" 新逻辑 - 结论: {conclusion_2}")
- print(f" 实体方向: {body_direction}")
- print(f" 方向一致性: {direction_consistency}")
- print(f" 价格方向: {price_direction} (收盘价vs5K线)")
-
- if enhanced_patterns:
- enhanced_pattern_dict[contract_code] = enhanced_patterns
- if self.verbose_logging:
- print(f" 完成{len(enhanced_patterns)}个形态的未来表现分析")
-
- if self.verbose_logging:
- total_patterns = sum(len(patterns) for patterns in enhanced_pattern_dict.values())
- print(f"\n未来表现分析完成,共{total_patterns}个形态")
-
- return enhanced_pattern_dict
-
- def _analyze_future_n_days(self, data, signal_idx, hatch_direction, standard_close, monitor_days, end_date_2):
- """分析信号后未来N天的价格表现"""
- future_prices = []
- data_list = list(data.iterrows())
-
- for i in range(1, monitor_days + 1):
- future_idx = signal_idx + i
- if future_idx < len(data_list):
- future_date, future_row = data_list[future_idx]
-
- # 检查是否超出扩展结束日期
- if future_date.date() > end_date_2:
- break
-
- future_prices.append({
- 'date': future_date.strftime('%Y-%m-%d'),
- 'high': float(future_row['high']),
- 'low': float(future_row['low'])
- })
- else:
- break
-
- return future_prices
-
- def _calculate_max_revenue_risk(self, future_analysis, hatch_direction, standard_close):
- """计算最大收益和最大风险(原逻辑:与影线方向相反交易)"""
- if not future_analysis:
- return {'date': None, 'revenue': 0.0}, {'date': None, 'revenue': 0.0}
-
- max_revenue = float('-inf')
- min_revenue = float('inf')
- max_revenue_date = None
- min_revenue_date = None
-
- for day_data in future_analysis:
- date = day_data['date']
- high = day_data['high']
- low = day_data['low']
-
- if hatch_direction == 'up':
- # 上影线做空:收益 = (标准收盘价 - 最低价) / 标准收盘价,风险 = (标准收盘价 - 最高价) / 标准收盘价
- revenue = (standard_close - low) / standard_close
- risk = (standard_close - high) / standard_close
- else:
- # 下影线做多:收益 = (最高价 - 标准收盘价) / 标准收盘价,风险 = (最低价 - 标准收盘价) / 标准收盘价
- revenue = (high - standard_close) / standard_close
- risk = (low - standard_close) / standard_close
-
- # 更新最大收益
- if revenue > max_revenue:
- max_revenue = round(revenue, 4)
- max_revenue_date = date
-
- # 更新最大风险(最大亏损,即最小收益的相反数)
- if risk < min_revenue:
- min_revenue = round(risk, 4)
- min_revenue_date = date
-
- return {'date': max_revenue_date, 'revenue': float(max_revenue)}, {'date': min_revenue_date, 'revenue': float(min_revenue)}
-
- def _calculate_max_revenue_risk_2(self, future_analysis, hatch_direction, standard_close):
- """计算最大收益和最大风险(新逻辑:与影线方向相同交易)"""
- if not future_analysis:
- return {'date': None, 'revenue': 0.0}, {'date': None, 'revenue': 0.0}
-
- max_revenue_2 = float('-inf')
- min_revenue_2 = float('inf')
- max_revenue_date_2 = None
- min_revenue_date_2 = None
-
- for day_data in future_analysis:
- date = day_data['date']
- high = day_data['high']
- low = day_data['low']
-
- if hatch_direction == 'up':
- # 上影线做多:收益 = (最高价 - 标准收盘价) / 标准收盘价,风险 = (最低价 - 标准收盘价) / 标准收盘价
- revenue_2 = (high - standard_close) / standard_close
- risk_2 = (low - standard_close) / standard_close
- else:
- # 下影线做空:收益 = (标准收盘价 - 最低价) / 标准收盘价,风险 = (标准收盘价 - 最高价) / 标准收盘价
- revenue_2 = (standard_close - low) / standard_close
- risk_2 = (standard_close - high) / standard_close
-
- # 更新最大收益
- if revenue_2 > max_revenue_2:
- max_revenue_2 = round(revenue_2, 4)
- max_revenue_date_2 = date
-
- # 更新最大风险(最大亏损,即最小收益的相反数)
- if risk_2 < min_revenue_2:
- min_revenue_2 = round(risk_2, 4)
- min_revenue_date_2 = date
-
- return {'date': max_revenue_date_2, 'revenue': float(max_revenue_2)}, {'date': min_revenue_date_2, 'revenue': float(min_revenue_2)}
-
- def _calculate_conclusion(self, max_revenue_info, min_revenue_info, future_analysis, hatch_direction, standard_close):
- """
- 计算形态的结论:是否为有效的预测信号(增强版)
-
- 逻辑:
- - 若满足以下条件则返回 "true":
- 1. (最大盈利日期 < 最大亏损日期)且(最大盈利 > max_revenue_threshold)
- 2. 或者(最大盈利日期 > 最大亏损日期)且(最大盈利 > max_revenue_threshold)且(最大亏损 < max_loss_threshold)
- - 增强条件:如果在最大收益日期之前的任何一天,损失超过了MAX_LOSS_THRESHOLD,则返回 "false"
- - 否则返回 "false"
- """
- max_revenue_date = max_revenue_info['date']
- max_revenue = max_revenue_info['revenue']
- min_revenue_date = min_revenue_info['date']
- min_revenue = min_revenue_info['revenue']
-
- # 如果日期为空,返回false
- if max_revenue_date is None or min_revenue_date is None:
- return "false"
-
- # 转换日期字符串为datetime对象进行比较
- try:
- from datetime import datetime
- max_revenue_dt = datetime.strptime(max_revenue_date, '%Y-%m-%d')
- min_revenue_dt = datetime.strptime(min_revenue_date, '%Y-%m-%d')
- except:
- return "false"
-
- # 增强检查:检查最大收益日期之前是否有任何一天的损失超过阈值
- if future_analysis and self._check_early_excessive_loss(future_analysis, max_revenue_date, hatch_direction, standard_close):
- return "false"
-
- # 条件1:(最大盈利日期 < 最大亏损日期)且(最大盈利 > 阈值)
- condition1 = (max_revenue_dt < min_revenue_dt) and (max_revenue > self.max_revenue_threshold)
-
- # 条件2:(最大盈利日期 > 最大亏损日期)且(最大盈利 > 阈值)且(最大亏损 < 阈值)
- condition2 = (max_revenue_dt > min_revenue_dt) and (max_revenue > self.max_revenue_threshold) and (abs(min_revenue) < self.max_loss_threshold)
-
- return "true" if (condition1 or condition2) else "false"
-
- def _check_early_excessive_loss(self, future_analysis, max_revenue_date, hatch_direction, standard_close):
- """
- 检查在最大收益日期之前是否有任何一天的损失超过阈值(原逻辑:与影线方向相反交易)
-
- 参数:
- future_analysis: 未来价格分析数据
- max_revenue_date: 最大收益日期
- hatch_direction: 影线方向
- standard_close: 标准收盘价
-
- 返回:
- bool: True表示存在早期过度损失
- """
- from datetime import datetime
- try:
- max_revenue_dt = datetime.strptime(max_revenue_date, '%Y-%m-%d')
- except:
- return False
-
- for day_data in future_analysis:
- day_date_str = day_data['date']
- try:
- day_dt = datetime.strptime(day_date_str, '%Y-%m-%d')
-
- # 只检查最大收益日期之前的日期
- if day_dt >= max_revenue_dt:
- continue
-
- high = day_data['high']
- low = day_data['low']
-
- if hatch_direction == 'up':
- # 上影线做空:亏损 = (标准收盘价 - 最高价) / 标准收盘价,当价格上涨时为负值(亏损)
- loss_value = (standard_close - high) / standard_close
- else:
- # 下影线做多:亏损 = (最低价 - 标准收盘价) / 标准收盘价,当价格下跌时为负值(亏损)
- loss_value = (low - standard_close) / standard_close
-
- # 检查亏损是否超过阈值:亏损为负值,阈值为正值,所以检查 abs(亏损) > 阈值
- if loss_value < 0 and abs(loss_value) > self.max_loss_threshold:
- return True
-
- except:
- continue
-
- return False
-
- def _calculate_conclusion_2(self, max_revenue_info_2, min_revenue_info_2, future_analysis, hatch_direction, standard_close):
- """
- 计算形态的结论2:是否为有效的预测信号(新逻辑:与影线方向相同交易)
-
- 逻辑:
- - 若满足以下条件则返回 "true":
- 1. (最大盈利日期 < 最大亏损日期)且(最大盈利 > max_revenue_threshold)
- 2. 或者(最大盈利日期 > 最大亏损日期)且(最大盈利 > max_revenue_threshold)且(最大亏损 < max_loss_threshold)
- - 增强条件:如果在最大收益日期之前的任何一天,损失超过了MAX_LOSS_THRESHOLD,则返回 "false"
- - 否则返回 "false"
- """
- max_revenue_date_2 = max_revenue_info_2['date']
- max_revenue_2 = max_revenue_info_2['revenue']
- min_revenue_date_2 = min_revenue_info_2['date']
- min_revenue_2 = min_revenue_info_2['revenue']
-
- # 如果日期为空,返回false
- if max_revenue_date_2 is None or min_revenue_date_2 is None:
- return "false"
-
- # 转换日期字符串为datetime对象进行比较
- try:
- from datetime import datetime
- max_revenue_dt_2 = datetime.strptime(max_revenue_date_2, '%Y-%m-%d')
- min_revenue_dt_2 = datetime.strptime(min_revenue_date_2, '%Y-%m-%d')
- except:
- return "false"
-
- # 增强检查:检查最大收益日期之前是否有任何一天的损失超过阈值(新逻辑)
- if future_analysis and self._check_early_excessive_loss_2(future_analysis, max_revenue_date_2, hatch_direction, standard_close):
- return "false"
-
- # 条件1:(最大盈利日期 < 最大亏损日期)且(最大盈利 > 阈值)
- condition1 = (max_revenue_dt_2 < min_revenue_dt_2) and (max_revenue_2 > self.max_revenue_threshold)
-
- # 条件2:(最大盈利日期 > 最大亏损日期)且(最大盈利 > 阈值)且(最大亏损 < 阈值)
- condition2 = (max_revenue_dt_2 > min_revenue_dt_2) and (max_revenue_2 > self.max_revenue_threshold) and (abs(min_revenue_2) < self.max_loss_threshold)
-
- return "true" if (condition1 or condition2) else "false"
-
- def _check_early_excessive_loss_2(self, future_analysis, max_revenue_date_2, hatch_direction, standard_close):
- """
- 检查在最大收益日期之前是否有任何一天的损失超过阈值(新逻辑:与影线方向相同交易)
-
- 参数:
- future_analysis: 未来价格分析数据
- max_revenue_date_2: 最大收益日期
- hatch_direction: 影线方向
- standard_close: 标准收盘价
-
- 返回:
- bool: True表示存在早期过度损失
- """
- from datetime import datetime
- try:
- max_revenue_dt_2 = datetime.strptime(max_revenue_date_2, '%Y-%m-%d')
- except:
- return False
-
- for day_data in future_analysis:
- day_date_str = day_data['date']
- try:
- day_dt = datetime.strptime(day_date_str, '%Y-%m-%d')
-
- # 只检查最大收益日期之前的日期
- if day_dt >= max_revenue_dt_2:
- continue
-
- high = day_data['high']
- low = day_data['low']
-
- if hatch_direction == 'up':
- # 上影线做多:亏损 = (最低价 - 标准收盘价) / 标准收盘价,当价格下跌时为负值(亏损)
- loss_value = (low - standard_close) / standard_close
- else:
- # 下影线做空:亏损 = (标准收盘价 - 最高价) / 标准收盘价,当价格上涨时为负值(亏损)
- loss_value = (standard_close - high) / standard_close
-
- # 检查亏损是否超过阈值:亏损为负值,阈值为正值,所以检查 abs(亏损) > 阈值
- if loss_value < 0 and abs(loss_value) > self.max_loss_threshold:
- return True
-
- except:
- continue
-
- return False
-
- def _calculate_direction_fields(self, pattern_info, hatch_direction):
- """
- 计算body_direction和direction_consistency字段
-
- 参数:
- pattern_info: K线形态信息
- hatch_direction: 影线方向 ('up' 或 'down')
-
- 返回:
- tuple: (body_direction, direction_consistency)
- """
- open_price = pattern_info['open']
- close_price = pattern_info['close']
-
- # 计算body_direction:实体方向
- if close_price > open_price:
- body_direction = 'up'
- elif close_price < open_price:
- body_direction = 'down'
- else:
- body_direction = 'flat' # 十字星情况
-
- # 计算direction_consistency:检查实体方向与影线方向是否相反
- if body_direction == 'flat':
- direction_consistency = 'false' # 十字星情况无法判断一致性
- elif (body_direction == 'up' and hatch_direction == 'down') or \
- (body_direction == 'down' and hatch_direction == 'up'):
- direction_consistency = 'true' # 相反,符合预期
- else:
- direction_consistency = 'false' # 不相反
-
- return body_direction, direction_consistency
-
- def _calculate_price_direction(self, close_price, ma5_value):
- """
- 计算基于收盘价与5K移动平均线关系的价格方向
-
- 参数:
- close_price: 形态形成日的收盘价
- ma5_value: 5日移动平均线价格
-
- 返回:
- str: 价格方向分类 ("上涨" 或 "下跌")
- """
- if ma5_value is None:
- return "数据不足" # 如果MA5数据不足,返回数据不足
-
- if close_price >= ma5_value:
- return "上涨"
- else:
- return "下跌"
-
- def _calculate_dynamic_position(self, conclusion, max_revenue, trade_amount):
- """
- 计算动态仓位调整后的剩余资金
-
- 参数:
- conclusion: 交易结论 ('true' 为正确, 'false' 为错误)
- max_revenue: 最大盈利率
- trade_amount: 当前交易金额
-
- 返回:
- float: 调整后的剩余资金
- """
- if conclusion == "false": # 亏损交易(错误结果)
- remaining_amount = trade_amount * (1 - self.loss_rate)
- if self.verbose_logging:
- print(f" 亏损交易,剩余资金: {remaining_amount:.2f} (亏损率: {self.loss_rate})")
- else: # 盈利交易(正确结果)
- remaining_amount = trade_amount * (1 + max_revenue - self.profit_drawdown_rate)
- if self.verbose_logging:
- print(f" 盈利交易,剩余资金: {remaining_amount:.2f} (最大利润: {max_revenue:.4f}, 回撤率: {self.profit_drawdown_rate})")
-
- return remaining_amount
-
- def _add_portfolio_trade(self, contract_code, date, conclusion, conclusion_2, max_revenue, max_revenue_2, trade_amount, remaining_amount):
- """
- 添加交易记录到投资组合(支持两种交易逻辑)
-
- 参数:
- contract_code: 合约代码
- date: 交易日期
- conclusion: 原逻辑交易结论
- conclusion_2: 新逻辑交易结论
- max_revenue: 原逻辑最大盈利率
- max_revenue_2: 新逻辑最大盈利率
- trade_amount: 交易金额
- remaining_amount: 剩余金额(基于原逻辑计算)
- """
- # 计算新逻辑的剩余金额
- remaining_amount_2 = self._calculate_dynamic_position(
- conclusion_2, max_revenue_2, trade_amount
- )
-
- trade_record = {
- 'contract_code': contract_code,
- 'date': date,
- 'conclusion': conclusion,
- 'conclusion2': conclusion_2,
- 'max_revenue': max_revenue,
- 'max_revenue2': max_revenue_2,
- 'trade_amount': trade_amount,
- 'remaining_amount': remaining_amount,
- 'remaining_amount2': remaining_amount_2,
- 'profit_loss': remaining_amount - trade_amount,
- 'profit_loss2': remaining_amount_2 - trade_amount
- }
- self.portfolio_results.append(trade_record)
-
- def _calculate_moving_averages(self, data, signal_idx):
- """
- 计算形态形成日的移动平均线
-
- 参数:
- data: 价格数据DataFrame
- signal_idx: 信号日期在数据中的索引
-
- 返回:
- dict: 包含各移动平均线的字典
- """
- if signal_idx < 29: # 至少需要30天数据来计算30日移动平均
- return {'ma5': None, 'ma10': None, 'ma20': None, 'ma30': None}
-
- # 获取到信号日期为止的收盘价数据
- data_list = list(data.iterrows())
- closes = []
- for i in range(max(0, signal_idx - 29), signal_idx + 1): # 取30天数据
- if i < len(data_list):
- _, row = data_list[i]
- closes.append(row['close'])
-
- if len(closes) < 30:
- return {'ma5': None, 'ma10': None, 'ma20': None, 'ma30': None}
-
- # 计算移动平均线(使用最后的N天数据)
- ma5 = sum(closes[-5:]) / 5 if len(closes) >= 5 else None
- ma10 = sum(closes[-10:]) / 10 if len(closes) >= 10 else None
- ma20 = sum(closes[-20:]) / 20 if len(closes) >= 20 else None
- ma30 = sum(closes[-30:]) / 30 if len(closes) >= 30 else None
-
- return {
- 'ma5': round(ma5, 2) if ma5 is not None else None,
- 'ma10': round(ma10, 2) if ma10 is not None else None,
- 'ma20': round(ma20, 2) if ma20 is not None else None,
- 'ma30': round(ma30, 2) if ma30 is not None else None
- }
-
- def _calculate_trend_strength(self, ma_values):
- """
- 计算趋势强度评估
-
- 参数:
- ma_values: 移动平均线字典
-
- 返回:
- str: 趋势强度分类
- """
- ma5 = ma_values['ma5']
- ma10 = ma_values['ma10']
- ma20 = ma_values['ma20']
- ma30 = ma_values['ma30']
-
- # 如果有任何移动平均线为空,返回"数据不足"
- if None in [ma5, ma10, ma20, ma30]:
- return "数据不足"
-
- # 进行6项趋势比较
- comparisons = [
- ma5 >= ma10, # 5K≥10K
- ma10 >= ma20, # 10K≥20K
- ma20 >= ma30, # 20K≥30K
- ma5 >= ma20, # 5K≥20K
- ma10 >= ma30, # 10K≥30K
- ma5 >= ma30 # 5K≥30K
- ]
-
- # 计算上涨趋势的数量
- uptrend_count = sum(comparisons)
-
- # 根据上涨趋势数量进行分类
- if uptrend_count == 6 or uptrend_count == 0:
- return "强趋势"
- elif uptrend_count == 3:
- return "震荡"
- elif uptrend_count == 5 or uptrend_count == 1:
- return "中趋势"
- elif uptrend_count == 4 or uptrend_count == 2:
- return "弱趋势"
- else:
- return "未知" # 理论上不应该到这里
-
- def calculate_portfolio_summary(self):
- """
- 计算投资组合整体表现汇总(支持两种交易逻辑对比)
-
- 返回:
- dict: 投资组合汇总信息
- """
- if not self.portfolio_results:
- return {
- 'total_trades': 0,
- 'logic1': {
- 'successful_trades': 0,
- 'failed_trades': 0,
- 'success_rate': 0.0,
- 'total_profit_loss': 0.0,
- 'final_capital': self.initial_trade_amount,
- 'total_return': 0.0
- },
- 'logic2': {
- 'successful_trades': 0,
- 'failed_trades': 0,
- 'success_rate': 0.0,
- 'total_profit_loss': 0.0,
- 'final_capital': self.initial_trade_amount,
- 'total_return': 0.0
- }
- }
-
- total_trades = len(self.portfolio_results)
- initial_capital = total_trades * self.initial_trade_amount
-
- # 原逻辑(与影线方向相反交易)统计
- successful_trades_1 = sum(1 for trade in self.portfolio_results if trade['conclusion'] == 'true')
- failed_trades_1 = total_trades - successful_trades_1
- success_rate_1 = (successful_trades_1 / total_trades) * 100 if total_trades > 0 else 0
- total_profit_loss_1 = sum(trade['profit_loss'] for trade in self.portfolio_results)
- final_capital_1 = initial_capital + total_profit_loss_1
- total_return_1 = (total_profit_loss_1 / initial_capital * 100) if initial_capital > 0 else 0
-
- # 新逻辑(与影线方向相同交易)统计
- successful_trades_2 = sum(1 for trade in self.portfolio_results if trade['conclusion2'] == 'true')
- failed_trades_2 = total_trades - successful_trades_2
- success_rate_2 = (successful_trades_2 / total_trades) * 100 if total_trades > 0 else 0
- total_profit_loss_2 = sum(trade['profit_loss2'] for trade in self.portfolio_results)
- final_capital_2 = initial_capital + total_profit_loss_2
- total_return_2 = (total_profit_loss_2 / initial_capital * 100) if initial_capital > 0 else 0
-
- portfolio_summary = {
- 'total_trades': total_trades,
- 'initial_capital': initial_capital,
- 'logic1': {
- 'name': '原逻辑(与影线方向相反交易)',
- 'successful_trades': successful_trades_1,
- 'failed_trades': failed_trades_1,
- 'success_rate': round(success_rate_1, 2),
- 'total_profit_loss': round(total_profit_loss_1, 2),
- 'final_capital': round(final_capital_1, 2),
- 'total_return': round(total_return_1, 2)
- },
- 'logic2': {
- 'name': '新逻辑(与影线方向相同交易)',
- 'successful_trades': successful_trades_2,
- 'failed_trades': failed_trades_2,
- 'success_rate': round(success_rate_2, 2),
- 'total_profit_loss': round(total_profit_loss_2, 2),
- 'final_capital': round(final_capital_2, 2),
- 'total_return': round(total_return_2, 2)
- }
- }
-
- if self.verbose_logging:
- print(f"\n=== 投资组合双向交易逻辑对比分析 ===")
- print(f"总交易次数: {total_trades}")
- print(f"初始资本: {initial_capital:.2f}")
- print(f"\n--- 原逻辑(与影线方向相反交易)---")
- print(f"成功交易: {successful_trades_1}")
- print(f"失败交易: {failed_trades_1}")
- print(f"成功率: {success_rate_1:.2f}%")
- print(f"总盈亏: {total_profit_loss_1:.2f}")
- print(f"最终资本: {final_capital_1:.2f}")
- print(f"总回报率: {total_return_1:.2f}%")
- print(f"\n--- 新逻辑(与影线方向相同交易)---")
- print(f"成功交易: {successful_trades_2}")
- print(f"失败交易: {failed_trades_2}")
- print(f"成功率: {success_rate_2:.2f}%")
- print(f"总盈亏: {total_profit_loss_2:.2f}")
- print(f"最终资本: {final_capital_2:.2f}")
- print(f"总回报率: {total_return_2:.2f}%")
- print(f"\n--- 逻辑对比 ---")
- print(f"成功率差异: {success_rate_2 - success_rate_1:.2f}% (新逻辑 - 原逻辑)")
- print(f"回报率差异: {total_return_2 - total_return_1:.2f}% (新逻辑 - 原逻辑)")
-
- return portfolio_summary
-
- def generate_csv_output(self, enhanced_pattern_dict):
- """
- 生成CSV输出文件
- """
- if self.verbose_logging:
- print("\n=== 步骤6: 生成CSV输出文件 ===")
-
- # 准备CSV数据(按指定顺序排列字段)
- csv_data = []
-
- for contract_code, patterns in enhanced_pattern_dict.items():
- for signal_date, pattern_info in patterns.items():
- csv_data.append({
- # 按更新的字段顺序:contract_code, date, conclusion, conclusion2, body_direction, direction_consistency, price_direction, body_len, hatch_len, hatch/body, close, open, high, low, max_revenue, max_revenue_date, min_revenue, min_revenue_date, max_revenue2, max_revenue_date2, min_revenue2, min_revenue_date2, profit_loss, remaining_amount, trade_amount
- 'contract_code': contract_code,
- 'date': pattern_info['date'],
- 'conclusion': pattern_info['conclusion'],
- 'conclusion2': pattern_info['conclusion2'],
- 'body_direction': pattern_info['body_direction'],
- 'direction_consistency': pattern_info['direction_consistency'],
- 'price_direction': pattern_info['price_direction'], # 新增:基于收盘价与5K移动平均线关系的价格方向
- 'body_len': pattern_info['body_len'],
- 'hatch_len': pattern_info['hatch_len'],
- 'hatch/body': pattern_info['hatch_body_ratio'],
- 'close': pattern_info['close'],
- 'open': pattern_info['open'],
- 'high': pattern_info['high'],
- 'low': pattern_info['low'],
- 'max_revenue': pattern_info['max_revenue'],
- 'max_revenue_date': pattern_info['max_revenue_date'],
- 'min_revenue': pattern_info['min_revenue'],
- 'min_revenue_date': pattern_info['min_revenue_date'],
- 'max_revenue2': pattern_info['max_revenue2'],
- 'max_revenue_date2': pattern_info['max_revenue_date2'],
- 'min_revenue2': pattern_info['min_revenue2'],
- 'min_revenue_date2': pattern_info['min_revenue_date2'],
- 'profit_loss': pattern_info['profit_loss'],
- 'remaining_amount': pattern_info['remaining_amount'],
- 'trade_amount': pattern_info['trade_amount']
- })
-
- if csv_data:
- # 创建DataFrame并保存为CSV,严格按照用户指定的列顺序
- df = pd.DataFrame(csv_data)
-
- # 定义更新的确切列顺序(包含新的price_direction字段)
- column_order = [
- 'contract_code', 'date', 'conclusion', 'conclusion2', 'body_direction',
- 'direction_consistency', 'price_direction', 'body_len', 'hatch_len', 'hatch/body',
- 'close', 'open', 'high', 'low', 'max_revenue', 'max_revenue_date',
- 'min_revenue', 'min_revenue_date', 'max_revenue2', 'max_revenue_date2',
- 'min_revenue2', 'min_revenue_date2', 'profit_loss', 'remaining_amount', 'trade_amount'
- ]
-
- # 重新排列DataFrame的列顺序
- df = df[column_order]
-
- timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
- filename = f'future_candlestick_hatch_analysis_{timestamp}.csv'
- df.to_csv(filename, index=False, encoding=self.output_encoding)
-
- if self.verbose_logging:
- print(f"分析结果已保存至: {filename}")
- print(f"总计 {len(csv_data)} 条记录")
-
- return filename
- else:
- if self.verbose_logging:
- print("没有数据可以输出")
- return None
-
- def run_complete_analysis(self):
- """执行完整的分析流程"""
- if self.verbose_logging:
- print("开始执行期货烛台影线形态分析")
- print("=" * 60)
-
- try:
- # 步骤1: 构建合约数据结构
- contract_structure = self.build_contract_structure()
- if not contract_structure:
- if self.verbose_logging:
- print("未获取到有效合约,分析终止")
- return None
-
- # 步骤2: 收集扩展数据
- extended_data = self.collect_extended_data()
- if not extended_data:
- if self.verbose_logging:
- print("未获取到有效数据,分析终止")
- return None
-
- # 步骤3: 计算实体长度阈值
- threshold_data = self.calculate_body_thresholds(extended_data)
- if not threshold_data:
- if self.verbose_logging:
- print("未能计算有效阈值,分析终止")
- return None
-
- # 步骤4: 识别影线形态
- pattern_dict = self.identify_hatch_patterns(threshold_data)
- if not pattern_dict:
- if self.verbose_logging:
- print("未检测到有效影线形态,分析终止")
- return None
-
- # 步骤5: 分析未来表现
- enhanced_pattern_dict = self.analyze_future_performance(threshold_data, pattern_dict)
-
- # 步骤6: 生成CSV输出
- output_filename = self.generate_csv_output(enhanced_pattern_dict)
-
- # 步骤7: 计算投资组合汇总
- portfolio_summary = self.calculate_portfolio_summary()
-
- # 分析汇总
- total_contracts = len(contract_structure)
- valid_contracts = len(threshold_data)
- total_patterns = sum(len(patterns) for patterns in enhanced_pattern_dict.values())
-
- # 计算模式有效性指标
- effective_patterns = 0
- for contract_patterns in enhanced_pattern_dict.values():
- for pattern_info in contract_patterns.values():
- if pattern_info.get('conclusion') == 'true':
- effective_patterns += 1
-
- effective_rate = (effective_patterns / total_patterns * 100) if total_patterns > 0 else 0
-
- if self.verbose_logging:
- print("\n" + "=" * 60)
- print("分析完成汇总:")
- print(f"总合约数: {total_contracts}")
- print(f"有效合约数: {valid_contracts}")
- print(f"检测形态数: {total_patterns}")
- print(f"有效形态数: {effective_patterns}")
- print(f"有效率: {effective_rate:.1f}%")
- print(f"输出文件: {output_filename}")
-
- return {
- 'contract_structure': contract_structure,
- 'threshold_data': threshold_data,
- 'pattern_results': enhanced_pattern_dict,
- 'output_filename': output_filename,
- 'portfolio_summary': portfolio_summary,
- 'summary': {
- 'total_contracts': total_contracts,
- 'valid_contracts': valid_contracts,
- 'total_patterns': total_patterns,
- 'effective_patterns': effective_patterns,
- 'effective_rate': effective_rate
- }
- }
-
- except Exception as e:
- if self.verbose_logging:
- print(f"分析过程中出现错误: {str(e)}")
- import traceback
- traceback.print_exc()
- return None
- # =====================================================================================
- # 主程序入口
- # =====================================================================================
- def run_candlestick_hatch_analysis(config=None):
- """运行期货烛台影线形态分析"""
- if config is None:
- config = CandlestickHatchConfig
-
- # 打印配置信息
- config.print_config()
-
- # 创建分析器并运行
- analyzer = FutureCandlestickHatchAnalyzer(config)
- results = analyzer.run_complete_analysis()
- return results
- # 执行分析
- if __name__ == "__main__":
- print("期货烛台影线形态双向交易分析工具")
- print("研究带有明显影线的K线对未来价格走势的预测能力")
- print("双向交易逻辑对比测试:")
- print(" 原逻辑:上影线做空,下影线做多(与影线方向相反交易)")
- print(" 新逻辑:上影线做多,下影线做空(与影线方向相同交易)")
- print("适用于聚宽在线研究平台")
-
- results = run_candlestick_hatch_analysis()
-
- if results:
- print("\n✅ 分析执行成功!")
- summary = results['summary']
- portfolio = results['portfolio_summary']
- print(f"📊 结果摘要:")
- print(f" - 分析合约数: {summary['total_contracts']}")
- print(f" - 有效数据合约: {summary['valid_contracts']}")
- print(f" - 检测形态总数: {summary['total_patterns']}")
- print(f" - 有效形态数: {summary['effective_patterns']}")
- print(f" - 有效率: {summary['effective_rate']:.1f}%")
- print(f" - 输出文件: {results['output_filename']}")
- print(f"\n💰 双向交易逻辑对比分析:")
- print(f" - 总交易次数: {portfolio['total_trades']}")
- print(f" - 初始资本: {portfolio['initial_capital']:.2f}")
- print(f"\n 📈 原逻辑(与影线方向相反交易):")
- print(f" 成功率: {portfolio['logic1']['success_rate']:.2f}%")
- print(f" 总盈亏: {portfolio['logic1']['total_profit_loss']:.2f}")
- print(f" 总回报率: {portfolio['logic1']['total_return']:.2f}%")
- print(f"\n 📉 新逻辑(与影线方向相同交易):")
- print(f" 成功率: {portfolio['logic2']['success_rate']:.2f}%")
- print(f" 总盈亏: {portfolio['logic2']['total_profit_loss']:.2f}")
- print(f" 总回报率: {portfolio['logic2']['total_return']:.2f}%")
- print(f"\n 🔍 逻辑对比:")
- success_diff = portfolio['logic2']['success_rate'] - portfolio['logic1']['success_rate']
- return_diff = portfolio['logic2']['total_return'] - portfolio['logic1']['total_return']
- print(f" 成功率差异: {success_diff:+.2f}% (新逻辑 - 原逻辑)")
- print(f" 回报率差异: {return_diff:+.2f}% (新逻辑 - 原逻辑)")
- else:
- print("\n❌ 分析执行失败,请检查错误信息")
|