"""Payoff-at-expiry analysis for single options and option portfolios.

The public entry point is :func:`analyze_options`; everything else is a
private helper used for payoff computation, plotting and reporting.
"""

from statistics import mean

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Accepted (lower-case) values for the first two fields of an option tuple.
_VALID_DIRECTIONS = ('buy', 'sell')
_VALID_OPTION_TYPES = ('call', 'put')

# Number of equal intervals in the analysed price range (=> N + 1 grid points).
_PRICE_GRID_INTERVALS = 1000


def analyze_options(*options):
    """Analyse one option or a portfolio of options at expiry.

    Computes the profit/loss of every option over a grid of underlying
    prices, plots the curves (plus the combined curve when more than one
    option is given) and prints a text report.

    Args:
        *options: One or more option tuples of the form
            ``(direction, option_type, premium, strike_price, quantity)``,
            e.g. ``('buy', 'call', 0.0456, 2.75, 1)``.
            ``direction`` is ``'buy'`` or ``'sell'`` and ``option_type`` is
            ``'call'`` or ``'put'``; both are matched case-insensitively.

    Returns:
        pandas.DataFrame: column ``price`` holds the price grid, columns
        ``opt1``, ``opt2``, ... hold the per-option profit, and ``combined``
        (only present for two or more options) holds their sum.

    Raises:
        ValueError: If no option is given, an option does not have exactly
            five fields, or its direction / option type is not recognised.

    Examples:
        Single option::

            analyze_options(('buy', 'call', 0.05, 3.0, 1))

        Portfolio::

            analyze_options(('buy', 'call', 0.08, 2.9, 1),
                            ('sell', 'call', 0.03, 3.1, 1))
    """
    if not options:
        raise ValueError("请至少提供一个期权")

    # Parse and validate the option tuples.
    option_list = []
    for number, opt in enumerate(options, start=1):
        if len(opt) != 5:
            raise ValueError(f"期权{number}格式错误,应为(direction, option_type, premium, strike_price, quantity)")
        direction, option_type, premium, strike_price, quantity = opt

        # Normalise case so that e.g. 'Buy'/'CALL' are accepted; an unknown
        # value would otherwise silently yield an all-zero payoff curve.
        direction = str(direction).strip().lower()
        option_type = str(option_type).strip().lower()
        if direction not in _VALID_DIRECTIONS:
            raise ValueError(f"期权{number}的direction无效: {direction!r},应为'buy'或'sell'")
        if option_type not in _VALID_OPTION_TYPES:
            raise ValueError(f"期权{number}的option_type无效: {option_type!r},应为'call'或'put'")

        option_list.append({
            'direction': direction,
            'option_type': option_type,
            'premium': premium,
            'strike_price': strike_price,
            'quantity': quantity,
        })

    # Price range: 70% of the lowest strike up to 130% of the highest strike.
    strikes = [opt['strike_price'] for opt in option_list]
    price_min = min(strikes) * 0.7
    price_max = max(strikes) * 1.3

    # linspace gives a deterministic number of points including both ends;
    # arange with a float step may produce one point more or less.
    prices = np.linspace(price_min, price_max, _PRICE_GRID_INTERVALS + 1)

    # Profit of every individual option along the price grid.
    results = {'price': prices}
    for number, opt in enumerate(option_list, start=1):
        results[f'opt{number}'] = [_calculate_profit(opt, price) for price in prices]

    # Portfolio profit: point-wise sum over all options.
    if len(option_list) > 1:
        per_option = [results[f'opt{number}'] for number in range(1, len(option_list) + 1)]
        results['combined'] = [sum(column) for column in zip(*per_option)]

    _plot_results(results, option_list, prices)
    _print_report(results, option_list, prices)

    return pd.DataFrame(results)


def _calculate_profit(option, price):
    """Return the profit of a single option at the given underlying price.

    Args:
        option: Dict with keys ``direction``, ``option_type``, ``premium``,
            ``strike_price`` and ``quantity``.
        price: Price of the underlying.

    Returns:
        The profit (negative for a loss) scaled by ``quantity``. Returns ``0``
        when ``direction`` / ``option_type`` is not one of the known values.
    """
    direction = option['direction']
    option_type = option['option_type']
    premium = option['premium']
    strike_price = option['strike_price']
    quantity = option['quantity']

    if direction not in _VALID_DIRECTIONS:
        return 0

    if option_type == 'call':
        in_the_money = price > strike_price
        intrinsic = price - strike_price
    elif option_type == 'put':
        in_the_money = price < strike_price
        intrinsic = strike_price - price
    else:
        return 0

    # Payoff of the long position; the short position is its mirror image.
    if in_the_money:
        long_profit = (intrinsic - premium) * quantity
    else:
        long_profit = -premium * quantity

    return long_profit if direction == 'buy' else -long_profit


def _find_breakeven_points(profits, prices):
    """Return the prices at which the profit curve crosses zero.

    Each pair of neighbouring grid points whose profits have opposite signs
    (or where one of them is exactly zero) is linearly interpolated. A zero
    that falls exactly on a grid point is detected by both adjacent intervals,
    so a crossing equal to the previously found one is reported only once.

    Args:
        profits: Sequence of profits, aligned with ``prices``.
        prices: Sequence of underlying prices.

    Returns:
        list: Break-even prices in ascending grid order (possibly empty).
    """
    breakevens = []
    price_pairs = zip(prices, prices[1:])
    profit_pairs = zip(profits, profits[1:])
    for (p1, p2), (profit1, profit2) in zip(price_pairs, profit_pairs):
        if profit1 * profit2 > 0 or profit2 == profit1:
            continue  # no sign change, or flat segment (nothing to interpolate)
        crossing = p1 - profit1 * (p2 - p1) / (profit2 - profit1)
        if breakevens and np.isclose(crossing, breakevens[-1], rtol=1e-9, atol=1e-12):
            continue  # same grid-point zero seen from the neighbouring interval
        breakevens.append(crossing)
    return breakevens


def _plot_results(results, option_list, prices):
    """Plot the per-option curves, the portfolio curve and the key points.

    Args:
        results: Dict produced by ``analyze_options`` (``opt1``..., optional
            ``combined``).
        option_list: List of option dicts, in the same order as the
            ``opt<N>`` entries of ``results``.
        prices: Price grid used as the x axis.
    """
    plt.figure(figsize=(14, 10))
    plt.rcParams['axes.unicode_minus'] = False

    colors = ['blue', 'green', 'orange', 'purple', 'brown']

    # One dashed curve per option; colours cycle when there are more than five.
    for i, opt in enumerate(option_list):
        opt_name = f'opt{i+1}'
        strategy_name = f"{opt['direction'].upper()} {opt['option_type'].upper()}"
        plt.plot(prices, results[opt_name], '--', color=colors[i % len(colors)],
                 linewidth=2, alpha=0.7,
                 label=f'{opt_name}: {strategy_name} (strike price:{opt["strike_price"]})')

    # Solid red curve for the portfolio.
    if 'combined' in results:
        plt.plot(prices, results['combined'], 'r-', linewidth=3, label='Portfolio returns')

    # Zero line and one vertical line per strike.
    plt.axhline(0, color='gray', linestyle='-', alpha=0.5)
    for opt in option_list:
        plt.axvline(opt['strike_price'], color='gray', linestyle='--', alpha=0.3)

    # Mark break-even points and the boundaries of max profit / max loss.
    if 'combined' in results:
        _mark_key_points(results['combined'], prices, 'Portfolio')
    elif len(option_list) == 1:
        _mark_key_points(results['opt1'], prices, 'Option')

    plt.xlabel('Asset Price', fontsize=12)
    plt.ylabel('Profit/Loss', fontsize=12)

    if len(option_list) == 1:
        opt = option_list[0]
        title = f'{opt["direction"].upper()} {opt["option_type"].upper()} Option Analysis'
    else:
        title = f'Portfolio Analysis ({len(option_list)} options)'

    plt.title(title, fontsize=14, weight='bold')
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()
    plt.show()


def _mark_key_points(profits, prices, label_prefix):
    """Annotate break-even points and max-profit / max-loss boundaries.

    Draws on the current matplotlib figure.

    Args:
        profits: Sequence of profits, aligned with ``prices``.
        prices: numpy array of underlying prices.
        label_prefix: Currently unused; kept for interface compatibility.
    """
    max_profit = max(profits)
    min_profit = min(profits)
    # Horizontal offset of every annotation text: 5% of the plotted range.
    x_offset = (prices.max() - prices.min()) * 0.05
    profit_span = max_profit - min_profit

    # 1. Break-even points.
    for breakeven_price in _find_breakeven_points(profits, prices):
        plt.plot(breakeven_price, 0, 'ro', markersize=10)
        plt.annotate(f'Equilibrium Point: {breakeven_price:.4f}',
                     xy=(breakeven_price, 0),
                     xytext=(breakeven_price + x_offset, max_profit * 0.1),
                     arrowprops=dict(arrowstyle='->', color='red'),
                     fontsize=11, color='red', weight='bold')

    # 2. Ends of the plateaus on which the profit / loss stays at its extreme.
    max_boundary_points = _find_boundary_points(profits, prices, max_profit, 'max')
    min_boundary_points = _find_boundary_points(profits, prices, min_profit, 'min')

    # 3. Max-profit boundaries.
    for bp in max_boundary_points:
        plt.plot(bp, max_profit, 'go', markersize=10)
        plt.annotate(f'Max Returns: ({bp:.4f}, {max_profit:.4f})',
                     xy=(bp, max_profit),
                     xytext=(bp + x_offset, max_profit + profit_span * 0.1),
                     arrowprops=dict(arrowstyle='->', color='green'),
                     fontsize=10, color='green', weight='bold')

    # 4. Max-loss boundaries.
    for bp in min_boundary_points:
        plt.plot(bp, min_profit, 'mo', markersize=10)
        plt.annotate(f'Max Loss: ({bp:.4f}, {min_profit:.4f})',
                     xy=(bp, min_profit),
                     xytext=(bp + x_offset, min_profit - profit_span * 0.1),
                     arrowprops=dict(arrowstyle='->', color='magenta'),
                     fontsize=10, color='magenta', weight='bold')


def _find_boundary_points(profits, prices, extreme_value, _extreme_type):
    """Return the end points of the plateaus that sit at an extreme value.

    Args:
        profits: Sequence of profits, aligned with ``prices``.
        prices: Sequence of underlying prices.
        extreme_value: The maximum or minimum profit to look for.
        _extreme_type: Unused; kept for interface compatibility.

    Returns:
        list: For every plateau of more than 10 grid points, its left and
        right boundary price (in that order). Empty if there is none.
    """
    # Points within 0.1% of the extreme value count as "at the extreme".
    tolerance = abs(extreme_value) * 0.001
    hits = [i for i, profit in enumerate(profits)
            if abs(profit - extreme_value) <= tolerance]
    if not hits:
        return []

    # Group the indices into regions, tolerating a gap of one missing index.
    regions = [[hits[0]]]
    for previous, current in zip(hits, hits[1:]):
        if current - previous <= 2:
            regions[-1].append(current)
        else:
            regions.append([current])

    # Only sufficiently long regions are real plateaus worth annotating.
    boundary_points = []
    for region in regions:
        if len(region) > 10:
            boundary_points.append(prices[region[0]])
            boundary_points.append(prices[region[-1]])
    return boundary_points


def _print_report(results, option_list, prices):
    """Print the text report for the analysed option(s).

    Args:
        results: Dict produced by ``analyze_options``.
        option_list: List of option dicts.
        prices: Price grid aligned with the profit sequences in ``results``.
    """
    print("=" * 60)
    print("期权分析报告")
    print("=" * 60)

    # Basic data of every option.
    for i, opt in enumerate(option_list):
        print(f"期权{i+1}: {opt['direction'].upper()} {opt['option_type'].upper()}")
        print(f" 行权价: {opt['strike_price']}")
        print(f" 权利金: {opt['premium']}")
        print(f" 数量: {opt['quantity']}手")

    # The report covers the portfolio curve when there is one.
    if 'combined' in results:
        profits = results['combined']
        print("\n【组合分析】")
    else:
        profits = results['opt1']
        print("\n【单期权分析】")

    # argmax/argmin return the first occurrence, like list.index() would.
    max_idx = int(np.argmax(profits))
    min_idx = int(np.argmin(profits))
    max_profit = profits[max_idx]
    min_profit = profits[min_idx]

    print(f"最大收益: {max_profit:.4f} (标的价格: {prices[max_idx]:.4f})")
    print(f"最大损失: {min_profit:.4f} (标的价格: {prices[min_idx]:.4f})")
    print(f"一单最大收益: {max_profit * 10000:.2f}元")
    print(f"一单最大亏损: {abs(min_profit) * 10000:.2f}元")

    breakeven_points = _find_breakeven_points(profits, prices)
    if breakeven_points:
        print(f"盈亏平衡点: {[f'{bp:.4f}' for bp in breakeven_points]}")
    else:
        print("无盈亏平衡点")

    print("=" * 60)


if __name__ == "__main__":
    # Demo: bear call spread (short the 1.0 call, long the 1.05 call).
    analyze_options(('sell', 'call', 0.0199, 1.0, 1), ('buy', 'call', 0.0482, 1.05, 1))