kline_reconstruction.py 30 KB


  1. # K线复原工具
  2. # 用于从交易记录CSV文件中提取开仓记录,获取对应的K线数据并绘制包含均线的K线图
  3. from jqdata import *
  4. import pandas as pd
  5. import numpy as np
  6. import matplotlib.pyplot as plt
  7. import matplotlib.patches as patches
  8. from datetime import datetime, timedelta, date
  9. import re
  10. from tqdm import tqdm
  11. import os
  12. import zipfile
  13. import warnings
  14. warnings.filterwarnings('ignore')
  15. # 中文字体设置(虽然图片内文字用英文,但保留设置以防需要)
  16. plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']
  17. plt.rcParams['axes.unicode_minus'] = False
  18. def _get_current_directory():
  19. """
  20. 获取当前文件所在目录,兼容 Jupyter notebook 环境
  21. 返回:
  22. str: 当前目录路径
  23. """
  24. try:
  25. # 在普通 Python 脚本中,使用 __file__
  26. current_dir = os.path.dirname(os.path.abspath(__file__))
  27. except NameError:
  28. # 在 Jupyter notebook 环境中,__file__ 不存在,使用当前工作目录
  29. current_dir = os.getcwd()
  30. # 如果当前目录不是 future 目录,尝试查找
  31. if not os.path.exists(os.path.join(current_dir, 'transaction1.csv')):
  32. # 尝试查找 future 目录
  33. if 'future' not in current_dir:
  34. # 尝试向上查找 future 目录
  35. parent_dir = os.path.dirname(current_dir)
  36. future_dir = os.path.join(parent_dir, 'future')
  37. if os.path.exists(os.path.join(future_dir, 'transaction1.csv')):
  38. current_dir = future_dir
  39. return current_dir
  40. def read_and_filter_open_positions(csv_path):
  41. """
  42. 读取CSV文件并筛选出开仓记录
  43. 参数:
  44. csv_path (str): CSV文件路径
  45. 返回:
  46. pandas.DataFrame: 包含开仓记录的DataFrame
  47. """
  48. # 尝试多种编码格式
  49. encodings = ['utf-8-sig', 'utf-8', 'gbk', 'gb2312', 'gb18030', 'latin1']
  50. for encoding in encodings:
  51. try:
  52. df = pd.read_csv(csv_path, encoding=encoding)
  53. # 筛选交易类型第一个字符为"开"的行
  54. open_positions = df[df['交易类型'].str[0] == '开'].copy()
  55. print(f"成功使用 {encoding} 编码读取CSV文件")
  56. print(f"从CSV文件中读取到 {len(df)} 条记录")
  57. print(f"筛选出 {len(open_positions)} 条开仓记录")
  58. return open_positions
  59. except UnicodeDecodeError:
  60. continue
  61. except Exception as e:
  62. # 如果是其他错误(比如列名不存在),也尝试下一种编码
  63. if encoding == encodings[-1]:
  64. # 最后一种编码也失败了,抛出错误
  65. print(f"读取CSV文件时出错: {str(e)}")
  66. raise
  67. continue
  68. # 所有编码都失败了
  69. print(f"无法使用任何编码格式读取CSV文件: {csv_path}")
  70. return pd.DataFrame()
  71. def extract_contract_code_and_date(row):
  72. """
  73. 从标的列提取合约编号,从日期列提取日期,从委托时间计算实际交易日,从成交价列提取成交价,从交易类型提取开仓方向
  74. 参数:
  75. row (pandas.Series): DataFrame的一行数据
  76. 返回:
  77. tuple: (contract_code, actual_trade_date, trade_price, direction, order_time) 或 (None, None, None, None, None) 如果提取失败
  78. """
  79. try:
  80. # 提取合约编号:从"标的"列中提取括号内的内容
  81. target_str = str(row['标的'])
  82. match = re.search(r'\(([^)]+)\)', target_str)
  83. if match:
  84. contract_code = match.group(1)
  85. else:
  86. print(f"无法从标的 '{target_str}' 中提取合约编号")
  87. return None, None, None
  88. # 提取日期,支持多种日期格式
  89. date_str = str(row['日期']).strip()
  90. # 尝试多种日期格式
  91. date_formats = [
  92. '%Y-%m-%d', # 2025-01-02
  93. '%d/%m/%Y', # 14/10/2025
  94. '%Y/%m/%d', # 2025/01/02
  95. '%d-%m-%Y', # 14-10-2025
  96. '%Y%m%d', # 20250102
  97. ]
  98. base_date = None
  99. for date_format in date_formats:
  100. try:
  101. base_date = datetime.strptime(date_str, date_format).date()
  102. break
  103. except ValueError:
  104. continue
  105. if base_date is None:
  106. print(f"日期格式错误: {date_str} (支持的格式: YYYY-MM-DD, DD/MM/YYYY, YYYY/MM/DD, DD-MM-YYYY, YYYYMMDD)")
  107. return None, None, None
  108. # 提取委托时间,判断是否是晚上(>=21:00)
  109. order_time_str = str(row['委托时间']).strip()
  110. try:
  111. # 解析时间格式 HH:MM:SS 或 HH:MM
  112. time_parts = order_time_str.split(':')
  113. hour = int(time_parts[0])
  114. # 如果委托时间 >= 21:00,需要找到下一个交易日
  115. if hour >= 21:
  116. # 使用get_trade_days获取从base_date开始的交易日
  117. # count=2 表示获取包括base_date在内的2个交易日
  118. # 如果base_date是交易日,则返回[base_date, next_trade_day]
  119. # 如果base_date不是交易日,则返回下一个交易日和再下一个交易日
  120. try:
  121. trade_days = get_trade_days(start_date=base_date, count=2)
  122. # print(f"trade_days: {trade_days}")
  123. if len(trade_days) >= 2:
  124. # 取第二个交易日(索引1)作为实际交易日
  125. next_trade_day = trade_days[1]
  126. if isinstance(next_trade_day, datetime):
  127. actual_trade_date = next_trade_day.date()
  128. elif isinstance(next_trade_day, date):
  129. actual_trade_date = next_trade_day
  130. else:
  131. # 如果类型不对,尝试转换
  132. actual_trade_date = base_date
  133. print(f"警告:获取的交易日类型异常: {type(next_trade_day)}")
  134. elif len(trade_days) == 1:
  135. # 如果只返回1个交易日,说明base_date就是交易日,但已经是最后一个交易日
  136. # 这种情况应该取下一个交易日,但可能超出了数据范围
  137. first_day = trade_days[0]
  138. if isinstance(first_day, datetime):
  139. actual_trade_date = first_day.date()
  140. elif isinstance(first_day, date):
  141. actual_trade_date = first_day
  142. else:
  143. actual_trade_date = base_date
  144. print(f"警告:只获取到1个交易日,可能已到数据边界")
  145. else:
  146. # 如果获取失败,使用base_date
  147. actual_trade_date = base_date
  148. print(f"警告:无法获取下一个交易日,使用原始日期")
  149. except Exception as e:
  150. # 如果获取交易日失败,使用base_date
  151. actual_trade_date = base_date
  152. print(f"获取交易日时出错: {str(e)},使用原始日期")
  153. else:
  154. # 委托时间 < 21:00,使用原始日期
  155. actual_trade_date = base_date
  156. except Exception as e:
  157. # 如果解析时间失败,使用原始日期
  158. print(f"解析委托时间失败: {order_time_str}, 使用原始日期")
  159. actual_trade_date = base_date
  160. # print(f"成交日期:{date_str},委托时间:{order_time_str},实际交易日:{actual_trade_date}")
  161. # 提取成交价
  162. try:
  163. trade_price = float(row['成交价'])
  164. except (ValueError, KeyError):
  165. print(f"无法提取成交价: {row.get('成交价', 'N/A')}")
  166. return None, None, None, None, None
  167. # 提取开仓方向:从"交易类型"列提取,开多是'long',开空是'short'
  168. try:
  169. trade_type = str(row['交易类型']).strip()
  170. if '开多' in trade_type or '多' in trade_type:
  171. direction = 'long'
  172. elif '开空' in trade_type or '空' in trade_type:
  173. direction = 'short'
  174. else:
  175. print(f"无法识别交易方向: {trade_type}")
  176. direction = 'unknown'
  177. except (KeyError, ValueError):
  178. print(f"无法提取交易类型")
  179. direction = 'unknown'
  180. return contract_code, actual_trade_date, trade_price, direction, order_time_str
  181. except Exception as e:
  182. print(f"提取合约编号和日期时出错: {str(e)}")
  183. return None, None, None, None, None
  184. def calculate_trade_days_range(trade_date, days_before=100, days_after=10):
  185. """
  186. 计算交易日范围:往前days_before个交易日,往后days_after个交易日
  187. 参数:
  188. trade_date (date): 开仓日期
  189. days_before (int): 往前交易日数量,默认100
  190. days_after (int): 往后交易日数量,默认10
  191. 返回:
  192. tuple: (start_date, end_date) 或 (None, None) 如果计算失败
  193. """
  194. try:
  195. # 往前找:从trade_date往前找days_before个交易日
  196. # get_trade_days(end_date=trade_date, count=n) 返回包括trade_date在内的n个交易日
  197. # 所以需要count=days_before+1,第一个就是days_before个交易日前的日期
  198. trade_days_before = get_trade_days(end_date=trade_date, count=days_before + 1)
  199. if len(trade_days_before) < days_before + 1:
  200. print(f"无法获取足够的往前交易日,只获取到 {len(trade_days_before)} 个")
  201. return None, None
  202. # 处理返回的日期对象:可能是date或datetime类型
  203. first_day = trade_days_before[0]
  204. if isinstance(first_day, datetime):
  205. start_date = first_day.date()
  206. elif isinstance(first_day, date):
  207. start_date = first_day
  208. else:
  209. start_date = first_day
  210. # 往后找:从trade_date往后找days_after个交易日
  211. # get_trade_days(start_date=trade_date, count=n) 返回包括trade_date在内的n个交易日
  212. # 所以需要count=days_after+1,最后一个就是days_after个交易日后的日期
  213. trade_days_after = get_trade_days(start_date=trade_date, count=days_after + 1)
  214. if len(trade_days_after) < days_after + 1:
  215. print(f"无法获取足够的往后交易日,只获取到 {len(trade_days_after)} 个")
  216. return None, None
  217. # 处理返回的日期对象:可能是date或datetime类型
  218. last_day = trade_days_after[-1]
  219. if isinstance(last_day, datetime):
  220. end_date = last_day.date()
  221. elif isinstance(last_day, date):
  222. end_date = last_day
  223. else:
  224. end_date = last_day
  225. return start_date, end_date
  226. except Exception as e:
  227. print(f"计算交易日范围时出错: {str(e)}")
  228. return None, None
  229. def get_kline_data(contract_code, start_date, end_date):
  230. """
  231. 获取指定合约在时间范围内的K线数据
  232. 参数:
  233. contract_code (str): 合约编号,如 'JD2502.XDCE'
  234. start_date (date): 开始日期
  235. end_date (date): 结束日期
  236. 返回:
  237. pandas.DataFrame: 包含OHLC数据的DataFrame,如果获取失败返回None
  238. """
  239. try:
  240. # 使用get_price获取K线数据
  241. price_data = get_price(
  242. contract_code,
  243. start_date=start_date,
  244. end_date=end_date,
  245. frequency='1d',
  246. fields=['open', 'close', 'high', 'low']
  247. )
  248. if price_data is None or len(price_data) == 0:
  249. print(f"未获取到 {contract_code} 在 {start_date} 至 {end_date} 的数据")
  250. return None
  251. return price_data
  252. except Exception as e:
  253. print(f"获取K线数据时出错: {str(e)}")
  254. return None
  255. def calculate_moving_averages(data):
  256. """
  257. 计算5K, 10K, 20K, 30K均线
  258. 参数:
  259. data (pandas.DataFrame): 包含close列的DataFrame
  260. 返回:
  261. pandas.DataFrame: 添加了均线列的DataFrame
  262. """
  263. data = data.copy()
  264. # 计算均线
  265. data['ma5'] = data['close'].rolling(window=5).mean()
  266. data['ma10'] = data['close'].rolling(window=10).mean()
  267. data['ma20'] = data['close'].rolling(window=20).mean()
  268. data['ma30'] = data['close'].rolling(window=30).mean()
  269. return data
  270. def filter_data_with_ma(data):
  271. """
  272. 过滤掉任何一条均线为空的日期
  273. 参数:
  274. data (pandas.DataFrame): 包含均线列的DataFrame
  275. 返回:
  276. pandas.DataFrame: 过滤后的DataFrame
  277. """
  278. # 过滤掉任何一条均线为空的日期
  279. filtered_data = data.dropna(subset=['ma5', 'ma10', 'ma20', 'ma30'])
  280. return filtered_data
  281. def plot_kline_chart(data, contract_code, trade_date, trade_price, direction, order_time, save_path):
  282. """
  283. 绘制K线图(包含均线和开仓日期、成交价、方向、委托时间标注)
  284. 参数:
  285. data (pandas.DataFrame): 包含OHLC和均线数据的DataFrame
  286. contract_code (str): 合约编号
  287. trade_date (date): 实际交易日
  288. trade_price (float): 成交价
  289. direction (str): 开仓方向,'long'或'short'
  290. order_time (str): 委托时间
  291. save_path (str): 保存路径
  292. """
  293. try:
  294. # 创建图表
  295. fig, ax = plt.subplots(figsize=(16, 10))
  296. # 准备数据
  297. dates = data.index
  298. opens = data['open']
  299. highs = data['high']
  300. lows = data['low']
  301. closes = data['close']
  302. # 调试:打印数据结构信息(仅第一次调用时打印)
  303. if not hasattr(plot_kline_chart, '_debug_printed'):
  304. print(f"\n=== K线数据索引类型调试信息 ===")
  305. print(f"索引类型: {type(dates)}")
  306. print(f"索引数据类型: {type(dates[0]) if len(dates) > 0 else 'N/A'}")
  307. print(f"前3个索引值: {[dates[i] for i in range(min(3, len(dates)))]}")
  308. print(f"索引是否为DatetimeIndex: {isinstance(dates, pd.DatetimeIndex)}")
  309. print(f"================================\n")
  310. plot_kline_chart._debug_printed = True
  311. # 统一转换为date类型进行比较
  312. trade_date_normalized = trade_date
  313. if isinstance(trade_date, datetime):
  314. trade_date_normalized = trade_date.date()
  315. elif isinstance(trade_date, pd.Timestamp):
  316. trade_date_normalized = trade_date.date()
  317. elif not isinstance(trade_date, date):
  318. try:
  319. trade_date_normalized = pd.to_datetime(trade_date).date()
  320. except:
  321. pass
  322. # 找到开仓日期在数据中的位置
  323. trade_date_idx = None
  324. # 如果索引是DatetimeIndex,直接使用date比较
  325. if isinstance(dates, pd.DatetimeIndex):
  326. # 将DatetimeIndex转换为date进行比较
  327. try:
  328. # 使用normalize()将时间部分去掉,然后比较date
  329. trade_date_normalized_dt = pd.Timestamp(trade_date_normalized)
  330. # 查找匹配的日期
  331. mask = dates.normalize() == trade_date_normalized_dt
  332. if mask.any():
  333. trade_date_idx = mask.argmax()
  334. except Exception as e:
  335. print(f"使用DatetimeIndex匹配时出错: {e}")
  336. # 如果还没找到,使用循环方式查找
  337. if trade_date_idx is None:
  338. for i, date_idx in enumerate(dates):
  339. date_to_compare = None
  340. # 处理pandas Timestamp类型
  341. if isinstance(date_idx, pd.Timestamp):
  342. date_to_compare = date_idx.date()
  343. elif isinstance(date_idx, datetime):
  344. date_to_compare = date_idx.date()
  345. elif isinstance(date_idx, date):
  346. date_to_compare = date_idx
  347. elif hasattr(date_idx, 'date'):
  348. try:
  349. date_to_compare = date_idx.date()
  350. except:
  351. pass
  352. # 比较日期
  353. if date_to_compare is not None and date_to_compare == trade_date_normalized:
  354. trade_date_idx = i
  355. break
  356. # 如果还是没找到,尝试查找最接近的日期(前后各找1天)
  357. if trade_date_idx is None:
  358. print(f"警告:未找到精确匹配的交易日 {trade_date_normalized}")
  359. print(f" 尝试查找前后1天的日期...")
  360. for offset in [-1, 1]:
  361. try:
  362. target_date = trade_date_normalized + timedelta(days=offset)
  363. for i, date_idx in enumerate(dates):
  364. date_to_compare = None
  365. if isinstance(date_idx, pd.Timestamp):
  366. date_to_compare = date_idx.date()
  367. elif isinstance(date_idx, datetime):
  368. date_to_compare = date_idx.date()
  369. elif isinstance(date_idx, date):
  370. date_to_compare = date_idx
  371. if date_to_compare == target_date:
  372. trade_date_idx = i
  373. print(f" 找到最接近的日期 {target_date} (偏移{offset}天) 在索引 {i}")
  374. break
  375. if trade_date_idx is not None:
  376. break
  377. except:
  378. pass
  379. # 绘制K线
  380. for i in range(len(data)):
  381. date_idx = dates[i]
  382. open_price = opens.iloc[i]
  383. high_price = highs.iloc[i]
  384. low_price = lows.iloc[i]
  385. close_price = closes.iloc[i]
  386. # K线颜色:红涨绿跌
  387. color = 'red' if close_price > open_price else 'green'
  388. edge_color = 'darkred' if close_price > open_price else 'darkgreen'
  389. # 绘制影线(最高价到最低价的竖线)
  390. ax.plot([i, i], [low_price, high_price], color='black', linewidth=1)
  391. # 绘制实体(开盘价到收盘价的矩形)
  392. body_height = abs(close_price - open_price)
  393. if body_height == 0:
  394. body_height = 0.01 # 避免高度为0
  395. bottom = min(open_price, close_price)
  396. # 使用矩形绘制K线实体
  397. rect = patches.Rectangle((i-0.4, bottom), 0.8, body_height,
  398. linewidth=1, edgecolor=edge_color,
  399. facecolor=color, alpha=0.8)
  400. ax.add_patch(rect)
  401. # 绘制均线
  402. ax.plot(range(len(data)), data['ma5'], label='MA5', color='blue', linewidth=1.5, alpha=0.8)
  403. ax.plot(range(len(data)), data['ma10'], label='MA10', color='orange', linewidth=1.5, alpha=0.8)
  404. ax.plot(range(len(data)), data['ma20'], label='MA20', color='purple', linewidth=1.5, alpha=0.8)
  405. ax.plot(range(len(data)), data['ma30'], label='MA30', color='brown', linewidth=1.5, alpha=0.8)
  406. # 标注开仓日期位置和成交价
  407. if trade_date_idx is not None:
  408. # 绘制标记点(使用成交价)
  409. ax.plot(trade_date_idx, trade_price, marker='*', markersize=20,
  410. color='yellow', markeredgecolor='black', markeredgewidth=2,
  411. label='Open Position', zorder=10)
  412. # 添加垂直线
  413. ax.axvline(x=trade_date_idx, color='yellow', linestyle='--',
  414. linewidth=2, alpha=0.7, zorder=5)
  415. # 标注日期、成交价、方向和委托时间文本
  416. date_label = trade_date.strftime('%Y-%m-%d')
  417. price_label = f'Price: {trade_price:.2f}'
  418. direction_label = f'Direction: {direction}'
  419. time_label = f'Time: {order_time}'
  420. # 计算文本位置(在标记点上方,确保可见)
  421. price_range = highs.max() - lows.min()
  422. y_offset = max(price_range * 0.08, (highs.max() - trade_price) * 0.3) # 至少8%的价格范围,或30%的上方空间
  423. text_y = trade_price + y_offset
  424. # 如果文本位置超出图表范围,放在标记点下方
  425. if text_y > highs.max():
  426. text_y = trade_price - price_range * 0.08
  427. # 添加文本标注(包含所有信息)
  428. annotation_text = f'{date_label}\n{price_label}\n{direction_label}\n{time_label}'
  429. ax.text(trade_date_idx, text_y, annotation_text,
  430. fontsize=10, ha='center', va='bottom',
  431. bbox=dict(boxstyle='round,pad=0.6', facecolor='yellow', alpha=0.9, edgecolor='black', linewidth=1.5),
  432. zorder=11, weight='bold')
  433. else:
  434. # 即使没找到精确日期,也尝试标注(使用最接近的日期)
  435. print(f"警告:交易日 {trade_date} 不在K线数据范围内,无法标注")
  436. # 设置图表标题和标签(使用英文)
  437. contract_simple = contract_code.split('.')[0] # 提取合约编号的简约部分
  438. ax.set_title(f'{contract_simple} ({contract_code}) K-Line Chart\n'
  439. f'Period: {dates[0].strftime("%Y-%m-%d")} to {dates[-1].strftime("%Y-%m-%d")} '
  440. f'({len(data)} bars)',
  441. fontsize=14, fontweight='bold', pad=20)
  442. ax.set_xlabel('Time', fontsize=12)
  443. ax.set_ylabel('Price', fontsize=12)
  444. ax.grid(True, alpha=0.3)
  445. ax.legend(loc='lower left', fontsize=10)
  446. # 设置x轴标签
  447. step = max(1, len(data) // 10) # 显示约10个时间标签
  448. tick_positions = range(0, len(data), step)
  449. tick_labels = []
  450. for pos in tick_positions:
  451. date_val = dates[pos]
  452. if isinstance(date_val, date):
  453. tick_labels.append(date_val.strftime('%Y-%m-%d'))
  454. elif isinstance(date_val, datetime):
  455. tick_labels.append(date_val.strftime('%Y-%m-%d'))
  456. else:
  457. tick_labels.append(str(date_val))
  458. ax.set_xticks(tick_positions)
  459. ax.set_xticklabels(tick_labels, rotation=45, ha='right')
  460. # 添加统计信息(使用英文)
  461. max_price = highs.max()
  462. min_price = lows.min()
  463. latest_close = closes.iloc[-1]
  464. first_close = closes.iloc[0]
  465. total_change = (latest_close - first_close) / first_close * 100
  466. stats_text = (f'High: {max_price:.2f}\n'
  467. f'Low: {min_price:.2f}\n'
  468. f'Latest Close: {latest_close:.2f}\n'
  469. f'Total Change: {total_change:+.2f}%')
  470. ax.text(0.02, 0.98, stats_text, transform=ax.transAxes,
  471. verticalalignment='top', bbox=dict(boxstyle='round',
  472. facecolor='wheat', alpha=0.8), fontsize=10)
  473. # 调整布局并保存
  474. plt.tight_layout()
  475. plt.savefig(save_path, dpi=150, bbox_inches='tight')
  476. # print(f"K线图已保存到: {save_path}")
  477. plt.close(fig)
  478. except Exception as e:
  479. print(f"绘制K线图时出错: {str(e)}")
  480. # 确保即使出错也关闭图形
  481. try:
  482. plt.close('all')
  483. except:
  484. pass
  485. raise
  486. def create_zip_archive(directory_path, zip_filename=None):
  487. """
  488. 将指定目录打包成zip文件
  489. 参数:
  490. directory_path (str): 要打包的目录路径
  491. zip_filename (str): zip文件名,如果为None则自动生成
  492. 返回:
  493. str: zip文件路径
  494. """
  495. if not os.path.exists(directory_path):
  496. print(f"目录不存在: {directory_path}")
  497. return None
  498. if zip_filename is None:
  499. # 自动生成zip文件名
  500. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  501. dir_name = os.path.basename(os.path.normpath(directory_path))
  502. zip_filename = f"{dir_name}_{timestamp}.zip"
  503. # 保存在目录的父目录中
  504. zip_path = os.path.join(os.path.dirname(directory_path), zip_filename)
  505. else:
  506. zip_path = zip_filename
  507. try:
  508. print(f"\n=== 开始打包目录: {directory_path} ===")
  509. file_count = 0
  510. with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
  511. # 遍历目录中的所有文件
  512. for root, dirs, files in os.walk(directory_path):
  513. for file in files:
  514. file_path = os.path.join(root, file)
  515. # 计算相对路径:相对于要打包的目录
  516. arcname = os.path.relpath(file_path, directory_path)
  517. zipf.write(file_path, arcname)
  518. file_count += 1
  519. # 获取zip文件大小
  520. zip_size = os.path.getsize(zip_path) / (1024 * 1024) # MB
  521. print(f"✓ 打包完成: {zip_path}")
  522. print(f" 包含文件数: {file_count} 个")
  523. print(f" 文件大小: {zip_size:.2f} MB")
  524. return zip_path
  525. except Exception as e:
  526. print(f"✗ 打包时出错: {str(e)}")
  527. return None
  528. def reconstruct_kline_from_transactions(csv_filename=None, output_dir=None):
  529. """
  530. 主函数:从交易记录中复原K线图
  531. 参数:
  532. csv_filename (str): CSV文件名,如果为None则需要在代码中设置文件名
  533. output_dir (str): 输出目录,如果为None则自动设置为当前目录的K子目录
  534. """
  535. # ========== 路径配置:只需在这里设置CSV文件名 ==========
  536. if csv_filename is None:
  537. # 设置CSV文件名(只需修改文件名,不需要完整路径)
  538. csv_filename = 'transaction4.csv'
  539. # ====================================================
  540. # 获取当前目录并拼接CSV文件路径
  541. current_dir = _get_current_directory()
  542. csv_path = os.path.join(current_dir, csv_filename)
  543. # 自动设置输出目录
  544. if output_dir is None:
  545. output_dir = os.path.join(current_dir, 'K')
  546. # 确保输出目录存在
  547. os.makedirs(output_dir, exist_ok=True)
  548. print(f"输出目录: {output_dir}")
  549. # 1. 读取和筛选开仓记录
  550. print("\n=== 步骤1: 读取和筛选开仓记录 ===")
  551. open_positions = read_and_filter_open_positions(csv_path)
  552. if len(open_positions) == 0:
  553. print("未找到开仓记录,退出")
  554. return
  555. # 2. 处理每条开仓记录
  556. print(f"\n=== 步骤2: 处理 {len(open_positions)} 条开仓记录 ===")
  557. success_count = 0
  558. fail_count = 0
  559. for idx, row in tqdm(open_positions.iterrows(), total=len(open_positions), desc="处理开仓记录"):
  560. # print(f"\n--- 处理第 {idx + 1}/{len(open_positions)} 条记录 ---")
  561. try:
  562. # 提取合约编号、实际交易日、成交价、开仓方向和委托时间
  563. contract_code, actual_trade_date, trade_price, direction, order_time = extract_contract_code_and_date(row)
  564. if contract_code is None or actual_trade_date is None or trade_price is None or direction is None or order_time is None:
  565. print(f"跳过:无法提取完整信息(合约编号、日期、成交价、方向或委托时间)")
  566. fail_count += 1
  567. continue
  568. # 计算交易日范围
  569. start_date, end_date = calculate_trade_days_range(actual_trade_date, days_before=100, days_after=10)
  570. if start_date is None or end_date is None:
  571. print(f"跳过:无法计算交易日范围")
  572. fail_count += 1
  573. continue
  574. # 获取K线数据
  575. kline_data = get_kline_data(contract_code, start_date, end_date)
  576. if kline_data is None or len(kline_data) == 0:
  577. print(f"跳过:无法获取K线数据")
  578. fail_count += 1
  579. continue
  580. # 计算均线
  581. kline_data = calculate_moving_averages(kline_data)
  582. # 过滤数据
  583. filtered_data = filter_data_with_ma(kline_data)
  584. if len(filtered_data) == 0:
  585. print(f"跳过:过滤后无有效数据")
  586. fail_count += 1
  587. continue
  588. # 生成文件名
  589. contract_simple = contract_code.split('.')[0] # 提取合约编号的简约部分
  590. filename = f"{contract_simple}_{actual_trade_date.strftime('%Y%m%d')}.png"
  591. save_path = os.path.join(output_dir, filename)
  592. # 绘制K线图(传入实际交易日和成交价)
  593. plot_kline_chart(filtered_data, contract_code, actual_trade_date, trade_price, direction, order_time, save_path)
  594. success_count += 1
  595. except Exception as e:
  596. print(f"✗ 处理时出错: {str(e)}")
  597. fail_count += 1
  598. continue
  599. # 输出统计信息
  600. print(f"\n=== 处理完成 ===")
  601. print(f"成功: {success_count} 条")
  602. print(f"失败: {fail_count} 条")
  603. print(f"总计: {success_count + fail_count} 条")
  604. # 3. 打包图片目录
  605. if success_count > 0:
  606. print(f"\n=== 步骤3: 打包图片目录 ===")
  607. zip_path = create_zip_archive(output_dir)
  608. if zip_path:
  609. print(f"✓ 打包文件已保存: {zip_path}")
  610. else:
  611. print(f"✗ 打包失败")
  612. else:
  613. print(f"\n未生成任何图片,跳过打包步骤")
  614. # 使用示例
  615. if __name__ == "__main__":
  616. print("=" * 60)
  617. print("K线复原工具")
  618. print("=" * 60)
  619. reconstruct_kline_from_transactions()
  620. print("\n=== 完成 ===")