| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064 |
- # 深度实值买购和卖购组合的牛差策略
- # 策略说明:使用深度实值买购期权替代ETF持仓,结合卖购期权构建牛差组合
- # 参考资料:基于加百列分享中的50ETF期权备兑认购策略改进
- import jqdata
- from jqdata import *
- import pandas as pd
- import numpy as np
- import datetime
- import matplotlib.pyplot as plt
- import os
- from enum import Enum
- plt.rcParams['font.sans-serif'] = ['SimHei']
- plt.rcParams['axes.unicode_minus'] = False
- class StrategyType(Enum):
- """策略类型枚举"""
- BULL_SPREAD = "bull_spread" # 牛差组合策略(深度实值买购+卖购)
- class DeepITMBullSpreadStrategy:
- """深度实值买购和卖购组合的牛差策略"""
-
- def __init__(self, underlying_symbol='510300.XSHG', start_date='2024-01-01', end_date='2024-12-31'):
- """初始化策略参数"""
- self.underlying_symbol = underlying_symbol
- self.start_date = start_date
- self.end_date = end_date
-
- # 策略参数设置 - 对应README.md中的阈值设定
- self.config = {
- 'contract_size': 30, # 一组张数
- 'min_premium': {'510300': 0.03, '510050': 0.05, '159915': 0.03}, # 最小权利金
- 'min_days_to_expiry': 15, # 最少开仓日期(距离到期日)
- 'call_time_value_threshold': 0.015, # 买购时间价值阈值(README中为0.015)
- 'put_close_premium_threshold': 0.005, # 卖购平仓权利金阈值(README中为50单位,转换为0.005)
- 'max_days_before_expiry': 3, # 合约到期移仓日期最大(交易日)
- 'min_days_before_expiry': 1, # 合约到期移仓日期最小(交易日)
- 'add_position_threshold': {'510300': 0.2, '510050': 0.1, '159915': 0.15}, # 加仓窗口阈值
- 'max_add_positions': 2, # 加仓次数上限
- 'max_profit_close_threshold': 0.83
- }
-
- # 持仓和交易记录
- self.positions = []
- self.trade_records = []
-
- # CSV文件输出设置
- self.transaction_csv_path = 'transaction.csv'
- self.position_csv_path = 'position.csv'
-
- # 验证合约数量
- self._validate_contract_size()
- self.daily_positions = [] # 每日持仓记录
-
- # 账户管理器回调(用于通知账户管理器更新汇总)
- self.account_manager_callback = None
-
- # 获取交易日历和月度分割点
- self.trade_days = pd.Series(index=jqdata.get_trade_days(start_date, end_date))
- self.trade_days.index = pd.to_datetime(self.trade_days.index)
- self.month_split = list(self.trade_days.resample('M', label='left').mean().index) + [pd.to_datetime(end_date)]
-
- # 调试输出:显示月份分割点
- # print(f"Month split 计算结果 ({len(self.month_split)}个分割点):")
- for i, split_date in enumerate(self.month_split):
- # print(f" 索引{i}: {split_date.strftime('%Y-%m-%d')}")
- if i < len(self.month_split) - 1:
- next_split = self.month_split[i + 1]
- # print(f" 月份{i}覆盖范围: {split_date.strftime('%Y-%m-%d')} 到 {next_split.strftime('%Y-%m-%d')}")
-
- # 用于存储前一日ETF价格,计算当日涨幅
- self.previous_etf_price = None
-
- def _validate_contract_size(self):
- """验证合约数量的有效性"""
- contract_size = self.config.get('contract_size', 30)
- if not isinstance(contract_size, (int, float)) or contract_size <= 0:
- print(f"警告: 合约数量无效({contract_size}),重置为默认值30张")
- self.config['contract_size'] = 30
- elif contract_size > 200: # 设置一个合理的上限
- print(f"警告: 合约数量过大({contract_size}),限制为200张")
- self.config['contract_size'] = 200
- else:
- # 确保为整数
- self.config['contract_size'] = int(contract_size)
-
- def get_safe_contract_size(self):
- """安全获取合约数量,确保返回有效值"""
- contract_size = self.config.get('contract_size', 30)
- if not isinstance(contract_size, (int, float)) or contract_size <= 0:
- print(f"警告: 运行时检测到合约数量无效({contract_size}),使用默认值30张")
- contract_size = 30
- self.config['contract_size'] = 30
- elif contract_size > 200:
- print(f"警告: 运行时检测到合约数量过大({contract_size}),限制为200张")
- contract_size = 200
- self.config['contract_size'] = 200
- return int(contract_size)
-
- def get_underlying_code(self):
- """获取标的ETF的简化代码"""
- if '510300' in self.underlying_symbol:
- return '510300'
- elif '510050' in self.underlying_symbol:
- return '510050'
- elif '159915' in self.underlying_symbol:
- return '159915'
- return '510300' # 默认
-
- def get_option_contracts(self, trade_date, month_idx, contract_type='CO'):
- """获取指定月份的期权合约信息"""
- underlying_code = self.get_underlying_code()
-
- # 基于自然月份计算期权筛选范围
- query_date = trade_date.date() if hasattr(trade_date, 'date') else trade_date
- current_year = query_date.year
- current_month = query_date.month
- # print(f" 期权筛选调试: month_idx={month_idx}, query_date={query_date}, 当前年月={current_year}-{current_month:02d}")
-
- # 计算目标月份
- if month_idx == 0:
- # 当月期权:到期日在当前月份
- target_year = current_year
- target_month = current_month
- elif month_idx == 1:
- # 下月期权:到期日在下个月份
- if current_month == 12:
- target_year = current_year + 1
- target_month = 1
- else:
- target_year = current_year
- target_month = current_month + 1
- else:
- # 其他月份:使用原来的month_split逻辑作为后备
- start_date = self.month_split[month_idx].date() if hasattr(self.month_split[month_idx], 'date') else self.month_split[month_idx]
- end_date = self.month_split[month_idx + 1].date() if hasattr(self.month_split[month_idx + 1], 'date') else self.month_split[month_idx + 1]
- print(f" 使用month_split后备逻辑: 月份索引{month_idx}, 范围{start_date}到{end_date}")
-
- if month_idx <= 1:
- # 计算目标月份的完整范围
- start_date = pd.to_datetime(f'{target_year}-{target_month:02d}-01').date()
- if target_month == 12:
- end_date = pd.to_datetime(f'{target_year + 1}-01-01').date() - pd.Timedelta(days=1)
- else:
- end_date = pd.to_datetime(f'{target_year}-{target_month + 1:02d}-01').date() - pd.Timedelta(days=1)
-
- # print(f" 月份索引{month_idx}({target_year}-{target_month:02d})期权筛选范围: {start_date} 到 {end_date}")
-
- q_contract_info = query(
- opt.OPT_CONTRACT_INFO.code,
- opt.OPT_CONTRACT_INFO.trading_code,
- opt.OPT_CONTRACT_INFO.name,
- opt.OPT_CONTRACT_INFO.exercise_price,
- opt.OPT_CONTRACT_INFO.last_trade_date,
- opt.OPT_CONTRACT_INFO.list_date
- ).filter(
- opt.OPT_CONTRACT_INFO.contract_type == contract_type,
- opt.OPT_CONTRACT_INFO.exchange_code == 'XSHG',
- opt.OPT_CONTRACT_INFO.last_trade_date >= start_date,
- opt.OPT_CONTRACT_INFO.last_trade_date <= end_date,
- opt.OPT_CONTRACT_INFO.list_date < query_date
- )
-
- contract_info = opt.run_query(q_contract_info)
- contract_info = contract_info[contract_info['trading_code'].str[:6] == underlying_code]
-
- # 调试输出:显示筛选到的期权到期日
- if not contract_info.empty:
- print(f" 筛选到{len(contract_info)}个期权,到期日范围:{contract_info['last_trade_date'].min()} 到 {contract_info['last_trade_date'].max()}")
- else:
- print(f" 未筛选到任何期权")
-
- return contract_info
-
- def get_monthly_option_candidates(self, trade_date, month_idx, silent=False):
- """获取指定月份的所有认购期权候选信息
- 返回: (contract_info, month_info, failure_reason) - contract_info为期权列表,month_info为月份信息,failure_reason为失败原因(成功时为None)
- """
- # if not silent:
- # print(f"{trade_date.strftime('%Y-%m-%d')} 获取月份索引 {month_idx} 的认购期权候选信息")
-
- # 获取期权合约信息
- contract_info = self.get_option_contracts(trade_date, month_idx, 'CO')
-
- if contract_info.empty:
- failure_reason = "无可用认购期权合约"
- if not silent:
- print(f" 月份索引 {month_idx} {failure_reason}")
- # 增加调试信息,显示查询的时间范围
- if month_idx < len(self.month_split) - 1:
- start_date = self.month_split[month_idx].date() if hasattr(self.month_split[month_idx], 'date') else self.month_split[month_idx]
- end_date = self.month_split[month_idx + 1].date() if hasattr(self.month_split[month_idx + 1], 'date') else self.month_split[month_idx + 1]
- print(f" 查询时间范围: {start_date} 到 {end_date}")
- print(f" 查询日期: {trade_date}")
- print(f" 标的代码: {self.get_underlying_code()}")
- print(f" month_split总长度: {len(self.month_split)}")
- return None, None, failure_reason
-
- # 获取月份信息
- underlying_code = self.get_underlying_code()
- min_days_to_expiry = self.config['min_days_to_expiry']
-
- # 为每个合约添加额外信息
- query_date = trade_date.date() if hasattr(trade_date, 'date') else trade_date
- candidates = []
-
- # 统计失败原因
- expiry_rejected_count = 0
- price_missing_count = 0
- price_error_count = 0
-
- # if not silent:
- # print(f" 查询到 {len(contract_info)} 个认购期权合约,开始获取价格信息:")
-
- for idx, contract in contract_info.iterrows():
- # 检查到期日
- expiry_date = pd.to_datetime(contract['last_trade_date'])
- trade_date_obj = trade_date.date() if hasattr(trade_date, 'date') else trade_date
- days_to_expiry = len(get_trade_days(trade_date_obj, expiry_date.date())) - 1
-
- if days_to_expiry < min_days_to_expiry:
- expiry_rejected_count += 1
- # if not silent:
- # print(f" 行权价 {contract['exercise_price']:.3f}: 到期时间不足 ({days_to_expiry} < {min_days_to_expiry})")
- continue
-
- # 查询期权价格
- try:
- q_price = query(opt.OPT_DAILY_PRICE.close).filter(
- opt.OPT_DAILY_PRICE.code == contract['code'],
- opt.OPT_DAILY_PRICE.date == query_date
- )
- price_result = opt.run_query(q_price)
- if price_result.empty:
- price_missing_count += 1
- if not silent:
- print(f" 行权价 {contract['exercise_price']:.3f}: 无价格数据")
- continue
-
- option_price = price_result['close'].iloc[0]
-
- candidate = {
- 'code': contract['code'],
- 'exercise_price': contract['exercise_price'],
- 'price': option_price,
- 'expiry_date': contract['last_trade_date'],
- 'days_to_expiry': days_to_expiry,
- 'contract_info': contract
- }
- candidates.append(candidate)
-
- # if not silent:
- # print(f" 行权价 {contract['exercise_price']:.3f}: 期权价格 {option_price:.4f}, 剩余天数 {days_to_expiry}")
-
- except Exception as e:
- price_error_count += 1
- if not silent:
- print(f" 行权价 {contract['exercise_price']:.3f}: 价格查询失败 ({str(e)})")
- continue
-
- if not candidates:
- # 构建详细的失败原因
- total_contracts = len(contract_info)
- failure_parts = []
- if expiry_rejected_count > 0:
- failure_parts.append(f"到期时间不足{expiry_rejected_count}个")
- if price_missing_count > 0:
- failure_parts.append(f"无价格数据{price_missing_count}个")
- if price_error_count > 0:
- failure_parts.append(f"价格查询失败{price_error_count}个")
-
- if failure_parts:
- failure_reason = f"共{total_contracts}个期权合约,但均不符合条件:" + "、".join(failure_parts)
- else:
- failure_reason = f"共{total_contracts}个期权合约,但均不符合基本条件"
-
- # if not silent:
- # print(f" 月份索引 {month_idx} 无符合基本条件的期权候选")
- return None, None, failure_reason
-
- month_info = {
- 'month_idx': month_idx,
- 'underlying_code': underlying_code,
- 'min_days_to_expiry': min_days_to_expiry,
- 'query_date': query_date
- }
-
- if not silent:
- print(f" 成功获取 {len(candidates)} 个有效期权候选")
- # print(f" 月份索引 {month_idx} 的认购期权候选信息: {candidates}")
-
- return candidates, month_info, None
-
- def select_sell_call_from_candidates(self, candidates, etf_price, min_premium, silent=False):
- """从候选期权中选择卖购期权(虚值期权)
- 返回: (selected_call, reason) - selected_call为期权信息或None,reason为失败原因或None
- """
- # if not silent:
- # print(f" 从 {len(candidates)} 个候选中筛选虚值卖购期权(行权价 > ETF价格 {etf_price:.4f}):")
-
- # 筛选虚值期权(行权价 > ETF价格)
- otm_candidates = [c for c in candidates if c['exercise_price'] > etf_price]
-
- if not otm_candidates:
- reason = f"无虚值期权:所有{len(candidates)}个期权的行权价都 <= ETF价格{etf_price:.4f}"
- if not silent:
- print(f" 筛选结果:无虚值期权(所有期权行权价都 <= ETF价格)")
- for c in candidates:
- option_type = "虚值" if c['exercise_price'] > etf_price else "实值"
- print(f" 行权价: {c['exercise_price']:.3f} ({option_type})")
- return None, reason
-
- if not silent:
- print(f" etf价格为{etf_price:.4f}时,筛选出 {len(otm_candidates)} 个虚值期权:")
- for c in otm_candidates:
- price_spread = c['exercise_price'] - etf_price
- print(f" 行权价: {c['exercise_price']:.3f}, 价差: {price_spread:.4f}, 价格: {c['price']:.4f}")
-
- # 按行权价排序,选择最接近ETF价格的虚值期权(行权价最小的)
- otm_candidates.sort(key=lambda x: x['exercise_price'])
- closest_otm = otm_candidates[0]
-
- if not silent:
- print(f" 选择最接近的虚值期权:行权价 {closest_otm['exercise_price']:.3f}, 价格 {closest_otm['price']:.4f}")
-
- # 检查权利金要求
- if closest_otm['price'] >= min_premium:
- if not silent:
- print(f" ✓ 满足权利金要求 ({closest_otm['price']:.4f} >= {min_premium:.4f})")
-
- # 构建返回结果
- selected_call = {
- 'code': closest_otm['code'],
- 'exercise_price': closest_otm['exercise_price'],
- 'price': closest_otm['price'],
- 'expiry_date': closest_otm['expiry_date'],
- 'price_spread': closest_otm['exercise_price'] - etf_price,
- 'days_to_expiry': closest_otm['days_to_expiry']
- }
- return selected_call, None
- else:
- reason = f"期权权利金不足:{closest_otm['price']:.4f} < {min_premium:.4f}"
- if not silent:
- print(f" ✗ 权利金不足 ({closest_otm['price']:.4f} < {min_premium:.4f})")
- return None, reason
-
- def select_buy_call_from_candidates(self, candidates, etf_price, time_value_threshold, trade_date, silent=False):
- """从候选期权中选择深度实值买购期权
- 返回: (selected_call, reason) - selected_call为期权信息或None,reason为失败原因或None
- """
- if not silent:
- print(f" 从 {len(candidates)} 个候选中筛选深度实值买购期权(行权价 < ETF价格 {etf_price:.4f}):")
-
- # 筛选深度实值期权(行权价 < ETF价格)
- itm_candidates = [c for c in candidates if c['exercise_price'] < etf_price]
-
- if not itm_candidates:
- reason = f"无深度实值期权:所有{len(candidates)}个期权的行权价都 >= ETF价格{etf_price:.4f}"
- if not silent:
- print(f" 筛选结果:无深度实值期权(所有期权行权价都 >= ETF价格)")
- for c in candidates:
- option_type = "实值" if c['exercise_price'] < etf_price else "虚值"
- print(f" 行权价: {c['exercise_price']:.3f} ({option_type})")
- return None, reason
-
- if not silent:
- print(f" 筛选出 {len(itm_candidates)} 个深度实值期权:")
- for c in itm_candidates:
- print(f" 行权价: {c['exercise_price']:.3f}, 价格: {c['price']:.4f}")
-
- # 按行权价与ETF价格的差异排序(升序,最接近的在前)
- itm_candidates.sort(key=lambda x: abs(x['exercise_price'] - etf_price))
-
- if not silent:
- print(f" 按距离ETF价格的差异排序,检查时间价值(阈值: {time_value_threshold:.4f}):")
-
- # 检查时间价值
- for candidate in itm_candidates:
- intrinsic_value = etf_price - candidate['exercise_price']
- time_value = round(max(0, candidate['price'] - intrinsic_value), 4)
-
- if not silent:
- print(f" 行权价 {candidate['exercise_price']:.3f}: 内在价值 {intrinsic_value:.4f}, 时间价值 {time_value:.4f}")
-
- if time_value < time_value_threshold:
- if not silent:
- print(f" ✓ 满足时间价值要求 (< {time_value_threshold:.4f})")
-
- # 构建返回结果
- selected_call = {
- 'code': candidate['code'],
- 'exercise_price': candidate['exercise_price'],
- 'price': candidate['price'],
- 'time_value': time_value,
- 'expiry_date': candidate['expiry_date'],
- 'price_diff': abs(candidate['exercise_price'] - etf_price),
- 'days_to_expiry': candidate['days_to_expiry']
- }
- return selected_call, None
- else:
- if not silent:
- print(f" ✗ 时间价值过高 ({time_value:.4f} >= {time_value_threshold:.4f})")
-
- reason = f"所有{len(itm_candidates)}个深度实值期权的时间价值都过高(需<{time_value_threshold:.4f})"
- if not silent:
- print(f" 深度实值买购期权选择失败:所有期权时间价值都过高")
- return None, reason
-
- def try_bull_spread_for_month(self, trade_date, etf_price, month_idx, is_current_month=True, silent=False):
- """尝试指定月份的牛差策略
- 返回: (buy_call, sell_call, reason) - 成功返回两个期权,失败返回None和原因
- """
- month_type = "当月" if is_current_month else "下月"
- underlying_code = self.get_underlying_code()
- base_premium = self.config['min_premium'][underlying_code]
- min_premium = base_premium * 0.6 if is_current_month else base_premium
- time_value_threshold = self.config['call_time_value_threshold']
-
- if not silent:
- print(f"{trade_date.strftime('%Y-%m-%d')} 尝试{month_type}牛差策略(月份索引:{month_idx})")
- print(f" 权利金阈值: {min_premium:.4f},时间价值阈值: {time_value_threshold:.4f}")
-
- # 1. 获取月份期权候选
- candidates, month_info, failure_reason = self.get_monthly_option_candidates(trade_date, month_idx, silent)
- if not candidates:
- reason = f"{month_type}期权候选获取失败:{failure_reason}"
- if not silent:
- print(f" {month_type}牛差策略失败:{reason}")
- return None, None, reason
-
- # 2. 选择卖购期权
- sell_call, sell_reason = self.select_sell_call_from_candidates(candidates, etf_price, min_premium, silent)
- if not sell_call:
- reason = f"{month_type}卖购期权选择失败:{sell_reason}"
- if not silent:
- print(f" {month_type}牛差策略失败:{reason}")
- return None, None, reason
-
- # 3. 选择买购期权(必须与卖购期权到期日一致)
- buy_call, buy_reason = self.select_buy_call_from_candidates(candidates, etf_price, time_value_threshold, trade_date, silent)
- if not buy_call:
- reason = f"{month_type}买购期权选择失败:{buy_reason}"
- if not silent:
- print(f" {month_type}牛差策略失败:{reason}")
- return None, None, reason
-
- # 4. 验证到期日是否一致
- sell_expiry = pd.to_datetime(sell_call['expiry_date'])
- buy_expiry = pd.to_datetime(buy_call['expiry_date'])
- if buy_expiry != sell_expiry:
- reason = f"买购和卖购期权到期日不一致:买购到期{buy_expiry.strftime('%Y-%m-%d')} vs 卖购到期{sell_expiry.strftime('%Y-%m-%d')}"
- if not silent:
- print(f" {month_type}牛差策略失败:{reason}")
- return None, None, reason
-
- if not silent:
- print(f" {month_type}牛差策略匹配成功:")
- print(f" 卖购期权:行权价 {sell_call['exercise_price']:.3f}, 价格 {sell_call['price']:.4f}")
- print(f" 买购期权:行权价 {buy_call['exercise_price']:.3f}, 价格 {buy_call['price']:.4f}")
- print(f" 到期日:{sell_expiry.strftime('%Y-%m-%d')}")
-
- return buy_call, sell_call, None
-
-
- def calculate_bull_spread_profit(self, buy_call, sell_call, contract_size=None):
- """计算牛差组合的盈利情况"""
- if contract_size is None:
- contract_size = self.get_safe_contract_size()
-
- # 单张最大盈利 = (卖购行权价 - 买购行权价 - 买购权利金 + 卖购权利金) * 10000
- max_profit_per_contract = (
- sell_call['exercise_price'] - buy_call['exercise_price']
- - buy_call['price'] + sell_call['price']
- ) * 10000
-
- # 最小盈利(卖购权利金)
- min_profit_per_contract = sell_call['price'] * 10000
-
- return {
- 'max_profit_per_contract': max_profit_per_contract,
- 'min_profit_per_contract': min_profit_per_contract,
- 'total_max_profit': max_profit_per_contract * contract_size,
- 'total_min_profit': min_profit_per_contract * contract_size
- }
- def _create_bull_spread_position(self, trade_date, etf_price, buy_call, sell_call, position_type='main', silent=False, save_to_csv=True):
- """直接使用已选择的期权信息创建牛差仓位,避免重复期权选择"""
- # 输出期权选择成功的信息
- if not silent:
- print(f"{trade_date.strftime('%Y-%m-%d')} 卖购期权选择成功,sell_call: {sell_call}")
- print(f"{trade_date.strftime('%Y-%m-%d')} 检查买购期权选择,buy_call: {buy_call}")
-
- # 安全获取合约数量
- contract_size = self.get_safe_contract_size()
-
- # 计算盈利信息
- profit_info = self.calculate_bull_spread_profit(buy_call, sell_call, contract_size)
-
- # 创建仓位记录
- position = {
- 'open_date': trade_date,
- 'etf_price': etf_price,
- 'buy_call': buy_call,
- 'sell_call': sell_call,
- 'contract_size': contract_size,
- 'profit_info': profit_info,
- 'position_type': position_type,
- 'status': 'open',
- 'strategy_type': StrategyType.BULL_SPREAD, # 标记策略类型
- 'add_position_trigger_price': etf_price - self.config['add_position_threshold'][self.get_underlying_code()] if position_type == 'main' else None
- }
- self.positions.append(position)
- # 记录交易(内存)
- trade_record = {
- '交易日期': trade_date,
- '交易类型': '开仓',
- '仓位类型': position_type,
- '策略类型': StrategyType.BULL_SPREAD.value, # 新增字段
- 'ETF标的': self.underlying_symbol,
- '买购期权价格': buy_call['price'],
- '买购期权行权价': buy_call['exercise_price'],
- '买购期权到期日': buy_call['expiry_date'],
- '卖购期权价格': sell_call['price'],
- '卖购期权行权价': sell_call['exercise_price'],
- '卖购期权到期日': sell_call['expiry_date'],
- '合约数量': contract_size,
- 'ETF价格': etf_price,
- '单张最大盈利': profit_info['max_profit_per_contract'],
- '单张最小盈利': profit_info['min_profit_per_contract'],
- '总最大盈利': profit_info['total_max_profit'],
- '总最小盈利': profit_info['total_min_profit']
- }
- self.trade_records.append(trade_record)
-
- # 保存交易记录到CSV(如果需要)
- if save_to_csv:
- self.save_transaction_to_csv(trade_record)
- return position
- def try_bull_spread_strategy(self, trade_date, etf_price, month_idx, position_type='main', silent=False, save_to_csv=True):
- """尝试牛差策略:深度实值买购+卖购期权"""
-
- # 1. 先尝试当月牛差策略
- buy_call, sell_call, reason = self.try_bull_spread_for_month(trade_date, etf_price, month_idx, is_current_month=True, silent=silent)
-
- if buy_call and sell_call:
- if not silent:
- print(f" 当月牛差策略匹配成功,执行开仓")
- return self._create_bull_spread_position(trade_date, etf_price, buy_call, sell_call, position_type, silent, save_to_csv), None
-
- # 2. 当月失败,尝试下月牛差策略
- # 计算下个月的月份索引
- trade_date_obj = trade_date.date() if hasattr(trade_date, 'date') else trade_date
- current_year = trade_date_obj.year
- current_month = trade_date_obj.month
-
- # 计算下个月
- if current_month == 12:
- next_year = current_year + 1
- next_month = 1
- else:
- next_year = current_year
- next_month = current_month + 1
-
- # 下个月第一天和最后一天
- next_month_start = pd.to_datetime(f'{next_year}-{next_month:02d}-01').date()
- if next_month == 12:
- next_month_end = pd.to_datetime(f'{next_year + 1}-01-01').date() - pd.Timedelta(days=1)
- else:
- next_month_end = pd.to_datetime(f'{next_year}-{next_month + 1:02d}-01').date() - pd.Timedelta(days=1)
-
- # 查找对应的月份索引
- next_month_idx = None
- for i, month_date in enumerate(self.month_split[:-1]):
- month_start = month_date.date() if hasattr(month_date, 'date') else month_date
- month_end = self.month_split[i + 1].date() if hasattr(self.month_split[i + 1], 'date') else self.month_split[i + 1]
-
- # 检查下月范围是否与month_split中的某个月份重叠
- if (next_month_start <= month_end and next_month_end >= month_start):
- next_month_idx = i
- break
-
- if next_month_idx is None:
- if not silent:
- print(f" 无法找到下月({next_year}-{next_month:02d})对应的月份索引")
- return None, f"牛差策略失败: 当月原因({reason}),且无下月期权可用"
-
- buy_call_next, sell_call_next, reason_next = self.try_bull_spread_for_month(trade_date, etf_price, next_month_idx, is_current_month=False, silent=silent)
-
- if buy_call_next and sell_call_next:
- if not silent:
- print(f" 下月牛差策略匹配成功,执行开仓")
- return self._create_bull_spread_position(trade_date, etf_price, buy_call_next, sell_call_next, position_type, silent, save_to_csv), None
-
- # 两个月份都失败
- combined_reason = f"当月失败({reason}),下月失败({reason_next})"
- return None, f"牛差策略失败: {combined_reason}"
- def open_position(self, trade_date, etf_price, position_type='main', silent=False, save_to_csv=True):
- """开仓方法 - 牛差策略(深度实值买购+卖购)"""
- # 确定月份索引
- month_idx = 0
- for i, month_date in enumerate(self.month_split[:-1]):
- if trade_date >= month_date:
- month_idx = i
-
- # 先输出开始尝试的日志,确保顺序正确
- if not silent:
- print(f"{trade_date.strftime('%Y-%m-%d')} 开始尝试牛差策略")
-
- # 尝试牛差策略(深度实值买购+卖购)
- result, reason = self.try_bull_spread_strategy(trade_date, etf_price, month_idx, position_type, silent, save_to_csv)
- if result is not None:
- return result
-
- # 牛差策略失败,输出详细原因
- print(f"{trade_date.strftime('%Y-%m-%d')} 牛差策略开仓失败,原因:{reason}")
- return None
- def should_close_position(self, position, current_date, etf_price):
- """判断是否应该平仓 - 支持两种策略类型"""
- if position['status'] != 'open':
- return False, None
-
- # 获取策略类型,兼容旧版本数据
- strategy_type = position.get('strategy_type', StrategyType.BULL_SPREAD)
-
- # 检查合约到期时间
- expiry_date = pd.to_datetime(position['sell_call']['expiry_date'])
- # 确保日期格式正确
- current_date_obj = current_date.date() if hasattr(current_date, 'date') else current_date
- expiry_date_obj = expiry_date.date() if hasattr(expiry_date, 'date') else expiry_date
- days_to_expiry = len(get_trade_days(current_date_obj, expiry_date_obj)) - 1
-
- # 到期日临近(对所有策略类型都适用)
- if days_to_expiry <= self.config['max_days_before_expiry']:
- return True, '过期时间平仓'
-
- # 获取卖购行权价
- sell_call_strike = position['sell_call']['exercise_price']
-
- # 牛差策略的平仓逻辑:根据ETF价格与卖购行权价关系选择条件
- if etf_price >= sell_call_strike:
- # ETF价格大于等于卖购行权价时,检查最大盈利平仓条件
- return self._check_max_profit_close_condition(position, current_date, etf_price, sell_call_strike)
- else:
- # ETF价格小于卖购行权价时,检查卖购权利金剩余平仓条件
- return self._check_sell_call_close_condition(position, current_date, etf_price, sell_call_strike)
- return False, None
-
-
- def _check_max_profit_close_condition(self, position, current_date, etf_price, sell_call_strike):
- """检查最大盈利平仓条件(牛差策略专用)"""
- try:
- query_date = current_date.date() if hasattr(current_date, 'date') else current_date
-
- # 获取买购期权价格
- q_buy_price = query(opt.OPT_DAILY_PRICE.close).filter(
- opt.OPT_DAILY_PRICE.code == position['buy_call']['code'],
- opt.OPT_DAILY_PRICE.date == query_date
- )
- buy_price_result = opt.run_query(q_buy_price)
-
- # 获取卖购期权价格
- q_sell_price = query(opt.OPT_DAILY_PRICE.close).filter(
- opt.OPT_DAILY_PRICE.code == position['sell_call']['code'],
- opt.OPT_DAILY_PRICE.date == query_date
- )
- sell_price_result = opt.run_query(q_sell_price)
-
- if not buy_price_result.empty and not sell_price_result.empty:
- current_buy_call_price = buy_price_result['close'].iloc[0]
- current_sell_call_price = sell_price_result['close'].iloc[0]
- # 计算当前盈利
- buy_call_pnl = (current_buy_call_price - position['buy_call']['price']) * 10000
- sell_call_pnl = (position['sell_call']['price'] - current_sell_call_price) * 10000
- current_pnl_per_contract = buy_call_pnl + sell_call_pnl
-
- # 获取最大盈利
- max_profit_per_contract = position['profit_info']['max_profit_per_contract']
-
- # 判断是否达到平仓阈值
- if max_profit_per_contract > 0 and current_pnl_per_contract >= max_profit_per_contract * self.config['max_profit_close_threshold']:
- print(f"{current_date.strftime('%Y-%m-%d')} 触发最大盈利平仓: 当前盈利: {current_pnl_per_contract:.2f}, 最大盈利: {max_profit_per_contract:.2f}")
- return True, '最大盈利平仓'
- except Exception as e:
- print(f"{current_date.strftime('%Y-%m-%d')} 检查最大盈利平仓条件时出错: {e}")
-
- return False, None
-
- def _check_sell_call_close_condition(self, position, current_date, etf_price, sell_call_strike):
- """检查卖购权利金剩余平仓条件(两种策略共用)"""
- try:
- query_date = current_date.date() if hasattr(current_date, 'date') else current_date
- q_price = query(opt.OPT_DAILY_PRICE.close).filter(
- opt.OPT_DAILY_PRICE.code == position['sell_call']['code'],
- opt.OPT_DAILY_PRICE.date == query_date
- )
- price_result = opt.run_query(q_price)
- if not price_result.empty:
- current_sell_call_price = price_result['close'].iloc[0]
- if current_sell_call_price < self.config['put_close_premium_threshold']:
- print(f"{current_date.strftime('%Y-%m-%d')} 触发卖购权利金平仓: 卖购期权价格{current_sell_call_price:.4f}")
- return True, '卖购权利金平仓'
- except Exception as e:
- print(f"{current_date.strftime('%Y-%m-%d')} 检查卖购权利金平仓条件时出错: {e}")
-
- return False, None
-
-
- def close_position(self, position, current_date, etf_price, reason):
- """平仓操作 - 支持两种策略类型"""
- try:
- # 获取策略类型,兼容旧版本数据
- strategy_type = position.get('strategy_type', StrategyType.BULL_SPREAD)
-
- query_date = current_date.date() if hasattr(current_date, 'date') else current_date
-
- # 获取卖购期权当前价格(两种策略都需要)
- q_sell_price = query(opt.OPT_DAILY_PRICE.close).filter(
- opt.OPT_DAILY_PRICE.code == position['sell_call']['code'],
- opt.OPT_DAILY_PRICE.date == query_date
- )
- sell_price_result = opt.run_query(q_sell_price)
- if sell_price_result.empty:
- raise Exception(f"无法获取卖购期权{position['sell_call']['code']}在{query_date}的价格数据")
- sell_call_close_price = sell_price_result['close'].iloc[0]
-
- # 计算卖购期权盈亏
- sell_call_pnl = (position['sell_call']['price'] - sell_call_close_price) * position['contract_size'] * 10000
-
- # 获取买购期权当前价格
- q_buy_price = query(opt.OPT_DAILY_PRICE.close).filter(
- opt.OPT_DAILY_PRICE.code == position['buy_call']['code'],
- opt.OPT_DAILY_PRICE.date == query_date
- )
- buy_price_result = opt.run_query(q_buy_price)
- if buy_price_result.empty:
- raise Exception(f"无法获取买购期权{position['buy_call']['code']}在{query_date}的价格数据")
- buy_call_close_price = buy_price_result['close'].iloc[0]
-
- buy_call_pnl = (buy_call_close_price - position['buy_call']['price']) * position['contract_size'] * 10000
- total_pnl = buy_call_pnl + sell_call_pnl - 10 # 两种策略都扣除相同手续费
-
- # 牛差策略平仓详情输出
- print(f"{current_date.strftime('%Y-%m-%d')} 牛差策略平仓详情:")
- print(f" 深度实值买购:开仓价格{position['buy_call']['price']:.4f} -> 平仓价格{buy_call_close_price:.4f}")
- print(f" 深度实值买购盈亏:({buy_call_close_price:.4f} - {position['buy_call']['price']:.4f}) × {position['contract_size']} × 10000 = {buy_call_pnl:.2f}元")
- print(f" 卖购期权:开仓价格{position['sell_call']['price']:.4f} -> 平仓价格{sell_call_close_price:.4f}")
- print(f" 卖购期权盈亏:({position['sell_call']['price']:.4f} - {sell_call_close_price:.4f}) × {position['contract_size']} × 10000 = {sell_call_pnl:.2f}元")
- print(f" 手续费:-10元")
- print(f" 组合总盈亏:{buy_call_pnl:.2f} + {sell_call_pnl:.2f} - 10 = {total_pnl:.2f}元")
-
- # 更新仓位状态
- position['status'] = 'closed'
- position['close_date'] = current_date
- position['close_etf_price'] = etf_price
- position['close_reason'] = reason
- position['buy_call_close_price'] = buy_call_close_price
- position['sell_call_close_price'] = sell_call_close_price
- position['pnl'] = total_pnl
- # 记录交易(内存)- 两种策略都有买购期权信息
- trade_record = {
- '交易日期': current_date,
- '交易类型': '平仓',
- '仓位类型': position['position_type'],
- '策略类型': strategy_type.value, # 策略类型字段
- 'ETF标的': self.underlying_symbol,
- '买购期权价格': position['buy_call']['price'], # 两种策略都有买购期权
- '买购期权行权价': position['buy_call']['exercise_price'],
- '买购期权到期日': position['buy_call']['expiry_date'],
- '卖购期权价格': position['sell_call']['price'],
- '卖购期权行权价': position['sell_call']['exercise_price'],
- '卖购期权到期日': position['sell_call']['expiry_date'],
- '合约数量': position['contract_size'],
- 'ETF价格': etf_price,
- '买购期权收盘价': buy_call_close_price,
- '卖购期权收盘价': sell_call_close_price,
- '开仓日期': position['open_date'],
- '开仓ETF价格': position['etf_price'],
- '买购期权盈亏': buy_call_pnl,
- '卖购期权盈亏': sell_call_pnl,
- '总盈亏': total_pnl,
- '平仓原因': reason,
- '单张最大盈利': '', # 平仓时不需要,保持字段一致性
- '单张最小盈利': '', # 平仓时不需要,保持字段一致性
- '总最大盈利': '', # 平仓时不需要,保持字段一致性
- '总最小盈利': '' # 平仓时不需要,保持字段一致性
- }
- self.trade_records.append(trade_record)
-
- # 保存交易记录到CSV
- self.save_transaction_to_csv(trade_record)
- except Exception as e:
- print(f"平仓时出错: {e}")
- position['status'] = 'error'
- def should_add_position(self, current_date, etf_price):
- """判断是否应该加仓"""
- # 检查是否有主仓位
- main_positions = [p for p in self.positions if p['position_type'] == 'main' and p['status'] == 'open']
- if not main_positions:
- return False
- # 获取最新主仓位
- latest_main_position = main_positions[-1]
- # 检查加仓次数是否超限
- add_positions = [p for p in self.positions if p['position_type'] == 'add' and p['status'] == 'open']
- if len(add_positions) >= self.config['max_add_positions']:
- return False
-
- # 检查是否触发加仓条件
- trigger_price = latest_main_position['add_position_trigger_price']
-
- if trigger_price and etf_price <= trigger_price:
- return True
- return False
- def save_transaction_to_csv(self, transaction_data):
- """保存交易记录到CSV文件"""
- try:
- # 数据验证 - 确保必需字段存在
- required_fields = ['交易日期', '交易类型', '仓位类型', 'ETF标的', '合约数量', 'ETF价格']
- for field in required_fields:
- if field not in transaction_data or transaction_data[field] is None:
- print(f"警告: 交易记录缺少必需字段 {field},跳过保存")
- return
-
- # 数据验证 - 确保数据类型正确
- if not isinstance(transaction_data.get('合约数量'), (int, float)) or transaction_data.get('合约数量') <= 0:
- print(f"警告: 合约数量无效 {transaction_data.get('合约数量')},跳过保存")
- return
-
- if not isinstance(transaction_data.get('ETF价格'), (int, float)) or transaction_data.get('ETF价格') <= 0:
- print(f"警告: ETF价格无效 {transaction_data.get('ETF价格')},跳过保存")
- return
-
- # 复制数据并标准化格式
- data_copy = transaction_data.copy()
-
- # 定义完整的字段顺序,确保开仓和平仓记录字段一致
- standard_fields = [
- '交易日期', '交易类型', '仓位类型', '策略类型', 'ETF标的', # 新增策略类型字段
- '买购期权价格', '买购期权行权价', '买购期权到期日',
- '卖购期权价格', '卖购期权行权价', '卖购期权到期日',
- '合约数量', 'ETF价格',
- '单张最大盈利', '单张最小盈利', '总最大盈利', '总最小盈利',
- '买购期权收盘价', '卖购期权收盘价', '开仓日期', '开仓ETF价格',
- '买购期权盈亏', '卖购期权盈亏', '总盈亏', '平仓原因'
- ]
-
- # 确保所有字段都存在,缺失的用空字符串填充
- standardized_data = {}
- for field in standard_fields:
- standardized_data[field] = data_copy.get(field, '')
-
- # 转换日期字段为YYYY-MM-DD格式
- date_fields = ['交易日期', '买购期权到期日', '卖购期权到期日', '开仓日期']
- for field in date_fields:
- if standardized_data[field] and standardized_data[field] != '':
- try:
- if hasattr(standardized_data[field], 'strftime'):
- standardized_data[field] = standardized_data[field].strftime('%Y-%m-%d')
- elif hasattr(standardized_data[field], 'date'):
- standardized_data[field] = standardized_data[field].date().strftime('%Y-%m-%d')
- else:
- # 如果是字符串,尝试解析后转换
- date_obj = pd.to_datetime(standardized_data[field])
- standardized_data[field] = date_obj.strftime('%Y-%m-%d')
- except:
- print(f"警告: 无法转换日期字段 {field}: {standardized_data[field]}")
- pass # 保持原值
-
- # 检查文件是否存在
- file_exists = os.path.exists(self.transaction_csv_path)
-
- # 转换为DataFrame,保持字段顺序
- df = pd.DataFrame([standardized_data], columns=standard_fields)
-
- # 保存到CSV
- if file_exists:
- df.to_csv(self.transaction_csv_path, mode='a', header=False, index=False, encoding='utf-8-sig')
- else:
- df.to_csv(self.transaction_csv_path, mode='w', header=True, index=False, encoding='utf-8-sig')
-
- except Exception as e:
- print(f"保存交易记录到CSV时出错: {e}")
- print(f"问题数据: {transaction_data}")
-
- def save_daily_position_to_csv(self, position_data):
- """保存每日持仓记录到CSV文件"""
- try:
- # 复制数据并标准化日期格式
- data_copy = position_data.copy()
-
- # 转换日期字段为YYYY-MM-DD格式(使用中文字段名)
- if '交易日期' in data_copy and data_copy['交易日期'] is not None:
- if hasattr(data_copy['交易日期'], 'strftime'):
- data_copy['交易日期'] = data_copy['交易日期'].strftime('%Y-%m-%d')
- elif hasattr(data_copy['交易日期'], 'date'):
- data_copy['交易日期'] = data_copy['交易日期'].date().strftime('%Y-%m-%d')
- else:
- # 如果是字符串,尝试解析后转换
- try:
- date_obj = pd.to_datetime(data_copy['交易日期'])
- data_copy['交易日期'] = date_obj.strftime('%Y-%m-%d')
- except:
- pass # 保持原值
-
- # 处理持仓详情字段 - 将列表转换为字符串
- if '持仓详情' in data_copy and isinstance(data_copy['持仓详情'], list):
- if data_copy['持仓详情']: # 如果列表不为空
- detail_strings = []
- for detail in data_copy['持仓详情']:
- detail_str = f"{detail['期权类别']}:{detail['持仓标的代码']}@{detail['行权价格']:.2f}×{detail['合约数量']}(盈亏{detail['盈亏金额']:.2f})"
- detail_strings.append(detail_str)
- data_copy['持仓详情'] = '; '.join(detail_strings)
- else:
- data_copy['持仓详情'] = '无持仓'
-
- # 添加到每日持仓列表
- self.daily_positions.append(data_copy)
-
- # 创建DataFrame,排除列表类型的复杂字段
- csv_data = {k: v for k, v in data_copy.items() if not isinstance(v, list)}
- df = pd.DataFrame([csv_data])
-
- # 检查文件是否存在
- file_exists = os.path.exists(self.position_csv_path)
-
- # 保存到CSV
- if file_exists:
- df.to_csv(self.position_csv_path, mode='a', header=False, index=False, encoding='utf-8-sig')
- else:
- df.to_csv(self.position_csv_path, mode='w', header=True, index=False, encoding='utf-8-sig')
-
- except Exception as e:
- print(f"保存持仓记录到CSV时出错: {e}")
- print(f"问题数据: {position_data}")
- def record_daily_positions(self, trade_date, etf_price):
- """记录每日持仓状况"""
- # 获取当前所有开仓状态的持仓
- open_positions = [p for p in self.positions if p['status'] == 'open']
-
- if not open_positions:
- # 如果没有持仓,记录一条空仓记录
- position_record = {
- '交易日期': trade_date,
- 'ETF价格': etf_price,
- '总仓位数': 0,
- '主仓位数': 0,
- '加仓仓位数': 0,
- '总合约数': 0,
- '总组合盈亏': 0,
- '总卖购盈亏': 0,
- '总买购盈亏': 0,
- '持仓详情': []
- }
- self.save_daily_position_to_csv(position_record)
- return
-
- # 统计持仓信息
- main_positions = [p for p in open_positions if p['position_type'] == 'main']
- add_positions = [p for p in open_positions if p['position_type'] == 'add']
- total_contracts = sum(p['contract_size'] for p in open_positions)
-
- # 记录详细持仓信息
- position_details = []
- total_combo_pnl = 0
- total_sell_call_pnl = 0
- total_buy_call_pnl = 0
-
- for i, pos in enumerate(open_positions):
- # 获取策略类型,兼容旧版本数据
- strategy_type = pos.get('strategy_type', StrategyType.BULL_SPREAD)
-
- # 获取当前期权价值(尽量获取,失败则使用开仓价格)
- try:
- query_date = trade_date.date() if hasattr(trade_date, 'date') else trade_date
-
- # 获取卖购期权当前价格(两种策略都需要)
- q_sell_price = query(opt.OPT_DAILY_PRICE.close).filter(
- opt.OPT_DAILY_PRICE.code == pos['sell_call']['code'],
- opt.OPT_DAILY_PRICE.date == query_date
- )
- sell_price_result = opt.run_query(q_sell_price)
- if sell_price_result.empty:
- raise Exception("无法获取卖购期权价格数据")
- sell_call_current_price = sell_price_result['close'].iloc[0]
-
- # 计算卖购期权盈亏
- sell_call_pnl = (pos['sell_call']['price'] - sell_call_current_price) * pos['contract_size'] * 10000
-
- # 两种策略都有买购期权,需要计算买购期权盈亏
- q_buy_price = query(opt.OPT_DAILY_PRICE.close).filter(
- opt.OPT_DAILY_PRICE.code == pos['buy_call']['code'],
- opt.OPT_DAILY_PRICE.date == query_date
- )
- buy_price_result = opt.run_query(q_buy_price)
- if buy_price_result.empty:
- raise Exception("无法获取买购期权价格数据")
- buy_call_current_price = buy_price_result['close'].iloc[0]
-
- buy_call_pnl = (buy_call_current_price - pos['buy_call']['price']) * pos['contract_size'] * 10000
- combo_pnl = buy_call_pnl + sell_call_pnl
-
- except:
- # 两种策略都有买购期权,异常处理相同
- buy_call_current_price = pos['buy_call']['price']
- sell_call_current_price = pos['sell_call']['price']
- buy_call_pnl = 0
- sell_call_pnl = 0
- combo_pnl = 0
-
- # 累计总盈亏
- total_combo_pnl += combo_pnl
- total_sell_call_pnl += sell_call_pnl
- total_buy_call_pnl += buy_call_pnl
-
- # 牛差策略:记录深度实值买购和卖购期权
- buy_call_detail = {
- '持仓标的代码': pos['buy_call']['code'],
- '期权类别': '买购(牛差)',
- '合约数量': pos['contract_size'],
- '行权价格': pos['buy_call']['exercise_price'],
- '成本价格': pos['buy_call']['price'],
- '当前价格': buy_call_current_price,
- '盈亏金额': buy_call_pnl,
- '仓位类型': pos['position_type'],
- '策略类型': '牛差组合',
- '开仓日期': pos['open_date']
- }
-
- sell_call_detail = {
- '持仓标的代码': pos['sell_call']['code'],
- '期权类别': '卖购(牛差)',
- '合约数量': pos['contract_size'],
- '行权价格': pos['sell_call']['exercise_price'],
- '成本价格': pos['sell_call']['price'],
- '当前价格': sell_call_current_price,
- '盈亏金额': sell_call_pnl,
- '仓位类型': pos['position_type'],
- '策略类型': '牛差组合',
- '开仓日期': pos['open_date']
- }
-
- position_details.extend([buy_call_detail, sell_call_detail])
-
- # 记录每日持仓汇总
- position_record = {
- '交易日期': trade_date,
- 'ETF价格': etf_price,
- '总仓位数': len(open_positions),
- '主仓位数': len(main_positions),
- '加仓仓位数': len(add_positions),
- '总合约数': total_contracts,
- '总组合盈亏': total_combo_pnl,
- '总卖购盈亏': total_sell_call_pnl,
- '总买购盈亏': total_buy_call_pnl,
- '持仓详情': position_details
- }
-
- self.save_daily_position_to_csv(position_record)
-
- # 注释掉单独的账户管理器回调,改为在多标的管理器中统一处理
- # 通知账户管理器更新每日汇总
- # if self.account_manager_callback:
- # try:
- # self.account_manager_callback(trade_date)
- # except Exception as e:
- # print(f"{trade_date.strftime('%Y-%m-%d')} 更新账户汇总回调出错: {e}")
- def run_strategy(self):
- """运行策略主逻辑"""
- print("开始运行深度实值买购和卖购组合的牛差策略...")
-
- for i, trade_date in enumerate(self.trade_days.index):
- # 获取ETF价格
- try:
- price_data = get_price(self.underlying_symbol, trade_date, trade_date, fields=['close'])['close']
- # print(f"{trade_date.strftime('%Y-%m-%d')} 获取ETF价格: {price_data.iloc[0]:.4f}")
- if price_data.empty:
- print(f"{trade_date.strftime('%Y-%m-%d')} 获取ETF价格失败,因为price_data为空")
- continue
- etf_price = price_data.iloc[0]
- except:
- print(f"{trade_date.strftime('%Y-%m-%d')} 获取ETF价格失败,因为获取{self.underlying_symbol}价格失败")
- continue
- # 记录每日持仓状况
- self.record_daily_positions(trade_date, etf_price)
-
- # 标记是否有交易发生
- has_trading = False
-
- # 检查是否需要平仓
- if len(self.positions) > 0:
- for position in self.positions:
- should_close, reason = self.should_close_position(position, trade_date, etf_price)
- # print(f"{trade_date.strftime('%Y-%m-%d')} 检查是否需要平仓: {should_close}, 平仓原因: {reason}")
- if should_close:
- self.close_position(position, trade_date, etf_price, reason)
- print(f"{trade_date.strftime('%Y-%m-%d')} 平仓: {reason}, ETF价格: {etf_price:.4f}")
- has_trading = True
- # 检查是否需要开新仓(首次开仓或平仓后重新开仓)
- open_positions = [p for p in self.positions if p['status'] == 'open']
- if not open_positions:
- new_position = self.open_position(trade_date, etf_price, 'main') # 使用新的统一开仓方法
- if new_position:
- max_profit = new_position['profit_info']['total_max_profit']
- contract_size = new_position['contract_size']
-
- # 牛差策略输出
- strategy_desc = f"牛差组合: 深度实值买购{new_position['buy_call']['exercise_price']:.2f}@{new_position['buy_call']['price']:.4f}, 卖购{new_position['sell_call']['exercise_price']:.2f}@{new_position['sell_call']['price']:.4f}"
- profit_desc = f"最大牛差收益: {max_profit:.2f}元"
-
- # 获取资金信息(如果有的话)
- if hasattr(self, 'config') and 'allocated_capital' in self.config:
- allocated_capital = self.config.get('allocated_capital', 0)
- estimated_margin = contract_size * 1000 # 每张约1000元保证金(粗略估算)
- print(f"{trade_date.strftime('%Y-%m-%d')} 开仓主仓位: {strategy_desc}, ETF价格: {etf_price:.4f}, 合约数量: {contract_size}张, {profit_desc}, 可用资金: {allocated_capital:.0f}元, 预估保证金: {estimated_margin:.0f}元")
- else:
- print(f"{trade_date.strftime('%Y-%m-%d')} 开仓主仓位: {strategy_desc}, ETF价格: {etf_price:.4f}, 合约数量: {contract_size}张, {profit_desc}")
- has_trading = True
-
- # 检查是否需要加仓
- elif self.should_add_position(trade_date, etf_price):
- add_position = self.open_position(trade_date, etf_price, 'add', silent=False) # 使用新的统一开仓方法
- if add_position:
- max_profit = add_position['profit_info']['total_max_profit']
- contract_size = add_position['contract_size']
-
- # 牛差策略输出
- strategy_desc = f"牛差组合: 深度实值买购{add_position['buy_call']['exercise_price']:.2f}@{add_position['buy_call']['price']:.4f}, 卖购{add_position['sell_call']['exercise_price']:.2f}@{add_position['sell_call']['price']:.4f}"
- profit_desc = f"最大牛差收益: {max_profit:.2f}元"
-
- # 获取资金信息(如果有的话)
- if hasattr(self, 'config') and 'allocated_capital' in self.config:
- allocated_capital = self.config.get('allocated_capital', 0)
- estimated_margin = contract_size * 1000 # 每张约1000元保证金(粗略估算)
- print(f"{trade_date.strftime('%Y-%m-%d')} 加仓: {strategy_desc}, ETF价格: {etf_price:.4f}, 合约数量: {contract_size}张, {profit_desc}, 可用资金: {allocated_capital:.0f}元, 预估保证金: {estimated_margin:.0f}元")
- else:
- print(f"{trade_date.strftime('%Y-%m-%d')} 加仓: {strategy_desc}, ETF价格: {etf_price:.4f}, 合约数量: {contract_size}张, {profit_desc}")
- has_trading = True
-
- # 更新前一日ETF价格,用于下一天计算涨幅
- self.previous_etf_price = etf_price
-
- print("策略运行完成!")
-
- def get_performance_summary(self):
- """获取策略表现总结"""
- if not self.trade_records:
- return "没有交易记录"
-
- closed_positions = [p for p in self.positions if p['status'] == 'closed']
- if not closed_positions:
- return "没有已平仓的交易"
-
- total_pnl = sum(p['pnl'] for p in closed_positions)
- winning_trades = len([p for p in closed_positions if p['pnl'] > 0])
- total_trades = len(closed_positions)
- win_rate = winning_trades / total_trades if total_trades > 0 else 0
-
- summary = f"""
- 策略表现总结:
- =============
- 总交易次数: {total_trades}
- 获利交易: {winning_trades}
- 胜率: {win_rate:.2%}
- 总盈亏: {total_pnl:.2f}元
- 平均每笔盈亏: {total_pnl/total_trades:.2f}元
- """
- return summary
-
- def plot_results(self):
- """绘制策略结果"""
- if not self.daily_positions:
- print("没有持仓数据可以绘制")
- return
- # 使用每日持仓记录绘制盈亏曲线
- position_df = pd.DataFrame(self.daily_positions)
-
- # 过滤有持仓的记录(排除空仓记录,但保留盈亏为0的记录用于显示完整曲线)
- position_data = position_df.copy()
-
- if position_data.empty:
- print("没有持仓数据可以绘制")
- return
-
- # 确保交易日期是datetime格式
- position_data['交易日期'] = pd.to_datetime(position_data['交易日期'])
- position_data = position_data.sort_values('交易日期')
-
- # 确保盈亏字段为数值类型
- position_data['总组合盈亏'] = pd.to_numeric(position_data['总组合盈亏'], errors='coerce').fillna(0)
- position_data['总卖购盈亏'] = pd.to_numeric(position_data['总卖购盈亏'], errors='coerce').fillna(0)
-
- # 绘图
- fig, ax = plt.subplots(1, 1, figsize=(12, 8))
-
- # 绘制两条盈亏曲线
- ax.plot(position_data['交易日期'], position_data['总组合盈亏'],
- label='每日组合浮动盈亏(买购+卖购)', color='blue', linewidth=2)
- ax.plot(position_data['交易日期'], position_data['总卖购盈亏'],
- label='每日卖购浮动盈亏', color='red', linewidth=2, linestyle='--')
-
- # 添加零线
- ax.axhline(y=0, color='black', linestyle='-', alpha=0.3)
-
- ax.set_title(f'{self.get_underlying_code()}策略每日浮动盈亏对比', fontsize=14)
- ax.set_ylabel('浮动盈亏 (元)', fontsize=12)
- ax.set_xlabel('日期', fontsize=12)
- ax.legend(fontsize=10)
- ax.grid(True, alpha=0.3)
-
- # 格式化日期显示
- import matplotlib.dates as mdates
- ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
- ax.xaxis.set_major_locator(mdates.MonthLocator())
- plt.xticks(rotation=45)
-
- # 添加数据标注
- if len(position_data) > 0:
- final_combo_pnl = position_data['总组合盈亏'].iloc[-1]
- final_sell_pnl = position_data['总卖购盈亏'].iloc[-1]
- max_combo_pnl = position_data['总组合盈亏'].max()
- min_combo_pnl = position_data['总组合盈亏'].min()
-
- ax.text(0.02, 0.98,
- f'当前组合浮盈: {final_combo_pnl:.2f}元\n'
- f'当前卖购浮盈: {final_sell_pnl:.2f}元\n'
- f'最大组合浮盈: {max_combo_pnl:.2f}元\n'
- f'最大组合浮亏: {min_combo_pnl:.2f}元',
- transform=ax.transAxes, fontsize=10, verticalalignment='top',
- bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))
- plt.tight_layout()
- plt.show()
- # 策略配置类
- class StrategyConfig:
- """策略配置管理"""
-
- def __init__(self):
- # ETF标的配置字典
- self.etf_symbols = {
- '50ETF': '510050.XSHG', # 上证50ETF
- '300ETF': '510300.XSHG', # 沪深300ETF
- '创业板ETF': '159915.XSHE' # 创业板ETF
- }
-
- # 时间范围配置
- self.time_config = {
- 'start_date': '2024-06-01',
- 'end_date': '2024-08-31'
- }
-
- # 资金配置
- self.capital_config = {
- 'total_capital': 1000000, # 总资金额度(100万)
- 'capital_allocation': { # 不同标的资金分配比例
- '50ETF': 0.3, # 50ETF 30%
- '300ETF': 0.5, # 300ETF 50%
- '创业板ETF': 0.2 # 创业板ETF 20%
- },
- 'capital_usage_limit': 0.8, # 资金使用上限80%
- 'bull_spread_margin_discount': 30, # 牛差组合策略保证金优惠(每单位组合加收30元)
- 'contract_unit': 10000, # 合约单位(1张期权代表10000份标的)
- 'margin_params': { # 保证金计算参数
- 'volatility_factor': 0.12, # 波动率因子12%
- 'min_margin_factor': 0.07 # 最小保证金因子7%
- }
- }
-
- def get_allocated_capital(self, etf_code):
- """获取指定ETF的分配资金"""
- total_usable = self.capital_config['total_capital'] * self.capital_config['capital_usage_limit']
-
- # 获取实际存在的ETF列表
- available_etfs = list(self.etf_symbols.keys())
-
- # 计算实际存在ETF的原始比例总和
- original_allocation = self.capital_config['capital_allocation']
- available_ratio_sum = sum(original_allocation.get(etf, 0) for etf in available_etfs)
-
- if available_ratio_sum == 0:
- print(f"警告: 无可用ETF或比例配置错误,{etf_code}分配资金为0")
- return 0
-
- # 获取当前ETF的原始比例
- original_ratio = original_allocation.get(etf_code, 0)
-
- if original_ratio == 0:
- print(f"警告: {etf_code}未在capital_allocation中配置,分配资金为0")
- return 0
-
- # 重新调整比例:当前ETF比例 / 实际存在ETF的比例总和
- adjusted_ratio = original_ratio / available_ratio_sum
-
- allocated_capital = total_usable * adjusted_ratio
-
- print(f"资金分配调整 - {etf_code}: 原始比例{original_ratio:.1%}, 调整后比例{adjusted_ratio:.1%}, 分配资金{allocated_capital:,.0f}元")
-
- return allocated_capital
-
- def calculate_option_margin(self, option_type, settlement_price, underlying_price, strike_price):
- """
- 计算单张期权的保证金
- :param option_type: 'call' 或 'put'
- :param settlement_price: 合约前结算价
- :param underlying_price: 标的证券前收盘价
- :param strike_price: 行权价
- :return: 单张期权保证金
- """
- contract_unit = self.capital_config['contract_unit']
- volatility_factor = self.capital_config['margin_params']['volatility_factor']
- min_margin_factor = self.capital_config['margin_params']['min_margin_factor']
-
- if option_type == 'call':
- # 认购期权虚值 = max(行权价 - 标的价格, 0)
- out_of_money = max(strike_price - underlying_price, 0)
- margin = (settlement_price + max(
- volatility_factor * underlying_price - out_of_money,
- min_margin_factor * underlying_price
- )) * contract_unit
- elif option_type == 'put':
- # 认沽期权虚值 = max(标的价格 - 行权价, 0)
- out_of_money = max(underlying_price - strike_price, 0)
- margin = min(
- settlement_price + max(
- volatility_factor * underlying_price - out_of_money,
- min_margin_factor * strike_price
- ),
- strike_price
- ) * contract_unit
- else:
- raise ValueError("option_type must be 'call' or 'put'")
-
- return margin
-
- def calculate_bull_spread_margin(self, buy_call_info, sell_call_info, underlying_price):
- """
- 计算牛差组合的保证金
- :param buy_call_info: 买入认购期权信息
- :param sell_call_info: 卖出认购期权信息
- :param underlying_price: 标的价格
- :return: 牛差组合保证金
- """
- # 计算行权价差
- strike_diff = sell_call_info['exercise_price'] - buy_call_info['exercise_price']
- contract_unit = self.capital_config['contract_unit']
- margin_discount = self.capital_config['bull_spread_margin_discount']
-
- # 判断组合类型
- if strike_diff > 0:
- # 传统牛差组合(买购行权价 < 卖购行权价)
- # 保证金 = 行权价差 * 合约单位 + 保证金优惠
- bull_spread_margin = strike_diff * contract_unit + margin_discount
- else:
- # 其他情况的保证金计算
- # 使用净权利金作为保证金基础
- net_premium = buy_call_info['price'] - sell_call_info['price']
- if net_premium > 0:
- # 需要支付净权利金
- bull_spread_margin = abs(net_premium) * contract_unit + margin_discount
- else:
- # 收取净权利金,使用估算保证金
- estimated_margin = max(sell_call_info['price'] * contract_unit * 0.1, 1000) # 最少1000元保证金
- bull_spread_margin = estimated_margin
-
- return bull_spread_margin
-
- def calculate_contract_size(self, etf_code, etf_price, buy_call_info=None, sell_call_info=None):
- """
- 计算可开仓的合约数量
- :param etf_code: ETF代码
- :param etf_price: ETF价格
- :param buy_call_info: 买入认购期权信息(可选,用于牛差组合计算)
- :param sell_call_info: 卖出认购期权信息(可选,用于牛差组合计算)
- """
- allocated_capital = self.get_allocated_capital(etf_code)
-
- if buy_call_info and sell_call_info:
- # 牛差组合保证金计算
- margin_per_contract = self.calculate_bull_spread_margin(buy_call_info, sell_call_info, etf_price)
- else:
- # 单一期权保证金估算(使用卖出认购期权)
- estimated_settlement_price = etf_price * 0.02 # 估算权利金为标的价格的2%
- estimated_strike_price = etf_price * 1.05 # 估算行权价为标的价格的105%
- margin_per_contract = self.calculate_option_margin('call', estimated_settlement_price, etf_price, estimated_strike_price)
-
- # 边界条件检查
- if allocated_capital <= 0:
- print(f"警告: {etf_code} 分配资金无效({allocated_capital}),返回默认合约数量30张")
- return 30
-
- if margin_per_contract <= 0:
- print(f"警告: {etf_code} 保证金计算结果无效({margin_per_contract}),返回默认合约数量30张")
- return 30
-
- max_contracts = int(allocated_capital / margin_per_contract)
-
- # 确保结果为正数
- if max_contracts <= 0:
- print(f"警告: {etf_code} 合约数量计算结果无效({max_contracts}),返回默认合约数量30张")
- return 30
-
- return min(max_contracts, 100) # 限制最大100张
- # 多标的策略管理器
- class MultiUnderlyingBullSpreadManager:
- """多标的牛差策略管理器"""
-
- def __init__(self, config: StrategyConfig):
- self.config = config
- self.strategies = {}
- self.daily_account_records = [] # 每日账户资金记录
- self.account_csv_path = 'account_summary.csv'
- self.cumulative_realized_pnl = 0 # 累积已实现盈亏
- self.previous_date_summary = None # 前一天的账户汇总记录
- self.initialize_strategies()
-
- def initialize_strategies(self):
- """初始化各个标的的策略"""
- for etf_code, symbol in self.config.etf_symbols.items():
- allocated_capital = self.config.get_allocated_capital(etf_code)
- if allocated_capital > 0:
- strategy = DeepITMBullSpreadStrategy(
- underlying_symbol=symbol,
- start_date=self.config.time_config['start_date'],
- end_date=self.config.time_config['end_date']
- )
-
- # 动态调整策略参数
- strategy.config['allocated_capital'] = allocated_capital
- strategy.config['etf_code'] = etf_code
-
- # 设置账户管理器回调
- strategy.account_manager_callback = self.record_daily_account_summary
-
- self.strategies[etf_code] = strategy
-
- print(f"初始化{etf_code}策略,分配资金: {allocated_capital:,.0f}元")
-
- def calculate_dynamic_contract_size(self, strategy, etf_price, buy_call_info=None, sell_call_info=None):
- """动态计算合约数量"""
- etf_code = strategy.config['etf_code']
- return self.config.calculate_contract_size(etf_code, etf_price, buy_call_info, sell_call_info)
-
- def calculate_used_capital(self, strategy):
- """计算策略已使用的资金 - 支持两种策略类型"""
- used_capital = 0
- open_positions = [p for p in strategy.positions if p['status'] == 'open']
-
- for position in open_positions:
- # 获取策略类型,兼容旧版本数据
- strategy_type = position.get('strategy_type', StrategyType.BULL_SPREAD)
-
- # 两种策略都有买购期权,资金使用计算相同
- # 资金使用 = (买购权利金 - 卖购权利金) * 合约数量 * 10000
- net_premium = position['buy_call']['price'] - position['sell_call']['price']
- position_capital = net_premium * position['contract_size'] * 10000
-
- used_capital += position_capital
-
- return used_capital
-
- def record_daily_account_summary(self, trade_date):
- """记录每日账户资金汇总"""
- initial_capital = self.config.capital_config['total_capital']
- total_used_capital = 0
- total_floating_pnl = 0
- total_current_market_value = 0 # 所有策略的当前市值总和
-
- strategy_details = {}
-
- # 计算当天的已实现盈亏(通过检查平仓交易)
- today_realized_pnl = 0
- for etf_code, strategy in self.strategies.items():
- # 检查当天是否有平仓交易
- for trade_record in strategy.trade_records:
- if (trade_record.get('交易类型') == '平仓' and
- trade_record.get('交易日期') and
- trade_record.get('总盈亏') is not None):
-
- trade_date_record = trade_record.get('交易日期')
- if hasattr(trade_date_record, 'date'):
- trade_date_record = trade_date_record.date()
- elif isinstance(trade_date_record, str):
- try:
- trade_date_record = pd.to_datetime(trade_date_record).date()
- except:
- continue
-
- target_date = trade_date.date() if hasattr(trade_date, 'date') else trade_date
-
- if trade_date_record == target_date:
- today_realized_pnl += float(trade_record.get('总盈亏', 0))
-
- # 更新累积已实现盈亏
- self.cumulative_realized_pnl += today_realized_pnl
-
- # 汇总各策略的资金使用和浮动盈亏
- total_bull_spread_positions = 0
-
- for etf_code, strategy in self.strategies.items():
- used_capital = self.calculate_used_capital(strategy)
- open_positions = [p for p in strategy.positions if p['status'] == 'open']
- open_positions_count = len(open_positions)
-
- # 统计牛差策略仓位数量
- bull_spread_count = len([p for p in open_positions if p.get('strategy_type', StrategyType.BULL_SPREAD) == StrategyType.BULL_SPREAD])
-
- total_bull_spread_positions += bull_spread_count
-
- # 计算组合当前市值和浮动盈亏
- current_market_value = 0
- floating_pnl = 0
-
- if strategy.daily_positions:
- # 查找指定日期的持仓记录
- target_record = None
- for position_record in strategy.daily_positions:
- record_date = position_record.get('交易日期')
- if hasattr(record_date, 'date'):
- record_date = record_date.date()
- elif isinstance(record_date, str):
- try:
- record_date = pd.to_datetime(record_date).date()
- except:
- continue
-
- target_date = trade_date.date() if hasattr(trade_date, 'date') else trade_date
-
- if record_date == target_date:
- target_record = position_record
- break
-
- # 如果找到了对应日期的记录,计算当前市值和浮动盈亏
- if target_record:
- floating_pnl = target_record.get('总组合盈亏', 0)
- if isinstance(floating_pnl, str):
- try:
- floating_pnl = float(floating_pnl)
- except:
- floating_pnl = 0
-
- # 计算组合当前市值 = 初始投入成本 + 浮动盈亏
- current_market_value = used_capital + floating_pnl
-
- total_used_capital += used_capital
- total_floating_pnl += floating_pnl
- total_current_market_value += current_market_value
-
- strategy_details[etf_code] = {
- '分配资金': strategy.config.get('allocated_capital', 0),
- '初始投入': used_capital,
- '当前市值': current_market_value,
- '浮动盈亏': floating_pnl,
- '持仓数量': open_positions_count,
- '牛差策略数量': bull_spread_count
- }
-
- # 修正后的计算逻辑:
- # 当前总资金 = 初始资金 + 累积已实现盈亏
- current_total_capital = initial_capital + self.cumulative_realized_pnl
- # 剩余现金 = 当前总资金 - 投入资金
- remaining_cash = current_total_capital - total_used_capital
- # 账户总价值 = 剩余现金 + 组合当前市值
- total_account_value = remaining_cash + total_current_market_value
-
- # 记录账户汇总
- account_record = {
- '交易日期': trade_date,
- '总资金': current_total_capital, # 使用当前总资金而非初始资金
- '初始投入总额': total_used_capital,
- '剩余现金': remaining_cash,
- '组合当前市值': total_current_market_value,
- '总浮动盈亏': total_floating_pnl,
- '账户总价值': total_account_value,
- '累积已实现盈亏': self.cumulative_realized_pnl, # 新增字段
- '当日已实现盈亏': today_realized_pnl, # 新增字段
- '资金使用率': total_used_capital / current_total_capital if current_total_capital > 0 else 0,
- '收益率': (total_account_value - initial_capital) / initial_capital if initial_capital > 0 else 0, # 修正收益率计算
- '牛差策略总数': total_bull_spread_positions, # 新增字段
- '策略详情': strategy_details
- }
-
- self.daily_account_records.append(account_record)
- self.save_account_summary_to_csv(account_record)
-
- # 更新前一天的记录
- self.previous_date_summary = account_record
-
- # 输出整体账户汇总(只在有活动时输出)
- # if total_used_capital > 0 or total_floating_pnl != 0:
- # print(f"{trade_date.strftime('%Y-%m-%d')} 【账户汇总】总资金{total_capital:.0f}元|已投入{total_used_capital:.0f}元|剩余现金{remaining_cash:.0f}元|组合市值{total_current_market_value:.0f}元|账户总值{total_account_value:.0f}元|总收益{total_floating_pnl:.0f}元|收益率{(total_floating_pnl/total_capital)*100:.2f}%")
-
- def save_account_summary_to_csv(self, account_data):
- """保存账户汇总到CSV文件"""
- try:
- # 复制数据并处理日期格式
- data_copy = account_data.copy()
-
- # 转换日期字段
- if '交易日期' in data_copy and data_copy['交易日期'] is not None:
- if hasattr(data_copy['交易日期'], 'strftime'):
- data_copy['交易日期'] = data_copy['交易日期'].strftime('%Y-%m-%d')
- elif hasattr(data_copy['交易日期'], 'date'):
- data_copy['交易日期'] = data_copy['交易日期'].date().strftime('%Y-%m-%d')
- else:
- try:
- date_obj = pd.to_datetime(data_copy['交易日期'])
- data_copy['交易日期'] = date_obj.strftime('%Y-%m-%d')
- except:
- pass
-
- # 处理策略详情字段
- if '策略详情' in data_copy and isinstance(data_copy['策略详情'], dict):
- strategy_strings = []
- for etf_code, details in data_copy['策略详情'].items():
- bull_count = details.get('牛差策略数量', 0)
- strategy_str = f"{etf_code}(分配{details['分配资金']:.0f}|投入{details['初始投入']:.0f}|市值{details['当前市值']:.0f}|浮盈{details['浮动盈亏']:.0f}|持仓{details['持仓数量']}|牛差{bull_count})"
- strategy_strings.append(strategy_str)
- data_copy['策略详情'] = '; '.join(strategy_strings)
-
- # 创建DataFrame,排除复杂字段
- csv_data = {k: v for k, v in data_copy.items() if not isinstance(v, dict)}
- df = pd.DataFrame([csv_data])
-
- # 检查文件是否存在
- file_exists = os.path.exists(self.account_csv_path)
-
- # 保存到CSV
- if file_exists:
- df.to_csv(self.account_csv_path, mode='a', header=False, index=False, encoding='utf-8-sig')
- else:
- df.to_csv(self.account_csv_path, mode='w', header=True, index=False, encoding='utf-8-sig')
-
- except Exception as e:
- print(f"保存账户汇总到CSV时出错: {e}")
- print(f"问题数据: {account_data}")
-
- def plot_account_summary(self):
- """绘制整体账户资金曲线"""
- if not self.daily_account_records:
- print("没有账户数据可以绘制")
- return
-
- # 转换为DataFrame
- account_df = pd.DataFrame(self.daily_account_records)
-
- if account_df.empty:
- print("没有账户数据可以绘制")
- return
-
- # print(f"账户数据记录数量: {len(account_df)}")
- # print(f"账户数据列: {account_df.columns.tolist()}")
- # print(f"前几行数据:\n{account_df.head()}")
-
- # 确保日期格式正确
- account_df['交易日期'] = pd.to_datetime(account_df['交易日期'], errors='coerce')
- account_df = account_df.dropna(subset=['交易日期']).sort_values('交易日期')
-
- if account_df.empty:
- print("日期转换后没有有效数据")
- return
-
- # 确保数值字段为数值类型,处理可能的字符串或其他类型
- numeric_columns = ['总资金', '初始投入总额', '剩余现金', '组合当前市值', '总浮动盈亏', '账户总价值', '资金使用率', '收益率', '牛差策略总数']
-
- for col in numeric_columns:
- if col in account_df.columns:
- # 先转换为字符串,移除可能的非数字字符
- account_df[col] = account_df[col].astype(str).str.replace('[^\d.-]', '', regex=True)
- # 再转换为数值类型
- account_df[col] = pd.to_numeric(account_df[col], errors='coerce').fillna(0)
- print(f"列 {col} 数据范围: {account_df[col].min()} 到 {account_df[col].max()}")
-
- # 检查是否有有效的数值数据
- if account_df[['总资金', '账户总价值']].isna().all().all():
- print("所有数值数据都无效,无法绘图")
- return
-
- # 绘图
- fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
-
- try:
- # 上图:账户总价值曲线
- # 转换为numpy数组以确保数据类型
- dates = account_df['交易日期'].values
- total_values = account_df['账户总价值'].values.astype(float)
- initial_capitals = account_df['总资金'].values.astype(float)
-
- ax1.plot(dates, total_values,
- label='账户总价值', color='green', linewidth=2)
- ax1.plot(dates, initial_capitals,
- label='初始资金', color='gray', linewidth=1, linestyle='--')
-
- # 填充区域(只有在数据有效时)
- if len(total_values) > 0 and len(initial_capitals) > 0:
- profit_mask = total_values >= initial_capitals
- loss_mask = total_values < initial_capitals
-
- if profit_mask.any():
- ax1.fill_between(dates, initial_capitals, total_values,
- where=profit_mask,
- color='green', alpha=0.3, label='盈利区域')
- if loss_mask.any():
- ax1.fill_between(dates, initial_capitals, total_values,
- where=loss_mask,
- color='red', alpha=0.3, label='亏损区域')
-
- ax1.set_title('整体账户资金曲线', fontsize=14)
- ax1.set_ylabel('资金 (元)', fontsize=12)
- ax1.legend(fontsize=10)
- ax1.grid(True, alpha=0.3)
-
- # 下图:资金构成
- remaining_cash = account_df['剩余现金'].values.astype(float)
- current_market_value = account_df['组合当前市值'].values.astype(float)
-
- ax2.plot(dates, remaining_cash,
- label='剩余现金', color='blue', linewidth=2)
- ax2.plot(dates, current_market_value,
- label='组合当前市值', color='orange', linewidth=2, linestyle='--')
-
- ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)
- ax2.set_title('资金构成分析', fontsize=14)
- ax2.set_ylabel('金额 (元)', fontsize=12)
- ax2.set_xlabel('日期', fontsize=12)
- ax2.legend(fontsize=10)
- ax2.grid(True, alpha=0.3)
-
- # 格式化日期显示
- import matplotlib.dates as mdates
- for ax in [ax1, ax2]:
- ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
- ax.xaxis.set_major_locator(mdates.MonthLocator())
-
- plt.xticks(rotation=45)
-
- # 添加数据标注
- if len(total_values) > 0:
- final_value = float(total_values[-1])
- initial_capital = float(initial_capitals[0])
- total_return = final_value - initial_capital
- return_rate = total_return / initial_capital if initial_capital > 0 else 0
- max_value = float(np.max(total_values))
- min_value = float(np.min(total_values))
-
- # 获取最新的策略类型统计
- final_bull_count = account_df['牛差策略总数'].iloc[-1] if '牛差策略总数' in account_df.columns else 0
-
- ax1.text(0.02, 0.98,
- f'当前账户价值: {final_value:,.0f}元\n'
- f'总收益: {total_return:,.0f}元\n'
- f'收益率: {return_rate:.2%}\n'
- f'最高价值: {max_value:,.0f}元\n'
- f'最低价值: {min_value:,.0f}元\n'
- f'当前持仓:牛差{final_bull_count}个',
- transform=ax1.transAxes, fontsize=10, verticalalignment='top',
- bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.8))
-
- plt.tight_layout()
- plt.show()
-
- except Exception as e:
- print(f"绘图过程中出错: {e}")
- print(f"账户数据样本:\n{account_df[['交易日期', '总资金', '账户总价值', '剩余现金', '组合当前市值', '总浮动盈亏']].head()}")
- # 尝试简化版本的绘图
- try:
- fig2, ax = plt.subplots(1, 1, figsize=(10, 6))
- ax.plot(range(len(account_df)), account_df['账户总价值'].astype(float),
- label='账户总价值', color='green', linewidth=2)
- ax.set_title('简化版账户资金曲线')
- ax.set_ylabel('资金 (元)')
- ax.set_xlabel('交易日序号')
- ax.legend()
- ax.grid(True)
- plt.show()
- except Exception as e2:
- print(f"简化绘图也失败: {e2}")
-
- def run_all_strategies(self):
- """运行所有策略"""
- print("="*60)
- print("开始运行多标的深度实值买购和卖购组合的牛差策略")
- print("="*60)
-
- # 获取统一的交易日历(取第一个策略的交易日历作为基准)
- if not self.strategies:
- print("没有可运行的策略")
- return {}
-
- first_strategy = list(self.strategies.values())[0]
- trade_days = first_strategy.trade_days.index
-
- # 为每个策略设置动态合约数量计算方法
- for etf_code, strategy in self.strategies.items():
- original_open_method = strategy.open_position # 使用新的统一开仓方法
-
- def make_dynamic_open_position(strat, orig_method):
- def dynamic_open_position(trade_date, etf_price, position_type='main', silent=False, save_to_csv=True):
- # 首先用原方法获取期权信息,但不保存到CSV
- original_contract_size = strat.config['contract_size']
- strat.config['contract_size'] = 1 # 临时设置为1张以获取期权信息
-
- temp_result = orig_method(trade_date, etf_price, position_type, silent=True, save_to_csv=False)
-
- if temp_result:
- # 根据策略类型获取期权信息进行动态计算
- strategy_type = temp_result.get('strategy_type', StrategyType.BULL_SPREAD)
- sell_call_info = temp_result['sell_call']
- buy_call_info = temp_result['buy_call'] # 两种策略都有买购期权
-
- # 两种策略都使用买购和卖购期权信息
- dynamic_size = self.calculate_dynamic_contract_size(strat, etf_price, buy_call_info, sell_call_info)
-
- # 验证动态合约数量,确保为正数
- if dynamic_size <= 0:
- print(f" {strat.config['etf_code']}: 动态合约数量计算结果无效({dynamic_size}),使用默认值30张")
- dynamic_size = 30 # 使用默认值
-
- # 更新合约数量并重新开仓
- strat.config['contract_size'] = dynamic_size
- strat._validate_contract_size() # 验证合约数量
-
- # 移除临时仓位和交易记录
- if strat.positions and strat.positions[-1] == temp_result:
- strat.positions.pop()
- if strat.trade_records and len(strat.trade_records) > 0:
- # 检查最后一条记录是否是刚刚添加的临时记录
- last_record = strat.trade_records[-1]
- if (last_record.get('交易类型') == '开仓' and
- not last_record.get('平仓原因') and
- last_record.get('合约数量') == 1):
- strat.trade_records.pop()
-
- # 重新开仓,这次保存到CSV
- result = orig_method(trade_date, etf_price, position_type, silent, save_to_csv)
- else:
- # 第一次调用失败,恢复原始合约数量设置,用传入的silent参数重新调用显示失败详情
- strat.config['contract_size'] = original_contract_size
- result = orig_method(trade_date, etf_price, position_type, silent, save_to_csv)
- return result
-
- # 恢复原始设置
- strat.config['contract_size'] = original_contract_size
- return result
- return dynamic_open_position
-
- strategy.open_position = make_dynamic_open_position(strategy, original_open_method) # 更新方法名
-
- # 按日期统一运行所有策略
- print(f"开始按日期统一运行策略,共{len(trade_days)}个交易日")
-
- for i, trade_date in enumerate(trade_days):
- # print(f"\n处理交易日 {trade_date.strftime('%Y-%m-%d')} ({i+1}/{len(trade_days)})")
-
- # 为每个策略处理当天的交易
- daily_has_activity = False
-
- for etf_code, strategy in self.strategies.items():
- try:
- # 获取ETF价格
- price_data = get_price(strategy.underlying_symbol, trade_date, trade_date, fields=['close'])['close']
- if price_data.empty:
- print(f" {etf_code}: 获取ETF价格失败")
- continue
- etf_price = price_data.iloc[0]
-
- # 记录每日持仓状况(但不触发账户汇总回调)
- strategy.record_daily_positions(trade_date, etf_price)
-
- # 检查是否需要平仓
- if len(strategy.positions) > 0:
- for position in strategy.positions:
- should_close, reason = strategy.should_close_position(position, trade_date, etf_price)
- if should_close:
- strategy.close_position(position, trade_date, etf_price, reason)
- print(f" {etf_code}: 平仓 {reason}, ETF价格: {etf_price:.4f}")
- daily_has_activity = True
- # 检查是否需要开新仓(首次开仓或平仓后重新开仓)
- open_positions = [p for p in strategy.positions if p['status'] == 'open']
- if not open_positions:
- new_position = strategy.open_position(trade_date, etf_price, 'main', silent=False) # 使用新的统一开仓方法
- if new_position:
- max_profit = new_position['profit_info']['total_max_profit']
- contract_size = new_position['contract_size']
-
- # 牛差策略输出
- strategy_desc = f"牛差组合策略"
- profit_desc = f"最大牛差收益: {max_profit:.2f}元"
-
- print(f" {etf_code}: 开仓主仓位({strategy_desc}),ETF价格: {etf_price:.4f}, 合约数量: {contract_size}张, {profit_desc}")
- daily_has_activity = True
- # else:
- # print(f" {etf_code}: 开仓失败,无法找到合适的期权组合,ETF价格: {etf_price:.4f}")
-
- # 检查是否需要加仓
- elif strategy.should_add_position(trade_date, etf_price):
- add_position = strategy.open_position(trade_date, etf_price, 'add', silent=False) # 使用新的统一开仓方法
- if add_position:
- max_profit = add_position['profit_info']['total_max_profit']
- contract_size = add_position['contract_size']
-
- # 牛差策略输出
- strategy_desc = f"牛差组合策略"
- profit_desc = f"最大牛差收益: {max_profit:.2f}元"
-
- print(f" {etf_code}: 加仓({strategy_desc}),ETF价格: {etf_price:.4f}, 合约数量: {contract_size}张, {profit_desc}")
- daily_has_activity = True
-
- # 更新策略的前一日ETF价格
- strategy.previous_etf_price = etf_price
-
- except Exception as e:
- print(f" {etf_code}: 处理失败 - {e}")
-
- # 统一记录当天的账户汇总(无论是否有交易活动)
- try:
- self.record_daily_account_summary(trade_date)
- if daily_has_activity:
- print(f" 账户汇总已更新")
- except Exception as e:
- print(f" 记录账户汇总失败: {e}")
-
- # 收集所有策略的结果
- results = {}
- for etf_code, strategy in self.strategies.items():
- try:
- results[etf_code] = {
- 'strategy': strategy,
- 'summary': strategy.get_performance_summary(),
- 'allocated_capital': strategy.config['allocated_capital']
- }
- print(f"{etf_code}策略运行完成")
- except Exception as e:
- print(f"{etf_code}策略结果收集出错: {e}")
- results[etf_code] = {'error': str(e)}
-
- print("\n所有策略运行完成!")
- return results
-
- def generate_overall_report(self, results):
- """生成总体报告"""
- print("\n" + "="*60)
- print("多标的牛差策略总体报告")
- print("="*60)
-
- total_pnl = 0
- total_trades = 0
- total_winning_trades = 0
-
- for etf_code, result in results.items():
- if 'error' in result:
- print(f"\n{etf_code}: 策略执行出错 - {result['error']}")
- continue
-
- strategy = result['strategy']
- allocated_capital = result['allocated_capital']
-
- print(f"\n{etf_code} 策略结果:")
- print(f"分配资金: {allocated_capital:,.0f}元")
- print(result['summary'])
-
- # 统计总体数据
- closed_positions = [p for p in strategy.positions if p['status'] == 'closed']
- if closed_positions:
- strategy_pnl = sum(p['pnl'] for p in closed_positions)
- strategy_trades = len(closed_positions)
- strategy_winning = len([p for p in closed_positions if p['pnl'] > 0])
-
- total_pnl += strategy_pnl
- total_trades += strategy_trades
- total_winning_trades += strategy_winning
-
- # 总体统计
- if total_trades > 0:
- overall_win_rate = total_winning_trades / total_trades
- print(f"\n总体策略表现:")
- print(f"总交易次数: {total_trades}")
- print(f"总获利交易: {total_winning_trades}")
- print(f"总体胜率: {overall_win_rate:.2%}")
- print(f"总盈亏: {total_pnl:.2f}元")
- print(f"平均每笔盈亏: {total_pnl/total_trades:.2f}元")
- print(f"总资金收益率: {total_pnl/self.config.capital_config['total_capital']:.2%}")
- # 使用示例
- def run_deep_itm_bull_spread_example():
- """运行深度实值买购和卖购组合牛差策略示例"""
-
- # 创建策略配置
- config = StrategyConfig()
-
- # 可以自定义配置
- config.time_config = {
- 'start_date': '2025-06-15',
- 'end_date': '2025-08-25'
- }
-
- config.capital_config.update({
- 'total_capital': 1000000, # 100万总资金
- 'capital_allocation': {
- '50ETF': 0.5, # 50ETF 50%
- '300ETF': 0.3, # 300ETF 30%
- '创业板ETF': 0.2 # 创业板ETF 20%
- },
- 'capital_usage_limit': 0.8, # 使用80%资金
- 'bull_spread_margin_discount': 30, # 牛差组合策略保证金优惠(每单位组合加收30元)
- 'contract_unit': 10000, # 合约单位
- 'margin_params': { # 保证金计算参数(按交易所规定)
- 'volatility_factor': 0.12, # 波动率因子12%
- 'min_margin_factor': 0.07 # 最小保证金因子7%
- }
- })
-
- # 创建多标的策略管理器
- manager = MultiUnderlyingBullSpreadManager(config)
-
- # 运行所有策略
- results = manager.run_all_strategies()
-
- # 生成报告
- manager.generate_overall_report(results)
-
- # 绘制整体账户资金曲线
- try:
- print(f"\n绘制整体账户资金曲线...")
- manager.plot_account_summary()
- except Exception as e:
- print(f"绘制整体账户资金曲线时出错: {e}")
-
- # 绘制各策略结果(可选)
- for etf_code, result in results.items():
- if 'strategy' in result:
- try:
- print(f"\n绘制{etf_code}策略结果...")
- result['strategy'].plot_results()
- except Exception as e:
- print(f"绘制{etf_code}图表时出错: {e}")
|