||
- # K线复原工具
- # 用于从交易记录CSV文件中提取开仓记录,获取对应的K线数据并绘制包含均线的K线图
- from jqdata import *
- import pandas as pd
- import numpy as np
- import matplotlib.pyplot as plt
- import matplotlib.patches as patches
- from datetime import datetime, timedelta, date
- import re
- from tqdm import tqdm
- import os
- import zipfile
- import warnings
- warnings.filterwarnings('ignore')
- # 中文字体设置(虽然图片内文字用英文,但保留设置以防需要)
- plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']
- plt.rcParams['axes.unicode_minus'] = False
- def _get_current_directory():
- """
- 获取当前文件所在目录,兼容 Jupyter notebook 环境
-
- 返回:
- str: 当前目录路径
- """
- try:
- # 在普通 Python 脚本中,使用 __file__
- current_dir = os.path.dirname(os.path.abspath(__file__))
- except NameError:
- # 在 Jupyter notebook 环境中,__file__ 不存在,使用当前工作目录
- current_dir = os.getcwd()
- # 如果当前目录不是 future 目录,尝试查找
- if not os.path.exists(os.path.join(current_dir, 'transaction1.csv')):
- # 尝试查找 future 目录
- if 'future' not in current_dir:
- # 尝试向上查找 future 目录
- parent_dir = os.path.dirname(current_dir)
- future_dir = os.path.join(parent_dir, 'future')
- if os.path.exists(os.path.join(future_dir, 'transaction1.csv')):
- current_dir = future_dir
- return current_dir
- def read_and_filter_open_positions(csv_path):
- """
- 读取CSV文件并筛选出开仓记录
-
- 参数:
- csv_path (str): CSV文件路径
-
- 返回:
- pandas.DataFrame: 包含开仓记录的DataFrame
- """
- # 尝试多种编码格式
- encodings = ['utf-8-sig', 'utf-8', 'gbk', 'gb2312', 'gb18030', 'latin1']
-
- for encoding in encodings:
- try:
- df = pd.read_csv(csv_path, encoding=encoding)
-
- # 筛选交易类型第一个字符为"开"的行
- open_positions = df[df['交易类型'].str[0] == '开'].copy()
-
- print(f"成功使用 {encoding} 编码读取CSV文件")
- print(f"从CSV文件中读取到 {len(df)} 条记录")
- print(f"筛选出 {len(open_positions)} 条开仓记录")
-
- return open_positions
- except UnicodeDecodeError:
- continue
- except Exception as e:
- # 如果是其他错误(比如列名不存在),也尝试下一种编码
- if encoding == encodings[-1]:
- # 最后一种编码也失败了,抛出错误
- print(f"读取CSV文件时出错: {str(e)}")
- raise
- continue
-
- # 所有编码都失败了
- print(f"无法使用任何编码格式读取CSV文件: {csv_path}")
- return pd.DataFrame()
- def extract_contract_code_and_date(row):
- """
- 从标的列提取合约编号,从日期列提取日期,从委托时间计算实际交易日,从成交价列提取成交价,从交易类型提取开仓方向
-
- 参数:
- row (pandas.Series): DataFrame的一行数据
-
- 返回:
- tuple: (contract_code, actual_trade_date, trade_price, direction, order_time) 或 (None, None, None, None, None) 如果提取失败
- """
- try:
- # 提取合约编号:从"标的"列中提取括号内的内容
- target_str = str(row['标的'])
- match = re.search(r'\(([^)]+)\)', target_str)
- if match:
- contract_code = match.group(1)
- else:
- print(f"无法从标的 '{target_str}' 中提取合约编号")
- return None, None, None
-
- # 提取日期,支持多种日期格式
- date_str = str(row['日期']).strip()
-
- # 尝试多种日期格式
- date_formats = [
- '%Y-%m-%d', # 2025-01-02
- '%d/%m/%Y', # 14/10/2025
- '%Y/%m/%d', # 2025/01/02
- '%d-%m-%Y', # 14-10-2025
- '%Y%m%d', # 20250102
- ]
-
- base_date = None
- for date_format in date_formats:
- try:
- base_date = datetime.strptime(date_str, date_format).date()
- break
- except ValueError:
- continue
-
- if base_date is None:
- print(f"日期格式错误: {date_str} (支持的格式: YYYY-MM-DD, DD/MM/YYYY, YYYY/MM/DD, DD-MM-YYYY, YYYYMMDD)")
- return None, None, None
-
- # 提取委托时间,判断是否是晚上(>=21:00)
- order_time_str = str(row['委托时间']).strip()
- try:
- # 解析时间格式 HH:MM:SS 或 HH:MM
- time_parts = order_time_str.split(':')
- hour = int(time_parts[0])
-
- # 如果委托时间 >= 21:00,需要找到下一个交易日
- if hour >= 21:
- # 使用get_trade_days获取从base_date开始的交易日
- # count=2 表示获取包括base_date在内的2个交易日
- # 如果base_date是交易日,则返回[base_date, next_trade_day]
- # 如果base_date不是交易日,则返回下一个交易日和再下一个交易日
- try:
- trade_days = get_trade_days(start_date=base_date, count=2)
- # print(f"trade_days: {trade_days}")
- if len(trade_days) >= 2:
- # 取第二个交易日(索引1)作为实际交易日
- next_trade_day = trade_days[1]
- if isinstance(next_trade_day, datetime):
- actual_trade_date = next_trade_day.date()
- elif isinstance(next_trade_day, date):
- actual_trade_date = next_trade_day
- else:
- # 如果类型不对,尝试转换
- actual_trade_date = base_date
- print(f"警告:获取的交易日类型异常: {type(next_trade_day)}")
- elif len(trade_days) == 1:
- # 如果只返回1个交易日,说明base_date就是交易日,但已经是最后一个交易日
- # 这种情况应该取下一个交易日,但可能超出了数据范围
- first_day = trade_days[0]
- if isinstance(first_day, datetime):
- actual_trade_date = first_day.date()
- elif isinstance(first_day, date):
- actual_trade_date = first_day
- else:
- actual_trade_date = base_date
- print(f"警告:只获取到1个交易日,可能已到数据边界")
- else:
- # 如果获取失败,使用base_date
- actual_trade_date = base_date
- print(f"警告:无法获取下一个交易日,使用原始日期")
- except Exception as e:
- # 如果获取交易日失败,使用base_date
- actual_trade_date = base_date
- print(f"获取交易日时出错: {str(e)},使用原始日期")
- else:
- # 委托时间 < 21:00,使用原始日期
- actual_trade_date = base_date
- except Exception as e:
- # 如果解析时间失败,使用原始日期
- print(f"解析委托时间失败: {order_time_str}, 使用原始日期")
- actual_trade_date = base_date
-
- # print(f"成交日期:{date_str},委托时间:{order_time_str},实际交易日:{actual_trade_date}")
-
- # 提取成交价
- try:
- trade_price = float(row['成交价'])
- except (ValueError, KeyError):
- print(f"无法提取成交价: {row.get('成交价', 'N/A')}")
- return None, None, None, None, None
-
- # 提取开仓方向:从"交易类型"列提取,开多是'long',开空是'short'
- try:
- trade_type = str(row['交易类型']).strip()
- if '开多' in trade_type or '多' in trade_type:
- direction = 'long'
- elif '开空' in trade_type or '空' in trade_type:
- direction = 'short'
- else:
- print(f"无法识别交易方向: {trade_type}")
- direction = 'unknown'
- except (KeyError, ValueError):
- print(f"无法提取交易类型")
- direction = 'unknown'
-
- return contract_code, actual_trade_date, trade_price, direction, order_time_str
- except Exception as e:
- print(f"提取合约编号和日期时出错: {str(e)}")
- return None, None, None, None, None
- def calculate_trade_days_range(trade_date, days_before=100, days_after=10):
- """
- 计算交易日范围:往前days_before个交易日,往后days_after个交易日
-
- 参数:
- trade_date (date): 开仓日期
- days_before (int): 往前交易日数量,默认100
- days_after (int): 往后交易日数量,默认10
-
- 返回:
- tuple: (start_date, end_date) 或 (None, None) 如果计算失败
- """
- try:
- # 往前找:从trade_date往前找days_before个交易日
- # get_trade_days(end_date=trade_date, count=n) 返回包括trade_date在内的n个交易日
- # 所以需要count=days_before+1,第一个就是days_before个交易日前的日期
- trade_days_before = get_trade_days(end_date=trade_date, count=days_before + 1)
- if len(trade_days_before) < days_before + 1:
- print(f"无法获取足够的往前交易日,只获取到 {len(trade_days_before)} 个")
- return None, None
-
- # 处理返回的日期对象:可能是date或datetime类型
- first_day = trade_days_before[0]
- if isinstance(first_day, datetime):
- start_date = first_day.date()
- elif isinstance(first_day, date):
- start_date = first_day
- else:
- start_date = first_day
-
- # 往后找:从trade_date往后找days_after个交易日
- # get_trade_days(start_date=trade_date, count=n) 返回包括trade_date在内的n个交易日
- # 所以需要count=days_after+1,最后一个就是days_after个交易日后的日期
- trade_days_after = get_trade_days(start_date=trade_date, count=days_after + 1)
- if len(trade_days_after) < days_after + 1:
- print(f"无法获取足够的往后交易日,只获取到 {len(trade_days_after)} 个")
- return None, None
-
- # 处理返回的日期对象:可能是date或datetime类型
- last_day = trade_days_after[-1]
- if isinstance(last_day, datetime):
- end_date = last_day.date()
- elif isinstance(last_day, date):
- end_date = last_day
- else:
- end_date = last_day
-
- return start_date, end_date
- except Exception as e:
- print(f"计算交易日范围时出错: {str(e)}")
- return None, None
- def get_kline_data(contract_code, start_date, end_date):
- """
- 获取指定合约在时间范围内的K线数据
-
- 参数:
- contract_code (str): 合约编号,如 'JD2502.XDCE'
- start_date (date): 开始日期
- end_date (date): 结束日期
-
- 返回:
- pandas.DataFrame: 包含OHLC数据的DataFrame,如果获取失败返回None
- """
- try:
- # 使用get_price获取K线数据
- price_data = get_price(
- contract_code,
- start_date=start_date,
- end_date=end_date,
- frequency='1d',
- fields=['open', 'close', 'high', 'low']
- )
-
- if price_data is None or len(price_data) == 0:
- print(f"未获取到 {contract_code} 在 {start_date} 至 {end_date} 的数据")
- return None
-
- return price_data
- except Exception as e:
- print(f"获取K线数据时出错: {str(e)}")
- return None
- def calculate_moving_averages(data):
- """
- 计算5K, 10K, 20K, 30K均线
-
- 参数:
- data (pandas.DataFrame): 包含close列的DataFrame
-
- 返回:
- pandas.DataFrame: 添加了均线列的DataFrame
- """
- data = data.copy()
-
- # 计算均线
- data['ma5'] = data['close'].rolling(window=5).mean()
- data['ma10'] = data['close'].rolling(window=10).mean()
- data['ma20'] = data['close'].rolling(window=20).mean()
- data['ma30'] = data['close'].rolling(window=30).mean()
-
- return data
- def filter_data_with_ma(data):
- """
- 过滤掉任何一条均线为空的日期
-
- 参数:
- data (pandas.DataFrame): 包含均线列的DataFrame
-
- 返回:
- pandas.DataFrame: 过滤后的DataFrame
- """
- # 过滤掉任何一条均线为空的日期
- filtered_data = data.dropna(subset=['ma5', 'ma10', 'ma20', 'ma30'])
-
- return filtered_data
- def plot_kline_chart(data, contract_code, trade_date, trade_price, direction, order_time, save_path):
- """
- 绘制K线图(包含均线和开仓日期、成交价、方向、委托时间标注)
-
- 参数:
- data (pandas.DataFrame): 包含OHLC和均线数据的DataFrame
- contract_code (str): 合约编号
- trade_date (date): 实际交易日
- trade_price (float): 成交价
- direction (str): 开仓方向,'long'或'short'
- order_time (str): 委托时间
- save_path (str): 保存路径
- """
- try:
- # 创建图表
- fig, ax = plt.subplots(figsize=(16, 10))
-
- # 准备数据
- dates = data.index
- opens = data['open']
- highs = data['high']
- lows = data['low']
- closes = data['close']
-
- # 调试:打印数据结构信息(仅第一次调用时打印)
- if not hasattr(plot_kline_chart, '_debug_printed'):
- print(f"\n=== K线数据索引类型调试信息 ===")
- print(f"索引类型: {type(dates)}")
- print(f"索引数据类型: {type(dates[0]) if len(dates) > 0 else 'N/A'}")
- print(f"前3个索引值: {[dates[i] for i in range(min(3, len(dates)))]}")
- print(f"索引是否为DatetimeIndex: {isinstance(dates, pd.DatetimeIndex)}")
- print(f"================================\n")
- plot_kline_chart._debug_printed = True
-
- # 统一转换为date类型进行比较
- trade_date_normalized = trade_date
- if isinstance(trade_date, datetime):
- trade_date_normalized = trade_date.date()
- elif isinstance(trade_date, pd.Timestamp):
- trade_date_normalized = trade_date.date()
- elif not isinstance(trade_date, date):
- try:
- trade_date_normalized = pd.to_datetime(trade_date).date()
- except:
- pass
-
- # 找到开仓日期在数据中的位置
- trade_date_idx = None
-
- # 如果索引是DatetimeIndex,直接使用date比较
- if isinstance(dates, pd.DatetimeIndex):
- # 将DatetimeIndex转换为date进行比较
- try:
- # 使用normalize()将时间部分去掉,然后比较date
- trade_date_normalized_dt = pd.Timestamp(trade_date_normalized)
- # 查找匹配的日期
- mask = dates.normalize() == trade_date_normalized_dt
- if mask.any():
- trade_date_idx = mask.argmax()
- except Exception as e:
- print(f"使用DatetimeIndex匹配时出错: {e}")
-
- # 如果还没找到,使用循环方式查找
- if trade_date_idx is None:
- for i, date_idx in enumerate(dates):
- date_to_compare = None
- # 处理pandas Timestamp类型
- if isinstance(date_idx, pd.Timestamp):
- date_to_compare = date_idx.date()
- elif isinstance(date_idx, datetime):
- date_to_compare = date_idx.date()
- elif isinstance(date_idx, date):
- date_to_compare = date_idx
- elif hasattr(date_idx, 'date'):
- try:
- date_to_compare = date_idx.date()
- except:
- pass
-
- # 比较日期
- if date_to_compare is not None and date_to_compare == trade_date_normalized:
- trade_date_idx = i
- break
-
- # 如果还是没找到,尝试查找最接近的日期(前后各找1天)
- if trade_date_idx is None:
- print(f"警告:未找到精确匹配的交易日 {trade_date_normalized}")
- print(f" 尝试查找前后1天的日期...")
- for offset in [-1, 1]:
- try:
- target_date = trade_date_normalized + timedelta(days=offset)
- for i, date_idx in enumerate(dates):
- date_to_compare = None
- if isinstance(date_idx, pd.Timestamp):
- date_to_compare = date_idx.date()
- elif isinstance(date_idx, datetime):
- date_to_compare = date_idx.date()
- elif isinstance(date_idx, date):
- date_to_compare = date_idx
-
- if date_to_compare == target_date:
- trade_date_idx = i
- print(f" 找到最接近的日期 {target_date} (偏移{offset}天) 在索引 {i}")
- break
- if trade_date_idx is not None:
- break
- except:
- pass
-
- # 绘制K线
- for i in range(len(data)):
- date_idx = dates[i]
- open_price = opens.iloc[i]
- high_price = highs.iloc[i]
- low_price = lows.iloc[i]
- close_price = closes.iloc[i]
-
- # K线颜色:红涨绿跌
- color = 'red' if close_price > open_price else 'green'
- edge_color = 'darkred' if close_price > open_price else 'darkgreen'
-
- # 绘制影线(最高价到最低价的竖线)
- ax.plot([i, i], [low_price, high_price], color='black', linewidth=1)
-
- # 绘制实体(开盘价到收盘价的矩形)
- body_height = abs(close_price - open_price)
- if body_height == 0:
- body_height = 0.01 # 避免高度为0
- bottom = min(open_price, close_price)
-
- # 使用矩形绘制K线实体
- rect = patches.Rectangle((i-0.4, bottom), 0.8, body_height,
- linewidth=1, edgecolor=edge_color,
- facecolor=color, alpha=0.8)
- ax.add_patch(rect)
-
- # 绘制均线
- ax.plot(range(len(data)), data['ma5'], label='MA5', color='blue', linewidth=1.5, alpha=0.8)
- ax.plot(range(len(data)), data['ma10'], label='MA10', color='orange', linewidth=1.5, alpha=0.8)
- ax.plot(range(len(data)), data['ma20'], label='MA20', color='purple', linewidth=1.5, alpha=0.8)
- ax.plot(range(len(data)), data['ma30'], label='MA30', color='brown', linewidth=1.5, alpha=0.8)
-
- # 标注开仓日期位置和成交价
- if trade_date_idx is not None:
- # 绘制标记点(使用成交价)
- ax.plot(trade_date_idx, trade_price, marker='*', markersize=20,
- color='yellow', markeredgecolor='black', markeredgewidth=2,
- label='Open Position', zorder=10)
- # 添加垂直线
- ax.axvline(x=trade_date_idx, color='yellow', linestyle='--',
- linewidth=2, alpha=0.7, zorder=5)
-
- # 标注日期、成交价、方向和委托时间文本
- date_label = trade_date.strftime('%Y-%m-%d')
- price_label = f'Price: {trade_price:.2f}'
- direction_label = f'Direction: {direction}'
- time_label = f'Time: {order_time}'
-
- # 计算文本位置(在标记点上方,确保可见)
- price_range = highs.max() - lows.min()
- y_offset = max(price_range * 0.08, (highs.max() - trade_price) * 0.3) # 至少8%的价格范围,或30%的上方空间
- text_y = trade_price + y_offset
-
- # 如果文本位置超出图表范围,放在标记点下方
- if text_y > highs.max():
- text_y = trade_price - price_range * 0.08
-
- # 添加文本标注(包含所有信息)
- annotation_text = f'{date_label}\n{price_label}\n{direction_label}\n{time_label}'
- ax.text(trade_date_idx, text_y, annotation_text,
- fontsize=10, ha='center', va='bottom',
- bbox=dict(boxstyle='round,pad=0.6', facecolor='yellow', alpha=0.9, edgecolor='black', linewidth=1.5),
- zorder=11, weight='bold')
- else:
- # 即使没找到精确日期,也尝试标注(使用最接近的日期)
- print(f"警告:交易日 {trade_date} 不在K线数据范围内,无法标注")
-
- # 设置图表标题和标签(使用英文)
- contract_simple = contract_code.split('.')[0] # 提取合约编号的简约部分
- ax.set_title(f'{contract_simple} ({contract_code}) K-Line Chart\n'
- f'Period: {dates[0].strftime("%Y-%m-%d")} to {dates[-1].strftime("%Y-%m-%d")} '
- f'({len(data)} bars)',
- fontsize=14, fontweight='bold', pad=20)
-
- ax.set_xlabel('Time', fontsize=12)
- ax.set_ylabel('Price', fontsize=12)
- ax.grid(True, alpha=0.3)
- ax.legend(loc='lower left', fontsize=10)
-
- # 设置x轴标签
- step = max(1, len(data) // 10) # 显示约10个时间标签
- tick_positions = range(0, len(data), step)
- tick_labels = []
- for pos in tick_positions:
- date_val = dates[pos]
- if isinstance(date_val, date):
- tick_labels.append(date_val.strftime('%Y-%m-%d'))
- elif isinstance(date_val, datetime):
- tick_labels.append(date_val.strftime('%Y-%m-%d'))
- else:
- tick_labels.append(str(date_val))
-
- ax.set_xticks(tick_positions)
- ax.set_xticklabels(tick_labels, rotation=45, ha='right')
-
- # 添加统计信息(使用英文)
- max_price = highs.max()
- min_price = lows.min()
- latest_close = closes.iloc[-1]
- first_close = closes.iloc[0]
- total_change = (latest_close - first_close) / first_close * 100
-
- stats_text = (f'High: {max_price:.2f}\n'
- f'Low: {min_price:.2f}\n'
- f'Latest Close: {latest_close:.2f}\n'
- f'Total Change: {total_change:+.2f}%')
-
- ax.text(0.02, 0.98, stats_text, transform=ax.transAxes,
- verticalalignment='top', bbox=dict(boxstyle='round',
- facecolor='wheat', alpha=0.8), fontsize=10)
-
- # 调整布局并保存
- plt.tight_layout()
- plt.savefig(save_path, dpi=150, bbox_inches='tight')
- # print(f"K线图已保存到: {save_path}")
- plt.close(fig)
-
- except Exception as e:
- print(f"绘制K线图时出错: {str(e)}")
- # 确保即使出错也关闭图形
- try:
- plt.close('all')
- except:
- pass
- raise
- def create_zip_archive(directory_path, zip_filename=None):
- """
- 将指定目录打包成zip文件
-
- 参数:
- directory_path (str): 要打包的目录路径
- zip_filename (str): zip文件名,如果为None则自动生成
-
- 返回:
- str: zip文件路径
- """
- if not os.path.exists(directory_path):
- print(f"目录不存在: {directory_path}")
- return None
-
- if zip_filename is None:
- # 自动生成zip文件名
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- dir_name = os.path.basename(os.path.normpath(directory_path))
- zip_filename = f"{dir_name}_{timestamp}.zip"
- # 保存在目录的父目录中
- zip_path = os.path.join(os.path.dirname(directory_path), zip_filename)
- else:
- zip_path = zip_filename
-
- try:
- print(f"\n=== 开始打包目录: {directory_path} ===")
- file_count = 0
- with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
- # 遍历目录中的所有文件
- for root, dirs, files in os.walk(directory_path):
- for file in files:
- file_path = os.path.join(root, file)
- # 计算相对路径:相对于要打包的目录
- arcname = os.path.relpath(file_path, directory_path)
- zipf.write(file_path, arcname)
- file_count += 1
-
- # 获取zip文件大小
- zip_size = os.path.getsize(zip_path) / (1024 * 1024) # MB
- print(f"✓ 打包完成: {zip_path}")
- print(f" 包含文件数: {file_count} 个")
- print(f" 文件大小: {zip_size:.2f} MB")
-
- return zip_path
- except Exception as e:
- print(f"✗ 打包时出错: {str(e)}")
- return None
- def reconstruct_kline_from_transactions(csv_filename=None, output_dir=None):
- """
- 主函数:从交易记录中复原K线图
-
- 参数:
- csv_filename (str): CSV文件名,如果为None则需要在代码中设置文件名
- output_dir (str): 输出目录,如果为None则自动设置为当前目录的K子目录
- """
- # ========== 路径配置:只需在这里设置CSV文件名 ==========
- if csv_filename is None:
- # 设置CSV文件名(只需修改文件名,不需要完整路径)
- csv_filename = 'transaction4.csv'
- # ====================================================
-
- # 获取当前目录并拼接CSV文件路径
- current_dir = _get_current_directory()
- csv_path = os.path.join(current_dir, csv_filename)
-
- # 自动设置输出目录
- if output_dir is None:
- output_dir = os.path.join(current_dir, 'K')
-
- # 确保输出目录存在
- os.makedirs(output_dir, exist_ok=True)
- print(f"输出目录: {output_dir}")
-
- # 1. 读取和筛选开仓记录
- print("\n=== 步骤1: 读取和筛选开仓记录 ===")
- open_positions = read_and_filter_open_positions(csv_path)
-
- if len(open_positions) == 0:
- print("未找到开仓记录,退出")
- return
-
- # 2. 处理每条开仓记录
- print(f"\n=== 步骤2: 处理 {len(open_positions)} 条开仓记录 ===")
- success_count = 0
- fail_count = 0
-
- for idx, row in tqdm(open_positions.iterrows(), total=len(open_positions), desc="处理开仓记录"):
- # print(f"\n--- 处理第 {idx + 1}/{len(open_positions)} 条记录 ---")
-
- try:
- # 提取合约编号、实际交易日、成交价、开仓方向和委托时间
- contract_code, actual_trade_date, trade_price, direction, order_time = extract_contract_code_and_date(row)
- if contract_code is None or actual_trade_date is None or trade_price is None or direction is None or order_time is None:
- print(f"跳过:无法提取完整信息(合约编号、日期、成交价、方向或委托时间)")
- fail_count += 1
- continue
-
- # 计算交易日范围
- start_date, end_date = calculate_trade_days_range(actual_trade_date, days_before=100, days_after=10)
- if start_date is None or end_date is None:
- print(f"跳过:无法计算交易日范围")
- fail_count += 1
- continue
-
- # 获取K线数据
- kline_data = get_kline_data(contract_code, start_date, end_date)
- if kline_data is None or len(kline_data) == 0:
- print(f"跳过:无法获取K线数据")
- fail_count += 1
- continue
-
- # 计算均线
- kline_data = calculate_moving_averages(kline_data)
-
- # 过滤数据
- filtered_data = filter_data_with_ma(kline_data)
- if len(filtered_data) == 0:
- print(f"跳过:过滤后无有效数据")
- fail_count += 1
- continue
-
- # 生成文件名
- contract_simple = contract_code.split('.')[0] # 提取合约编号的简约部分
- filename = f"{contract_simple}_{actual_trade_date.strftime('%Y%m%d')}.png"
- save_path = os.path.join(output_dir, filename)
-
- # 绘制K线图(传入实际交易日和成交价)
- plot_kline_chart(filtered_data, contract_code, actual_trade_date, trade_price, direction, order_time, save_path)
-
- success_count += 1
-
- except Exception as e:
- print(f"✗ 处理时出错: {str(e)}")
- fail_count += 1
- continue
-
- # 输出统计信息
- print(f"\n=== 处理完成 ===")
- print(f"成功: {success_count} 条")
- print(f"失败: {fail_count} 条")
- print(f"总计: {success_count + fail_count} 条")
-
- # 3. 打包图片目录
- if success_count > 0:
- print(f"\n=== 步骤3: 打包图片目录 ===")
- zip_path = create_zip_archive(output_dir)
- if zip_path:
- print(f"✓ 打包文件已保存: {zip_path}")
- else:
- print(f"✗ 打包失败")
- else:
- print(f"\n未生成任何图片,跳过打包步骤")
- # 使用示例
- if __name__ == "__main__":
- print("=" * 60)
- print("K线复原工具")
- print("=" * 60)
-
- reconstruct_kline_from_transactions()
-
- print("\n=== 完成 ===")
|