"""Analyse how futures main contracts behave between consecutive MA5 crosses.

For every main contract the script downloads daily bars, finds the days on
which the bar opens on one side of the 5-day moving average and closes on the
other, and measures the number of trading days and the percentage price
change between each pair of consecutive cross days.
"""

# NOTE: the import order is kept exactly as in the original file. ``datetime``
# and ``plt`` are bound *after* the star import, so they take precedence over
# any same-named export coming from ``jqdata``.
import pandas as pd
import numpy as np
from jqdata import *
import datetime
import matplotlib.pyplot as plt

# Window length (in daily bars) of the moving average, and the column that
# stores it. The column name embeds the window, so change both together.
MA_WINDOW = 5
MA_COLUMN = 'MA5'


def get_all_key_info(the_type='futures'):
    """Return metadata and codes of all main contracts of the given type.

    Args:
        the_type: Security type passed to ``get_all_securities``.

    Returns:
        A tuple ``(the_main, all_future_list)``: the rows whose
        ``display_name`` ends with '主力合约' (the main-contract suffix), with
        the former index exposed as a ``code`` column, and the array of
        unique codes taken from that column.
    """
    securities = get_all_securities(types=[the_type]).reset_index()
    is_main = securities.display_name.str.endswith('主力合约')
    # rename() returns a new frame; renaming the filtered slice in place
    # would raise SettingWithCopyWarning.
    the_main = securities[is_main].rename(columns={'index': 'code'})
    all_future_list = the_main['code'].unique()
    return the_main, all_future_list


def get_period_range(start_year, end_year):
    """Return the first and last trading day between two calendar years.

    The range runs from 1 January of ``start_year`` to 31 December of
    ``end_year``; when ``end_year`` is the current year and that date is
    still in the future, the range is capped at the current moment.

    Args:
        start_year: First calendar year of the analysis (inclusive).
        end_year: Last calendar year of the analysis (inclusive).

    Returns:
        A tuple ``(actual_start_date, actual_end_date)`` holding the first
        and last entries returned by ``get_trade_days`` for the range.
    """
    now = datetime.datetime.now()
    start_date = datetime.datetime(start_year, 1, 1)
    end_date = datetime.datetime(end_year, 12, 31)
    if end_year == now.year and now < end_date:
        # Do not ask for trading days that have not happened yet.
        end_date = now

    trade_days = get_trade_days(start_date=start_date, end_date=end_date)
    actual_start_date = trade_days[0]
    actual_end_date = trade_days[-1]
    print(f'Analysis period: {actual_start_date} to {actual_end_date}')
    return actual_start_date, actual_end_date


def get_future_data(future_code, start_date, end_date):
    """Fetch daily bars for one contract and attach its moving average.

    Args:
        future_code: Contract code passed to ``get_price``.
        start_date: First day of the requested range.
        end_date: Last day of the requested range.

    Returns:
        A DataFrame with ``open``, ``close``, ``high``, ``low``, ``volume``
        and an added ``MA5`` column (rolling mean of ``close``), or ``None``
        when ``get_price`` returns nothing.
    """
    data = get_price(
        future_code,
        start_date=start_date,
        end_date=end_date,
        frequency='daily',
        fields=['open', 'close', 'high', 'low', 'volume'],
        skip_paused=False,
        panel=False,
    )
    if data is None or len(data) == 0:
        return None

    # Work on a copy so that adding a column never touches a shared frame.
    data = data.copy()
    data[MA_COLUMN] = data['close'].rolling(window=MA_WINDOW).mean()
    return data


def analyze_ma_crosses(data):
    """Measure the span and price change between consecutive MA5 crosses.

    A bar is an upward cross when it opens below ``MA5`` and closes above
    it, and a downward cross when it opens above and closes below. Bars
    whose ``MA5`` is NaN never qualify because the comparisons are False.

    Args:
        data: Frame with ``open``, ``close`` and ``MA5`` columns, or
            ``None``. The input is not modified.

    Returns:
        A DataFrame with one row per pair of consecutive crosses and the
        columns ``cross_date``, ``next_cross_date``, ``cross_type``
        ('Upward' or 'Downward', describing the first cross of the pair),
        ``trading_days`` (rows between the two crosses) and
        ``price_change_pct`` (close-to-close change in percent). The frame
        is empty when ``data`` is ``None``, has fewer rows than the MA
        window, or contains fewer than two crosses.
    """
    if data is None or len(data) < MA_WINDOW:
        return pd.DataFrame()

    opens = data['open']
    closes = data['close']
    averages = data[MA_COLUMN]
    cross_up = (opens < averages) & (closes > averages)
    cross_down = (opens > averages) & (closes < averages)

    # Work with row positions rather than index labels: the distance between
    # two crosses is then a plain subtraction instead of a re-slice of the
    # frame for every pair, and duplicate labels cannot produce Series.
    cross_positions = np.flatnonzero((cross_up | cross_down).values)
    is_up = cross_up.values
    close_values = closes.values
    dates = data.index

    results = []
    for pos, next_pos in zip(cross_positions[:-1], cross_positions[1:]):
        start_price = close_values[pos]
        end_price = close_values[next_pos]
        results.append({
            'cross_date': dates[pos],
            'next_cross_date': dates[next_pos],
            'cross_type': 'Upward' if is_up[pos] else 'Downward',
            'trading_days': int(next_pos - pos),
            'price_change_pct': (end_price - start_price) / start_price * 100,
        })

    return pd.DataFrame(results)


def analyze_all_futures(start_year, end_year):
    """Run the MA-cross analysis for every futures main contract.

    Args:
        start_year: First calendar year of the analysis (inclusive).
        end_year: Last calendar year of the analysis (inclusive).

    Returns:
        The per-contract results of ``analyze_ma_crosses`` concatenated
        into one DataFrame with an extra ``future_code`` column, or an
        empty DataFrame when no contract produced any result.
    """
    start_date, end_date = get_period_range(start_year, end_year)
    _, all_future_list = get_all_key_info()

    all_results = []
    for future in all_future_list:
        print(f'Analyzing {future}...')
        data = get_future_data(future, start_date, end_date)
        if data is None or len(data) < MA_WINDOW:
            continue

        results = analyze_ma_crosses(data)
        if results.empty:
            continue

        results['future_code'] = future
        all_results.append(results)

    if not all_results:
        return pd.DataFrame()

    return pd.concat(all_results, ignore_index=True)


def generate_statistics(results):
    """Summarise cross results per contract and cross direction.

    Args:
        results: Frame as returned by ``analyze_all_futures``.

    Returns:
        A DataFrame grouped by ``future_code`` and ``cross_type`` holding
        count/mean/std/min/max of ``trading_days`` and mean/std/min/max of
        ``price_change_pct``, rounded to two decimals. Returns ``None``
        (after printing a notice) when ``results`` is empty.
    """
    if results.empty:
        print("No results to analyze")
        return None

    grouped = results.groupby(['future_code', 'cross_type'])
    stats = grouped.agg({
        'trading_days': ['count', 'mean', 'std', 'min', 'max'],
        'price_change_pct': ['mean', 'std', 'min', 'max'],
    }).round(2)
    return stats


def plot_results(results):
    """Plot histograms of trading days and price changes per cross type.

    Draws two side-by-side panels, each overlaying the 'Upward' and
    'Downward' distributions, and shows the figure. Prints a notice and
    returns without plotting when ``results`` is empty.

    Args:
        results: Frame as returned by ``analyze_all_futures``.
    """
    if results.empty:
        print("No results to plot")
        return

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    # One entry per panel: (axis, column to plot, title, x-axis label).
    panels = [
        (ax1, 'trading_days',
         'Distribution of Trading Days Between Crosses', 'Trading Days'),
        (ax2, 'price_change_pct',
         'Distribution of Price Changes', 'Price Change (%)'),
    ]
    for axis, column, title, xlabel in panels:
        for cross_type in ['Upward', 'Downward']:
            values = results[results['cross_type'] == cross_type][column]
            axis.hist(values, bins=30, alpha=0.5, label=cross_type)
        axis.set_title(title)
        axis.set_xlabel(xlabel)
        axis.set_ylabel('Frequency')
        axis.legend()

    plt.tight_layout()
    plt.show()


def main():
    """Analyse the previous and current calendar year and export the results.

    Prints the summary statistics, shows the distribution plots and writes
    ``ma_cross_analysis_results.csv`` and ``ma_cross_analysis_stats.csv``
    to the working directory. Does nothing further when the analysis
    yields no rows.
    """
    # Analysis period: from 1 January of last year up to today.
    current_year = datetime.datetime.now().year
    results = analyze_all_futures(current_year - 1, current_year)
    if results.empty:
        return

    # Generate and display statistics
    stats = generate_statistics(results)
    print("\nSummary Statistics:")
    print(stats)

    # Plot distributions
    plot_results(results)

    # Export results to CSV
    results.to_csv('ma_cross_analysis_results.csv', index=False)
    stats.to_csv('ma_cross_analysis_stats.csv')


if __name__ == "__main__":
    main()