import pandas as pd import numpy as np from jqdata import * import datetime import matplotlib.pyplot as plt def get_all_key_info(the_type='futures'): """获取所有期货主力合约信息并进行日期过滤""" main = get_all_securities(types=[the_type]).reset_index() the_main = main[main.display_name.str.endswith('主力合约')].copy() # 使用copy()避免警告 the_main.rename(columns={'index': 'code'}, inplace=True) # 将start_date转换为datetime格式 the_main['start_date'] = pd.to_datetime(the_main['start_date']) return the_main def get_future_data_with_ma(future_code, start_date, end_date): """获取单个期货合约的价格数据并计算多条移动平均线""" try: # if future_code != "A9999.XDCE": # print(f"跳过: {future_code}的数据") # return None # else: print(f"获取期货: {future_code}的数据") data = get_price(future_code, start_date=start_date, end_date=end_date, frequency='daily', fields=['open', 'close', 'high', 'low', 'volume'], skip_paused=False, panel=False) if data is None or len(data) == 0: return None # 重置索引,使日期成为一列,避免复制警告 data = data.reset_index() # 计算多条移动平均线 data['MA5'] = data['close'].rolling(window=5).mean() data['MA10'] = data['close'].rolling(window=10).mean() data['MA20'] = data['close'].rolling(window=20).mean() data['MA30'] = data['close'].rolling(window=30).mean() data.to_csv("A9999.csv", index=False, encoding='utf-8-sig') return data except Exception as e: print(f"获取{future_code}数据时出错: {str(e)}") return None def check_multi_ma_cross(row): """ 检查单日K线是否向上或向下穿越了至少3条均线。 这个新版本精确计算被K线实体穿越的均线数量。 返回: 'up', 'down', 或 None """ date = row['index'] open_price = row['open'] close_price = row['close'] ma_values = [row['MA5'], row['MA10'], row['MA20'], row['MA30']] # print(f"date: {date}, open_price: {open_price}, close_price: {close_price}") # print(f"ma_values: {ma_values}") # 如果任何均线为NaN则跳过,确保数据完整性 if pd.isna(row['MA5']) or pd.isna(row['MA10']) or pd.isna(row['MA20']) or pd.isna(row['MA30']): # print(f"date: {date}, MA5: {row['MA5']}, MA10: {row['MA10']}, MA20: {row['MA20']}, MA30: {row['MA30']}") # print("ma里有NaN跳过") return None # 如果开盘价和收盘价相等,不可能有穿越发生 if open_price == close_price: # print("开盘价和收盘价相等,跳过") return None crossed_mas_count = 0 # print("开始检查穿越情况") # 情况一:上涨(阳线),检查上穿 if close_price > open_price: # print(f"收盘价大于开盘价: open_price: {open_price}, close_price: {close_price}") for ma in ma_values: # print(f"检查{ma}和收盘价和开盘价的关系") # 精确判断:开盘价在均线下方,且收盘价在均线上方 if open_price < ma and close_price > ma: crossed_mas_count += 1 if crossed_mas_count >= 3: return 'up' # 情况二:下跌(阴线),检查下穿 elif open_price > close_price: for ma in ma_values: # 精确判断:开盘价在均线上方,且收盘价在均线下方 if open_price > ma and close_price < ma: crossed_mas_count += 1 if crossed_mas_count >= 3: return 'down' # 如果以上条件都不满足(包括穿越数量不足),则返回None # print("以上情况均不满足,跳过") return None def analyze_multi_ma_crosses(data, future_code, future_name): """分析多均线穿越并计算未来收益率""" if data is None or len(data) < 30: # 需要至少30天数据来计算30日均线 return pd.DataFrame() results = [] for i in range(len(data)): row = data.iloc[i] cross_type = check_multi_ma_cross(row) if cross_type is not None: # 使用包含日期的'index'列 cross_date = row['index'] open_price = row['open'] close_price = row['close'] # 计算当日收益率(收盘价和开盘价的变化率) intraday_return = (close_price - open_price) / open_price * 100 # 计算未来收益率 future_5d_return = None future_20d_return = None future_30d_return = None # 后5日收益率 if i + 5 < len(data): future_5d_price = data.iloc[i + 5]['close'] future_5d_return = (future_5d_price - close_price) / close_price * 100 # 后10日收益率 if i + 10 < len(data): future_10d_price = data.iloc[i + 10]['close'] future_10d_return = (future_5d_price - close_price) / close_price * 100 # 后20日收益率 if i + 20 < len(data): future_20d_price = data.iloc[i + 20]['close'] future_20d_return = (future_20d_price - close_price) / close_price * 100 # 后30日收益率 if i + 30 < len(data): future_30d_price = data.iloc[i + 30]['close'] future_30d_return = (future_30d_price - close_price) / close_price * 100 # 【【【 这是关键的修复点 】】】 # 从 'row' 中获取当天的均线值,而不是从 'data' 中获取整个列 results.append({ '代码': future_code, '名称': future_name, '日期': cross_date, '方向': '上穿' if cross_type == 'up' else '下穿', '开盘价': round(open_price, 2), '收盘价': round(close_price, 2), 'MA5': round(row['MA5'], 2), 'MA10': round(row['MA10'], 2), 'MA20': round(row['MA20'], 2), 'MA30': round(row['MA30'], 2), '当日变化率': round(intraday_return, 2), '后5日变化率': round(future_5d_return, 2) if future_5d_return is not None else None, '后10日变化率': round(future_10d_return, 2) if future_10d_return is not None else None, '后20日变化率': round(future_20d_return, 2) if future_20d_return is not None else None, '后30日变化率': round(future_30d_return, 2) if future_30d_return is not None else None }) # 将结果转换为DataFrame并确保列的顺序 result_df = pd.DataFrame(results, columns=[ '日期', '代码', '名称', '开盘价', '收盘价', '当日变化率', '方向', 'MA5', 'MA10', 'MA20', 'MA30', '后5日变化率', '后10日变化率', '后20日变化率', '后30日变化率' ]) return result_df def analyze_all_futures_multi_ma(start_date, end_date): """分析所有期货合约的多均线穿越情况""" all_future_df = get_all_key_info() # 过滤在分析结束日期之前有数据且在分析结束日期之前开始交易的期货 valid_futures = all_future_df[ (all_future_df['start_date'] <= pd.to_datetime(end_date)) ].copy() print(f"找到{len(valid_futures)}个期货合约需要分析") # 创建代码到名称的映射 code_to_name = dict(zip(valid_futures['code'], valid_futures['display_name'])) all_results = [] total_futures = len(valid_futures) for idx, row in valid_futures.iterrows(): future = row['code'] future_start_date = row['start_date'] # 调整开始日期为分析开始日期或期货开始日期中的较晚者 effective_start_date = max(pd.to_datetime(start_date), future_start_date) print(f'正在分析 {future} ({idx+1}/{total_futures}) 从 {effective_start_date.strftime("%Y-%m-%d")} 开始...') try: data = get_future_data_with_ma(future, effective_start_date, end_date) if data is not None and len(data) >= 30: future_name = code_to_name.get(future, future) results = analyze_multi_ma_crosses(data, future, future_name) if not results.empty: all_results.append(results) print(f' 为{future}找到{len(results)}次多均线穿越') else: print(f' {future}未找到多均线穿越') else: print(f' {future}数据不足 (获得{len(data) if data is not None else 0}天数据)') except Exception as e: print(f' 分析{future}时出错: {str(e)}') continue if not all_results: print("未找到多均线穿越结果") return pd.DataFrame() combined_results = pd.concat(all_results, ignore_index=True) return combined_results def generate_summary_stats(results): """生成多均线穿越分析的汇总统计""" if results.empty: print("没有结果可供分析") return print(f"\n=== 多均线穿越分析结果汇总 ===") print(f"总共发现 {len(results)} 次多均线穿越事件") print(f"涉及 {results['代码'].nunique()} 个不同的期货品种") # 按穿越方向统计 cross_type_stats = results['方向'].value_counts() print(f"\n穿越方向分布:") for cross_type, count in cross_type_stats.items(): print(f" {cross_type}: {count} 次") # 收益率统计 print(f"\n收益率统计:") for col in ['当日变化率', '后5日变化率', '后20日变化率', '后30日变化率']: valid_data = results[col].dropna() if len(valid_data) > 0: print(f" {col}:") print(f" 平均值: {valid_data.mean():.2f}%") print(f" 中位数: {valid_data.median():.2f}%") print(f" 标准差: {valid_data.std():.2f}%") print(f" 最小值: {valid_data.min():.2f}%") print(f" 最大值: {valid_data.max():.2f}%") # 按品种统计 print(f"\n各品种穿越次数统计:") variety_stats = results.groupby('代码').size().sort_values(ascending=False) for code, count in variety_stats.head(10).items(): name = results[results['代码'] == code]['名称'].iloc[0] print(f" {code} ({name}): {count} 次") def main(): """运行多均线穿越分析的主函数""" # 设置分析期间 start_date = datetime.datetime(2024, 6, 1) end_date = datetime.datetime(2025, 6, 1) print(f"开始分析期货多均线穿越情况...") print(f"分析期间: {start_date.strftime('%Y-%m-%d')} 到 {end_date.strftime('%Y-%m-%d')}") # 分析所有期货 results = analyze_all_futures_multi_ma(start_date, end_date) if not results.empty: # 生成并显示统计信息 generate_summary_stats(results) # 导出结果到CSV文件 output_filename = 'multi_ma_cross_analysis_results.csv' results.to_csv(output_filename, index=False, encoding='utf-8-sig') print(f"\n结果已保存到文件: {output_filename}") # 显示前几行结果 print(f"\n前10条结果预览:") print(results.head(10).to_string(index=False)) return results else: print("未找到符合条件的多均线穿越事件") return pd.DataFrame() if __name__ == "__main__": results = main()