kline_reconstruction.py 18 KB


  1. # K线复原工具
  2. # 用于从交易记录CSV文件中提取开仓记录,获取对应的K线数据并绘制包含均线的K线图
  3. from jqdata import *
  4. import pandas as pd
  5. import numpy as np
  6. import matplotlib.pyplot as plt
  7. import matplotlib.patches as patches
  8. from datetime import datetime, timedelta, date
  9. import re
  10. import os
  11. import warnings
  12. warnings.filterwarnings('ignore')
  13. # 中文字体设置(虽然图片内文字用英文,但保留设置以防需要)
  14. plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']
  15. plt.rcParams['axes.unicode_minus'] = False
  16. def _get_current_directory():
  17. """
  18. 获取当前文件所在目录,兼容 Jupyter notebook 环境
  19. 返回:
  20. str: 当前目录路径
  21. """
  22. try:
  23. # 在普通 Python 脚本中,使用 __file__
  24. current_dir = os.path.dirname(os.path.abspath(__file__))
  25. except NameError:
  26. # 在 Jupyter notebook 环境中,__file__ 不存在,使用当前工作目录
  27. current_dir = os.getcwd()
  28. # 如果当前目录不是 future 目录,尝试查找
  29. if not os.path.exists(os.path.join(current_dir, 'transaction.csv')):
  30. # 尝试查找 future 目录
  31. if 'future' not in current_dir:
  32. # 尝试向上查找 future 目录
  33. parent_dir = os.path.dirname(current_dir)
  34. future_dir = os.path.join(parent_dir, 'future')
  35. if os.path.exists(os.path.join(future_dir, 'transaction.csv')):
  36. current_dir = future_dir
  37. return current_dir
  38. def read_and_filter_open_positions(csv_path):
  39. """
  40. 读取CSV文件并筛选出开仓记录
  41. 参数:
  42. csv_path (str): CSV文件路径
  43. 返回:
  44. pandas.DataFrame: 包含开仓记录的DataFrame
  45. """
  46. try:
  47. df = pd.read_csv(csv_path, encoding='utf-8-sig')
  48. # 筛选交易类型第一个字符为"开"的行
  49. open_positions = df[df['交易类型'].str[0] == '开'].copy()
  50. print(f"从CSV文件中读取到 {len(df)} 条记录")
  51. print(f"筛选出 {len(open_positions)} 条开仓记录")
  52. return open_positions
  53. except Exception as e:
  54. print(f"读取CSV文件时出错: {str(e)}")
  55. return pd.DataFrame()
  56. def extract_contract_code_and_date(row):
  57. """
  58. 从标的列提取合约编号,从日期列提取日期
  59. 参数:
  60. row (pandas.Series): DataFrame的一行数据
  61. 返回:
  62. tuple: (contract_code, trade_date) 或 (None, None) 如果提取失败
  63. """
  64. try:
  65. # 提取合约编号:从"标的"列中提取括号内的内容
  66. target_str = str(row['标的'])
  67. match = re.search(r'\(([^)]+)\)', target_str)
  68. if match:
  69. contract_code = match.group(1)
  70. else:
  71. print(f"无法从标的 '{target_str}' 中提取合约编号")
  72. return None, None
  73. # 提取日期
  74. date_str = str(row['日期'])
  75. try:
  76. trade_date = datetime.strptime(date_str, '%Y-%m-%d').date()
  77. except:
  78. print(f"日期格式错误: {date_str}")
  79. return None, None
  80. return contract_code, trade_date
  81. except Exception as e:
  82. print(f"提取合约编号和日期时出错: {str(e)}")
  83. return None, None
  84. def calculate_trade_days_range(trade_date, days_before=60, days_after=10):
  85. """
  86. 计算交易日范围:往前days_before个交易日,往后days_after个交易日
  87. 参数:
  88. trade_date (date): 开仓日期
  89. days_before (int): 往前交易日数量,默认60
  90. days_after (int): 往后交易日数量,默认10
  91. 返回:
  92. tuple: (start_date, end_date) 或 (None, None) 如果计算失败
  93. """
  94. try:
  95. # 往前找:从trade_date往前找days_before个交易日
  96. # get_trade_days(end_date=trade_date, count=n) 返回包括trade_date在内的n个交易日
  97. # 所以需要count=days_before+1,第一个就是days_before个交易日前的日期
  98. trade_days_before = get_trade_days(end_date=trade_date, count=days_before + 1)
  99. if len(trade_days_before) < days_before + 1:
  100. print(f"无法获取足够的往前交易日,只获取到 {len(trade_days_before)} 个")
  101. return None, None
  102. start_date = trade_days_before[0].date()
  103. # 往后找:从trade_date往后找days_after个交易日
  104. # get_trade_days(start_date=trade_date, count=n) 返回包括trade_date在内的n个交易日
  105. # 所以需要count=days_after+1,最后一个就是days_after个交易日后的日期
  106. trade_days_after = get_trade_days(start_date=trade_date, count=days_after + 1)
  107. if len(trade_days_after) < days_after + 1:
  108. print(f"无法获取足够的往后交易日,只获取到 {len(trade_days_after)} 个")
  109. return None, None
  110. end_date = trade_days_after[-1].date()
  111. return start_date, end_date
  112. except Exception as e:
  113. print(f"计算交易日范围时出错: {str(e)}")
  114. return None, None
  115. def get_kline_data(contract_code, start_date, end_date):
  116. """
  117. 获取指定合约在时间范围内的K线数据
  118. 参数:
  119. contract_code (str): 合约编号,如 'JD2502.XDCE'
  120. start_date (date): 开始日期
  121. end_date (date): 结束日期
  122. 返回:
  123. pandas.DataFrame: 包含OHLC数据的DataFrame,如果获取失败返回None
  124. """
  125. try:
  126. # 使用get_price获取K线数据
  127. price_data = get_price(
  128. contract_code,
  129. start_date=start_date,
  130. end_date=end_date,
  131. frequency='1d',
  132. fields=['open', 'close', 'high', 'low']
  133. )
  134. if price_data is None or len(price_data) == 0:
  135. print(f"未获取到 {contract_code} 在 {start_date} 至 {end_date} 的数据")
  136. return None
  137. return price_data
  138. except Exception as e:
  139. print(f"获取K线数据时出错: {str(e)}")
  140. return None
  141. def calculate_moving_averages(data):
  142. """
  143. 计算5K, 10K, 20K, 30K均线
  144. 参数:
  145. data (pandas.DataFrame): 包含close列的DataFrame
  146. 返回:
  147. pandas.DataFrame: 添加了均线列的DataFrame
  148. """
  149. data = data.copy()
  150. # 计算均线
  151. data['ma5'] = data['close'].rolling(window=5).mean()
  152. data['ma10'] = data['close'].rolling(window=10).mean()
  153. data['ma20'] = data['close'].rolling(window=20).mean()
  154. data['ma30'] = data['close'].rolling(window=30).mean()
  155. return data
  156. def filter_data_with_ma(data):
  157. """
  158. 过滤掉任何一条均线为空的日期
  159. 参数:
  160. data (pandas.DataFrame): 包含均线列的DataFrame
  161. 返回:
  162. pandas.DataFrame: 过滤后的DataFrame
  163. """
  164. # 过滤掉任何一条均线为空的日期
  165. filtered_data = data.dropna(subset=['ma5', 'ma10', 'ma20', 'ma30'])
  166. return filtered_data
  167. def plot_kline_chart(data, contract_code, trade_date, save_path):
  168. """
  169. 绘制K线图(包含均线和开仓日期标注)
  170. 参数:
  171. data (pandas.DataFrame): 包含OHLC和均线数据的DataFrame
  172. contract_code (str): 合约编号
  173. trade_date (date): 开仓日期
  174. save_path (str): 保存路径
  175. """
  176. try:
  177. # 创建图表
  178. fig, ax = plt.subplots(figsize=(16, 10))
  179. # 准备数据
  180. dates = data.index
  181. opens = data['open']
  182. highs = data['high']
  183. lows = data['low']
  184. closes = data['close']
  185. # 找到开仓日期在数据中的位置
  186. trade_date_idx = None
  187. for i, date_idx in enumerate(dates):
  188. if isinstance(date_idx, date):
  189. if date_idx == trade_date:
  190. trade_date_idx = i
  191. break
  192. elif isinstance(date_idx, datetime):
  193. if date_idx.date() == trade_date:
  194. trade_date_idx = i
  195. break
  196. # 绘制K线
  197. for i in range(len(data)):
  198. date_idx = dates[i]
  199. open_price = opens.iloc[i]
  200. high_price = highs.iloc[i]
  201. low_price = lows.iloc[i]
  202. close_price = closes.iloc[i]
  203. # K线颜色:红涨绿跌
  204. color = 'red' if close_price > open_price else 'green'
  205. edge_color = 'darkred' if close_price > open_price else 'darkgreen'
  206. # 绘制影线(最高价到最低价的竖线)
  207. ax.plot([i, i], [low_price, high_price], color='black', linewidth=1)
  208. # 绘制实体(开盘价到收盘价的矩形)
  209. body_height = abs(close_price - open_price)
  210. if body_height == 0:
  211. body_height = 0.01 # 避免高度为0
  212. bottom = min(open_price, close_price)
  213. # 使用矩形绘制K线实体
  214. rect = patches.Rectangle((i-0.4, bottom), 0.8, body_height,
  215. linewidth=1, edgecolor=edge_color,
  216. facecolor=color, alpha=0.8)
  217. ax.add_patch(rect)
  218. # 绘制均线
  219. ax.plot(range(len(data)), data['ma5'], label='MA5', color='blue', linewidth=1.5, alpha=0.8)
  220. ax.plot(range(len(data)), data['ma10'], label='MA10', color='orange', linewidth=1.5, alpha=0.8)
  221. ax.plot(range(len(data)), data['ma20'], label='MA20', color='purple', linewidth=1.5, alpha=0.8)
  222. ax.plot(range(len(data)), data['ma30'], label='MA30', color='brown', linewidth=1.5, alpha=0.8)
  223. # 标注开仓日期位置
  224. if trade_date_idx is not None:
  225. trade_price = closes.iloc[trade_date_idx]
  226. ax.plot(trade_date_idx, trade_price, marker='*', markersize=15,
  227. color='yellow', markeredgecolor='black', markeredgewidth=1.5,
  228. label='Open Position', zorder=10)
  229. # 添加垂直线
  230. ax.axvline(x=trade_date_idx, color='yellow', linestyle='--',
  231. linewidth=2, alpha=0.7, zorder=5)
  232. # 设置图表标题和标签(使用英文)
  233. contract_simple = contract_code.split('.')[0] # 提取合约编号的简约部分
  234. ax.set_title(f'{contract_simple} ({contract_code}) K-Line Chart\n'
  235. f'Period: {dates[0].strftime("%Y-%m-%d")} to {dates[-1].strftime("%Y-%m-%d")} '
  236. f'({len(data)} bars)',
  237. fontsize=14, fontweight='bold', pad=20)
  238. ax.set_xlabel('Time', fontsize=12)
  239. ax.set_ylabel('Price', fontsize=12)
  240. ax.grid(True, alpha=0.3)
  241. ax.legend(loc='upper left', fontsize=10)
  242. # 设置x轴标签
  243. step = max(1, len(data) // 10) # 显示约10个时间标签
  244. tick_positions = range(0, len(data), step)
  245. tick_labels = []
  246. for pos in tick_positions:
  247. date_val = dates[pos]
  248. if isinstance(date_val, date):
  249. tick_labels.append(date_val.strftime('%Y-%m-%d'))
  250. elif isinstance(date_val, datetime):
  251. tick_labels.append(date_val.strftime('%Y-%m-%d'))
  252. else:
  253. tick_labels.append(str(date_val))
  254. ax.set_xticks(tick_positions)
  255. ax.set_xticklabels(tick_labels, rotation=45, ha='right')
  256. # 添加统计信息(使用英文)
  257. max_price = highs.max()
  258. min_price = lows.min()
  259. latest_close = closes.iloc[-1]
  260. first_close = closes.iloc[0]
  261. total_change = (latest_close - first_close) / first_close * 100
  262. stats_text = (f'High: {max_price:.2f}\n'
  263. f'Low: {min_price:.2f}\n'
  264. f'Latest Close: {latest_close:.2f}\n'
  265. f'Total Change: {total_change:+.2f}%')
  266. ax.text(0.02, 0.98, stats_text, transform=ax.transAxes,
  267. verticalalignment='top', bbox=dict(boxstyle='round',
  268. facecolor='wheat', alpha=0.8), fontsize=10)
  269. # 调整布局并保存
  270. plt.tight_layout()
  271. plt.savefig(save_path, dpi=150, bbox_inches='tight')
  272. plt.close()
  273. print(f"K线图已保存到: {save_path}")
  274. except Exception as e:
  275. print(f"绘制K线图时出错: {str(e)}")
  276. raise
  277. def reconstruct_kline_from_transactions(csv_path=None, output_dir=None):
  278. """
  279. 主函数:从交易记录中复原K线图
  280. 参数:
  281. csv_path (str): CSV文件路径,默认为 'Lib/future/transaction.csv'
  282. output_dir (str): 输出目录,默认为 'Lib/future/K'
  283. """
  284. # 设置默认路径
  285. if csv_path is None:
  286. # 获取当前文件所在目录
  287. # 在 Jupyter notebook 中,__file__ 不存在,使用当前工作目录
  288. try:
  289. current_dir = os.path.dirname(os.path.abspath(__file__))
  290. except NameError:
  291. # 在 Jupyter notebook 环境中,使用当前工作目录
  292. current_dir = os.getcwd()
  293. # 如果当前目录不是 future 目录,尝试查找
  294. if not os.path.exists(os.path.join(current_dir, 'transaction.csv')):
  295. # 尝试查找 future 目录
  296. if 'future' in current_dir:
  297. pass # 已经在 future 目录中
  298. else:
  299. # 尝试向上查找 future 目录
  300. parent_dir = os.path.dirname(current_dir)
  301. future_dir = os.path.join(parent_dir, 'future')
  302. if os.path.exists(os.path.join(future_dir, 'transaction.csv')):
  303. current_dir = future_dir
  304. csv_path = os.path.join(current_dir, 'transaction.csv')
  305. if output_dir is None:
  306. # 获取当前文件所在目录
  307. try:
  308. current_dir = os.path.dirname(os.path.abspath(__file__))
  309. except NameError:
  310. # 在 Jupyter notebook 环境中,使用当前工作目录
  311. current_dir = os.getcwd()
  312. # 如果当前目录不是 future 目录,尝试查找
  313. if not os.path.exists(os.path.join(current_dir, 'transaction.csv')):
  314. # 尝试查找 future 目录
  315. if 'future' in current_dir:
  316. pass # 已经在 future 目录中
  317. else:
  318. # 尝试向上查找 future 目录
  319. parent_dir = os.path.dirname(current_dir)
  320. future_dir = os.path.join(parent_dir, 'future')
  321. if os.path.exists(os.path.join(future_dir, 'transaction.csv')):
  322. current_dir = future_dir
  323. output_dir = os.path.join(current_dir, 'K')
  324. # 确保输出目录存在
  325. os.makedirs(output_dir, exist_ok=True)
  326. print(f"输出目录: {output_dir}")
  327. # 1. 读取和筛选开仓记录
  328. print("\n=== 步骤1: 读取和筛选开仓记录 ===")
  329. open_positions = read_and_filter_open_positions(csv_path)
  330. if len(open_positions) == 0:
  331. print("未找到开仓记录,退出")
  332. return
  333. # 2. 处理每条开仓记录
  334. print(f"\n=== 步骤2: 处理 {len(open_positions)} 条开仓记录 ===")
  335. success_count = 0
  336. fail_count = 0
  337. for idx, row in open_positions.iterrows():
  338. print(f"\n--- 处理第 {idx + 1}/{len(open_positions)} 条记录 ---")
  339. try:
  340. # 提取合约编号和日期
  341. contract_code, trade_date = extract_contract_code_and_date(row)
  342. if contract_code is None or trade_date is None:
  343. print(f"跳过:无法提取合约编号或日期")
  344. fail_count += 1
  345. continue
  346. print(f"合约编号: {contract_code}, 开仓日期: {trade_date}")
  347. # 计算交易日范围
  348. start_date, end_date = calculate_trade_days_range(trade_date, days_before=60, days_after=10)
  349. if start_date is None or end_date is None:
  350. print(f"跳过:无法计算交易日范围")
  351. fail_count += 1
  352. continue
  353. print(f"数据范围: {start_date} 至 {end_date}")
  354. # 获取K线数据
  355. kline_data = get_kline_data(contract_code, start_date, end_date)
  356. if kline_data is None or len(kline_data) == 0:
  357. print(f"跳过:无法获取K线数据")
  358. fail_count += 1
  359. continue
  360. print(f"获取到 {len(kline_data)} 条K线数据")
  361. # 计算均线
  362. kline_data = calculate_moving_averages(kline_data)
  363. # 过滤数据
  364. filtered_data = filter_data_with_ma(kline_data)
  365. if len(filtered_data) == 0:
  366. print(f"跳过:过滤后无有效数据")
  367. fail_count += 1
  368. continue
  369. print(f"过滤后剩余 {len(filtered_data)} 条有效数据")
  370. # 生成文件名
  371. contract_simple = contract_code.split('.')[0] # 提取合约编号的简约部分
  372. filename = f"{contract_simple}_{trade_date.strftime('%Y%m%d')}.png"
  373. save_path = os.path.join(output_dir, filename)
  374. # 绘制K线图
  375. plot_kline_chart(filtered_data, contract_code, trade_date, save_path)
  376. success_count += 1
  377. print(f"✓ 成功处理")
  378. except Exception as e:
  379. print(f"✗ 处理时出错: {str(e)}")
  380. fail_count += 1
  381. continue
  382. # 输出统计信息
  383. print(f"\n=== 处理完成 ===")
  384. print(f"成功: {success_count} 条")
  385. print(f"失败: {fail_count} 条")
  386. print(f"总计: {len(open_positions)} 条")
  387. # 使用示例
  388. if __name__ == "__main__":
  389. print("=" * 60)
  390. print("K线复原工具")
  391. print("=" * 60)
  392. reconstruct_kline_from_transactions()
  393. print("\n=== 完成 ===")