# Cloned from the JoinQuant post: https://www.joinquant.com/post/33636
# Title: ETF fund premium - improved version - high return, low drawdown, already speed-optimized
# Author: 发锅
# Core change: the turnover check now requires the minimum of the past 10 days to meet the threshold.
# Strategy URL: https://www.joinquant.com/algorithm/live/index?backtestId=73d74ffd00f110ba66a454c11f10de93

# Import libraries
from jqdata import *
from jqlib.technical_analysis import *
import numpy as np
import pandas as pd
import statsmodels.api as sm
import datetime as dt
import time


def initialize(context):
    """Configure the environment, initialize global state and register daily jobs.

    Sets CSI 300 as benchmark, enables async orders / real prices / look-ahead
    protection, initializes the ``g`` globals read by the other callbacks,
    sets fund trading cost and slippage, and schedules ``before_market_open``
    (09:20), ``market_open`` (09:30) and ``check_loss_up`` (14:10).
    """
    # Use CSI 300 as the benchmark
    set_benchmark('000300.XSHG')
    # Enable asynchronous order submission
    set_option('async_order', True)
    # Dynamic adjustment mode (trade at real prices)
    set_option('use_real_price', True)
    # Guard against look-ahead (future) data
    set_option("avoid_future_data", True)
    # Uncomment to hide order-API logs below error level
    # log.set_level('order', 'error')

    # Global state
    g.loss_limit = 0.9          # per-fund stop loss: sell when price < loss_limit * avg_cost
    g.drop_limit_days = 20      # number of recorded stop-loss checks a sold fund stays excluded
    g.control_days = 0          # written by check_total_value after liquidation; not read elsewhere here
    g.total_limit_days = 30     # window (number of values) for the portfolio drawdown check
    g.total_limit_rate = 0.15   # portfolio drawdown that triggers liquidation
    g.cool_days = 0             # remaining days without trading after liquidation; must be < g.total_limit_days
    g.rate_list = []            # last 10 daily shares of funds trading below net value
    g.check_loss_list = []      # per-check lists of funds sold by the per-fund stop loss
    g.just_sell_list = []       # funds excluded from buying (recently stop-lossed)
    g.total_value_list = []     # recent daily total portfolio values
    g.hold_list = []            # funds held at the end of market_open
    # Last trading day before each Spring Festival / Labor Day / National Day
    # break: everything is sold and nothing is bought on these dates.
    # 2024-02-08, 2025-01-27 and 2026-02-13 replace entries that were not
    # trading days (2024-02-09 and 2025-01-28 were exchange holidays,
    # 2026-02-15 is a Sunday), which silently disabled the pre-holiday exit.
    # NOTE(review): verify future-year entries against the official exchange calendar.
    g.holiday = [
        '2010-02-12', '2010-04-30', '2010-09-30',
        '2011-02-01', '2011-04-29', '2011-09-30',
        '2012-01-20', '2012-04-27', '2012-09-28',
        '2013-02-08', '2013-04-26', '2013-09-30',
        '2014-01-30', '2014-04-30', '2014-09-30',
        '2015-02-17', '2015-04-30', '2015-09-30',
        '2016-02-05', '2016-04-29', '2016-09-30',
        '2017-01-26', '2017-04-28', '2017-09-29',
        '2018-02-14', '2018-04-27', '2018-09-28',
        '2019-02-01', '2019-04-30', '2019-09-30',
        '2020-01-23', '2020-04-30', '2020-09-30',
        '2021-02-10', '2021-04-30', '2021-09-30',
        '2022-01-28', '2022-04-29', '2022-09-30',
        '2023-01-20', '2023-04-28', '2023-09-28',
        '2024-02-08', '2024-04-30', '2024-09-30',
        '2025-01-27', '2025-04-30', '2025-09-30',
        '2026-02-13', '2026-04-30', '2026-09-30',
    ]
    set_order_cost(OrderCost(close_tax=0.000, open_commission=0.00025,
                             close_commission=0.00025, min_commission=0), type='fund')
    set_slippage(PriceRelatedSlippage(0.002), type='fund')

    run_daily(before_market_open, '09:20', reference_security='000300.XSHG')
    run_daily(market_open, '09:30', reference_security='000300.XSHG')
    run_daily(check_loss_up, time='14:10', reference_security='000300.XSHG')
    # run_daily(print_position_info, time='15:10', reference_security='000300.XSHG')


def before_market_open(context):
    """Build the candidate universe and record the previous day's unit net values.

    Candidates are LOF/ETF funds listed as of the previous trading day that
    pass ``filter_new_fund`` and whose turnover exceeded the threshold on
    every one of the last 10 days. The result is stored in ``g.fund_list``:
    a DataFrame indexed by full security code with one ``unit_net_value``
    column. ``g.length1`` holds the size of the unfiltered LOF/ETF universe.
    """
    start_check_time = time.time()
    # All LOF and ETF funds as of the previous trading day
    fund_list = get_all_securities(['lof', 'etf'], context.previous_date).index.tolist()
    g.length1 = len(fund_list)  # denominator of the discount ratio in market_open
    # Drop funds that were listed too recently
    fund_list = filter_new_fund(context, fund_list)
    # Harvest Yuanhe (505888) event: do not buy it from May 2019 onwards
    if context.current_dt.date() >= dt.date(2019, 5, 1) and '505888.XSHG' in fund_list:
        fund_list.remove('505888.XSHG')
        print('remove 505888.XSHG')

    # Turnover filter: the minimum daily turnover of the last 10 days must
    # exceed a fixed threshold.
    money_threshold = 500000
    df = history(count=10, unit='1d', field="money", security_list=fund_list).T
    df['min_money_10d'] = df.min(axis=1)
    df = df[df['min_money_10d'] > money_threshold]
    future_list = df.index.tolist()

    # FUND_NET_VALUE is keyed by the 6-digit code; map results straight back
    # to the filtered full codes so the index stays inside this universe.
    prefix_to_code = {code[:6]: code for code in future_list}
    codes = list(prefix_to_code)
    latest_trade_day = context.previous_date.strftime('%Y-%m-%d')

    # Query in batches to keep each request small
    batch_size = 400
    all_temp_dfs = []
    for i in range(0, len(codes), batch_size):
        batch = codes[i:i + batch_size]
        all_temp_dfs.append(finance.run_query(
            query(finance.FUND_NET_VALUE)
            .filter(finance.FUND_NET_VALUE.code.in_(batch))
            .filter(finance.FUND_NET_VALUE.day == latest_trade_day)
        ))
    if all_temp_dfs:
        temp_df = pd.concat(all_temp_dfs, ignore_index=True)
    else:
        temp_df = pd.DataFrame(columns=['code', 'net_value'])

    # One row per fund, with the full security code attached
    temp_df = temp_df.drop_duplicates(subset=['code'])
    temp_df = temp_df.assign(full_code=temp_df['code'].str[:6].map(prefix_to_code))
    unmatched_df = temp_df[temp_df['full_code'].isna()]
    if not unmatched_df.empty:
        print("Unmatched entries:")
        print(unmatched_df[['code']])

    # Fund -> unit net value table consumed by market_open
    g.fund_list = (temp_df.dropna(subset=['full_code'])
                   .set_index('full_code')[['net_value']]
                   .rename(columns={'net_value': 'unit_net_value'}))
    log.info('开盘前记录净值...')
    elapsed_time = time.time() - start_check_time
    print(f"before_market_open time: {elapsed_time}")


def _last_price(current, code):
    """Return ``current[code].last_price``, or NaN (after logging) if unavailable."""
    try:
        return current[code].last_price
    except Exception as e:
        print(f"error: {e}")
        return float('nan')


def _rebalance(context, discount_df, is_holiday_eve):
    """Sell funds outside the target list and buy new targets.

    ``discount_df`` holds the discounted funds sorted by ascending premium.
    The targets are the 30 most discounted funds minus ``g.just_sell_list``.
    On a holiday eve every position is sold and nothing is bought.
    """
    just_sold = set(g.just_sell_list)
    target_fund_list = [fund for fund in discount_df.index[:30] if fund not in just_sold]
    g.max_position = len(target_fund_list)

    # Sell: not a target, or liquidate before a holiday. Iterate a snapshot
    # so placing orders cannot change the collection being iterated.
    for fund in list(context.portfolio.positions.keys()):
        if is_holiday_eve or fund not in target_fund_list:
            order_target_value(fund, 0)

    # No new positions before a holiday; nothing to size without targets
    # (would otherwise divide by zero below).
    if is_holiday_eve or not target_fund_list:
        return

    money_df = history(count=10, unit='1d', field="money", security_list=target_fund_list).T
    money_df['min_money_10d'] = money_df.min(axis=1)  # minimum turnover of the last 10 days
    # Equal split of available cash; liquid funds may get up to 3x that amount
    standard_position = context.portfolio.available_cash / g.max_position
    max_position_limit = standard_position * 3
    print(f"max_position_limit: {max_position_limit}")
    for fund in target_fund_list:
        if fund in context.portfolio.positions.keys():
            continue
        min_buy_amount = money_df.loc[fund, 'min_money_10d'] / 5
        if pd.isna(min_buy_amount):
            # No turnover data: use the standard size instead of letting NaN
            # comparisons pick the 3x limit.
            target_amount = standard_position
        else:
            target_amount = min(max_position_limit, max(min_buy_amount, standard_position))
        print(f"{fund} min_buy_amount: {min_buy_amount} and target_amount: {target_amount}")
        # Do not exceed available cash
        if target_amount <= context.portfolio.available_cash:
            order_target_value(fund, target_amount)


def market_open(context):
    """Rank funds by premium to net value and trade at the open.

    premium = (last_price / unit_net_value - 1) * 100; funds with a negative
    premium (trading at a discount) are candidates. While not cooling down
    (``g.cool_days == 0``) the strategy rebalances if the 10-day mean share
    of discounted funds exceeds 10%, and otherwise clears all positions.
    During cool-down it only decrements ``g.cool_days``. Finally
    ``g.hold_list`` is refreshed from the portfolio.
    """
    start_check_time = time.time()
    df = g.fund_list.copy()
    current = get_current_data()
    # Latest price per fund; unavailable prices become NaN and are dropped
    # by the premium < 0 filter below.
    df['last_price'] = [_last_price(current, code) for code in df.index]
    df['premium'] = (df['last_price'] / df['unit_net_value'] - 1) * 100
    df = df.sort_values(['premium'], ascending=True)
    df = df[df['premium'] < 0]

    # Share of the whole LOF/ETF universe trading below net value, kept for
    # the last 10 days (padded with the oldest value until 10 exist).
    special_rate = len(df) / g.length1 if g.length1 else 0.0
    g.rate_list.append(special_rate)
    g.rate_list = g.rate_list[-10:]
    while len(g.rate_list) < 10:
        g.rate_list.append(g.rate_list[0])
    mean_rate = np.mean(g.rate_list)
    print(f"g.rate_list - length: {len(g.rate_list)}, mean: {mean_rate}")

    if g.cool_days == 0:
        if mean_rate > 0.1:
            is_holiday_eve = str(context.current_dt.date()) in g.holiday
            _rebalance(context, df, is_holiday_eve)
        elif g.hold_list:
            # Too few discounted funds: stay out of the market
            clear_position(context)
    else:
        g.cool_days -= 1

    # Refresh the held-fund list used by check_loss_up / check_total_value
    g.hold_list = [position.security for position in context.portfolio.positions.values()]
    elapsed_time = time.time() - start_check_time
    print(f"market_open time: {elapsed_time}")


def after_market_close(context):
    """After-close hook; currently does nothing and is not scheduled."""
    pass


def check_loss_up(context):
    """Per-fund stop loss at 14:10, followed by the portfolio drawdown check.

    For each fund in ``g.hold_list`` still held, sells it when its price is
    below ``g.loss_limit * avg_cost``. On each call made while something is
    held, the sold funds (or ``['nothing']``) are recorded; the last
    ``g.drop_limit_days`` records form ``g.just_sell_list``, the funds
    ``market_open`` will not buy. Always calls ``check_total_value``.
    """
    start_check_time = time.time()
    if g.hold_list:
        check_loss_list = []
        positions = context.portfolio.positions
        for stock in g.hold_list:
            # The fund may have been sold since g.hold_list was built
            if stock not in positions.keys():
                continue
            position = positions[stock]
            if position.price < g.loss_limit * position.avg_cost:
                log.info("[%s]损失比例过高,卖出" % stock)
                close_position(position)
                check_loss_list.append(stock)
        g.check_loss_list.append(check_loss_list if check_loss_list else ['nothing'])
        g.check_loss_list = g.check_loss_list[-g.drop_limit_days:]
        # Funds not to buy: everything stop-lossed in the recorded window
        g.just_sell_list = list(set().union(*g.check_loss_list))
    check_total_value(context)
    elapsed_time = time.time() - start_check_time
    print(f"check_loss_up time: {elapsed_time}")


def check_total_value(context):
    """Portfolio stop loss on drawdown from the recent peak total value.

    Appends today's total value to ``g.total_value_list``. Once at least
    ``g.total_limit_days`` values exist, keeps only the last
    ``g.total_limit_days`` and clears all positions if today's value is more
    than ``g.total_limit_rate`` below their maximum.
    """
    start_check_time = time.time()
    total_money_today = context.portfolio.total_value
    g.total_value_list.append(total_money_today)
    print('检查整体资金比例g.total_value_list: ', len(g.total_value_list))
    print(g.total_value_list)
    if len(g.total_value_list) >= g.total_limit_days:
        # Only the last g.total_limit_days values are used for the drawdown
        g.total_value_list = g.total_value_list[-g.total_limit_days:]
        peak = max(g.total_value_list)
        biggest_pullback = (total_money_today - peak) / peak
        print('检查近 %d 的最大损失为 %2f' % (g.total_limit_days, biggest_pullback * 100))
        if biggest_pullback < -g.total_limit_rate:
            # Drawdown over the limit: liquidate
            clear_position(context)
            if g.control_days == 0:
                print('清仓后,未修正的g.control_days为: ', g.control_days)
                g.control_days = g.cool_days
                print('清仓后,修正g.control_days为: ', g.control_days)
            print('持仓情况为: ', g.hold_list)
            print('判断标准为: ', (not g.hold_list))
            if not g.hold_list:
                # Already flat: shorten the history by g.cool_days values so
                # the old peak leaves the window sooner.
                g.total_value_list = g.total_value_list[-(g.total_limit_days - g.cool_days):]
    elapsed_time = time.time() - start_check_time
    print(f"check_total_value time: {elapsed_time}")


def order_target_value_(security, value):
    """Log the intent, then call ``order_target_value`` and return its order."""
    if value == 0:
        log.debug("Selling out %s" % (security))
    else:
        log.debug("Order %s to value %f" % (security, value))
    return order_target_value(security, value)


def close_position(position):
    """Sell ``position`` entirely.

    Returns True only if the returned order is held and fully filled; the
    order may fail (e.g. trading suspension), in which case False is returned.
    """
    security = position.security
    order = order_target_value_(security, 0)
    if order is not None:
        if order.status == OrderStatus.held and order.filled == order.amount:
            return True
    return False


def clear_position(context):
    """Sell every position and start a 5-day no-trading cool-down.

    Does nothing (and leaves ``g.cool_days`` unchanged) when flat.
    """
    if context.portfolio.positions:
        g.cool_days = 5  # market_open skips trading for the next 5 runs
        log.info("==> 清仓,卖出所有股票")
        # Snapshot first: selling may modify the positions collection
        for position in list(context.portfolio.positions.values()):
            close_position(position)


def filter_new_fund(context, stock_list):
    """Keep funds that started trading at least 5 days before the previous trading day."""
    start_check_time = time.time()
    yesterday = context.previous_date
    min_age = dt.timedelta(days=5)
    result = [stock for stock in stock_list
              if yesterday - get_security_info(stock).start_date >= min_age]
    elapsed_time = time.time() - start_check_time
    print(f"filter_new_fund time: {elapsed_time}")
    return result


def clean_List_nan(List):
    """Return ``List`` as a float NumPy array with NaN entries replaced by 0.0."""
    array = np.array(List, dtype=float)
    array[np.isnan(array)] = 0.0
    return array


def print_position_info(context):
    """Print today's trades and, for each position, cost, price, return, size and value."""
    # Today's trades
    trades = get_trades()
    for _trade in trades.values():
        print('成交记录:' + str(_trade))
    # Account positions
    for position in list(context.portfolio.positions.values()):
        securities = position.security
        cost = position.avg_cost
        price = position.price
        ret = 100 * (price / cost - 1)
        value = position.value
        amount = position.total_amount
        print('代码:{}'.format(securities))
        print('成本价:{}'.format(format(cost, '.2f')))
        print('现价:{}'.format(price))
        print('收益率:{}%'.format(format(ret, '.2f')))
        print('持仓(股):{}'.format(amount))
        print('市值:{}'.format(format(value, '.2f')))
        print('———————————————————————————————————')


def check_fund_net_value(result_df, result_temp_df):
    """Debug helper: compare two FUND_NET_VALUE query results.

    Prints per-code record-count shapes and the codes missing from either
    result, then re-queries each code missing from ``result_temp_df`` with
    ``==`` and ``in_`` filters and prints the result shapes. Not called by
    the strategy.
    """
    # Record counts per code in each result
    result_counts_1 = result_df.groupby('code').size()
    result_counts_2 = result_temp_df.groupby('code').size()
    print(f"result_counts_1: {result_counts_1.shape}, result_counts_2: {result_counts_2.shape}")

    # Codes present in only one of the results
    missing_in_result_temp_df = list(set(result_counts_1.index) - set(result_counts_2.index))
    missing_in_result_df = list(set(result_counts_2.index) - set(result_counts_1.index))
    print(f"Codes missing in result_temp_df: {missing_in_result_temp_df}")
    print(f"Codes missing in result_df: {missing_in_result_df}")

    # Re-query the codes missing from result_temp_df in two ways
    for fund in missing_in_result_temp_df:
        print(f"check {fund}")
        test_df1 = finance.run_query(
            query(finance.FUND_NET_VALUE)
            .filter(finance.FUND_NET_VALUE.code == fund)
            .order_by(finance.FUND_NET_VALUE.day.desc())
        )
        test_df2 = finance.run_query(
            query(finance.FUND_NET_VALUE)
            .filter(finance.FUND_NET_VALUE.code.in_([fund]))
            .order_by(finance.FUND_NET_VALUE.day.desc())
        )
        test_df3 = test_df2.groupby('code').head(10).reset_index(drop=True)
        print(f"test_df1: {test_df1.shape}")
        print(f"test_df3: {test_df3.shape}")
        print(f"test_df2: {test_df2.shape}")