# 导入函数库 from jqdata import * from jqdata import finance import pandas as pd from datetime import date, datetime, timedelta from myTT import MA import re # 该版本重点是要实现记录下的2024-12-28以后的11个买入 # 设置以便完整打印 DataFrame pd.set_option('display.max_rows', None) # 设置显示最大行数为无限制 pd.set_option('display.max_columns', None) # 设置显示最大列数为无限制 pd.set_option('display.width', None) # 设置打印宽度为无限制 pd.set_option('display.max_colwidth', 20) # 设置列最大宽度为无限制 ## 初始化函数,设定基准等等 def initialize(context): # 设定沪深300作为基准 set_benchmark('000300.XSHG') # 开启动态复权模式(真实价格) set_option('use_real_price', True) # 过滤掉order系列API产生的比error级别低的log # log.set_level('order', 'error') # 输出内容到日志 log.info() log.info('初始函数开始运行且全局只运行一次') ### 期货相关设定 ### # 设定账户为金融账户 set_subportfolios([SubPortfolioConfig(cash=context.portfolio.starting_cash, type='index_futures')]) # 期货类每笔交易时的手续费是: 买入时万分之0.23,卖出时万分之0.23,平今仓为万分之23 set_order_cost(OrderCost(open_commission=0.000023, close_commission=0.000023,close_today_commission=0.0023), type='index_futures') # 设定保证金比例 # set_option('futures_margin_rate', 0.15) # 设置期货交易的滑点 set_slippage(StepRelatedSlippage(2)) # 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'IF8888.CCFX'或'IH1602.CCFX'是一样的) # 注意: before_open/open/close/after_close等相对时间不可用于有夜盘的交易品种,有夜盘的交易品种请指定绝对时间(如9: 30) #初始化全局变量 g.usage_percentage = 0.8 # 最大持股数量 # 定义默认的保证金比例 g.default_margin_rates = { 'long': {'A': 0.07, 'AG': 0.04, 'AL': 0.05, 'AO': 0.05, 'AP': 0.08, 'AU': 0.04, 'B': 0.05, 'BC': 0.13, 'BR': 0.07, 'BU': 0.04, 'C': 0.07, 'CF': 0.05, 'CJ': 0.07, 'CS': 0.07, 'CU': 0.05, 'CY': 0.05, 'EB': 0.12, 'EC': 0.12, 'EG': 0.05, 'FG': 0.05, 'FU': 0.08, 'HC': 0.04, 'I': 0.1, 'J': 0.22, 'JD': 0.08, 'JM': 0.22, 'L': 0.07, 'LC': 0.05, 'LH':0.1, 'LR': 0.05, 'LU': 0.15, 'M': 0.07, 'MA': 0.05, 'NI': 0.05, 'NR': 0.13, 'OI': 0.05, 'P': 0.05, 'PB': 0.05, 'PF': 0.1, 'PG': 0.05, 'PK': 0.05, 'PP': 0.07, 'RB': 0.05, 'RI': 0.05, 'RM': 0.05, 'RU': 0.05, 'SA': 0.05, 'SC': 0.12, 'SF': 0.05, 'SH': 0.05, 'SI': 0.13, 'SM': 0.05, 'SN': 0.05, 'SP': 0.1, 'SR': 0.05, 'SS': 0.05, 'TA': 0.05, 'UR': 0.09, 'V': 0.07, 'Y': 0.05, 'ZC': 0.05, 'ZN': 0.05}, 'short': {'A': 0.07, 'AG': 0.04, 'AL': 0.05, 'AO': 0.05, 'AP': 0.08, 'AU': 0.04, 'B': 0.05, 'BC': 0.13, 'BR': 0.07, 'BU': 0.04, 'C': 0.07, 'CF': 0.05, 'CJ': 0.07, 'CS': 0.07, 'CU': 0.05, 'CY': 0.05, 'EB': 0.12, 'EC': 0.12, 'EG': 0.05, 'FG': 0.05, 'FU': 0.08, 'HC': 0.04, 'I': 0.1, 'J': 0.22, 'JD': 0.08, 'JM': 0.22, 'L': 0.07, 'LC': 0.05, 'LH':0.1, 'LR': 0.05, 'LU': 0.15, 'M': 0.07, 'MA': 0.05, 'NI': 0.05, 'NR': 0.13, 'OI': 0.05, 'P': 0.05, 'PB': 0.05, 'PF': 0.1, 'PG': 0.05, 'PK': 0.05, 'PP': 0.07, 'RB': 0.05, 'RI': 0.05, 'RM': 0.05, 'RU': 0.05, 'SA': 0.05, 'SC': 0.12, 'SF': 0.05, 'SH': 0.05, 'SI': 0.13, 'SM': 0.05, 'SN': 0.05, 'SP': 0.1, 'SR': 0.05, 'SS': 0.05, 'TA': 0.05, 'UR': 0.09, 'V': 0.07, 'Y': 0.05, 'ZC': 0.05, 'ZN': 0.05} } g.multiplier = { 'A': 10, 'AG': 15, 'AL': 5, 'AO': 20, 'AP': 10, 'AU': 1000, 'B': 10, 'BC': 5, 'BR': 5, 'BU': 10, 'C': 10, 'CF': 5, 'CJ': 5, 'CS': 10, 'CU': 5, 'CY': 5, 'EB': 5, 'EC': 50, 'EG': 10, 'FG': 20, 'FU': 10, 'HC': 10, 'I': 100, 'J': 60, 'JD': 5, 'JM': 100, 'L': 5, 'LC': 1, 'LH':16, 'LR': 0.05, 'LU': 10, 'M': 10, 'MA': 10, 'NI': 1, 'NR': 10, 'OI': 10, 'P': 10, 'PB': 5, 'PF': 5, 'PG': 20, 'PK': 5, 'PP': 5, 'RB': 10, 'RI': 0.05, 'RM': 10, 'RU': 10, 'SA': 20, 'SC': 1000, 'SF': 5, 'SH': 30, 'SI': 5, 'SM': 5, 'SN': 1, 'SP': 10, 'SR': 10, 'SS': 5, 'TA': 5, 'UR': 20, 'V': 5, 'Y': 10, 'ZC': 0.05, 'ZN': 5} # 假设所有期货合约的乘数为固定值,如果不同合约有不同的乘数,需要进一步调整 g.default_days = 10 # 判断趋势线的最小天数,某条K线连续这个天数大于收盘价则该K线为压力线;小于则是支撑线 g.continuous_days_length = 5 # 破趋势后的观察天数 g.change_direction_days = 5 # 检查是某个天数范围内有多少次均线穿过 g.crossed_symbols_history = {} # 用于存储过去几天的穿越信息 g.trade_history = {} # 初始化交易记录 g.change_fail_history = {} # 初始化换月建仓失败的记录 # 开盘时运行 # run_daily(market_open, time='21:01', reference_security='IF8888.CCFX') # run_daily(market_open, time='09:01', reference_security='IF8888.CCFX') g.high_low_ma_relations = {} # 临时止损检查 run_daily(loss_control, time='21:05:00', reference_security='IF8888.CCFX') run_daily(loss_control, time='21:35:00', reference_security='IF8888.CCFX') run_daily(loss_control, time='22:05:00', reference_security='IF8888.CCFX') run_daily(loss_control, time='22:35:00', reference_security='IF8888.CCFX') run_daily(loss_control, time='09:05:00', reference_security='IF8888.CCFX') run_daily(loss_control, time='09:35:00', reference_security='IF8888.CCFX') run_daily(loss_control, time='10:05:00', reference_security='IF8888.CCFX') run_daily(loss_control, time='10:35:00', reference_security='IF8888.CCFX') run_daily(loss_control, time='11:05:00', reference_security='IF8888.CCFX') run_daily(loss_control, time='13:05:00', reference_security='IF8888.CCFX') run_daily(loss_control, time='13:35:00', reference_security='IF8888.CCFX') run_daily(loss_control, time='14:05:00', reference_security='IF8888.CCFX') run_daily(loss_control, time='14:35:00', reference_security='IF8888.CCFX') # 收盘前运行 run_daily(before_market_close, time='14:55', reference_security='IF8888.CCFX') ############################ 主程序中执行函数 ################################### #0-0 开盘前运行函数 def market_open(context): # 输出运行时间 # log.info('函数运行时间(before_market_open): '+str(context.current_dt.time())) # 给微信发送消息(添加模拟交易,并绑定微信生效) # send_message('美好的一天~') # 获取当下该品种有夜盘和无夜盘主力期货合约 potential_night_list = ['AG', 'AL', 'AU', 'BU', 'CU', 'FU', 'HC', 'NI', 'PB', 'RB', 'RU', 'SN', 'ZN', 'SP', 'SS', \ 'CF', 'CY', 'FG', 'JR', 'LR', 'MA', 'PM', 'RI', 'RM', 'OI', 'SR', 'TA', 'ZC', 'WH', \ 'WT', 'SA', 'PF', 'A', 'B', 'BB', 'C', 'CS', 'I', 'J', 'JM', 'L', 'M', 'P', 'PP', \ 'V', 'Y', 'EG', 'EB', 'PG'] potential_day_list = ['SF', 'SM', 'UR', 'AP', 'CJ', 'PK', 'FB', 'JD', 'LH', 'SI', 'LC'] # potential_night_list = ['OI'] # potential_day_list = ['UR'] if str(context.current_dt.time())[:2] == '21': potential_icon_list = potential_night_list else: potential_icon_list = potential_day_list potential_future_list = [] for i in potential_icon_list: domaint_future = get_dominant_future(i) if len(domaint_future) > 0: potential_future_list.append(domaint_future) # c_data = get_current_data() target_list = [] for future in potential_future_list: y_data = attribute_history(future, 2, '1m', ['close']) yesterday_close = y_data.iloc[0,0] # c_data = attribute_history(future, 1, '1m', ['open']) today_open = y_data.iloc[-1,0] # print(future, y_data) open_change_rate = (today_open - yesterday_close)/yesterday_close if open_change_rate >= 0.03: target_list.append(future) # print('future: ', future, today_open, yesterday_close, open_change_rate) # print(today_open, yesterday_close, open_change_rate) # print(y_data) if len(target_list) >= 5: value = context.portfolio.cash * g.usage_percentage / len(target_list) for future in target_list: open_position(future, value, 'short') elif len(target_list) > 0 and len(target_list) < 5: value = context.portfolio.cash * g.usage_percentage / 5 for future in target_list: open_position(future, value, 'short') #0-1 判断是否买入函数 def before_market_close(context): print("-"*20 + "New day ending!" + "-"*20) if len(g.trade_history) > 0: print(f'in the main - g.trade_history: {g.trade_history}, g.change_fail_history: {g.change_fail_history}') # loss_control(context) trend_symbols, daily_data_info = check_trend(context) print_list_elements("in the main - trend_symbols", trend_symbols) crossed_symbols = check_shadow_cross(context, trend_symbols, daily_data_info) print_list_elements("in the main - crossed_symbols", crossed_symbols) buy_symbols = check_buy_condition(context, crossed_symbols) print_list_elements("in the main - buy_symbols", buy_symbols) subportfolio = context.subportfolios[0] hold_symbols = set(subportfolio.long_positions.keys()) | set(subportfolio.short_positions.keys()) # print(f'hold_symbols: {hold_symbols}') for symbol, line_label, line_type, direction in buy_symbols: if check_symbol_prefix_match(symbol, hold_symbols): print(f'Have similar symbol in hold_symbols: {hold_symbols} and symbol: {symbol}, stop the deal') else: print(f'No similar symbol in hold_symbols: {hold_symbols} and symbol: {symbol}, continue the deal') value_to_invest = calculate_order_value(context, symbol, direction) open_position(symbol, value_to_invest, direction, line_label) #0-2 止盈止损 def loss_control(context): print("-" * 20 + "loss control" + "-" * 20) # 先检查是否有正常的夜盘 now = context.current_dt.time() now_hour = now.hour if now_hour >= 21: test_future = get_dominant_future("A") test_data = attribute_history(test_future, 1, '1m', 'close') test_hour = test_data.index[0].hour if test_hour <= 15: print(f'最近的数据小时为: {test_hour},该夜盘不存在,直接停止') return # 先把换月买入失败的重新买入放在这里 if len(g.change_fail_history) > 0: print(f'检查换月失败的{g.change_fail_history}') for symbol in g.change_fail_history.keys(): direction = g.change_fail_history[symbol]['direction'] print(f'检查换月失败的direction: {direction}') value_to_invest = calculate_order_value(context, symbol, direction) open_position(symbol, value_to_invest, g.trade_history[symbol]["direction"], g.trade_history[symbol]["line_label"]) # 检查损失 target_time = datetime.strptime('14:55:00', '%H:%M:%S').time() potential_future_list = get_potential_future_list(context) # print(f'check g.trade_history: {g.trade_history}, potential_future_list: {potential_future_list}') # print(f'current_hour: {current_hour}') # 遍历所有持仓 long_positions_dict = context.portfolio.long_positions for position in list(long_positions_dict.values()): # print("security:{0}, position:{1}, value:{2}, time:{3},latest price:{4},cost:{5}".format(position.security, position.total_amount, position.value, position.init_time, position.price, position.acc_avg_cost)) # 检查是否达到损失限制 if position.security in potential_future_list: # print(f'for long position, {position.security} in the potential_future_list: {potential_future_list}') if check_loss_for_close(context, position, 'long'): continue # 如果已平仓,则跳过后续检查 # 检查基于动态跟踪线的平仓条件 if now > target_time: # print("Now {now} is after 14:55:00") check_ma_for_close(context, position, 0.01, 4) else: # print("Now {now} is before or equal to 14:55:00") check_ma_for_close(context, position, 0.003, 4) # print(type(context.subportfolios[0].short_positions)) short_positions_dict = context.portfolio.short_positions for position in list(short_positions_dict.values()): # print("security:{0}, position:{1}, value:{2}, time:{3},latest price:{4},cost:{5}".format(position.security, position.total_amount, position.value, position.init_time, position.price, position.acc_avg_cost)) # 检查是否达到损失限制 if position.security in potential_future_list: # print(f'for short position, {position.security} in the potential_future_list: {potential_future_list}') if check_loss_for_close(context, position, 'short'): continue # 如果已平仓,则跳过后续检查 # 检查基于动态跟踪线的平仓条件 if now >= target_time: # print("Now {now} is after 14:55:00") check_ma_for_close(context, position, 0.01, 4) else: # print("Now {now} is before or equal to 14:55:00") check_ma_for_close(context, position, 0.003, 4) #0-3 收盘后运行函数 def after_market_close(context): log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time()))) # 得到当天所有成交记录 trades = get_trades() for _trade in trades.values(): log.info('成交记录: '+str(_trade)) log.info('A day ends') log.info('##############################################################') ############################ 买卖函数 ################################### #1-0 交易模块-自定义下单 #报单成功返回报单(不代表一定会成交),否则返回None,应用于 def order_target_value_(security, value, direction): if value == 0: log.debug("Selling out %s" % (security)) else: log.debug("Order %s to value %f" % (security, value)) # 如果股票停牌,创建报单会失败,order_target_value 返回None # 如果股票涨跌停,创建报单会成功,order_target_value 返回Order,但是报单会取消 # 部成部撤的报单,聚宽状态是已撤,此时成交量>0,可通过成交量判断是否有成交 return order_target_value(security, value, side=direction) #1-1 交易模块-开仓 #买入指定价值的证券,报单成功并成交(包括全部成交或部分成交,此时成交量大于0)返回True,报单失败或者报单成功但被取消(此时成交量等于0),返回False def open_position(security, value, direction, line_label): order = order_target_value_(security, value, direction) # print(f'order: {order}') # 查看订单的信息 if order != None and order.filled > 0: print(f'Make an order of {security} with {line_label} and {direction}') g.trade_history[security] = { 'entry_price': get_current_data()[security].last_price, 'position_value': value, 'direction': direction, 'line_label': line_label, 'finish_time': order.finish_time } return True return False #1-2 交易模块-平仓 #卖出指定持仓,报单成功并全部成交返回True,报单失败或者报单成功但被取消(此时成交量等于0),或者报单非全部成交,返回False def close_position(position, direction): security = position.security order = order_target_value_(security, 0, direction) # 可能会因停牌失败 if order != None: if order.status == OrderStatus.held and order.filled == order.amount: # g.sold_future[security] = 0 # 避免连续重复买入 # 如果成功平仓,从交易历史中移除该标的 if security in g.trade_history: del g.trade_history[security] # print(f'after deal, g.trade_history: {g.trade_history}') return True return False #1-3 根据标的情况开仓 def open_position_by_margin(context, security, direction): value_to_invest = calculate_order_value(context, security, direction) order_value(security, value_to_invest, side=direction, pindex=0) ############################ 判断买卖条件的函数 ################################### #2-0 找到可以交易的目标期货标的主连 def get_potential_future_list(context): # potential_night_list = ['AG', 'AL', 'AU', 'BU', 'CU', 'FU', 'HC', 'NI', 'NR', 'PB', 'RB', 'RU', 'SN', 'ZN', 'SP', 'SS', \ # 'CF', 'CY', 'FG', 'LR', 'MA', 'PM', 'RI', 'RM', 'OI', 'SC', 'SH','SR', 'TA', 'ZC', \ # 'SA', 'PF', 'A', 'AO', 'B', 'BC', 'BR', 'C', 'CS', 'I', 'J', 'JM', 'L', 'M', 'P', 'PP', \ # 'V', 'Y', 'EG', 'EB', 'PG'] # potential_day_list = ['EC', 'SF', 'SM', 'UR', 'AP', 'CJ', 'PK', 'JD', 'LH', 'SI', 'LC'] # potential_night_list = ['RM'] # potential_day_list = [''] potential_night_list = ['NI', 'CF', 'PF', 'Y', 'M', 'B', 'SN', 'RM'] potential_day_list = ['JD', 'UR'] if str(context.current_dt.time())[:2] in ('21', '22'): potential_icon_list = potential_night_list else: potential_icon_list = potential_day_list + potential_night_list # print(f'potential_icon_list: {potential_icon_list}') potential_future_list = [] for i in potential_icon_list: dominant_future = get_dominant_future(i) # print(f'i: {i}, dominant_future: {dominant_future}') if len(dominant_future) > 0: potential_future_list.append(dominant_future) return potential_future_list #2-1 检查品类的主连是否满足形成趋势的条件 def check_trend(context): print("-" * 20 + "Check trends" + "-" * 20) trend_symbols = [] daily_data_info = {} # 用于保存每日数据信息 # 获取可以交易的所有标的主连 potential_future_list = get_potential_future_list(context) # print(f'potential_future_list: {potential_future_list}') potential_future_list = [item for item in potential_future_list if item not in g.trade_history.keys()] # 移除已有持仓的标的 # print(f'potential_future_list: {potential_future_list}') # 针对所有标的需要的基础数据 ma_crosses_dict = {} # 用于存储某个symbol过去一段时间均线交叉的数量 for symbol in potential_future_list: # 获取50天的收盘价数据 close_data = attribute_history(symbol, 50, '1d', ['close', 'high', 'low', 'open'], df=True) close_series = close_data['close'] # print(f'Check if future: {symbol} - {close_series}') # 计算5日、10日、20日和30日移动平均线 ma5 = close_series.rolling(window=5).mean() ma10 = close_series.rolling(window=10).mean() ma20 = close_series.rolling(window=20).mean() ma30 = close_series.rolling(window=30).mean() trend_info = {'symbol': symbol, 'trend_lines': []} daily_data = [] # 存储50日的数据 # 初始化连续天数计数器 continuous_days = { 'above_ma5': [0] * g.continuous_days_length, 'below_ma5': [0] * g.continuous_days_length, 'above_ma10': [0] * g.continuous_days_length, 'below_ma10': [0] * g.continuous_days_length, 'above_ma20': [0] * g.continuous_days_length, 'below_ma20': [0] * g.continuous_days_length, 'above_ma30': [0] * g.continuous_days_length, 'below_ma30': [0] * g.continuous_days_length } # 获取连续在某一个均线上或者下的天数 for i in range(len(close_series)): # 针对每一天更新连续天数计数器 symbol_prefix = symbol[:-9] if symbol_prefix == 'RM': # log.debug(f'start updating trend details for {symbol}') update_continuous_days(continuous_days, close_series[i], ma5[i], ma10[i], ma20[i], ma30[i], g.continuous_days_length, True) else: update_continuous_days(continuous_days, close_series[i], ma5[i], ma10[i], ma20[i], ma30[i], g.continuous_days_length) # 收集每日数据 day_data = { 'date': close_data.index[i].date(), 'close': close_series[i], 'high': close_data['high'][i], 'low': close_data['low'][i], 'open': close_data['open'][i], 'ma5': ma5[i], 'ma10': ma10[i], 'ma20': ma20[i], 'ma30': ma30[i], 'continuous_above_ma5': continuous_days['above_ma5'], 'continuous_below_ma5': continuous_days['below_ma5'], 'continuous_above_ma10': continuous_days['above_ma10'], 'continuous_below_ma10': continuous_days['below_ma10'], 'continuous_above_ma20': continuous_days['above_ma20'], 'continuous_below_ma20': continuous_days['below_ma20'], 'continuous_above_ma30': continuous_days['above_ma30'], 'continuous_below_ma30': continuous_days['below_ma30'] } daily_data.append(day_data) # 在处理完所有数据后检查是否有特殊的趋势变化情况出现 # adjust_for_special_condition(daily_data, g.continuous_days_length) daily_data_info[symbol] = daily_data # print(f'checking daily data for {symbol}')#: {daily_data}') # 检查过去一定天数内均线的相交次数,如果太多不会被认定为形成趋势 # print(f'calculate {symbol}') # ma cross numbers with daily_data_info: {daily_data_info[symbol]}') ma_crosses_dict[symbol] = count_ma_crosses(daily_data_info[symbol], g.change_direction_days) # 检查哪些均线会形成什么类型的趋势 for ma_type in ['ma5', 'ma10', 'ma20', 'ma30']: above_days = continuous_days[f'above_{ma_type}'] below_days = continuous_days[f'below_{ma_type}'] # print(f"Checking {ma_type} for {symbol}, above_days: {above_days}, below_days: {below_days}, g.default_days: {g.default_days}") above_condition_result = [day >= g.default_days for day in above_days] below_condition_result = [day >= g.default_days for day in below_days] # print(f"above_condition_result: {above_condition_result}, below_condition_result: {below_condition_result}") above_condition = any(above_condition_result) below_condition = any(below_condition_result) # print(f"above_condition: {above_condition}, below_condition: {below_condition}") if above_condition: # print(f"Adding {ma_type} support for {symbol}") trend_info['trend_lines'].append((ma_type, 'support')) if below_condition: # print(f"Adding {ma_type} resistance for {symbol}") trend_info['trend_lines'].append((ma_type, 'resistance')) if trend_info['trend_lines']: trend_symbols.append(trend_info) # 去除掉在一段时间内均线过于频繁交叉的对象 valid_trend_symbols = [] # print(f'before checking special circumstances, trend_symbols: {trend_symbols}') for trend_info in trend_symbols: # print(f'find trend_info: {trend_info["symbol"]}') symbol = trend_info["symbol"] ma_crosses = ma_crosses_dict[symbol] # log.debug(f'symbol: {symbol}, ma_crosses: {ma_crosses}') if ma_crosses <= 3: valid_trend_symbols.append(trend_info) # print(f'valid_trend_symbols: {valid_trend_symbols}') # 使用有效的趋势信息更新 trend_symbols trend_symbols = valid_trend_symbols # 创建一个集合来存储所有出现在 trend_info 中的标的 trend_symbols_set = set([info['symbol'] for info in trend_symbols]) log.debug(f"trend_symbols: {trend_symbols}, trend_symbols_set: {trend_symbols_set}, valid_trend_symbols: {valid_trend_symbols}") # 打印40日的数据和连续天数信息 # print(f"trend_symbols: {trend_symbols}, trend_symbols_set: {trend_symbols_set}") #, daily_data_info:{daily_data_info}") for symbol, daily_data in daily_data_info.items(): # 只打印出现在 trend_info 中的标的的数据 if symbol in trend_symbols_set: # 查找与当前 symbol 对应的 trend_info current_trend_info = next((item for item in trend_symbols if item['symbol'] == symbol), None) # 打印 symbol 和其趋势信息 trend_lines_str = ', '.join([f"{line[0]}: {line[1]}" for line in current_trend_info['trend_lines']]) if current_trend_info else "无趋势" # print(f"{symbol} 的趋势情况为: {trend_lines_str},满足条件的每日数据:") # print(f"{symbol} 的趋势情况为: ,满足条件的每日数据:") # print(f"daily_data: {daily_data[-1]}") # for data in daily_data: # print(f" 日期: {data['date']}, 收盘价: {data['close']}, MA5: {data['ma5']}, MA10: {data['ma10']}, MA20: {data['ma20']}, \ # 连续高于MA5: {data['continuous_above_ma5']}, 连续低于MA5: {data['continuous_below_ma5']}, \ # 连续高于MA10: {data['continuous_above_ma10']}, 连续低于MA10: {data['continuous_below_ma10']}, \ # 连续高于MA20: {data['continuous_above_ma20']}, 连续低于MA20: {data['continuous_below_ma20']}") # continue return valid_trend_symbols, daily_data_info #2-2 检查行程趋势的主连的趋势是否发生变化 def check_shadow_cross(context, trend_symbols, daily_data_info): print("-" * 20 + "Check shadow cross" + "-" * 20) # 1. 检查并进行换月 current_day = context.current_dt.date() # 获取当前日期字符串 # print(f'check current_day: {current_day}') switch_result = position_auto_switch(context) if switch_result: print(f'switch_result: {switch_result}, g.trade_history: {g.trade_history}, g.change_fail_history: {g.change_fail_history}') temp_crossed_symbols = {} today_crossed_symbols = {} # 1. 检查并清理过时的穿越信息 # for symbol in list(g.crossed_symbols_history.keys()): # 保留最近 g.continuous_days_length 天的数据 # print(f'crossed_symbols_history: {g.crossed_symbols_history}') # g.crossed_symbols_history[symbol] = {date: crosses for date, crosses in g.crossed_symbols_history[symbol].items() if (context.current_dt - datetime.strptime(date, '%Y-%m-%d')).days <= g.continuous_days_length} # if not g.crossed_symbols_history[symbol]: # del g.crossed_symbols_history[symbol] # 如果没有数据,删除该标的的记录 # 2. 根据当天的数据更新g.crossed_symbols_history for trend_info in trend_symbols: symbol = trend_info['symbol'] # print(f'Checking data for {symbol}') # 获取今天的最新数据 # 计算并更新今天的完整数据 today_daily_data_info = daily_data_info.copy() latest_data_today = get_latest_trading_data(symbol, context) print(f'{symbol} latest_data_today: {latest_data_today}') # 计算并更新今天的完整数据 latest_data = calculate_continuous_data(latest_data_today, today_daily_data_info, symbol, g.continuous_days_length) # 根据昨天的价格判断有没有发生突破 yesterday_data = daily_data_info[symbol][-1] print(f'find {symbol} yesterday_data: {yesterday_data}') # 临时存储当前标的的穿越信息 symbol_crosses = [] for line_label, line_type in trend_info['trend_lines']: print(f'line_label in change: {line_label}') yesterday_day = yesterday_data['date'] close = yesterday_data['close'] high = yesterday_data['high'] low = yesterday_data['low'] ma_value = yesterday_data[line_label] close_ma_difference = (close - ma_value) / ma_value # 删除指定的键 yesterday_data_copy = yesterday_data.copy() # print(f'check yesterday_data_copy before changing {yesterday_data_copy}') keys_to_remove = ['continuous_above_ma5', 'continuous_below_ma5', 'continuous_above_ma10', 'continuous_below_ma10', 'continuous_above_ma20', 'continuous_below_ma20', 'continuous_above_ma30', 'continuous_below_ma30'] for key in keys_to_remove: yesterday_data_copy.pop(key, None) # 使用pop方法删除键,如果键不存在则不做任何操作 # print(f'check yesterday_data_copy after changing {yesterday_data_copy}') # 判断上一个交易日完整的数据中是否穿越 if (line_type == 'support' and low < ma_value and close_ma_difference <= 0.0005) or (line_type == 'resistance' and high > ma_value and close_ma_difference >= -0.0005): symbol_crosses.append({ 'date': yesterday_day, 'symbol': symbol, 'line_label': line_label, 'line_type': line_type, 'direction': 'long' if line_type == 'support' else 'short', 'latest_data': yesterday_data_copy }) print(f"{symbol} crossed {line_label} ({line_type}) with ma_value: {ma_value} and close: {close} and high:{high} and low: {low} and close_ma_difference: {close_ma_difference}, latest_data: {yesterday_data_copy}") # 从符合条件的趋势线中选择数字较大的(即优先保留ma10而不是ma5) # print(f'check symbol_crosses: {symbol_crosses}') if symbol_crosses: # today_crossed_symbols.append(symbol) # 记录一下g.crossed_symbols_history已经更新过的symbol # 如果有穿越,选择数字较大的趋势 best_cross = max(symbol_crosses, key=lambda x: int(''.join(filter(str.isdigit, x['line_label'])))) temp_crossed_symbols[symbol] = best_cross # 3.1 针对没有更新过的g.crossed_symbols_history的symbol进行更新 # print(f'check temp_crossed_symbols: {temp_crossed_symbols}, g.crossed_symbols_history: {g.crossed_symbols_history}') if temp_crossed_symbols: # 首先是针对今天出现趋势突破的标的进行历史记录的更新 for symbol, best_cross in temp_crossed_symbols.items(): # print(f'更新历史symbol: {symbol}, best_cross: {best_cross}') update_crossed_symbols_history(context, best_cross) print(f'针对也在今天出现突破的{symbol}完成g.crossed_symbols_history的更新,{g.crossed_symbols_history[symbol]}') #3.2 要针对那些不在今天出现突破的标的但是还在历史趋势里的进行更新,首先将两个字典的键转换成集合 keys_temp_crossed_symbols = set(temp_crossed_symbols.keys()) keys_g_crossed_symbols_history = set(g.crossed_symbols_history.keys()) #3.2.1 找出在g.crossed_symbols_history中但不在temp_crossed_symbols中的键 difference_keys = keys_g_crossed_symbols_history - keys_temp_crossed_symbols print(f"Keys that are in g.crossed_symbols_history: {keys_g_crossed_symbols_history} but not in temp_crossed_symbols: {keys_g_crossed_symbols_history}; {difference_keys}") for symbol in difference_keys: update_record_for_existing_symbol(symbol, 'new_symbol') #3.2.2 找出在在g.crossed_symbols_history和temp_crossed_symbols中都有的key,但是temp里的line_label要小 keys_in_both = keys_temp_crossed_symbols & keys_g_crossed_symbols_history for symbol in keys_in_both: print(f'{symbol}, {keys_temp_crossed_symbols}, {keys_g_crossed_symbols_history}, temp_crossed_symbols: {temp_crossed_symbols}') temp_line_label = temp_crossed_symbols[symbol]['line_label'] g_line_label = g.crossed_symbols_history[symbol][0]['line_label'] temp_line_num = int(temp_line_label.replace('ma', '')) g_line_num = int(g_line_label.replace('ma', '')) # 检查是否需要更新 if g_line_num > temp_line_num: update_record_for_existing_symbol(symbol, 'old_symbol') #4. 针对出现趋势变化的检查均线顺序、破趋势天数、收盘价和均线的最高点关系 for symbol, records in g.crossed_symbols_history.items(): latest_record = max(records, key=lambda x: x['date']) print(f'symbol: {symbol}, records: {records}, latest_record: {latest_record}') # 检查均线得满足一定的条件,以及破趋势的天数和其中收盘价和均线的最高差别 relation_check = check_ma_relations(context, symbol, False) duration_check, ma_close = check_cross_details(context, symbol, False) # 只有均线顺序、破趋势天数、收盘价和均线的最高点关系满足条件,才会被加入购买真正的突破趋势清单 if relation_check and duration_check and ma_close: print(f'{symbol}满足均线顺序、破趋势天数、收盘价和均线的最高点关系满足条件,才会被加入购买真正的突破趋势清单这三项条件,被视为合格标的') today_crossed_symbols[symbol] = [latest_record] print(f'today_crossed_symbols: {today_crossed_symbols}') latest_records = {} # print(f'最后获取还在历史记录里的g.crossed_symbols_history: {g.crossed_symbols_history}') for symbol, records in g.crossed_symbols_history.items(): # 找到日期最大的记录 latest_record = max(records, key=lambda x: x['date']) # 将找到的记录添加到新字典中 latest_records[symbol] = [latest_record] # print(f'latest_records: {len(latest_records)} & {latest_records}, g.crossed_symbols_history: {len(g.crossed_symbols_history)} & {g.crossed_symbols_history}') return [item for sublist in today_crossed_symbols.values() for item in sublist] #2-3 检查趋势发生变化的主连是否满足买入条件 def check_buy_condition(context, crossed_symbols, shadow_body_ratio_threshold=0.5): print("-" * 20 + "Check buy condition" + "-" * 20) buy_symbols = [] update_margin_rate(context) for crossed_symbol in crossed_symbols: # print(f'crossed_symbol: {crossed_symbol}') symbol = crossed_symbol['symbol'] line_label = crossed_symbol['line_label'] line_type = crossed_symbol['line_type'] histroical_data = calculate_daily_data(context, 50, symbol) latest_data_today = histroical_data.iloc[-1] latest_data = { 'date': latest_data_today.name, # 使用索引的 name 属性获取日期 'close': latest_data_today['close'], 'high': latest_data_today['high'], 'low': latest_data_today['low'], 'open': latest_data_today['open'], 'ma5': latest_data_today['ma5'], 'ma10': latest_data_today['ma10'], 'ma20': latest_data_today['ma20'], 'ma30': latest_data_today['ma30'] } # 计算并更新今天的完整数据 # latest_data = crossed_symbol['latest_data'] print(f'check latest_data_today: {latest_data_today}, latest_data: {latest_data}') # 获取突破趋势天数,用来判断突破趋势天数和突破均线的关系 all_records = g.crossed_symbols_history[symbol] if not all_records: debug_print(f"No records found for {symbol}.") return False, False first_day = all_records[0]['date'] today = context.current_dt.date() all_days = get_trade_days(first_day, today) cross_length = len(all_days) ma_lines_in_range, ma_labels_in_range = count_ma_lines_in_range(symbol, latest_data) diff_days_lines = cross_length - ma_lines_in_range print(f'check diff_days_lines: {diff_days_lines}, cross_length: {cross_length}, latest_data: {latest_data}') # 直接使用 latest_data,无需再次更新 MA 关系 ma_value = latest_data[line_label] close_price = latest_data['close'] today_in_change = (latest_data['close'] - latest_data['open'])/latest_data['open'] print(f'symbol: {symbol}, close_price: {close_price}, 1.005: {1.005*ma_value}, 1.02: {1.02*ma_value}, 0.98: {0.98*ma_value}, 0.995: {0.995*ma_value}, today_in_change: {today_in_change}') if line_label != "ma5": if line_type == 'support' and 1.005*ma_value <= close_price <= 1.02*ma_value and today_in_change >= -0.002: buy_symbols.append((symbol, line_label, 'support', 'long')) elif line_type == 'resistance' and 0.98*ma_value <= close_price <= 0.995*ma_value and today_in_change <= 0.002: buy_symbols.append((symbol, line_label, 'resistance', 'short')) return buy_symbols #2-4 根据均线止损 def check_ma_for_close(context, position, offset_ratio, days_for_adjustment): # print("-" * 20 + "MA closing check" + "-" * 20) security = position.security if security not in g.trade_history: # print(f'security: {security} not in trade history') return False trade_info = g.trade_history.get(position.security, {}) finish_time = trade_info.get('finish_time') line_label = trade_info.get('line_label') # 获取所有交易日并计算持仓的交易日数 all_trade_days = get_all_trade_days() holding_days = 0 if finish_time: finish_date = finish_time.date() current_date = context.current_dt.date() holding_days = sum((finish_date <= d <= current_date) for d in all_trade_days) # 计算变化率 today_price = get_current_data()[position.security].last_price avg_daily_change_rate = calculate_average_daily_change_rate(position.security) historical_data = attribute_history(position.security, 1, '1d', ['close']) yesterday_close = historical_data['close'].iloc[-1] today_change_rate = abs((today_price - yesterday_close) / yesterday_close) # 检查是否符合直接使用 ma5 和 ma10 规则的条件 close_line = None if today_change_rate >= 1.5 * avg_daily_change_rate: close_line = 'ma5' elif holding_days <= days_for_adjustment: close_line = line_label else: close_line = 'ma5' if today_change_rate >= 1.2 * avg_daily_change_rate else 'ma10' # 计算 MA 值并进行平仓判断 ma_values = calculate_ma_values(position.security, [5, 10, 20, 30]) adjusted_ma_value = ma_values[close_line] * (1 + offset_ratio if position.side == 'short' else 1 - offset_ratio) # print(f'security: {position.security}, close_line: {close_line}, today_price: {today_price}, ma_price: {ma_values[close_line]}, check_point: {adjusted_ma_value}, offset_ratio: {offset_ratio}') if (position.side == 'long' and today_price < adjusted_ma_value) or \ (position.side == 'short' and today_price > adjusted_ma_value): # print(f"Closing position in {position.security} based on {close_line} with position_side: {position.side} and adjusted_ma_value: {adjusted_ma_value}") close_position(position, g.trade_history[position.security]['direction']) return True return False #2-5 根据最大止损线进行止损 def check_loss_for_close(context, position, direction, initial_loss_limit=-4000, loss_increment_per_day=200): print("-" * 20 + "Last line control" + "-" * 20) trade_info = g.trade_history.get(position.security, {}) finish_time = trade_info.get('finish_time') # 获取所有交易日并计算持仓的交易日数 all_trade_days = get_all_trade_days() holding_days = 0 if finish_time: finish_date = finish_time.date() current_date = context.current_dt.date() holding_days = sum((finish_date <= d <= current_date) for d in all_trade_days) # 调整损失限制 adjusted_loss_limit = initial_loss_limit + holding_days * loss_increment_per_day # print(f"Adjusted Loss Limit for {position.security}: {adjusted_loss_limit}") # print(f'position: {position}, direction: {direction}') if direction == 'long': current_price = position.price multiplier = position.value/(position.total_amount * position.price) revenue = multiplier * (position.price - position.acc_avg_cost) * position.total_amount # print(f"security: {position.security}, current_price: {current_price}, cost: {position.acc_avg_cost}, multiplier: {multiplier}, revenue: {revenue}") if revenue < adjusted_loss_limit: close_position(position, g.trade_history[position.security]['direction']) return True elif direction == 'short': current_price = position.price multiplier = position.value/(position.total_amount * position.price) revenue = multiplier * (position.acc_avg_cost - position.price) * position.total_amount # print(f"security: {position.security}, current_price: {current_price}, cost: {position.acc_avg_cost}, multiplier: {multiplier}, revenue: {revenue}") if revenue < adjusted_loss_limit: close_position(position, g.trade_history[position.security]['direction']) return True return False ############################ 一些辅助函数 ################################### #3-0 计算可以用于交易的金额 def calculate_order_value(context, security, direction): # 获取最新的合约价格 current_price = get_current_data()[security].last_price # 获取合约的品种(从合约代码中提取) underlying_symbol = security.split('.')[0][:-4] # 例如从 'IF1412.CCFX' 提取 'IF' # 获取保证金比例 print(f'检查保证金比例{security}, direction: {direction}') margin_rate = g.default_margin_rates.get(direction, {}).get(underlying_symbol, 0.10) # 默认值作为后备 # 获取合约乘数 multiplier = g.multiplier.get(underlying_symbol, 10) # 计算单手保证金 single_hand_margin = current_price * multiplier * margin_rate # 根据单手保证金决定购买手数 if single_hand_margin <= 20000: total_margin = 20000 else: total_margin = single_hand_margin print(f'calculate margin for {security} multiplier: {multiplier}, margin_rate: {margin_rate}, single_hand_margin: {single_hand_margin}, total_margin: {total_margin}') return total_margin #3-1 更新收盘价和4条ma均线之间的关系 def update_high_low_ma_relations(symbol, high, low, ma5, ma10, ma20, ma30): if symbol not in g.high_low_ma_relations: g.high_low_ma_relations[symbol] = [] today_relations = { 'ma5': {'low_below': low <= 0.997*ma5, 'high_above': high >= 1.003*ma5}, 'ma10': {'low_below': low <= 0.997*ma10, 'high_above': high >= 1.003*ma10}, 'ma20': {'low_below': low <= 0.997*ma20, 'high_above': high >= 1.003*ma20}, 'ma30': {'low_below': low <= 0.997*ma30, 'high_above': high >= 1.003*ma30} } # 保持最近三天的数据 if len(g.high_low_ma_relations[symbol]) >= 3: g.high_low_ma_relations[symbol].pop(0) g.high_low_ma_relations[symbol].append(today_relations) #3-2 更新收盘价是否够购买条件 def check_ma_buy_conditions(symbol, latest_data, line_label, line_type): relations = g.high_low_ma_relations.get(symbol, []) print(f'checking {symbol}, relations: {relations}, line_label: {line_label}, line_type: {line_type}') if not relations: return False # 通过line_label确定是对ma5, ma10还是ma20进行判断 ma_value = latest_data[line_label] close_price = latest_data['close'] # 检查今天的关系是否满足特定条件 for today_relations in relations: if line_type == 'support' and today_relations[line_label]['low_below'] and 1.005*ma_value <= close_price < 1.02*ma_value: return True elif line_type == 'resistance' and today_relations[line_label]['high_above'] and 0.98*ma_value < close_price <= 0.995*ma_value: return True return False #更新和均线关系连续天数的方法 def update_continuous_days(continuous_days, close, ma5, ma10, ma20, ma30, length, message=False): # log.warning(f"before update_continuous_days: {continuous_days}") # 更新连续天数逻辑 for key in ['above_ma5', 'below_ma5', 'above_ma10', 'below_ma10', 'above_ma20', 'below_ma20', 'above_ma30', 'below_ma30']: condition_met = False if ((key == 'above_ma5' and close > ma5) or (key == 'below_ma5' and close < ma5) or (key == 'above_ma10' and close > ma10) or (key == 'below_ma10' and close < ma10) or (key == 'above_ma20' and close > ma20) or (key == 'below_ma20' and close < ma20) or (key == 'above_ma30' and close > ma30) or (key == 'below_ma30' and close < ma30)): condition_met = True # 基本更新 if condition_met: continuous_days[key].insert(0, continuous_days[key][0] + 1) else: continuous_days[key].insert(0, 0) continuous_days[key] = continuous_days[key][:length] # log.warning(f"before dealing ma30: {continuous_days}") # 特殊处理针对ma30 for ma_key in ['above_ma30', 'below_ma30']: max_days = max(continuous_days[ma_key]) > 10 first_day_positive = continuous_days[ma_key][0] > 0 zeros_count = continuous_days[ma_key].count(0) <= 2 message = False if message: log.debug(f'check {ma_key}: max_days-{max_days}, first_day_positive-{first_day_positive}, zeros_count-{zeros_count}') if ma_key == 'below_ma30' and close > ma30 or ma_key == 'above_ma30' and close < ma30: max_ratio = abs((close - ma30) / ma30) # 检查是否满足特殊处理的条件 if max_days and first_day_positive and zeros_count and max_ratio <= 0.015: # log.debug(f"Meet the condition: {ma_key} for {continuous_days[ma_key]}") # 恢复继续计数的情况 if ma_key == 'below_ma30': continuous_days['below_ma30'] = [sum(x > 0 for x in continuous_days['below_ma30'][:i+1]) for i in range(length)] continuous_days['above_ma30'] = [0] * length elif ma_key == 'above_ma30': continuous_days['above_ma30'] = [sum(x > 0 for x in continuous_days['above_ma30'][:i+1]) for i in range(length)] continuous_days['below_ma30'] = [0] * length # log.warning(f"by the end of update_continuous_days: {continuous_days}") #3-4 更新crossed_symbol def create_crossed_symbol_info(symbol, line_label, line_type, latest_data): """Helper function to create crossed symbol info dictionary""" return { 'symbol': symbol, 'line_label': line_label, 'line_type': line_type, 'direction': 'long' if line_type == 'support' else 'short', 'latest_data': latest_data, 'historical_condition_tracker': { 'low_below_ma10': False, 'low_below_ma20': False, 'low_below_ma30': False, 'high_above_ma10': False, 'high_above_ma20': False, 'high_above_ma30': False, 'close_above_threshold': False, 'close_below_threshold': False } } #3-5 获取今天的交易数据 def get_latest_trading_data(symbol, context): # 获取今天日期 today = context.current_dt.date() # 获取昨天的日期和夜盘开始时间(假设从21:00开始) yesterday = today - timedelta(days=1) night_session_start_date = get_night_session_start_date(today) night_session_start = datetime.combine(night_session_start_date, datetime.strptime("21:00", "%H:%M").time()) night_session_start = pd.to_datetime(night_session_start) # print(f'night_session_start: {night_session_start}, {type(night_session_start)}') # 获取分钟级别的数据 minute_data = attribute_history(symbol, 600, '1m', ['close', 'high', 'low', 'open'], df=True) # 打印数据类型和索引的前几个值 # print("minute_data earliest index:", minute_data.index.min()) # print("minute_data latest index:", minute_data.index.max()) # 保留属于当前交易日的数据,包括夜盘 # print("Is night_session_start in minute_data index range:", # night_session_start >= minute_data.index.min() and night_session_start <= minute_data.index.max()) latest_data = minute_data[minute_data.index >= night_session_start] # print("latest_data index after filtering first few values:", latest_data.index[:5]) # print("latest_data index after filtering last few values:", latest_data.index[-5:]) # 计算截至当前时刻的 close, high, low, open, ma5, ma10, ma20 close = latest_data['close'][-1] high = latest_data['high'].max() low = latest_data['low'].min() open_price = latest_data['open'][0] # ma5 = latest_data['close'].rolling(window=5).mean()[-1] # ma10 = latest_data['close'].rolling(window=10).mean()[-1] # ma20 = latest_data['close'].rolling(window=20).mean()[-1] # print(f'symbol: {symbol}, close: {close}, high: {high}, low: {low}, open: {open_price}')#, ma5: {ma5}, ma10: {ma10}, ma20: {ma20}') return {'close': close, 'high': high, 'low': low, 'open': open_price}#, 'ma5': ma5, 'ma10': ma10, 'ma20': ma20} #3-6 获取今天的交易时间 def get_night_session_start_date(current_date): # 如果今天是周一(weekday() == 0),夜盘开始于上一个周五 if current_date.weekday() == 0: return current_date - timedelta(days=3) else: return current_date - timedelta(days=1) #3-7 将当日的数据融入之前的数据里 def calculate_continuous_data(today_data, today_daily_data_info, symbol, continuous_days_length): # 先计算今天的近似MA,获取包括今天在内的近日数据,至少需要 20 天数据来计算 MA20 # print(f'for calculate_continuous_data, today_daily_data_info: {today_daily_data_info}') recent_data = today_daily_data_info[symbol][-29:] + [today_data] # print(f'under calculate_continuous_data, recent_data: {recent_data}') # print(f'today_daily_data_info[symbol]: {today_daily_data_info[symbol]}') # print(f'original today_data: {today_data}') # 计算 MA 值 df_recent = pd.DataFrame(recent_data) # print(f'df_recent: {df_recent}') today_data['ma5'] = df_recent['close'].rolling(window=5).mean().iloc[-1] today_data['ma10'] = df_recent['close'].rolling(window=10).mean().iloc[-1] today_data['ma20'] = df_recent['close'].rolling(window=20).mean().iloc[-1] today_data['ma30'] = df_recent['close'].rolling(window=30).mean().iloc[-1] # print(f'new today_data: {today_data}') # 获取昨天的数据 last_day_data = today_daily_data_info[symbol][-1] # print(f'checking calculate_continuous_data: symbol-{symbol}') # print(f'df_recent: {df_recent.info()}') # print(f'last_day_data: {last_day_data}') # 初始化今天的连续天数数据 new_continuous_data = { 'above_ma5': last_day_data['continuous_above_ma5'].copy(), 'below_ma5': last_day_data['continuous_below_ma5'].copy(), 'above_ma10': last_day_data['continuous_above_ma10'].copy(), 'below_ma10': last_day_data['continuous_below_ma10'].copy(), 'above_ma20': last_day_data['continuous_above_ma20'].copy(), 'below_ma20': last_day_data['continuous_below_ma20'].copy(), 'above_ma30': last_day_data['continuous_above_ma30'].copy(), 'below_ma30': last_day_data['continuous_below_ma30'].copy() } # print(f'new_continuous_data: {new_continuous_data}') # 更新今天的连续天数数据 for ma_label in ['ma5', 'ma10', 'ma20', 'ma30']: # print(f"today close: {today_data['close']}, today ma_label: {today_data[ma_label]}") if today_data['close'] > today_data[ma_label]: new_continuous_data[f'above_{ma_label}'].insert(0, new_continuous_data[f'above_{ma_label}'][0] + 1) new_continuous_data[f'below_{ma_label}'].insert(0, 0) elif today_data['close'] < today_data[ma_label]: new_continuous_data[f'below_{ma_label}'].insert(0, new_continuous_data[f'below_{ma_label}'][0] + 1) new_continuous_data[f'above_{ma_label}'].insert(0, 0) else: # 如果收盘价等于MA值,则重置连续天数 new_continuous_data[f'above_{ma_label}'].insert(0, 0) new_continuous_data[f'below_{ma_label}'].insert(0, 0) # 确保长度不超过 continuous_days_length for key in new_continuous_data: if len(new_continuous_data[key]) > continuous_days_length: new_continuous_data[key] = new_continuous_data[key][:continuous_days_length] # 构建今天的完整数据并返回 today_full_data = {**today_data, **{ 'continuous_above_ma5': new_continuous_data['above_ma5'], 'continuous_below_ma5': new_continuous_data['below_ma5'], 'continuous_above_ma10': new_continuous_data['above_ma10'], 'continuous_below_ma10': new_continuous_data['below_ma10'], 'continuous_above_ma20': new_continuous_data['above_ma20'], 'continuous_below_ma20': new_continuous_data['below_ma20'], 'continuous_above_ma30': new_continuous_data['above_ma30'], 'continuous_below_ma30': new_continuous_data['below_ma30'] }} # print(f'today_full_data: {today_full_data}') return today_full_data #3-8 计算日均变化率 def calculate_average_daily_change_rate(security, days=30): historical_data = attribute_history(security, days + 1, '1d', ['close']) # 获取额外一天的数据以便计算变化率 daily_change_rates = abs(historical_data['close'].pct_change()).iloc[1:] # 排除第一条数据 avg_daily_change_rate = daily_change_rates.mean() # print(f"Average Daily Change Rate for {security}: {avg_daily_change_rate}") return avg_daily_change_rate #3-9 计算所需MA数据 def calculate_ma_values(security, ma_periods): historical_data = attribute_history(security, max(ma_periods), '1d', ['close']) today_price = get_current_data()[security].last_price close_prices = historical_data['close'].tolist() + [today_price] # print(f'check ma prices: security-{security}, close_prices-{close_prices}') ma_values = {f'ma{period}': np.mean(close_prices[-period:]) for period in ma_periods} return ma_values #3-10 判断过去一段时间不包括当天4条ma均线在过去几天有多少次交叉 def count_ma_crosses(future_data, days, if_details=False): recent_data = future_data[-days:] # print(f'future_data: {future_data}, recent_data: {recent_data}') ma5 = [day['ma5'] for day in recent_data] ma10 = [day['ma10'] for day in recent_data] ma20 = [day['ma20'] for day in recent_data] ma30 = [day['ma30'] for day in recent_data] # print(f'ma5: {ma5}, ma10: {ma10}, ma20: {ma20}, ma30: {ma30}') cross_5_10 = sum([1 for i in range(1, len(ma5)) if ((ma5[-i] > ma10[-i] and ma5[-i-1] < ma10[-i-1]) or (ma5[-i] < ma10[-i] and ma5[-i-1] > ma10[-i-1]))]) cross_5_20 = sum([1 for i in range(1, len(ma5)) if ((ma5[-i] > ma20[-i] and ma5[-i-1] < ma20[-i-1]) or (ma5[-i] < ma20[-i] and ma5[-i-1] > ma20[-i-1]))]) cross_5_30 = sum([1 for i in range(1, len(ma5)) if ((ma5[-i] > ma30[-i] and ma5[-i-1] < ma30[-i-1]) or (ma5[-i] < ma30[-i] and ma5[-i-1] > ma30[-i-1]))]) cross_10_20 = sum([1 for i in range(1, len(ma10)) if ((ma10[-i] > ma20[-i] and ma10[-i-1] < ma20[-i-1]) or (ma10[-i] < ma20[-i] and ma10[-i-1] > ma20[-i-1]))]) cross_10_30 = sum([1 for i in range(1, len(ma10)) if ((ma10[-i] > ma30[-i] and ma10[-i-1] < ma30[-i-1]) or (ma10[-i] < ma30[-i] and ma10[-i-1] > ma30[-i-1]))]) cross_20_30 = sum([1 for i in range(1, len(ma20)) if ((ma20[-i] > ma30[-i] and ma20[-i-1] < ma30[-i-1]) or (ma20[-i] < ma30[-i] and ma20[-i-1] > ma30[-i-1]))]) crosses = cross_5_10 + cross_5_20 + cross_5_30 + cross_10_20 + cross_10_30 + cross_20_30 # log.debug(f'crosses: {crosses}, cross_5_10: {cross_5_10}, cross_5_20: {cross_5_20}, cross_5_30: {cross_5_30}, cross_10_20: {cross_10_20}, cross_10_30: {cross_10_30}, cross_20_30: {cross_20_30}') if if_details: check_ma_cross_details(recent_data, days) return crosses #3-11 查看具体均线交叉的日期和价格 def check_ma_cross_details(recent_data, days): # 初始化交叉信息列表 cross_info = [] # 遍历days天的数据 for i in range(1, days): # 检查MA线的交叉 current_date = recent_data[-i]['date'] previous_date = recent_data[-i - 1]['date'] if (ma5[-i] > ma10[-i] and ma5[-i-1] < ma10[-i-1]) or (ma5[-i] < ma10[-i] and ma5[-i-1] > ma10[-i-1]): cross_info.append({ 'Date': previous_date, # 前一天的日期 'MA5': ma5[-i], # MA5的值 'MA10': ma10[-i], # MA10的值 'Date_Next': current_date, # 当天的日期 'MA5_Next': ma5[-i-1], 'MA10_Next': ma10[-i-1], 'Cross_Type': 'MA5_MA10' }) if (ma5[-i] > ma20[-i] and ma5[-i-1] < ma20[-i-1]) or (ma5[-i] < ma20[-i] and ma5[-i-1] > ma20[-i-1]): cross_info.append({ 'Date': previous_date, # 前一天的日期 'MA5': ma5[-i], # MA5的值 'MA20': ma20[-i], # MA20的值 'Date_Next': current_date, # 当天的日期 'MA5_Next': ma5[-i-1], 'MA20_Next': ma20[-i-1], 'Cross_Type': 'MA5_MA20' }) if (ma5[-i] > ma30[-i] and ma5[-i-1] < ma30[-i-1]) or (ma5[-i] < ma30[-i] and ma5[-i-1] > ma30[-i-1]): cross_info.append({ 'Date': previous_date, # 前一天的日期 'MA5': ma5[-i], # MA5的值 'MA30': ma30[-i], # MA30的值 'Date_Next': current_date, # 当天的日期 'MA5_Next': ma5[-i-1], 'MA30_Next': ma30[-i-1], 'Cross_Type': 'MA5_MA30' }) # 检查MA10与其他MA线的交叉 if (ma10[-i] > ma20[-i] and ma10[-i-1] < ma20[-i-1]) or (ma10[-i] < ma20[-i] and ma10[-i-1] > ma20[-i-1]): cross_info.append({ 'Date': previous_date, # 前一天的日期 'MA10': ma10[-i], # MA10的值 'MA20': ma20[-i], # MA20的值 'Date_Next': current_date, # 当天的日期 'MA10_Next': ma10[-i-1], 'MA20_Next': ma20[-i-1], 'Cross_Type': 'MA10_MA20' }) if (ma10[-i] > ma30[-i] and ma10[-i-1] < ma30[-i-1]) or (ma10[-i] < ma30[-i] and ma10[-i-1] > ma30[-i-1]): cross_info.append({ 'Date': previous_date, # 前一天的日期 'MA10': ma10[-i], # MA10的值 'MA30': ma30[-i], # MA30的值 'Date_Next': current_date, # 当天的日期 'MA10_Next': ma10[-i-1], 'MA30_Next': ma30[-i-1], 'Cross_Type': 'MA10_MA30' }) if (ma20[-i] > ma30[-i] and ma20[-i-1] < ma30[-i-1]) or (ma20[-i] < ma30[-i] and ma20[-i-1] > ma30[-i-1]): cross_info.append({ 'Date': previous_date, # 前一天的日期 'MA20': ma10[-i], # MA20的值 'MA30': ma30[-i], # MA30的值 'Date_Next': current_date, # 当天的日期 'MA20_Next': ma20[-i-1], 'MA30_Next': ma30[-i-1], 'Cross_Type': 'MA10_MA30' }) # 将交叉信息转换为DataFrame cross_info_df = pd.DataFrame(cross_info) # 打印交叉信息 # print(f'crosses: {crosses}, cross_info: {cross_info}, cross_info_df: {cross_info_df}') #3-12 查看一下保证金的情况 def update_margin_rate(context): today = context.current_dt.date() margin_rate_df = finance.run_query(query(finance.FUT_MARGIN).filter(finance.FUT_MARGIN.day == '2022-12-22')) # print(f'today date: {today}, margin_rate_df: {margin_rate_df}') #3-13 检查趋势是否有特殊情况 def adjust_for_special_condition(daily_data, continuous_days_length): # 为每个均线条件检查是否满足特殊情况 print("Daily Data:", daily_data) for key in ['above_ma5', 'below_ma5', 'above_ma10', 'below_ma10', 'above_ma20', 'below_ma20', 'above_ma30', 'below_ma30']: for i in range(len(daily_data) - continuous_days_length + 1): print("Processing Date:", daily_data[i]['date']) # 提取特定范围内的数据 subset = daily_data[i:i + continuous_days_length] # 检查最大连续天数是否大于等于10 max_continuous_days = max([day[f'continuous_{key}'][0] for day in subset]) if max_continuous_days >= 10: # 检查连续天数为0的天数 days_with_zero = [day for day in subset if day[f'continuous_{key}'][0] == 0] if len(days_with_zero) <= 3: # 检查收盘价与均线价格之间的比值是否在-0.005到0.005之间 valid_ratio_days = all( -0.005 <= (day['close'] - day[key.split('_')[1]]) / day[key.split('_')[1]] <= 0.005 for day in days_with_zero ) if valid_ratio_days: # 如果满足特殊情况,根据需要调整逻辑 pass # 在这里实现具体的调整逻辑 #3-14 更新突破的列表 def update_crossed_symbols_history(context, new_record): symbol = new_record['symbol'] print(f'更新突破的历史列表,这次是{symbol}') date = new_record['date'] line_label = new_record['line_label'] line_type = new_record['line_type'] # 检查历史穿越记录里是否有相同的标的或者同一族的标的 if len(g.crossed_symbols_history) == 0: print('g.crossed_symbols_history为空') # 这里也要改,要增加该记录 g.crossed_symbols_history[symbol] = [new_record] return if symbol in g.crossed_symbols_history: pass else: similar_symbol = next((s for s in g.crossed_symbols_history if s.split('.')[0] == symbol.split('.')[0]), None) if similar_symbol: pass else: # 这里也要改,要增加该记录 print(f'g.crossed_symbols_history没有{symbol},所以要增加') g.crossed_symbols_history[symbol] = [new_record] return # 情况1:检查是否存在一模一样的symbol if symbol in g.crossed_symbols_history: existing_records = g.crossed_symbols_history[symbol] print(f'temp里现有{symbol}对应的记录: {existing_records}') for record in existing_records: print(f'开始逐条处理{symbol}的记录: {record}和最新的记录{new_record}') # 情况1.1:检查line_label和line_type是否一致 if record['line_label'] == line_label and record['line_type'] == line_type: # 情况1.1.2:检查date是否一样 if record['date'] == date: # 情况1.1.1:更新相同date的数据 print(f'情况1.1.1 查到相同标的{symbol}、趋势线{line_label} & {line_type}、日期{date}的记录{g.crossed_symbols_history[symbol]},删除日期日期相同的那一条,并添加最新的') record.update(new_record) print(f'更新之后的记录{record}') return # 情况1.1.2.1: 添加新记录 else: print(f'情况1.1.2 查到相同标的{symbol}、趋势线{line_label} & {line_type}、不同日期{date}的记录,直接添加新纪录。g.crossed_symbols_history[symbol]: {g.crossed_symbols_history[symbol]}') g.crossed_symbols_history[symbol].append(new_record) print(f'更新之后的记录{g.crossed_symbols_history[symbol]}') break else: # 情况1.1.1: 比较line_label,只保留数字较大的 print(f"情况1.2.1 查到相同标的{symbol},不同趋势线{line_label} & {line_type}和{record['line_label']} & {record['line_type']},更新所有原始记录: {g.crossed_symbols_history[symbol]},添加新纪录") max_label_record = max(existing_records, key=lambda x: int(''.join(filter(str.isdigit, x['line_label'])))) max_label_number = int(''.join(filter(str.isdigit, max_label_record['line_label']))) new_label_number = int(''.join(filter(str.isdigit, line_label))) print(f'max_label_record: {max_label_record}, max_label_number: {max_label_number}, new_label_number: {new_label_number}') if new_label_number > max_label_number: # 创建一个新的列表来保存所有记录的日期 dates_to_save = [record['date'] for record in existing_records] print(f'dates_to_save: {dates_to_save}') # 根据上述的dates_to_save更新新的趋势线的历史数据,首先天数不能超过3 if len(dates_to_save) + 1 <=3: close_data = attribute_history(symbol, 50, '1d', ['close', 'high', 'low', 'open'], df=True) close_series = close_data['close'] # print(f'Check if future: {symbol} - {close_series}') # 计算5日、10日、20日和30日移动平均线 ma5 = close_series.rolling(window=5).mean() ma10 = close_series.rolling(window=10).mean() ma20 = close_series.rolling(window=20).mean() ma30 = close_series.rolling(window=30).mean() print(f'updating data for new lines MA{new_label_number}') # 更新为数字较大的记录 # for record in existing_records: # # 替换 existing_records 中每个记录的 'line_label' 和 'line_type' # record['line_label'] = new_record['line_label'] # record['line_type'] = new_record['line_type'] existing_records.clear() print(f'去除原有{max_label_record}的记录后: {existing_records}') existing_records.append(new_record) print(f'增加最新的记录:{g.crossed_symbols_history[symbol]}') return else: # 情况1.2:检查是否有字母相同的symbol similar_symbol = next((s for s in g.crossed_symbols_history if s.split('.')[0] == symbol.split('.')[0]), None) if similar_symbol: print(f'情况1.2 有字母相同的symbol,similar_symbol: {similar_symbol}, g.crossed_symbols_history: {g.crossed_symbols_history}') # 情况1.2.2:更新为最新的symbol,并继承原始symbol的数据 g.crossed_symbols_history[symbol] = g.crossed_symbols_history.pop(similar_symbol) print(f'更新完之后g.crossed_symbols_history: {g.crossed_symbols_history}') # 递归调用以重新处理新记录 return update_crossed_symbols_history(g.crossed_symbols_history, new_record) else: # 情况1.2.1:直接添加新记录 g.crossed_symbols_history[symbol] = [new_record] print(f'没有查询到该标的{symbol}的相关记录,满足情况1.2.1,增加新纪录{new_record}') return #3-15 检查MA均线之间的关系 def check_ma_relations(context, symbol, debug=False): def debug_print(message): if debug: print(message) debug_print(f'检查均线之间的关系') relation_check = False latest_record = g.crossed_symbols_history[symbol][-1] debug_print(f'latest_record: {latest_record}') line_label = latest_record['line_label'].lower() # 确保大小写一致 line_type = latest_record['line_type'] MA_values = latest_record['latest_data'] MA5 = MA_values['ma5'] MA10 = MA_values['ma10'] MA20 = MA_values['ma20'] MA30 = MA_values['ma30'] debug_print(f'line_label: {line_label}, line_type: {line_type}, MA5: {MA5}, MA10: {MA10}, MA20: {MA20}, MA30: {MA30}') conditions = { ('ma10', 'resistance'): MA5 > MA10 > MA20 > MA30, ('ma10', 'support'): MA30 > MA20 > MA10 > MA5, ('ma20', 'resistance'): (MA30 >= MA20 * 0.999 and MA5 >= MA10 * 0.999) or (MA30 > MA20 > MA10 > MA5), ('ma20', 'support'): (MA20 >= MA30 * 0.999 and MA10 >= MA5 * 0.999) or (MA5 > MA10 > MA20 > MA30), ('ma30', 'resistance'): (MA30 >= MA20 * 0.999 and MA5 >= MA10 * 0.999) or (MA30 > MA20 > MA10 > MA5), ('ma30', 'support'): (MA20 >= MA30 * 0.999 and MA10 >= MA5 * 0.999) or (MA5 > MA10 > MA20 > MA30), } relation_check = conditions.get((line_label, line_type), False) condition_status = "满足" if relation_check else "不满足" debug_print(f'均线为{line_label} & {line_type},{condition_status}MA指定的条件,MA5: {MA5}, MA10: {MA10}, MA20: {MA20}, MA30: {MA30}') return relation_check #3-16 检查破趋势的天数和其中收盘价和均线的最高差别 def check_cross_details(context, symbol, debug=False): def debug_print(message): if debug: print(message) all_records = g.crossed_symbols_history[symbol] if not all_records: debug_print(f"No records found for {symbol}.") return False, False # 检查破趋势的天数 first_day = all_records[0]['date'] today = context.current_dt.date() all_days = get_trade_days(first_day, today) duration_length = len(all_days) cross_duration = duration_length <= 6 debug_print(f"[检查破趋势的天数] first_day: {first_day}, today: {today}, duration_length: {duration_length}, cross_duration: {cross_duration}") # 检查破趋势范围内收盘价和均价差距的情况 ma_close = False for record in all_records: line_label = record['line_label'] ma_price = record['latest_data'][line_label] close = record['latest_data']['close'] ma_close_rate = abs((close - ma_price) / ma_price) if ma_close_rate <= 0.02: ma_close = True debug_print(f"[检查破趋势内收盘价和均线的关系] record: {record}, ma_price: {ma_price}, close: {close}, ma_close_rate: {ma_close_rate}, ma_close: {ma_close}") return cross_duration, ma_close #3-17 计算开盘到收盘之间穿过几条均线 def count_ma_lines_in_range(symbol, latest_data): # 获取开盘和收盘价格 open_price = latest_data['open'] close_price = latest_data['close'] # 确定价格区间的上限和下限 lower_bound = min(open_price, close_price) upper_bound = max(open_price, close_price) # 计算哪些MA线在开盘和收盘价格之间 ma_lines_in_range = 0 ma_labels_in_range = [] for key, value in latest_data.items(): if 'ma' in key and lower_bound <= value <= upper_bound: ma_lines_in_range += 1 ma_labels_in_range.append(key) print(f"在{symbol}的{lower_bound}到{upper_bound}之间,有{ma_lines_in_range}条MA线:{', '.join(ma_labels_in_range)}") return ma_lines_in_range, ma_labels_in_range #3-18 更新不在今天出现或者今天出现但是趋势线不如历史大的记录更新最新一天的数据 def update_record_for_existing_symbol(symbol, case): # 假设 attribute_history, g.crossed_symbols_history 等都已正确定义和初始化 # 根据case参数,执行特定的处理逻辑 if case == 'difference_keys': print(f"处理差异键情况的{symbol}") # 这里是处理差异键情况的逻辑... elif case == 'keys_in_both': print(f"处理line_label差异的{symbol}") # 获取symbol的最新记录 historical_record = max(g.crossed_symbols_history[symbol], key=lambda x: x['date']) historical_record_copy = historical_record.copy() # 获取过去50天的数据 close_data = attribute_history(symbol, 50, '1d', ['close', 'high', 'low', 'open'], df=True) close_series = close_data['close'] # 计算移动平均线 ma5 = close_series.rolling(window=5).mean().iloc[-1] ma10 = close_series.rolling(window=10).mean().iloc[-1] ma20 = close_series.rolling(window=20).mean().iloc[-1] ma30 = close_series.rolling(window=30).mean().iloc[-1] yesterday_data = close_data.iloc[-1] # 更新记录 historical_record_copy['date'] = yesterday_data.name.date() # 假设'name'是一个datetime对象 historical_record_copy['latest_data'] = { 'date': yesterday_data.name.date(), 'close': yesterday_data['close'], 'high': yesterday_data['high'], 'low': yesterday_data['low'], 'open': yesterday_data['open'], 'ma5': ma5, 'ma10': ma10, 'ma20': ma20, 'ma30': ma30, } # 将更新后的记录添加到g.crossed_symbols_history[symbol] g.crossed_symbols_history[symbol].append(historical_record_copy) # print(f'根据{case}更新{symbol}的g.crossed_symbols_history[symbol]: {g.crossed_symbols_history[symbol]}, historical_record_copy: {historical_record_copy}') #3-19 检查有没有相似元素 def check_symbol_prefix_match(symbol, hold_symbols): # 去掉右侧9个字符得到symbol的左侧部分 symbol_prefix = symbol[:-9] # 遍历hold_symbols中的每个元素 for hold_symbol in hold_symbols: # 对每个元素同样去掉右侧9个字符 hold_symbol_prefix = hold_symbol[:-9] # 检查左侧部分是否一致 if symbol_prefix == hold_symbol_prefix: return True # 找到一致的情况,返回True return False # 遍历完成,没有找到一致的情况,返回False #3-20 计算过去一段时间的日内以及均线数据 def calculate_daily_data(context, day, symbol, combine_today=True): """ 计算指定股票的移动平均线并返回一个包含这些数据的DataFrame。 """ # 使用attribute_history函数获取过去50天的交易数据 historical_data = attribute_history(symbol, day, '1d', ['close', 'high', 'low', 'open'], df=True) if combine_today: # 使用get_latest_trading_data函数获取最新的交易数据 latest_data_today = get_latest_trading_data(symbol, context) print(f'check latest_data_today under buy check: {latest_data_today}') # 将最新的交易数据转换成DataFrame并追加到historical_data中 latest_data_df = pd.DataFrame([latest_data_today], index=[context.current_dt.date()]) full_data = pd.concat([historical_data, latest_data_df]) else: full_data = historical_data # 计算移动平均线并将其添加到DataFrame中 full_data['ma5'] = full_data['close'].rolling(window=5).mean() full_data['ma10'] = full_data['close'].rolling(window=10).mean() full_data['ma20'] = full_data['close'].rolling(window=20).mean() full_data['ma30'] = full_data['close'].rolling(window=30).mean() return full_data #3-21 辅助打印list def print_list_elements(title, elements): """ 打印列表中的元素,每个元素占一行。 :param title: 打印前显示的标题 :param elements: 要打印的元素列表 """ print(f"{title}:") for item in elements: print(item) ########################## 自动移仓换月函数 ################################# def position_auto_switch(context,pindex=0,switch_func=None, callback=None): """ 期货自动移仓换月。默认使用市价单进行开平仓。 :param context: 上下文对象 :param pindex: 子仓对象 :param switch_func: 用户自定义的移仓换月函数. 函数原型必须满足: func(context, pindex, previous_dominant_future_position, current_dominant_future_symbol) :param callback: 移仓换月完成后的回调函数。 函数原型必须满足: func(context, pindex, previous_dominant_future_position, current_dominant_future_symbol) :return: 发生移仓换月的标的。类型为列表。 """ import re subportfolio = context.subportfolios[pindex] symbols = set(subportfolio.long_positions.keys()) | set(subportfolio.short_positions.keys()) switch_result = [] for symbol in symbols: match = re.match(r"(?P[A-Z]{1,})", symbol) if not match: # raise ValueError("未知期货标的: {}".format(symbol)) raise ValueError("Unknow target: {}".format(symbol)) else: dominant = get_dominant_future(match.groupdict()["underlying_symbol"]) cur = get_current_data() symbol_last_price = cur[symbol].last_price dominant_last_price = cur[dominant].last_price print(f'current_hold_symbol: {symbol}, current_main_symbol: {dominant}') if dominant > symbol: for positions_ in (subportfolio.long_positions, subportfolio.short_positions): if symbol not in positions_.keys(): continue else : p = positions_[symbol] if switch_func is not None: switch_func(context, pindex, p, dominant) else: amount = p.total_amount # 跌停不能开空和平多,涨停不能开多和平空。 if p.side == "long": symbol_low_limit = cur[symbol].low_limit dominant_high_limit = cur[dominant].high_limit if symbol_last_price <= symbol_low_limit: # log.warning("标的{}跌停,无法平仓。移仓换月取消。".format(symbol)) log.warning("Can't close {} position due to the limit up. Cancelling the exchange.".format(symbol)) continue elif dominant_last_price >= dominant_high_limit: # log.warning("标的{}涨停,无法开仓。移仓换月取消。".format(symbol)) log.warning("Can't close {} position due to the limit down. Cancelling the exchange.".format(symbol)) continue else: # log.info("进行移仓换月: ({0},long) -> ({1},long)".format(symbol, dominant)) log.info("Start the exchange: ({0},long) -> ({1},long)".format(symbol, dominant)) order_old = order_target(symbol,0,side='long') if order_old != None and order_old.filled > 0: order_new = order_target(dominant,amount,side='long') if order_new != None and order_new.filled >0: switch_result.append({"before": symbol, "after":dominant, "side": "long"}) # 换月中的买卖都成功了,则增加新的记录去掉旧的记录 g.trade_history[dominant] = g.trade_history[symbol] del g.trade_history[symbol] else: # log.warning("标的{}交易失败,无法开仓。移仓换月取消。".format(domaint)) log.warning("Trade of {} failed, no new positions. Cancelling the exchange.".format(dominant)) # 换月中的买成功了,卖失败了,则在换月记录里增加新的记录,在交易记录里去掉旧的记录 print(f'换月多头失败{dominant}, {g.trade_history[symbol]}') g.change_fail_history[domaint] = g.trade_history[symbol] del g.trade_history[symbol] if callback: callback(context, pindex, p, dominant) if p.side == "short": symbol_high_limit = cur[symbol].high_limit dominant_low_limit = cur[dominant].low_limit if symbol_last_price >= symbol_high_limit: # log.warning("标的{}涨停,无法平仓。移仓换月取消。".format(symbol)) log.warning("Can't close {} position due to the limit up. Cancelling the exchange.".format(symbol)) continue elif dominant_last_price <= dominant_low_limit: # log.warning("标的{}跌停,无法开仓。移仓换月取消。".format(symbol)) log.warning("Can't close {} position due to the limit down. Cancelling the exchange.".format(symbol)) continue else: # log.info("进行移仓换月: ({0},short) -> ({1},short)".format(symbol, dominant)) log.info("Start the exchange: ({0},short) -> ({1},short)".format(symbol, dominant)) order_old = order_target(symbol,0,side='short') if order_old != None and order_old.filled > 0: print(f'换月做空{dominant},数量位{amount}') order_new = order_target(dominant,amount,side='short') if order_new != None and order_new.filled >0: switch_result.append({"before": symbol, "after":dominant, "side": "short"}) # 换月中的买卖都成功了,则增加新的记录去掉旧的记录 g.trade_history[dominant] = g.trade_history[symbol] del g.trade_history[symbol] else: # log.warning("标的{}交易失败,无法开仓。移仓换月取消。".format(dominant)) log.warning("Trade of {} failed, no new positions. Cancelling the exchange.".format(dominant)) # 换月中的买成功了,卖失败了,则在换月记录里增加新的记录,在交易记录里去掉旧的记录 print(f'换月空头失败{dominant}, {g.trade_history[symbol]}') g.change_fail_history[dominant] = g.trade_history[symbol] print(f'失败记录{g.change_fail_history}') del g.trade_history[symbol] # order_target(symbol,0,side='short') # order_target(dominant,amount,side='short') # switch_result.append({"before": symbol, "after": dominant, "side": "short"}) if callback: callback(context, pindex, p, dominant) return switch_result