| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698 |
- """
- 期货网格交易研究分析工具(带主力合约切换)
- 研究期货网格交易策略在不同配置下的表现,支持多种交易场景的对比分析
- 核心功能包括主力合约自动切换、强制平仓和重新建仓逻辑
- 本程序实现完整的网格交易分析流程:
- 1. 主力合约监控与切换 - 自动检测主力合约变化并处理切换
- 2. 合约选择逻辑 - 基于主力合约选择算法
- 3. 基础头寸交易 - 价格-数量网格配置,支持合约切换时重新建仓
- 4. 网格交易策略 - 限价订单网格买入卖出,合约切换时根据价格条件重新建仓
- 5. 统计分析对比 - 多种交易场景性能分析
- 主要特点:
- - 主力合约自动监控:每日检测主力合约变化
- - 强制平仓机制:合约切换时立即平掉旧合约所有头寸
- - 智能重新建仓:根据价格条件在新合约中重新建立头寸
- - 标准期货盈亏计算:使用正确的合约倍数和期货盈亏公式
- - 最终持仓结算:分析期结束时对所有未平仓头寸进行市值计价
- - 完整交易记录:记录所有交易包括合约切换引起的强制平仓
- 期货盈亏计算公式:
- - 多头:(出场价格 - 入场价格) × 合约倍数 × 数量
- - 空头:(入场价格 - 出场价格) × 合约倍数 × 数量
- 注:程序支持多个核心商品同时分析,生成详细的交易记录和统计报告
- 作者: jukuan研究团队
- 日期: 2025-09
- 适用平台: 聚宽在线研究平台
- """
- import pandas as pd
- import numpy as np
- from jqdata import *
- import datetime
- import warnings
- warnings.filterwarnings('ignore')
- # =====================================================================================
- # 分析配置参数 - 集中配置部分
- # =====================================================================================
- class GridTradingConfig:
- """期货网格交易分析配置参数"""
-
- # ==================== 时间范围设置 ====================
- START_DATE = datetime.datetime(2024, 10, 7) # 分析开始日期
- END_DATE = datetime.datetime(2025, 9, 19) # 分析结束日期(修正为9月19日避免数据缺失)
-
- # ==================== 期货合约倍数配置 ====================
- FUTURES_MULTIPLIER = {
- # 贵金属
- 'AU': 1000, # 黄金
- 'AG': 15, # 白银
-
- # 有色金属
- 'CU': 5, 'AL': 5, 'ZN': 5, 'PB': 5, 'NI': 1, 'SN': 1, 'SS': 5,
-
- # 黑色系
- 'RB': 10, 'HC': 10, 'I': 100, 'JM': 100, 'J': 60,
-
- # 能源化工
- 'SP': 10, 'FU': 10, 'BU': 10, 'RU': 10, 'BR': 5, 'SC': 1000,
- 'NR': 10, 'LU': 10, 'LC': 1,
-
- # 化工
- 'FG': 20, 'TA': 5, 'MA': 10, 'SA': 20, 'L': 5, 'V': 5, 'EG': 10,
- 'PP': 5, 'EB': 5, 'PG': 20, 'UR': 20,
-
- # 农产品
- 'RM': 10, 'OI': 10, 'CF': 5, 'SR': 10, 'PF': 5, 'C': 10, 'CS': 10,
- 'CY': 5, 'A': 10, 'B': 10, 'M': 10, 'Y': 10, 'P': 10,
-
- # 股指期货
- 'IF': 300, 'IH': 300, 'IC': 200, 'IM': 200, 'TL': 10000,
-
- # 其他
- 'AP': 10, 'CJ': 5, 'PK': 5, 'JD': 10, 'LH': 16
- }
-
- # ==================== 核心商品配置 ====================
- CORE_COMMODITIES = {
- # 'SA': ['SA2501.XZCE', 'SA2505.XZCE', 'SA2509.XZCE', 'SA2601.XZCE'], # 纯碱
- # 'M': ['M2501.XDCE', 'M2505.XDCE', 'M2509.XDCE', 'M2605.XDCE'], # 豆粕
- # 'UR': ['UR2501.XZCE', 'UR2505.XZCE', 'UR2509.XZCE', 'UR2601.XZCE'], # 尿素
- # 'LH': ['LH2501.XDCE', 'LH2505.XDCE', 'LH2509.XDCE', 'LH2601.XDCE'], # 生猪
- 'TL': ['TL2503.CCFX', 'TL2506.CCFX', 'TL2509.CCFX', 'TL2512.CCFX'] # 30年期国债
- }
-
- # ==================== 合约切换配置 ====================
- REQUIRED_TRADING_DAYS = 30 # 合约切换前需要的最少有效交易日数
-
- # ==================== 基础头寸交易配置 ====================
- BASE_POSITION_GRID = {
- 'SA': {1400: 4, 1300: 6, 1200: 8, 1100: 12, 1000: 14, 900: 16},
- 'M': {2800: 4, 2750: 6, 2700: 8, 2650: 12, 2600: 14, 2550: 16},
- 'UR': {1750: 4, 1700: 6, 1650: 8, 1600: 12, 1550: 14, 1500: 16},
- 'LH': {13000: 1, 12500: 1, 12000: 1, 11500: 1, 11000: 2},
- 'TL': {118: 1, 117: 1, 116: 1, 115: 1, 114: 2, 113: 2},
- }
-
- # 统一退出价格(无止损)
- BASE_POSITION_EXIT_PRICE = {
- 'SA': 1500,
- 'M': 3800,
- 'UR': 2400,
- 'LH': 20000,
- 'TL': 121,
- }
-
- # ==================== 网格交易配置 ====================
- GRID_TRADING_CONFIG = {
- 'SA': {
- 'start_price': 1250, # 开始价格
- 'grid_size': 50, # 网格大小
- 'quantity_per_grid': 5, # 每网格数量
- 'exit_grid_size': 50 # 退出网格大小
- },
- 'M': {
- 'start_price': 2800,
- 'grid_size': 100,
- 'quantity_per_grid': 10,
- 'exit_grid_size': 100
- },
- 'UR': {
- 'start_price': 1800,
- 'grid_size': 50,
- 'quantity_per_grid': 10,
- 'exit_grid_size': 50
- },
- 'LH': {
- 'start_price': 13500,
- 'grid_size': 500,
- 'quantity_per_grid': 1,
- 'exit_grid_size': 500
- },
- 'TL': {
- 'start_price': 118,
- 'grid_size': 1,
- 'quantity_per_grid': 1,
- 'exit_grid_size': 1
- },
- }
-
- # ==================== 对冲策略配置 ====================
- HEDGE_ENABLED = True # 是否启用对冲策略
- HEDGE_USE_ATR_FILTER = False # 是否使用ATR波动率过滤来控制对冲建仓
- HEDGE_PROFIT_PULLBACK_PCT = 0.3 # 止盈回撤百分比 (m%)
- HEDGE_STOPLOSS_PCT = 0.01 # 止损百分比 (n%)
- HEDGE_ATR_PERIOD = 14 # ATR计算周期
- HEDGE_ATR_LOOKBACK_MONTHS = 3 # ATR回看月数
- HEDGE_ATR_THRESHOLD = 0.7 # ATR阈值系数
-
- # ==================== 对冲止盈策略配置 ====================
- HEDGE_LEVEL1_RECOVERY_PCT = 0.30 # 一级止盈回升百分比(30%)
- HEDGE_LEVEL2_RECOVERY_PCT = 0.60 # 二级止盈回升百分比(60%)
- HEDGE_COST_AREA_PCT = 0.02 # 成本区域百分比(2%)
-
- # ==================== 输出设置 ====================
- OUTPUT_ENCODING = 'utf-8-sig' # 输出文件编码格式
- VERBOSE_LOGGING = True # 是否打印详细日志
-
- @classmethod
- def print_config(cls):
- """打印当前配置信息"""
- print("=== 期货网格交易分析配置 ===")
- print(f"分析时间范围: {cls.START_DATE.strftime('%Y-%m-%d')} 至 {cls.END_DATE.strftime('%Y-%m-%d')}")
- print(f"核心商品数量: {len(cls.CORE_COMMODITIES)}")
- print("核心商品列表:")
- for commodity, contracts in cls.CORE_COMMODITIES.items():
- print(f" {commodity}: {contracts}")
- print(f"\n网格交易配置:")
- for commodity, config in cls.GRID_TRADING_CONFIG.items():
- print(f" {commodity}: 起始价{config['start_price']}, 网格大小{config['grid_size']}")
- print(f"\n对冲策略配置:")
- print(f" 对冲策略启用: {'是' if cls.HEDGE_ENABLED else '否'}")
- if cls.HEDGE_ENABLED:
- print(f" ATR波动率过滤: {'启用' if cls.HEDGE_USE_ATR_FILTER else '禁用'}")
- print(f" 止盈回撤百分比: {cls.HEDGE_PROFIT_PULLBACK_PCT:.1%}")
- print(f" 止损百分比: {cls.HEDGE_STOPLOSS_PCT:.1%}")
- print(f" 一级止盈回升百分比: {cls.HEDGE_LEVEL1_RECOVERY_PCT:.1%}")
- print(f" 二级止盈回升百分比: {cls.HEDGE_LEVEL2_RECOVERY_PCT:.1%}")
- print(f" 成本区域百分比: {cls.HEDGE_COST_AREA_PCT:.1%}")
- print(f"\n详细日志: {'开启' if cls.VERBOSE_LOGGING else '关闭'}")
- print("=" * 50)
- class FutureGridTradingAnalyzer:
- """期货网格交易分析器"""
-
- def __init__(self, config=None):
- """初始化分析器"""
- if config is None:
- config = GridTradingConfig
-
- self.config = config
- self.start_date = config.START_DATE
- self.end_date = config.END_DATE
- self.core_commodities = config.CORE_COMMODITIES
- self.base_position_grid = config.BASE_POSITION_GRID
- self.base_position_exit_price = config.BASE_POSITION_EXIT_PRICE
- self.grid_trading_config = config.GRID_TRADING_CONFIG
- self.verbose_logging = config.VERBOSE_LOGGING
- self.output_encoding = config.OUTPUT_ENCODING
-
- # 存储结果的字典
- self.selected_contracts = {} # 选中的合约
- self.price_data = {} # 价格数据
- self.dominant_contract_history = {} # 主力合约历史变化
- self.active_positions = { # 当前活跃头寸跟踪
- 'base_position': {},
- 'grid_trading': {},
- 'hedge_position': {} # 对冲头寸追踪
- }
- self.trading_results = { # 交易场景的结果
- 'base_position': [],
- 'grid_trading': [],
- 'hedge_position': [] # 对冲交易记录
- }
- self.hedge_state = {} # 对冲策略状态追踪
- self.ma_extended_data_cache = {} # MA扩展数据缓存
-
- if self.verbose_logging:
- print("初始化期货网格交易分析器")
- print(f"核心商品: {list(self.core_commodities.keys())}")
- print(f"分析期间: {self.start_date.strftime('%Y-%m-%d')} - {self.end_date.strftime('%Y-%m-%d')}")
-
- def select_contracts(self):
- """
- 合约选择逻辑
- 1. 首先获取商品的主导合约
- 2. 如果主导合约在可用列表中,选择它
- 3. 如果主导合约不在列表中,选择未来到期日期最近且晚于主导合约的合约
- """
- if self.verbose_logging:
- print("\n=== 步骤1: 合约选择逻辑 ===")
-
- for commodity, available_contracts in self.core_commodities.items():
- if self.verbose_logging:
- print(f"\n处理商品: {commodity}")
- print(f"可用合约: {available_contracts}")
-
- try:
- # 获取商品的主导合约
- dominant_contract = get_dominant_future(commodity, self.start_date.date())
-
- if self.verbose_logging:
- print(f"主导合约: {dominant_contract}")
-
- if dominant_contract in available_contracts:
- # 主导合约在可用列表中,直接选择
- selected_contract = dominant_contract
- if self.verbose_logging:
- print(f"选择主导合约: {selected_contract}")
- else:
- # 主导合约不在列表中,选择最近的未来合约
- selected_contract = self._select_nearest_future_contract(
- commodity, dominant_contract, available_contracts
- )
- if self.verbose_logging:
- print(f"选择最近的未来合约: {selected_contract}")
-
- self.selected_contracts[commodity] = selected_contract
-
- except Exception as e:
- if self.verbose_logging:
- print(f"获取{commodity}主导合约失败: {str(e)}")
- # 默认选择第一个可用合约
- self.selected_contracts[commodity] = available_contracts[0]
- if self.verbose_logging:
- print(f"默认选择第一个合约: {available_contracts[0]}")
-
- if self.verbose_logging:
- print(f"\n合约选择完成,共选择{len(self.selected_contracts)}个合约")
- for commodity, contract in self.selected_contracts.items():
- print(f" {commodity}: {contract}")
-
- return self.selected_contracts
-
- def _select_nearest_future_contract(self, commodity, dominant_contract, available_contracts):
- """选择最近的未来到期合约"""
- if not dominant_contract:
- return available_contracts[0]
-
- # 解析主导合约的到期月份
- try:
- # 提取合约代码中的月份信息 (例如 SA2507 -> 2507)
- dominant_year_month = dominant_contract.split('.')[0][-4:] # 取最后4位
- dominant_year = int(dominant_year_month[:2]) + 2000 # 假设是21世纪
- dominant_month = int(dominant_year_month[2:])
- except:
- return available_contracts[0]
-
- # 找到最近的未来合约
- best_contract = available_contracts[0]
- best_diff = float('inf')
-
- for contract in available_contracts:
- try:
- contract_year_month = contract.split('.')[0][-4:]
- contract_year = int(contract_year_month[:2]) + 2000
- contract_month = int(contract_year_month[2:])
-
- # 计算月份差异
- contract_total_months = contract_year * 12 + contract_month
- dominant_total_months = dominant_year * 12 + dominant_month
-
- # 只选择晚于主导合约的合约
- if contract_total_months > dominant_total_months:
- diff = contract_total_months - dominant_total_months
- if diff < best_diff:
- best_diff = diff
- best_contract = contract
- except:
- continue
-
- return best_contract
-
- def _get_futures_multiplier(self, commodity):
- """获取期货合约倍数"""
- return self.config.FUTURES_MULTIPLIER.get(commodity, 10) # 默认倍数为10
-
- def _calculate_futures_pnl(self, entry_price, exit_price, quantity, commodity, is_long=True):
- """
- 计算期货盈亏
-
- 参数:
- entry_price: 入场价格
- exit_price: 出场价格
- quantity: 数量(手数)
- commodity: 商品代码
- is_long: 是否多头,True为多头,False为空头
-
- 返回:
- 实际盈亏金额
- """
- multiplier = self._get_futures_multiplier(commodity)
-
- if is_long:
- # 多头:(出场价格 - 入场价格) × 合约倍数 × 数量
- pnl = (exit_price - entry_price) * multiplier * quantity
- else:
- # 空头:(入场价格 - 出场价格) × 合约倍数 × 数量
- pnl = (entry_price - exit_price) * multiplier * quantity
-
- return pnl
-
- def build_dominant_contract_history(self):
- """
- 构建主力合约历史变化记录
- 为每个商品在整个分析期间构建主力合约变化的时间序列
- 只有当合约真正发生变化时才记录为合约切换
- """
- if self.verbose_logging:
- print("\n=== 步骤2:构建主力合约历史变化记录 ===")
-
- for commodity in self.core_commodities.keys():
- if self.verbose_logging:
- print(f"构建 {commodity} 主力合约历史...")
-
- contract_history = []
- current_date = self.start_date.date()
- end_date = self.end_date.date()
- current_selected_contract = None # 跟踪选择的合约而不是主力合约
-
- while current_date <= end_date:
- # 跳过非交易日
- if current_date.weekday() >= 5: # 周六周日
- current_date += datetime.timedelta(days=1)
- continue
-
- try:
- # 获取当日主力合约
- dominant_contract = get_dominant_future(commodity, current_date)
- # print(f"日期: {current_date}, 主力合约: {dominant_contract}")
- selected_contract = self._match_to_available_contract(commodity, dominant_contract)
-
- # 只有当选择的合约真正发生变化时才记录
- if selected_contract != current_selected_contract:
- contract_history.append({
- 'date': current_date,
- 'dominant_contract': dominant_contract,
- 'selected_contract': selected_contract,
- 'is_initial': current_selected_contract is None # 标记是否为初始合约
- })
-
- if self.verbose_logging:
- if current_selected_contract is None:
- print(f" {current_date}: 初始合约设置为 {selected_contract}")
- else:
- print(f" {current_date}: 合约切换 {current_selected_contract} -> {selected_contract}")
-
- current_selected_contract = selected_contract
-
- except Exception as e:
- if self.verbose_logging:
- print(f" 获取 {current_date} 的主力合约时出错: {str(e)}")
-
- current_date += datetime.timedelta(days=1)
-
- self.dominant_contract_history[commodity] = contract_history
-
- if self.verbose_logging:
- total_changes = sum(len(history) for history in self.dominant_contract_history.values())
- actual_switches = sum(
- sum(1 for change in history if not change.get('is_initial', False))
- for history in self.dominant_contract_history.values()
- )
- initial_setups = total_changes - actual_switches
- print(f"主力合约历史构建完成,共 {total_changes} 次记录({initial_setups} 次初始设置,{actual_switches} 次真实切换)")
-
- return self.dominant_contract_history
-
- def _match_to_available_contract(self, commodity, dominant_contract):
- """将主力合约匹配到可用合约列表"""
- available_contracts = self.core_commodities.get(commodity, [])
-
- if dominant_contract in available_contracts:
- return dominant_contract
- else:
- return self._select_nearest_future_contract(commodity, dominant_contract, available_contracts)
-
- def collect_price_data(self):
- """收集所有可能用到的合约价格数据(优化日期范围)"""
- if self.verbose_logging:
- print("\n=== 步骤3: 收集价格数据(优化日期范围) ===")
-
- # 清除之前的调整建议
- if hasattr(self, 'adjustment_suggestions'):
- self.adjustment_suggestions = []
-
- # 为每个商品创建数据存储结构
- for commodity in self.core_commodities.keys():
- print(f'收集{commodity}的价格数据:')
- self.price_data[commodity] = {}
-
- # 根据主力合约历史确定每个合约的数据获取范围
- contract_date_ranges = self._determine_contract_date_ranges(commodity)
-
- for contract, date_range in contract_date_ranges.items():
- start_date, end_date = date_range
-
- if self.verbose_logging:
- print(f"获取 {contract} 价格数据...")
- print(f" 优化日期范围: {start_date} 至 {end_date}")
-
- try:
- # 获取价格数据(使用优化的日期范围)
- data = get_price(
- contract,
- start_date=start_date,
- end_date=end_date,
- frequency='daily',
- fields=['open', 'close', 'high', 'low', 'volume'],
- skip_paused=False,
- panel=False
- )
-
- if data is not None and len(data) > 0:
- # print(f"第一条有数据的日期是: {data.index[0].date()},数据是: {data.iloc[0]}")
- # print(f"最后一条有数据的日期是: {data.index[-1].date()}, 数据是: {data.iloc[-1]}")
- self.price_data[commodity][contract] = data
- # 检查这个数据里有多少条空值数据
- empty_data = data[data.isna().any(axis=1)]
-
- # 检查有效交易日数据并收集调整建议
- adjustment_info = self._check_thirty_day_trading_data(commodity, contract, data, start_date, end_date)
- if adjustment_info and adjustment_info.get('needs_adjustment'):
- # 暂存调整建议,稍后统一处理
- if not hasattr(self, 'adjustment_suggestions'):
- self.adjustment_suggestions = []
- self.adjustment_suggestions.append(adjustment_info)
-
- if self.verbose_logging:
- print(f" ✅ 成功获取{len(data)}条数据记录")
- print(f" 空值数据: {len(empty_data)}条")
- print(f" 价格范围: {data['low'].min():.2f} - {data['high'].max():.2f}")
- print(f" 数据日期范围: {data.index[0].date()} 至 {data.index[-1].date()}")
- else:
- if self.verbose_logging:
- print(f" ⚠️ 未获取到{contract}的数据")
-
- # 如果优化日期范围没有数据,尝试使用更宽泛的日期范围
- if self.verbose_logging:
- print(f" 尝试使用更宽泛的日期范围获取数据...")
-
- try:
- fallback_data = get_price(
- contract,
- start_date=self.start_date,
- end_date=self.end_date,
- frequency='daily',
- fields=['open', 'close', 'high', 'low', 'volume'],
- skip_paused=False,
- panel=False
- )
-
- if fallback_data is not None and len(fallback_data) > 0:
- self.price_data[commodity][contract] = fallback_data
-
- # 检查有效交易日数据并收集调整建议(回退方案)
- adjustment_info = self._check_thirty_day_trading_data(commodity, contract, fallback_data, self.start_date, self.end_date)
- if adjustment_info and adjustment_info.get('needs_adjustment'):
- if not hasattr(self, 'adjustment_suggestions'):
- self.adjustment_suggestions = []
- self.adjustment_suggestions.append(adjustment_info)
-
- if self.verbose_logging:
- print(f" ✅ 回退方案成功获取{len(fallback_data)}条数据记录")
- print(f" 数据日期范围: {fallback_data.index[0].date()} 至 {fallback_data.index[-1].date()}")
- else:
- if self.verbose_logging:
- print(f" ❌ 回退方案也未获取到{contract}的数据")
- except Exception as fallback_e:
- if self.verbose_logging:
- print(f" ❌ 回退方案出错: {str(fallback_e)}")
-
- except Exception as e:
- if self.verbose_logging:
- print(f" ❌ 获取{contract}数据时出错: {str(e)}")
- continue
-
- # 处理动态调整建议
- if hasattr(self, 'adjustment_suggestions') and self.adjustment_suggestions:
- self._apply_dynamic_adjustments()
-
- if self.verbose_logging:
- total_contracts = sum(len(contracts) for contracts in self.price_data.values())
- print(f"价格数据收集完成,共{total_contracts}个合约")
-
- return self.price_data
-
- def _determine_contract_date_ranges(self, commodity):
- """
- 根据主力合约历史确定每个合约的最优数据获取日期范围
- """
- contract_ranges = {}
-
- if commodity not in self.dominant_contract_history:
- # 如果没有主力合约历史,使用全范围
- for contract in self.core_commodities[commodity]:
- contract_ranges[contract] = (self.start_date, self.end_date)
- return contract_ranges
-
- contract_history = self.dominant_contract_history[commodity]
-
- # 分析每个合约的活跃期间
- for contract in self.core_commodities[commodity]:
- contract_start = self.start_date
- contract_end = self.end_date
-
- # 查找该合约在主力合约历史中的使用时间段
- for i, history_record in enumerate(contract_history):
- if history_record['selected_contract'] == contract:
- # 该合约开始使用的日期
- if history_record.get('is_initial', False):
- # 初始设置的合约,从分析开始日期或历史记录日期开始
- contract_start = max(self.start_date.date(), history_record['date'])
- else:
- # 切换到的合约,从切换日期开始
- contract_start = history_record['date']
-
- # 查找该合约结束使用的日期
- for j in range(i + 1, len(contract_history)):
- next_record = contract_history[j]
- if next_record['selected_contract'] != contract:
- # 找到下一次切换,该合约在此日期结束使用
- contract_end = next_record['date']
- break
- else:
- # 该合约一直使用到分析结束
- contract_end = self.end_date.date()
-
- break
-
- # 转换为datetime格式并添加缓冲区
- if isinstance(contract_start, datetime.date):
- contract_start = datetime.datetime.combine(contract_start, datetime.time.min)
- if isinstance(contract_end, datetime.date):
- contract_end = datetime.datetime.combine(contract_end, datetime.time.max)
-
- # 添加缓冲期以确保有足够的历史数据满足最低交易日要求
- # 使用REQUIRED_TRADING_DAYS作为缓冲,保证数据充足性
- contract_start_buffered = contract_start - datetime.timedelta(days=self.config.REQUIRED_TRADING_DAYS)
- contract_end_buffered = contract_end # + datetime.timedelta(days=self.config.REQUIRED_TRADING_DAYS)
-
- # 确保不超出总体分析范围
- contract_start_final = max(contract_start_buffered, self.start_date)
- contract_end_final = min(contract_end_buffered, self.end_date)
-
- contract_ranges[contract] = (contract_start_final, contract_end_final)
-
- if self.verbose_logging:
- print(f" {contract}: {contract_start_final.date()} 至 {contract_end_final.date()}")
-
- return contract_ranges
-
- def _check_thirty_day_trading_data(self, commodity, contract, data, start_date, end_date):
- """
- 检查合约是否有足够的有效交易日数据并进行动态调整
- 返回调整建议信息
- """
- if data is None or len(data) == 0:
- print(f" ⚠️ {contract}: 无价格数据")
- return None
-
- required_days = self.config.REQUIRED_TRADING_DAYS
-
- # 检查空值数据
- empty_data = data[data.isna().any(axis=1)]
- empty_count = len(empty_data)
-
- # 过滤出非空的收盘价数据
- valid_close_data = data['close'].dropna()
- valid_count = len(valid_close_data)
-
- print(f" 📊 {contract}: 有效收盘价数据共{valid_count}天")
-
- adjustment_info = {
- 'contract': contract,
- 'commodity': commodity,
- 'empty_count': empty_count,
- 'valid_count': valid_count,
- 'required_days': required_days,
- 'needs_adjustment': False,
- 'suggested_switch_date': None
- }
-
- # 检查是否有空值数据且需要调整
- if empty_count > 0:
- print(f" ⚠️ {contract}: 检测到{empty_count}条空值数据")
-
- if valid_count >= required_days:
- # 找到第N个有效收盘价的日期
- nth_date = valid_close_data.index[required_days - 1] # 索引从0开始
- nth_price = valid_close_data.iloc[required_days - 1]
-
- print(f" 📍 {contract}: 第{required_days}个有效收盘价日期为{nth_date.date()},价格{nth_price:.2f}")
-
- # 检查当前切换日期是否需要调整
- if commodity in self.dominant_contract_history:
- for history_record in self.dominant_contract_history[commodity]:
- if (history_record['selected_contract'] == contract and
- not history_record.get('is_initial', False)):
- current_switch_date = history_record['date']
-
- # 转换日期格式进行比较
- if isinstance(current_switch_date, datetime.date):
- current_switch_datetime = datetime.datetime.combine(current_switch_date, datetime.time.min)
- else:
- current_switch_datetime = current_switch_date
-
- if nth_date > current_switch_datetime:
- print(f" ❌ {contract}: 切换日期过早(当前:{current_switch_date}),建议调整至{nth_date.date()}")
- adjustment_info.update({
- 'needs_adjustment': True,
- 'suggested_switch_date': nth_date.date(),
- 'current_switch_date': current_switch_date
- })
- else:
- print(f" ✅ {contract}: 切换日期{current_switch_date}合理,在第{required_days}个有效交易日之后")
- break
- else:
- print(f" ❌ {contract}: 有效交易日不足{required_days}天(仅{valid_count}天),不符合切换要求")
- adjustment_info['needs_adjustment'] = True
- else:
- # 没有空值数据,检查是否有足够的交易日
- if valid_count >= required_days:
- nth_date = valid_close_data.index[required_days - 1]
- nth_price = valid_close_data.iloc[required_days - 1]
- print(f" ✅ {contract}: 第{required_days}个有效收盘价日期为{nth_date.date()},价格{nth_price:.2f}")
- else:
- print(f" ❌ {contract}: 有效交易日不足{required_days}天(仅{valid_count}天)")
-
- return adjustment_info
-
- def _apply_dynamic_adjustments(self):
- """应用动态调整建议,更新合约切换日期并重新获取数据"""
- if self.verbose_logging:
- print(f"\n=== 应用动态调整建议(共{len(self.adjustment_suggestions)}个) ===")
-
- adjustments_applied = []
-
- for suggestion in self.adjustment_suggestions:
- if suggestion.get('suggested_switch_date'):
- commodity = suggestion['commodity']
- contract = suggestion['contract']
- new_switch_date = suggestion['suggested_switch_date']
-
- print(f"📅 调整{commodity}的{contract}切换日期至{new_switch_date}")
-
- # 更新合约历史
- if self._update_contract_switch_date(commodity, contract, new_switch_date):
- adjustments_applied.append(suggestion)
-
- # 如果有调整,重新获取相关的价格数据
- if adjustments_applied:
- print(f"✅ 完成{len(adjustments_applied)}个调整,重新获取相关价格数据")
- self._refresh_price_data_for_adjustments(adjustments_applied)
-
- def _update_contract_switch_date(self, commodity, contract, new_switch_date):
- """更新指定合约的切换日期"""
- if commodity not in self.dominant_contract_history:
- return False
-
- # 查找并更新对应的历史记录
- for history_record in self.dominant_contract_history[commodity]:
- if (history_record['selected_contract'] == contract and
- not history_record.get('is_initial', False)):
- old_date = history_record['date']
- history_record['date'] = new_switch_date
- print(f" 📝 {contract}: 切换日期从{old_date}更新为{new_switch_date}")
- return True
-
- return False
-
- def _refresh_price_data_for_adjustments(self, adjustments):
- """为调整的合约重新获取价格数据"""
- affected_commodities = set()
-
- for adjustment in adjustments:
- commodity = adjustment['commodity']
- affected_commodities.add(commodity)
-
- for commodity in affected_commodities:
- print(f"🔄 重新获取{commodity}的价格数据...")
-
- # 重新计算日期范围
- contract_date_ranges = self._determine_contract_date_ranges(commodity)
-
- # 重新获取每个合约的数据
- for contract, date_range in contract_date_ranges.items():
- start_date, end_date = date_range
-
- try:
- # 获取价格数据(使用新的日期范围)
- data = get_price(
- contract,
- start_date=start_date,
- end_date=end_date,
- frequency='daily',
- fields=['open', 'close', 'high', 'low', 'volume'],
- skip_paused=False,
- panel=False
- )
-
- if data is not None and len(data) > 0:
- self.price_data[commodity][contract] = data
-
- # 检查调整后的数据
- empty_data = data[data.isna().any(axis=1)]
- empty_count = len(empty_data)
-
- print(f" ✅ {contract}: 重新获取{len(data)}条数据记录,空值{empty_count}条")
-
- if empty_count == 0:
- print(f" 🎉 {contract}: 空值数据已消除")
-
- except Exception as e:
- print(f" ❌ 重新获取{contract}数据时出错: {str(e)}")
- def simulate_with_contract_switching(self):
- """
- 模拟带有主力合约切换逻辑的交易
- """
- if self.verbose_logging:
- print("\n=== 步骤3: 带合约切换的交易模拟 ===")
-
- # 按日期顺序处理所有交易日
- current_date = self.start_date.date()
- end_date = self.end_date.date()
-
- while current_date <= end_date:
- # 跳过非交易日
- if current_date.weekday() >= 5:
- current_date += datetime.timedelta(days=1)
- continue
-
- # 检查每个商品的主力合约切换
- for commodity in self.core_commodities.keys():
- self._check_and_handle_contract_switch(commodity, current_date)
-
- # 处理正常的交易逻辑
- self._process_daily_trading(current_date)
-
- current_date += datetime.timedelta(days=1)
-
- # 在交易循环结束后,计算所有未平仓头寸的最终盈亏
- self._calculate_final_positions_pnl()
-
- if self.verbose_logging:
- print("带合约切换的交易模拟完成")
-
- def _calculate_final_positions_pnl(self):
- """
- 计算分析期结束时所有未平仓头寸的最终盈亏
- 将这些盈亏作为最终交易记录加入结果中
- """
- if self.verbose_logging:
- print("\n=== 计算最终持仓盈亏 ===")
-
- final_date = self.end_date.date()
- final_pnl_records = []
-
- # 添加诊断信息
- if self.verbose_logging:
- print(f"分析结束日期: {final_date}")
- print(f"活跃头寸概览:")
- for strategy_name in ['base_position', 'grid_trading', 'hedge_position']:
- strategy_positions = self.active_positions.get(strategy_name, {})
- total_positions = 0
- open_positions = 0
- for commodity, positions in strategy_positions.items():
- commodity_total = len(positions)
- commodity_open = sum(1 for p in positions.values() if p['status'] == 'open')
- total_positions += commodity_total
- open_positions += commodity_open
- if commodity_total > 0:
- print(f" {strategy_name} - {commodity}: 总计 {commodity_total} 个头寸, 未平仓 {commodity_open} 个")
-
- # 详细列出所有头寸信息
- print(f" 详细头寸列表:")
- for pos_id, pos_info in positions.items():
- status = pos_info.get('status', 'Unknown')
- entry_price = pos_info.get('entry_price', 'N/A')
- contract = pos_info.get('contract', 'N/A')
- entry_date = pos_info.get('entry_date', 'N/A')
- quantity = pos_info.get('quantity', 'N/A')
- print(f" {pos_id}: 状态={status}, 合约={contract}, 开仓价格={entry_price}, 日期={entry_date}, 数量={quantity}")
-
- print(f" {strategy_name} 策略总计: {open_positions}/{total_positions} 个未平仓头寸")
-
- # 验证头寸计数的准确性
- actual_count = len(positions)
- open_count_verify = len([p for p in positions.values() if p.get('status') == 'open'])
-
- if actual_count != commodity_total or open_count_verify != commodity_open:
- print(f" ⚠️ 计数不匹配!实际头寸数: {actual_count}, 预期: {commodity_total}; 实际未平仓: {open_count_verify}, 预期: {commodity_open}")
-
- # 检查是否有重复的开仓价格(同一合约同一状态)
- open_positions_by_price = {}
- for pos_id, pos_info in positions.items():
- if pos_info.get('status') == 'open':
- price = pos_info.get('entry_price')
- contract = pos_info.get('contract')
- key = f"{contract}_{price}"
- if key not in open_positions_by_price:
- open_positions_by_price[key] = []
- open_positions_by_price[key].append(pos_id)
-
- # for key, pos_ids in open_positions_by_price.items():
- # if len(pos_ids) > 1:
- # print(f" ⚠️ 发现重复的未平仓头寸: {key} -> {pos_ids}")
-
- print(f" {strategy_name} 策略总计: {open_positions}/{total_positions} 个未平仓头寸")
-
- for strategy_name in ['base_position', 'grid_trading', 'hedge_position']:
- strategy_positions = self.active_positions.get(strategy_name, {})
-
- for commodity, positions in strategy_positions.items():
- # 获取当前合约和最终价格
- current_contract = self._get_current_contract(commodity, final_date)
- if not current_contract:
- if self.verbose_logging:
- print(f" 警告: 无法确定 {commodity} 在 {final_date} 的当前合约")
- continue
-
- final_price = self._get_price_on_date(commodity, current_contract, final_date, 'close')
- if final_price is None:
- if self.verbose_logging:
- print(f" 警告: 无法获取 {commodity} {current_contract} 在 {final_date} 的价格")
- continue
-
- if self.verbose_logging and len(positions) > 0:
- print(f" {commodity} {strategy_name}: 当前合约 {current_contract}, 结算价格 {final_price:.2f}")
-
- for position_id, position in positions.items():
- if self.verbose_logging:
- print(f" 检查头寸 {position_id}: 状态={position['status']}, 合约={position['contract']}, 开仓价格={position.get('entry_price', 'N/A')}")
-
- if position['status'] == 'open' and position['contract'] == current_contract:
- if self.verbose_logging:
- print(f" 匹配头寸进行结算: {position_id}")
- print(f" 头寸详情: 开仓日期={position.get('entry_date', 'N/A')}, 开仓价格={position['entry_price']}, 数量={position.get('quantity', 'N/A')}")
-
- # 判断是否为空头(对冲头寸)
- is_long = not position.get('is_short', False)
-
- # 计算最终盈亏
- profit_loss = self._calculate_futures_pnl(
- position['entry_price'], final_price, position['quantity'], commodity, is_long=is_long
- )
-
- if is_long:
- profit_loss_pct = (final_price - position['entry_price']) / position['entry_price']
- else:
- profit_loss_pct = (position['entry_price'] - final_price) / position['entry_price']
-
- # 计算持有天数
- entry_date = datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()
- days_held = (final_date - entry_date).days
-
- # 创建最终持仓盈亏记录
- final_record = {
- 'commodity': commodity,
- 'contract': current_contract,
- 'strategy': strategy_name,
- 'entry_date': position['entry_date'],
- 'exit_date': final_date.strftime('%Y-%m-%d'),
- 'entry_price': position['entry_price'],
- 'exit_price': final_price,
- 'quantity': position['quantity'],
- 'profit_loss': profit_loss,
- 'profit_loss_pct': profit_loss_pct,
- 'days_held': days_held,
- 'exit_reason': 'final_settlement'
- }
-
- # 如果是对冲头寸,添加is_short标记
- if strategy_name == 'hedge_position':
- final_record['is_short'] = True
-
- if self.verbose_logging:
- print(f" 创建最终结算记录: 头寸ID={position_id}, 开仓价格={position['entry_price']}, 结算价格={final_price:.2f}")
-
- final_pnl_records.append(final_record)
-
- # 将头寸标记为已平仓
- self.active_positions[strategy_name][commodity][position_id]['status'] = 'final_settled'
-
- if self.verbose_logging:
- print(f" {commodity} {strategy_name} 最终结算: {position['entry_price']} -> {final_price:.2f}, 盈亏: {profit_loss:.2f}")
-
- # 将最终盈亏记录添加到交易结果中
- for record in final_pnl_records:
- strategy_name = record['strategy']
- self.trading_results[strategy_name].append(record)
-
-
- if self.verbose_logging:
- total_final_records = len(final_pnl_records)
- total_final_pnl = sum(record['profit_loss'] for record in final_pnl_records)
- print(f"最终持仓结算完成,共 {total_final_records} 个头寸,总未实现盈亏: {total_final_pnl:.2f}")
-
- # 显示所有最终结算记录的详情
- if final_pnl_records:
- print(f"最终结算记录详情:")
- for i, record in enumerate(final_pnl_records, 1):
- print(f" {i}. {record['commodity']} {record['strategy']}: {record['entry_price']} -> {record['exit_price']:.2f}, 盈亏: {record['profit_loss']:.2f}, 合约: {record['contract']}")
-
- def _check_and_handle_contract_switch(self, commodity, current_date):
- """
- 检查并处理主力合约切换
- 只有真正的合约切换才会触发平仓和重新建仓,初始设置不会
- """
- if commodity not in self.dominant_contract_history:
- return
-
- # 检查当天是否有合约变化
- contract_changes = self.dominant_contract_history[commodity]
- for change in contract_changes:
- if change['date'] == current_date:
- # 检查是否为初始合约设置
- if change.get('is_initial', False):
- # 初始合约设置,不需要平仓和重新建仓,只需要启动正常交易逻辑
- if self.verbose_logging:
- print(f"\n{current_date}: {commodity} 初始合约设置为 {change['selected_contract']}")
- return
-
- # 真正的合约切换
- old_contract = self._get_current_contract(commodity, current_date - datetime.timedelta(days=1))
- new_contract = change['selected_contract']
-
- if self.verbose_logging:
- print(f"\n{current_date}: {commodity} 合约切换 {old_contract} -> {new_contract}")
-
- # 平掉旧合约的所有头寸
- self._close_all_positions_on_switch(commodity, old_contract, current_date)
-
- # 在新合约中重新建仓
- self._reestablish_positions_in_new_contract(commodity, new_contract, current_date)
-
- break
-
- def _get_current_contract(self, commodity, date):
- """获取指定日期的当前合约"""
- if commodity not in self.dominant_contract_history:
- return None
-
- contract_changes = self.dominant_contract_history[commodity]
- current_contract = None
-
- for change in contract_changes:
- if change['date'] <= date:
- current_contract = change['selected_contract']
- else:
- break
-
- return current_contract
-
- def _close_all_positions_on_switch(self, commodity, old_contract, switch_date):
- """
- 在合约切换时平掉旧合约的所有头寸
- """
- if self.verbose_logging:
- print(f" 平掉 {old_contract} 的所有头寸")
-
- # 获取当日收盘价
- close_price = self._get_price_on_date(commodity, old_contract, switch_date, 'close')
- if close_price is None:
- if self.verbose_logging:
- print(f" 无法获取 {switch_date} 的价格数据,跳过平仓")
- return
-
- # 平掉基础头寸交易的头寸
- if commodity in self.active_positions['base_position']:
- positions = self.active_positions['base_position'][commodity].copy()
- for position_id, position in positions.items():
- if position['contract'] == old_contract and position['status'] == 'open':
- # 使用正确的期货盈亏计算公式(基础头寸都是多头)
- profit_loss = self._calculate_futures_pnl(
- position['entry_price'], close_price, position['quantity'], commodity, is_long=True
- )
- profit_loss_pct = (close_price - position['entry_price']) / position['entry_price']
-
- trade_record = {
- 'commodity': commodity,
- 'contract': old_contract,
- 'strategy': 'base_position',
- 'entry_date': position['entry_date'],
- 'exit_date': switch_date.strftime('%Y-%m-%d'),
- 'entry_price': position['entry_price'],
- 'exit_price': close_price,
- 'quantity': position['quantity'],
- 'profit_loss': profit_loss,
- 'profit_loss_pct': profit_loss_pct,
- 'days_held': (switch_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days,
- 'exit_reason': 'contract_switch'
- }
-
- self.trading_results['base_position'].append(trade_record)
- self.active_positions['base_position'][commodity][position_id]['status'] = 'closed'
- self.active_positions['base_position'][commodity][position_id]['close_reason'] = 'contract_switch'
-
- if self.verbose_logging:
- print(f" 基础头寸平仓: {position['entry_price']} -> {close_price:.2f}, 盈亏: {profit_loss:.2f}")
-
- # 平掉网格交易的头寸
- if commodity in self.active_positions['grid_trading']:
- positions = self.active_positions['grid_trading'][commodity].copy()
- for position_id, position in positions.items():
- if position['contract'] == old_contract and position['status'] == 'open':
- # 使用正确的期货盈亏计算公式(网格交易都是多头)
- profit_loss = self._calculate_futures_pnl(
- position['entry_price'], close_price, position['quantity'], commodity, is_long=True
- )
- profit_loss_pct = (close_price - position['entry_price']) / position['entry_price']
-
- trade_record = {
- 'commodity': commodity,
- 'contract': old_contract,
- 'strategy': 'grid_trading',
- 'entry_date': position['entry_date'],
- 'exit_date': switch_date.strftime('%Y-%m-%d'),
- 'entry_price': position['entry_price'],
- 'exit_price': close_price,
- 'quantity': position['quantity'],
- 'profit_loss': profit_loss,
- 'profit_loss_pct': profit_loss_pct,
- 'days_held': (switch_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days,
- 'exit_reason': 'contract_switch'
- }
-
- self.trading_results['grid_trading'].append(trade_record)
- self.active_positions['grid_trading'][commodity][position_id]['status'] = 'closed'
- self.active_positions['grid_trading'][commodity][position_id]['close_reason'] = 'contract_switch'
-
- if self.verbose_logging:
- print(f" 网格头寸平仓: {position['entry_price']} -> {close_price:.2f}, 盈亏: {profit_loss:.2f}")
-
- # 平掉对冲头寸
- if commodity in self.active_positions['hedge_position']:
- positions = self.active_positions['hedge_position'][commodity].copy()
- for position_id, position in positions.items():
- if position['contract'] == old_contract and position['status'] == 'open':
- # 使用正确的期货盈亏计算公式(对冲头寸是空头)
- profit_loss = self._calculate_futures_pnl(
- position['entry_price'], close_price, position['quantity'], commodity, is_long=False
- )
- profit_loss_pct = (position['entry_price'] - close_price) / position['entry_price']
-
- trade_record = {
- 'commodity': commodity,
- 'contract': old_contract,
- 'strategy': 'hedge_position',
- 'entry_date': position['entry_date'],
- 'exit_date': switch_date.strftime('%Y-%m-%d'),
- 'entry_price': position['entry_price'],
- 'exit_price': close_price,
- 'quantity': position['quantity'],
- 'profit_loss': profit_loss,
- 'profit_loss_pct': profit_loss_pct,
- 'days_held': (switch_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days,
- 'exit_reason': 'contract_switch',
- 'is_short': True
- }
-
- self.trading_results['hedge_position'].append(trade_record)
- self.active_positions['hedge_position'][commodity][position_id]['status'] = 'closed'
- self.active_positions['hedge_position'][commodity][position_id]['close_reason'] = 'contract_switch'
-
- if self.verbose_logging:
- print(f" 对冲头寸平仓: {position['entry_price']} -> {close_price:.2f}, 盈亏: {profit_loss:.2f}")
-
- def _reestablish_positions_in_new_contract(self, commodity, new_contract, switch_date):
- """
- 在新合约中重新建仓
- """
- if self.verbose_logging:
- print(f" 在 {new_contract} 中重新建仓")
-
- # 获取当日收盘价
- close_price = self._get_price_on_date(commodity, new_contract, switch_date, 'close')
- if close_price is None:
- if self.verbose_logging:
- print(f" 无法获取 {switch_date} 的价格数据,跳过重新建仓")
- return
-
- # 基础头寸交易重新建仓
- self._reestablish_base_positions(commodity, new_contract, close_price, switch_date)
-
- # 网格交易重新建仓
- self._reestablish_grid_positions(commodity, new_contract, close_price, switch_date)
-
- # 对冲头寸重新建仓
- self._reestablish_hedge_positions(commodity, new_contract, close_price, switch_date)
-
- def _reestablish_base_positions(self, commodity, new_contract, close_price, switch_date):
- """重新建立基础头寸"""
- if commodity not in self.base_position_grid:
- return
-
- # 获取之前被平掉的基础头寸信息(按价格水平记录)
- closed_positions = {} # price_level -> quantity
- if commodity in self.active_positions['base_position']:
- for position in self.active_positions['base_position'][commodity].values():
- if position['status'] == 'closed' and 'contract_switch' in position.get('close_reason', ''):
- # 只处理因合约切换而平掉的头寸
- original_price = position.get('original_price_level', position['entry_price'])
- if original_price not in closed_positions:
- closed_positions[original_price] = 0
- closed_positions[original_price] += position['quantity']
-
- if self.verbose_logging:
- print(f" 发现需重建的基础头寸: {original_price}水平 {position['quantity']}手 (原合约: {position['contract']})")
-
- # 根据当前价格和原始价格水平重建头寸
- price_grid = self.base_position_grid[commodity]
- reestablish_count = 0
-
- for target_price, configured_quantity in price_grid.items():
- # 只有当目标价格大于等于当前价格时才重建头寸
- # 这确保了只重建"应该持有"的价格水平头寸
- if target_price >= close_price:
- # 检查是否有该价格水平的平仓头寸需要重建
- if target_price in closed_positions:
- quantity_to_reestablish = closed_positions[target_price]
- if self.verbose_logging:
- print(f" 重建条件检查: {target_price} >= {close_price:.2f} ✓ (重建原有平仓头寸)")
- else:
- # 当前价格低于目标价格,应该建立该价格水平的头寸
- quantity_to_reestablish = configured_quantity
- if self.verbose_logging:
- print(f" 重建条件检查: {target_price} >= {close_price:.2f} ✓ (建立新头寸)")
- else:
- # 当前价格高于目标价格,不重建
- if self.verbose_logging:
- print(f" 重建条件检查: {target_price} >= {close_price:.2f} ✗ (跳过重建)")
- continue
-
- if quantity_to_reestablish > 0:
- position_id = f"{commodity}_{new_contract}_{switch_date}_base_reestablish_{target_price}"
-
- if commodity not in self.active_positions['base_position']:
- self.active_positions['base_position'][commodity] = {}
-
- self.active_positions['base_position'][commodity][position_id] = {
- 'contract': new_contract,
- 'entry_date': switch_date.strftime('%Y-%m-%d'),
- 'entry_price': close_price, # 实际成交价格
- 'original_price_level': target_price, # 原始价格水平
- 'quantity': quantity_to_reestablish,
- 'status': 'open',
- 'exit_target': self.base_position_exit_price.get(commodity),
- 'hedge_checked': False # 标记为未检查对冲,允许在换月当天进行对冲检查
- }
-
- if self.verbose_logging:
- print(f" 创建重建头寸: {position_id}")
- print(f" 实际成交价格: {close_price}, 原始价格水平: {target_price}, 数量: {quantity_to_reestablish}")
-
- reestablish_count += quantity_to_reestablish
-
- if self.verbose_logging:
- print(f" 重建基础头寸 {target_price}水平: {quantity_to_reestablish} 手 @ {close_price:.2f}")
-
- if reestablish_count > 0 and self.verbose_logging:
- print(f" 基础头寸重建完成,总计: {reestablish_count} 手")
-
- def _reestablish_grid_positions(self, commodity, new_contract, close_price, switch_date):
- """重新建立网格交易头寸"""
- if commodity not in self.grid_trading_config:
- return
-
- config = self.grid_trading_config[commodity]
- grid_size = config['grid_size']
- quantity_per_grid = config['quantity_per_grid']
- exit_grid_size = config['exit_grid_size']
-
- # 获取之前的网格头寸信息
- previous_grid_levels = set()
- if commodity in self.active_positions['grid_trading']:
- for position in self.active_positions['grid_trading'][commodity].values():
- if position['status'] == 'closed' and 'contract_switch' in position.get('close_reason', ''):
- # 只处理因合约切换而平掉的头寸
- previous_grid_levels.add(position['entry_price'])
-
- if self.verbose_logging:
- print(f" 发现需重建的网格头寸: {position['entry_price']}水平 {position['quantity']}手")
-
- # 仅在原始网格开仓价格大于等于当前价格时重新建仓
- # 这确保了只重建"应该持有"的网格水平头寸
- reestablish_count = 0
- for grid_level in previous_grid_levels:
- if grid_level >= close_price:
- if self.verbose_logging:
- print(f" 网格重建条件检查: {grid_level} >= {close_price:.2f} ✓ (重建网格头寸)")
- position_id = f"{commodity}_{new_contract}_{switch_date}_grid_{grid_level}"
-
- if commodity not in self.active_positions['grid_trading']:
- self.active_positions['grid_trading'][commodity] = {}
-
- self.active_positions['grid_trading'][commodity][position_id] = {
- 'contract': new_contract,
- 'entry_date': switch_date.strftime('%Y-%m-%d'),
- 'entry_price': close_price,
- 'original_grid_level': grid_level,
- 'quantity': quantity_per_grid,
- 'status': 'open',
- 'exit_target': grid_level + exit_grid_size # 保持原始退出价格
- }
-
- reestablish_count += 1
- else:
- if self.verbose_logging:
- print(f" 网格重建条件检查: {grid_level} >= {close_price:.2f} ✗ (跳过重建)")
- continue
-
- # 同时检查是否需要开立新的网格头寸(价格更低的情况)
- start_price = config['start_price']
- current_level = start_price
- while current_level > close_price:
- current_level -= grid_size
- if current_level not in previous_grid_levels and current_level > 0:
- # 这是一个新的网格水平
- position_id = f"{commodity}_{new_contract}_{switch_date}_grid_new_{current_level}"
-
- if commodity not in self.active_positions['grid_trading']:
- self.active_positions['grid_trading'][commodity] = {}
-
- self.active_positions['grid_trading'][commodity][position_id] = {
- 'contract': new_contract,
- 'entry_date': switch_date.strftime('%Y-%m-%d'),
- 'entry_price': close_price,
- 'original_grid_level': current_level,
- 'quantity': quantity_per_grid,
- 'status': 'open',
- 'exit_target': current_level + exit_grid_size
- }
-
- reestablish_count += 1
-
- if self.verbose_logging and reestablish_count > 0:
- print(f" 重建网格头寸: {reestablish_count} 个网格 @ {close_price:.2f}")
-
- def _reestablish_hedge_positions(self, commodity, new_contract, close_price, switch_date):
- """重新建立对冲头寸"""
- if not self.config.HEDGE_ENABLED:
- return
-
- if commodity not in self.base_position_grid:
- return
-
- # 计算等权重数量
- hedge_quantity = self._calculate_equal_weight_hedge_quantity(commodity)
- if hedge_quantity == 0:
- return
-
- # 检查是否暂停建立新对冲
- self._initialize_hedge_state(commodity)
- if self.hedge_state[commodity].get('suspend_new_hedges', False):
- if self.verbose_logging:
- print(f" 对冲重建:波动率过滤暂停,跳过对冲头寸重建")
- return
-
- # 获取之前被平掉的对冲头寸信息
- closed_hedge_positions = {} # price_level -> True
- if commodity in self.active_positions['hedge_position']:
- for position in self.active_positions['hedge_position'][commodity].values():
- if position['status'] == 'closed' and 'contract_switch' in position.get('close_reason', ''):
- original_price = position.get('original_price_level', position['entry_price'])
- closed_hedge_positions[original_price] = True
-
- if self.verbose_logging:
- print(f" 发现需重建的对冲头寸: {original_price}水平 {position['quantity']}手")
-
- # 检查基础头寸,在有基础头寸的价格水平重建对冲头寸
- reestablish_count = 0
- if commodity in self.active_positions['base_position']:
- for position in self.active_positions['base_position'][commodity].values():
- if position['contract'] == new_contract and position['status'] == 'open':
- original_price_level = position.get('original_price_level', position['entry_price'])
-
- # 检查该价格水平是否已有对冲头寸
- has_hedge = False
- if commodity in self.active_positions['hedge_position']:
- for hedge_pos in self.active_positions['hedge_position'][commodity].values():
- if (hedge_pos['contract'] == new_contract and
- hedge_pos['status'] == 'open' and
- hedge_pos.get('original_price_level', hedge_pos['entry_price']) == original_price_level):
- has_hedge = True
- break
-
- # 如果没有对冲头寸,则建立
- if not has_hedge:
- position_id = f"{commodity}_{new_contract}_{switch_date}_hedge_reestablish_{original_price_level}"
-
- if commodity not in self.active_positions['hedge_position']:
- self.active_positions['hedge_position'][commodity] = {}
-
- self.active_positions['hedge_position'][commodity][position_id] = {
- 'contract': new_contract,
- 'entry_date': switch_date.strftime('%Y-%m-%d'),
- 'entry_price': close_price,
- 'original_price_level': original_price_level,
- 'quantity': hedge_quantity,
- 'status': 'open',
- 'is_short': True
- }
-
- reestablish_count += 1
-
- if self.verbose_logging:
- print(f" 重建对冲头寸: 网格{original_price_level}水平 @ {close_price:.2f}, 数量: {hedge_quantity}手(空头)")
-
- if self.verbose_logging and reestablish_count > 0:
- print(f" 重建对冲头寸完成: {reestablish_count} 个头寸")
-
- # 更新最后建仓日期(如果有重建头寸)
- if reestablish_count > 0 and commodity in self.hedge_state:
- self.hedge_state[commodity]['last_hedge_entry_date'] = switch_date.strftime('%Y-%m-%d')
-
- # 重置对冲状态(因为是新合约)
- if commodity in self.hedge_state:
- self.hedge_state[commodity]['peak_profit'] = 0
-
- def _get_price_on_date(self, commodity, contract, date, price_type='close'):
- """获取指定日期和合约的价格(增强NaN问题诊断)"""
- if commodity not in self.price_data or contract not in self.price_data[commodity]:
- if self.verbose_logging:
- print(f" ❌ 价格数据不存在: {commodity} -> {contract}")
- return None
-
- price_data = self.price_data[commodity][contract]
-
- # 找到日期对应的价格
- target_date = date if isinstance(date, datetime.date) else date.date()
-
- for idx, row in price_data.iterrows():
- if idx.date() == target_date:
- price_value = row[price_type]
-
- if self.verbose_logging:
- print(f'{price_type}的价格是: {price_value}')
-
- # 如果价格为NaN,进行详细诊断
- if pd.isna(price_value):
- self._diagnose_nan_price_issue(commodity, contract, target_date, price_type, row)
- return None
- else:
- return price_value
-
- # 如果没有找到精确日期,尝试查找最近的交易日
- if self.verbose_logging:
- print(f" ⚠️ 未找到 {contract} 在 {target_date} 的数据,尝试查找最近交易日...")
-
- return self._get_nearest_trading_day_price(commodity, contract, target_date, price_type)
-
- def _diagnose_nan_price_issue(self, commodity, contract, date, price_type, row):
- """诊断NaN价格问题的根本原因"""
- if self.verbose_logging:
- print(f" 🔍 NaN价格问题诊断: {commodity} {contract} {date}")
- print(f" 目标价格类型: {price_type}")
- print(f" 该日所有价格数据: 开盘={row['open']}, 收盘={row['close']}, 最高={row['high']}, 最低={row['low']}, 成交量={row['volume']}")
-
- # 检查是否所有价格都是NaN
- price_fields = ['open', 'close', 'high', 'low']
- nan_fields = [field for field in price_fields if pd.isna(row[field])]
- valid_fields = [field for field in price_fields if not pd.isna(row[field])]
-
- if len(nan_fields) == len(price_fields):
- print(f" ❌ 所有价格字段都为NaN - 可能该合约在此日期未开始交易")
- else:
- print(f" ⚠️ 部分价格字段为NaN: {nan_fields}")
- print(f" ✅ 有效价格字段: {valid_fields}")
-
- # 如果有有效价格,尝试使用替代方案
- if valid_fields:
- fallback_price = row[valid_fields[0]]
- print(f" 💡 建议使用替代价格: {valid_fields[0]} = {fallback_price}")
-
- # 检查成交量是否为0或NaN
- if pd.isna(row['volume']) or row['volume'] == 0:
- print(f" ⚠️ 成交量异常: {row['volume']} - 可能该合约在此日期无交易活动")
-
- # 检查是否是合约刚上市的情况
- price_data = self.price_data[commodity][contract]
- first_valid_date = None
- for idx, data_row in price_data.iterrows():
- if not pd.isna(data_row['close']):
- first_valid_date = idx.date()
- break
-
- if first_valid_date and date < first_valid_date:
- print(f" 🔍 合约首次有效交易日: {first_valid_date} (查询日期 {date} 早于首次交易日)")
- print(f" 💡 建议: 合约 {contract} 在 {date} 可能尚未开始交易")
-
- # 提供解决建议
- print(f" 📋 解决建议:")
- print(f" 1. 检查合约 {contract} 的上市日期")
- print(f" 2. 验证合约代码是否正确")
- print(f" 3. 考虑调整合约切换日期")
- if valid_fields:
- print(f" 4. 临时使用替代价格: {valid_fields[0]} = {row[valid_fields[0]]}")
-
- def _get_nearest_trading_day_price(self, commodity, contract, target_date, price_type):
- """获取最近交易日的价格"""
- price_data = self.price_data[commodity][contract]
-
- # 查找最近的交易日(前后5天范围内)
- search_range = 5
- for offset in range(1, search_range + 1):
- # 先查找之后的日期
- future_date = target_date + datetime.timedelta(days=offset)
- for idx, row in price_data.iterrows():
- if idx.date() == future_date:
- price_value = row[price_type]
- if not pd.isna(price_value):
- if self.verbose_logging:
- print(f" ✅ 使用后续交易日 {future_date} 的价格: {price_value}")
- return price_value
- break
-
- # 再查找之前的日期
- past_date = target_date - datetime.timedelta(days=offset)
- for idx, row in price_data.iterrows():
- if idx.date() == past_date:
- price_value = row[price_type]
- if not pd.isna(price_value):
- if self.verbose_logging:
- print(f" ✅ 使用前期交易日 {past_date} 的价格: {price_value}")
- return price_value
- break
-
- if self.verbose_logging:
- print(f" ❌ 在 {search_range} 天范围内未找到有效的 {price_type} 价格")
- return None
-
- def _process_daily_trading(self, current_date):
- """处理每日的正常交易逻辑"""
- for commodity in self.core_commodities.keys():
- current_contract = self._get_current_contract(commodity, current_date)
- if not current_contract:
- continue
-
- # 获取当日价格数据
- daily_prices = self._get_daily_prices(commodity, current_contract, current_date)
- if not daily_prices:
- continue
-
- # 检查基础头寸入场和退出机会
- self._check_base_position_trading(commodity, current_contract, current_date, daily_prices)
-
- # 检查网格交易入场和退出机会
- self._check_grid_trading(commodity, current_contract, current_date, daily_prices)
-
- # ========== 对冲策略相关检查 ==========
- if self.config.HEDGE_ENABLED:
- # 检查波动率过滤
- self._check_atr_volatility_filter(commodity, current_contract, current_date)
-
- # 检查基础头寸建仓完成状态
- self._check_base_positions_complete(commodity, current_contract)
-
- # 检查对冲建仓机会
- self._check_hedge_position_entry(commodity, current_contract, current_date, daily_prices)
-
- # 检查对冲止盈
- self._check_hedge_take_profit(commodity, current_contract, current_date, daily_prices)
-
- # 检查对冲止损
- self._check_hedge_stop_loss(commodity, current_contract, current_date, daily_prices)
-
- def _get_daily_prices(self, commodity, contract, date):
- """获取指定日期的价格数据"""
- if commodity not in self.price_data or contract not in self.price_data[commodity]:
- return None
-
- price_data = self.price_data[commodity][contract]
-
- for idx, row in price_data.iterrows():
- if idx.date() == date:
- return {
- 'open': row['open'],
- 'close': row['close'],
- 'high': row['high'],
- 'low': row['low'],
- 'volume': row['volume']
- }
-
- return None
-
- def _check_base_position_trading(self, commodity, contract, current_date, daily_prices):
- """检查基础头寸交易机会
-
- 逻辑:每日主动检查所有网格水平
- - 如果当前收盘价低于某个网格水平价格
- - 且该网格水平没有未平仓头寸
- - 则以当日收盘价在该网格水平开仓
- """
- if commodity not in self.base_position_grid:
- return
-
- # 检查入场机会 - 遍历所有网格水平
- price_grid = self.base_position_grid[commodity]
- current_close_price = daily_prices['close']
- daily_low = daily_prices['low']
- daily_high = daily_prices['high']
-
- for entry_price, quantity in price_grid.items():
- # 检查是否已经有这个价格水平的头寸
- position_exists = False
- if commodity in self.active_positions['base_position']:
- for position in self.active_positions['base_position'][commodity].values():
- if (position['contract'] == contract and position['status'] == 'open'):
- # 检查原始价格水平
- position_price_level = position.get('original_price_level', position['entry_price'])
- if position_price_level == entry_price:
- position_exists = True
- break
-
- # 主动开仓逻辑:当前价格低于网格水平价格 且 该水平没有头寸
- if not position_exists and current_close_price < entry_price:
- price_touched_grid = daily_low <= entry_price <= daily_high
- actual_entry_price = entry_price if price_touched_grid else current_close_price
- # 以符合规则的价格建立头寸
- position_id = f"{commodity}_{contract}_{current_date}_base_{entry_price}"
- if commodity not in self.active_positions['base_position']:
- self.active_positions['base_position'][commodity] = {}
-
- self.active_positions['base_position'][commodity][position_id] = {
- 'contract': contract,
- 'entry_date': current_date.strftime('%Y-%m-%d'),
- 'entry_price': actual_entry_price,
- 'original_price_level': entry_price, # 记录原始网格水平
- 'quantity': quantity,
- 'status': 'open',
- 'exit_target': self.base_position_exit_price.get(commodity),
- 'hedge_checked': False # 初始化对冲检查标记
- }
-
- if self.verbose_logging:
- print(f" {current_date}: {commodity} 基础头寸入场 @ 网格{entry_price} (实际价格: {actual_entry_price:.2f}), 数量: {quantity},当天收盘价{current_close_price:.2f}低于网格价格{entry_price}")
- print(f" 当天最低价:{daily_low}, 当天最高价:{daily_high}")
- if price_touched_grid:
- print(f" 价格区间触及网格价{entry_price},按网格价成交")
- else:
- print(f" 价格区间未触及网格价{entry_price},按收盘价成交")
-
- # 检查退出机会
- exit_target = self.base_position_exit_price.get(commodity)
- if exit_target and commodity in self.active_positions['base_position']:
- positions_to_close = []
- for position_id, position in self.active_positions['base_position'][commodity].items():
- if position['contract'] == contract and position['status'] == 'open' and daily_prices['high'] >= exit_target:
- positions_to_close.append(position_id)
-
- for position_id in positions_to_close:
- position = self.active_positions['base_position'][commodity][position_id]
- exit_price = min(exit_target, daily_prices['high'])
-
- # 使用正确的期货盈亏计算公式(基础头寸都是多头)
- profit_loss = self._calculate_futures_pnl(
- position['entry_price'], exit_price, position['quantity'], commodity, is_long=True
- )
- profit_loss_pct = (exit_price - position['entry_price']) / position['entry_price']
-
- trade_record = {
- 'commodity': commodity,
- 'contract': contract,
- 'strategy': 'base_position',
- 'entry_date': position['entry_date'],
- 'exit_date': current_date.strftime('%Y-%m-%d'),
- 'entry_price': position['entry_price'],
- 'exit_price': exit_price,
- 'quantity': position['quantity'],
- 'profit_loss': profit_loss,
- 'profit_loss_pct': profit_loss_pct,
- 'days_held': (current_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days,
- 'exit_reason': 'target_reached'
- }
-
- self.trading_results['base_position'].append(trade_record)
- self.active_positions['base_position'][commodity][position_id]['status'] = 'closed'
-
- if self.verbose_logging:
- print(f" {current_date}: {commodity} 基础头寸退出 {position['entry_price']} -> {exit_price:.2f}, 盈亏: {profit_loss:.2f}")
-
- def _check_grid_trading(self, commodity, contract, current_date, daily_prices):
- """检查网格交易机会"""
- if commodity not in self.grid_trading_config:
- return
-
- config = self.grid_trading_config[commodity]
- start_price = config['start_price']
- grid_size = config['grid_size']
- quantity_per_grid = config['quantity_per_grid']
- exit_grid_size = config['exit_grid_size']
-
- # 检查入场机会
- current_level = start_price
- while current_level > daily_prices['low'] and current_level > 0:
- position_exists = False
- if commodity in self.active_positions['grid_trading']:
- for position in self.active_positions['grid_trading'][commodity].values():
- if (position['contract'] == contract and
- position.get('original_grid_level', position['entry_price']) == current_level and
- position['status'] == 'open'):
- position_exists = True
- break
-
- if not position_exists and daily_prices['low'] <= current_level <= daily_prices['high']:
- position_id = f"{commodity}_{contract}_{current_date}_grid_{current_level}"
- if commodity not in self.active_positions['grid_trading']:
- self.active_positions['grid_trading'][commodity] = {}
-
- self.active_positions['grid_trading'][commodity][position_id] = {
- 'contract': contract,
- 'entry_date': current_date.strftime('%Y-%m-%d'),
- 'entry_price': current_level,
- 'original_grid_level': current_level,
- 'quantity': quantity_per_grid,
- 'status': 'open',
- 'exit_target': current_level + exit_grid_size
- }
-
- if self.verbose_logging:
- print(f" {current_date}: {commodity} 网格入场 @ {current_level},数量:{quantity_per_grid},当天最低价为{daily_prices['low']},最高价为{daily_prices['high']}")
-
- current_level -= grid_size
-
- # 检查退出机会
- if commodity in self.active_positions['grid_trading']:
- positions_to_close = []
- for position_id, position in self.active_positions['grid_trading'][commodity].items():
- if position['contract'] == contract and position['status'] == 'open':
- if daily_prices['high'] >= position['exit_target']:
- positions_to_close.append(position_id)
-
- for position_id in positions_to_close:
- position = self.active_positions['grid_trading'][commodity][position_id]
- exit_price = position['exit_target']
-
- # 使用正确的期货盈亏计算公式(网格交易都是多头)
- profit_loss = self._calculate_futures_pnl(
- position['entry_price'], exit_price, position['quantity'], commodity, is_long=True
- )
- profit_loss_pct = (exit_price - position['entry_price']) / position['entry_price']
-
- trade_record = {
- 'commodity': commodity,
- 'contract': contract,
- 'strategy': 'grid_trading',
- 'entry_date': position['entry_date'],
- 'exit_date': current_date.strftime('%Y-%m-%d'),
- 'entry_price': position['entry_price'],
- 'exit_price': exit_price,
- 'quantity': position['quantity'],
- 'profit_loss': profit_loss,
- 'profit_loss_pct': profit_loss_pct,
- 'days_held': (current_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days,
- 'exit_reason': 'target_reached'
- }
-
- self.trading_results['grid_trading'].append(trade_record)
- self.active_positions['grid_trading'][commodity][position_id]['status'] = 'closed'
-
- if self.verbose_logging:
- print(f" {current_date}: {commodity} 网格退出 {position['entry_price']} -> {exit_price:.2f}, 盈亏: {profit_loss:.2f} ({profit_loss_pct:.2%})")
-
- # ==================== 对冲策略相关方法 ====================
-
- def _calculate_equal_weight_hedge_quantity(self, commodity):
- """
- 计算等权重对冲数量
-
- 从BASE_POSITION_GRID中汇总所有数量并平均分配到所有价格水平
-
- 参数:
- commodity: 商品代码
-
- 返回:
- 等权重数量(整数)
- """
- if commodity not in self.base_position_grid:
- return 0
-
- price_grid = self.base_position_grid[commodity]
-
- # 确保price_grid是字典类型
- if not isinstance(price_grid, dict):
- if self.verbose_logging:
- print(f" ⚠️ 对冲数量计算错误: {commodity} 的price_grid不是字典类型,而是{type(price_grid)}")
- return 0
-
- # 显式转换dict.values()为列表并求和,确保得到数值
- quantities = list(price_grid.values())
- total_quantity = sum(quantities)
- num_levels = len(price_grid)
-
- if num_levels == 0:
- return 0
-
- # 确保total_quantity是数值类型(包括numpy类型)
- try:
- # 尝试转换为Python int,这会处理numpy类型
- total_quantity = int(total_quantity)
- except (TypeError, ValueError):
- if self.verbose_logging:
- print(f" ⚠️ 对冲数量计算错误: total_quantity无法转换为数值类型,类型为{type(total_quantity)}: {total_quantity}")
- return 0
-
- # 计算平均数量并向下取整
- equal_weight_quantity = int(total_quantity / num_levels)
-
- return equal_weight_quantity
-
- def _initialize_hedge_state(self, commodity):
- """初始化商品的对冲状态"""
- if commodity not in self.hedge_state:
- self.hedge_state[commodity] = {
- 'base_positions_complete': False,
- 'peak_profit': 0,
- 'atr_history': [],
- 'suspend_new_hedges': False,
- # 新增:用于追踪多头仓位回撤和对冲分阶段止盈
- 'max_drawdown': 0, # 多头仓位最大浮亏(绝对值)
- 'max_drawdown_price': 0, # 出现最大浮亏时的价格(最大浮亏价格)
- 'first_tp_triggered': False, # 一级止盈是否已触发
- 'hedge_quantity_closed': 0, # 已平仓的对冲数量(用于计算剩余对冲比例)
- 'last_hedge_entry_date': None # 最后一次对冲建仓日期
- }
-
- def _check_base_positions_complete(self, commodity, contract):
- """
- 检查基础头寸是否建仓完成
-
- 当所有配置的网格价格水平都有持仓(状态为'open')时,认为建仓完成
-
- 参数:
- commodity: 商品代码
- contract: 合约代码
- """
- if not self.config.HEDGE_ENABLED:
- return
-
- if commodity not in self.base_position_grid:
- return
-
- self._initialize_hedge_state(commodity)
-
- # 获取所有配置的价格水平
- configured_levels = set(self.base_position_grid[commodity].keys())
-
- # 获取当前持仓的价格水平
- open_levels = set()
- if commodity in self.active_positions['base_position']:
- for position in self.active_positions['base_position'][commodity].values():
- if position['contract'] == contract and position['status'] == 'open':
- original_level = position.get('original_price_level', position['entry_price'])
- open_levels.add(original_level)
-
- # 检查是否所有配置水平都有持仓
- was_complete = self.hedge_state[commodity]['base_positions_complete']
- is_complete = configured_levels == open_levels
-
- self.hedge_state[commodity]['base_positions_complete'] = is_complete
-
- # 状态变化时记录日志
- if is_complete and not was_complete:
- if self.verbose_logging:
- print(f" ✅ {commodity} 基础头寸建仓完成:所有{len(configured_levels)}个价格水平都有持仓")
-
- def _calculate_atr(self, commodity, contract, current_date):
- """
- 计算ATR并判断波动率过滤条件
-
- 参数:
- commodity: 商品代码
- contract: 合约代码
- current_date: 当前日期
-
- 返回:
- (current_atr, atr_3m_avg, pass_filter): 当前ATR、3个月ATR平均值、是否通过过滤
- """
- if commodity not in self.price_data or contract not in self.price_data[commodity]:
- return None, None, False
-
- price_data = self.price_data[commodity][contract]
- target_date = current_date if isinstance(current_date, datetime.date) else current_date.date()
-
- # 获取历史数据
- historical_data = price_data[price_data.index.date <= target_date]
-
- if len(historical_data) < self.config.HEDGE_ATR_PERIOD:
- # 数据不足,尝试扩展
- extended_data = self._extend_price_data_for_ma(commodity, contract, current_date,
- self.config.HEDGE_ATR_PERIOD * 6)
- if extended_data is not None and len(extended_data) >= self.config.HEDGE_ATR_PERIOD:
- historical_data = extended_data
- else:
- if self.verbose_logging:
- print(f" ⚠️ ATR计算:{commodity} 数据不足(需要{self.config.HEDGE_ATR_PERIOD}天)")
- return None, None, False
-
- # 计算True Range
- historical_data = historical_data.copy()
- historical_data['h-l'] = historical_data['high'] - historical_data['low']
- historical_data['h-pc'] = abs(historical_data['high'] - historical_data['close'].shift(1))
- historical_data['l-pc'] = abs(historical_data['low'] - historical_data['close'].shift(1))
- historical_data['tr'] = historical_data[['h-l', 'h-pc', 'l-pc']].max(axis=1)
-
- # 计算14日ATR
- atr_data = historical_data['tr'].rolling(window=self.config.HEDGE_ATR_PERIOD).mean()
- current_atr = atr_data.iloc[-1]
-
- # 计算3个月ATR平均值
- lookback_days = self.config.HEDGE_ATR_LOOKBACK_MONTHS * 30
- if len(atr_data) >= lookback_days:
- atr_3m_avg = atr_data.iloc[-lookback_days:].mean()
- else:
- # 使用所有可用数据
- atr_3m_avg = atr_data.mean()
-
- # 判断是否通过过滤
- threshold = self.config.HEDGE_ATR_THRESHOLD * atr_3m_avg
- pass_filter = current_atr >= threshold
-
- return current_atr, atr_3m_avg, pass_filter
-
- def _check_atr_volatility_filter(self, commodity, contract, current_date):
- """
- 检查ATR波动率过滤条件
-
- 当HEDGE_USE_ATR_FILTER=True且当前14日ATR < 0.7×(3个月ATR平均值)时,暂停建立新的对冲头寸
- 当HEDGE_USE_ATR_FILTER=False时,不应用波动率过滤,始终允许建立对冲头寸
-
- 参数:
- commodity: 商品代码
- contract: 合约代码
- current_date: 当前日期
- """
- if not self.config.HEDGE_ENABLED:
- return
-
- self._initialize_hedge_state(commodity)
-
- # 如果未启用ATR过滤,始终允许对冲
- if not self.config.HEDGE_USE_ATR_FILTER:
- self.hedge_state[commodity]['suspend_new_hedges'] = False
- return
-
- current_atr, atr_3m_avg, pass_filter = self._calculate_atr(commodity, contract, current_date)
-
- if current_atr is None:
- return
-
- was_suspended = self.hedge_state[commodity]['suspend_new_hedges']
- should_suspend = not pass_filter
-
- self.hedge_state[commodity]['suspend_new_hedges'] = should_suspend
-
- # 状态变化时记录日志
- if should_suspend != was_suspended:
- if should_suspend:
- if self.verbose_logging:
- print(f" 🚫 {commodity} 波动率过滤触发:暂停新对冲 (ATR={current_atr:.2f} < {self.config.HEDGE_ATR_THRESHOLD}×{atr_3m_avg:.2f}={self.config.HEDGE_ATR_THRESHOLD * atr_3m_avg:.2f})")
- else:
- if self.verbose_logging:
- print(f" ✅ {commodity} 波动率恢复:允许新对冲 (ATR={current_atr:.2f} >= {self.config.HEDGE_ATR_THRESHOLD}×{atr_3m_avg:.2f}={self.config.HEDGE_ATR_THRESHOLD * atr_3m_avg:.2f})")
-
- def _check_hedge_position_entry(self, commodity, contract, current_date, daily_prices):
- """
- 检查对冲头寸建仓机会
-
- 只对当天新建仓的基础头寸进行对冲检查。
- 对冲操作只在基础头寸开仓当天有效,如果当天不满足对冲条件,
- 则标记该基础头寸为"无需对冲",后续不再检查。
-
- 参数:
- commodity: 商品代码
- contract: 合约代码
- current_date: 当前日期
- daily_prices: 当日价格数据
- """
- if not self.config.HEDGE_ENABLED:
- return
-
- if commodity not in self.base_position_grid:
- return
-
- self._initialize_hedge_state(commodity)
-
- # 检查是否暂停建立新对冲
- should_hedge = not self.hedge_state[commodity]['suspend_new_hedges']
-
- # 计算等权重数量
- hedge_quantity = self._calculate_equal_weight_hedge_quantity(commodity)
- if hedge_quantity == 0:
- return
-
- current_close_price = daily_prices['close']
- current_date_str = current_date.strftime('%Y-%m-%d')
-
- # 只检查当天新建仓的基础头寸
- if commodity in self.active_positions['base_position']:
- for position_id, position in self.active_positions['base_position'][commodity].items():
- # 只处理当天建仓且状态为open的头寸
- if (position['contract'] == contract and
- position['status'] == 'open' and
- position['entry_date'] == current_date_str):
-
- # 检查该头寸是否已经处理过对冲检查
- if position.get('hedge_checked', False):
- continue
-
- # 标记该头寸已经进行过对冲检查
- self.active_positions['base_position'][commodity][position_id]['hedge_checked'] = True
-
- original_grid_level = position.get('original_price_level', position['entry_price'])
-
- # 关键修复:对冲头寸入场条件
- # 只有当日价格区间实际触及网格价格时,才在该网格水平建立对冲头寸
- # 检查条件:当日最低价 <= 网格价格 <= 当日最高价
- daily_low = daily_prices['low']
- daily_high = daily_prices['high']
- price_range_touches_grid = daily_low <= original_grid_level <= daily_high
-
- if not price_range_touches_grid:
- # 价格区间未触及网格水平,跳过对冲
- self.active_positions['base_position'][commodity][position_id]['hedged'] = False
- self.active_positions['base_position'][commodity][position_id]['hedge_skipped_reason'] = 'price_not_touch_grid'
-
- if self.verbose_logging:
- print(f" ⏭️ {current_date}: {commodity} 基础头寸@网格{original_grid_level}未对冲 "
- f"(原因: 价格区间[{daily_low:.2f}, {daily_high:.2f}]未触及网格{original_grid_level})")
- continue
-
- # 如果满足对冲条件,则建立对冲头寸
- if should_hedge:
- hedge_position_id = f"{commodity}_{contract}_{current_date}_hedge_{original_grid_level}"
-
- if commodity not in self.active_positions['hedge_position']:
- self.active_positions['hedge_position'][commodity] = {}
-
- actual_hedge_entry_price = original_grid_level
- self.active_positions['hedge_position'][commodity][hedge_position_id] = {
- 'contract': contract,
- 'entry_date': current_date_str,
- 'entry_price': actual_hedge_entry_price,
- 'fill_price': actual_hedge_entry_price,
- 'original_price_level': original_grid_level,
- 'quantity': hedge_quantity,
- 'status': 'open',
- 'is_short': True,
- 'base_position_id': position_id # 关联到对应的基础头寸
- }
-
- # 在基础头寸中记录已对冲状态
- self.active_positions['base_position'][commodity][position_id]['hedged'] = True
- self.active_positions['base_position'][commodity][position_id]['hedge_position_id'] = hedge_position_id
-
- # 更新最后建仓日期
- self.hedge_state[commodity]['last_hedge_entry_date'] = current_date_str
-
- if self.verbose_logging:
- print(f" 📉 {current_date}: {commodity} 对冲头寸建仓 @ 网格{original_grid_level} (记录开仓价: {original_grid_level:.2f}, 实际成交价: {actual_hedge_entry_price:.2f},当日收盘价: {current_close_price:.2f}), 数量: {hedge_quantity}手(空头)")
- else:
- # 不满足对冲条件,标记为无需对冲
- self.active_positions['base_position'][commodity][position_id]['hedged'] = False
- self.active_positions['base_position'][commodity][position_id]['hedge_skipped_reason'] = 'volatility_filter' if self.hedge_state[commodity]['suspend_new_hedges'] else 'other'
-
- if self.verbose_logging:
- reason = '波动率过滤' if self.hedge_state[commodity]['suspend_new_hedges'] else '其他条件'
- print(f" ⏭️ {current_date}: {commodity} 基础头寸@网格{original_grid_level}未对冲 (原因: {reason})")
-
- def _check_hedge_take_profit(self, commodity, contract, current_date, daily_prices):
- """
- 检查对冲头寸分阶段止盈条件
-
- 策略说明:
- 对冲仓位是为了保护多头仓位免受回撤影响。随着多头仓位回升,分阶段平掉对冲仓位:
-
- 一级止盈(50%对冲仓位):
- - 触发条件:多头浮亏从最大回撤水平回升约30%时
- - 操作:平掉50%的对冲仓位
-
- 二级止盈(剩余100%对冲仓位):
- - 触发条件A:浮亏继续收窄至从最大回撤回升约60%
- - 触发条件B:市场反弹至多头成本区上方2%以内
- - 操作:平掉剩余所有对冲仓位
-
- 参数:
- commodity: 商品代码
- contract: 合约代码
- current_date: 当前日期
- daily_prices: 当日价格数据
- """
- if not self.config.HEDGE_ENABLED:
- return
-
- if commodity not in self.hedge_state:
- return
-
- # 收集所有活跃的对冲头寸
- hedge_positions = []
- if commodity in self.active_positions['hedge_position']:
- for position_id, position in self.active_positions['hedge_position'][commodity].items():
- if position['contract'] == contract and position['status'] == 'open':
- hedge_positions.append((position_id, position))
-
- if len(hedge_positions) == 0:
- if self.verbose_logging:
- print(f" ⏭️ {current_date}: {commodity} 没有活跃的对冲头寸,跳过止盈检查")
- return
-
- # 检查是否为对冲建仓当天,如果是则跳过止盈检查
- current_date_str = current_date.strftime('%Y-%m-%d')
- last_hedge_entry_date = self.hedge_state[commodity].get('last_hedge_entry_date')
-
- if last_hedge_entry_date == current_date_str:
- if self.verbose_logging:
- print(f" ⏭️ {current_date}: {commodity} 对冲建仓当天,跳过止盈检查(活跃对冲头寸: {len(hedge_positions)})")
- return
-
- if self.verbose_logging:
- print(f" 📊 {current_date}: {commodity} 活跃的对冲头寸: {len(hedge_positions)}")
- # 计算所有基础多头头寸的浮动盈亏
- # 注意:浮亏峰值使用最低价计算,回升判断使用最高价计算
- base_positions_pnl_at_low = 0 # 基于最低价的浮亏(用于峰值计算)
- base_positions_pnl_at_high = 0 # 基于最高价的浮亏(用于回升判断)
- base_positions_cost = 0 # 用于计算平均成本价
- total_base_quantity = 0
-
- if commodity in self.active_positions['base_position']:
- for position_id, position in self.active_positions['base_position'][commodity].items():
- if position['contract'] == contract and position['status'] == 'open':
- # 计算基于最低价的浮动盈亏(多头浮亏峰值)
- floating_pnl_low = self._calculate_futures_pnl(
- position['entry_price'], daily_prices['low'],
- position['quantity'], commodity, is_long=True
- )
- # 计算基于最高价的浮动盈亏(多头回升判断)
- floating_pnl_high = self._calculate_futures_pnl(
- position['entry_price'], daily_prices['high'],
- position['quantity'], commodity, is_long=True
- )
- base_positions_pnl_at_low += floating_pnl_low
- base_positions_pnl_at_high += floating_pnl_high
- base_positions_cost += position['entry_price'] * position['quantity']
- total_base_quantity += position['quantity']
-
- # 如果没有活跃的基础头寸,则不处理
- if total_base_quantity == 0:
- if self.verbose_logging:
- print(f" ⏭️ {current_date}: {commodity} 无未平仓基础头寸,跳过止盈检查 (数量: {total_base_quantity})")
- return
-
- # 计算平均成本价
- avg_cost_price = base_positions_cost / total_base_quantity
- daily_low = daily_prices['low']
- daily_high = daily_prices['high']
- daily_close = daily_prices['close']
-
- # 更新最大回撤(基于最低价的最大浮亏绝对值)
- if self.verbose_logging:
- print(f" 📊 {current_date}: {commodity} 基础头寸 - 平均成本价: {avg_cost_price:.2f}, 当日价格区间: [{daily_low:.2f}, {daily_high:.2f}], 收盘价: {daily_close:.2f}")
- print(f" 📊 {current_date}: {commodity} 基础头寸浮盈亏 - 最低价: {base_positions_pnl_at_low:.2f}, 最高价: {base_positions_pnl_at_high:.2f}, 当前最大回撤: {self.hedge_state[commodity]['max_drawdown']:.2f}")
-
- # 使用最低价计算的浮亏更新峰值回撤
- if base_positions_pnl_at_low < 0 and abs(base_positions_pnl_at_low) > self.hedge_state[commodity]['max_drawdown']:
- self.hedge_state[commodity]['max_drawdown'] = abs(base_positions_pnl_at_low)
- self.hedge_state[commodity]['max_drawdown_price'] = daily_low # 保存出现最大浮亏时的价格
- if self.verbose_logging:
- print(f" 📊 {current_date}: {commodity} 更新最大回撤: {self.hedge_state[commodity]['max_drawdown']:.2f} (最大浮亏价格: {daily_low:.2f})")
-
- max_drawdown = self.hedge_state[commodity]['max_drawdown']
-
- # 获取期货合约倍数
- contract_multiplier = self._get_futures_multiplier(commodity)
-
- # 计算一级止盈价格(使用配置参数)
- # 回升公式: 最大浮亏价格 + (最大浮亏 * 回升百分比) / (数量 * 合约倍数)
- level1_recovery_pct = self.config.HEDGE_LEVEL1_RECOVERY_PCT
- level1_tp_loss_change = max_drawdown * level1_recovery_pct # 一级回升后的剩余浮亏
- level1_tp_loss = max_drawdown - level1_tp_loss_change
- # 使用最大浮亏价格作为基准,如果从未有过浮亏则使用当日最低价作为备用
- max_drawdown_price = self.hedge_state[commodity]['max_drawdown_price'] if self.hedge_state[commodity]['max_drawdown_price'] > 0 else daily_low
- level1_tp_price = max_drawdown_price + level1_tp_loss_change / (total_base_quantity * contract_multiplier)
- print(f" 📈 {current_date}: {commodity} 一级止盈价格: {level1_tp_price:.2f}, 最大浮亏价格: {max_drawdown_price:.2f}, 剩余浮亏: {level1_tp_loss:.2f}")
-
- # 计算二级止盈价格1(使用配置参数)
- level2_recovery_pct = self.config.HEDGE_LEVEL2_RECOVERY_PCT
- level2_tp1_loss_change = max_drawdown * level2_recovery_pct # 二级回升后的剩余浮亏
- level2_tp1_loss = max_drawdown - level2_tp1_loss_change
- level2_tp1_price = max_drawdown_price + level2_tp1_loss_change / (total_base_quantity * contract_multiplier)
- print(f" 📈 {current_date}: {commodity} 二级止盈价格1: {level2_tp1_price:.2f}, 最大浮亏价格: {max_drawdown_price:.2f}, 剩余浮亏: {level2_tp1_loss:.2f}")
- # 计算二级止盈价格2(成本价附近 - 空头策略应该是上方)
- # 对于空头对冲,价格需要上升到成本价以上才触发止盈
- cost_area_pct = self.config.HEDGE_COST_AREA_PCT
- level2_tp2_price = avg_cost_price * (1 + cost_area_pct) # 成本价上方百分比
-
- # 取二级止盈价格中的较低者
- level2_tp_price = min(level2_tp1_price, level2_tp2_price)
- print(f" 📈 {current_date}: {commodity} 二级止盈价格: {level2_tp_price:.2f}, 一级: {level2_tp1_price:.2f}, 二级: {level2_tp2_price:.2f}")
- level2_trigger_reason_detail = f'回升{level2_recovery_pct:.0%}' if level2_tp_price == level2_tp1_price else f'成本价上方{cost_area_pct:.0%}'
-
- if self.verbose_logging:
- print(f" 📈 {current_date}: {commodity} 合约倍数: {contract_multiplier}, 总数量: {total_base_quantity}")
- print(f" 📈 {current_date}: {commodity} 止盈参数 - 一级回升: {level1_recovery_pct:.0%}, 二级回升: {level2_recovery_pct:.0%}, 成本区域: {cost_area_pct:.0%}")
- print(f" 📈 {current_date}: {commodity} 最大浮亏价格: {max_drawdown_price:.2f}, 平均成本价: {avg_cost_price:.2f}")
- print(f" 📈 {current_date}: {commodity} 止盈价格水平 - 一级: {level1_tp_price:.2f}, 二级: {level2_tp1_price:.2f}(回升) vs {level2_tp2_price:.2f}(成本区) -> {level2_tp_price:.2f} ({level2_trigger_reason_detail})")
- print(f" 📈 {current_date}: {commodity} 当日最高价: {daily_high:.2f}, 最大回撤: {max_drawdown:.2f}")
- print(f" 📈 {current_date}: {commodity} 价格计算详情 - 一级剩余浮亏: {level1_tp_loss:.2f}, 二级剩余浮亏: {level2_tp1_loss:.2f}")
-
- # 显示止盈触发判断的详细信息
- level1_reached = daily_high >= level1_tp_price
- level2_reached = daily_high >= level2_tp_price
- first_tp_status = '已触发' if self.hedge_state[commodity]['first_tp_triggered'] else '未触发'
-
- print(f" 🔍 {current_date}: {commodity} 止盈判断 - 一级触及: {level1_reached}, 二级触及: {level2_reached}, 一级状态: {first_tp_status}")
-
- # 一级止盈:当日最高价触及30%回升价格水平
- if not self.hedge_state[commodity]['first_tp_triggered'] and daily_high >= level1_tp_price:
- if self.verbose_logging:
- print(f" 💰 {current_date}: {commodity} 一级止盈触发 - 最高价 {daily_high:.2f} >= 一级止盈价 {level1_tp_price:.2f} (回升{level1_recovery_pct:.0%})")
- print(f" 多头浮盈亏区间: [{base_positions_pnl_at_low:.2f}, {base_positions_pnl_at_high:.2f}], 最大回撤: {max_drawdown:.2f}")
-
- # 标记一级止盈已触发
- self.hedge_state[commodity]['first_tp_triggered'] = True
-
- # 计算需要平仓的数量(50%对冲仓位,使用四舍五入并确保至少平1手)
- total_hedge_quantity = sum(pos[1]['quantity'] for pos in hedge_positions)
- target_close_quantity = max(1, round(total_hedge_quantity * 0.5))
-
- if self.verbose_logging:
- print(f" 一级止盈数量计算: 总对冲仓位 {total_hedge_quantity}手, 50%目标 {total_hedge_quantity * 0.5:.1f}手, 实际平仓 {target_close_quantity}手")
-
- self._close_hedge_positions(
- commodity, contract, current_date, daily_prices,
- hedge_positions, target_close_quantity,
- exit_reason='take_profit_level1',
- log_prefix='一级止盈(50%)',
- exit_price=level1_tp_price
- )
-
- # 重新收集剩余的对冲头寸(为了支持同日二级止盈)
- hedge_positions = []
- if commodity in self.active_positions['hedge_position']:
- for position_id, position in self.active_positions['hedge_position'][commodity].items():
- if position['contract'] == contract and position['status'] == 'open':
- hedge_positions.append((position_id, position))
-
- # 二级止盈:当日最高价触及二级止盈价格水平(改为独立判断,支持同日连续止盈)
- if self.hedge_state[commodity]['first_tp_triggered'] and daily_high >= level2_tp_price:
- if self.verbose_logging:
- print(f" 💰💰 {current_date}: {commodity} 二级止盈触发 - 最高价 {daily_high:.2f} >= 二级止盈价 {level2_tp_price:.2f} ({level2_trigger_reason_detail})")
- print(f" 多头浮盈亏区间: [{base_positions_pnl_at_low:.2f}, {base_positions_pnl_at_high:.2f}], 最大回撤: {max_drawdown:.2f}")
-
- # 平掉所有剩余的对冲仓位
- remaining_hedge_quantity = sum(pos[1]['quantity'] for pos in hedge_positions)
-
- if self.verbose_logging:
- print(f" 二级止盈数量计算: 剩余对冲仓位 {remaining_hedge_quantity}手, 全部平仓")
-
- if remaining_hedge_quantity > 0:
- self._close_hedge_positions(
- commodity, contract, current_date, daily_prices,
- hedge_positions, remaining_hedge_quantity,
- exit_reason='take_profit_level2',
- log_prefix='二级止盈(100%剩余)',
- exit_price=level2_tp_price
- )
- else:
- if self.verbose_logging:
- print(f" 二级止盈: 无剩余对冲仓位需要平仓")
-
- # 重置对冲状态
- if self.verbose_logging:
- print(f" 🔄 {current_date}: {commodity} 重置对冲状态,清零最大回撤记录和最大浮亏价格")
- self.hedge_state[commodity]['first_tp_triggered'] = False
- self.hedge_state[commodity]['max_drawdown'] = 0
- self.hedge_state[commodity]['max_drawdown_price'] = 0
- # 检查最终的对冲仓位状态,显示等待信息
- if self.verbose_logging:
- # 重新检查剩余对冲仓位
- final_hedge_positions = []
- if commodity in self.active_positions['hedge_position']:
- for position_id, position in self.active_positions['hedge_position'][commodity].items():
- if position['contract'] == contract and position['status'] == 'open':
- final_hedge_positions.append((position_id, position))
-
- remaining_quantity = sum(pos[1]['quantity'] for pos in final_hedge_positions)
-
- if remaining_quantity > 0:
- if self.hedge_state[commodity]['first_tp_triggered']:
- gap = level2_tp_price - daily_high
- print(f" ⏸️ {current_date}: {commodity} 剩余对冲仓位 {remaining_quantity}手,等待二级止盈(需要 {level2_tp_price:.2f},当前最高价 {daily_high:.2f},差距 {gap:.2f})")
- else:
- gap = level1_tp_price - daily_high
- print(f" ⏸️ {current_date}: {commodity} 剩余对冲仓位 {remaining_quantity}手,等待一级止盈(需要 {level1_tp_price:.2f},当前最高价 {daily_high:.2f},差距 {gap:.2f})")
- else:
- print(f" ✅ {current_date}: {commodity} 所有对冲仓位已平仓完毕")
-
- def _close_hedge_positions(self, commodity, contract, current_date, daily_prices,
- hedge_positions, target_close_quantity, exit_reason, log_prefix, exit_price=None):
- """
- 平掉指定数量的对冲头寸
-
- 参数:
- commodity: 商品代码
- contract: 合约代码
- current_date: 当前日期
- daily_prices: 当日价格数据
- hedge_positions: 对冲头寸列表 [(position_id, position), ...]
- target_close_quantity: 目标平仓数量
- exit_reason: 退出原因
- log_prefix: 日志前缀
- exit_price: 平仓价格,如果为None则使用收盘价
- """
- if exit_price is None:
- exit_price = daily_prices['close']
- closed_quantity = 0
-
- for position_id, position in hedge_positions:
- if closed_quantity >= target_close_quantity:
- break
-
- # 计算本次平仓数量
- quantity_to_close = min(position['quantity'], target_close_quantity - closed_quantity)
-
- # 如果部分平仓
- if quantity_to_close < position['quantity']:
- # 创建部分平仓的交易记录
- profit_loss = self._calculate_futures_pnl(
- position['entry_price'], exit_price, quantity_to_close, commodity, is_long=False
- )
- profit_loss_pct = (position['entry_price'] - exit_price) / position['entry_price']
-
- trade_record = {
- 'commodity': commodity,
- 'contract': contract,
- 'strategy': 'hedge_position',
- 'entry_date': position['entry_date'],
- 'exit_date': current_date.strftime('%Y-%m-%d'),
- 'entry_price': position['entry_price'],
- 'exit_price': exit_price,
- 'quantity': quantity_to_close,
- 'profit_loss': profit_loss,
- 'profit_loss_pct': profit_loss_pct,
- 'days_held': (current_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days,
- 'exit_reason': exit_reason,
- 'is_short': True
- }
-
- self.trading_results['hedge_position'].append(trade_record)
-
- # 更新头寸数量
- self.active_positions['hedge_position'][commodity][position_id]['quantity'] -= quantity_to_close
-
- if self.verbose_logging:
- remaining = position['quantity'] - quantity_to_close
- print(f" {log_prefix} 部分平仓: {position['entry_price']:.2f} -> {exit_price:.2f}, "
- f"平仓{quantity_to_close}手, 剩余{remaining}手, 盈亏: {profit_loss:.2f}")
-
- closed_quantity += quantity_to_close
- else:
- # 全部平仓
- profit_loss = self._calculate_futures_pnl(
- position['entry_price'], exit_price, position['quantity'], commodity, is_long=False
- )
- profit_loss_pct = (position['entry_price'] - exit_price) / position['entry_price']
-
- trade_record = {
- 'commodity': commodity,
- 'contract': contract,
- 'strategy': 'hedge_position',
- 'entry_date': position['entry_date'],
- 'exit_date': current_date.strftime('%Y-%m-%d'),
- 'entry_price': position['entry_price'],
- 'exit_price': exit_price,
- 'quantity': position['quantity'],
- 'profit_loss': profit_loss,
- 'profit_loss_pct': profit_loss_pct,
- 'days_held': (current_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days,
- 'exit_reason': exit_reason,
- 'is_short': True
- }
-
- self.trading_results['hedge_position'].append(trade_record)
- self.active_positions['hedge_position'][commodity][position_id]['status'] = 'closed'
-
- if self.verbose_logging:
- print(f" {log_prefix} 全部平仓: {position['entry_price']:.2f} -> {exit_price:.2f}, "
- f"数量{position['quantity']}手, 盈亏: {profit_loss:.2f}")
-
- closed_quantity += position['quantity']
-
- # 更新已平仓的对冲数量
- self.hedge_state[commodity]['hedge_quantity_closed'] += closed_quantity
-
- def _check_hedge_stop_loss(self, commodity, contract, current_date, daily_prices):
- """
- 检查对冲头寸硬性止损条件
-
- 对于空头对冲头寸,当价格上涨导致浮动亏损超过阈值时立即平仓。
-
- 止损逻辑:
- - 空头头寸亏损 = 当前价格 > 入场价格
- - 亏损百分比 = (当前价格 - 入场价格) / 入场价格
- - 触发条件:亏损百分比 > HEDGE_STOPLOSS_PCT
-
- 参数:
- commodity: 商品代码
- contract: 合约代码
- current_date: 当前日期
- daily_prices: 当日价格数据
- """
- if not self.config.HEDGE_ENABLED:
- return
-
- if commodity not in self.active_positions['hedge_position']:
- return
-
- positions_to_close = []
- current_price = daily_prices['close']
-
- for position_id, position in self.active_positions['hedge_position'][commodity].items():
- if position['contract'] == contract and position['status'] == 'open':
- entry_price = position['entry_price']
- grid_price_level = position.get('original_price_level', entry_price)
-
- # 对于空头头寸的亏损百分比计算
- # 价格上涨 = 亏损,价格下跌 = 盈利
- # 亏损百分比 = (当前价格 - 入场价格) / 入场价格,因为对冲是做空,所以价格越低是收益越大,所以应该乘以-1
- price_change_pct = - (current_price - entry_price) / entry_price
- if self.verbose_logging:
- fill_price = position.get('fill_price', entry_price)
- print(f" 📊 {current_date}: {commodity} 当前价格: {current_price:.2f}, 网格价: {grid_price_level:.2f}, 记录开仓价: {entry_price:.2f}, 实际成交价: {fill_price:.2f}, 对冲头寸浮盈亏比例: {price_change_pct:.4f}")
- # 只有当价格上涨(亏损)且超过止损阈值时才触发止损
- if price_change_pct > self.config.HEDGE_STOPLOSS_PCT:
- # 计算实际盈亏金额(用于日志显示)
- floating_pnl = self._calculate_futures_pnl(
- entry_price, current_price,
- position['quantity'], commodity, is_long=False
- )
- positions_to_close.append((position_id, position, floating_pnl, price_change_pct))
-
- # 平掉触发止损的头寸
- for position_id, position, floating_pnl, loss_pct in positions_to_close:
- exit_price = daily_prices['close']
-
- profit_loss = self._calculate_futures_pnl(
- position['entry_price'], exit_price, position['quantity'], commodity, is_long=False
- )
- profit_loss_pct = (position['entry_price'] - exit_price) / position['entry_price']
-
- trade_record = {
- 'commodity': commodity,
- 'contract': contract,
- 'strategy': 'hedge_position',
- 'entry_date': position['entry_date'],
- 'exit_date': current_date.strftime('%Y-%m-%d'),
- 'entry_price': position['entry_price'],
- 'exit_price': exit_price,
- 'quantity': position['quantity'],
- 'profit_loss': profit_loss,
- 'profit_loss_pct': profit_loss_pct,
- 'days_held': (current_date - datetime.datetime.strptime(position['entry_date'], '%Y-%m-%d').date()).days,
- 'exit_reason': 'stop_loss_hard', # 硬性止损
- 'is_short': True
- }
-
- self.trading_results['hedge_position'].append(trade_record)
- self.active_positions['hedge_position'][commodity][position_id]['status'] = 'closed'
-
- if self.verbose_logging:
- grid_price_level = position.get('original_price_level', position['entry_price'])
- fill_price = position.get('fill_price', position['entry_price'])
- print(f" 🛑 {current_date}: {commodity} 对冲硬性止损触发 - "
- f"网格价@{grid_price_level:.2f}, 记录开仓@{position['entry_price']:.2f}, 实际成交@{fill_price:.2f}, 当前@{exit_price:.2f}, "
- f"价格上涨{loss_pct:.2%} > 阈值{self.config.HEDGE_STOPLOSS_PCT:.2%}, "
- f"平仓盈亏:{profit_loss:.2f}")
-
- def _extend_price_data_for_ma(self, commodity, contract, current_date, required_days=30):
- """扩展价格数据以满足MA计算需求
-
- 参数:
- commodity: 商品代码
- contract: 合约代码
- current_date: 当前日期
- required_days: 所需的最少数据天数
-
- 返回:
- 扩展后的价格数据DataFrame,如果获取失败则返回None
- """
- cache_key = f"{commodity}_{contract}"
-
- # 检查缓存
- if cache_key in self.ma_extended_data_cache:
- cached_data = self.ma_extended_data_cache[cache_key]
- target_date = current_date if isinstance(current_date, datetime.date) else current_date.date()
- historical_data = cached_data[cached_data.index.date <= target_date]
-
- if len(historical_data) >= required_days:
- if self.verbose_logging:
- print(f" ℹ️ MA过滤器:使用缓存的扩展数据,共{len(historical_data)}天")
- return historical_data
-
- # 获取现有数据
- if commodity not in self.price_data or contract not in self.price_data[commodity]:
- return None
-
- existing_data = self.price_data[commodity][contract]
- target_date = current_date if isinstance(current_date, datetime.date) else current_date.date()
- existing_historical = existing_data[existing_data.index.date <= target_date]
-
- if len(existing_historical) >= required_days:
- return existing_historical
-
- # 数据不足,需要扩展获取
- existing_count = len(existing_historical)
- shortage = required_days - existing_count
-
- if self.verbose_logging:
- print(f" 📊 MA过滤器:数据不足,开始扩展获取(当前{existing_count}天,需要{required_days}天,缺少{shortage}天)")
-
- # 计算扩展的开始日期:从现有数据最早日期往前推至少30个交易日
- if len(existing_data) > 0:
- earliest_date = existing_data.index.min().date()
- else:
- earliest_date = target_date
-
- # 往前推60个自然日(约等于40-45个交易日,提供充足缓冲)
- extended_start_date = earliest_date - datetime.timedelta(days=60)
- extended_end_date = target_date
-
- if self.verbose_logging:
- print(f" 📊 扩展日期范围: {extended_start_date} 至 {extended_end_date}")
-
- try:
- # 获取扩展的价格数据
- extended_data = get_price(
- contract,
- start_date=extended_start_date,
- end_date=extended_end_date,
- frequency='daily',
- fields=['open', 'close', 'high', 'low', 'volume'],
- skip_paused=False,
- panel=False
- )
-
- if extended_data is None or len(extended_data) == 0:
- if self.verbose_logging:
- print(f" ⚠️ 扩展数据获取失败:未获取到数据")
- return existing_historical
-
- # 合并数据:将扩展数据与现有数据合并,保留所有日期的数据
- # 使用concat合并,然后去重并按日期排序
- combined_data = pd.concat([existing_data, extended_data])
- combined_data = combined_data[~combined_data.index.duplicated(keep='first')]
- combined_data = combined_data.sort_index()
-
- # 过滤到当前日期(仅用于返回给MA计算)
- combined_historical = combined_data[combined_data.index.date <= target_date]
-
- if self.verbose_logging:
- print(f" ✅ 扩展数据获取成功:从{len(existing_historical)}天扩展到{len(combined_historical)}天")
- print(f" 合并后完整数据范围:{combined_data.index.min().date()} 至 {combined_data.index.max().date()}(共{len(combined_data)}天)")
-
- # 缓存扩展后的完整数据
- self.ma_extended_data_cache[cache_key] = combined_data
-
- # 更新主price_data,保留原有数据和新扩展的数据
- self.price_data[commodity][contract] = combined_data
-
- return combined_historical
-
- except Exception as e:
- if self.verbose_logging:
- print(f" ⚠️ 扩展数据获取异常:{str(e)}")
- return existing_historical
-
- def simulate_base_position_trading(self):
- """
- 模拟基础头寸交易
- 为每种商品配置价格-数量网格,以指定价格水平和数量开立多头头寸
- 所有头寸使用统一的退出价格(无止损)
- """
- if self.verbose_logging:
- print("\n=== 步骤3: 基础头寸交易模拟 ===")
-
- base_position_results = []
-
- for commodity in self.price_data.keys():
- if commodity not in self.base_position_grid:
- continue
-
- price_grid = self.base_position_grid[commodity]
- exit_price = self.base_position_exit_price[commodity]
- price_data = self.price_data[commodity]['data']
- contract = self.price_data[commodity]['contract']
-
- if self.verbose_logging:
- print(f"\n分析 {commodity} ({contract}) 基础头寸交易")
- print(f"价格网格: {price_grid}")
- print(f"退出价格: {exit_price}")
-
- # 遍历每个价格水平
- for entry_price, quantity in price_grid.items():
- # 查找触发入场的日期
- entry_dates = []
- for date, row in price_data.iterrows():
- if row['low'] <= entry_price <= row['high']:
- entry_dates.append(date)
-
- if not entry_dates:
- continue
-
- # 使用第一个触发日期作为入场点
- entry_date = entry_dates[0]
-
- # 查找退出点
- exit_date = None
- exit_price_actual = exit_price
-
- # 在入场后查找价格达到退出价格的日期
- for date, row in price_data.iterrows():
- if date > entry_date and row['high'] >= exit_price:
- exit_date = date
- exit_price_actual = min(exit_price, row['high'])
- break
-
- # 如果没有达到退出价格,使用最后一日的收盘价退出
- if exit_date is None:
- exit_date = price_data.index[-1]
- exit_price_actual = price_data.iloc[-1]['close']
-
- # 计算盈亏
- profit_loss = (exit_price_actual - entry_price) * quantity
- profit_loss_pct = (exit_price_actual - entry_price) / entry_price
-
- trade_record = {
- 'commodity': commodity,
- 'contract': contract,
- 'strategy': 'base_position',
- 'entry_date': entry_date.strftime('%Y-%m-%d'),
- 'exit_date': exit_date.strftime('%Y-%m-%d'),
- 'entry_price': entry_price,
- 'exit_price': exit_price_actual,
- 'quantity': quantity,
- 'profit_loss': profit_loss,
- 'profit_loss_pct': profit_loss_pct,
- 'days_held': (exit_date - entry_date).days
- }
-
- base_position_results.append(trade_record)
-
- if self.verbose_logging:
- print(f" 入场: {entry_date.strftime('%Y-%m-%d')} @ {entry_price}, 数量: {quantity}")
- print(f" 出场: {exit_date.strftime('%Y-%m-%d')} @ {exit_price_actual:.2f}")
- print(f" 盈亏: {profit_loss:.2f} ({profit_loss_pct:.2%})")
-
- self.trading_results['base_position'] = base_position_results
-
- if self.verbose_logging:
- print(f"\n基础头寸交易模拟完成,共{len(base_position_results)}笔交易")
-
- return base_position_results
-
- def simulate_grid_trading(self):
- """
- 模拟网格交易策略
- 从start_price开始,每次价格下降grid_size时买入quantity_per_grid
- 当价格从入场价格上涨exit_grid_size时退出
- """
- if self.verbose_logging:
- print("\n=== 步骤4: 网格交易策略模拟 ===")
-
- grid_trading_results = []
-
- for commodity in self.price_data.keys():
- if commodity not in self.grid_trading_config:
- continue
-
- config = self.grid_trading_config[commodity]
- start_price = config['start_price']
- grid_size = config['grid_size']
- quantity_per_grid = config['quantity_per_grid']
- exit_grid_size = config['exit_grid_size']
-
- price_data = self.price_data[commodity]['data']
- contract = self.price_data[commodity]['contract']
-
- if self.verbose_logging:
- print(f"\n分析 {commodity} ({contract}) 网格交易")
- print(f"起始价格: {start_price}, 网格大小: {grid_size}")
- print(f"每网格数量: {quantity_per_grid}, 退出网格大小: {exit_grid_size}")
-
- # 生成网格价格水平
- grid_levels = []
- current_level = start_price
- min_price = price_data['low'].min()
-
- while current_level > min_price:
- grid_levels.append(current_level)
- current_level -= grid_size
-
- # 模拟每个网格水平的交易
- for entry_price in grid_levels:
- exit_price = entry_price + exit_grid_size
-
- # 查找入场机会
- entry_date = None
- for date, row in price_data.iterrows():
- if row['low'] <= entry_price <= row['high']:
- entry_date = date
- break
-
- if entry_date is None:
- continue
-
- # 查找退出机会
- exit_date = None
- exit_price_actual = exit_price
-
- for date, row in price_data.iterrows():
- if date > entry_date and row['high'] >= exit_price:
- exit_date = date
- exit_price_actual = exit_price
- break
-
- # 如果没有达到退出价格,使用最后一日收盘价
- if exit_date is None:
- exit_date = price_data.index[-1]
- exit_price_actual = price_data.iloc[-1]['close']
-
- # 计算盈亏
- profit_loss = (exit_price_actual - entry_price) * quantity_per_grid
- profit_loss_pct = (exit_price_actual - entry_price) / entry_price
-
- trade_record = {
- 'commodity': commodity,
- 'contract': contract,
- 'strategy': 'grid_trading',
- 'entry_date': entry_date.strftime('%Y-%m-%d'),
- 'exit_date': exit_date.strftime('%Y-%m-%d'),
- 'entry_price': entry_price,
- 'exit_price': exit_price_actual,
- 'quantity': quantity_per_grid,
- 'profit_loss': profit_loss,
- 'profit_loss_pct': profit_loss_pct,
- 'days_held': (exit_date - entry_date).days
- }
-
- grid_trading_results.append(trade_record)
-
- if self.verbose_logging:
- print(f" 网格 {entry_price}: {entry_date.strftime('%Y-%m-%d')} -> {exit_date.strftime('%Y-%m-%d')}")
- print(f" 盈亏: {profit_loss:.2f} ({profit_loss_pct:.2%})")
-
- self.trading_results['grid_trading'] = grid_trading_results
-
- if self.verbose_logging:
- print(f"\n网格交易模拟完成,共{len(grid_trading_results)}笔交易")
-
- return grid_trading_results
-
- def calculate_performance_statistics(self):
- """
- 计算多品种多级聚合的性能统计
- 包括品种-策略级、品种级、策略级和总体级统计
- """
- if self.verbose_logging:
- print("\n=== 步骤6: 多级性能统计分析 ===")
-
- # 多级统计结构
- performance_stats = {
- 'by_commodity_strategy': {}, # 品种-策略级统计
- 'by_commodity': {}, # 品种级汇总
- 'by_strategy': {}, # 策略级汇总
- 'overall': {} # 总体汇总
- }
-
- if self.verbose_logging:
- print("\n--- 第一级:品种-策略级统计 ---")
-
- # 第一步:计算品种-策略级统计
- for strategy_name, results in self.trading_results.items():
- if strategy_name not in performance_stats['by_commodity_strategy']:
- performance_stats['by_commodity_strategy'][strategy_name] = {}
-
- # 按品种分组交易结果
- commodity_results = {}
- for result in results:
- commodity = result['commodity']
- if commodity not in commodity_results:
- commodity_results[commodity] = []
- commodity_results[commodity].append(result)
-
- # 为每个品种计算统计
- for commodity in self.core_commodities.keys():
- comm_results = commodity_results.get(commodity, [])
- stats = self._calculate_single_strategy_stats(strategy_name, comm_results, commodity)
- performance_stats['by_commodity_strategy'][strategy_name][commodity] = stats
-
- if self.verbose_logging:
- print(f"\n{commodity}-{strategy_name} 策略统计:")
- self._print_strategy_stats(stats)
-
- if self.verbose_logging:
- print("\n--- 第二级:品种级汇总统计 ---")
-
- # 第二步:计算品种级汇总统计
- for commodity in self.core_commodities.keys():
- commodity_stats = self._calculate_commodity_summary(commodity, performance_stats['by_commodity_strategy'])
- performance_stats['by_commodity'][commodity] = commodity_stats
-
- if self.verbose_logging:
- print(f"\n{commodity} 品种汇总统计:")
- self._print_strategy_stats(commodity_stats)
-
- if self.verbose_logging:
- print("\n--- 第三级:策略级汇总统计 ---")
-
- # 第三步:计算策略级汇总统计
- for strategy_name in self.trading_results.keys():
- strategy_stats = self._calculate_strategy_summary(strategy_name, performance_stats['by_commodity_strategy'])
- performance_stats['by_strategy'][strategy_name] = strategy_stats
-
- if self.verbose_logging:
- print(f"\n{strategy_name} 策略汇总统计:")
- self._print_strategy_stats(strategy_stats)
-
- if self.verbose_logging:
- print("\n--- 第四级:整体汇总统计 ---")
-
- # 第四步:计算总体统计
- overall_stats = self._calculate_overall_summary(performance_stats['by_strategy'])
- performance_stats['overall'] = overall_stats
-
- if self.verbose_logging:
- print(f"\n整体汇总统计:")
- self._print_strategy_stats(overall_stats)
-
- # 添加对冲策略专项统计
- if self.config.HEDGE_ENABLED:
- hedge_special_stats = self._calculate_hedge_special_stats()
- performance_stats['hedge_special_stats'] = hedge_special_stats
-
- if self.verbose_logging:
- print(f"\n--- 对冲策略专项统计 ---")
- self._print_hedge_special_stats(hedge_special_stats)
-
- return performance_stats
-
- def _calculate_single_strategy_stats(self, strategy_name, results, commodity):
- """计算单个策略在特定品种下的统计数据"""
- # 计算已平仓交易的盈亏
- closed_profit_loss = sum(r['profit_loss'] for r in results) if results else 0.0
-
- # 计算未平仓头寸的未实现盈亏(特定品种)
- unrealized_profit_loss = self._calculate_unrealized_pnl_for_commodity(strategy_name, commodity)
-
- # 总盈亏 = 已实现盈亏 + 未实现盈亏
- total_profit_loss = closed_profit_loss + unrealized_profit_loss
-
- if not results and unrealized_profit_loss == 0:
- return {
- 'total_trades': 0,
- 'open_positions': 0,
- 'profitable_trades': 0,
- 'losing_trades': 0,
- 'win_rate': 0.0,
- 'closed_profit_loss': 0.0,
- 'unrealized_profit_loss': 0.0,
- 'total_profit_loss': 0.0,
- 'avg_profit_loss': 0.0,
- 'avg_profit_loss_pct': 0.0,
- 'max_profit': 0.0,
- 'max_loss': 0.0,
- 'avg_holding_days': 0.0,
- 'profit_factor': 0.0
- }
-
- # 基本统计
- total_trades = len(results)
- open_positions = self._count_open_positions_for_commodity(strategy_name, commodity)
- profitable_trades = sum(1 for r in results if r['profit_loss'] > 0)
- losing_trades = sum(1 for r in results if r['profit_loss'] < 0)
- win_rate = profitable_trades / total_trades if total_trades > 0 else 0
-
- # 平均盈亏(基于已平仓交易)
- avg_profit_loss = closed_profit_loss / total_trades if total_trades > 0 else 0
- avg_profit_loss_pct = sum(r['profit_loss_pct'] for r in results) / total_trades if total_trades > 0 else 0
-
- # 最大盈亏(基于已平仓交易)
- profit_losses = [r['profit_loss'] for r in results]
- max_profit = max(profit_losses) if profit_losses else 0
- max_loss = min(profit_losses) if profit_losses else 0
-
- # 平均持有天数
- avg_holding_days = sum(r['days_held'] for r in results) / total_trades if total_trades > 0 else 0
-
- # 盈亏比(基于已平仓交易)
- total_profits = sum(r['profit_loss'] for r in results if r['profit_loss'] > 0)
- total_losses = abs(sum(r['profit_loss'] for r in results if r['profit_loss'] < 0))
- profit_factor = total_profits / total_losses if total_losses > 0 else float('inf') if total_profits > 0 else 0
-
- return {
- 'total_trades': total_trades,
- 'open_positions': open_positions,
- 'profitable_trades': profitable_trades,
- 'losing_trades': losing_trades,
- 'win_rate': win_rate,
- 'closed_profit_loss': closed_profit_loss,
- 'unrealized_profit_loss': unrealized_profit_loss,
- 'total_profit_loss': total_profit_loss,
- 'avg_profit_loss': avg_profit_loss,
- 'avg_profit_loss_pct': avg_profit_loss_pct,
- 'max_profit': max_profit,
- 'max_loss': max_loss,
- 'avg_holding_days': avg_holding_days,
- 'profit_factor': profit_factor
- }
-
- def _print_strategy_stats(self, stats):
- """打印策略统计信息"""
- print(f" 已平仓交易: {stats['total_trades']}")
- print(f" 未平仓头寸: {stats['open_positions']}")
- print(f" 盈利交易: {stats['profitable_trades']}")
- print(f" 亏损交易: {stats['losing_trades']}")
- print(f" 胜率: {stats['win_rate']:.2%}")
- print(f" 已实现盈亏: {stats['closed_profit_loss']:.2f}")
- print(f" 未实现盈亏: {stats['unrealized_profit_loss']:.2f}")
- print(f" 总盈亏: {stats['total_profit_loss']:.2f}")
- print(f" 平均盈亏: {stats['avg_profit_loss']:.2f}")
- print(f" 平均盈亏率: {stats['avg_profit_loss_pct']:.2%}")
- print(f" 最大盈利: {stats['max_profit']:.2f}")
- print(f" 最大亏损: {stats['max_loss']:.2f}")
- print(f" 平均持有天数: {stats['avg_holding_days']:.1f}")
- profit_factor_str = f"{stats['profit_factor']:.2f}" if stats['profit_factor'] != float('inf') else "∞"
- print(f" 盈亏比: {profit_factor_str}")
-
- def _calculate_commodity_summary(self, commodity, by_commodity_strategy):
- """计算品种级汇总统计"""
- total_stats = {
- 'total_trades': 0,
- 'open_positions': 0,
- 'profitable_trades': 0,
- 'losing_trades': 0,
- 'closed_profit_loss': 0.0,
- 'unrealized_profit_loss': 0.0,
- 'total_profit_loss': 0.0,
- 'max_profit': 0.0,
- 'max_loss': 0.0,
- 'total_holding_days': 0.0,
- 'total_profits': 0.0,
- 'total_losses': 0.0,
- 'all_pct': []
- }
-
- for strategy_name, commodity_stats in by_commodity_strategy.items():
- if commodity in commodity_stats:
- stats = commodity_stats[commodity]
- total_stats['total_trades'] += stats['total_trades']
- total_stats['open_positions'] += stats['open_positions']
- total_stats['profitable_trades'] += stats['profitable_trades']
- total_stats['losing_trades'] += stats['losing_trades']
- total_stats['closed_profit_loss'] += stats['closed_profit_loss']
- total_stats['unrealized_profit_loss'] += stats['unrealized_profit_loss']
- total_stats['total_profit_loss'] += stats['total_profit_loss']
- total_stats['max_profit'] = max(total_stats['max_profit'], stats['max_profit'])
- total_stats['max_loss'] = min(total_stats['max_loss'], stats['max_loss'])
- total_stats['total_holding_days'] += stats['avg_holding_days'] * stats['total_trades']
-
- if stats['profit_factor'] != float('inf'):
- profits = stats['closed_profit_loss'] if stats['closed_profit_loss'] > 0 else 0
- losses = abs(stats['closed_profit_loss']) if stats['closed_profit_loss'] < 0 else 0
- total_stats['total_profits'] += profits
- total_stats['total_losses'] += losses
-
- # 收集所有百分比数据
- if stats['total_trades'] > 0:
- total_stats['all_pct'].extend([stats['avg_profit_loss_pct']] * stats['total_trades'])
-
- # 计算汇总指标
- win_rate = total_stats['profitable_trades'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0
- avg_profit_loss = total_stats['closed_profit_loss'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0
- avg_profit_loss_pct = sum(total_stats['all_pct']) / len(total_stats['all_pct']) if total_stats['all_pct'] else 0
- avg_holding_days = total_stats['total_holding_days'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0
- profit_factor = total_stats['total_profits'] / total_stats['total_losses'] if total_stats['total_losses'] > 0 else float('inf') if total_stats['total_profits'] > 0 else 0
-
- return {
- 'total_trades': total_stats['total_trades'],
- 'open_positions': total_stats['open_positions'],
- 'profitable_trades': total_stats['profitable_trades'],
- 'losing_trades': total_stats['losing_trades'],
- 'win_rate': win_rate,
- 'closed_profit_loss': total_stats['closed_profit_loss'],
- 'unrealized_profit_loss': total_stats['unrealized_profit_loss'],
- 'total_profit_loss': total_stats['total_profit_loss'],
- 'avg_profit_loss': avg_profit_loss,
- 'avg_profit_loss_pct': avg_profit_loss_pct,
- 'max_profit': total_stats['max_profit'],
- 'max_loss': total_stats['max_loss'],
- 'avg_holding_days': avg_holding_days,
- 'profit_factor': profit_factor
- }
-
- def _calculate_strategy_summary(self, strategy_name, by_commodity_strategy):
- """计算策略级汇总统计"""
- if strategy_name not in by_commodity_strategy:
- return self._get_empty_stats()
-
- strategy_stats = by_commodity_strategy[strategy_name]
-
- total_stats = {
- 'total_trades': 0,
- 'open_positions': 0,
- 'profitable_trades': 0,
- 'losing_trades': 0,
- 'closed_profit_loss': 0.0,
- 'unrealized_profit_loss': 0.0,
- 'total_profit_loss': 0.0,
- 'max_profit': 0.0,
- 'max_loss': 0.0,
- 'total_holding_days': 0.0,
- 'total_profits': 0.0,
- 'total_losses': 0.0,
- 'all_pct': []
- }
-
- for commodity, stats in strategy_stats.items():
- total_stats['total_trades'] += stats['total_trades']
- total_stats['open_positions'] += stats['open_positions']
- total_stats['profitable_trades'] += stats['profitable_trades']
- total_stats['losing_trades'] += stats['losing_trades']
- total_stats['closed_profit_loss'] += stats['closed_profit_loss']
- total_stats['unrealized_profit_loss'] += stats['unrealized_profit_loss']
- total_stats['total_profit_loss'] += stats['total_profit_loss']
- total_stats['max_profit'] = max(total_stats['max_profit'], stats['max_profit'])
- total_stats['max_loss'] = min(total_stats['max_loss'], stats['max_loss'])
- total_stats['total_holding_days'] += stats['avg_holding_days'] * stats['total_trades']
-
- if stats['profit_factor'] != float('inf'):
- profits = stats['closed_profit_loss'] if stats['closed_profit_loss'] > 0 else 0
- losses = abs(stats['closed_profit_loss']) if stats['closed_profit_loss'] < 0 else 0
- total_stats['total_profits'] += profits
- total_stats['total_losses'] += losses
-
- # 收集所有百分比数据
- if stats['total_trades'] > 0:
- total_stats['all_pct'].extend([stats['avg_profit_loss_pct']] * stats['total_trades'])
-
- # 计算汇总指标
- win_rate = total_stats['profitable_trades'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0
- avg_profit_loss = total_stats['closed_profit_loss'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0
- avg_profit_loss_pct = sum(total_stats['all_pct']) / len(total_stats['all_pct']) if total_stats['all_pct'] else 0
- avg_holding_days = total_stats['total_holding_days'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0
- profit_factor = total_stats['total_profits'] / total_stats['total_losses'] if total_stats['total_losses'] > 0 else float('inf') if total_stats['total_profits'] > 0 else 0
-
- return {
- 'total_trades': total_stats['total_trades'],
- 'open_positions': total_stats['open_positions'],
- 'profitable_trades': total_stats['profitable_trades'],
- 'losing_trades': total_stats['losing_trades'],
- 'win_rate': win_rate,
- 'closed_profit_loss': total_stats['closed_profit_loss'],
- 'unrealized_profit_loss': total_stats['unrealized_profit_loss'],
- 'total_profit_loss': total_stats['total_profit_loss'],
- 'avg_profit_loss': avg_profit_loss,
- 'avg_profit_loss_pct': avg_profit_loss_pct,
- 'max_profit': total_stats['max_profit'],
- 'max_loss': total_stats['max_loss'],
- 'avg_holding_days': avg_holding_days,
- 'profit_factor': profit_factor
- }
-
- def _calculate_overall_summary(self, by_strategy):
- """计算总体汇总统计"""
- total_stats = {
- 'total_trades': 0,
- 'open_positions': 0,
- 'profitable_trades': 0,
- 'losing_trades': 0,
- 'closed_profit_loss': 0.0,
- 'unrealized_profit_loss': 0.0,
- 'total_profit_loss': 0.0,
- 'max_profit': 0.0,
- 'max_loss': 0.0,
- 'total_holding_days': 0.0,
- 'total_profits': 0.0,
- 'total_losses': 0.0,
- 'all_pct': []
- }
-
- for strategy_name, stats in by_strategy.items():
- total_stats['total_trades'] += stats['total_trades']
- total_stats['open_positions'] += stats['open_positions']
- total_stats['profitable_trades'] += stats['profitable_trades']
- total_stats['losing_trades'] += stats['losing_trades']
- total_stats['closed_profit_loss'] += stats['closed_profit_loss']
- total_stats['unrealized_profit_loss'] += stats['unrealized_profit_loss']
- total_stats['total_profit_loss'] += stats['total_profit_loss']
- total_stats['max_profit'] = max(total_stats['max_profit'], stats['max_profit'])
- total_stats['max_loss'] = min(total_stats['max_loss'], stats['max_loss'])
- total_stats['total_holding_days'] += stats['avg_holding_days'] * stats['total_trades']
-
- if stats['profit_factor'] != float('inf'):
- profits = stats['closed_profit_loss'] if stats['closed_profit_loss'] > 0 else 0
- losses = abs(stats['closed_profit_loss']) if stats['closed_profit_loss'] < 0 else 0
- total_stats['total_profits'] += profits
- total_stats['total_losses'] += losses
-
- # 收集所有百分比数据
- if stats['total_trades'] > 0:
- total_stats['all_pct'].extend([stats['avg_profit_loss_pct']] * stats['total_trades'])
-
- # 计算汇总指标
- win_rate = total_stats['profitable_trades'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0
- avg_profit_loss = total_stats['closed_profit_loss'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0
- avg_profit_loss_pct = sum(total_stats['all_pct']) / len(total_stats['all_pct']) if total_stats['all_pct'] else 0
- avg_holding_days = total_stats['total_holding_days'] / total_stats['total_trades'] if total_stats['total_trades'] > 0 else 0
- profit_factor = total_stats['total_profits'] / total_stats['total_losses'] if total_stats['total_losses'] > 0 else float('inf') if total_stats['total_profits'] > 0 else 0
-
- return {
- 'total_trades': total_stats['total_trades'],
- 'open_positions': total_stats['open_positions'],
- 'profitable_trades': total_stats['profitable_trades'],
- 'losing_trades': total_stats['losing_trades'],
- 'win_rate': win_rate,
- 'closed_profit_loss': total_stats['closed_profit_loss'],
- 'unrealized_profit_loss': total_stats['unrealized_profit_loss'],
- 'total_profit_loss': total_stats['total_profit_loss'],
- 'avg_profit_loss': avg_profit_loss,
- 'avg_profit_loss_pct': avg_profit_loss_pct,
- 'max_profit': total_stats['max_profit'],
- 'max_loss': total_stats['max_loss'],
- 'avg_holding_days': avg_holding_days,
- 'profit_factor': profit_factor
- }
-
- def _calculate_hedge_special_stats(self):
- """
- 计算对冲策略专项统计
-
- 返回对冲策略的详细统计信息,包括:
- - 止盈/止损事件统计
- - 波动率过滤统计
- - 与基础头寸的相关性
- """
- hedge_results = self.trading_results.get('hedge_position', [])
-
- stats = {
- 'total_hedge_trades': len(hedge_results),
- 'total_hedge_pnl': sum(r['profit_loss'] for r in hedge_results),
- 'take_profit_count': 0,
- 'stop_loss_price_count': 0,
- 'contract_switch_count': 0,
- 'avg_holding_days': 0,
- 'volatility_filter_triggers': 0,
- 'correlation_with_base': 0.0
- }
-
- if not hedge_results:
- return stats
-
- # 统计不同类型的退出原因
- for result in hedge_results:
- exit_reason = result.get('exit_reason', '')
- if exit_reason == 'take_profit':
- stats['take_profit_count'] += 1
- elif exit_reason == 'stop_loss_price_based':
- stats['stop_loss_price_count'] += 1
- elif exit_reason == 'contract_switch':
- stats['contract_switch_count'] += 1
-
- # 平均持有期
- stats['avg_holding_days'] = sum(r['days_held'] for r in hedge_results) / len(hedge_results)
-
- # 波动率过滤触发次数(从hedge_state中统计)
- # 这里简化处理,实际可以在运行时记录
- stats['volatility_filter_triggers'] = 0
-
- # 计算与基础头寸的相关性
- stats['correlation_with_base'] = self._calculate_hedge_correlation()
-
- return stats
-
- def _calculate_hedge_correlation(self):
- """
- 计算对冲头寸与基础头寸的盈亏相关性
-
- 返回相关系数(-1到1之间)
- """
- hedge_results = self.trading_results.get('hedge_position', [])
- base_results = self.trading_results.get('base_position', [])
-
- if not hedge_results or not base_results:
- return 0.0
-
- # 按日期对齐盈亏
- hedge_pnl_by_date = {}
- for result in hedge_results:
- exit_date = result['exit_date']
- if exit_date not in hedge_pnl_by_date:
- hedge_pnl_by_date[exit_date] = 0
- hedge_pnl_by_date[exit_date] += result['profit_loss']
-
- base_pnl_by_date = {}
- for result in base_results:
- exit_date = result['exit_date']
- if exit_date not in base_pnl_by_date:
- base_pnl_by_date[exit_date] = 0
- base_pnl_by_date[exit_date] += result['profit_loss']
-
- # 找到共同日期
- common_dates = set(hedge_pnl_by_date.keys()) & set(base_pnl_by_date.keys())
-
- if len(common_dates) < 2:
- return 0.0
-
- # 计算相关系数
- hedge_values = [hedge_pnl_by_date[date] for date in common_dates]
- base_values = [base_pnl_by_date[date] for date in common_dates]
-
- try:
- correlation = np.corrcoef(hedge_values, base_values)[0, 1]
- return correlation if not np.isnan(correlation) else 0.0
- except:
- return 0.0
-
- def _print_hedge_special_stats(self, stats):
- """打印对冲策略专项统计"""
- print(f" 对冲交易总数: {stats['total_hedge_trades']}")
- print(f" 对冲总盈亏: {stats['total_hedge_pnl']:.2f}")
- print(f" 止盈次数: {stats['take_profit_count']}")
- print(f" 止损次数(价格触发): {stats['stop_loss_price_count']}")
- print(f" 合约换月平仓: {stats['contract_switch_count']}")
- print(f" 平均持有天数: {stats['avg_holding_days']:.1f}")
- print(f" 与基础头寸相关系数: {stats['correlation_with_base']:.3f}")
-
- def _get_empty_stats(self):
- """返回空的统计数据"""
- return {
- 'total_trades': 0,
- 'open_positions': 0,
- 'profitable_trades': 0,
- 'losing_trades': 0,
- 'win_rate': 0.0,
- 'closed_profit_loss': 0.0,
- 'unrealized_profit_loss': 0.0,
- 'total_profit_loss': 0.0,
- 'avg_profit_loss': 0.0,
- 'avg_profit_loss_pct': 0.0,
- 'max_profit': 0.0,
- 'max_loss': 0.0,
- 'avg_holding_days': 0.0,
- 'profit_factor': 0.0
- }
-
- def _calculate_unrealized_pnl_for_commodity(self, strategy_name, commodity):
- """计算特定品种未平仓头寸的未实现盈亏"""
- unrealized_pnl = 0.0
- strategy_positions = self.active_positions.get(strategy_name, {})
-
- if commodity in strategy_positions:
- current_contract = self._get_current_contract(commodity, self.end_date.date())
- if not current_contract:
- return 0.0
-
- end_price = self._get_price_on_date(commodity, current_contract, self.end_date.date(), 'close')
- if end_price is None:
- return 0.0
-
- positions = strategy_positions[commodity]
- for position_id, position in positions.items():
- if position['status'] == 'open' and position['contract'] == current_contract:
- # 判断是多头还是空头
- is_long = not position.get('is_short', False)
- pnl = self._calculate_futures_pnl(
- position['entry_price'], end_price, position['quantity'], commodity, is_long=is_long
- )
- unrealized_pnl += pnl
-
- return unrealized_pnl
-
- def _count_open_positions_for_commodity(self, strategy_name, commodity):
- """计算特定品种的未平仓头寸数量"""
- count = 0
- strategy_positions = self.active_positions.get(strategy_name, {})
-
- if commodity in strategy_positions:
- positions = strategy_positions[commodity]
- for position_id, position in positions.items():
- if position['status'] == 'open':
- count += 1
-
- return count
-
- def _calculate_unrealized_pnl(self, strategy_name):
- """
- 计算未平仓头寸的未实现盈亏
- """
- unrealized_pnl = 0.0
-
- # 获取策略对应的头寸字典
- strategy_positions = self.active_positions.get(strategy_name, {})
-
- for commodity, positions in strategy_positions.items():
- # 获取当前合约和最新价格
- current_contract = self._get_current_contract(commodity, self.end_date.date())
- if not current_contract:
- continue
-
- # 获取结束日期的收盘价
- end_price = self._get_price_on_date(commodity, current_contract, self.end_date.date(), 'close')
- if end_price is None:
- continue
-
- for position_id, position in positions.items():
- if position['status'] == 'open' and position['contract'] == current_contract:
- # 判断是多头还是空头
- is_long = not position.get('is_short', False)
- pnl = self._calculate_futures_pnl(
- position['entry_price'], end_price, position['quantity'], commodity, is_long=is_long
- )
-
- unrealized_pnl += pnl
-
- return unrealized_pnl
-
- def _count_open_positions(self, strategy_name):
- """
- 计算未平仓头寸数量
- """
- count = 0
- strategy_positions = self.active_positions.get(strategy_name, {})
-
- for commodity, positions in strategy_positions.items():
- for position_id, position in positions.items():
- if position['status'] == 'open':
- count += 1
-
- return count
-
- def generate_comparison_report(self, performance_stats):
- """
- 生成多级聚合的对比报告
- 包括品种-策略级、品种级、策略级和总体级报告
- """
- if self.verbose_logging:
- print("\n=== 多级聚合对比分析报告 ===")
-
- strategies = ['base_position', 'grid_trading']
- strategy_names = {
- 'base_position': '基础头寸交易',
- 'grid_trading': '网格交易'
- }
-
- all_comparison_data = {
- 'by_commodity_strategy': [],
- 'by_commodity': [],
- 'by_strategy': [],
- 'overall': []
- }
-
- # 1. 品种-策略级对比报告
- if self.verbose_logging:
- print("\n--- 品种-策略级对比 ---")
-
- by_comm_strategy_data = []
- for strategy in strategies:
- for commodity in self.core_commodities.keys():
- stats = performance_stats['by_commodity_strategy'].get(strategy, {}).get(commodity, {})
- if stats.get('total_trades', 0) > 0 or stats.get('open_positions', 0) > 0:
- by_comm_strategy_data.append({
- '品种': commodity,
- '策略': strategy_names[strategy],
- '已平仓': stats.get('total_trades', 0),
- '未平仓': stats.get('open_positions', 0),
- '胜率': f"{stats.get('win_rate', 0):.2%}",
- '已实现': f"{stats.get('closed_profit_loss', 0):.2f}",
- '未实现': f"{stats.get('unrealized_profit_loss', 0):.2f}",
- '总盈亏': f"{stats.get('total_profit_loss', 0):.2f}",
- '平均盈亏': f"{stats.get('avg_profit_loss', 0):.2f}",
- '最大盈利': f"{stats.get('max_profit', 0):.2f}",
- '最大亏损': f"{stats.get('max_loss', 0):.2f}",
- '平均天数': f"{stats.get('avg_holding_days', 0):.1f}",
- '盈亏比': f"{stats.get('profit_factor', 0):.2f}" if stats.get('profit_factor', 0) != float('inf') else "∞"
- })
-
- if by_comm_strategy_data and self.verbose_logging:
- df_comm_strategy = pd.DataFrame(by_comm_strategy_data)
- print(df_comm_strategy.to_string(index=False))
- all_comparison_data['by_commodity_strategy'] = by_comm_strategy_data
-
- # 2. 品种级汇总对比报告
- if self.verbose_logging:
- print("\n--- 品种级汇总对比 ---")
-
- by_commodity_data = []
- for commodity in self.core_commodities.keys():
- stats = performance_stats['by_commodity'].get(commodity, {})
- if stats.get('total_trades', 0) > 0 or stats.get('open_positions', 0) > 0:
- by_commodity_data.append({
- '品种': commodity,
- '已平仓': stats.get('total_trades', 0),
- '未平仓': stats.get('open_positions', 0),
- '胜率': f"{stats.get('win_rate', 0):.2%}",
- '已实现': f"{stats.get('closed_profit_loss', 0):.2f}",
- '未实现': f"{stats.get('unrealized_profit_loss', 0):.2f}",
- '总盈亏': f"{stats.get('total_profit_loss', 0):.2f}",
- '平均盈亏': f"{stats.get('avg_profit_loss', 0):.2f}",
- '最大盈利': f"{stats.get('max_profit', 0):.2f}",
- '最大亏损': f"{stats.get('max_loss', 0):.2f}",
- '平均天数': f"{stats.get('avg_holding_days', 0):.1f}",
- '盈亏比': f"{stats.get('profit_factor', 0):.2f}" if stats.get('profit_factor', 0) != float('inf') else "∞"
- })
-
- if by_commodity_data and self.verbose_logging:
- df_commodity = pd.DataFrame(by_commodity_data)
- print(df_commodity.to_string(index=False))
- all_comparison_data['by_commodity'] = by_commodity_data
-
- # 3. 策略级汇总对比报告
- if self.verbose_logging:
- print("\n--- 策略级汇总对比 ---")
-
- by_strategy_data = []
- for strategy in strategies:
- stats = performance_stats['by_strategy'].get(strategy, {})
- by_strategy_data.append({
- '策略': strategy_names[strategy],
- '已平仓': stats.get('total_trades', 0),
- '未平仓': stats.get('open_positions', 0),
- '胜率': f"{stats.get('win_rate', 0):.2%}",
- '已实现': f"{stats.get('closed_profit_loss', 0):.2f}",
- '未实现': f"{stats.get('unrealized_profit_loss', 0):.2f}",
- '总盈亏': f"{stats.get('total_profit_loss', 0):.2f}",
- '平均盈亏': f"{stats.get('avg_profit_loss', 0):.2f}",
- '最大盈利': f"{stats.get('max_profit', 0):.2f}",
- '最大亏损': f"{stats.get('max_loss', 0):.2f}",
- '平均天数': f"{stats.get('avg_holding_days', 0):.1f}",
- '盈亏比': f"{stats.get('profit_factor', 0):.2f}" if stats.get('profit_factor', 0) != float('inf') else "∞"
- })
-
- if by_strategy_data and self.verbose_logging:
- df_strategy = pd.DataFrame(by_strategy_data)
- print(df_strategy.to_string(index=False))
- all_comparison_data['by_strategy'] = by_strategy_data
-
- # 4. 总体汇总报告
- if self.verbose_logging:
- print("\n--- 整体汇总 ---")
-
- overall_data = []
- stats = performance_stats['overall']
- overall_data.append({
- '项目': '整体表现',
- '已平仓': stats.get('total_trades', 0),
- '未平仓': stats.get('open_positions', 0),
- '胜率': f"{stats.get('win_rate', 0):.2%}",
- '已实现': f"{stats.get('closed_profit_loss', 0):.2f}",
- '未实现': f"{stats.get('unrealized_profit_loss', 0):.2f}",
- '总盈亏': f"{stats.get('total_profit_loss', 0):.2f}",
- '平均盈亏': f"{stats.get('avg_profit_loss', 0):.2f}",
- '最大盈利': f"{stats.get('max_profit', 0):.2f}",
- '最大亏损': f"{stats.get('max_loss', 0):.2f}",
- '平均天数': f"{stats.get('avg_holding_days', 0):.1f}",
- '盈亏比': f"{stats.get('profit_factor', 0):.2f}" if stats.get('profit_factor', 0) != float('inf') else "∞"
- })
-
- if overall_data and self.verbose_logging:
- df_overall = pd.DataFrame(overall_data)
- print(df_overall.to_string(index=False))
- all_comparison_data['overall'] = overall_data
-
- return all_comparison_data
-
- def generate_csv_output(self, performance_stats):
- """
- 生成CSV输出文件
- """
- if self.verbose_logging:
- print("\n=== 步骤8: 生成CSV输出文件 ===")
-
- timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
-
- # 1. 生成交易记录CSV
- all_trades = []
- for strategy_name, trades in self.trading_results.items():
- all_trades.extend(trades)
-
- if all_trades:
- df_trades = pd.DataFrame(all_trades)
- # 添加列顺序,确保合约切换相关字段在前面
- column_order = ['commodity', 'contract', 'strategy', 'entry_date', 'exit_date',
- 'entry_price', 'exit_price', 'quantity', 'profit_loss', 'profit_loss_pct',
- 'days_held', 'exit_reason']
- # 重新排列DataFrame的列顺序
- existing_columns = [col for col in column_order if col in df_trades.columns]
- other_columns = [col for col in df_trades.columns if col not in column_order]
- df_trades = df_trades[existing_columns + other_columns]
-
- trades_filename = f'grid_trading_records_{timestamp}.csv'
- df_trades.to_csv(trades_filename, index=False, encoding=self.output_encoding)
- if self.verbose_logging:
- print(f"交易记录已保存至: {trades_filename}")
-
- # 2. 生成多级性能统计CSV
- csv_files = []
-
- # 2.1 品种-策略级统计CSV
- by_comm_strategy_data = []
- for strategy, commodity_data in performance_stats['by_commodity_strategy'].items():
- for commodity, stats in commodity_data.items():
- stats_record = {
- '品种': commodity,
- '策略': strategy,
- **stats
- }
- by_comm_strategy_data.append(stats_record)
-
- if by_comm_strategy_data:
- df_comm_strategy = pd.DataFrame(by_comm_strategy_data)
- comm_strategy_filename = f'grid_trading_by_commodity_strategy_{timestamp}.csv'
- df_comm_strategy.to_csv(comm_strategy_filename, index=False, encoding=self.output_encoding)
- csv_files.append(comm_strategy_filename)
- if self.verbose_logging:
- print(f"品种-策略级统计已保存至: {comm_strategy_filename}")
-
- # 2.2 品种级汇总统计CSV
- by_commodity_data = []
- for commodity, stats in performance_stats['by_commodity'].items():
- stats_record = {
- '品种': commodity,
- **stats
- }
- by_commodity_data.append(stats_record)
-
- if by_commodity_data:
- df_commodity = pd.DataFrame(by_commodity_data)
- commodity_filename = f'grid_trading_by_commodity_{timestamp}.csv'
- df_commodity.to_csv(commodity_filename, index=False, encoding=self.output_encoding)
- csv_files.append(commodity_filename)
- if self.verbose_logging:
- print(f"品种级汇总统计已保存至: {commodity_filename}")
-
- # 2.3 策略级汇总统计CSV
- by_strategy_data = []
- for strategy, stats in performance_stats['by_strategy'].items():
- stats_record = {
- '策略': strategy,
- **stats
- }
- by_strategy_data.append(stats_record)
-
- if by_strategy_data:
- df_strategy = pd.DataFrame(by_strategy_data)
- strategy_filename = f'grid_trading_by_strategy_{timestamp}.csv'
- df_strategy.to_csv(strategy_filename, index=False, encoding=self.output_encoding)
- csv_files.append(strategy_filename)
- if self.verbose_logging:
- print(f"策略级汇总统计已保存至: {strategy_filename}")
-
- # 2.4 整体汇总统计CSV
- overall_data = [{
- '项目': '整体汇总',
- **performance_stats['overall']
- }]
-
- if overall_data:
- df_overall = pd.DataFrame(overall_data)
- overall_filename = f'grid_trading_overall_{timestamp}.csv'
- df_overall.to_csv(overall_filename, index=False, encoding=self.output_encoding)
- csv_files.append(overall_filename)
- if self.verbose_logging:
- print(f"整体汇总统计已保存至: {overall_filename}")
-
- # 2.5 对冲策略专项统计CSV
- if self.config.HEDGE_ENABLED and 'hedge_special_stats' in performance_stats:
- hedge_stats = performance_stats['hedge_special_stats']
- hedge_data = [{
- '项目': '对冲策略专项统计',
- '对冲交易总数': hedge_stats['total_hedge_trades'],
- '对冲总盈亏': hedge_stats['total_hedge_pnl'],
- '止盈次数': hedge_stats['take_profit_count'],
- '止损次数_价格触发': hedge_stats['stop_loss_price_count'],
- '合约换月平仓次数': hedge_stats['contract_switch_count'],
- '平均持有天数': hedge_stats['avg_holding_days'],
- '与基础头寸相关系数': hedge_stats['correlation_with_base']
- }]
-
- df_hedge = pd.DataFrame(hedge_data)
- hedge_filename = f'grid_trading_hedge_stats_{timestamp}.csv'
- df_hedge.to_csv(hedge_filename, index=False, encoding=self.output_encoding)
- csv_files.append(hedge_filename)
- if self.verbose_logging:
- print(f"对冲策略专项统计已保存至: {hedge_filename}")
-
- return trades_filename if all_trades else None, csv_files
-
- def run_complete_analysis(self):
- """执行完整的网格交易分析流程(带主力合约切换)"""
- if self.verbose_logging:
- print("开始执行期货网格交易分析(带主力合约切换)")
- print("=" * 60)
-
- try:
- # 步骤1: 合约选择
- self.select_contracts()
- if not self.selected_contracts:
- if self.verbose_logging:
- print("未选择到有效合约,分析终止")
- return None
-
- # 步骤2: 构建主力合约历史变化
- self.build_dominant_contract_history()
- if not self.dominant_contract_history:
- if self.verbose_logging:
- print("未获取到主力合约历史,分析终止")
- return None
-
- # 步骤3: 收集价格数据
- self.collect_price_data()
- if not self.price_data:
- if self.verbose_logging:
- print("未获取到有效价格数据,分析终止")
- return None
-
- # 步骤4: 带合约切换的交易模拟
- self.simulate_with_contract_switching()
-
- # 步骤5: 性能统计分析
- performance_stats = self.calculate_performance_statistics()
-
- # 步骤6: 生成对比报告
- comparison_report = self.generate_comparison_report(performance_stats)
-
- # 步骤8: 生成CSV输出
- # trades_file, stats_files = self.generate_csv_output(performance_stats)
-
- # 分析汇总
- total_commodities = len(self.selected_contracts)
- total_trades = sum(len(trades) for trades in self.trading_results.values())
- contract_switches = sum(len(history) for history in self.dominant_contract_history.values())
-
- if self.verbose_logging:
- print("\n" + "=" * 60)
- print("分析完成汇总:")
- print(f"分析商品数: {total_commodities}")
- print(f"合约切换次数: {contract_switches}")
- print(f"总交易笔数: {total_trades}")
- # print(f"交易记录文件: {trades_file}")
- # print(f"性能统计文件: {stats_file}")
-
- return {
- 'selected_contracts': self.selected_contracts,
- 'dominant_contract_history': self.dominant_contract_history,
- 'price_data': self.price_data,
- 'trading_results': self.trading_results,
- 'performance_stats': performance_stats,
- 'comparison_report': comparison_report,
- # 'output_files': {
- # 'trades_file': trades_file,
- # 'stats_files': stats_files
- # },
- 'summary': {
- 'total_commodities': total_commodities,
- 'contract_switches': contract_switches,
- 'total_trades': total_trades
- }
- }
-
- except Exception as e:
- if self.verbose_logging:
- print(f"分析过程中出现错误: {str(e)}")
- import traceback
- traceback.print_exc()
- return None
- # =====================================================================================
- # 主程序入口
- # =====================================================================================
- def run_grid_trading_analysis(config=None):
- """运行期货网格交易分析"""
- if config is None:
- config = GridTradingConfig
-
- # 打印配置信息
- config.print_config()
-
- # 创建分析器并运行
- analyzer = FutureGridTradingAnalyzer(config)
- results = analyzer.run_complete_analysis()
- return results
- # 执行分析
- if __name__ == "__main__":
- print("期货网格交易研究分析工具(带主力合约切换)")
- print("研究期货网格交易策略在不同配置下的表现")
- print("核心功能包括主力合约自动切换、强制平仓和重新建仓逻辑")
- print("")
- print("支持两种独立交易策略的分析:")
- print(" 1. 基础头寸交易 - 价格-数量网格配置")
- print(" 2. 网格交易策略 - 限价订单网格买入卖出")
- print("")
- print("主要特点:")
- print(" - 主力合约自动监控:每日检测主力合约变化")
- print(" - 强制平仓机制:合约切换时立即平掉旧合约所有头寸")
- print(" - 智能重新建仓:根据价格条件在新合约中重新建立头寸")
- print(" - 完整交易记录:记录所有交易包括合约切换引起的强制平仓")
- print("")
- print("适用于聚宽在线研究平台")
-
- results = run_grid_trading_analysis()
-
- if results:
- print("\n✅ 分析执行成功!")
- summary = results['summary']
- print(f"📊 结果摘要:")
- print(f" - 分析商品数: {summary['total_commodities']}")
- print(f" - 合约切换次数: {summary['contract_switches']}")
- print(f" - 总交易笔数: {summary['total_trades']}")
- print(f" - 整体总盈亏: {results['performance_stats']['overall']['total_profit_loss']:.2f}")
- print(f" - 整体胜率: {results['performance_stats']['overall']['win_rate']:.2%}")
- print(f" - 未平仓头寸: {results['performance_stats']['overall']['open_positions']}")
- print(f"📂 输出文件:")
- print(f" - 交易记录文件: {results['output_files']['trades_file']}")
- for i, stats_file in enumerate(results['output_files']['stats_files'], 1):
- print(f" - 统计文件{i}: {stats_file}")
-
- print(f"\n📈 多级汇总:")
-
- # 品种级汇总简要显示
- print(f" 品种表现:")
- for commodity in ['SA', 'M']:
- comm_stats = results['performance_stats']['by_commodity'].get(commodity, {})
- if comm_stats.get('total_trades', 0) > 0 or comm_stats.get('open_positions', 0) > 0:
- print(f" {commodity}: 交易{comm_stats.get('total_trades', 0)}笔, 盈亏{comm_stats.get('total_profit_loss', 0):.2f}, 胜率{comm_stats.get('win_rate', 0):.2%}")
-
- # 策略级汇总简要显示
- print(f" 策略表现:")
- strategy_names = {
- 'base_position': '基础头寸',
- 'grid_trading': '网格交易'
- }
- for strategy, name in strategy_names.items():
- strat_stats = results['performance_stats']['by_strategy'].get(strategy, {})
- print(f" {name}: 交易{strat_stats.get('total_trades', 0)}笔, 盈亏{strat_stats.get('total_profit_loss', 0):.2f}, 胜率{strat_stats.get('win_rate', 0):.2%}")
-
- else:
- print("\n❌ 分析执行失败,请检查错误信息")
|