| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- # 克隆自聚宽文章:https://www.joinquant.com/post/33636
- # 标题:etf基金溢价-改进版-高收益低回撤-速度已最优
- # 作者:发锅
- # 核心改动就是调整了成交量检查标准变为过去10天最小值都要满足条件
- # 本策略网址:https://www.joinquant.com/algorithm/live/index?backtestId=73d74ffd00f110ba66a454c11f10de93
- # 导入函数库
- from jqdata import *
- from jqlib.technical_analysis import *
- import numpy as np
- import pandas as pd
- import statsmodels.api as sm
- import datetime as dt
- import time
- # 初始化函数,设定基准等等
- def initialize(context):
- # start_check_time = time.time()
- # 设定沪深300作为基准
- set_benchmark('000300.XSHG')
- # 开启异步报单
- set_option('async_order', True)
- # 开启动态复权模式(真实价格)
- set_option('use_real_price', True)
- # 是否未来函数
- set_option("avoid_future_data", True)
- # 过滤掉order系列API产生的比error级别低的log
- # log.set_level('order', 'error')
- # 初始化全局变量
- g.loss_limit = 0.9 # 单基金止损比例
- g.drop_limit_days = 20 # 止损卖出后多少天不重新买入
- g.control_days = 0 # 初始化控制全局止损之后暂停的天数
- g.total_limit_days = 30 # 检查全局止损比例的天数范围
- g.total_limit_rate = 0.15 # 全局止损比例
- g.cool_days = 0 # 全局止损后多少天内不持仓,必须小于g.total_limit_days
- g.rate_list = []
- g.check_loss_list = []
- g.just_sell_list = []
- g.total_value_list = []
- g.hold_list = []
- g.holiday = ['2010-02-12','2010-04-30','2010-09-30','2011-02-01','2011-04-29','2011-09-30','2012-01-20','2012-04-27','2012-09-28','2013-02-08',
- '2013-04-26','2013-09-30','2014-01-30','2014-04-30','2014-09-30','2015-02-17','2015-04-30','2015-09-30','2016-02-05','2016-04-29','2016-09-30',
- '2017-01-26','2017-04-28','2017-09-29','2018-02-14','2018-04-27','2018-09-28','2019-02-01','2019-04-30','2019-09-30','2020-01-23','2020-04-30',
- '2020-09-30','2021-02-10','2021-04-30','2021-09-30','2022-01-28','2022-04-29','2022-09-30','2023-01-20','2023-04-28','2023-09-28','2024-02-09',
- '2024-04-30','2024-09-30','2025-01-28','2025-04-30','2025-09-30','2026-02-15','2026-04-30','2026-09-30' ]
- set_order_cost(OrderCost(close_tax=0.000, open_commission=0.00025, close_commission=0.00025, min_commission=0), type='fund')
- set_slippage(PriceRelatedSlippage(0.002),type='fund')
- run_daily(before_market_open, '09:20', reference_security='000300.XSHG')
- run_daily(market_open, '09:30', reference_security='000300.XSHG')
- run_daily(check_loss_up, time='14:10', reference_security='000300.XSHG')
- run_daily(print_position_info, time='15:10', reference_security='000300.XSHG')
- # end_check_time = time.time()
- # elapsed_time = end_check_time - start_check_time
- # print(f"initialize time: {elapsed_time}")
- def before_market_open(context):
- start_check_time = time.time()
- # 获取基金
- fund_list = get_all_securities(['lof', 'etf'], context.previous_date).index.tolist()
- g.length1 = len(fund_list)
- # 过滤太新的基金
- fund_list = filter_new_fund(context,fund_list)
- # 嘉实元和事件,所以在2019年5月之后不再买入
- if context.current_dt.date() >= np.datetime64('2019-05-01') and ('505888.XSHG' in fund_list):
- fund_list.remove('505888.XSHG')
- print('remove 505888.XSHG')
- # 成交额过滤
- df = history(count=10, unit='1d', field="money", security_list=fund_list).T # 整体花费时间少于0.16秒
- df['min_money_10d'] = df.min(axis=1)
- # cur_total_value = context.portfolio.total_value
- # money_threshold = 2 * cur_total_value
- # print(f"cur_total_value: {cur_total_value}")
- money_threshold = 500000 # 固定值
- df = df[df['min_money_10d'] > money_threshold] # 因为最大持仓是5种,那么资金就是5等分,那么只要是总金额的2倍,实际就是10倍
-
- # 新的获取净值的方式,不是通过"get_extras"而是通过query和get_all_securities的方法
- future_list = df.index.tolist() # 不需要去重,没有重复
- temp_future_list = [item[:6] for item in future_list] # 和codes = list(set(temp_future_list))一致,也不存在重复
- codes = list(set(temp_future_list))
-
- # 获取当前日期的前10个交易日(包含当前日期)
- latest_trade_day = context.previous_date
- latest_trade_day = latest_trade_day.strftime('%Y-%m-%d')
- # 获取基金净值,目前除了159001以外,用for循环和in_的方式获得的数量一致
-
- # 避免一次性获取数据量太大,进行分割查询
- batch_size = 400
- all_temp_dfs = []
-
- # 按400个一组分割 temp_future_list 并逐组查询
- for i in range(0, len(temp_future_list), batch_size):
- batch = temp_future_list[i:i + batch_size]
- temp_batch_df = finance.run_query(
- query(finance.FUND_NET_VALUE)
- .filter(finance.FUND_NET_VALUE.code.in_(batch))
- .filter(finance.FUND_NET_VALUE.day==latest_trade_day)
- .order_by(finance.FUND_NET_VALUE.day.desc())
- )
- all_temp_dfs.append(temp_batch_df)
-
- # 合并所有批次的结果
- temp_df = pd.concat(all_temp_dfs, ignore_index=True)
-
- # 获取所有基金的完整代码列表,并转为 DataFrame 便于匹配
- all_funds = get_all_securities(['fund'])
- all_funds = all_funds.reset_index() # 重置索引,使 'index' 列成为 DataFrame 列
- all_funds['code_prefix'] = all_funds['index'].str[:6] # 提取前6位数字
-
- # 获取 temp_df 并提取 code 的前6位数字,方便匹配
- temp_df['code_prefix'] = temp_df['code'].str[:6]
-
- # 将 temp_df 中的 code_prefix 匹配到 all_funds 中的 code_prefix,以获取完整的 code
- merged_df = pd.merge(temp_df, all_funds[['index', 'code_prefix']],
- left_on='code_prefix', right_on='code_prefix', how='left')
-
- # 检查是否存在未匹配的行,并打印出来
- unmatched_df = merged_df[merged_df['index'].isna()]
- if not unmatched_df.empty:
- print("Unmatched entries:")
- print(unmatched_df[['code', 'code_prefix']])
-
- # 使用完整的 code 作为索引,并重命名列为 'unit_net_value'
- df = merged_df.dropna(subset=['index']).set_index('index')[['net_value']].rename(columns={'net_value': 'unit_net_value'})
-
- g.fund_list = df # 基金和净值的df
- # print(f"check g.fund_list structure: {type(g.fund_list)}, data: {g.fund_list}")
- log.info('开盘前记录净值...')
- end_check_time = time.time()
- elapsed_time = end_check_time - start_check_time
- print(f"before_market_open time: {elapsed_time}")
- def market_open(context):
- start_check_time = time.time()
- df = g.fund_list
- length2 = len(df)
- current = get_current_data()
- fund_list = df.index.tolist()
- ## 获得基金最新价
- try:
- df['last_price'] = [current[c].last_price for c in fund_list]
- except Exception as e:
- print(f"error: {e}")
- # print("df: ")
- # print(df)
- ## 计算溢价
- df['premium'] = (df.last_price / df.unit_net_value - 1) * 100 #最新价格小于净值的小于0
- ## 根据溢价大小排序
- if hasattr(df, 'sort'): # 如果有sort方法就用sort,没有用sort_values
- df = df.sort(['premium'], ascending = True)
- else:
- df = df.sort_values(['premium'], ascending = True)
-
- df = df[(df.premium < 0)]
- special_rate = len(df)/g.length1 # 最新价格低于净值的占比10%以内
- g.rate_list.append(special_rate)
- g.rate_list = g.rate_list[-10:] # 最近10天的special_rate
- while len(g.rate_list) < 10:
- g.rate_list.append(g.rate_list[0])
-
- print(f"g.rate_list - length: {len(g.rate_list)}, mean: {mean(g.rate_list)}")
- if g.cool_days == 0:
- if (len(g.rate_list) == 10) and (mean(g.rate_list) > 0.1): # 比例过低就不执行买入卖出的操作
- target_fund_list = df[:30].index.tolist()
- target_fund_list = [stock for stock in target_fund_list if stock not in g.just_sell_list]
-
- target_fund_list = target_fund_list[:30]
- g.max_position = len(target_fund_list)
-
- # 卖出
- for fund in context.portfolio.positions.keys():
- # 卖出不在股票池或节假日前清仓
- if fund not in target_fund_list or str(context.current_dt.date()) in g.holiday:
- order_target_value(fund, 0)
-
- # 买入, 节假日前不开仓
- if str(context.current_dt.date()) not in g.holiday:
- # for fund in target_fund_list:
- # now_position = g.max_position - len(context.portfolio.positions)
- # if now_position == 0:
- # continue
- # if fund not in context.portfolio.positions.keys():
- # position = context.portfolio.available_cash / now_position
- # order_target_value(fund, position)
- # 计算每个标的的10日平均成交额
- money_df = history(count=10, unit='1d', field="money", security_list=target_fund_list).T
- money_df['avg_money'] = money_df.min(axis=1) # 过去10天成交额的最小值
-
- # 计算单个持仓的标准金额
- standard_position = context.portfolio.available_cash / g.max_position
- max_position_limit = standard_position * 3 # 单个标的最大持仓限制
- print(f"max_position_limit: {max_position_limit}")
-
- for fund in target_fund_list:
- if fund not in context.portfolio.positions.keys():
- # 计算该标的的目标买入金额
- min_buy_amount = money_df.loc[fund, 'avg_money'] / 5 # 最小买入金额
- target_amount = min(max_position_limit,
- max(min_buy_amount, standard_position))
- print(f"{fund} min_buy_amount: {min_buy_amount} and target_amount: {target_amount}")
- # 确保不超过可用资金
- if target_amount <= context.portfolio.available_cash:
- order_target_value(fund, target_amount)
-
- elif (len(g.rate_list) == 10) and (mean(g.rate_list) <= 0.1):
- if g.hold_list:
- clear_position(context)
- else:
- g.cool_days -= 1
-
- # 更新持有的基金池
- g.hold_list= []
- for position in list(context.portfolio.positions.values()):
- fund = position.security
- g.hold_list.append(fund)
-
- end_check_time = time.time()
- elapsed_time = end_check_time - start_check_time
- print(f"market_open time: {elapsed_time}")
- ## 收盘后运行函数
- def after_market_close(context):
- pass
- # 1-6 调整亏损比例过大的股票
- def check_loss_up(context):
- start_check_time = time.time()
- if g.hold_list:
- check_loss_list = []
- for stock in g.hold_list:
- position = context.portfolio.positions[stock]
- price = position.price
- avg_cost = position.avg_cost
- # print('check %s, price: %2f, avg_cost: %2f' % (stock, price, avg_cost))
- if price < g.loss_limit * avg_cost:
- log.info("[%s]损失比例过高,卖出" % stock)
- close_position(position)
- check_loss_list.append(stock)
-
- if check_loss_list:
- g.check_loss_list.append(check_loss_list)
- else:
- g.check_loss_list.append(['nothing'])
- if len(g.check_loss_list) > g.drop_limit_days:
- g.check_loss_list = g.check_loss_list[-g.drop_limit_days:]
-
- temp_set = set()
- for check_loss_list in g.check_loss_list:
- temp_set = temp_set.union(set(check_loss_list))
- # 不要购买的股票列表,过往20天因为止损而卖出的股票
- g.just_sell_list = list(temp_set)
- check_total_value(context)
-
- end_check_time = time.time()
- elapsed_time = end_check_time - start_check_time
- print(f"check_loss_up time: {elapsed_time}")
- # 1-7 检查整体资金比例
- def check_total_value(context):
- start_check_time = time.time()
- total_money_today = context.portfolio.total_value
- g.total_value_list.append(total_money_today)
- print('检查整体资金比例g.total_value_list: ', len(g.total_value_list))
- print(g.total_value_list)
- if len(g.total_value_list) >= g.total_limit_days:
- g.total_value_list = g.total_value_list[-g.total_limit_days:] # 只考虑最近20天的跌幅来判断是否清仓
- biggest_pullback = (total_money_today - max(g.total_value_list))/max(g.total_value_list)
- print('检查近 %d 的最大损失为 %2f' % (g.total_limit_days, biggest_pullback * 100))
- if biggest_pullback < - g.total_limit_rate: # 当跌幅超过最大限制,则清空仓位
- clear_position(context)
- if g.control_days == 0: # 设定空仓天数
- print('清仓后,未修正的g.control_days为: ', g.control_days)
- g.control_days = g.cool_days
- print('清仓后,修正g.control_days为: ', g.control_days)
- print('持仓情况为: ', g.hold_list)
- print('判断标准为: ', (not g.hold_list))
- if not g.hold_list: # 如果卖光了,那么调整检查全盘资金的数据量,保留10天的数据,因为检查是最近20天,暂停10天
- g.total_value_list = g.total_value_list[-(g.total_limit_days-g.cool_days):]
-
- end_check_time = time.time()
- elapsed_time = end_check_time - start_check_time
- print(f"check_total_value time: {elapsed_time}")
- #3-1 交易模块-自定义下单
- def order_target_value_(security, value):
- if value == 0:
- log.debug("Selling out %s" % (security))
- else:
- log.debug("Order %s to value %f" % (security, value))
- return order_target_value(security, value)
- #3-3 交易模块-平仓
- def close_position(position):
- security = position.security
- order = order_target_value_(security, 0) # 可能会因停牌失败
- if order != None:
- if order.status == OrderStatus.held and order.filled == order.amount:
- return True
- return False
- #3-5 交易模块 - 清仓
- def clear_position(context):
- if context.portfolio.positions:
- g.cool_days = 5 # 清仓后5天不进行买入操作
- log.info("==> 清仓,卖出所有股票")
- for stock in context.portfolio.positions.keys():
- position = context.portfolio.positions[stock]
- close_position(position)
- #2-7 过滤次新股
- def filter_new_fund(context,stock_list):
- start_check_time = time.time()
- yesterday = context.previous_date
- end_check_time = time.time()
- elapsed_time = end_check_time - start_check_time
- print(f"filter_new_fund time: {elapsed_time}")
- return [stock for stock in stock_list if not yesterday - get_security_info(stock).start_date < datetime.timedelta(days=5)]
- # 清理list里nan的模块
- def clean_List_nan(List):
- Myarray=np.array(List)
- x = float('nan')
- for elem in Myarray:
- if math.isnan(x):
- x = 0.0
- return Myarray
-
- #4-1 打印每日持仓信息
- def print_position_info(context):
- #打印当天成交记录
- trades = get_trades()
- for _trade in trades.values():
- print('成交记录:'+str(_trade))
- #打印账户信息
- for position in list(context.portfolio.positions.values()):
- securities=position.security
- cost=position.avg_cost
- price=position.price
- ret=100*(price/cost-1)
- value=position.value
- amount=position.total_amount
- print('代码:{}'.format(securities))
- print('成本价:{}'.format(format(cost,'.2f')))
- print('现价:{}'.format(price))
- print('收益率:{}%'.format(format(ret,'.2f')))
- print('持仓(股):{}'.format(amount))
- print('市值:{}'.format(format(value,'.2f')))
- print('———————————————————————————————————')
-
- # 5-1 临时检查的一些函数,查获取基金净值的方法有没有漏洞
- def check_fund_net_value(result_df, result_temp_df):
- # 1. 比较每个 code 的记录数差异
- # 按 code 统计每个查询结果的记录数
- result_counts_1 = result_df.groupby('code').size()
- result_counts_2 = result_temp_df.groupby('code').size()
- print(f"result_counts_1: {result_counts_1.shape}, result_counts_2: {result_counts_2.shape}")
-
- # 将 result_counts_1 和 result_counts_2 转换为 DataFrame 并进行 join 对齐
- result_counts_df = pd.DataFrame({
- 'count_1': result_counts_1,
- 'count_2': result_counts_2
- }).fillna(0) # 使用 0 填充缺失值
-
- # 找到记录数不一致的 code
- mismatched_codes = result_counts_df[result_counts_df['count_1'] != result_counts_df['count_2']].index.tolist()
- # print(f"Records mismatch for codes: {mismatched_codes}")
-
- # 找出在两种方法中缺失的 code
- missing_in_result_temp_df = list(set(result_counts_1.index) - set(result_counts_2.index))
- missing_in_result_df = list(set(result_counts_2.index) - set(result_counts_1.index))
- print(f"Codes missing in result_temp_df: {missing_in_result_temp_df}")
- print(f"Codes missing in result_df: {missing_in_result_df}")
-
- # 创建一个包含 'code' 和 'day' 的 DataFrame,用于标记存在情况
- all_codes_days = pd.DataFrame({
- 'code': pd.concat([result_df['code'], result_temp_df['code']]),
- 'day': pd.concat([result_df['day'], result_temp_df['day']])
- }).drop_duplicates()
-
- # 使用 merge 操作标记 result_df 和 result_temp_df 中的数据存在情况
- all_codes_days = all_codes_days.merge(
- result_temp_df[['code', 'day']], on=['code', 'day'], how='left', indicator='in_result_temp_df'
- ).merge(
- result_df[['code', 'day']], on=['code', 'day'], how='left', indicator='in_result_df'
- )
-
- # 添加标志列
- all_codes_days['in_result_temp_df'] = (all_codes_days['in_result_temp_df'] == 'both').astype(int)
- all_codes_days['in_result_df'] = (all_codes_days['in_result_df'] == 'both').astype(int)
-
- # 去掉第3和第4列都为1的数据
- filtered_df = all_codes_days[~((all_codes_days['in_result_temp_df'] == 1) & (all_codes_days['in_result_df'] == 1))]
-
- # 打印结果
- # print(filtered_df)
- for fund in missing_in_result_temp_df:
- print(f"check {fund}")
- test_df1 = finance.run_query(
- query(finance.FUND_NET_VALUE)
- .filter(finance.FUND_NET_VALUE.code == fund)
- .order_by(finance.FUND_NET_VALUE.day.desc())
- )
- temp_list2 = [fund]
- test_df2 = finance.run_query(query(finance.FUND_NET_VALUE).filter(finance.FUND_NET_VALUE.code.in_(temp_list2)).order_by(finance.FUND_NET_VALUE.day.desc()))
- test_df3 = test_df2.groupby('code').head(10).reset_index(drop=True)
- print(f"test_df1: {test_df1.shape}")
- print(f"test_df3: {test_df3.shape}")
- print(f"test_df2: {test_df2.shape}")
-
|