| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715 |
- # 导入函数库
- from jqdata import *
- from jqdata import finance
- import pandas as pd
- from datetime import date, datetime, timedelta
- import re
- # 趋势突破交易策略 v003
- # 基于均线趋势突破的期货交易策略 - 修复版本
- # 设置以便完整打印 DataFrame
- pd.set_option('display.max_rows', None)
- pd.set_option('display.max_columns', None)
- pd.set_option('display.width', None)
- pd.set_option('display.max_colwidth', 20)
- ## 初始化函数
- def initialize(context):
- # 设定沪深300作为基准
- set_benchmark('000300.XSHG')
- # 开启动态复权模式(真实价格)
- set_option('use_real_price', True)
- log.info('趋势突破交易策略初始化开始')
- ### 期货相关设定 ###
- # 设定账户为金融账户
- set_subportfolios([SubPortfolioConfig(cash=context.portfolio.starting_cash, type='index_futures')])
- # 期货类每笔交易时的手续费
- set_order_cost(OrderCost(open_commission=0.000023, close_commission=0.000023, close_today_commission=0.0023), type='index_futures')
- # 设置期货交易的滑点
- set_slippage(StepRelatedSlippage(2))
-
- # 初始化全局变量
- g.usage_percentage = 0.8 # 最大持股数量
- g.default_days = 10 # 判断趋势线的最小天数
- g.continuous_days_length = 5 # 破趋势后的观察天数
- g.change_direction_days = 5 # 检查均线穿过的天数范围
- g.crossed_symbols_history = {} # 用于存储过去几天的穿越信息
- g.trade_history = {} # 初始化交易记录
- g.change_fail_history = {} # 初始化换月建仓失败的记录
- g.high_low_ma_relations = {}
-
- # 定义默认的保证金比例
- g.default_margin_rates = {
- 'long': {'A': 0.07, 'AG': 0.04, 'AL': 0.05, 'AO': 0.05, 'AP': 0.08, 'AU': 0.04, 'B': 0.05,
- 'BC': 0.13, 'BR': 0.07, 'BU': 0.04, 'C': 0.07, 'CF': 0.05, 'CJ': 0.07, 'CS': 0.07,
- 'CU': 0.05, 'CY': 0.05, 'EB': 0.12, 'EC': 0.12, 'EG': 0.05, 'FG': 0.05, 'FU': 0.08,
- 'HC': 0.04, 'I': 0.1, 'J': 0.22, 'JD': 0.08, 'JM': 0.22,
- 'L': 0.07, 'LC': 0.05, 'LH': 0.1, 'LR': 0.05, 'LU': 0.15, 'M': 0.07, 'MA': 0.05, 'NI': 0.05, 'NR': 0.13, 'OI': 0.05,
- 'P': 0.05, 'PB': 0.05, 'PF': 0.1, 'PG': 0.05, 'PK': 0.05,
- 'PP': 0.07, 'RB': 0.05, 'RI': 0.05, 'RM': 0.05, 'RU': 0.05,
- 'SA': 0.05, 'SC': 0.12, 'SF': 0.05, 'SH': 0.05, 'SI': 0.13, 'SM': 0.05, 'SN': 0.05, 'SP': 0.1, 'SR': 0.05,
- 'SS': 0.05, 'TA': 0.05, 'UR': 0.09, 'V': 0.07,
- 'Y': 0.05, 'ZC': 0.05, 'ZN': 0.05},
- 'short': {'A': 0.07, 'AG': 0.04, 'AL': 0.05, 'AO': 0.05, 'AP': 0.08, 'AU': 0.04, 'B': 0.05,
- 'BC': 0.13, 'BR': 0.07, 'BU': 0.04, 'C': 0.07, 'CF': 0.05, 'CJ': 0.07, 'CS': 0.07,
- 'CU': 0.05, 'CY': 0.05, 'EB': 0.12, 'EC': 0.12, 'EG': 0.05, 'FG': 0.05, 'FU': 0.08,
- 'HC': 0.04, 'I': 0.1, 'J': 0.22, 'JD': 0.08, 'JM': 0.22,
- 'L': 0.07, 'LC': 0.05, 'LH': 0.1, 'LR': 0.05, 'LU': 0.15, 'M': 0.07, 'MA': 0.05, 'NI': 0.05, 'NR': 0.13, 'OI': 0.05,
- 'P': 0.05, 'PB': 0.05, 'PF': 0.1, 'PG': 0.05, 'PK': 0.05,
- 'PP': 0.07, 'RB': 0.05, 'RI': 0.05, 'RM': 0.05, 'RU': 0.05,
- 'SA': 0.05, 'SC': 0.12, 'SF': 0.05, 'SH': 0.05, 'SI': 0.13, 'SM': 0.05, 'SN': 0.05, 'SP': 0.1, 'SR': 0.05,
- 'SS': 0.05, 'TA': 0.05, 'UR': 0.09, 'V': 0.07,
- 'Y': 0.05, 'ZC': 0.05, 'ZN': 0.05}
- }
-
- g.multiplier = {
- 'A': 10, 'AG': 15, 'AL': 5, 'AO': 20, 'AP': 10, 'AU': 1000, 'B': 10,
- 'BC': 5, 'BR': 5, 'BU': 10, 'C': 10, 'CF': 5, 'CJ': 5, 'CS': 10,
- 'CU': 5, 'CY': 5, 'EB': 5, 'EC': 50, 'EG': 10, 'FG': 20, 'FU': 10,
- 'HC': 10, 'I': 100, 'J': 60, 'JD': 5, 'JM': 100,
- 'L': 5, 'LC': 1, 'LH': 16, 'LR': 0.05, 'LU': 10, 'M': 10, 'MA': 10, 'NI': 1, 'NR': 10, 'OI': 10,
- 'P': 10, 'PB': 5, 'PF': 5, 'PG': 20, 'PK': 5,
- 'PP': 5, 'RB': 10, 'RI': 0.05, 'RM': 10, 'RU': 10,
- 'SA': 20, 'SC': 1000, 'SF': 5, 'SH': 30, 'SI': 5, 'SM': 5, 'SN': 1, 'SP': 10, 'SR': 10,
- 'SS': 5, 'TA': 5, 'UR': 20, 'V': 5,
- 'Y': 10, 'ZC': 0.05, 'ZN': 5
- }
-
- # 临时止损检查
- run_daily(loss_control, time='21:15:00', reference_security='IF1808.CCFX')
- run_daily(loss_control, time='21:45:00', reference_security='IF1808.CCFX')
- run_daily(loss_control, time='22:15:00', reference_security='IF1808.CCFX')
- run_daily(loss_control, time='22:45:00', reference_security='IF1808.CCFX')
- run_daily(loss_control, time='09:15:00', reference_security='IF1808.CCFX')
- run_daily(loss_control, time='09:45:00', reference_security='IF1808.CCFX')
- run_daily(loss_control, time='10:15:00', reference_security='IF1808.CCFX')
- run_daily(loss_control, time='10:45:00', reference_security='IF1808.CCFX')
- run_daily(loss_control, time='11:15:00', reference_security='IF1808.CCFX')
- run_daily(loss_control, time='13:15:00', reference_security='IF1808.CCFX')
- run_daily(loss_control, time='13:45:00', reference_security='IF1808.CCFX')
- run_daily(loss_control, time='14:15:00', reference_security='IF1808.CCFX')
- run_daily(loss_control, time='14:56:00', reference_security='IF1808.CCFX')
- # 收盘前运行
- run_daily(before_market_close, time='14:55', reference_security='IF8888.CCFX')
- ############################ 主程序中执行函数 ###################################
- def before_market_close(context):
- """收盘前主策略执行"""
- print("-" * 20 + "New day ending!" + "-" * 20)
- if len(g.trade_history) > 0:
- print(f'当前交易记录: {g.trade_history}, 换月失败记录: {g.change_fail_history}')
-
- # 1. 检查趋势
- trend_symbols, daily_data_info = check_trend(context)
- print_list_elements("趋势标的", trend_symbols)
-
- # 2. 检查影线穿越
- crossed_symbols = check_shadow_cross(context, trend_symbols, daily_data_info)
- print_list_elements("穿越标的", crossed_symbols)
-
- # 3. 检查买入条件
- buy_symbols = check_buy_condition(context, crossed_symbols)
- print_list_elements("买入标的", buy_symbols)
-
- # 4. 执行交易
- subportfolio = context.subportfolios[0]
- hold_symbols = set(subportfolio.long_positions.keys()) | set(subportfolio.short_positions.keys())
-
- for symbol, line_label, line_type, direction in buy_symbols:
- if check_symbol_prefix_match(symbol, hold_symbols):
- print(f'已有相似持仓 {symbol},跳过交易')
- else:
- print(f'无相似持仓 {symbol},执行交易')
- value_to_invest = calculate_order_value(context, symbol, direction)
- open_position(symbol, value_to_invest, direction, line_label)
- def loss_control(context):
- """止损控制"""
- print("-" * 20 + "止损控制" + "-" * 20)
-
- # 检查是否有正常的夜盘
- now = context.current_dt.time()
- now_hour = now.hour
- if now_hour >= 21:
- test_future = get_dominant_future("A")
- if test_future:
- test_data = attribute_history(test_future, 1, '1m', 'close')
- test_hour = test_data.index[0].hour
- if test_hour <= 15:
- print(f'最近的数据小时为: {test_hour},该夜盘不存在,直接停止')
- return
-
- # 先把换月买入失败的重新买入
- if len(g.change_fail_history) > 0:
- print(f'检查换月失败的{g.change_fail_history}')
- for symbol in list(g.change_fail_history.keys()):
- direction = g.change_fail_history[symbol]['direction']
- value_to_invest = calculate_order_value(context, symbol, direction)
- success = open_position(symbol, value_to_invest, direction, g.change_fail_history[symbol].get("line_label", "unknown"))
- if success:
- del g.change_fail_history[symbol]
-
- # 检查损失
- target_time = datetime.strptime('14:55:00', '%H:%M:%S').time()
- potential_future_list = get_potential_future_list(context)
-
- # 遍历所有持仓
- subportfolio = context.subportfolios[0]
- all_positions = list(subportfolio.long_positions.values()) + list(subportfolio.short_positions.values())
-
- for position in all_positions:
- if position.security in potential_future_list:
- # 检查固定止损
- if check_loss_for_close(context, position, position.side):
- continue
-
- # 检查基于动态跟踪线的平仓条件
- if now > target_time:
- check_ma_for_close(context, position, 0.01, 4)
- else:
- check_ma_for_close(context, position, 0.003, 4)
- ############################ 交易模块 ###################################
- def order_target_value_(security, value, direction):
- """自定义下单"""
- if value == 0:
- log.debug("平仓 %s" % (security))
- else:
- log.debug("下单 %s 金额 %f" % (security, value))
- return order_target_value(security, value, side=direction)
- def open_position(security, value, direction, line_label):
- """开仓"""
- order = order_target_value_(security, value, direction)
- if order is not None and order.filled > 0:
- print(f'成功开仓 {security} 方向 {direction} 线标 {line_label}')
- g.trade_history[security] = {
- 'entry_price': get_current_data()[security].last_price,
- 'position_value': value,
- 'direction': direction,
- 'line_label': line_label,
- 'finish_time': order.finish_time
- }
- return True
- return False
- def close_position(position, direction):
- """平仓"""
- security = position.security
- order = order_target_value_(security, 0, direction)
- if order is not None:
- if order.status == OrderStatus.held and order.filled == order.amount:
- # 如果成功平仓,从交易历史中移除该标的
- if security in g.trade_history:
- del g.trade_history[security]
- return True
- return False
- ############################ 策略核心函数 ###################################
- def get_potential_future_list(context):
- """获取可交易的期货品种列表"""
- potential_night_list = ['NI', 'CF', 'PF', 'Y', 'M', 'B', 'SN', 'RM']
- potential_day_list = ['JD', 'UR']
-
- if str(context.current_dt.time())[:2] in ('21', '22'):
- potential_icon_list = potential_night_list
- else:
- potential_icon_list = potential_day_list + potential_night_list
-
- potential_future_list = []
- for i in potential_icon_list:
- dominant_future = get_dominant_future(i)
- if dominant_future:
- potential_future_list.append(dominant_future)
-
- return potential_future_list
- def check_trend(context):
- """检查品类的主连是否满足形成趋势的条件"""
- print("-" * 20 + "检查趋势" + "-" * 20)
- trend_symbols = []
- daily_data_info = {}
-
- # 获取可以交易的所有标的主连
- potential_future_list = get_potential_future_list(context)
- potential_future_list = [item for item in potential_future_list if item not in g.trade_history.keys()]
-
- # 针对所有标的需要的基础数据
- ma_crosses_dict = {}
- for symbol in potential_future_list:
- # 获取50天的收盘价数据
- close_data = attribute_history(symbol, 50, '1d', ['close', 'high', 'low', 'open'], df=True)
- close_series = close_data['close']
- # 计算移动平均线
- ma5 = close_series.rolling(window=5).mean()
- ma10 = close_series.rolling(window=10).mean()
- ma20 = close_series.rolling(window=20).mean()
- ma30 = close_series.rolling(window=30).mean()
- trend_info = {'symbol': symbol, 'trend_lines': []}
- daily_data = []
- # 初始化连续天数计数器
- continuous_days = {
- 'above_ma5': [0] * g.continuous_days_length,
- 'below_ma5': [0] * g.continuous_days_length,
- 'above_ma10': [0] * g.continuous_days_length,
- 'below_ma10': [0] * g.continuous_days_length,
- 'above_ma20': [0] * g.continuous_days_length,
- 'below_ma20': [0] * g.continuous_days_length,
- 'above_ma30': [0] * g.continuous_days_length,
- 'below_ma30': [0] * g.continuous_days_length
- }
-
- # 获取连续在某一个均线上或者下的天数
- for i in range(len(close_series)):
- update_continuous_days(continuous_days, close_series[i], ma5[i], ma10[i], ma20[i], ma30[i], g.continuous_days_length)
- # 收集每日数据
- day_data = {
- 'date': close_data.index[i].date(),
- 'close': close_series[i],
- 'high': close_data['high'][i],
- 'low': close_data['low'][i],
- 'open': close_data['open'][i],
- 'ma5': ma5[i],
- 'ma10': ma10[i],
- 'ma20': ma20[i],
- 'ma30': ma30[i],
- 'continuous_above_ma5': continuous_days['above_ma5'].copy(),
- 'continuous_below_ma5': continuous_days['below_ma5'].copy(),
- 'continuous_above_ma10': continuous_days['above_ma10'].copy(),
- 'continuous_below_ma10': continuous_days['below_ma10'].copy(),
- 'continuous_above_ma20': continuous_days['above_ma20'].copy(),
- 'continuous_below_ma20': continuous_days['below_ma20'].copy(),
- 'continuous_above_ma30': continuous_days['above_ma30'].copy(),
- 'continuous_below_ma30': continuous_days['below_ma30'].copy()
- }
- daily_data.append(day_data)
-
- daily_data_info[symbol] = daily_data
-
- # 检查过去一定天数内均线的相交次数
- ma_crosses_dict[symbol] = count_ma_crosses(daily_data_info[symbol], g.change_direction_days)
-
- # 检查哪些均线会形成什么类型的趋势
- for ma_type in ['ma5', 'ma10', 'ma20', 'ma30']:
- above_days = continuous_days[f'above_{ma_type}']
- below_days = continuous_days[f'below_{ma_type}']
-
- above_condition = any(day >= g.default_days for day in above_days)
- below_condition = any(day >= g.default_days for day in below_days)
-
- if above_condition:
- trend_info['trend_lines'].append((ma_type, 'support'))
-
- if below_condition:
- trend_info['trend_lines'].append((ma_type, 'resistance'))
- if trend_info['trend_lines']:
- trend_symbols.append(trend_info)
-
- # 去除掉在一段时间内均线过于频繁交叉的对象
- valid_trend_symbols = []
-
- for trend_info in trend_symbols:
- symbol = trend_info["symbol"]
- ma_crosses = ma_crosses_dict[symbol]
-
- if ma_crosses <= 3:
- valid_trend_symbols.append(trend_info)
-
- return valid_trend_symbols, daily_data_info
- def check_shadow_cross(context, trend_symbols, daily_data_info):
- """检查影线穿越"""
- print("-" * 20 + "检查影线穿越" + "-" * 20)
-
- # 检查并进行换月
- switch_result = position_auto_switch(context)
- if switch_result:
- print(f'换月结果: {switch_result}')
-
- today_crossed_symbols = {}
-
- for trend_info in trend_symbols:
- symbol = trend_info['symbol']
-
- # 获取昨天的数据
- yesterday_data = daily_data_info[symbol][-1]
-
- # 临时存储当前标的的穿越信息
- symbol_crosses = []
- for line_label, line_type in trend_info['trend_lines']:
- yesterday_day = yesterday_data['date']
- close = yesterday_data['close']
- high = yesterday_data['high']
- low = yesterday_data['low']
- ma_value = yesterday_data[line_label]
- close_ma_difference = (close - ma_value) / ma_value
-
- # 删除指定的键
- yesterday_data_copy = yesterday_data.copy()
- keys_to_remove = ['continuous_above_ma5', 'continuous_below_ma5', 'continuous_above_ma10', 'continuous_below_ma10', 'continuous_above_ma20', 'continuous_below_ma20', 'continuous_above_ma30', 'continuous_below_ma30']
- for key in keys_to_remove:
- yesterday_data_copy.pop(key, None)
- # 判断上一个交易日完整的数据中是否穿越
- if (line_type == 'support' and low < ma_value and close_ma_difference <= 0.0005) or (line_type == 'resistance' and high > ma_value and close_ma_difference >= -0.0005):
- symbol_crosses.append({
- 'date': yesterday_day,
- 'symbol': symbol,
- 'line_label': line_label,
- 'line_type': line_type,
- 'direction': 'long' if line_type == 'support' else 'short',
- 'latest_data': yesterday_data_copy
- })
- print(f"{symbol} 穿越 {line_label} ({line_type})")
- # 从符合条件的趋势线中选择数字较大的
- if symbol_crosses:
- best_cross = max(symbol_crosses, key=lambda x: int(''.join(filter(str.isdigit, x['line_label']))))
- update_crossed_symbols_history(context, best_cross)
-
- # 针对出现趋势变化的检查均线顺序、破趋势天数、收盘价和均线的最高点关系
- for symbol, records in g.crossed_symbols_history.items():
- latest_record = max(records, key=lambda x: x['date'])
-
- # 检查各种条件
- relation_check = check_ma_relations(context, symbol)
- duration_check, ma_close = check_cross_details(context, symbol)
-
- if relation_check and duration_check and ma_close:
- print(f'{symbol}满足所有条件,加入购买清单')
- today_crossed_symbols[symbol] = [latest_record]
-
- return [item for sublist in today_crossed_symbols.values() for item in sublist]
- def check_buy_condition(context, crossed_symbols):
- """检查买入条件"""
- print("-" * 20 + "检查买入条件" + "-" * 20)
- buy_symbols = []
- for crossed_symbol in crossed_symbols:
- symbol = crossed_symbol['symbol']
- line_label = crossed_symbol['line_label']
- line_type = crossed_symbol['line_type']
-
- # 获取最新数据
- latest_data = crossed_symbol['latest_data']
- ma_value = latest_data[line_label]
- close_price = latest_data['close']
- today_change = (latest_data['close'] - latest_data['open']) / latest_data['open']
- if line_label != "ma5":
- if line_type == 'support' and 1.005 * ma_value <= close_price <= 1.02 * ma_value and today_change >= -0.002:
- buy_symbols.append((symbol, line_label, 'support', 'long'))
- elif line_type == 'resistance' and 0.98 * ma_value <= close_price <= 0.995 * ma_value and today_change <= 0.002:
- buy_symbols.append((symbol, line_label, 'resistance', 'short'))
- return buy_symbols
- ############################ 辅助函数 ###################################
- def calculate_order_value(context, security, direction):
- """计算可以用于交易的金额"""
- current_price = get_current_data()[security].last_price
- underlying_symbol = security.split('.')[0][:-4]
- # 获取保证金比例
- margin_rate = g.default_margin_rates.get(direction, {}).get(underlying_symbol, 0.10)
- # 获取合约乘数
- multiplier = g.multiplier.get(underlying_symbol, 10)
- # 计算单手保证金
- single_hand_margin = current_price * multiplier * margin_rate
- # 根据单手保证金决定购买手数
- if single_hand_margin <= 20000:
- total_margin = 20000
- else:
- total_margin = single_hand_margin
- return total_margin
- def update_continuous_days(continuous_days, close, ma5, ma10, ma20, ma30, length):
- """更新连续天数"""
- for key in ['above_ma5', 'below_ma5', 'above_ma10', 'below_ma10', 'above_ma20', 'below_ma20', 'above_ma30', 'below_ma30']:
- condition_met = False
- if ((key == 'above_ma5' and close > ma5) or
- (key == 'below_ma5' and close < ma5) or
- (key == 'above_ma10' and close > ma10) or
- (key == 'below_ma10' and close < ma10) or
- (key == 'above_ma20' and close > ma20) or
- (key == 'below_ma20' and close < ma20) or
- (key == 'above_ma30' and close > ma30) or
- (key == 'below_ma30' and close < ma30)):
- condition_met = True
-
- if condition_met:
- continuous_days[key].insert(0, continuous_days[key][0] + 1)
- else:
- continuous_days[key].insert(0, 0)
- continuous_days[key] = continuous_days[key][:length]
- def count_ma_crosses(future_data, days):
- """计算均线交叉次数"""
- recent_data = future_data[-days:]
- ma5 = [day['ma5'] for day in recent_data]
- ma10 = [day['ma10'] for day in recent_data]
- ma20 = [day['ma20'] for day in recent_data]
- ma30 = [day['ma30'] for day in recent_data]
-
- cross_5_10 = sum([1 for i in range(1, len(ma5)) if ((ma5[-i] > ma10[-i] and ma5[-i-1] < ma10[-i-1]) or
- (ma5[-i] < ma10[-i] and ma5[-i-1] > ma10[-i-1]))])
- cross_5_20 = sum([1 for i in range(1, len(ma5)) if ((ma5[-i] > ma20[-i] and ma5[-i-1] < ma20[-i-1]) or
- (ma5[-i] < ma20[-i] and ma5[-i-1] > ma20[-i-1]))])
- cross_5_30 = sum([1 for i in range(1, len(ma5)) if ((ma5[-i] > ma30[-i] and ma5[-i-1] < ma30[-i-1]) or
- (ma5[-i] < ma30[-i] and ma5[-i-1] > ma30[-i-1]))])
- cross_10_20 = sum([1 for i in range(1, len(ma10)) if ((ma10[-i] > ma20[-i] and ma10[-i-1] < ma20[-i-1]) or
- (ma10[-i] < ma20[-i] and ma10[-i-1] > ma20[-i-1]))])
- cross_10_30 = sum([1 for i in range(1, len(ma10)) if ((ma10[-i] > ma30[-i] and ma10[-i-1] < ma30[-i-1]) or
- (ma10[-i] < ma30[-i] and ma10[-i-1] > ma30[-i-1]))])
- cross_20_30 = sum([1 for i in range(1, len(ma20)) if ((ma20[-i] > ma30[-i] and ma20[-i-1] < ma30[-i-1]) or
- (ma20[-i] < ma30[-i] and ma20[-i-1] > ma30[-i-1]))])
-
- return cross_5_10 + cross_5_20 + cross_5_30 + cross_10_20 + cross_10_30 + cross_20_30
- def update_crossed_symbols_history(context, new_record):
- """更新穿越历史记录"""
- symbol = new_record['symbol']
-
- if symbol not in g.crossed_symbols_history:
- g.crossed_symbols_history[symbol] = []
-
- g.crossed_symbols_history[symbol].append(new_record)
- def check_ma_relations(context, symbol):
- """检查MA均线之间的关系"""
- if symbol not in g.crossed_symbols_history:
- return False
-
- latest_record = g.crossed_symbols_history[symbol][-1]
- line_label = latest_record['line_label'].lower()
- line_type = latest_record['line_type']
- MA_values = latest_record['latest_data']
-
- MA5 = MA_values['ma5']
- MA10 = MA_values['ma10']
- MA20 = MA_values['ma20']
- MA30 = MA_values['ma30']
-
- conditions = {
- ('ma10', 'resistance'): MA5 > MA10 > MA20 > MA30,
- ('ma10', 'support'): MA30 > MA20 > MA10 > MA5,
- ('ma20', 'resistance'): (MA30 >= MA20 * 0.999 and MA5 >= MA10 * 0.999) or (MA30 > MA20 > MA10 > MA5),
- ('ma20', 'support'): (MA20 >= MA30 * 0.999 and MA10 >= MA5 * 0.999) or (MA5 > MA10 > MA20 > MA30),
- ('ma30', 'resistance'): (MA30 >= MA20 * 0.999 and MA5 >= MA10 * 0.999) or (MA30 > MA20 > MA10 > MA5),
- ('ma30', 'support'): (MA20 >= MA30 * 0.999 and MA10 >= MA5 * 0.999) or (MA5 > MA10 > MA20 > MA30),
- }
-
- return conditions.get((line_label, line_type), False)
- def check_cross_details(context, symbol):
- """检查穿越详情"""
- if symbol not in g.crossed_symbols_history:
- return False, False
-
- all_records = g.crossed_symbols_history[symbol]
- if not all_records:
- return False, False
- # 检查穿越天数
- first_day = all_records[0]['date']
- today = context.current_dt.date()
- all_days = get_trade_days(first_day, today)
- duration_length = len(all_days)
- cross_duration = duration_length <= 6
- # 检查价格与均线的关系
- ma_close = False
- for record in all_records:
- line_label = record['line_label']
- ma_price = record['latest_data'][line_label]
- close = record['latest_data']['close']
- ma_close_rate = abs((close - ma_price) / ma_price)
- if ma_close_rate <= 0.02:
- ma_close = True
- return cross_duration, ma_close
- def check_loss_for_close(context, position, direction, initial_loss_limit=-4000, loss_increment_per_day=200):
- """检查固定止损"""
- if position.security not in g.trade_history:
- return False
-
- trade_info = g.trade_history.get(position.security, {})
- finish_time = trade_info.get('finish_time')
-
- # 获取持仓天数
- holding_days = 0
- if finish_time:
- finish_date = finish_time.date()
- current_date = context.current_dt.date()
- all_trade_days = get_all_trade_days()
- holding_days = sum((finish_date <= d <= current_date) for d in all_trade_days)
-
- # 调整损失限制
- adjusted_loss_limit = initial_loss_limit + holding_days * loss_increment_per_day
-
- # 计算盈亏
- multiplier = position.value / (position.total_amount * position.price)
- if direction == 'long':
- revenue = multiplier * (position.price - position.acc_avg_cost) * position.total_amount
- else:
- revenue = multiplier * (position.acc_avg_cost - position.price) * position.total_amount
- if revenue < adjusted_loss_limit:
- close_position(position, g.trade_history[position.security]['direction'])
- return True
- return False
- def check_ma_for_close(context, position, offset_ratio, days_for_adjustment):
- """根据均线止损"""
- security = position.security
- if security not in g.trade_history:
- return False
-
- trade_info = g.trade_history.get(position.security, {})
- finish_time = trade_info.get('finish_time')
- line_label = trade_info.get('line_label')
-
- # 获取持仓天数
- holding_days = 0
- if finish_time:
- finish_date = finish_time.date()
- current_date = context.current_dt.date()
- all_trade_days = get_all_trade_days()
- holding_days = sum((finish_date <= d <= current_date) for d in all_trade_days)
-
- # 计算变化率
- today_price = get_current_data()[position.security].last_price
- avg_daily_change_rate = calculate_average_daily_change_rate(position.security)
- historical_data = attribute_history(position.security, 1, '1d', ['close'])
- yesterday_close = historical_data['close'].iloc[-1]
- today_change_rate = abs((today_price - yesterday_close) / yesterday_close)
- # 选择止损均线
- close_line = None
- if today_change_rate >= 1.5 * avg_daily_change_rate:
- close_line = 'ma5'
- elif holding_days <= days_for_adjustment:
- close_line = line_label
- else:
- close_line = 'ma5' if today_change_rate >= 1.2 * avg_daily_change_rate else 'ma10'
-
- # 计算MA值并进行平仓判断
- ma_values = calculate_ma_values(position.security, [5, 10, 20, 30])
- adjusted_ma_value = ma_values[close_line] * (1 + offset_ratio if position.side == 'short' else 1 - offset_ratio)
- if (position.side == 'long' and today_price < adjusted_ma_value) or \
- (position.side == 'short' and today_price > adjusted_ma_value):
- close_position(position, g.trade_history[position.security]['direction'])
- return True
- return False
- def calculate_average_daily_change_rate(security, days=30):
- """计算日均变化率"""
- historical_data = attribute_history(security, days + 1, '1d', ['close'])
- daily_change_rates = abs(historical_data['close'].pct_change()).iloc[1:]
- return daily_change_rates.mean()
- def calculate_ma_values(security, ma_periods):
- """计算MA值"""
- historical_data = attribute_history(security, max(ma_periods), '1d', ['close'])
- today_price = get_current_data()[security].last_price
- close_prices = historical_data['close'].tolist() + [today_price]
- ma_values = {f'ma{period}': sum(close_prices[-period:]) / period for period in ma_periods}
- return ma_values
- def check_symbol_prefix_match(symbol, hold_symbols):
- """检查是否有相似持仓"""
- symbol_prefix = symbol[:-9]
-
- for hold_symbol in hold_symbols:
- hold_symbol_prefix = hold_symbol[:-9]
- if symbol_prefix == hold_symbol_prefix:
- return True
- return False
- def print_list_elements(title, elements):
- """打印列表元素"""
- print(f"{title}:")
- for item in elements:
- print(item)
- ############################ 移仓换月函数 ###################################
- def position_auto_switch(context, pindex=0):
- """期货自动移仓换月"""
- import re
- subportfolio = context.subportfolios[pindex]
- symbols = set(subportfolio.long_positions.keys()) | set(subportfolio.short_positions.keys())
- switch_result = []
-
- for symbol in symbols:
- match = re.match(r"(?P<underlying_symbol>[A-Z]{1,})", symbol)
- if not match:
- continue
-
- dominant = get_dominant_future(match.groupdict()["underlying_symbol"])
- if not dominant or dominant <= symbol:
- continue
-
- cur = get_current_data()
- symbol_last_price = cur[symbol].last_price
- dominant_last_price = cur[dominant].last_price
-
- for positions_ in (subportfolio.long_positions, subportfolio.short_positions):
- if symbol not in positions_.keys():
- continue
-
- position = positions_[symbol]
- amount = position.total_amount
- side = position.side
-
- # 检查涨跌停限制
- if side == "long":
- symbol_low_limit = cur[symbol].low_limit
- dominant_high_limit = cur[dominant].high_limit
- if symbol_last_price <= symbol_low_limit or dominant_last_price >= dominant_high_limit:
- continue
- else:
- symbol_high_limit = cur[symbol].high_limit
- dominant_low_limit = cur[dominant].low_limit
- if symbol_last_price >= symbol_high_limit or dominant_last_price <= dominant_low_limit:
- continue
-
- # 执行移仓换月
- order_old = order_target(symbol, 0, side=side)
- if order_old is not None and order_old.filled > 0:
- order_new = order_target(dominant, amount, side=side)
- if order_new is not None and order_new.filled > 0:
- # 换月成功,更新交易记录
- if symbol in g.trade_history:
- g.trade_history[dominant] = g.trade_history[symbol]
- del g.trade_history[symbol]
- switch_result.append({"before": symbol, "after": dominant, "side": side})
- print(f"移仓换月成功: {symbol} -> {dominant}")
- else:
- # 换月失败,记录失败信息
- if symbol in g.trade_history:
- g.change_fail_history[dominant] = g.trade_history[symbol]
- del g.trade_history[symbol]
- print(f"移仓换月失败: {symbol} -> {dominant}")
-
- return switch_result
|