# Import libraries.
# NOTE: the import order is kept exactly as in the original file. The star
# import comes first so that the explicit stdlib imports below win if the
# platform module happens to export names such as `datetime` or `re`.
from jqdata import *
from jqdata import finance
import pandas as pd
from datetime import date, datetime, timedelta
import re

# Trend breakout trading strategy v003
# Futures trading strategy based on moving-average trend breakouts - fixed version

# Make DataFrames print in full.
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', 20)

# Moving-average windows used throughout the strategy, and their dict keys.
_MA_WINDOWS = (5, 10, 20, 30)
_MA_NAMES = tuple(f'ma{window}' for window in _MA_WINDOWS)

# Leading upper-case letters of a contract code, e.g. "RB" in "RB2405.XSGE".
_UNDERLYING_RE = re.compile(r"(?P<underlying_symbol>[A-Z]{1,})")

# Times at which the intraday stop-loss check is scheduled.
_LOSS_CONTROL_TIMES = (
    '21:15:00', '21:45:00', '22:15:00', '22:45:00',
    '09:15:00', '09:45:00', '10:15:00', '10:45:00', '11:15:00',
    '13:15:00', '13:45:00', '14:15:00', '14:56:00',
)


## Initialization
def initialize(context):
    """Configure the backtest and set up the strategy's global state.

    Sets the benchmark, real-price mode, the futures sub-portfolio, order
    cost and slippage, initialises every `g.*` variable used by the other
    functions, and registers the scheduled callbacks.
    """
    # Use CSI 300 as the benchmark.
    set_benchmark('000300.XSHG')
    # Enable dynamic price adjustment (real prices).
    set_option('use_real_price', True)
    log.info('趋势突破交易策略初始化开始')

    ### Futures-related settings ###
    # Configure the account as a financial (futures) account.
    set_subportfolios([SubPortfolioConfig(cash=context.portfolio.starting_cash, type='index_futures')])
    # Commission charged per futures trade.
    set_order_cost(
        OrderCost(open_commission=0.000023, close_commission=0.000023, close_today_commission=0.0023),
        type='index_futures',
    )
    # Slippage for futures trades.
    set_slippage(StepRelatedSlippage(2))

    # Initialise global variables.
    # Original comment: "maximum number of holdings".
    # NOTE(review): the name suggests a capital-usage ratio; the value is not
    # read anywhere in this file.
    g.usage_percentage = 0.8
    g.default_days = 10  # minimum number of days for a line to count as a trend line
    g.continuous_days_length = 5  # observation days after the trend is broken
    g.change_direction_days = 5  # look-back range (days) for counting MA crossings
    g.crossed_symbols_history = {}  # stores the crossing records of the past days
    g.trade_history = {}  # open-trade records
    g.change_fail_history = {}  # roll-overs whose re-open on the new contract failed
    g.high_low_ma_relations = {}

    # Default margin rates. The long and short tables are identical, so they
    # are built as two independent copies of one literal.
    margin_rates = {
        'A': 0.07, 'AG': 0.04, 'AL': 0.05, 'AO': 0.05, 'AP': 0.08, 'AU': 0.04,
        'B': 0.05, 'BC': 0.13, 'BR': 0.07, 'BU': 0.04, 'C': 0.07, 'CF': 0.05,
        'CJ': 0.07, 'CS': 0.07, 'CU': 0.05, 'CY': 0.05, 'EB': 0.12, 'EC': 0.12,
        'EG': 0.05, 'FG': 0.05, 'FU': 0.08, 'HC': 0.04, 'I': 0.1, 'J': 0.22,
        'JD': 0.08, 'JM': 0.22, 'L': 0.07, 'LC': 0.05, 'LH': 0.1, 'LR': 0.05,
        'LU': 0.15, 'M': 0.07, 'MA': 0.05, 'NI': 0.05, 'NR': 0.13, 'OI': 0.05,
        'P': 0.05, 'PB': 0.05, 'PF': 0.1, 'PG': 0.05, 'PK': 0.05, 'PP': 0.07,
        'RB': 0.05, 'RI': 0.05, 'RM': 0.05, 'RU': 0.05, 'SA': 0.05, 'SC': 0.12,
        'SF': 0.05, 'SH': 0.05, 'SI': 0.13, 'SM': 0.05, 'SN': 0.05, 'SP': 0.1,
        'SR': 0.05, 'SS': 0.05, 'TA': 0.05, 'UR': 0.09, 'V': 0.07, 'Y': 0.05,
        'ZC': 0.05, 'ZN': 0.05,
    }
    g.default_margin_rates = {'long': dict(margin_rates), 'short': dict(margin_rates)}

    # Contract multipliers.
    # NOTE(review): 'LR', 'RI' and 'ZC' carry 0.05, which looks like a margin
    # rate pasted into the multiplier table - confirm against the exchange
    # contract specifications. Values are left untouched here.
    g.multiplier = {
        'A': 10, 'AG': 15, 'AL': 5, 'AO': 20, 'AP': 10, 'AU': 1000, 'B': 10,
        'BC': 5, 'BR': 5, 'BU': 10, 'C': 10, 'CF': 5, 'CJ': 5, 'CS': 10,
        'CU': 5, 'CY': 5, 'EB': 5, 'EC': 50, 'EG': 10, 'FG': 20, 'FU': 10,
        'HC': 10, 'I': 100, 'J': 60, 'JD': 5, 'JM': 100, 'L': 5, 'LC': 1,
        'LH': 16, 'LR': 0.05, 'LU': 10, 'M': 10, 'MA': 10, 'NI': 1, 'NR': 10,
        'OI': 10, 'P': 10, 'PB': 5, 'PF': 5, 'PG': 20, 'PK': 5, 'PP': 5,
        'RB': 10, 'RI': 0.05, 'RM': 10, 'RU': 10, 'SA': 20, 'SC': 1000,
        'SF': 5, 'SH': 30, 'SI': 5, 'SM': 5, 'SN': 1, 'SP': 10, 'SR': 10,
        'SS': 5, 'TA': 5, 'UR': 20, 'V': 5, 'Y': 10, 'ZC': 0.05, 'ZN': 5,
    }

    # Intraday stop-loss checks.
    for check_time in _LOSS_CONTROL_TIMES:
        run_daily(loss_control, time=check_time, reference_security='IF1808.CCFX')
    # Main strategy run shortly before the close.
    run_daily(before_market_close, time='14:55', reference_security='IF8888.CCFX')


############################ Scheduled entry points ###################################
def before_market_close(context):
    """Run the main strategy before the close.

    Pipeline: detect trends -> detect shadow crossings -> filter by the buy
    condition -> open a position for every candidate whose product is not
    already held.
    """
    print("-" * 20 + "New day ending!" + "-" * 20)
    if g.trade_history:
        print(f'当前交易记录: {g.trade_history}, 换月失败记录: {g.change_fail_history}')

    # 1. Detect trends.
    trend_symbols, daily_data_info = check_trend(context)
    print_list_elements("趋势标的", trend_symbols)

    # 2. Detect shadow crossings (this step also performs contract roll-over).
    crossed_symbols = check_shadow_cross(context, trend_symbols, daily_data_info)
    print_list_elements("穿越标的", crossed_symbols)

    # 3. Apply the buy condition.
    buy_symbols = check_buy_condition(context, crossed_symbols)
    print_list_elements("买入标的", buy_symbols)

    # 4. Trade. Holdings are read after the roll-over above has happened.
    subportfolio = context.subportfolios[0]
    hold_symbols = set(subportfolio.long_positions.keys()) | set(subportfolio.short_positions.keys())
    for symbol, line_label, line_type, direction in buy_symbols:
        if check_symbol_prefix_match(symbol, hold_symbols):
            print(f'已有相似持仓 {symbol},跳过交易')
            continue
        print(f'无相似持仓 {symbol},执行交易')
        value_to_invest = calculate_order_value(context, symbol, direction)
        open_position(symbol, value_to_invest, direction, line_label)


def loss_control(context):
    """Intraday stop-loss pass.

    Skips the run when a night-session slot fires but the latest minute bar
    is from the day session, retries positions whose roll-over re-open
    failed, then applies the fixed stop and the MA-based stop to every held
    contract that is in the tradable list.
    """
    print("-" * 20 + "止损控制" + "-" * 20)

    # Check whether a regular night session actually exists.
    now = context.current_dt.time()
    if now.hour >= 21:
        test_future = get_dominant_future("A")
        if test_future:
            test_data = attribute_history(test_future, 1, '1m', 'close')
            test_hour = test_data.index[0].hour
            if test_hour <= 15:
                print(f'最近的数据小时为: {test_hour},该夜盘不存在,直接停止')
                return

    # First retry the positions whose roll-over re-open failed.
    if g.change_fail_history:
        print(f'检查换月失败的{g.change_fail_history}')
        # Iterate over a snapshot of the keys: entries are deleted on success.
        for symbol in list(g.change_fail_history.keys()):
            failed = g.change_fail_history[symbol]
            direction = failed['direction']
            value_to_invest = calculate_order_value(context, symbol, direction)
            if open_position(symbol, value_to_invest, direction, failed.get("line_label", "unknown")):
                del g.change_fail_history[symbol]

    # Check losses.
    target_time = datetime.strptime('14:55:00', '%H:%M:%S').time()
    potential_future_list = get_potential_future_list(context)

    # Walk every open position.
    subportfolio = context.subportfolios[0]
    all_positions = list(subportfolio.long_positions.values()) + list(subportfolio.short_positions.values())
    for position in all_positions:
        if position.security not in potential_future_list:
            continue
        # Fixed stop-loss first.
        if check_loss_for_close(context, position, position.side):
            continue
        # Then the dynamic trailing-line exit; a wider offset is used after 14:55.
        offset_ratio = 0.01 if now > target_time else 0.003
        check_ma_for_close(context, position, offset_ratio, 4)


############################ Trading module ###################################
def order_target_value_(security, value, direction):
    """Log the intent and place an `order_target_value` order; returns its result."""
    if value == 0:
        log.debug("平仓 %s" % (security))
    else:
        log.debug("下单 %s 金额 %f" % (security, value))
    return order_target_value(security, value, side=direction)


def open_position(security, value, direction, line_label):
    """Open a position and record it in `g.trade_history`.

    Returns True when the order exists and has a positive filled amount,
    False otherwise.
    """
    order = order_target_value_(security, value, direction)
    if order is None or not order.filled > 0:
        return False

    print(f'成功开仓 {security} 方向 {direction} 线标 {line_label}')
    g.trade_history[security] = {
        'entry_price': get_current_data()[security].last_price,
        'position_value': value,
        'direction': direction,
        'line_label': line_label,
        'finish_time': order.finish_time,
    }
    return True


def close_position(position, direction):
    """Close a position; returns True only when the order is fully filled."""
    security = position.security
    order = order_target_value_(security, 0, direction)
    if order is None:
        return False
    if order.status == OrderStatus.held and order.filled == order.amount:
        # On a successful close, drop the contract from the trade history.
        g.trade_history.pop(security, None)
        return True
    return False


############################ Strategy core ###################################
def get_potential_future_list(context):
    """Return the dominant contracts of the tradable products.

    During the 21:00 and 22:00 hours only the night-session products are
    used; otherwise the day-only products are included as well.
    """
    potential_night_list = ['NI', 'CF', 'PF', 'Y', 'M', 'B', 'SN', 'RM']
    potential_day_list = ['JD', 'UR']

    if context.current_dt.hour in (21, 22):
        potential_icon_list = potential_night_list
    else:
        potential_icon_list = potential_day_list + potential_night_list

    dominant_futures = (get_dominant_future(product) for product in potential_icon_list)
    return [future for future in dominant_futures if future]


def check_trend(context):
    """Check which dominant contracts currently form a trend.

    Returns:
        (valid_trend_symbols, daily_data_info) where
        - valid_trend_symbols is a list of
          {'symbol': ..., 'trend_lines': [(ma_name, 'support'|'resistance'), ...]}
          for symbols with at least one trend line and at most 3 MA crossings
          in the last `g.change_direction_days` days;
        - daily_data_info maps each scanned symbol to its list of per-day dicts.
    """
    print("-" * 20 + "检查趋势" + "-" * 20)
    trend_symbols = []
    daily_data_info = {}

    # Dominant contracts of all tradable products, minus those already traded.
    potential_future_list = [
        item for item in get_potential_future_list(context)
        if item not in g.trade_history
    ]

    # Per-symbol base data.
    ma_crosses_dict = {}
    for symbol in potential_future_list:
        # 50 days of daily bars.
        close_data = attribute_history(symbol, 50, '1d', ['close', 'high', 'low', 'open'], df=True)
        close_series = close_data['close']
        highs, lows, opens = close_data['high'], close_data['low'], close_data['open']

        # Moving averages, keyed 'ma5' ... 'ma30'.
        ma_series = {
            name: close_series.rolling(window=window).mean()
            for name, window in zip(_MA_NAMES, _MA_WINDOWS)
        }

        trend_info = {'symbol': symbol, 'trend_lines': []}
        daily_data = []

        # Streak counters: newest value first, `g.continuous_days_length` long.
        continuous_days = {
            f'{side}_{name}': [0] * g.continuous_days_length
            for name in _MA_NAMES
            for side in ('above', 'below')
        }

        # Walk the bars oldest to newest. `.iloc` is used because the frame is
        # indexed by date: plain `series[i]` relies on the deprecated
        # integer-as-position fallback of label indexing.
        for i, timestamp in enumerate(close_data.index):
            close = close_series.iloc[i]
            ma_today = {name: series.iloc[i] for name, series in ma_series.items()}
            update_continuous_days(
                continuous_days, close,
                ma_today['ma5'], ma_today['ma10'], ma_today['ma20'], ma_today['ma30'],
                g.continuous_days_length,
            )

            # Collect the day's record (key order matters: it is printed).
            day_data = {
                'date': timestamp.date(),
                'close': close,
                'high': highs.iloc[i],
                'low': lows.iloc[i],
                'open': opens.iloc[i],
            }
            day_data.update(ma_today)
            for key, streaks in continuous_days.items():
                day_data[f'continuous_{key}'] = streaks.copy()
            daily_data.append(day_data)

        daily_data_info[symbol] = daily_data

        # Number of MA crossings within the recent window.
        ma_crosses_dict[symbol] = count_ma_crosses(daily_data, g.change_direction_days)

        # Decide which averages form which kind of trend line.
        for ma_type in _MA_NAMES:
            if any(day >= g.default_days for day in continuous_days[f'above_{ma_type}']):
                trend_info['trend_lines'].append((ma_type, 'support'))
            if any(day >= g.default_days for day in continuous_days[f'below_{ma_type}']):
                trend_info['trend_lines'].append((ma_type, 'resistance'))

        if trend_info['trend_lines']:
            trend_symbols.append(trend_info)

    # Drop the symbols whose averages crossed too often recently.
    valid_trend_symbols = [
        trend_info for trend_info in trend_symbols
        if ma_crosses_dict[trend_info["symbol"]] <= 3
    ]
    return valid_trend_symbols, daily_data_info


def check_shadow_cross(context, trend_symbols, daily_data_info):
    """Detect bars whose shadow pierced a trend line and return buy candidates.

    Performs the contract roll-over first, records the best crossing per
    symbol in `g.crossed_symbols_history`, then returns the latest record of
    every symbol in that history that passes the MA-order, duration and
    distance checks.
    """
    print("-" * 20 + "检查影线穿越" + "-" * 20)

    # Check and perform contract roll-over.
    switch_result = position_auto_switch(context)
    if switch_result:
        print(f'换月结果: {switch_result}')

    removed_keys = {f'continuous_{side}_{name}' for name in _MA_NAMES for side in ('above', 'below')}

    for trend_info in trend_symbols:
        symbol = trend_info['symbol']
        # Most recent complete daily bar.
        yesterday_data = daily_data_info[symbol][-1]
        close = yesterday_data['close']
        high = yesterday_data['high']
        low = yesterday_data['low']
        # Same record without the streak lists.
        slim_data = {k: v for k, v in yesterday_data.items() if k not in removed_keys}

        # Crossings found for this symbol.
        symbol_crosses = []
        for line_label, line_type in trend_info['trend_lines']:
            ma_value = yesterday_data[line_label]
            close_ma_difference = (close - ma_value) / ma_value

            # Did the last complete bar cross the line?
            pierced_support = line_type == 'support' and low < ma_value and close_ma_difference <= 0.0005
            pierced_resistance = line_type == 'resistance' and high > ma_value and close_ma_difference >= -0.0005
            if pierced_support or pierced_resistance:
                symbol_crosses.append({
                    'date': yesterday_data['date'],
                    'symbol': symbol,
                    'line_label': line_label,
                    'line_type': line_type,
                    'direction': 'long' if line_type == 'support' else 'short',
                    'latest_data': dict(slim_data),
                })
                print(f"{symbol} 穿越 {line_label} ({line_type})")

        # Among the qualifying lines keep the one with the largest window.
        if symbol_crosses:
            best_cross = max(symbol_crosses, key=lambda x: int(''.join(filter(str.isdigit, x['line_label']))))
            update_crossed_symbols_history(context, best_cross)

    # For symbols with a recorded trend change, check the MA order, the
    # number of days since the break, and the close-to-MA distance.
    # NOTE(review): `g.crossed_symbols_history` is never pruned in this file,
    # and `check_cross_details` measures from the FIRST record, so a symbol
    # can no longer qualify once its first record is older than 6 trade days.
    # Confirm whether old records should be expired.
    qualified_records = []
    for symbol, records in g.crossed_symbols_history.items():
        latest_record = max(records, key=lambda x: x['date'])
        relation_check = check_ma_relations(context, symbol)
        duration_check, ma_close = check_cross_details(context, symbol)
        if relation_check and duration_check and ma_close:
            print(f'{symbol}满足所有条件,加入购买清单')
            qualified_records.append(latest_record)
    return qualified_records


def check_buy_condition(context, crossed_symbols):
    """Filter crossing records down to (symbol, line_label, line_type, direction) buys.

    ma5 lines are never traded. A support line requires the close to sit
    0.5%-2% above the average with a bar change of at least -0.2%; a
    resistance line requires the mirror image.
    """
    print("-" * 20 + "检查买入条件" + "-" * 20)
    buy_symbols = []
    for crossed_symbol in crossed_symbols:
        symbol = crossed_symbol['symbol']
        line_label = crossed_symbol['line_label']
        line_type = crossed_symbol['line_type']
        if line_label == "ma5":
            continue

        # Latest bar data stored with the record.
        latest_data = crossed_symbol['latest_data']
        ma_value = latest_data[line_label]
        close_price = latest_data['close']
        today_change = (latest_data['close'] - latest_data['open']) / latest_data['open']

        if line_type == 'support' and 1.005 * ma_value <= close_price <= 1.02 * ma_value and today_change >= -0.002:
            buy_symbols.append((symbol, line_label, 'support', 'long'))
        elif line_type == 'resistance' and 0.98 * ma_value <= close_price <= 0.995 * ma_value and today_change <= 0.002:
            buy_symbols.append((symbol, line_label, 'resistance', 'short'))
    return buy_symbols


############################ Helpers ###################################
def calculate_order_value(context, security, direction):
    """Return the amount to commit to a trade.

    The margin for one lot is price * multiplier * margin rate; the result is
    20000 when one lot costs at most that much, otherwise the one-lot margin.
    """
    current_price = get_current_data()[security].last_price
    # Strip the exchange suffix and the 4-digit delivery month.
    underlying_symbol = security.split('.')[0][:-4]

    # Margin rate (default 10%) and contract multiplier (default 10).
    margin_rate = g.default_margin_rates.get(direction, {}).get(underlying_symbol, 0.10)
    multiplier = g.multiplier.get(underlying_symbol, 10)

    # Margin for a single lot.
    single_hand_margin = current_price * multiplier * margin_rate

    # Explicit comparison rather than max(): keeps the original result when
    # the price is NaN.
    return 20000 if single_hand_margin <= 20000 else single_hand_margin


def update_continuous_days(continuous_days, close, ma5, ma10, ma20, ma30, length):
    """Push today's streak length onto the front of every streak list.

    For each average, the 'above' streak grows by one when close > ma and
    resets to 0 otherwise; the 'below' streak does the same for close < ma.
    Each list is then truncated to `length` entries (newest first).
    """
    averages = (('ma5', ma5), ('ma10', ma10), ('ma20', ma20), ('ma30', ma30))
    for name, ma_value in averages:
        for side, holds in (('above', close > ma_value), ('below', close < ma_value)):
            key = f'{side}_{name}'
            previous = continuous_days[key]
            streak = previous[0] + 1 if holds else 0
            continuous_days[key] = ([streak] + previous)[:length]


def _count_pair_crosses(first, second):
    """Count strict sign flips of (first - second) between consecutive days."""
    crosses = 0
    for i in range(1, len(first)):
        was_below = first[i - 1] < second[i - 1]
        was_above = first[i - 1] > second[i - 1]
        if (first[i] > second[i] and was_below) or (first[i] < second[i] and was_above):
            crosses += 1
    return crosses


def count_ma_crosses(future_data, days):
    """Count crossings between every pair of averages over the last `days` records."""
    recent_data = future_data[-days:]
    columns = [[day[name] for day in recent_data] for name in _MA_NAMES]

    total = 0
    for i, first in enumerate(columns):
        for second in columns[i + 1:]:
            total += _count_pair_crosses(first, second)
    return total


def update_crossed_symbols_history(context, new_record):
    """Append a crossing record to the symbol's history list."""
    g.crossed_symbols_history.setdefault(new_record['symbol'], []).append(new_record)


def check_ma_relations(context, symbol):
    """Check the ordering of the averages for the symbol's latest crossing.

    Returns False when the symbol has no history or the (line, type) pair is
    not one of the ma10/ma20/ma30 support/resistance combinations.
    """
    if symbol not in g.crossed_symbols_history:
        return False

    latest_record = g.crossed_symbols_history[symbol][-1]
    line_label = latest_record['line_label'].lower()
    line_type = latest_record['line_type']
    values = latest_record['latest_data']
    MA5, MA10, MA20, MA30 = values['ma5'], values['ma10'], values['ma20'], values['ma30']

    descending = MA5 > MA10 > MA20 > MA30
    ascending = MA30 > MA20 > MA10 > MA5

    if line_label == 'ma10':
        if line_type == 'resistance':
            return descending
        if line_type == 'support':
            return ascending
        return False

    if line_label in ('ma20', 'ma30'):
        if line_type == 'resistance':
            return (MA30 >= MA20 * 0.999 and MA5 >= MA10 * 0.999) or ascending
        if line_type == 'support':
            return (MA20 >= MA30 * 0.999 and MA10 >= MA5 * 0.999) or descending

    return False


def check_cross_details(context, symbol):
    """Return (cross_duration, ma_close) for the symbol's crossing history.

    cross_duration: at most 6 trade days lie between the first record's date
    and today (both inclusive, per `get_trade_days`).
    ma_close: at least one record closed within 2% of its trend line.
    """
    all_records = g.crossed_symbols_history.get(symbol)
    if not all_records:
        return False, False

    # Days since the first crossing.
    first_day = all_records[0]['date']
    today = context.current_dt.date()
    cross_duration = len(get_trade_days(first_day, today)) <= 6

    # Distance between the close and the line.
    ma_close = False
    for record in all_records:
        ma_price = record['latest_data'][record['line_label']]
        close = record['latest_data']['close']
        if abs((close - ma_price) / ma_price) <= 0.02:
            ma_close = True
    return cross_duration, ma_close


def _count_holding_days(context, finish_time):
    """Trade days from the fill date through today (inclusive); 0 without a fill time."""
    if not finish_time:
        return 0
    finish_date = finish_time.date()
    current_date = context.current_dt.date()
    return sum((finish_date <= d <= current_date) for d in get_all_trade_days())


def check_loss_for_close(context, position, direction, initial_loss_limit=-4000, loss_increment_per_day=200):
    """Fixed stop-loss: close when the open P&L falls below the day-adjusted limit.

    The limit starts at `initial_loss_limit` and rises by
    `loss_increment_per_day` for every holding day. Returns True when a close
    was requested, False otherwise (including for untracked positions).
    """
    if position.security not in g.trade_history:
        return False
    trade_info = g.trade_history[position.security]

    # Holding days and the adjusted limit.
    holding_days = _count_holding_days(context, trade_info.get('finish_time'))
    adjusted_loss_limit = initial_loss_limit + holding_days * loss_increment_per_day

    # The contract multiplier is recovered from the position's value. An
    # empty or zero-priced position would divide by zero; there is nothing
    # to stop out in that case.
    notional = position.total_amount * position.price
    if notional == 0:
        return False
    multiplier = position.value / notional

    # Open P&L.
    if direction == 'long':
        price_move = position.price - position.acc_avg_cost
    else:
        price_move = position.acc_avg_cost - position.price
    revenue = multiplier * price_move * position.total_amount

    if revenue < adjusted_loss_limit:
        close_position(position, trade_info['direction'])
        return True
    return False


def check_ma_for_close(context, position, offset_ratio, days_for_adjustment):
    """MA-based stop: close when price breaks the chosen average by `offset_ratio`.

    The stop line is ma5 on an unusually large move (>= 1.5x the average
    daily change), the entry line during the first `days_for_adjustment`
    holding days, and afterwards ma5 or ma10 depending on whether today's
    move reaches 1.2x the average. Returns True when a close was requested.
    """
    security = position.security
    if security not in g.trade_history:
        return False
    trade_info = g.trade_history[security]
    line_label = trade_info.get('line_label')

    # Holding days.
    holding_days = _count_holding_days(context, trade_info.get('finish_time'))

    # Today's move versus the average daily move.
    today_price = get_current_data()[security].last_price
    avg_daily_change_rate = calculate_average_daily_change_rate(security)
    historical_data = attribute_history(security, 1, '1d', ['close'])
    yesterday_close = historical_data['close'].iloc[-1]
    today_change_rate = abs((today_price - yesterday_close) / yesterday_close)

    # Pick the stop line.
    if today_change_rate >= 1.5 * avg_daily_change_rate:
        close_line = 'ma5'
    elif holding_days <= days_for_adjustment:
        close_line = line_label
    else:
        close_line = 'ma5' if today_change_rate >= 1.2 * avg_daily_change_rate else 'ma10'

    # Compute the averages and decide.
    ma_values = calculate_ma_values(security, list(_MA_WINDOWS))
    if close_line not in ma_values:
        # The entry line can be missing or the "unknown" placeholder that
        # loss_control passes for re-opened roll-overs; fall back to ma10
        # instead of raising KeyError inside the stop-loss routine.
        log.debug("unknown stop line %s for %s, fallback to ma10" % (close_line, security))
        close_line = 'ma10'

    if position.side == 'short':
        adjusted_ma_value = ma_values[close_line] * (1 + offset_ratio)
    else:
        adjusted_ma_value = ma_values[close_line] * (1 - offset_ratio)

    broke_long = position.side == 'long' and today_price < adjusted_ma_value
    broke_short = position.side == 'short' and today_price > adjusted_ma_value
    if broke_long or broke_short:
        close_position(position, trade_info['direction'])
        return True
    return False


def calculate_average_daily_change_rate(security, days=30):
    """Mean absolute daily close-to-close change over the last `days` days."""
    historical_data = attribute_history(security, days + 1, '1d', ['close'])
    # The first pct_change value is NaN (no previous bar), so it is dropped.
    return historical_data['close'].pct_change().abs().iloc[1:].mean()


def calculate_ma_values(security, ma_periods):
    """Return {'maN': value} for each period, treating the latest price as today's close."""
    historical_data = attribute_history(security, max(ma_periods), '1d', ['close'])
    today_price = get_current_data()[security].last_price
    close_prices = historical_data['close'].tolist() + [today_price]
    return {f'ma{period}': sum(close_prices[-period:]) / period for period in ma_periods}


def check_symbol_prefix_match(symbol, hold_symbols):
    """Return True when a held contract shares the symbol's prefix.

    The prefix is everything except the last 9 characters of the code.
    """
    symbol_prefix = symbol[:-9]
    return any(hold_symbol[:-9] == symbol_prefix for hold_symbol in hold_symbols)


def print_list_elements(title, elements):
    """Print a title line followed by one line per element."""
    print(f"{title}:")
    for item in elements:
        print(item)


############################ Contract roll-over ###################################
def position_auto_switch(context, pindex=0):
    """Roll held contracts over to the current dominant contract.

    For every held contract whose product has a later dominant contract, the
    old position is closed and the same amount is opened on the dominant
    one, unless a price-limit condition blocks either leg. Trade records
    move with the position; when the close succeeds but the re-open fails,
    the record is parked in `g.change_fail_history` for a later retry.

    Returns:
        list of {"before": old, "after": new, "side": side} for each
        successful roll-over.
    """
    subportfolio = context.subportfolios[pindex]
    symbols = set(subportfolio.long_positions.keys()) | set(subportfolio.short_positions.keys())
    switch_result = []

    for symbol in symbols:
        # The named group must be spelled out: "(?P[A-Z]...)" is not a valid
        # pattern and would raise re.error before any roll-over could run.
        match = _UNDERLYING_RE.match(symbol)
        if not match:
            continue
        dominant = get_dominant_future(match.groupdict()["underlying_symbol"])
        # Only roll forward: skip when there is no dominant contract or it
        # does not sort after the held one.
        if not dominant or dominant <= symbol:
            continue

        cur = get_current_data()
        symbol_last_price = cur[symbol].last_price
        dominant_last_price = cur[dominant].last_price

        for positions_ in (subportfolio.long_positions, subportfolio.short_positions):
            if symbol not in positions_.keys():
                continue
            position = positions_[symbol]
            amount = position.total_amount
            side = position.side

            # Price-limit check: skip when either leg sits at the limit that
            # blocks it.
            if side == "long":
                blocked = (symbol_last_price <= cur[symbol].low_limit
                           or dominant_last_price >= cur[dominant].high_limit)
            else:
                blocked = (symbol_last_price >= cur[symbol].high_limit
                           or dominant_last_price <= cur[dominant].low_limit)
            if blocked:
                continue

            # Perform the roll-over: close the old leg, then open the new one.
            order_old = order_target(symbol, 0, side=side)
            if order_old is None or not order_old.filled > 0:
                continue

            order_new = order_target(dominant, amount, side=side)
            if order_new is not None and order_new.filled > 0:
                # Success: move the trade record to the new contract.
                if symbol in g.trade_history:
                    g.trade_history[dominant] = g.trade_history.pop(symbol)
                switch_result.append({"before": symbol, "after": dominant, "side": side})
                print(f"移仓换月成功: {symbol} -> {dominant}")
            else:
                # Failure: keep the record so loss_control can retry.
                if symbol in g.trade_history:
                    g.change_fail_history[dominant] = g.trade_history.pop(symbol)
                print(f"移仓换月失败: {symbol} -> {dominant}")

    return switch_result