Bladeren bron

增加了一个获取期货标的历史主力合约时间的dominant_contract_tracker.py

maxfeng 3 maanden geleden
bovenliggende
commit
7d74161c49
1 gewijzigde bestanden met toevoegingen van 251 en 0 verwijderingen
  1. 251 0
      Lib/future/dominant_contract_tracker.py

+ 251 - 0
Lib/future/dominant_contract_tracker.py

@@ -0,0 +1,251 @@
+# 主力合约跟踪工具
+# 用于获取指定时间范围内某个或某些标的的主力合约变化信息
+
+from jqdata import *
+import pandas as pd
+from datetime import datetime, timedelta
+import warnings
+warnings.filterwarnings('ignore')
+
+def get_dominant_contracts_history(symbol_list, time_range):
+    """
+    获取指定时间范围内某个或某些标的的主力合约历史变化
+    
+    参数:
+        symbol_list (list): 标的编码列表,如 ['FU', 'BU']
+        time_range (list): 时间范围,格式为 ['20240101', '20250101']
+    
+    返回:
+        pandas.DataFrame: 包含以下列的数据框
+            - symbol: 原始标的编码
+            - dominant_contract: 主力合约完整编码
+            - start_date: 主力合约开始日期
+            - end_date: 主力合约结束日期
+    """
+    
+    # 验证输入参数
+    if not isinstance(symbol_list, list) or len(symbol_list) == 0:
+        raise ValueError("symbol_list必须是非空列表")
+    
+    if not isinstance(time_range, list) or len(time_range) != 2:
+        raise ValueError("time_range必须是包含两个日期的列表")
+    
+    # 转换日期格式
+    try:
+        start_date = datetime.strptime(time_range[0], '%Y%m%d').date()
+        end_date = datetime.strptime(time_range[1], '%Y%m%d').date()
+    except ValueError:
+        raise ValueError("日期格式错误,请使用YYYYMMDD格式")
+    
+    if start_date >= end_date:
+        raise ValueError("开始日期必须早于结束日期")
+    
+    print(f"开始获取主力合约数据...")
+    print(f"标的列表: {symbol_list}")
+    print(f"时间范围: {start_date} 至 {end_date}")
+    
+    # 存储所有结果的列表
+    all_results = []
+    
+    # 逐个处理每个标的
+    for symbol in symbol_list:
+        print(f"\n处理标的: {symbol}")
+        
+        try:
+            # 获取该标的在指定时间范围内的主力合约历史
+            symbol_results = _get_symbol_dominant_history(symbol, start_date, end_date)
+            all_results.extend(symbol_results)
+            
+        except Exception as e:
+            print(f"处理标的 {symbol} 时发生错误: {str(e)}")
+            continue
+    
+    # 将结果转换为DataFrame
+    if all_results:
+        result_df = pd.DataFrame(all_results)
+        result_df = result_df.sort_values(['symbol', 'start_date']).reset_index(drop=True)
+        
+        print(f"\n成功获取 {len(result_df)} 条主力合约记录")
+        return result_df
+    else:
+        print("\n未获取到任何主力合约数据")
+        return pd.DataFrame(columns=['symbol', 'dominant_contract', 'start_date', 'end_date'])
+
+
+def _get_symbol_dominant_history(symbol, start_date, end_date):
+    """
+    获取单个标的的主力合约历史变化
+    
+    参数:
+        symbol (str): 标的编码
+        start_date (date): 开始日期
+        end_date (date): 结束日期
+    
+    返回:
+        list: 主力合约记录列表
+    """
+    
+    results = []
+    current_date = start_date
+    current_dominant = None
+    period_start = None
+    
+    # 按日遍历时间范围
+    while current_date <= end_date:
+        
+        # 跳过非交易日
+        if not _is_trading_day(current_date):
+            current_date += timedelta(days=1)
+            continue
+        
+        try:
+            # 获取当日主力合约
+            dominant_contract = get_dominant_future(symbol, current_date)
+            
+            if dominant_contract is None:
+                current_date += timedelta(days=1)
+                continue
+                
+            # 如果是新的主力合约
+            if dominant_contract != current_dominant:
+                
+                # 如果不是第一个合约,结束上一个合约的记录
+                if current_dominant is not None and period_start is not None:
+                    results.append({
+                        'symbol': symbol,
+                        'dominant_contract': current_dominant,
+                        'start_date': period_start,
+                        'end_date': current_date - timedelta(days=1)
+                    })
+                
+                # 开始新合约的记录
+                current_dominant = dominant_contract
+                period_start = current_date
+                
+                print(f"  {current_date}: 主力合约变更为 {dominant_contract}")
+            
+        except Exception as e:
+            print(f"  获取 {current_date} 的主力合约时出错: {str(e)}")
+        
+        current_date += timedelta(days=1)
+    
+    # 处理最后一个合约
+    if current_dominant is not None and period_start is not None:
+        results.append({
+            'symbol': symbol,
+            'dominant_contract': current_dominant,
+            'start_date': period_start,
+            'end_date': end_date
+        })
+    
+    return results
+
+
+def _is_trading_day(check_date):
+    """
+    检查是否为交易日
+    
+    参数:
+        check_date (date): 要检查的日期
+    
+    返回:
+        bool: 是否为交易日
+    """
+    try:
+        # 使用聚宽API检查交易日
+        trade_days = get_trade_days(end_date=check_date, count=1)
+        if len(trade_days) > 0:
+            return trade_days[0].date() == check_date
+        return False
+    except:
+        # 如果API调用失败,简单判断是否为工作日
+        return check_date.weekday() < 5
+
+
+def get_dominant_contracts_summary(symbol_list, time_range):
+    """
+    获取主力合约变化的汇总信息
+    
+    参数:
+        symbol_list (list): 标的编码列表
+        time_range (list): 时间范围
+    
+    返回:
+        dict: 汇总信息
+    """
+    
+    df = get_dominant_contracts_history(symbol_list, time_range)
+    
+    if df.empty:
+        return {}
+    
+    summary = {}
+    
+    for symbol in symbol_list:
+        symbol_data = df[df['symbol'] == symbol]
+        
+        if not symbol_data.empty:
+            summary[symbol] = {
+                '主力合约数量': len(symbol_data),
+                '合约列表': symbol_data['dominant_contract'].tolist(),
+                '平均持续天数': (symbol_data['end_date'] - symbol_data['start_date']).dt.days.mean()
+            }
+        else:
+            summary[symbol] = {
+                '主力合约数量': 0,
+                '合约列表': [],
+                '平均持续天数': 0
+            }
+    
+    return summary
+
+
+def export_to_csv(symbol_list, time_range, filename=None):
+    """
+    导出主力合约数据到CSV文件
+    
+    参数:
+        symbol_list (list): 标的编码列表
+        time_range (list): 时间范围
+        filename (str): 导出文件名,如果为None则自动生成
+    
+    返回:
+        str: 导出的文件路径
+    """
+    
+    df = get_dominant_contracts_history(symbol_list, time_range)
+    
+    if filename is None:
+        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+        symbols_str = '_'.join(symbol_list)
+        filename = f"dominant_contracts_{symbols_str}_{time_range[0]}_{time_range[1]}_{timestamp}.csv"
+    
+    df.to_csv(filename, index=False, encoding='utf-8-sig')
+    print(f"数据已导出到: {filename}")
+    
+    return filename
+
+
+# 使用示例
+if __name__ == "__main__":
+    
+    # 示例1: 获取基本数据
+    symbol_list = ['FU', 'BU']
+    time_range = ['20240101', '20250101']
+    
+    print("=== 获取主力合约历史数据 ===")
+    df = get_dominant_contracts_history(symbol_list, time_range)
+    print("\n结果预览:")
+    print(df.head(10))
+    
+    print("\n=== 获取汇总信息 ===")
+    summary = get_dominant_contracts_summary(symbol_list, time_range)
+    for symbol, info in summary.items():
+        print(f"\n{symbol}:")
+        for key, value in info.items():
+            print(f"  {key}: {value}")
+    
+    print("\n=== 导出数据到CSV ===")
+    csv_file = export_to_csv(symbol_list, time_range)
+    
+    print("\n=== 完成 ===")