| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319 |
- import pandas as pd
- import numpy as np
- import matplotlib.pyplot as plt
- from statistics import mean
- import os
- from datetime import datetime
def analyze_options(*options):
    """Analyze a single option or a multi-leg option portfolio.

    Builds a grid of underlying prices around the supplied strikes, computes
    the profit/loss of every leg on that grid (plus the summed position when
    more than one leg is given), draws the chart, prints the text report and
    returns the raw numbers.

    Args:
        *options: One or more legs, each a 5-item sequence
            ``(direction, option_type, premium, strike_price, quantity)``,
            e.g. ``('buy', 'call', 0.0456, 2.75, 1)``.

            * direction: ``'buy'`` or ``'sell'``
            * option_type: ``'call'`` or ``'put'``
            * premium: option premium
            * strike_price: strike price
            * quantity: quantity

    Returns:
        pandas.DataFrame with a ``price`` column, one ``opt<N>`` column per
        leg and, when more than one leg is given, a ``combined`` column.

    Raises:
        ValueError: If no leg is given, a leg does not have exactly five
            items, or a direction / option type is not one of the values
            listed above.

    Examples:
        # single option
        analyze_options(('buy', 'call', 0.05, 3.0, 1))
        # option portfolio
        analyze_options(('buy', 'call', 0.08, 2.9, 1), ('sell', 'call', 0.03, 3.1, 1))
    """
    if not options:
        raise ValueError("请至少提供一个期权")

    valid_directions = ('buy', 'sell')
    valid_types = ('call', 'put')

    # Parse and validate every leg before doing any work.
    option_list = []
    for i, opt in enumerate(options, start=1):
        if len(opt) != 5:
            raise ValueError(f"期权{i}格式错误,应为(direction, option_type, premium, strike_price, quantity)")
        direction, option_type, premium, strike_price, quantity = opt
        # Unknown values would otherwise fall through the payoff calculation
        # and silently produce an all-zero curve.
        if direction not in valid_directions:
            raise ValueError(f"期权{i}方向无效: {direction!r},应为'buy'或'sell'")
        if option_type not in valid_types:
            raise ValueError(f"期权{i}类型无效: {option_type!r},应为'call'或'put'")
        option_list.append({
            'direction': direction,
            'option_type': option_type,
            'premium': premium,
            'strike_price': strike_price,
            'quantity': quantity,
        })

    # Price window: 70% of the lowest strike up to 130% of the highest strike.
    strikes = [leg['strike_price'] for leg in option_list]
    price_min = min(strikes) * 0.7
    price_max = max(strikes) * 1.3

    # linspace always yields exactly 1001 samples ending at price_max, whereas
    # arange with a float step may produce one sample more or less.
    prices = np.linspace(price_min, price_max, 1001)

    # Profit of each leg on the price grid. Kept as plain lists because the
    # report code calls list methods on them.
    results = {'price': prices}
    for i, leg in enumerate(option_list, start=1):
        results[f'opt{i}'] = [_calculate_profit(leg, price) for price in prices]

    # Combined profit of the whole position.
    if len(option_list) > 1:
        leg_curves = [results[f'opt{i}'] for i in range(1, len(option_list) + 1)]
        results['combined'] = [sum(point) for point in zip(*leg_curves)]

    _plot_results(results, option_list, prices)
    _print_report(results, option_list, prices)
    return pd.DataFrame(results)
def _calculate_profit(option, price):
    """Return the profit/loss of one option leg at a given underlying price.

    Args:
        option: Dict with the keys ``direction``, ``option_type``,
            ``premium``, ``strike_price`` and ``quantity``.
        price: Underlying price to evaluate.

    Returns:
        The profit (negative for a loss) scaled by ``quantity``. Any
        direction/type pair other than buy/sell x call/put yields 0.
    """
    side = option['direction']
    kind = option['option_type']
    fee = option['premium']
    strike = option['strike_price']
    lots = option['quantity']

    if side not in ('buy', 'sell') or kind not in ('call', 'put'):
        return 0

    # Work out the buyer's result first; the seller's is its mirror image.
    if kind == 'call':
        exercised = price > strike
        gross = price - strike
    else:
        exercised = price < strike
        gross = strike - price

    long_pnl = (gross - fee) if exercised else -fee

    if side == 'buy':
        return long_pnl * lots
    return -long_pnl * lots
def _plot_results(results, option_list, prices):
    """Draw the profit/loss chart, save it as a PNG and show it.

    Args:
        results: Dict holding one ``opt<N>`` profit sequence per leg and,
            for multi-leg input, a ``combined`` sequence.
        option_list: List of leg dicts (``direction``, ``option_type``,
            ``premium``, ``strike_price``, ``quantity``).
        prices: Underlying prices used as the x axis.

    Side effects:
        Writes ``data/Options/options_analysis_<timestamp>.png`` (the
        directory is created when missing), prints the saved path and calls
        ``plt.show()``.
    """
    plt.figure(figsize=(14, 10))
    plt.rcParams['axes.unicode_minus'] = False
    colors = ['blue', 'green', 'orange', 'purple', 'brown']

    # One dashed curve per leg; colors cycle when there are more than five legs.
    for i, opt in enumerate(option_list):
        opt_name = f'opt{i+1}'
        strategy_name = f"{opt['direction'].upper()} {opt['option_type'].upper()}"
        plt.plot(prices, results[opt_name], '--', color=colors[i % len(colors)],
                 linewidth=2, alpha=0.7,
                 label=f'{opt_name}: {strategy_name}(strike price:{opt["strike_price"]}, premium:{opt["premium"]})')

    # Solid red curve for the combined position.
    if 'combined' in results:
        plt.plot(prices, results['combined'], 'r-', linewidth=3, label='Portfolio returns')

    # Zero line and one vertical guide per strike.
    plt.axhline(0, color='gray', linestyle='-', alpha=0.5)
    for opt in option_list:
        plt.axvline(opt['strike_price'], color='gray', linestyle='--', alpha=0.3)

    # Mark break-even points and max profit/loss plateau edges.
    if 'combined' in results:
        _mark_key_points(results['combined'], prices, 'Portfolio')
    elif len(option_list) == 1:
        _mark_key_points(results['opt1'], prices, 'Option')

    plt.xlabel('Asset Price', fontsize=12)
    plt.ylabel('Profit/Loss', fontsize=12)
    if len(option_list) == 1:
        opt = option_list[0]
        title = f'{opt["direction"].upper()} {opt["option_type"].upper()} Option Analysis'
    else:
        title = f'Portfolio Analysis ({len(option_list)} options)'
    plt.title(title, fontsize=14, weight='bold')
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()

    # Save the figure under data/Options; create the directory first so
    # savefig does not fail on a fresh checkout.
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f'options_analysis_{timestamp}.png'
    save_dir = os.path.join('data', 'Options')
    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, filename)
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    print(f"图片已保存到: {save_path}")
    plt.show()
def _mark_key_points(profits, prices, label_prefix):
    """Annotate break-even points and max profit/loss plateau edges on the current plot.

    Args:
        profits: Sequence of profit values aligned with ``prices``.
        prices: NumPy array of underlying prices (``.max()`` and ``.min()``
            are called on it).
        label_prefix: Accepted from the caller; not used in the annotations.
    """
    max_profit = max(profits)
    min_profit = min(profits)
    profit_range = max_profit - min_profit
    # Horizontal offset of every annotation label: 5% of the plotted range.
    label_dx = (prices.max() - prices.min()) * 0.05

    # 1. Break-even points: look for a sign change between neighbouring
    #    samples and interpolate linearly to the zero crossing.
    last_breakeven = None
    for i in range(len(profits) - 1):
        profit1, profit2 = profits[i], profits[i + 1]
        if profit1 * profit2 <= 0 and profit2 != profit1:
            p1, p2 = prices[i], prices[i + 1]
            breakeven_price = p1 - profit1 * (p2 - p1) / (profit2 - profit1)
            # A sample that is exactly zero is seen by the interval before it
            # and the one after it; skip the repeat so it is marked once.
            if (last_breakeven is not None
                    and abs(breakeven_price - last_breakeven)
                    <= 1e-9 * max(1.0, abs(breakeven_price))):
                continue
            last_breakeven = breakeven_price
            plt.plot(breakeven_price, 0, 'ro', markersize=10)
            plt.annotate(f'Equilibrium Point: {breakeven_price:.4f}',
                         xy=(breakeven_price, 0),
                         xytext=(breakeven_price + label_dx, max_profit * 0.1),
                         arrowprops=dict(arrowstyle='->', color='red'),
                         fontsize=11, color='red', weight='bold')

    # 2. Edges of the plateaus where profit stays at its maximum / minimum.
    max_boundary_points = _find_boundary_points(profits, prices, max_profit, 'max')
    min_boundary_points = _find_boundary_points(profits, prices, min_profit, 'min')

    # 3. Max-profit plateau edges (green), label placed above the point.
    for bp in max_boundary_points:
        plt.plot(bp, max_profit, 'go', markersize=10)
        plt.annotate(f'Max Returns: ({bp:.4f}, {max_profit:.4f})',
                     xy=(bp, max_profit),
                     xytext=(bp + label_dx, max_profit + profit_range * 0.1),
                     arrowprops=dict(arrowstyle='->', color='green'),
                     fontsize=10, color='green', weight='bold')

    # 4. Max-loss plateau edges (magenta), label placed below the point.
    for bp in min_boundary_points:
        plt.plot(bp, min_profit, 'mo', markersize=10)
        plt.annotate(f'Max Loss: ({bp:.4f}, {min_profit:.4f})',
                     xy=(bp, min_profit),
                     xytext=(bp + label_dx, min_profit - profit_range * 0.1),
                     arrowprops=dict(arrowstyle='->', color='magenta'),
                     fontsize=10, color='magenta', weight='bold')
def _find_boundary_points(profits, prices, extreme_value, _extreme_type):
    """Return the end-point prices of long plateaus sitting at an extreme profit.

    A sample belongs to a plateau when it is within 0.1% of
    ``extreme_value``. Matching samples whose indices are at most 2 apart
    are grouped into one run, and only runs with more than 10 matching
    samples contribute their first and last price.

    Args:
        profits: Sequence of profit values.
        prices: Sequence of prices aligned with ``profits``.
        extreme_value: The maximum or minimum profit to look for.
        _extreme_type: Unused.

    Returns:
        List of prices, two per qualifying run (left edge, then right edge).
    """
    tol = abs(extreme_value) * 0.001
    hits = [idx for idx, value in enumerate(profits)
            if abs(value - extreme_value) <= tol]

    edges = []
    if not hits:
        return edges

    def close_run(first, last, size):
        # Short runs are ignored; only a real plateau gets its edges marked.
        if size > 10:
            edges.append(prices[first])
            edges.append(prices[last])

    run_first = run_last = hits[0]
    run_size = 1
    for idx in hits[1:]:
        if idx - run_last <= 2:
            # Still the same run (small gaps are tolerated).
            run_last = idx
            run_size += 1
        else:
            close_run(run_first, run_last, run_size)
            run_first = run_last = idx
            run_size = 1
    close_run(run_first, run_last, run_size)

    return edges
def _print_report(results, option_list, prices):
    """Print the text report for the analyzed option(s).

    Lists every leg, then the maximum profit and maximum loss (with the
    price at which each first occurs), the same two figures multiplied by
    the hard-coded factor 10000, and the break-even prices.

    Args:
        results: Dict holding ``combined`` (multi-leg) or ``opt1``
            (single leg) profit sequences.
        option_list: List of leg dicts (``direction``, ``option_type``,
            ``premium``, ``strike_price``, ``quantity``).
        prices: Sequence of underlying prices aligned with the profits.
    """
    separator = "=" * 60
    print(separator)
    print("期权分析报告")
    print(separator)

    # Basic information for every leg.
    for i, opt in enumerate(option_list):
        print(f"期权{i+1}: {opt['direction'].upper()} {opt['option_type'].upper()}")
        print(f" 行权价: {opt['strike_price']}")
        print(f" 权利金: {opt['premium']}")
        print(f" 数量: {opt['quantity']}手")

    # Report on the combined curve when present, otherwise on the single leg.
    if 'combined' in results:
        profits = results['combined']
        print("\n【组合分析】")
    else:
        profits = results['opt1']
        print("\n【单期权分析】")
    # .index() below needs a list; accept any sequence.
    profits = list(profits)

    max_profit = max(profits)
    min_profit = min(profits)
    max_idx = profits.index(max_profit)
    min_idx = profits.index(min_profit)
    print(f"最大收益: {max_profit:.4f} (标的价格: {prices[max_idx]:.4f})")
    print(f"最大损失: {min_profit:.4f} (标的价格: {prices[min_idx]:.4f})")
    print(f"一单最大收益: {max_profit * 10000:.2f}元")
    print(f"一单最大亏损: {abs(min_profit) * 10000:.2f}元")

    # Break-even points: sign change between neighbouring samples, located
    # by linear interpolation.
    breakeven_points = []
    for i in range(len(profits) - 1):
        profit1, profit2 = profits[i], profits[i + 1]
        if profit1 * profit2 <= 0 and profit2 != profit1:
            p1, p2 = prices[i], prices[i + 1]
            bp = p1 - profit1 * (p2 - p1) / (profit2 - profit1)
            # A sample that is exactly zero is detected by the interval on
            # each side of it; keep only the first occurrence.
            if (breakeven_points
                    and abs(bp - breakeven_points[-1]) <= 1e-9 * max(1.0, abs(bp))):
                continue
            breakeven_points.append(bp)

    if breakeven_points:
        print(f"盈亏平衡点: {[f'{bp:.4f}' for bp in breakeven_points]}")
    else:
        print("无盈亏平衡点")
    print(separator)
# Demo run executed at module level: one short 3.0-strike call (premium 0.066)
# combined with one long 2.8-strike call (premium 0.2017).
analyze_options(
    ('sell', 'call', 0.066, 3.0, 1),
    ('buy', 'call', 0.2017, 2.8, 1),
)
|