# 克隆自聚宽文章:https://www.joinquant.com/post/53374 # 标题:机位有限,实盘策略送人 # 作者:韶华不负 # 导入函数库 from jqdata import * from six import BytesIO from jqlib.technical_analysis import * from jqfactor import get_factor_values import numpy as np import pandas as pd import time # 初始化函数,设定基准等等 def after_code_changed(context): # 输出内容到日志 log.info() log.info('初始函数开始运行且全局只运行一次') unschedule_all() # 过滤掉order系列API产生的比error级别低的log # log.set_level('order', 'error') set_params() #1 设置策略参数 set_variables() #2 设置中间变量 set_backtest() #3 设置回测条件 disable_cache() ### 股票相关设定 ### # 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱 set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock') ## 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'000300.XSHG'或'510300.XSHG'是一样的) # 开盘前运行 run_daily(before_market_open, time='6:00') # 竞价时运行 run_daily(call_auction, time='09:25:15') # 开盘时运行-测试不同的买入时间 #run_daily(market_open, time='09:45') # 收盘时运行 run_daily(market_run, time='14:55') # 收盘后运行 #run_daily(after_market_close, time='20:00') # 收盘后运行 #run_daily(after_market_analysis, time='21:00') #1 设置策略参数 def set_params(): #设置全局参数 g.index ='all' #all-zz-300-500-1000,single-个股信号诊断 g.code = '000679.XSHE' g.switch = 10 #1-低位首板低开,2-首阳倒挂,10-两者合并 #池1-低板低开 #买前量价过滤 g.check_dura_1 =5 #默认是20,可以测试10-60 g.pre_rise_1 = 1.1 #前期(20天)的涨幅上限,10等同不设限 g.optimz_control = 1 #0-默认不控制,1-采用板型过滤 g.volume_control_1 = 2 #0-默认不控制,1-周期放量控制,2-周期倍量控制,3,倍量控制(相对昨日),4-放量(240-0.9)加倍量(20-5)的最佳回测叠加 g.volume_period_1 = 20 #放量控制周期,240-120-90-60 g.volume_ratio_1 = 5 #放量控制和周期最高量的比值,0.9/0.8 #竞价面 g.auction_open_highlimit_1 = 0.98 #竞价开盘上限,999-不设限 g.auction_open_lowlimit_1 = 0 #竞价开盘下限,0-不设限,1-相当于高开 g.begin_times = ' 09:24:55' g.end_times = ' 09:25:10' #池2-低板倒挂 g.check_dura_2 =20 #默认是20,可以测试10-60 g.pre_rise_2 = 1.1 #前期(20天)的涨幅上限,10等同不设限 g.strong_limit = 1.06 #D日的涨幅下限 g.trap_limit = 1.06 #D1日的高开套下限 g.filt_type = 'S' #进行二次择优,B-基本面,V-量价面,S-静态(流通盘) g.cirm_up = 45 #流通盘过滤上线,9999-不设限 g.volume_control_2 = 2 #0-默认不控制,1-周期放量控制,2-周期倍量控制,3,倍量控制(相对昨日),4-放量(240-0.9)加倍量(20-5)的最佳回测叠加 g.volume_period_2 = 20 #放量控制周期,240-120-90-60 g.volume_ratio_2 = 5 #放量控制和周期最高量的比值,0.9/0.8 #竞价面 g.auction_open_highlimit_2 = 0.98 #竞价开盘上限,999-不设限;设定低开更有效 g.auction_turn_highlimit_2 = 0 #竞价开盘下限,0-不设限,1-相当于高开 #2 设置中间变量 def set_variables(): #暂时未用,测试用全池 g.stocknum = 0 #持仓数,0-代表全取,2/4/8,因为地板倒和低板低各取1 #3 设置回测条件 def set_backtest(): ## 设定g.index作为基准 if g.index == 'all': set_benchmark('000001.XSHG') elif g.index == 'single': set_benchmark(g.code) else: set_benchmark(g.index) # 开启动态复权模式(真实价格) set_option('use_real_price', True) set_option("avoid_future_data", True) #显示所有列 pd.set_option('display.max_columns', None) #显示所有行 pd.set_option('display.max_rows', None) log.set_level('order', 'error') # 设置报错等级 ## 开盘前运行函数 def before_market_open(context): log.info('----------------------------------------------------------------') log.info('------------------------美好的一天开始了------------------------') log.info('----------------------------------------------------------------') # 输出运行时间 log.info('函数运行时间(before_market_open):'+str(context.current_dt.time())) #0,预置全局参数 today_date = context.current_dt.date() lastd_date = context.previous_date befor_date = get_trade_days(end_date=today_date, count=3)[0] all_data = get_current_data() g.pool_1 = [] g.pool_2 = [] num1,num2,num3,num4,num5,num6=0,0,0,0,0,0 #用于过程追踪 #0,构建基准指数票池,三去+去新 start_time = time.time() if g.index =='all': stocklist = list(get_all_securities(['stock']).index) #取all elif g.index == 'zz': stocklist = get_index_stocks('000300.XSHG', date = None) + get_index_stocks('000905.XSHG', date = None) + get_index_stocks('000852.XSHG', date = None) elif g.index =='single': stocklist = [g.code] #个股信号诊断 else: stocklist = get_index_stocks(g.index, date = None) num1 = len(stocklist) stocklist = [stockcode for stockcode in stocklist if not all_data[stockcode].paused] stocklist = [stockcode for stockcode in stocklist if not all_data[stockcode].is_st] stocklist = [stockcode for stockcode in stocklist if'退' not in all_data[stockcode].name] stocklist = [stockcode for stockcode in stocklist if stockcode[0:3] != '688'] stocklist = [stockcode for stockcode in stocklist if (today_date-get_security_info(stockcode).start_date).days>20] #stocklist = [stockcode for stockcode in stocklist if stockcode[0:1] == '3'] #专为3字头 num2 = len(stocklist) end_time = time.time() print('Step0,基准%s,原始%d只,四去后共%d只,构建耗时:%.1f 秒' % (g.index,num1,num2,end_time-start_time)) if g.switch ==1 or g.switch ==10: #1.1,池1过滤 start_time = time.time() list_201 = get_up_filter_jiang(context,stocklist,lastd_date,1,1,0) list_201 = get_up_filter_jiang(context,list_201,lastd_date,20,1,0) num3 = len(list_201) end_time = time.time() print('Step1.1,%d日首板共%d只,相对位置过滤共%d只,构建耗时:%.1f 秒' % (20,num3,len(list_201),end_time-start_time)) log.info(list_201) #2.1,池1优化 start_time = time.time() list_201 = get_rise_filter(context,list_201,lastd_date,g.check_dura_1,g.pre_rise_1,-1) if g.optimz_control !=0: g.pool_1 = get_up_optimize_filter(context,list_201) num4 = len(g.pool_1) else: g.pool_1 = list_201 if g.volume_control_1 !=0: g.pool_1 = get_highvolume_filter(context,g.pool_1,g.volume_control_1,g.volume_period_1,g.volume_ratio_1) else: g.pool_1 = list_201 end_time = time.time() print('Step2.1,涨幅限制%.2f共%d只,板型优化共%d只,量能控制共%d只,构建耗时:%.1f 秒' % (g.pre_rise_1,len(list_201),num4,len(g.pool_1),end_time-start_time)) log.info('地板待选:%s' % g.pool_1) if g.switch ==2 or g.switch ==10: #1.2,低位票(ROC20<1.1),D日穿线大阳(7点以上)或涨停后,D1日高开套人(开0点上下落7点以上)+不能反包D日, start_time = time.time() list_66 = get_rise_filter(context,stocklist,lastd_date,g.check_dura_2,g.pre_rise_2,-1) num3 = len(list_66) list_66 = get_rainbow_strong_filter(context,list_66,befor_date,g.strong_limit) num4 = len(list_66) list_66 = get_trap_filter(context,list_66,lastd_date,g.trap_limit) end_time = time.time() print('Step1.21,周期涨幅低于%.2f共%d只,前日%.2f强势共%d只,昨日发套%.2f共%d只,构建耗时:%.1f 秒' % (g.pre_rise_2,num3,g.strong_limit,num4,g.trap_limit,len(list_66),end_time-start_time)) log.info(list_66) if g.volume_control_2 !=0: list_66 = get_highvolume_filter(context,list_66,g.volume_control_2,g.volume_period_2,g.volume_ratio_2) end_time = time.time() print('Step1.22,量能控制共%d只,构建耗时:%.1f 秒' % (len(list_66),end_time-start_time)) log.info(list_66) #2.2,从基本面,量价面进行择优 start_time = time.time() if g.filt_type !=False: g.pool_2 = get_trap_optimize_filter(context,list_66,lastd_date,g.filt_type) end_time = time.time() print('Step2.2,%s择优共%d只,构建耗时:%.1f 秒' % (g.filt_type,len(g.pool_2),end_time-start_time)) log.info('半岛待选:%s' % g.pool_2) def call_auction(context): log.info('函数运行时间(Call_auction):'+str(context.current_dt.time())) open_list=[] current_data = get_current_data() today_date = context.current_dt.date() lastd_date = context.previous_date g.buy_list=[] if len(g.pool_2) !=0: df_trap= pd.DataFrame(columns=['code','price','open','turn']) df_auction = get_call_auction(g.pool_2,start_date=today_date,end_date=today_date,fields=['time','current','volume','money']) for i in range(len(df_auction)): stockcode = df_auction.code.values[i] price = df_auction.current.values[i] df_price = get_price(stockcode,end_date=lastd_date,frequency='daily',fields=['close','high'],count=5) open = price/df_price.close[-1] turn = price/df_price.high[-1] df_trap = df_trap.append({'code': stockcode,'price': price,'open': open,'turn': turn}, ignore_index=True) df_trap = df_trap[(df_trap.open < g.auction_open_highlimit_2)] #竞开过滤 df_trap.sort_values(by='turn', ascending=True, inplace=True) turn_list = list(df_trap.code)[:round(0.5*len(g.pool_2))] #发版 g.buy_list = turn_list if len(g.pool_1) !=0: df_auction = get_call_auction(g.pool_1,start_date=today_date,end_date=today_date,fields=['time','current','volume','money']) for i in range(len(df_auction)): stockcode = df_auction.code.values[i] price = df_auction.current.values[i] df_price = get_price(stockcode,end_date=lastd_date,frequency='daily',fields=['close'],count=5) if (price/df_price.close[-1] g.auction_open_lowlimit_1): g.buy_list.append(stockcode) if len(g.buy_list) ==0: #if len(g.buy_list) <=1: #测试 log.info('今日无买信') return else: log.info('*****今日买信共%d只*****:' % len(g.buy_list)) log.info(g.buy_list) #因信号收集屏蔽,回测打开 total_value = context.portfolio.total_value buy_cash = round(0.5*total_value/len(g.buy_list)) for stockcode in g.buy_list: if stockcode in list(context.portfolio.positions.keys()): continue buy_stock(context,stockcode,buy_cash) return ## 开盘时运行函数 def market_open(context): log.info('函数运行时间(market_open):'+str(context.current_dt.time())) today_date = context.current_dt.date() lastd_date = context.previous_date current_data = get_current_data() if len(g.buy_list) ==0: log.info('今日无买信') return else: log.info('*****今日买信共%d只*****:' % len(g.buy_list)) log.info(g.buy_list) total_value = context.portfolio.total_value buy_cash = 0.5*total_value/len(g.buy_list) for stockcode in g.buy_list: if stockcode in list(context.portfolio.positions.keys()): continue buy_stock(context,stockcode,buy_cash) ## 收盘时运行函数 def market_run(context): log.info('函数运行时间(market_close):'+str(context.current_dt.time())) today_date = context.current_dt.date() lastd_date = context.previous_date current_data = get_current_data() #尾盘只卖 hour = context.current_dt.hour minute = context.current_dt.minute for stockcode in context.portfolio.positions: if current_data[stockcode].paused == True: continue if context.portfolio.positions[stockcode].closeable_amount ==0: continue #非停出 if current_data[stockcode].last_price != current_data[stockcode].high_limit: log.info('非涨停即出%s' % stockcode) sell_stock(context,stockcode,0) continue ## 收盘后运行函数 def after_market_close(context): log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time()))) g.trade_days = g.trade_days +1 log.info(g.trade_days) log.info(g.open_num,g.open_nums_days) log.info(g.turn_num,g.turn_nums_days) """ ---------------------------------函数定义-主要策略----------------------------------------------- """ #周期涨跌幅(1/-1)过滤 def get_rise_filter(context, stocklist, check_date, check_duration,rise_level,direction): # 输出运行时间 log.info('-函数运行时间(get_rise_filter):'+str(context.current_dt.time())) poollist =[] if len(stocklist)==0: log.info("输入为空") return poollist by_date = get_trade_days(end_date=check_date, count=check_duration)[0] h = get_price(security=stocklist, end_date=check_date, frequency='1d', fields=['close','paused'], count=check_duration, panel=False) if direction == -1: temp = h.groupby('code')['close'].apply(lambda x: x.values[-1]/x.values[-check_duration] rise_level) poollist = temp[temp].index.tolist() return poollist #蒋的方法,N天M涨停过滤 def get_up_filter_jiang(context,stocklist,check_date,check_duration,up_num,direction): # 输出运行时间 log.info('-函数运行时间(get_up_filter_jiang):'+str(context.current_dt.time())) #0,预置,今天是D日 all_data = get_current_data() poollist=[] if len(stocklist)==0: log.info("输入为空") return poollist # 交易日历 trd_days = get_trade_days(end_date=check_date, count=check_duration) # array[datetime.date] s_trd_days = pd.Series(range(len(trd_days)), index=trd_days) # Series[index:交易日期,value:第几个交易日] back_date = trd_days[0] #2,形态过滤,一月内两次以上涨停(盘中过10%也算) start_time = time.time() # 取数 df_price = get_price(stocklist,end_date=check_date,frequency='1d',fields=['pre_close','open','close','high','high_limit','low_limit','paused'] ,skip_paused=False,fq='pre',count=check_duration,panel=False,fill_paused=True) # 过滤出涨停的股票,按time索引 df_up = df_price[(df_price.close == df_price.high_limit) & (df_price.paused == 0)].set_index('time') # 标注出df_up中的time对应的是第几个交易日(ith) df_up['ith'] = s_trd_days code_set = set(df_up.code.values) if direction ==1: poollist =[stockcode for stockcode in code_set if ((len(df_up[df_up.code ==stockcode]) > up_num))] elif direction ==-1: poollist =[stockcode for stockcode in code_set if ((len(df_up[df_up.code ==stockcode]) < up_num))] else: poollist =[stockcode for stockcode in code_set if ((len(df_up[df_up.code ==stockcode]) == up_num))] end_time = time.time() #log.info('---%d天(%s--%s)%d次涨停过滤出%d只标的,构建耗时:%.1f 秒' % (check_duration,back_date,check_date,up_num,len(poollist),end_time-start_time)) #log.info(poollist) return poollist #根据定义的选股条件选股 def get_rainbow_strong_filter(context,stocklist,check_date,strong_limit): log.info('-函数运行时间(get_rainbow_strong_filter):'+str(context.current_dt.time())) all_data = get_current_data() poollist=[] if len(stocklist)==0: log.info("输入为空") return poollist # 取数 df_price = get_price(stocklist,end_date=check_date,frequency='1d',fields=['pre_close','open','close','low','high','high_limit','low_limit','paused'] ,skip_paused=False,fq='pre',count=1,panel=False,fill_paused=True) df_up = df_price[(df_price.close > df_price.open) & (df_price.close == df_price.high_limit) & (df_price.paused == 0)] up_list = df_up.code.values.tolist() print('---涨停板过滤后共%d只' % len(up_list)) #排除一字板 df_strong = df_price[(df_price.close != df_price.high_limit) & (df_price.close > strong_limit*df_price.pre_close) & (df_price.paused == 0)] strong_list = df_strong.code.values.tolist() print('---大阳过滤后共%d只' % len(strong_list)) MA5 = MA(strong_list, check_date=check_date, timeperiod=5) MA10 = MA(strong_list, check_date=check_date, timeperiod=10) MA20 = MA(strong_list, check_date=check_date, timeperiod=20) strong_list = [stockcode for stockcode in strong_list if ((df_price[df_price.code == stockcode].low.values < MA5[stockcode] and df_price[df_price.code == stockcode ].close.values > MA5[stockcode]) or (df_price[df_price.code == stockcode].low.values < MA10[stockcode] and df_price[df_price.code == stockcode].close.values > MA10[ stockcode]) or (df_price[df_price.code == stockcode].low.values < MA20[stockcode] and df_price[df_price.code == stockcode].close.values > MA20[stockcode]))] print('---均线过滤后共%d只' % len(strong_list)) poollist = list(set(up_list+strong_list)) print('---合并后共%d只' % len(poollist)) return poollist #根据定义的选股条件选股 #D1日高开套人(开0点上下落7点以上)+不能反包D日 def get_trap_filter(context,stocklist,check_date,trap_limit): log.info('-函数运行时间(get_trap_filter):'+str(context.current_dt.time())) all_data = get_current_data() poollist=[] if len(stocklist)==0: log.info("输入为空") return poollist # 取数 df_price = get_price(stocklist,end_date=check_date,frequency='1d',fields=['pre_close','open','close','low','high','high_limit','low_limit','paused'] ,skip_paused=False,fq='pre',count=1,panel=False,fill_paused=True) df_trap = df_price[(df_price.open > df_price.pre_close) & (df_price.open > df_price.close) & (df_price.high/df_price.close >trap_limit) & (df_price.close/df_price.pre_close >0.93)] poollist = df_trap.code.values.tolist() return poollist #针对输入进行基本面/量价面的过滤择优 def get_trap_optimize_filter(context,stocklist,check_date,filt_type): log.info('-函数运行时间(get_trap_optimize_filter):'+str(context.current_dt.time())) all_data = get_current_data() poollist=[] if len(stocklist)==0: log.info("输入为空") return stocklist #静态,流通盘<40 if filt_type =="S": df_value = get_valuation(stocklist, end_date=check_date, count=1, fields=['circulating_market_cap']) #先新后老 df_value = df_value.dropna() #df_value.sort_values(by='circulating_market_cap', ascending=True, inplace=True) #cirm_list = list(df_value.code)[:int(0.6*len(stocklist))] poollist = list(df_value[df_value.circulating_market_cap < g.cirm_up].code) #绝对 elif filt_type == 'A': df_value = get_valuation(stocklist, end_date=check_date, count=1, fields=['circulating_market_cap']) #先新后老 df_value = df_value.dropna() poollist = list(df_value[df_value.circulating_market_cap < g.cirm_up].code) #绝对 df_money = get_money_flow(poollist, end_date=check_date, count=1, fields=['sec_code','date','net_pct_main']) poollist = list(df_money[df_money.net_pct_main <-1].sec_code) return poollist #去除T日是一字/T字/尾盘封板弱 def get_up_optimize_filter(context,stocklist): log.info('-函数运行时间(get_up_optimize_filter):'+str(context.current_dt.time())) today_date = context.current_dt.date() lastd_date = context.previous_date all_data = get_current_data() poollist =[] if len(stocklist)==0: log.info("输入为空") return poollist #其他条件,在循环中过滤 for stockcode in stocklist: #过滤掉一字板,T字板; df_lastd = get_price(stockcode,end_date=lastd_date,frequency='daily',fields=['open','close','high','high_limit','low_limit'],count=1) if (df_lastd['open'][0] == df_lastd['high_limit'][0] and df_lastd['close'][0] == df_lastd['high_limit'][0]): continue #过滤掉尾盘封板弱的; df_last30 = get_bars(stockcode, count=60, unit='1m', fields=['open','close','high','low'],include_now=True,df=True) if (df_last30['low'][:].min() != df_lastd['high_limit'][0]) & (df_last30['high'][:].max() == df_lastd['high_limit'][0]): continue """ df_value = get_valuation(stockcode, end_date=lastd_date, count=1, fields=['circulating_market_cap','pe_ratio','pb_ratio']) #先新后老 if len(df_value) ==0 or df_value['pe_ratio'].values[0] <0: #存在数据缺失 continue """ poollist.append(stockcode) return poollist ##过滤N天内M倍最高量,X-买入前量能过滤,1X-为持仓的量能过滤 def get_highvolume_filter(context,stocklist,control_mode,check_dura,volume_ratio): log.info('-函数运行时间(get_highvolume_filter):'+str(context.current_dt.time())) lastd_date = context.previous_date poollist =[] if len(stocklist)==0: log.info("输入为空") return poollist for stockcode in stocklist: if control_mode ==1: df_price = get_price(stockcode,end_date=lastd_date,frequency='daily',fields=['volume'],count=check_dura) if df_price['volume'][-1] > volume_ratio*df_price['volume'].max(): continue poollist.append(stockcode) elif control_mode ==2: df_price = get_price(stockcode,end_date=lastd_date,frequency='daily',fields=['volume'],count=check_dura) if df_price['volume'][-1] > volume_ratio*df_price['volume'].mean(): continue poollist.append(stockcode) elif control_mode ==3: df_price = get_price(stockcode,end_date=lastd_date,frequency='daily',fields=['volume'],count=check_dura) if df_price['volume'][-1] > volume_ratio*df_price['volume'][-2]: continue poollist.append(stockcode) elif control_mode ==4: df_price = get_price(stockcode,end_date=lastd_date,frequency='daily',fields=['volume'],count=240) if df_price['volume'][-1] > 0.9*df_price['volume'].max(): continue if df_price['volume'][-1] > 5*df_price['volume'][-20:].mean(): continue poollist.append(stockcode) print('---量能控制%d-%d天放%.1f量过滤后共%d只' % (control_mode,check_dura,volume_ratio,len(poollist))) return poollist #按比例过滤 def get_factor_filter_list(context,stock_list,jqfactor,sort,p1,p2): yesterday = context.previous_date score_list = get_factor_values(stock_list, jqfactor, end_date=yesterday, count=1)[jqfactor].iloc[0].tolist() df = pd.DataFrame(columns=['code','score']) df['code'] = stock_list df['score'] = score_list df = df.dropna() df.sort_values(by='score', ascending=sort, inplace=True) filter_list = list(df.code)[int(p1*len(stock_list)):int(p2*len(stock_list))] return filter_list """ ---------------------------------函数定义-辅助函数----------------------------------------------- """ ##买入函数 def buy_stock(context,stockcode,cash): today_date = context.current_dt.date() current_data = get_current_data() if stockcode[0:2] == '68': last_price = current_data[stockcode].last_price if order_target_value(stockcode,cash,MarketOrderStyle(1.1*last_price)) != None: #科创板需要设定限值 log.info('%s买入%s' % (today_date,stockcode)) else: if order_target_value(stockcode, cash) != None: log.info('%s买入%s' % (today_date,stockcode)) ##卖出函数 def sell_stock(context,stockcode,cash): today_date = context.current_dt.date() current_data = get_current_data() if stockcode[0:2] == '68': last_price = current_data[stockcode].last_price if order_target_value(stockcode,cash,MarketOrderStyle(0.9*last_price)) != None: #科创板需要设定限值 log.info('%s卖出%s' % (today_date,stockcode)) else: if order_target_value(stockcode,cash) != None: log.info('%s卖出%s' % (today_date,stockcode))