analysis_chart.py 14 KB


  1. import pandas as pd
  2. import numpy as np
  3. import matplotlib.pyplot as plt
  4. from statistics import mean
  5. import os
  6. from datetime import datetime
  7. def analyze_options(*options, mark_prices=None):
  8. """
  9. 统一的期权分析方法
  10. 参数:
  11. *options: 一个或多个期权,每个期权格式为 (direction, option_type, premium, strike_price, quantity)
  12. 例如: ('buy', 'call', 0.0456, 2.75, 1)
  13. direction: 方向,buy 或 sell
  14. option_type: 期权类型,call 或 put
  15. premium: 权利金
  16. strike_price: 行权价
  17. quantity: 数量
  18. mark_prices: 可选,需要标注的标的价格列表,如 [1.35, 1.52]
  19. 可以作为关键字参数或最后一个位置参数传递
  20. 示例:
  21. # 单个期权
  22. analyze_options(('buy', 'call', 0.05, 3.0, 1))
  23. # 期权组合
  24. analyze_options(('buy', 'call', 0.08, 2.9, 1), ('sell', 'call', 0.03, 3.1, 1))
  25. # 标注指定价格点(关键字参数方式)
  26. analyze_options(('buy', 'call', 0.05, 3.0, 1), mark_prices=[2.8, 3.2])
  27. # 标注指定价格点(位置参数方式)
  28. analyze_options(('buy', 'call', 0.05, 3.0, 1), [2.8, 3.2])
  29. """
  30. if not options:
  31. raise ValueError("请至少提供一个期权")
  32. # 检查最后一个参数是否是 mark_prices(列表类型且不是期权格式)
  33. actual_options = list(options)
  34. if (mark_prices is None and len(actual_options) > 0 and
  35. isinstance(actual_options[-1], list) and
  36. len(actual_options[-1]) != 5):
  37. # 最后一个参数是列表且不是期权格式,将其视为 mark_prices
  38. mark_prices = actual_options.pop()
  39. if not actual_options:
  40. raise ValueError("请至少提供一个期权")
  41. # 解析期权数据
  42. option_list = []
  43. all_strikes = []
  44. for i, opt in enumerate(actual_options):
  45. if not isinstance(opt, (tuple, list)) or len(opt) != 5:
  46. raise ValueError(f"期权{i+1}格式错误,应为(direction, option_type, premium, strike_price, quantity)")
  47. direction, option_type, premium, strike_price, quantity = opt
  48. option_list.append({
  49. 'direction': direction,
  50. 'option_type': option_type,
  51. 'premium': premium,
  52. 'strike_price': strike_price,
  53. 'quantity': quantity
  54. })
  55. all_strikes.append(strike_price)
  56. # 确定价格分析区间
  57. min_strike = min(all_strikes)
  58. max_strike = max(all_strikes)
  59. price_min = min_strike * 0.7
  60. price_max = max_strike * 1.3
  61. # 生成价格序列
  62. gap = (price_max - price_min) / 1000
  63. prices = np.arange(price_min, price_max + gap, gap)
  64. # 计算每个期权的收益
  65. results = {'price': prices}
  66. for i, opt in enumerate(option_list):
  67. profits = []
  68. for price in prices:
  69. profit = _calculate_profit(opt, price)
  70. profits.append(profit)
  71. results[f'opt{i+1}'] = profits
  72. # 计算组合收益
  73. if len(option_list) > 1:
  74. combined_profits = []
  75. for j in range(len(prices)):
  76. total = sum(results[f'opt{i+1}'][j] for i in range(len(option_list)))
  77. combined_profits.append(total)
  78. results['combined'] = combined_profits
  79. # 绘制图表
  80. _plot_results(results, option_list, prices, mark_prices=mark_prices)
  81. # 打印分析报告
  82. _print_report(results, option_list, prices, mark_prices=mark_prices)
  83. return pd.DataFrame(results)
  84. def _calculate_profit(option, price):
  85. """计算单个期权在特定价格下的收益"""
  86. direction = option['direction']
  87. option_type = option['option_type']
  88. premium = option['premium']
  89. strike_price = option['strike_price']
  90. quantity = option['quantity']
  91. if direction == 'buy' and option_type == 'call':
  92. # 买入认购
  93. if price > strike_price:
  94. return (price - strike_price - premium) * quantity
  95. else:
  96. return -premium * quantity
  97. elif direction == 'sell' and option_type == 'call':
  98. # 卖出认购
  99. if price > strike_price:
  100. return -(price - strike_price - premium) * quantity
  101. else:
  102. return premium * quantity
  103. elif direction == 'buy' and option_type == 'put':
  104. # 买入认沽
  105. if price < strike_price:
  106. return (strike_price - price - premium) * quantity
  107. else:
  108. return -premium * quantity
  109. elif direction == 'sell' and option_type == 'put':
  110. # 卖出认沽
  111. if price < strike_price:
  112. return -(strike_price - price - premium) * quantity
  113. else:
  114. return premium * quantity
  115. return 0
  116. def _plot_results(results, option_list, prices, mark_prices=None):
  117. """绘制分析图表"""
  118. plt.figure(figsize=(14, 10))
  119. plt.rcParams['axes.unicode_minus'] = False
  120. colors = ['blue', 'green', 'orange', 'purple', 'brown']
  121. # 绘制单个期权曲线
  122. for i in range(len(option_list)):
  123. opt = option_list[i]
  124. opt_name = f'opt{i+1}'
  125. strategy_name = f"{opt['direction'].upper()} {opt['option_type'].upper()}"
  126. color = colors[i % len(colors)]
  127. plt.plot(prices, results[opt_name], '--', color=color, linewidth=2, alpha=0.7,
  128. label=f'{opt_name}: {strategy_name}(strike price:{opt["strike_price"]}, premium:{opt["premium"]})')
  129. # 绘制组合曲线
  130. if 'combined' in results:
  131. plt.plot(prices, results['combined'], 'r-', linewidth=3, label='Portfolio returns')
  132. # 添加零线和行权价线
  133. plt.axhline(0, color='gray', linestyle='-', alpha=0.5)
  134. for opt in option_list:
  135. plt.axvline(opt['strike_price'], color='gray', linestyle='--', alpha=0.3)
  136. # 找到并标注关键点(盈亏平衡点、最大收益/损失边界点)
  137. if 'combined' in results:
  138. profits_for_marking = results['combined']
  139. _mark_key_points(profits_for_marking, prices, 'Portfolio')
  140. elif len(option_list) == 1:
  141. profits_for_marking = results['opt1']
  142. _mark_key_points(profits_for_marking, prices, 'Option')
  143. else:
  144. profits_for_marking = None
  145. # 标注指定的价格点
  146. if mark_prices and profits_for_marking is not None:
  147. _mark_specified_prices(mark_prices, profits_for_marking, prices, option_list)
  148. plt.xlabel('Assest Price', fontsize=12)
  149. plt.ylabel('Profit/Loss', fontsize=12)
  150. if len(option_list) == 1:
  151. opt = option_list[0]
  152. title = f'{opt["direction"].upper()} {opt["option_type"].upper()} Option Analysis'
  153. else:
  154. title = f'Portfolio Analysis ({len(option_list)} options)'
  155. plt.title(title, fontsize=14, weight='bold')
  156. plt.grid(True, alpha=0.3)
  157. plt.legend()
  158. plt.tight_layout()
  159. # 保存图片到data/Options目录
  160. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  161. filename = f'options_analysis_{timestamp}.png'
  162. save_path = os.path.join('data', 'Options', filename)
  163. plt.savefig(save_path, dpi=300, bbox_inches='tight')
  164. print(f"图片已保存到: {save_path}")
  165. plt.show()
  166. def _mark_key_points(profits, prices, label_prefix):
  167. """标注关键点:盈亏平衡点、最大收益/损失边界点"""
  168. # 1. 标注盈亏平衡点
  169. for i in range(len(profits) - 1):
  170. if profits[i] * profits[i + 1] <= 0: # 符号改变
  171. # 线性插值找到精确平衡点
  172. p1, profit1 = prices[i], profits[i]
  173. p2, profit2 = prices[i + 1], profits[i + 1]
  174. if profit2 != profit1:
  175. breakeven_price = p1 - profit1 * (p2 - p1) / (profit2 - profit1)
  176. plt.plot(breakeven_price, 0, 'ro', markersize=10)
  177. plt.annotate(f'Equilibrium Point: {breakeven_price:.4f}',
  178. xy=(breakeven_price, 0),
  179. xytext=(breakeven_price + (prices.max() - prices.min()) * 0.05, max(profits) * 0.1),
  180. arrowprops=dict(arrowstyle='->', color='red'),
  181. fontsize=11, color='red', weight='bold')
  182. # 2. 找到最大收益和最大损失点
  183. max_profit = max(profits)
  184. min_profit = min(profits)
  185. # 3. 检查是否存在最大收益/损失的边界点
  186. # 最大收益边界点:收益达到最大值后不再增长的点
  187. max_boundary_points = _find_boundary_points(profits, prices, max_profit, 'max')
  188. # 最大损失边界点:损失达到最大值后不再增长的点
  189. min_boundary_points = _find_boundary_points(profits, prices, min_profit, 'min')
  190. # 4. 标注最大收益边界点
  191. for bp in max_boundary_points:
  192. plt.plot(bp, max_profit, 'go', markersize=10)
  193. plt.annotate(f'Max Returns: ({bp:.4f}, {max_profit:.4f})',
  194. xy=(bp, max_profit),
  195. xytext=(bp + (prices.max() - prices.min()) * 0.05, max_profit + (max_profit - min_profit) * 0.1),
  196. arrowprops=dict(arrowstyle='->', color='green'),
  197. fontsize=10, color='green', weight='bold')
  198. # 5. 标注最大损失边界点
  199. for bp in min_boundary_points:
  200. plt.plot(bp, min_profit, 'mo', markersize=10)
  201. plt.annotate(f'Max Loss: ({bp:.4f}, {min_profit:.4f})',
  202. xy=(bp, min_profit),
  203. xytext=(bp + (prices.max() - prices.min()) * 0.05, min_profit - (max_profit - min_profit) * 0.1),
  204. arrowprops=dict(arrowstyle='->', color='magenta'),
  205. fontsize=10, color='magenta', weight='bold')
  206. def _mark_specified_prices(mark_prices, profits, prices, option_list):
  207. """标注指定的标的价格点及其对应的损益"""
  208. if not mark_prices:
  209. return
  210. # 计算每个指定价格对应的损益
  211. for mark_price in mark_prices:
  212. # 找到最接近的价格索引
  213. price_idx = np.argmin(np.abs(prices - mark_price))
  214. actual_price = prices[price_idx]
  215. profit_value = profits[price_idx]
  216. # 绘制标记点(使用黄色星形标记)
  217. plt.plot(actual_price, profit_value, '*', color='gold', markersize=15,
  218. markeredgecolor='darkorange', markeredgewidth=2, zorder=5)
  219. # 添加标注
  220. price_range = prices.max() - prices.min()
  221. profit_range = max(profits) - min(profits) if len(profits) > 0 else 1
  222. # 根据损益值的正负决定标注位置
  223. if profit_value >= 0:
  224. y_offset = profit_range * 0.15
  225. else:
  226. y_offset = -profit_range * 0.15
  227. plt.annotate(f'Price: {actual_price:.4f}\nP/L: {profit_value:.4f}',
  228. xy=(actual_price, profit_value),
  229. xytext=(actual_price + price_range * 0.05, profit_value + y_offset),
  230. arrowprops=dict(arrowstyle='->', color='darkorange', lw=1.5),
  231. fontsize=10, color='darkorange', weight='bold',
  232. bbox=dict(boxstyle='round,pad=0.5', facecolor='wheat', alpha=0.8))
  233. def _find_boundary_points(profits, prices, extreme_value, _extreme_type):
  234. """找到最大收益或最大损失的边界点"""
  235. boundary_points = []
  236. tolerance = abs(extreme_value) * 0.001 # 允许的误差范围
  237. # 找到所有接近极值的点
  238. extreme_indices = []
  239. for i, profit in enumerate(profits):
  240. if abs(profit - extreme_value) <= tolerance:
  241. extreme_indices.append(i)
  242. if not extreme_indices:
  243. return boundary_points
  244. # 找到连续区间的边界点
  245. if len(extreme_indices) > 1:
  246. # 检查是否是连续的区间
  247. continuous_regions = []
  248. current_region = [extreme_indices[0]]
  249. for i in range(1, len(extreme_indices)):
  250. if extreme_indices[i] - extreme_indices[i-1] <= 2: # 允许小的间隔
  251. current_region.append(extreme_indices[i])
  252. else:
  253. continuous_regions.append(current_region)
  254. current_region = [extreme_indices[i]]
  255. continuous_regions.append(current_region)
  256. # 对于每个连续区间,标注边界点
  257. for region in continuous_regions:
  258. if len(region) > 10: # 只有当区间足够长时才标注边界点
  259. # 左边界点
  260. left_boundary = prices[region[0]]
  261. boundary_points.append(left_boundary)
  262. # 右边界点
  263. right_boundary = prices[region[-1]]
  264. boundary_points.append(right_boundary)
  265. return boundary_points
  266. def _print_report(results, option_list, prices, mark_prices=None):
  267. """打印分析报告"""
  268. print("=" * 60)
  269. print("期权分析报告")
  270. print("=" * 60)
  271. # 期权基本信息
  272. for i, opt in enumerate(option_list):
  273. print(f"期权{i+1}: {opt['direction'].upper()} {opt['option_type'].upper()}")
  274. print(f" 行权价: {opt['strike_price']}")
  275. print(f" 权利金: {opt['premium']}")
  276. print(f" 数量: {opt['quantity']}手")
  277. # 分析关键指标
  278. if 'combined' in results:
  279. profits = results['combined']
  280. print(f"\n【组合分析】")
  281. else:
  282. profits = results['opt1']
  283. print(f"\n【单期权分析】")
  284. max_profit = max(profits)
  285. min_profit = min(profits)
  286. max_idx = profits.index(max_profit)
  287. min_idx = profits.index(min_profit)
  288. print(f"最大收益: {max_profit:.4f} (标的价格: {prices[max_idx]:.4f})")
  289. print(f"最大损失: {min_profit:.4f} (标的价格: {prices[min_idx]:.4f})")
  290. print(f"一单最大收益: {max_profit * 10000:.2f}元")
  291. print(f"一单最大亏损: {abs(min_profit) * 10000:.2f}元")
  292. # 找盈亏平衡点
  293. breakeven_points = []
  294. for i in range(len(profits) - 1):
  295. if profits[i] * profits[i + 1] <= 0:
  296. p1, profit1 = prices[i], profits[i]
  297. p2, profit2 = prices[i + 1], profits[i + 1]
  298. if profit2 != profit1:
  299. bp = p1 - profit1 * (p2 - p1) / (profit2 - profit1)
  300. breakeven_points.append(bp)
  301. if breakeven_points:
  302. print(f"盈亏平衡点: {[f'{bp:.4f}' for bp in breakeven_points]}")
  303. else:
  304. print("无盈亏平衡点")
  305. # 打印指定价格点的损益信息
  306. if mark_prices:
  307. print(f"\n【指定价格点分析】")
  308. for mark_price in mark_prices:
  309. # 找到最接近的价格索引
  310. price_idx = np.argmin(np.abs(prices - mark_price))
  311. actual_price = prices[price_idx]
  312. profit_value = profits[price_idx]
  313. print(f"标的价格: {actual_price:.4f} -> 损益: {profit_value:.4f} (一单: {profit_value * 10000:.2f}元)")
  314. print("=" * 60)
  315. analyze_options(('buy', 'put', 0.1636, 1.55, 2), ('sell', 'put', 0.084, 1.45, 1), [1.314, 1.35, 1.52])