| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- import pandas as pd
- import numpy as np
- from jqdata import *
- import datetime
- import matplotlib.pyplot as plt
def get_all_key_info(the_type='futures'):
    """Return the main-contract listing and its unique security codes.

    Args:
        the_type: Security type forwarded to ``get_all_securities`` as a
            one-element ``types`` list. Defaults to ``'futures'``.

    Returns:
        A ``(the_main, all_future_list)`` tuple. ``the_main`` holds the rows
        whose ``display_name`` ends with the main-contract suffix, with the
        column produced by ``reset_index()`` renamed from ``index`` to
        ``code``. ``all_future_list`` is the array of unique ``code`` values.
    """
    main = get_all_securities(types=[the_type]).reset_index()

    # na=False maps a missing display name to False; without it the mask
    # would contain NaN entries, which boolean indexing rejects.
    is_main_contract = main.display_name.str.endswith('主力合约', na=False)

    # rename() without inplace returns a new frame. An in-place rename on
    # this filtered selection can raise SettingWithCopyWarning.
    the_main = main[is_main_contract].rename(columns={'index': 'code'})

    all_future_list = the_main['code'].unique()
    return the_main, all_future_list
def get_period_range(start_year, end_year):
    """Resolve the first and last trading day covered by a span of years.

    The requested window runs from 1 January of ``start_year`` to
    31 December of ``end_year``. When ``end_year`` is the current year and
    that 31 December is still in the future, the window ends at the current
    moment instead.

    Args:
        start_year: First calendar year of the window.
        end_year: Last calendar year of the window.

    Returns:
        ``(actual_start_date, actual_end_date)``: the first and last entries
        returned by ``get_trade_days`` for the window. The pair is also
        printed.
    """
    today = datetime.datetime.now()
    window_start = datetime.datetime(start_year, 1, 1)
    window_end = datetime.datetime(end_year, 12, 31)

    # Only a window ending in the current year is cut off at "now".
    if end_year == today.year and today < window_end:
        window_end = today

    sessions = get_trade_days(start_date=window_start, end_date=window_end)
    actual_start_date, actual_end_date = sessions[0], sessions[-1]
    print(f'Analysis period: {actual_start_date} to {actual_end_date}')
    return actual_start_date, actual_end_date
def get_future_data(future_code, start_date, end_date):
    """Fetch daily bars for one contract and attach a 5-row moving average.

    Args:
        future_code: Security code passed to ``get_price``.
        start_date: Start of the requested range.
        end_date: End of the requested range.

    Returns:
        A new DataFrame with the requested price fields plus an ``MA5``
        column (rolling mean of ``close`` over a window of 5), or ``None``
        when ``get_price`` returns ``None`` or a zero-length result.
    """
    query = dict(
        start_date=start_date,
        end_date=end_date,
        frequency='daily',
        fields=['open', 'close', 'high', 'low', 'volume'],
        skip_paused=False,
        panel=False,
    )
    bars = get_price(future_code, **query)

    if bars is None or not len(bars):
        return None

    # assign() works on a copy, so the frame returned by get_price is
    # never modified in place.
    return bars.assign(MA5=bars['close'].rolling(window=5).mean())
def analyze_ma_crosses(data):
    """Measure the gap and price move between consecutive MA5 crosses.

    A row is an upward cross when ``open`` is below ``MA5`` and ``close`` is
    above it, and a downward cross when ``open`` is above ``MA5`` and
    ``close`` is below it. Each cross is paired with the next cross of
    either direction; the final cross has no successor and produces no row.

    Args:
        data: DataFrame with ``open``, ``close`` and ``MA5`` columns, or
            ``None``. The input is not modified.

    Returns:
        A DataFrame with one row per consecutive pair of crosses and the
        columns ``cross_date``, ``next_cross_date``, ``cross_type``
        (``'Upward'`` or ``'Downward'``, describing the first cross of the
        pair), ``trading_days`` (number of rows separating the two crosses)
        and ``price_change_pct`` (percentage change in ``close`` from the
        first cross to the second). The same columns are present when the
        result is empty, including when ``data`` is ``None`` or has fewer
        than 5 rows.
    """
    columns = [
        'cross_date',
        'next_cross_date',
        'cross_type',
        'trading_days',
        'price_change_pct',
    ]

    # Fewer than 5 rows cannot contain a complete 5-row average.
    if data is None or len(data) < 5:
        return pd.DataFrame(columns=columns)

    # Comparisons against a NaN average are False, so the warm-up rows of
    # the rolling mean never register as crosses.
    is_up = ((data['open'] < data['MA5']) & (data['close'] > data['MA5'])).to_numpy()
    is_down = ((data['open'] > data['MA5']) & (data['close'] < data['MA5'])).to_numpy()

    # Working with row positions avoids a label slice per pair and keeps
    # the distance well-defined even if index labels repeat.
    positions = np.flatnonzero(is_up | is_down)
    labels = data.index
    closes = data['close'].to_numpy()

    records = [
        {
            'cross_date': labels[current],
            'next_cross_date': labels[following],
            'cross_type': 'Upward' if is_up[current] else 'Downward',
            'trading_days': int(following - current),
            'price_change_pct': (closes[following] - closes[current]) / closes[current] * 100,
        }
        for current, following in zip(positions[:-1], positions[1:])
    ]

    return pd.DataFrame(records, columns=columns)
def analyze_all_futures(start_year, end_year):
    """Run the MA-cross analysis over every main futures contract.

    Args:
        start_year: First calendar year of the analysis window.
        end_year: Last calendar year of the analysis window.

    Returns:
        The per-contract results of ``analyze_ma_crosses`` concatenated into
        one DataFrame with an added ``future_code`` column, or an empty
        DataFrame when no contract produced any rows.
    """
    period_start, period_end = get_period_range(start_year, end_year)
    _, contract_codes = get_all_key_info()

    per_contract = []
    for future in contract_codes:
        print(f'Analyzing {future}...')
        prices = get_future_data(future, period_start, period_end)

        # Skip contracts with no data or too few rows for a 5-row average.
        if prices is None or len(prices) < 5:
            continue

        crosses = analyze_ma_crosses(prices)
        if crosses.empty:
            continue

        crosses['future_code'] = future
        per_contract.append(crosses)

    if per_contract:
        return pd.concat(per_contract, ignore_index=True)
    return pd.DataFrame()
def generate_statistics(results):
    """Summarise cross results per contract and cross direction.

    Args:
        results: DataFrame with ``future_code``, ``cross_type``,
            ``trading_days`` and ``price_change_pct`` columns.

    Returns:
        A DataFrame indexed by ``(future_code, cross_type)`` holding
        count/mean/std/min/max of ``trading_days`` and mean/std/min/max of
        ``price_change_pct``, rounded to two decimals. Returns ``None``
        (after printing a notice) when ``results`` is empty.
    """
    if results.empty:
        print("No results to analyze")
        return None

    aggregations = {
        'trading_days': ['count', 'mean', 'std', 'min', 'max'],
        'price_change_pct': ['mean', 'std', 'min', 'max'],
    }
    grouped = results.groupby(['future_code', 'cross_type'])
    summary = grouped.agg(aggregations)
    return summary.round(2)
def plot_results(results):
    """Draw side-by-side histograms of cross gaps and price changes.

    The left panel shows ``trading_days`` and the right panel shows
    ``price_change_pct``, each with one overlaid histogram per cross type.
    Prints a notice and returns without plotting when ``results`` is empty.

    Args:
        results: DataFrame with ``cross_type``, ``trading_days`` and
            ``price_change_pct`` columns.
    """
    if results.empty:
        print("No results to plot")
        return

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    # One entry per panel: (axes, column to plot, title, x-axis label).
    panels = (
        (ax1, 'trading_days',
         'Distribution of Trading Days Between Crosses', 'Trading Days'),
        (ax2, 'price_change_pct',
         'Distribution of Price Changes', 'Price Change (%)'),
    )

    for axis, column, title, x_label in panels:
        for cross_type in ('Upward', 'Downward'):
            subset = results.loc[results['cross_type'] == cross_type, column]
            axis.hist(subset, bins=30, alpha=0.5, label=cross_type)
        axis.set_title(title)
        axis.set_xlabel(x_label)
        axis.set_ylabel('Frequency')
        axis.legend()

    plt.tight_layout()
    plt.show()
def main():
    """Analyse the previous and current calendar year and export the results.

    When the analysis yields rows, prints the summary statistics, shows the
    distribution plots, and writes the raw results and the statistics to
    CSV files in the working directory. Does nothing further when the
    analysis comes back empty.
    """
    this_year = datetime.datetime.now().year
    results = analyze_all_futures(this_year - 1, this_year)

    if results.empty:
        return

    # Summary table, printed before the plots are shown.
    stats = generate_statistics(results)
    print("\nSummary Statistics:")
    print(stats)

    plot_results(results)

    # Persist both the per-cross rows and the aggregated statistics.
    results.to_csv('ma_cross_analysis_results.csv', index=False)
    stats.to_csv('ma_cross_analysis_stats.csv')


if __name__ == "__main__":
    main()
|