# 深度实值买购和卖购组合的牛差策略 # 策略说明:使用深度实值买购期权替代ETF持仓,结合卖购期权构建牛差组合 # 参考资料:基于加百列分享中的50ETF期权备兑认购策略改进 import jqdata from jqdata import * import pandas as pd import numpy as np import datetime import matplotlib.pyplot as plt import os plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False class DeepITMBullSpreadStrategy: """深度实值买购和卖购组合的牛差策略""" def __init__(self, underlying_symbol='510300.XSHG', start_date='2024-01-01', end_date='2024-12-31'): """初始化策略参数""" self.underlying_symbol = underlying_symbol self.start_date = start_date self.end_date = end_date # 策略参数设置 - 对应README.md中的阈值设定 self.config = { 'contract_size': 30, # 一组张数 'min_premium': {'510300': 0.03, '510050': 0.05, '159915': 0.03}, # 最小权利金 'min_days_to_expiry': 20, # 最少开仓日期(距离到期日) 'call_time_value_threshold': 0.01, # 买购时间价值阈值(README中为0.01) 'put_close_premium_threshold': 0.005, # 卖购平仓权利金阈值(README中为50单位,转换为0.005) 'max_days_before_expiry': 7, # 合约到期移仓日期最大(交易日) 'min_days_before_expiry': 2, # 合约到期移仓日期最小(交易日) 'add_position_threshold': {'510300': 0.2, '510050': 0.1, '159915': 0.15}, # 加仓窗口阈值 'max_add_positions': 2, # 加仓次数上限 'max_profit_close_threshold': 0.95 } # 持仓和交易记录 self.positions = [] self.trade_records = [] # CSV文件输出设置 self.transaction_csv_path = 'transaction.csv' self.position_csv_path = 'position.csv' self.daily_positions = [] # 每日持仓记录 # 账户管理器回调(用于通知账户管理器更新汇总) self.account_manager_callback = None # 获取交易日历和月度分割点 self.trade_days = pd.Series(index=jqdata.get_trade_days(start_date, end_date)) self.trade_days.index = pd.to_datetime(self.trade_days.index) self.month_split = list(self.trade_days.resample('M', label='left').mean().index) + [pd.to_datetime(end_date)] def get_underlying_code(self): """获取标的ETF的简化代码""" if '510300' in self.underlying_symbol: return '510300' elif '510050' in self.underlying_symbol: return '510050' elif '159915' in self.underlying_symbol: return '159915' return '510300' # 默认 def get_option_contracts(self, trade_date, month_idx, contract_type='CO'): """获取指定月份的期权合约信息""" underlying_code = self.get_underlying_code() # 确保日期格式一致 start_date = self.month_split[month_idx].date() if hasattr(self.month_split[month_idx], 'date') else self.month_split[month_idx] end_date = self.month_split[month_idx + 1].date() if hasattr(self.month_split[month_idx + 1], 'date') else self.month_split[month_idx + 1] query_date = trade_date.date() if hasattr(trade_date, 'date') else trade_date # print(f"{trade_date.strftime('%Y-%m-%d')} 获取期权合约信息: start_date: {start_date}, end_date: {end_date}, query_date: {query_date}") q_contract_info = query( opt.OPT_CONTRACT_INFO.code, opt.OPT_CONTRACT_INFO.trading_code, opt.OPT_CONTRACT_INFO.name, opt.OPT_CONTRACT_INFO.exercise_price, opt.OPT_CONTRACT_INFO.last_trade_date, opt.OPT_CONTRACT_INFO.list_date ).filter( opt.OPT_CONTRACT_INFO.contract_type == contract_type, opt.OPT_CONTRACT_INFO.exchange_code == 'XSHG', opt.OPT_CONTRACT_INFO.last_trade_date > start_date, opt.OPT_CONTRACT_INFO.last_trade_date <= end_date, opt.OPT_CONTRACT_INFO.list_date < query_date ) contract_info = opt.run_query(q_contract_info) contract_info = contract_info[contract_info['trading_code'].str[:6] == underlying_code] return contract_info def select_call_option(self, trade_date, etf_price, month_idx, silent=False): """选择卖购期权(平值期权) 返回: (result, reason) - result为期权信息或None,reason为失败原因或None """ underlying_code = self.get_underlying_code() min_premium = self.config['min_premium'][underlying_code] min_days_to_expiry = self.config['min_days_to_expiry'] # 检查当月合约 contract_info = self.get_option_contracts(trade_date, month_idx, 'CO') if not contract_info.empty: contract_info['price_spread'] = abs(contract_info['exercise_price'] - etf_price) contract_info = contract_info.sort_values('price_spread') closest_contract = contract_info.iloc[0] # 检查到期日 expiry_date = pd.to_datetime(closest_contract['last_trade_date']) trade_date_obj = trade_date.date() if hasattr(trade_date, 'date') else trade_date days_to_expiry = len(get_trade_days(trade_date_obj, expiry_date.date())) - 1 if days_to_expiry >= min_days_to_expiry: # print(f"{trade_date.strftime('%Y-%m-%d')} 选择卖购期权: {closest_contract}") try: query_date = trade_date.date() if hasattr(trade_date, 'date') else trade_date q_price = query(opt.OPT_DAILY_PRICE.close).filter( opt.OPT_DAILY_PRICE.code == closest_contract['code'], opt.OPT_DAILY_PRICE.date == query_date ) price_result = opt.run_query(q_price) if not price_result.empty: option_price = price_result['close'].iloc[0] # 检查当月是否满足最小权利金要求 if option_price >= min_premium: selected_call = { 'code': closest_contract['code'], 'exercise_price': closest_contract['exercise_price'], 'price': option_price, 'expiry_date': closest_contract['last_trade_date'] } if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 卖购期权选择成功 - ETF价格: {etf_price:.4f}, 选中行权价: {closest_contract['exercise_price']:.2f}, 价差: {abs(closest_contract['exercise_price'] - etf_price):.4f}, 权利金: {option_price:.4f}") return selected_call, None else: reason = f"当月期权权利金不足:{option_price:.4f} < {min_premium:.4f}" if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 因为小于最小权利金,所以卖购期权选择失败 - ETF价格: {etf_price:.4f}, 无符合条件的期权") else: reason = f"当月期权无价格数据:{closest_contract['code']}" if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 当月期权无价格数据") except Exception as e: reason = f"当月期权价格查询失败:{str(e)}" if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 因为交易失败,所以卖购期权选择失败 - ETF价格: {etf_price:.4f}, 无符合条件的期权") else: reason = f"当月期权到期时间不足:剩余{days_to_expiry}天 < {min_days_to_expiry}天" if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 因为剩余交易日 {days_to_expiry} 小于最小到期日 {min_days_to_expiry},所以卖购期权选择失败 - ETF价格: {etf_price:.4f}, 无符合条件的期权") else: reason = f"当月无可用期权合约:月份索引{month_idx}" # 如果当月不满足(时间或价格),检查下月 if month_idx + 1 < len(self.month_split) - 1: next_month_contracts = self.get_option_contracts(trade_date, month_idx + 1, 'CO') if not next_month_contracts.empty: # print(f"{trade_date.strftime('%Y-%m-%d')} 检查下月卖购期权: {next_month_contracts}") next_month_contracts['price_spread'] = abs(next_month_contracts['exercise_price'] - etf_price) next_month_contracts = next_month_contracts.sort_values('price_spread') # 选择差价最小的下月期权,然后检查价格是否满足 closest_next = next_month_contracts.iloc[0] # 检查到期日 expiry_date_next = pd.to_datetime(closest_next['last_trade_date']) trade_date_obj_next = trade_date.date() if hasattr(trade_date, 'date') else trade_date days_to_expiry_next = len(get_trade_days(trade_date_obj_next, expiry_date_next.date())) - 1 if days_to_expiry_next >= min_days_to_expiry: try: query_date = trade_date.date() if hasattr(trade_date, 'date') else trade_date q_price = query(opt.OPT_DAILY_PRICE.close).filter( opt.OPT_DAILY_PRICE.code == closest_next['code'], opt.OPT_DAILY_PRICE.date == query_date ) price_result = opt.run_query(q_price) if not price_result.empty: option_price = price_result['close'].iloc[0] if option_price >= 0.05: # 下月最少0.05以上 selected_call = { 'code': closest_next['code'], 'exercise_price': closest_next['exercise_price'], 'price': option_price, 'expiry_date': closest_next['last_trade_date'] } if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 下月卖购期权选择成功 - ETF价格: {etf_price:.4f}, 选中行权价: {closest_next['exercise_price']:.2f}, 价差: {abs(closest_next['exercise_price'] - etf_price):.4f}, 权利金: {option_price:.4f}") return selected_call, None else: reason = f"下月期权权利金不足:{option_price:.4f} < 0.05" if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 下月卖购期权选择失败 - 当前候选期权为{closest_next['exercise_price']:.2f}@{option_price:.4f},因为权利金小于0.05,所以无符合条件的期权") return None, reason else: reason = f"下月期权无价格数据:{closest_next['code']}" if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 下月期权无价格数据") return None, reason except Exception as e: reason = f"下月期权价格查询失败:{str(e)}" if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 下月卖购期权选择失败 - ETF价格: {etf_price:.4f}, 因为获取价格失败,无符合条件的期权") return None, reason else: reason = f"下月期权到期时间不足:剩余{days_to_expiry_next}天 < {min_days_to_expiry}天" if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 因为剩余交易日 {days_to_expiry_next} 小于最小到期日 {min_days_to_expiry},所以下月卖购期权选择失败 - ETF价格: {etf_price:.4f}, 无符合条件的期权") return None, reason else: reason = f"下月无可用期权合约:月份索引{month_idx + 1}" if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 下月卖购期权选择失败 - ETF价格: {etf_price:.4f}, 无符合条件的期权") return None, reason else: # 当月失败且没有更多月份可检查时,返回当月的失败原因 pass return None, reason def select_deep_itm_call(self, trade_date, etf_price, month_idx, silent=False): """选择深度实值买购期权 返回: (result, reason) - result为期权信息或None,reason为失败原因或None """ contract_info = self.get_option_contracts(trade_date, month_idx, 'CO') if contract_info.empty: reason = f"月份索引{month_idx}无可用期权合约" if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 买购期权选择失败,因为contract_info为空 - ETF价格: {etf_price:.4f}, 无可用的深度实值期权") return None, reason # 筛选深度实值期权(行权价 < ETF价格) itm_contracts = contract_info[contract_info['exercise_price'] < etf_price].copy() if itm_contracts.empty: reason = f"无深度实值期权:所有{len(contract_info)}个期权的行权价都 >= ETF价格{etf_price:.4f}" if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 买购期权选择失败,因为itm_contracts为空 - ETF价格: {etf_price:.4f}, 无可用的深度实值期权") return None, reason # 按行权价与ETF价格的差异排序(升序,最接近的在前) itm_contracts['price_diff'] = abs(itm_contracts['exercise_price'] - etf_price) itm_contracts = itm_contracts.sort_values('price_diff') # 记录检查过程中的详细信息 checked_count = 0 failed_reasons = [] time_value_threshold = self.config['call_time_value_threshold'] # 计算时间价值并筛选,找到第一个符合条件的 for _, contract in itm_contracts.iterrows(): checked_count += 1 try: query_date = trade_date.date() if hasattr(trade_date, 'date') else trade_date q_price = query(opt.OPT_DAILY_PRICE.close).filter( opt.OPT_DAILY_PRICE.code == contract['code'], opt.OPT_DAILY_PRICE.date == query_date ) price_result = opt.run_query(q_price) if price_result.empty: failed_reasons.append(f"行权价{contract['exercise_price']:.2f}无价格数据") continue option_price = price_result['close'].iloc[0] intrinsic_value = etf_price - contract['exercise_price'] time_value = max(0, option_price - intrinsic_value) # 时间价值小于阈值 if time_value < time_value_threshold: selected_buy_call = { 'code': contract['code'], 'exercise_price': contract['exercise_price'], 'price': option_price, 'time_value': time_value, 'expiry_date': contract['last_trade_date'], 'price_diff': abs(contract['exercise_price'] - etf_price) } if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 买购期权选择成功 - ETF价格: {etf_price:.4f}, 选中行权价: {selected_buy_call['exercise_price']:.2f}, 时间价值: {selected_buy_call['time_value']:.4f}, 权利金: {selected_buy_call['price']:.4f}") return selected_buy_call, None else: failed_reasons.append(f"行权价{contract['exercise_price']:.2f}时间价值过高({time_value:.4f}>={time_value_threshold:.4f})") except Exception as e: failed_reasons.append(f"行权价{contract['exercise_price']:.2f}价格查询失败({str(e)})") continue # 构建详细的失败原因 reason = f"检查了{checked_count}个深度实值期权均不满足要求(时间价值需<{time_value_threshold:.4f}): {'; '.join(failed_reasons[:3])}" + ("..." if len(failed_reasons) > 3 else "") if not silent: print(f"{trade_date.strftime('%Y-%m-%d')} 买购期权选择失败,因为无符合时间价值要求的深度实值期权(时间价值需<{self.config['call_time_value_threshold']:.4f}) - ETF价格: {etf_price:.4f}") return None, reason def calculate_bull_spread_profit(self, buy_call, sell_call): """计算牛差组合的盈利情况""" # 单张最大盈利 = (卖购行权价 - 买购行权价 - 买购权利金 + 卖购权利金) * 10000 max_profit_per_contract = ( sell_call['exercise_price'] - buy_call['exercise_price'] - buy_call['price'] + sell_call['price'] ) * 10000 # 最小盈利(卖购权利金) min_profit_per_contract = sell_call['price'] * 10000 return { 'max_profit_per_contract': max_profit_per_contract, 'min_profit_per_contract': min_profit_per_contract, 'total_max_profit': max_profit_per_contract * self.config['contract_size'], 'total_min_profit': min_profit_per_contract * self.config['contract_size'] } def open_bull_spread_position(self, trade_date, etf_price, position_type='main', silent=False, save_to_csv=True): """开仓牛差组合""" # 确定月份索引 # print(f"{trade_date.strftime('%Y-%m-%d')} 开仓牛差组合: {trade_date}, {etf_price}, {position_type}, {silent}") month_idx = 0 for i, month_date in enumerate(self.month_split[:-1]): if trade_date >= month_date: month_idx = i # 选择卖购期权 sell_call, sell_call_reason = self.select_call_option(trade_date, etf_price, month_idx, silent) if not sell_call: # 无论silent如何,都输出卖购期权选择失败的日志和具体原因 print(f"{trade_date.strftime('%Y-%m-%d')} 卖购期权选择失败,无法开仓牛差组合 - ETF价格: {etf_price:.4f},失败原因: {sell_call_reason}") return None # 确定卖购期权的真实月份索引(基于到期日) sell_expiry = pd.to_datetime(sell_call['expiry_date']) sell_call_month_idx = month_idx # 默认使用当前月份 for i, month_date in enumerate(self.month_split[:-1]): if sell_expiry <= self.month_split[i + 1]: sell_call_month_idx = i break # 选择买购期权(深度实值,必须与卖购期权月份完全一致) buy_call, buy_call_reason = self.select_deep_itm_call(trade_date, etf_price, sell_call_month_idx, silent) if not buy_call: # 无论silent如何,都输出买购期权选择失败的日志和具体原因 print(f"{trade_date.strftime('%Y-%m-%d')} 买购期权选择失败,无法构建牛差组合 - ETF价格: {etf_price:.4f},失败原因: {buy_call_reason}") return None # 验证买购和卖购期权的到期日是否一致 buy_expiry = pd.to_datetime(buy_call['expiry_date']) if buy_expiry != sell_expiry: # 无论silent如何,都输出到期日不一致的日志 print(f"{trade_date.strftime('%Y-%m-%d')} 买购和卖购期权到期日不一致,无法构建牛差组合: 买购到期{buy_expiry.strftime('%Y-%m-%d')} vs 卖购到期{sell_expiry.strftime('%Y-%m-%d')} - ETF价格: {etf_price:.4f}") return None # 计算盈利信息 profit_info = self.calculate_bull_spread_profit(buy_call, sell_call) # 创建仓位记录 position = { 'open_date': trade_date, 'etf_price': etf_price, 'buy_call': buy_call, 'sell_call': sell_call, 'contract_size': self.config['contract_size'], 'profit_info': profit_info, 'position_type': position_type, 'status': 'open', 'add_position_trigger_price': etf_price - self.config['add_position_threshold'][self.get_underlying_code()] if position_type == 'main' else None } self.positions.append(position) # 记录交易(内存) trade_record = { '交易日期': trade_date, '交易类型': '开仓', '仓位类型': position_type, 'ETF标的': self.underlying_symbol, '买购期权价格': buy_call['price'], '买购期权行权价': buy_call['exercise_price'], '买购期权到期日': buy_call['expiry_date'], '卖购期权价格': sell_call['price'], '卖购期权行权价': sell_call['exercise_price'], '卖购期权到期日': sell_call['expiry_date'], '合约数量': self.config['contract_size'], 'ETF价格': etf_price, '单张最大盈利': profit_info['max_profit_per_contract'], '单张最小盈利': profit_info['min_profit_per_contract'], '总最大盈利': profit_info['total_max_profit'], '总最小盈利': profit_info['total_min_profit'] } self.trade_records.append(trade_record) # 保存交易记录到CSV(如果需要) if save_to_csv: self.save_transaction_to_csv(trade_record) return position def should_close_position(self, position, current_date, etf_price): """判断是否应该平仓""" if position['status'] != 'open': return False, None # 检查合约到期时间 expiry_date = pd.to_datetime(position['sell_call']['expiry_date']) # 确保日期格式正确 current_date_obj = current_date.date() if hasattr(current_date, 'date') else current_date expiry_date_obj = expiry_date.date() if hasattr(expiry_date, 'date') else expiry_date days_to_expiry = len(get_trade_days(current_date_obj, expiry_date_obj)) - 1 # print(f"{current_date.strftime('%Y-%m-%d')} 检查是否按照过期时间规则平仓: 过期天数{days_to_expiry}, 最大过期天数{self.config['max_days_before_expiry']}") # 到期日临近 if days_to_expiry <= self.config['max_days_before_expiry']: return True, '过期时间平仓' # 获取卖购行权价 sell_call_strike = position['sell_call']['exercise_price'] # 根据ETF价格与卖购行权价的关系选择平仓条件 if etf_price >= sell_call_strike: # ETF价格大于等于卖购行权价时,只考虑最大盈利平仓条件 try: query_date = current_date.date() if hasattr(current_date, 'date') else current_date # 获取买购期权价格 q_buy_price = query(opt.OPT_DAILY_PRICE.close).filter( opt.OPT_DAILY_PRICE.code == position['buy_call']['code'], opt.OPT_DAILY_PRICE.date == query_date ) buy_price_result = opt.run_query(q_buy_price) # 获取卖购期权价格 q_sell_price = query(opt.OPT_DAILY_PRICE.close).filter( opt.OPT_DAILY_PRICE.code == position['sell_call']['code'], opt.OPT_DAILY_PRICE.date == query_date ) sell_price_result = opt.run_query(q_sell_price) if not buy_price_result.empty and not sell_price_result.empty: current_buy_call_price = buy_price_result['close'].iloc[0] current_sell_call_price = sell_price_result['close'].iloc[0] # 计算当前盈利 buy_call_pnl = (current_buy_call_price - position['buy_call']['price']) * 10000 sell_call_pnl = (position['sell_call']['price'] - current_sell_call_price) * 10000 current_pnl_per_contract = buy_call_pnl + sell_call_pnl # 获取最大盈利 max_profit_per_contract = position['profit_info']['max_profit_per_contract'] # print(f"{current_date.strftime('%Y-%m-%d')} ETF价格({etf_price:.4f})>=卖购行权价({sell_call_strike:.2f}),检查最大盈利平仓条件: 当前盈利: {current_pnl_per_contract:.2f}, 最大盈利: {max_profit_per_contract:.2f}") # 判断是否达到平仓阈值 if max_profit_per_contract > 0 and current_pnl_per_contract >= max_profit_per_contract * self.config['max_profit_close_threshold']: print(f"{current_date.strftime('%Y-%m-%d')} 触发最大盈利平仓: 当前盈利: {current_pnl_per_contract:.2f}, 最大盈利: {max_profit_per_contract:.2f}") return True, '最大盈利平仓' except Exception as e: print(f"{current_date.strftime('%Y-%m-%d')} 检查最大盈利平仓条件时出错: {e}") pass else: # ETF价格小于卖购行权价时,只考虑卖购权利金剩余平仓条件 try: query_date = current_date.date() if hasattr(current_date, 'date') else current_date q_price = query(opt.OPT_DAILY_PRICE.close).filter( opt.OPT_DAILY_PRICE.code == position['sell_call']['code'], opt.OPT_DAILY_PRICE.date == query_date ) price_result = opt.run_query(q_price) if not price_result.empty: current_sell_call_price = price_result['close'].iloc[0] # print(f"{current_date.strftime('%Y-%m-%d')} ETF价格({etf_price:.4f})<卖购行权价({sell_call_strike:.2f}),检查卖购权利金平仓条件: 卖购期权价格{current_sell_call_price:.4f}, 阈值{self.config['put_close_premium_threshold']:.4f}") if current_sell_call_price < self.config['put_close_premium_threshold']: print(f"{current_date.strftime('%Y-%m-%d')} 触发卖购权利金平仓: 卖购期权价格{current_sell_call_price:.4f}") return True, '卖购权利金平仓' except: pass return False, None def close_position(self, position, current_date, etf_price, reason): """平仓操作""" try: # 获取当前期权价格 query_date = current_date.date() if hasattr(current_date, 'date') else current_date q_buy_price = query(opt.OPT_DAILY_PRICE.close).filter( opt.OPT_DAILY_PRICE.code == position['buy_call']['code'], opt.OPT_DAILY_PRICE.date == query_date ) buy_price_result = opt.run_query(q_buy_price) if buy_price_result.empty: raise Exception(f"无法获取买购期权{position['buy_call']['code']}在{query_date}的价格数据") buy_call_close_price = buy_price_result['close'].iloc[0] q_sell_price = query(opt.OPT_DAILY_PRICE.close).filter( opt.OPT_DAILY_PRICE.code == position['sell_call']['code'], opt.OPT_DAILY_PRICE.date == query_date ) sell_price_result = opt.run_query(q_sell_price) if sell_price_result.empty: raise Exception(f"无法获取卖购期权{position['sell_call']['code']}在{query_date}的价格数据") sell_call_close_price = sell_price_result['close'].iloc[0] # 计算盈亏 buy_call_pnl = (buy_call_close_price - position['buy_call']['price']) * position['contract_size'] * 10000 sell_call_pnl = (position['sell_call']['price'] - sell_call_close_price) * position['contract_size'] * 10000 total_pnl = buy_call_pnl + sell_call_pnl - 10 # 扣除手续费 # 更新仓位状态 position['status'] = 'closed' position['close_date'] = current_date position['close_etf_price'] = etf_price position['close_reason'] = reason position['buy_call_close_price'] = buy_call_close_price position['sell_call_close_price'] = sell_call_close_price position['pnl'] = total_pnl # 记录交易(内存) trade_record = { '交易日期': current_date, '交易类型': '平仓', '仓位类型': position['position_type'], 'ETF标的': self.underlying_symbol, '买购期权价格': position['buy_call']['price'], '买购期权行权价': position['buy_call']['exercise_price'], '买购期权到期日': position['buy_call']['expiry_date'], '卖购期权价格': position['sell_call']['price'], '卖购期权行权价': position['sell_call']['exercise_price'], '卖购期权到期日': position['sell_call']['expiry_date'], '合约数量': position['contract_size'], 'ETF价格': etf_price, '买购期权收盘价': buy_call_close_price, '卖购期权收盘价': sell_call_close_price, '开仓日期': position['open_date'], '开仓ETF价格': position['etf_price'], '买购期权盈亏': buy_call_pnl, '卖购期权盈亏': sell_call_pnl, '总盈亏': total_pnl, '平仓原因': reason, '单张最大盈利': '', # 平仓时不需要,保持字段一致性 '单张最小盈利': '', # 平仓时不需要,保持字段一致性 '总最大盈利': '', # 平仓时不需要,保持字段一致性 '总最小盈利': '' # 平仓时不需要,保持字段一致性 } self.trade_records.append(trade_record) # 保存交易记录到CSV self.save_transaction_to_csv(trade_record) except Exception as e: print(f"平仓时出错: {e}") position['status'] = 'error' def should_add_position(self, current_date, etf_price): """判断是否应该加仓""" # 检查是否有主仓位 main_positions = [p for p in self.positions if p['position_type'] == 'main' and p['status'] == 'open'] if not main_positions: return False # 检查加仓次数是否超限 add_positions = [p for p in self.positions if p['position_type'] == 'add' and p['status'] == 'open'] if len(add_positions) >= self.config['max_add_positions']: return False # 检查是否触发加仓条件 latest_main_position = main_positions[-1] trigger_price = latest_main_position['add_position_trigger_price'] if trigger_price and etf_price <= trigger_price: return True return False def save_transaction_to_csv(self, transaction_data): """保存交易记录到CSV文件""" try: # 数据验证 - 确保必需字段存在 required_fields = ['交易日期', '交易类型', '仓位类型', 'ETF标的', '合约数量', 'ETF价格'] for field in required_fields: if field not in transaction_data or transaction_data[field] is None: print(f"警告: 交易记录缺少必需字段 {field},跳过保存") return # 数据验证 - 确保数据类型正确 if not isinstance(transaction_data.get('合约数量'), (int, float)) or transaction_data.get('合约数量') <= 0: print(f"警告: 合约数量无效 {transaction_data.get('合约数量')},跳过保存") return if not isinstance(transaction_data.get('ETF价格'), (int, float)) or transaction_data.get('ETF价格') <= 0: print(f"警告: ETF价格无效 {transaction_data.get('ETF价格')},跳过保存") return # 复制数据并标准化格式 data_copy = transaction_data.copy() # 定义完整的字段顺序,确保开仓和平仓记录字段一致 standard_fields = [ '交易日期', '交易类型', '仓位类型', 'ETF标的', '买购期权价格', '买购期权行权价', '买购期权到期日', '卖购期权价格', '卖购期权行权价', '卖购期权到期日', '合约数量', 'ETF价格', '单张最大盈利', '单张最小盈利', '总最大盈利', '总最小盈利', '买购期权收盘价', '卖购期权收盘价', '开仓日期', '开仓ETF价格', '买购期权盈亏', '卖购期权盈亏', '总盈亏', '平仓原因' ] # 确保所有字段都存在,缺失的用空字符串填充 standardized_data = {} for field in standard_fields: standardized_data[field] = data_copy.get(field, '') # 转换日期字段为YYYY-MM-DD格式 date_fields = ['交易日期', '买购期权到期日', '卖购期权到期日', '开仓日期'] for field in date_fields: if standardized_data[field] and standardized_data[field] != '': try: if hasattr(standardized_data[field], 'strftime'): standardized_data[field] = standardized_data[field].strftime('%Y-%m-%d') elif hasattr(standardized_data[field], 'date'): standardized_data[field] = standardized_data[field].date().strftime('%Y-%m-%d') else: # 如果是字符串,尝试解析后转换 date_obj = pd.to_datetime(standardized_data[field]) standardized_data[field] = date_obj.strftime('%Y-%m-%d') except: print(f"警告: 无法转换日期字段 {field}: {standardized_data[field]}") pass # 保持原值 # 检查文件是否存在 file_exists = os.path.exists(self.transaction_csv_path) # 转换为DataFrame,保持字段顺序 df = pd.DataFrame([standardized_data], columns=standard_fields) # 保存到CSV if file_exists: df.to_csv(self.transaction_csv_path, mode='a', header=False, index=False, encoding='utf-8-sig') else: df.to_csv(self.transaction_csv_path, mode='w', header=True, index=False, encoding='utf-8-sig') except Exception as e: print(f"保存交易记录到CSV时出错: {e}") print(f"问题数据: {transaction_data}") def save_daily_position_to_csv(self, position_data): """保存每日持仓记录到CSV文件""" try: # 复制数据并标准化日期格式 data_copy = position_data.copy() # 转换日期字段为YYYY-MM-DD格式(使用中文字段名) if '交易日期' in data_copy and data_copy['交易日期'] is not None: if hasattr(data_copy['交易日期'], 'strftime'): data_copy['交易日期'] = data_copy['交易日期'].strftime('%Y-%m-%d') elif hasattr(data_copy['交易日期'], 'date'): data_copy['交易日期'] = data_copy['交易日期'].date().strftime('%Y-%m-%d') else: # 如果是字符串,尝试解析后转换 try: date_obj = pd.to_datetime(data_copy['交易日期']) data_copy['交易日期'] = date_obj.strftime('%Y-%m-%d') except: pass # 保持原值 # 处理持仓详情字段 - 将列表转换为字符串 if '持仓详情' in data_copy and isinstance(data_copy['持仓详情'], list): if data_copy['持仓详情']: # 如果列表不为空 detail_strings = [] for detail in data_copy['持仓详情']: detail_str = f"{detail['期权类别']}:{detail['持仓标的代码']}@{detail['行权价格']:.2f}×{detail['合约数量']}(盈亏{detail['盈亏金额']:.2f})" detail_strings.append(detail_str) data_copy['持仓详情'] = '; '.join(detail_strings) else: data_copy['持仓详情'] = '无持仓' # 添加到每日持仓列表 self.daily_positions.append(data_copy) # 创建DataFrame,排除列表类型的复杂字段 csv_data = {k: v for k, v in data_copy.items() if not isinstance(v, list)} df = pd.DataFrame([csv_data]) # 检查文件是否存在 file_exists = os.path.exists(self.position_csv_path) # 保存到CSV if file_exists: df.to_csv(self.position_csv_path, mode='a', header=False, index=False, encoding='utf-8-sig') else: df.to_csv(self.position_csv_path, mode='w', header=True, index=False, encoding='utf-8-sig') except Exception as e: print(f"保存持仓记录到CSV时出错: {e}") print(f"问题数据: {position_data}") def record_daily_positions(self, trade_date, etf_price): """记录每日持仓状况""" # 获取当前所有开仓状态的持仓 open_positions = [p for p in self.positions if p['status'] == 'open'] if not open_positions: # 如果没有持仓,记录一条空仓记录 position_record = { '交易日期': trade_date, 'ETF价格': etf_price, '总仓位数': 0, '主仓位数': 0, '加仓仓位数': 0, '总合约数': 0, '总组合盈亏': 0, '总卖购盈亏': 0, '总买购盈亏': 0, '持仓详情': [] } self.save_daily_position_to_csv(position_record) return # 统计持仓信息 main_positions = [p for p in open_positions if p['position_type'] == 'main'] add_positions = [p for p in open_positions if p['position_type'] == 'add'] total_contracts = sum(p['contract_size'] for p in open_positions) # 记录详细持仓信息 position_details = [] total_combo_pnl = 0 total_sell_call_pnl = 0 total_buy_call_pnl = 0 for i, pos in enumerate(open_positions): # 获取当前期权价值(尽量获取,失败则使用开仓价格) try: query_date = trade_date.date() if hasattr(trade_date, 'date') else trade_date # 获取买购期权当前价格 q_buy_price = query(opt.OPT_DAILY_PRICE.close).filter( opt.OPT_DAILY_PRICE.code == pos['buy_call']['code'], opt.OPT_DAILY_PRICE.date == query_date ) buy_price_result = opt.run_query(q_buy_price) if buy_price_result.empty: raise Exception("无法获取买购期权价格数据") buy_call_current_price = buy_price_result['close'].iloc[0] # 获取卖购期权当前价格 q_sell_price = query(opt.OPT_DAILY_PRICE.close).filter( opt.OPT_DAILY_PRICE.code == pos['sell_call']['code'], opt.OPT_DAILY_PRICE.date == query_date ) sell_price_result = opt.run_query(q_sell_price) if sell_price_result.empty: raise Exception("无法获取卖购期权价格数据") sell_call_current_price = sell_price_result['close'].iloc[0] # 计算当前浮动盈亏 buy_call_pnl = (buy_call_current_price - pos['buy_call']['price']) * pos['contract_size'] * 10000 sell_call_pnl = (pos['sell_call']['price'] - sell_call_current_price) * pos['contract_size'] * 10000 combo_pnl = buy_call_pnl + sell_call_pnl except: buy_call_current_price = pos['buy_call']['price'] sell_call_current_price = pos['sell_call']['price'] buy_call_pnl = 0 sell_call_pnl = 0 combo_pnl = 0 # 累计总盈亏 total_combo_pnl += combo_pnl total_sell_call_pnl += sell_call_pnl total_buy_call_pnl += buy_call_pnl # 记录买购期权详细信息 buy_call_detail = { '持仓标的代码': pos['buy_call']['code'], '期权类别': '买购', '合约数量': pos['contract_size'], '行权价格': pos['buy_call']['exercise_price'], '成本价格': pos['buy_call']['price'], '当前价格': buy_call_current_price, '盈亏金额': buy_call_pnl, '仓位类型': pos['position_type'], '开仓日期': pos['open_date'] } # 记录卖购期权详细信息 sell_call_detail = { '持仓标的代码': pos['sell_call']['code'], '期权类别': '卖购', '合约数量': pos['contract_size'], '行权价格': pos['sell_call']['exercise_price'], '成本价格': pos['sell_call']['price'], '当前价格': sell_call_current_price, '盈亏金额': sell_call_pnl, '仓位类型': pos['position_type'], '开仓日期': pos['open_date'] } position_details.extend([buy_call_detail, sell_call_detail]) # 记录每日持仓汇总 position_record = { '交易日期': trade_date, 'ETF价格': etf_price, '总仓位数': len(open_positions), '主仓位数': len(main_positions), '加仓仓位数': len(add_positions), '总合约数': total_contracts, '总组合盈亏': total_combo_pnl, '总卖购盈亏': total_sell_call_pnl, '总买购盈亏': total_buy_call_pnl, '持仓详情': position_details } self.save_daily_position_to_csv(position_record) # 注释掉单独的账户管理器回调,改为在多标的管理器中统一处理 # 通知账户管理器更新每日汇总 # if self.account_manager_callback: # try: # self.account_manager_callback(trade_date) # except Exception as e: # print(f"{trade_date.strftime('%Y-%m-%d')} 更新账户汇总回调出错: {e}") def run_strategy(self): """运行策略主逻辑""" print("开始运行深度实值买购和卖购组合的牛差策略...") for i, trade_date in enumerate(self.trade_days.index): # 获取ETF价格 try: price_data = get_price(self.underlying_symbol, trade_date, trade_date, fields=['close'])['close'] # print(f"{trade_date.strftime('%Y-%m-%d')} 获取ETF价格: {price_data.iloc[0]:.4f}") if price_data.empty: print(f"{trade_date.strftime('%Y-%m-%d')} 获取ETF价格失败,因为price_data为空") continue etf_price = price_data.iloc[0] except: print(f"{trade_date.strftime('%Y-%m-%d')} 获取ETF价格失败,因为获取{self.underlying_symbol}价格失败") continue # 记录每日持仓状况 self.record_daily_positions(trade_date, etf_price) # 标记是否有交易发生 has_trading = False # 检查是否需要平仓 if len(self.positions) > 0: for position in self.positions: should_close, reason = self.should_close_position(position, trade_date, etf_price) # print(f"{trade_date.strftime('%Y-%m-%d')} 检查是否需要平仓: {should_close}, 平仓原因: {reason}") if should_close: self.close_position(position, trade_date, etf_price, reason) print(f"{trade_date.strftime('%Y-%m-%d')} 平仓: {reason}, ETF价格: {etf_price:.4f}") has_trading = True # 检查是否需要开新仓(首次开仓或平仓后重新开仓) open_positions = [p for p in self.positions if p['status'] == 'open'] if not open_positions: new_position = self.open_bull_spread_position(trade_date, etf_price, 'main') if new_position: max_profit = new_position['profit_info']['total_max_profit'] contract_size = new_position['contract_size'] # 获取资金信息(如果有的话) if hasattr(self, 'config') and 'allocated_capital' in self.config: allocated_capital = self.config.get('allocated_capital', 0) # 估算保证金使用(简化计算) estimated_margin = contract_size * 1000 # 每张约1000元保证金(粗略估算) print(f"{trade_date.strftime('%Y-%m-%d')} 开仓主仓位: 买购{new_position['buy_call']['exercise_price']:.2f}@{new_position['buy_call']['price']:.4f}, 卖购{new_position['sell_call']['exercise_price']:.2f}@{new_position['sell_call']['price']:.4f}, ETF价格: {etf_price:.4f}, 合约数量: {contract_size}张, 最大牛差收益: {max_profit:.2f}元, 可用资金: {allocated_capital:.0f}元, 预估保证金: {estimated_margin:.0f}元") else: print(f"{trade_date.strftime('%Y-%m-%d')} 开仓主仓位: 买购{new_position['buy_call']['exercise_price']:.2f}@{new_position['buy_call']['price']:.4f}, 卖购{new_position['sell_call']['exercise_price']:.2f}@{new_position['sell_call']['price']:.4f}, ETF价格: {etf_price:.4f}, 合约数量: {contract_size}张, 最大牛差收益: {max_profit:.2f}元") has_trading = True # 检查是否需要加仓 elif self.should_add_position(trade_date, etf_price): add_position = self.open_bull_spread_position(trade_date, etf_price, 'add', silent=False) if add_position: max_profit = add_position['profit_info']['total_max_profit'] contract_size = add_position['contract_size'] # 获取资金信息(如果有的话) if hasattr(self, 'config') and 'allocated_capital' in self.config: allocated_capital = self.config.get('allocated_capital', 0) # 估算保证金使用(简化计算) estimated_margin = contract_size * 1000 # 每张约1000元保证金(粗略估算) print(f"{trade_date.strftime('%Y-%m-%d')} 加仓: 买购{add_position['buy_call']['exercise_price']:.2f}@{add_position['buy_call']['price']:.4f}, 卖购{add_position['sell_call']['exercise_price']:.2f}@{add_position['sell_call']['price']:.4f}, ETF价格: {etf_price:.4f}, 合约数量: {contract_size}张, 最大牛差收益: {max_profit:.2f}元, 可用资金: {allocated_capital:.0f}元, 预估保证金: {estimated_margin:.0f}元") else: print(f"{trade_date.strftime('%Y-%m-%d')} 加仓: 买购{add_position['buy_call']['exercise_price']:.2f}@{add_position['buy_call']['price']:.4f}, 卖购{add_position['sell_call']['exercise_price']:.2f}@{add_position['sell_call']['price']:.4f}, ETF价格: {etf_price:.4f}, 合约数量: {contract_size}张, 最大牛差收益: {max_profit:.2f}元") has_trading = True print("策略运行完成!") def get_performance_summary(self): """获取策略表现总结""" if not self.trade_records: return "没有交易记录" closed_positions = [p for p in self.positions if p['status'] == 'closed'] if not closed_positions: return "没有已平仓的交易" total_pnl = sum(p['pnl'] for p in closed_positions) winning_trades = len([p for p in closed_positions if p['pnl'] > 0]) total_trades = len(closed_positions) win_rate = winning_trades / total_trades if total_trades > 0 else 0 summary = f""" 策略表现总结: ============= 总交易次数: {total_trades} 获利交易: {winning_trades} 胜率: {win_rate:.2%} 总盈亏: {total_pnl:.2f}元 平均每笔盈亏: {total_pnl/total_trades:.2f}元 """ return summary def plot_results(self): """绘制策略结果""" if not self.daily_positions: print("没有持仓数据可以绘制") return # 使用每日持仓记录绘制盈亏曲线 position_df = pd.DataFrame(self.daily_positions) # 过滤有持仓的记录(排除空仓记录,但保留盈亏为0的记录用于显示完整曲线) position_data = position_df.copy() if position_data.empty: print("没有持仓数据可以绘制") return # 确保交易日期是datetime格式 position_data['交易日期'] = pd.to_datetime(position_data['交易日期']) position_data = position_data.sort_values('交易日期') # 确保盈亏字段为数值类型 position_data['总组合盈亏'] = pd.to_numeric(position_data['总组合盈亏'], errors='coerce').fillna(0) position_data['总卖购盈亏'] = pd.to_numeric(position_data['总卖购盈亏'], errors='coerce').fillna(0) # 绘图 fig, ax = plt.subplots(1, 1, figsize=(12, 8)) # 绘制两条盈亏曲线 ax.plot(position_data['交易日期'], position_data['总组合盈亏'], label='每日组合浮动盈亏(买购+卖购)', color='blue', linewidth=2) ax.plot(position_data['交易日期'], position_data['总卖购盈亏'], label='每日卖购浮动盈亏', color='red', linewidth=2, linestyle='--') # 添加零线 ax.axhline(y=0, color='black', linestyle='-', alpha=0.3) ax.set_title(f'{self.get_underlying_code()}策略每日浮动盈亏对比', fontsize=14) ax.set_ylabel('浮动盈亏 (元)', fontsize=12) ax.set_xlabel('日期', fontsize=12) ax.legend(fontsize=10) ax.grid(True, alpha=0.3) # 格式化日期显示 import matplotlib.dates as mdates ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d')) ax.xaxis.set_major_locator(mdates.MonthLocator()) plt.xticks(rotation=45) # 添加数据标注 if len(position_data) > 0: final_combo_pnl = position_data['总组合盈亏'].iloc[-1] final_sell_pnl = position_data['总卖购盈亏'].iloc[-1] max_combo_pnl = position_data['总组合盈亏'].max() min_combo_pnl = position_data['总组合盈亏'].min() ax.text(0.02, 0.98, f'当前组合浮盈: {final_combo_pnl:.2f}元\n' f'当前卖购浮盈: {final_sell_pnl:.2f}元\n' f'最大组合浮盈: {max_combo_pnl:.2f}元\n' f'最大组合浮亏: {min_combo_pnl:.2f}元', transform=ax.transAxes, fontsize=10, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8)) plt.tight_layout() plt.show() # 策略配置类 class StrategyConfig: """策略配置管理""" def __init__(self): # ETF标的配置字典 self.etf_symbols = { '50ETF': '510050.XSHG' # 上证50ETF # '300ETF': '510300.XSHG', # 沪深300ETF # '创业板ETF': '159915.XSHE' # 创业板ETF } # 时间范围配置 self.time_config = { 'start_date': '2024-06-01', 'end_date': '2024-08-31' } # 资金配置 self.capital_config = { 'total_capital': 1000000, # 总资金额度(100万) 'capital_allocation': { # 不同标的资金分配比例 '50ETF': 0.3, # 50ETF 30% '300ETF': 0.5, # 300ETF 50% '创业板ETF': 0.2 # 创业板ETF 20% }, 'capital_usage_limit': 0.8, # 资金使用上限80% 'bull_spread_margin_discount': 30, # 牛差组合策略保证金优惠(每单位组合加收30元) 'contract_unit': 10000, # 合约单位(1张期权代表10000份标的) 'margin_params': { # 保证金计算参数 'volatility_factor': 0.12, # 波动率因子12% 'min_margin_factor': 0.07 # 最小保证金因子7% } } def get_allocated_capital(self, etf_code): """获取指定ETF的分配资金""" total_usable = self.capital_config['total_capital'] * self.capital_config['capital_usage_limit'] allocation_ratio = self.capital_config['capital_allocation'].get(etf_code, 0) return total_usable * allocation_ratio def calculate_option_margin(self, option_type, settlement_price, underlying_price, strike_price): """ 计算单张期权的保证金 :param option_type: 'call' 或 'put' :param settlement_price: 合约前结算价 :param underlying_price: 标的证券前收盘价 :param strike_price: 行权价 :return: 单张期权保证金 """ contract_unit = self.capital_config['contract_unit'] volatility_factor = self.capital_config['margin_params']['volatility_factor'] min_margin_factor = self.capital_config['margin_params']['min_margin_factor'] if option_type == 'call': # 认购期权虚值 = max(行权价 - 标的价格, 0) out_of_money = max(strike_price - underlying_price, 0) margin = (settlement_price + max( volatility_factor * underlying_price - out_of_money, min_margin_factor * underlying_price )) * contract_unit elif option_type == 'put': # 认沽期权虚值 = max(标的价格 - 行权价, 0) out_of_money = max(underlying_price - strike_price, 0) margin = min( settlement_price + max( volatility_factor * underlying_price - out_of_money, min_margin_factor * strike_price ), strike_price ) * contract_unit else: raise ValueError("option_type must be 'call' or 'put'") return margin def calculate_bull_spread_margin(self, buy_call_info, sell_call_info, underlying_price): """ 计算牛差组合的保证金 :param buy_call_info: 买入认购期权信息 :param sell_call_info: 卖出认购期权信息 :param underlying_price: 标的价格 :return: 牛差组合保证金 """ # 对于牛差组合,保证金 = 行权价差 * 合约单位 + 保证金优惠 strike_diff = sell_call_info['exercise_price'] - buy_call_info['exercise_price'] contract_unit = self.capital_config['contract_unit'] margin_discount = self.capital_config['bull_spread_margin_discount'] # 牛差组合的理论最大损失就是行权价差 bull_spread_margin = strike_diff * contract_unit + margin_discount return bull_spread_margin def calculate_contract_size(self, etf_code, etf_price, buy_call_info=None, sell_call_info=None): """ 计算可开仓的合约数量 :param etf_code: ETF代码 :param etf_price: ETF价格 :param buy_call_info: 买入认购期权信息(可选,用于牛差组合计算) :param sell_call_info: 卖出认购期权信息(可选,用于牛差组合计算) """ allocated_capital = self.get_allocated_capital(etf_code) if buy_call_info and sell_call_info: # 牛差组合保证金计算 margin_per_contract = self.calculate_bull_spread_margin(buy_call_info, sell_call_info, etf_price) else: # 单一期权保证金估算(使用卖出认购期权) estimated_settlement_price = etf_price * 0.02 # 估算权利金为标的价格的2% estimated_strike_price = etf_price * 1.05 # 估算行权价为标的价格的105% margin_per_contract = self.calculate_option_margin('call', estimated_settlement_price, etf_price, estimated_strike_price) max_contracts = int(allocated_capital / margin_per_contract) return min(max_contracts, 100) # 限制最大100张 # 多标的策略管理器 class MultiUnderlyingBullSpreadManager: """多标的牛差策略管理器""" def __init__(self, config: StrategyConfig): self.config = config self.strategies = {} self.daily_account_records = [] # 每日账户资金记录 self.account_csv_path = 'account_summary.csv' self.cumulative_realized_pnl = 0 # 累积已实现盈亏 self.previous_date_summary = None # 前一天的账户汇总记录 self.initialize_strategies() def initialize_strategies(self): """初始化各个标的的策略""" for etf_code, symbol in self.config.etf_symbols.items(): allocated_capital = self.config.get_allocated_capital(etf_code) if allocated_capital > 0: strategy = DeepITMBullSpreadStrategy( underlying_symbol=symbol, start_date=self.config.time_config['start_date'], end_date=self.config.time_config['end_date'] ) # 动态调整策略参数 strategy.config['allocated_capital'] = allocated_capital strategy.config['etf_code'] = etf_code # 设置账户管理器回调 strategy.account_manager_callback = self.record_daily_account_summary self.strategies[etf_code] = strategy print(f"初始化{etf_code}策略,分配资金: {allocated_capital:,.0f}元") def calculate_dynamic_contract_size(self, strategy, etf_price, buy_call_info=None, sell_call_info=None): """动态计算合约数量""" etf_code = strategy.config['etf_code'] return self.config.calculate_contract_size(etf_code, etf_price, buy_call_info, sell_call_info) def calculate_used_capital(self, strategy): """计算策略已使用的资金(牛差策略的净权利金支出)""" used_capital = 0 open_positions = [p for p in strategy.positions if p['status'] == 'open'] for position in open_positions: # 牛差策略的资金使用 = (买购权利金 - 卖购权利金) * 合约数量 * 10000 net_premium = position['buy_call']['price'] - position['sell_call']['price'] position_capital = net_premium * position['contract_size'] * 10000 used_capital += position_capital return used_capital def record_daily_account_summary(self, trade_date): """记录每日账户资金汇总""" initial_capital = self.config.capital_config['total_capital'] total_used_capital = 0 total_floating_pnl = 0 total_current_market_value = 0 # 所有策略的当前市值总和 strategy_details = {} # 计算当天的已实现盈亏(通过检查平仓交易) today_realized_pnl = 0 for etf_code, strategy in self.strategies.items(): # 检查当天是否有平仓交易 for trade_record in strategy.trade_records: if (trade_record.get('交易类型') == '平仓' and trade_record.get('交易日期') and trade_record.get('总盈亏') is not None): trade_date_record = trade_record.get('交易日期') if hasattr(trade_date_record, 'date'): trade_date_record = trade_date_record.date() elif isinstance(trade_date_record, str): try: trade_date_record = pd.to_datetime(trade_date_record).date() except: continue target_date = trade_date.date() if hasattr(trade_date, 'date') else trade_date if trade_date_record == target_date: today_realized_pnl += float(trade_record.get('总盈亏', 0)) # 更新累积已实现盈亏 self.cumulative_realized_pnl += today_realized_pnl # 汇总各策略的资金使用和浮动盈亏 for etf_code, strategy in self.strategies.items(): used_capital = self.calculate_used_capital(strategy) open_positions_count = len([p for p in strategy.positions if p['status'] == 'open']) # 计算组合当前市值和浮动盈亏 current_market_value = 0 floating_pnl = 0 if strategy.daily_positions: # 查找指定日期的持仓记录 target_record = None for position_record in strategy.daily_positions: record_date = position_record.get('交易日期') if hasattr(record_date, 'date'): record_date = record_date.date() elif isinstance(record_date, str): try: record_date = pd.to_datetime(record_date).date() except: continue target_date = trade_date.date() if hasattr(trade_date, 'date') else trade_date if record_date == target_date: target_record = position_record break # 如果找到了对应日期的记录,计算当前市值和浮动盈亏 if target_record: floating_pnl = target_record.get('总组合盈亏', 0) if isinstance(floating_pnl, str): try: floating_pnl = float(floating_pnl) except: floating_pnl = 0 # 计算组合当前市值 = 初始投入成本 + 浮动盈亏 current_market_value = used_capital + floating_pnl total_used_capital += used_capital total_floating_pnl += floating_pnl total_current_market_value += current_market_value strategy_details[etf_code] = { '分配资金': strategy.config.get('allocated_capital', 0), '初始投入': used_capital, '当前市值': current_market_value, '浮动盈亏': floating_pnl, '持仓数量': open_positions_count } # 修正后的计算逻辑: # 当前总资金 = 初始资金 + 累积已实现盈亏 current_total_capital = initial_capital + self.cumulative_realized_pnl # 剩余现金 = 当前总资金 - 投入资金 remaining_cash = current_total_capital - total_used_capital # 账户总价值 = 剩余现金 + 组合当前市值 total_account_value = remaining_cash + total_current_market_value # 记录账户汇总 account_record = { '交易日期': trade_date, '总资金': current_total_capital, # 使用当前总资金而非初始资金 '初始投入总额': total_used_capital, '剩余现金': remaining_cash, '组合当前市值': total_current_market_value, '总浮动盈亏': total_floating_pnl, '账户总价值': total_account_value, '累积已实现盈亏': self.cumulative_realized_pnl, # 新增字段 '当日已实现盈亏': today_realized_pnl, # 新增字段 '资金使用率': total_used_capital / current_total_capital if current_total_capital > 0 else 0, '收益率': (total_account_value - initial_capital) / initial_capital if initial_capital > 0 else 0, # 修正收益率计算 '策略详情': strategy_details } self.daily_account_records.append(account_record) self.save_account_summary_to_csv(account_record) # 更新前一天的记录 self.previous_date_summary = account_record # 输出整体账户汇总(只在有活动时输出) # if total_used_capital > 0 or total_floating_pnl != 0: # print(f"{trade_date.strftime('%Y-%m-%d')} 【账户汇总】总资金{total_capital:.0f}元|已投入{total_used_capital:.0f}元|剩余现金{remaining_cash:.0f}元|组合市值{total_current_market_value:.0f}元|账户总值{total_account_value:.0f}元|总收益{total_floating_pnl:.0f}元|收益率{(total_floating_pnl/total_capital)*100:.2f}%") def save_account_summary_to_csv(self, account_data): """保存账户汇总到CSV文件""" try: # 复制数据并处理日期格式 data_copy = account_data.copy() # 转换日期字段 if '交易日期' in data_copy and data_copy['交易日期'] is not None: if hasattr(data_copy['交易日期'], 'strftime'): data_copy['交易日期'] = data_copy['交易日期'].strftime('%Y-%m-%d') elif hasattr(data_copy['交易日期'], 'date'): data_copy['交易日期'] = data_copy['交易日期'].date().strftime('%Y-%m-%d') else: try: date_obj = pd.to_datetime(data_copy['交易日期']) data_copy['交易日期'] = date_obj.strftime('%Y-%m-%d') except: pass # 处理策略详情字段 if '策略详情' in data_copy and isinstance(data_copy['策略详情'], dict): strategy_strings = [] for etf_code, details in data_copy['策略详情'].items(): strategy_str = f"{etf_code}(分配{details['分配资金']:.0f}|投入{details['初始投入']:.0f}|市值{details['当前市值']:.0f}|浮盈{details['浮动盈亏']:.0f}|持仓{details['持仓数量']})" strategy_strings.append(strategy_str) data_copy['策略详情'] = '; '.join(strategy_strings) # 创建DataFrame,排除复杂字段 csv_data = {k: v for k, v in data_copy.items() if not isinstance(v, dict)} df = pd.DataFrame([csv_data]) # 检查文件是否存在 file_exists = os.path.exists(self.account_csv_path) # 保存到CSV if file_exists: df.to_csv(self.account_csv_path, mode='a', header=False, index=False, encoding='utf-8-sig') else: df.to_csv(self.account_csv_path, mode='w', header=True, index=False, encoding='utf-8-sig') except Exception as e: print(f"保存账户汇总到CSV时出错: {e}") print(f"问题数据: {account_data}") def plot_account_summary(self): """绘制整体账户资金曲线""" if not self.daily_account_records: print("没有账户数据可以绘制") return # 转换为DataFrame account_df = pd.DataFrame(self.daily_account_records) if account_df.empty: print("没有账户数据可以绘制") return # print(f"账户数据记录数量: {len(account_df)}") # print(f"账户数据列: {account_df.columns.tolist()}") # print(f"前几行数据:\n{account_df.head()}") # 确保日期格式正确 account_df['交易日期'] = pd.to_datetime(account_df['交易日期'], errors='coerce') account_df = account_df.dropna(subset=['交易日期']).sort_values('交易日期') if account_df.empty: print("日期转换后没有有效数据") return # 确保数值字段为数值类型,处理可能的字符串或其他类型 numeric_columns = ['总资金', '初始投入总额', '剩余现金', '组合当前市值', '总浮动盈亏', '账户总价值', '资金使用率', '收益率'] for col in numeric_columns: if col in account_df.columns: # 先转换为字符串,移除可能的非数字字符 account_df[col] = account_df[col].astype(str).str.replace('[^\d.-]', '', regex=True) # 再转换为数值类型 account_df[col] = pd.to_numeric(account_df[col], errors='coerce').fillna(0) print(f"列 {col} 数据范围: {account_df[col].min()} 到 {account_df[col].max()}") # 检查是否有有效的数值数据 if account_df[['总资金', '账户总价值']].isna().all().all(): print("所有数值数据都无效,无法绘图") return # 绘图 fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10)) try: # 上图:账户总价值曲线 # 转换为numpy数组以确保数据类型 dates = account_df['交易日期'].values total_values = account_df['账户总价值'].values.astype(float) initial_capitals = account_df['总资金'].values.astype(float) ax1.plot(dates, total_values, label='账户总价值', color='green', linewidth=2) ax1.plot(dates, initial_capitals, label='初始资金', color='gray', linewidth=1, linestyle='--') # 填充区域(只有在数据有效时) if len(total_values) > 0 and len(initial_capitals) > 0: profit_mask = total_values >= initial_capitals loss_mask = total_values < initial_capitals if profit_mask.any(): ax1.fill_between(dates, initial_capitals, total_values, where=profit_mask, color='green', alpha=0.3, label='盈利区域') if loss_mask.any(): ax1.fill_between(dates, initial_capitals, total_values, where=loss_mask, color='red', alpha=0.3, label='亏损区域') ax1.set_title('整体账户资金曲线', fontsize=14) ax1.set_ylabel('资金 (元)', fontsize=12) ax1.legend(fontsize=10) ax1.grid(True, alpha=0.3) # 下图:资金构成 remaining_cash = account_df['剩余现金'].values.astype(float) current_market_value = account_df['组合当前市值'].values.astype(float) ax2.plot(dates, remaining_cash, label='剩余现金', color='blue', linewidth=2) ax2.plot(dates, current_market_value, label='组合当前市值', color='orange', linewidth=2, linestyle='--') ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3) ax2.set_title('资金构成分析', fontsize=14) ax2.set_ylabel('金额 (元)', fontsize=12) ax2.set_xlabel('日期', fontsize=12) ax2.legend(fontsize=10) ax2.grid(True, alpha=0.3) # 格式化日期显示 import matplotlib.dates as mdates for ax in [ax1, ax2]: ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d')) ax.xaxis.set_major_locator(mdates.MonthLocator()) plt.xticks(rotation=45) # 添加数据标注 if len(total_values) > 0: final_value = float(total_values[-1]) initial_capital = float(initial_capitals[0]) total_return = final_value - initial_capital return_rate = total_return / initial_capital if initial_capital > 0 else 0 max_value = float(np.max(total_values)) min_value = float(np.min(total_values)) ax1.text(0.02, 0.98, f'当前账户价值: {final_value:,.0f}元\n' f'总收益: {total_return:,.0f}元\n' f'收益率: {return_rate:.2%}\n' f'最高价值: {max_value:,.0f}元\n' f'最低价值: {min_value:,.0f}元', transform=ax1.transAxes, fontsize=10, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.8)) plt.tight_layout() plt.show() except Exception as e: print(f"绘图过程中出错: {e}") print(f"账户数据样本:\n{account_df[['交易日期', '总资金', '账户总价值', '剩余现金', '组合当前市值', '总浮动盈亏']].head()}") # 尝试简化版本的绘图 try: fig2, ax = plt.subplots(1, 1, figsize=(10, 6)) ax.plot(range(len(account_df)), account_df['账户总价值'].astype(float), label='账户总价值', color='green', linewidth=2) ax.set_title('简化版账户资金曲线') ax.set_ylabel('资金 (元)') ax.set_xlabel('交易日序号') ax.legend() ax.grid(True) plt.show() except Exception as e2: print(f"简化绘图也失败: {e2}") def run_all_strategies(self): """运行所有策略""" print("="*60) print("开始运行多标的深度实值买购和卖购组合的牛差策略") print("="*60) # 获取统一的交易日历(取第一个策略的交易日历作为基准) if not self.strategies: print("没有可运行的策略") return {} first_strategy = list(self.strategies.values())[0] trade_days = first_strategy.trade_days.index # 为每个策略设置动态合约数量计算方法 for etf_code, strategy in self.strategies.items(): original_open_method = strategy.open_bull_spread_position def make_dynamic_open_position(strat, orig_method): def dynamic_open_position(trade_date, etf_price, position_type='main', silent=False, save_to_csv=True): # 首先用原方法获取期权信息,但不保存到CSV original_contract_size = strat.config['contract_size'] strat.config['contract_size'] = 1 # 临时设置为1张以获取期权信息 temp_result = orig_method(trade_date, etf_price, position_type, silent=True, save_to_csv=False) if temp_result: # 用获取的期权信息动态计算合约数量 buy_call_info = temp_result['buy_call'] sell_call_info = temp_result['sell_call'] dynamic_size = self.calculate_dynamic_contract_size(strat, etf_price, buy_call_info, sell_call_info) # 更新合约数量并重新开仓 strat.config['contract_size'] = dynamic_size # 移除临时仓位和交易记录 if strat.positions and strat.positions[-1] == temp_result: strat.positions.pop() if strat.trade_records and len(strat.trade_records) > 0: # 检查最后一条记录是否是刚刚添加的临时记录 last_record = strat.trade_records[-1] if (last_record.get('交易类型') == '开仓' and not last_record.get('平仓原因') and last_record.get('合约数量') == 1): strat.trade_records.pop() # 重新开仓,这次保存到CSV result = orig_method(trade_date, etf_price, position_type, silent, save_to_csv) else: result = None # 恢复原始设置 strat.config['contract_size'] = original_contract_size return result return dynamic_open_position strategy.open_bull_spread_position = make_dynamic_open_position(strategy, original_open_method) # 按日期统一运行所有策略 print(f"开始按日期统一运行策略,共{len(trade_days)}个交易日") for i, trade_date in enumerate(trade_days): # print(f"\n处理交易日 {trade_date.strftime('%Y-%m-%d')} ({i+1}/{len(trade_days)})") # 为每个策略处理当天的交易 daily_has_activity = False for etf_code, strategy in self.strategies.items(): try: # 获取ETF价格 price_data = get_price(strategy.underlying_symbol, trade_date, trade_date, fields=['close'])['close'] if price_data.empty: print(f" {etf_code}: 获取ETF价格失败") continue etf_price = price_data.iloc[0] # 记录每日持仓状况(但不触发账户汇总回调) strategy.record_daily_positions(trade_date, etf_price) # 检查是否需要平仓 if len(strategy.positions) > 0: for position in strategy.positions: should_close, reason = strategy.should_close_position(position, trade_date, etf_price) if should_close: strategy.close_position(position, trade_date, etf_price, reason) print(f" {etf_code}: 平仓 {reason}, ETF价格: {etf_price:.4f}") daily_has_activity = True # 检查是否需要开新仓(首次开仓或平仓后重新开仓) open_positions = [p for p in strategy.positions if p['status'] == 'open'] if not open_positions: new_position = strategy.open_bull_spread_position(trade_date, etf_price, 'main') if new_position: max_profit = new_position['profit_info']['total_max_profit'] contract_size = new_position['contract_size'] print(f" {etf_code}: 开仓主仓位,ETF价格: {etf_price:.4f}, 合约数量: {contract_size}张, 最大牛差收益: {max_profit:.2f}元") daily_has_activity = True # 检查是否需要加仓 elif strategy.should_add_position(trade_date, etf_price): add_position = strategy.open_bull_spread_position(trade_date, etf_price, 'add') if add_position: max_profit = add_position['profit_info']['total_max_profit'] contract_size = add_position['contract_size'] print(f" {etf_code}: 加仓,ETF价格: {etf_price:.4f}, 合约数量: {contract_size}张, 最大牛差收益: {max_profit:.2f}元") daily_has_activity = True except Exception as e: print(f" {etf_code}: 处理失败 - {e}") # 统一记录当天的账户汇总(无论是否有交易活动) try: self.record_daily_account_summary(trade_date) if daily_has_activity: print(f" 账户汇总已更新") except Exception as e: print(f" 记录账户汇总失败: {e}") # 收集所有策略的结果 results = {} for etf_code, strategy in self.strategies.items(): try: results[etf_code] = { 'strategy': strategy, 'summary': strategy.get_performance_summary(), 'allocated_capital': strategy.config['allocated_capital'] } print(f"{etf_code}策略运行完成") except Exception as e: print(f"{etf_code}策略结果收集出错: {e}") results[etf_code] = {'error': str(e)} print("\n所有策略运行完成!") return results def generate_overall_report(self, results): """生成总体报告""" print("\n" + "="*60) print("多标的牛差策略总体报告") print("="*60) total_pnl = 0 total_trades = 0 total_winning_trades = 0 for etf_code, result in results.items(): if 'error' in result: print(f"\n{etf_code}: 策略执行出错 - {result['error']}") continue strategy = result['strategy'] allocated_capital = result['allocated_capital'] print(f"\n{etf_code} 策略结果:") print(f"分配资金: {allocated_capital:,.0f}元") print(result['summary']) # 统计总体数据 closed_positions = [p for p in strategy.positions if p['status'] == 'closed'] if closed_positions: strategy_pnl = sum(p['pnl'] for p in closed_positions) strategy_trades = len(closed_positions) strategy_winning = len([p for p in closed_positions if p['pnl'] > 0]) total_pnl += strategy_pnl total_trades += strategy_trades total_winning_trades += strategy_winning # 总体统计 if total_trades > 0: overall_win_rate = total_winning_trades / total_trades print(f"\n总体策略表现:") print(f"总交易次数: {total_trades}") print(f"总获利交易: {total_winning_trades}") print(f"总体胜率: {overall_win_rate:.2%}") print(f"总盈亏: {total_pnl:.2f}元") print(f"平均每笔盈亏: {total_pnl/total_trades:.2f}元") print(f"总资金收益率: {total_pnl/self.config.capital_config['total_capital']:.2%}") # 使用示例 def run_deep_itm_bull_spread_example(): """运行深度实值买购和卖购组合牛差策略示例""" # 创建策略配置 config = StrategyConfig() # 可以自定义配置 config.time_config = { 'start_date': '2024-05-01', 'end_date': '2025-01-31' } config.capital_config.update({ 'total_capital': 1000000, # 100万总资金 'capital_allocation': { '50ETF': 0.5, # 50ETF 50% '300ETF': 0.3, # 300ETF 30% '创业板ETF': 0.2 # 创业板ETF 20% }, 'capital_usage_limit': 0.8, # 使用80%资金 'bull_spread_margin_discount': 30, # 牛差组合策略保证金优惠(每单位组合加收30元) 'contract_unit': 10000, # 合约单位 'margin_params': { # 保证金计算参数(按交易所规定) 'volatility_factor': 0.12, # 波动率因子12% 'min_margin_factor': 0.07 # 最小保证金因子7% } }) # 创建多标的策略管理器 manager = MultiUnderlyingBullSpreadManager(config) # 运行所有策略 results = manager.run_all_strategies() # 生成报告 manager.generate_overall_report(results) # 绘制整体账户资金曲线 try: print(f"\n绘制整体账户资金曲线...") manager.plot_account_summary() except Exception as e: print(f"绘制整体账户资金曲线时出错: {e}") # 绘制各策略结果(可选) for etf_code, result in results.items(): if 'strategy' in result: try: print(f"\n绘制{etf_code}策略结果...") result['strategy'].plot_results() except Exception as e: print(f"绘制{etf_code}图表时出错: {e}")