# K线复原工具 # 用于从交易记录CSV文件中提取开仓记录,获取对应的K线数据并绘制包含均线的K线图 from jqdata import * import pandas as pd import numpy as np import matplotlib.pyplot as plt import matplotlib.patches as patches from datetime import datetime, timedelta, date import re from tqdm import tqdm import os import zipfile import warnings warnings.filterwarnings('ignore') # 中文字体设置(虽然图片内文字用英文,但保留设置以防需要) plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans'] plt.rcParams['axes.unicode_minus'] = False def _get_current_directory(): """ 获取当前文件所在目录,兼容 Jupyter notebook 环境 返回: str: 当前目录路径 """ try: # 在普通 Python 脚本中,使用 __file__ current_dir = os.path.dirname(os.path.abspath(__file__)) except NameError: # 在 Jupyter notebook 环境中,__file__ 不存在,使用当前工作目录 current_dir = os.getcwd() # 如果当前目录不是 future 目录,尝试查找 if not os.path.exists(os.path.join(current_dir, 'transaction1.csv')): # 尝试查找 future 目录 if 'future' not in current_dir: # 尝试向上查找 future 目录 parent_dir = os.path.dirname(current_dir) future_dir = os.path.join(parent_dir, 'future') if os.path.exists(os.path.join(future_dir, 'transaction1.csv')): current_dir = future_dir return current_dir def read_and_filter_open_positions(csv_path): """ 读取CSV文件并筛选出开仓记录 参数: csv_path (str): CSV文件路径 返回: pandas.DataFrame: 包含开仓记录的DataFrame """ # 尝试多种编码格式 encodings = ['utf-8-sig', 'utf-8', 'gbk', 'gb2312', 'gb18030', 'latin1'] for encoding in encodings: try: df = pd.read_csv(csv_path, encoding=encoding) # 筛选交易类型第一个字符为"开"的行 open_positions = df[df['交易类型'].str[0] == '开'].copy() print(f"成功使用 {encoding} 编码读取CSV文件") print(f"从CSV文件中读取到 {len(df)} 条记录") print(f"筛选出 {len(open_positions)} 条开仓记录") return open_positions except UnicodeDecodeError: continue except Exception as e: # 如果是其他错误(比如列名不存在),也尝试下一种编码 if encoding == encodings[-1]: # 最后一种编码也失败了,抛出错误 print(f"读取CSV文件时出错: {str(e)}") raise continue # 所有编码都失败了 print(f"无法使用任何编码格式读取CSV文件: {csv_path}") return pd.DataFrame() def extract_contract_code_and_date(row): """ 从标的列提取合约编号,从日期列提取日期,从委托时间计算实际交易日,从成交价列提取成交价,从交易类型提取开仓方向 参数: row (pandas.Series): DataFrame的一行数据 返回: tuple: (contract_code, actual_trade_date, trade_price, direction, order_time) 或 (None, None, None, None, None) 如果提取失败 """ try: # 提取合约编号:从"标的"列中提取括号内的内容 target_str = str(row['标的']) match = re.search(r'\(([^)]+)\)', target_str) if match: contract_code = match.group(1) else: print(f"无法从标的 '{target_str}' 中提取合约编号") return None, None, None # 提取日期,支持多种日期格式 date_str = str(row['日期']).strip() # 尝试多种日期格式 date_formats = [ '%Y-%m-%d', # 2025-01-02 '%d/%m/%Y', # 14/10/2025 '%Y/%m/%d', # 2025/01/02 '%d-%m-%Y', # 14-10-2025 '%Y%m%d', # 20250102 ] base_date = None for date_format in date_formats: try: base_date = datetime.strptime(date_str, date_format).date() break except ValueError: continue if base_date is None: print(f"日期格式错误: {date_str} (支持的格式: YYYY-MM-DD, DD/MM/YYYY, YYYY/MM/DD, DD-MM-YYYY, YYYYMMDD)") return None, None, None # 提取委托时间,判断是否是晚上(>=21:00) order_time_str = str(row['委托时间']).strip() try: # 解析时间格式 HH:MM:SS 或 HH:MM time_parts = order_time_str.split(':') hour = int(time_parts[0]) # 如果委托时间 >= 21:00,需要找到下一个交易日 if hour >= 21: # 使用get_trade_days获取从base_date开始的交易日 # count=2 表示获取包括base_date在内的2个交易日 # 如果base_date是交易日,则返回[base_date, next_trade_day] # 如果base_date不是交易日,则返回下一个交易日和再下一个交易日 try: trade_days = get_trade_days(start_date=base_date, count=2) # print(f"trade_days: {trade_days}") if len(trade_days) >= 2: # 取第二个交易日(索引1)作为实际交易日 next_trade_day = trade_days[1] if isinstance(next_trade_day, datetime): actual_trade_date = next_trade_day.date() elif isinstance(next_trade_day, date): actual_trade_date = next_trade_day else: # 如果类型不对,尝试转换 actual_trade_date = base_date print(f"警告:获取的交易日类型异常: {type(next_trade_day)}") elif len(trade_days) == 1: # 如果只返回1个交易日,说明base_date就是交易日,但已经是最后一个交易日 # 这种情况应该取下一个交易日,但可能超出了数据范围 first_day = trade_days[0] if isinstance(first_day, datetime): actual_trade_date = first_day.date() elif isinstance(first_day, date): actual_trade_date = first_day else: actual_trade_date = base_date print(f"警告:只获取到1个交易日,可能已到数据边界") else: # 如果获取失败,使用base_date actual_trade_date = base_date print(f"警告:无法获取下一个交易日,使用原始日期") except Exception as e: # 如果获取交易日失败,使用base_date actual_trade_date = base_date print(f"获取交易日时出错: {str(e)},使用原始日期") else: # 委托时间 < 21:00,使用原始日期 actual_trade_date = base_date except Exception as e: # 如果解析时间失败,使用原始日期 print(f"解析委托时间失败: {order_time_str}, 使用原始日期") actual_trade_date = base_date # print(f"成交日期:{date_str},委托时间:{order_time_str},实际交易日:{actual_trade_date}") # 提取成交价 try: trade_price = float(row['成交价']) except (ValueError, KeyError): print(f"无法提取成交价: {row.get('成交价', 'N/A')}") return None, None, None, None, None # 提取开仓方向:从"交易类型"列提取,开多是'long',开空是'short' try: trade_type = str(row['交易类型']).strip() if '开多' in trade_type or '多' in trade_type: direction = 'long' elif '开空' in trade_type or '空' in trade_type: direction = 'short' else: print(f"无法识别交易方向: {trade_type}") direction = 'unknown' except (KeyError, ValueError): print(f"无法提取交易类型") direction = 'unknown' return contract_code, actual_trade_date, trade_price, direction, order_time_str except Exception as e: print(f"提取合约编号和日期时出错: {str(e)}") return None, None, None, None, None def calculate_trade_days_range(trade_date, days_before=100, days_after=10): """ 计算交易日范围:往前days_before个交易日,往后days_after个交易日 参数: trade_date (date): 开仓日期 days_before (int): 往前交易日数量,默认100 days_after (int): 往后交易日数量,默认10 返回: tuple: (start_date, end_date) 或 (None, None) 如果计算失败 """ try: # 往前找:从trade_date往前找days_before个交易日 # get_trade_days(end_date=trade_date, count=n) 返回包括trade_date在内的n个交易日 # 所以需要count=days_before+1,第一个就是days_before个交易日前的日期 trade_days_before = get_trade_days(end_date=trade_date, count=days_before + 1) if len(trade_days_before) < days_before + 1: print(f"无法获取足够的往前交易日,只获取到 {len(trade_days_before)} 个") return None, None # 处理返回的日期对象:可能是date或datetime类型 first_day = trade_days_before[0] if isinstance(first_day, datetime): start_date = first_day.date() elif isinstance(first_day, date): start_date = first_day else: start_date = first_day # 往后找:从trade_date往后找days_after个交易日 # get_trade_days(start_date=trade_date, count=n) 返回包括trade_date在内的n个交易日 # 所以需要count=days_after+1,最后一个就是days_after个交易日后的日期 trade_days_after = get_trade_days(start_date=trade_date, count=days_after + 1) if len(trade_days_after) < days_after + 1: print(f"无法获取足够的往后交易日,只获取到 {len(trade_days_after)} 个") return None, None # 处理返回的日期对象:可能是date或datetime类型 last_day = trade_days_after[-1] if isinstance(last_day, datetime): end_date = last_day.date() elif isinstance(last_day, date): end_date = last_day else: end_date = last_day return start_date, end_date except Exception as e: print(f"计算交易日范围时出错: {str(e)}") return None, None def get_kline_data(contract_code, start_date, end_date): """ 获取指定合约在时间范围内的K线数据 参数: contract_code (str): 合约编号,如 'JD2502.XDCE' start_date (date): 开始日期 end_date (date): 结束日期 返回: pandas.DataFrame: 包含OHLC数据的DataFrame,如果获取失败返回None """ try: # 使用get_price获取K线数据 price_data = get_price( contract_code, start_date=start_date, end_date=end_date, frequency='1d', fields=['open', 'close', 'high', 'low'] ) if price_data is None or len(price_data) == 0: print(f"未获取到 {contract_code} 在 {start_date} 至 {end_date} 的数据") return None return price_data except Exception as e: print(f"获取K线数据时出错: {str(e)}") return None def calculate_moving_averages(data): """ 计算5K, 10K, 20K, 30K均线 参数: data (pandas.DataFrame): 包含close列的DataFrame 返回: pandas.DataFrame: 添加了均线列的DataFrame """ data = data.copy() # 计算均线 data['ma5'] = data['close'].rolling(window=5).mean() data['ma10'] = data['close'].rolling(window=10).mean() data['ma20'] = data['close'].rolling(window=20).mean() data['ma30'] = data['close'].rolling(window=30).mean() return data def filter_data_with_ma(data): """ 过滤掉任何一条均线为空的日期 参数: data (pandas.DataFrame): 包含均线列的DataFrame 返回: pandas.DataFrame: 过滤后的DataFrame """ # 过滤掉任何一条均线为空的日期 filtered_data = data.dropna(subset=['ma5', 'ma10', 'ma20', 'ma30']) return filtered_data def plot_kline_chart(data, contract_code, trade_date, trade_price, direction, order_time, save_path): """ 绘制K线图(包含均线和开仓日期、成交价、方向、委托时间标注) 参数: data (pandas.DataFrame): 包含OHLC和均线数据的DataFrame contract_code (str): 合约编号 trade_date (date): 实际交易日 trade_price (float): 成交价 direction (str): 开仓方向,'long'或'short' order_time (str): 委托时间 save_path (str): 保存路径 """ try: # 创建图表 fig, ax = plt.subplots(figsize=(16, 10)) # 准备数据 dates = data.index opens = data['open'] highs = data['high'] lows = data['low'] closes = data['close'] # 调试:打印数据结构信息(仅第一次调用时打印) if not hasattr(plot_kline_chart, '_debug_printed'): print(f"\n=== K线数据索引类型调试信息 ===") print(f"索引类型: {type(dates)}") print(f"索引数据类型: {type(dates[0]) if len(dates) > 0 else 'N/A'}") print(f"前3个索引值: {[dates[i] for i in range(min(3, len(dates)))]}") print(f"索引是否为DatetimeIndex: {isinstance(dates, pd.DatetimeIndex)}") print(f"================================\n") plot_kline_chart._debug_printed = True # 统一转换为date类型进行比较 trade_date_normalized = trade_date if isinstance(trade_date, datetime): trade_date_normalized = trade_date.date() elif isinstance(trade_date, pd.Timestamp): trade_date_normalized = trade_date.date() elif not isinstance(trade_date, date): try: trade_date_normalized = pd.to_datetime(trade_date).date() except: pass # 找到开仓日期在数据中的位置 trade_date_idx = None # 如果索引是DatetimeIndex,直接使用date比较 if isinstance(dates, pd.DatetimeIndex): # 将DatetimeIndex转换为date进行比较 try: # 使用normalize()将时间部分去掉,然后比较date trade_date_normalized_dt = pd.Timestamp(trade_date_normalized) # 查找匹配的日期 mask = dates.normalize() == trade_date_normalized_dt if mask.any(): trade_date_idx = mask.argmax() except Exception as e: print(f"使用DatetimeIndex匹配时出错: {e}") # 如果还没找到,使用循环方式查找 if trade_date_idx is None: for i, date_idx in enumerate(dates): date_to_compare = None # 处理pandas Timestamp类型 if isinstance(date_idx, pd.Timestamp): date_to_compare = date_idx.date() elif isinstance(date_idx, datetime): date_to_compare = date_idx.date() elif isinstance(date_idx, date): date_to_compare = date_idx elif hasattr(date_idx, 'date'): try: date_to_compare = date_idx.date() except: pass # 比较日期 if date_to_compare is not None and date_to_compare == trade_date_normalized: trade_date_idx = i break # 如果还是没找到,尝试查找最接近的日期(前后各找1天) if trade_date_idx is None: print(f"警告:未找到精确匹配的交易日 {trade_date_normalized}") print(f" 尝试查找前后1天的日期...") for offset in [-1, 1]: try: target_date = trade_date_normalized + timedelta(days=offset) for i, date_idx in enumerate(dates): date_to_compare = None if isinstance(date_idx, pd.Timestamp): date_to_compare = date_idx.date() elif isinstance(date_idx, datetime): date_to_compare = date_idx.date() elif isinstance(date_idx, date): date_to_compare = date_idx if date_to_compare == target_date: trade_date_idx = i print(f" 找到最接近的日期 {target_date} (偏移{offset}天) 在索引 {i}") break if trade_date_idx is not None: break except: pass # 绘制K线 for i in range(len(data)): date_idx = dates[i] open_price = opens.iloc[i] high_price = highs.iloc[i] low_price = lows.iloc[i] close_price = closes.iloc[i] # K线颜色:红涨绿跌 color = 'red' if close_price > open_price else 'green' edge_color = 'darkred' if close_price > open_price else 'darkgreen' # 绘制影线(最高价到最低价的竖线) ax.plot([i, i], [low_price, high_price], color='black', linewidth=1) # 绘制实体(开盘价到收盘价的矩形) body_height = abs(close_price - open_price) if body_height == 0: body_height = 0.01 # 避免高度为0 bottom = min(open_price, close_price) # 使用矩形绘制K线实体 rect = patches.Rectangle((i-0.4, bottom), 0.8, body_height, linewidth=1, edgecolor=edge_color, facecolor=color, alpha=0.8) ax.add_patch(rect) # 绘制均线 ax.plot(range(len(data)), data['ma5'], label='MA5', color='blue', linewidth=1.5, alpha=0.8) ax.plot(range(len(data)), data['ma10'], label='MA10', color='orange', linewidth=1.5, alpha=0.8) ax.plot(range(len(data)), data['ma20'], label='MA20', color='purple', linewidth=1.5, alpha=0.8) ax.plot(range(len(data)), data['ma30'], label='MA30', color='brown', linewidth=1.5, alpha=0.8) # 标注开仓日期位置和成交价 if trade_date_idx is not None: # 绘制标记点(使用成交价) ax.plot(trade_date_idx, trade_price, marker='*', markersize=20, color='yellow', markeredgecolor='black', markeredgewidth=2, label='Open Position', zorder=10) # 添加垂直线 ax.axvline(x=trade_date_idx, color='yellow', linestyle='--', linewidth=2, alpha=0.7, zorder=5) # 标注日期、成交价、方向和委托时间文本 date_label = trade_date.strftime('%Y-%m-%d') price_label = f'Price: {trade_price:.2f}' direction_label = f'Direction: {direction}' time_label = f'Time: {order_time}' # 计算文本位置(在标记点上方,确保可见) price_range = highs.max() - lows.min() y_offset = max(price_range * 0.08, (highs.max() - trade_price) * 0.3) # 至少8%的价格范围,或30%的上方空间 text_y = trade_price + y_offset # 如果文本位置超出图表范围,放在标记点下方 if text_y > highs.max(): text_y = trade_price - price_range * 0.08 # 添加文本标注(包含所有信息) annotation_text = f'{date_label}\n{price_label}\n{direction_label}\n{time_label}' ax.text(trade_date_idx, text_y, annotation_text, fontsize=10, ha='center', va='bottom', bbox=dict(boxstyle='round,pad=0.6', facecolor='yellow', alpha=0.9, edgecolor='black', linewidth=1.5), zorder=11, weight='bold') else: # 即使没找到精确日期,也尝试标注(使用最接近的日期) print(f"警告:交易日 {trade_date} 不在K线数据范围内,无法标注") # 设置图表标题和标签(使用英文) contract_simple = contract_code.split('.')[0] # 提取合约编号的简约部分 ax.set_title(f'{contract_simple} ({contract_code}) K-Line Chart\n' f'Period: {dates[0].strftime("%Y-%m-%d")} to {dates[-1].strftime("%Y-%m-%d")} ' f'({len(data)} bars)', fontsize=14, fontweight='bold', pad=20) ax.set_xlabel('Time', fontsize=12) ax.set_ylabel('Price', fontsize=12) ax.grid(True, alpha=0.3) ax.legend(loc='lower left', fontsize=10) # 设置x轴标签 step = max(1, len(data) // 10) # 显示约10个时间标签 tick_positions = range(0, len(data), step) tick_labels = [] for pos in tick_positions: date_val = dates[pos] if isinstance(date_val, date): tick_labels.append(date_val.strftime('%Y-%m-%d')) elif isinstance(date_val, datetime): tick_labels.append(date_val.strftime('%Y-%m-%d')) else: tick_labels.append(str(date_val)) ax.set_xticks(tick_positions) ax.set_xticklabels(tick_labels, rotation=45, ha='right') # 添加统计信息(使用英文) max_price = highs.max() min_price = lows.min() latest_close = closes.iloc[-1] first_close = closes.iloc[0] total_change = (latest_close - first_close) / first_close * 100 stats_text = (f'High: {max_price:.2f}\n' f'Low: {min_price:.2f}\n' f'Latest Close: {latest_close:.2f}\n' f'Total Change: {total_change:+.2f}%') ax.text(0.02, 0.98, stats_text, transform=ax.transAxes, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8), fontsize=10) # 调整布局并保存 plt.tight_layout() plt.savefig(save_path, dpi=150, bbox_inches='tight') # print(f"K线图已保存到: {save_path}") plt.close(fig) except Exception as e: print(f"绘制K线图时出错: {str(e)}") # 确保即使出错也关闭图形 try: plt.close('all') except: pass raise def create_zip_archive(directory_path, zip_filename=None): """ 将指定目录打包成zip文件 参数: directory_path (str): 要打包的目录路径 zip_filename (str): zip文件名,如果为None则自动生成 返回: str: zip文件路径 """ if not os.path.exists(directory_path): print(f"目录不存在: {directory_path}") return None if zip_filename is None: # 自动生成zip文件名 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') dir_name = os.path.basename(os.path.normpath(directory_path)) zip_filename = f"{dir_name}_{timestamp}.zip" # 保存在目录的父目录中 zip_path = os.path.join(os.path.dirname(directory_path), zip_filename) else: zip_path = zip_filename try: print(f"\n=== 开始打包目录: {directory_path} ===") file_count = 0 with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: # 遍历目录中的所有文件 for root, dirs, files in os.walk(directory_path): for file in files: file_path = os.path.join(root, file) # 计算相对路径:相对于要打包的目录 arcname = os.path.relpath(file_path, directory_path) zipf.write(file_path, arcname) file_count += 1 # 获取zip文件大小 zip_size = os.path.getsize(zip_path) / (1024 * 1024) # MB print(f"✓ 打包完成: {zip_path}") print(f" 包含文件数: {file_count} 个") print(f" 文件大小: {zip_size:.2f} MB") return zip_path except Exception as e: print(f"✗ 打包时出错: {str(e)}") return None def reconstruct_kline_from_transactions(csv_filename=None, output_dir=None): """ 主函数:从交易记录中复原K线图 参数: csv_filename (str): CSV文件名,如果为None则需要在代码中设置文件名 output_dir (str): 输出目录,如果为None则自动设置为当前目录的K子目录 """ # ========== 路径配置:只需在这里设置CSV文件名 ========== if csv_filename is None: # 设置CSV文件名(只需修改文件名,不需要完整路径) csv_filename = 'transaction4.csv' # ==================================================== # 获取当前目录并拼接CSV文件路径 current_dir = _get_current_directory() csv_path = os.path.join(current_dir, csv_filename) # 自动设置输出目录 if output_dir is None: output_dir = os.path.join(current_dir, 'K') # 确保输出目录存在 os.makedirs(output_dir, exist_ok=True) print(f"输出目录: {output_dir}") # 1. 读取和筛选开仓记录 print("\n=== 步骤1: 读取和筛选开仓记录 ===") open_positions = read_and_filter_open_positions(csv_path) if len(open_positions) == 0: print("未找到开仓记录,退出") return # 2. 处理每条开仓记录 print(f"\n=== 步骤2: 处理 {len(open_positions)} 条开仓记录 ===") success_count = 0 fail_count = 0 for idx, row in tqdm(open_positions.iterrows(), total=len(open_positions), desc="处理开仓记录"): # print(f"\n--- 处理第 {idx + 1}/{len(open_positions)} 条记录 ---") try: # 提取合约编号、实际交易日、成交价、开仓方向和委托时间 contract_code, actual_trade_date, trade_price, direction, order_time = extract_contract_code_and_date(row) if contract_code is None or actual_trade_date is None or trade_price is None or direction is None or order_time is None: print(f"跳过:无法提取完整信息(合约编号、日期、成交价、方向或委托时间)") fail_count += 1 continue # 计算交易日范围 start_date, end_date = calculate_trade_days_range(actual_trade_date, days_before=100, days_after=10) if start_date is None or end_date is None: print(f"跳过:无法计算交易日范围") fail_count += 1 continue # 获取K线数据 kline_data = get_kline_data(contract_code, start_date, end_date) if kline_data is None or len(kline_data) == 0: print(f"跳过:无法获取K线数据") fail_count += 1 continue # 计算均线 kline_data = calculate_moving_averages(kline_data) # 过滤数据 filtered_data = filter_data_with_ma(kline_data) if len(filtered_data) == 0: print(f"跳过:过滤后无有效数据") fail_count += 1 continue # 生成文件名 contract_simple = contract_code.split('.')[0] # 提取合约编号的简约部分 filename = f"{contract_simple}_{actual_trade_date.strftime('%Y%m%d')}.png" save_path = os.path.join(output_dir, filename) # 绘制K线图(传入实际交易日和成交价) plot_kline_chart(filtered_data, contract_code, actual_trade_date, trade_price, direction, order_time, save_path) success_count += 1 except Exception as e: print(f"✗ 处理时出错: {str(e)}") fail_count += 1 continue # 输出统计信息 print(f"\n=== 处理完成 ===") print(f"成功: {success_count} 条") print(f"失败: {fail_count} 条") print(f"总计: {success_count + fail_count} 条") # 3. 打包图片目录 if success_count > 0: print(f"\n=== 步骤3: 打包图片目录 ===") zip_path = create_zip_archive(output_dir) if zip_path: print(f"✓ 打包文件已保存: {zip_path}") else: print(f"✗ 打包失败") else: print(f"\n未生成任何图片,跳过打包步骤") # 使用示例 if __name__ == "__main__": print("=" * 60) print("K线复原工具") print("=" * 60) reconstruct_kline_from_transactions() print("\n=== 完成 ===")