from jqdata import * from jqlib.technical_analysis import * import numpy as np import pandas as pd import datetime as dt # 初始化函数,设定基准等等 def initialize(context): # 设定沪深300作为基准 set_benchmark('000300.XSHG') # 开启异步报单 set_option('async_order', True) # 开启动态复权模式(真实价格) set_option('use_real_price', True) # 是否未来函数 set_option("avoid_future_data", True) # 初始化全局变量 g.observation_period = 1260 # 默认观察周期为5年(约252个交易日) g.total_portions = 100 # 总资金分成100份 g.buy_thresholds = { 0.3: 1, # 相对位置30%买入1份 0.2: 2, # 相对位置20%买入2份 0.1: 3, # 相对位置10%买入3份 0.0: 5 # 相对位置最低买入5份 } g.profit_target = 0.5 # 50%的目标收益率 g.take_profit_portion = 0.05 # 每次卖出获利5%的部分 # 记录买入历史和持仓信息 g.buy_records = {} # 记录每个基金在不同阈值的买入状态 g.position_records = {} # 记录每个基金的持仓成本和数量 g.profit_records = {} # 记录已经获利的部分 # 新增:交易信息存储字典 g.trading_records = { 'normal_positions': {}, # 正常网格交易持仓 'profit_positions': {} # 获利后保留的持仓 } # 设置交易费用 set_order_cost(OrderCost(close_tax=0.000, open_commission=0.00025, close_commission=0.00025, min_commission=0), type='fund') # 设定定时运行函数 run_weekly(before_market_open, 1, '09:20') run_daily(market_open, '09:31') run_daily(check_profit, '14:30') # run_daily(print_position_info, '15:30') def before_market_open(context): # 获取目标基金列表 g.target_funds = [ '512880.XSHG', '512980.XSHG', '513050.XSHG', '159938.XSHE', '512070.XSHG', '515180.XSHG', '513520.XSHG', '159952.XSHE', '162412.XSHE', '510300.XSHG', '510050.XSHG', '510500.XSHG', '588080.XSHG', '588000.XSHG', '159901.XSHE', '159915.XSHE', '159919.XSHE', '159922.XSHE', '159920.XSHE', '159985.XSHE', '501018.XSHG', '162411.XSHE' ] # 初始化当日需要检查的基金列表 g.check_funds = [] # log.info("\n=== 周一基金检查 ===") # log.info(f"观察周期: {g.observation_period}个交易日") # log.info("\n基金历史位置统计:") # 记录所有基金的相对位置,用于计算平均值 all_positions = [] for fund in g.target_funds: try: df = get_price(fund, count=g.observation_period, end_date=context.previous_date, fields=['close']) current_price = df['close'][-1] hist_prices = df['close'].values # 计算相对于历史最高价和最低价的位置 hist_high = np.max(hist_prices) hist_low = np.min(hist_prices) price_position = (current_price - hist_low) / (hist_high - hist_low) if hist_high != hist_low else 0 all_positions.append(price_position) # 获取基金名称 fund_info = get_security_info(fund) fund_name = fund_info.display_name if fund_info else fund # log.info(f"基金 {fund}({fund_name}): | 当前价格: {format(current_price, '.4f')}| 相对位置: {format(price_position * 100, '.2f')}%| 价格区间: {format(hist_low, '.4f')} - {format(hist_high, '.4f')}") if price_position <= 0.35: # 如果当前价格在相对位置40%以下 g.check_funds.append(fund) # log.info(" 状态: 需要每日检查 ✓") # else: # log.info(" 状态: 暂不检查 ✗") except Exception as e: # log.error(f"获取基金{fund}数据失败: {str(e)}") continue # 输出统计信息 if all_positions: avg_position = np.mean(all_positions) median_position = np.median(all_positions) low_position_count = sum(1 for p in all_positions if p <= 0.3) # log.info("\n=== 统计信息 ===") # log.info(f"平均相对位置: {format(avg_position * 100, '.2f')}% | 中位相对位置: {format(median_position * 100, '.2f')}% | 低位基金数量: {low_position_count} (相对位置≤30%)") if g.check_funds: # log.info("\n=== 需要每日检查的基金 ===") for fund in g.check_funds: fund_info = get_security_info(fund) fund_name = fund_info.display_name if fund_info else fund # log.info(f"- {fund}({fund_name})") # else: # log.info("\n没有基金需要每日检查") # 在统计信息之后,添加交易持仓信息的输出 log.info("\n=== 交易持仓信息 ===") # 输出正常网格交易持仓 if g.trading_records['normal_positions']: log.info("正常网格交易持仓:") for fund, positions in g.trading_records['normal_positions'].items(): fund_info = get_security_info(fund) fund_name = fund_info.display_name if fund_info else fund for pos in positions: log.info(f"标的 {fund}({fund_name}): " f"买入时间 {pos['buy_time'].strftime('%Y-%m-%d %H:%M')}, " f"相对位置 {format(pos['price_position'] * 100, '.2f')}%, " f"买入数量 {format(pos['shares'], '.2f')}, " f"买入价格 {format(pos['buy_price'], '.4f')}, " f"买入金额 {format(pos['total_amount'], '.2f')}") else: log.info("当前无正常网格交易持仓") # 输出获利保留持仓 if g.trading_records['profit_positions']: log.info("\n获利保留持仓:") for fund, positions in g.trading_records['profit_positions'].items(): fund_info = get_security_info(fund) fund_name = fund_info.display_name if fund_info else fund for pos in positions: log.info(f"标的 {fund}({fund_name}): " f"原始买入时间 {pos['buy_time'].strftime('%Y-%m-%d %H:%M')}, " f"卖出时间 {pos['sell_time'].strftime('%Y-%m-%d %H:%M')}, " f"原始相对位置 {format(pos['price_position'] * 100, '.2f')}%, " f"买入价格 {format(pos['buy_price'], '.4f')}, " f"卖出价格 {format(pos['sell_price'], '.4f')}, " f"卖出数量 {format(pos['sell_shares'], '.2f')}, " f"卖出金额 {format(pos['sell_amount'], '.2f')}, " f"保留份额 {format(pos['remaining_shares'], '.2f')}") else: log.info("当前无获利保留持仓") def market_open(context): # 检查需要买入的基金 for fund in g.check_funds: try: # 过滤停牌和新成立基金 if not filter_paused_fund(context, fund) or not filter_new_fund(context, fund): continue df = get_price(fund, count=g.observation_period, end_date=context.previous_date, fields=['close']) current_price = df['close'][-1] hist_prices = df['close'].values # 计算相对于历史最高价和最低价的位置 hist_high = np.max(hist_prices) hist_low = np.min(hist_prices) price_position = (current_price - hist_low) / (hist_high - hist_low) if hist_high != hist_low else 0 # 如果该基金没有买入记录,初始化记录 if fund not in g.buy_records: g.buy_records[fund] = {threshold: False for threshold in g.buy_thresholds.keys()} # 检查每个阈值是否需要买入 for threshold, portions in g.buy_thresholds.items(): if price_position <= threshold and not g.buy_records[fund][threshold]: # 计算买入金额 portion_value = context.portfolio.total_value / g.total_portions buy_amount = portion_value * portions # 执行买入 order = order_target_value_(fund, buy_amount) if order is not None and order.status == OrderStatus.held and order.filled == order.amount: g.buy_records[fund][threshold] = True # 记录持仓信息 if fund not in g.position_records: g.position_records[fund] = [] g.position_records[fund].append({ 'buy_price': current_price, 'amount': buy_amount, 'threshold': threshold }) # 更新交易记录 if fund not in g.trading_records['normal_positions']: g.trading_records['normal_positions'][fund] = [] # 获取实际成交信息 filled_shares = order.filled total_amount = filled_shares * current_price g.trading_records['normal_positions'][fund].append({ 'buy_time': context.current_dt, 'price_position': price_position, 'shares': filled_shares, 'buy_price': current_price, 'total_amount': total_amount }) # log.info(f"买入基金{fund},价格{current_price},金额{buy_amount},相对位置{format(price_position * 100, '.2f')}%,阈值{format(threshold * 100, '.2f')}%") except Exception as e: # log.error(f"处理基金{fund}买入失败: {str(e)}") continue def check_profit(context): # 检查所有持仓是否达到盈利目标 for fund in list(g.position_records.keys()): # 使用list复制键,避免运行时修改字典 try: if not filter_paused_fund(context, fund): continue current_price = get_current_data()[fund].last_price # 检查每个持仓记录 positions_to_remove = [] # 记录需要移除的持仓 for position in g.position_records[fund]: if current_price >= position['buy_price'] * (1 + g.profit_target): # 计算需要卖出的份额 # 原始投入金额 original_value = position['amount'] # 目标收益金额(比如10%) target_profit = original_value * g.take_profit_portion # 需要卖出的份额 = (原始金额 + 目标收益) / 当前价格 shares_to_sell = (original_value + target_profit) / current_price # 计算卖出金额 sell_amount = shares_to_sell * current_price # 执行卖出 order = order_target_value_(fund, -sell_amount) if order is not None and order.status == OrderStatus.held and order.filled > 0: # 计算实际获利 actual_profit = sell_amount - (sell_amount / current_price * position['buy_price']) # 更新持仓记录 position['amount'] -= sell_amount # 记录获利部分 if fund not in g.profit_records: g.profit_records[fund] = [] g.profit_records[fund].append({ 'sell_price': current_price, 'profit_amount': actual_profit, 'sell_date': context.current_dt, 'sell_shares': shares_to_sell }) # 更新交易记录 - 从normal_positions移除卖出部分 if fund in g.trading_records['normal_positions']: # 找到对应的买入记录 for normal_pos in g.trading_records['normal_positions'][fund]: if normal_pos['buy_price'] == position['buy_price']: # 计算剩余份额 remaining_shares = normal_pos['shares'] - shares_to_sell if remaining_shares > 0: normal_pos['shares'] = remaining_shares normal_pos['total_amount'] = remaining_shares * normal_pos['buy_price'] else: g.trading_records['normal_positions'][fund].remove(normal_pos) # 添加到获利持仓记录 if fund not in g.trading_records['profit_positions']: g.trading_records['profit_positions'][fund] = [] g.trading_records['profit_positions'][fund].append({ 'buy_time': normal_pos['buy_time'], 'sell_time': context.current_dt, 'price_position': normal_pos['price_position'], 'buy_price': normal_pos['buy_price'], 'sell_price': current_price, 'sell_shares': shares_to_sell, 'sell_amount': sell_amount, 'remaining_shares': remaining_shares if remaining_shares > 0 else 0 }) break # 如果该基金没有正常持仓了,清理记录 if not g.trading_records['normal_positions'][fund]: del g.trading_records['normal_positions'][fund] # log.info(f"卖出基金{fund}获利部分: | 买入价格: {format(position['buy_price'], '.4f')}| 当前价格: {format(current_price, '.4f')}| 卖出份额: {format(shares_to_sell, '.2f')}| 卖出金额: {format(sell_amount, '.2f')}| 实际获利: {format(actual_profit, '.2f')}| 剩余金额: {format(position['amount'], '.2f')}") # 如果剩余金额很小,加入待清理列表 if position['amount'] < 100: positions_to_remove.append(position) # 清理需要移除的持仓记录 for position in positions_to_remove: g.buy_records[fund][position['threshold']] = False g.position_records[fund].remove(position) # 如果该基金没有持仓了,清理记录 if not g.position_records[fund]: del g.position_records[fund] except Exception as e: # log.error(f"处理基金{fund}获利了结失败: {str(e)}") continue def print_position_info(context): # 打印当天成交记录 trades = get_trades() for _trade in trades.values(): print(f'成交记录:{str(_trade)}') # 打印账户信息 print("\n=== 当前持仓信息 ===") for position in list(context.portfolio.positions.values()): security = position.security cost = position.avg_cost price = position.price ret = 100 * (price/cost - 1) value = position.value amount = position.total_amount print(f'代码: {security} | 成本价: {format(cost, ".2f")} | 现价: {price} | 收益率: {format(ret, ".2f")}% | 持仓(份): {amount} | 市值: {format(value, ".2f")}') # 打印当日盈利记录 print("\n=== 当日获利记录 ===") for fund in g.profit_records.keys(): if g.profit_records[fund]: today_profits = [record for record in g.profit_records[fund] if record['sell_date'].date() == context.current_dt.date()] if today_profits: print(f"\n基金{fund}今日获利记录:") total_profit = sum([record['profit_amount'] for record in today_profits]) print(f"今日获利:{format(total_profit, '.2f')}") # def after_trading_end(context): # # 打印每日持仓和盈利信息 # log.info("=== 当日持仓信息 ===") # for fund in g.position_records.keys(): # if g.position_records[fund]: # log.info(f"基金{fund}持仓:") # for pos in g.position_records[fund]: # log.info(f"买入价格:{pos['buy_price']},持仓金额:{pos['amount']},买入阈值:{pos['threshold']}") # log.info("=== 当日获利记录 ===") # for fund in g.profit_records.keys(): # if g.profit_records[fund]: # total_profit = sum([record['profit_amount'] for record in g.profit_records[fund]]) # log.info(f"基金{fund}获利记录: | 总获利:{total_profit}") # 交易模块 - 自定义下单 def order_target_value_(security, value): if value == 0: log.debug(f"清空持仓 {security}") else:` log.debug(f"目标下单 {security},金额 {value}") return order_target_value(security, value) # 交易模块 - 开仓 def open_position(security, value): if value == 0: return True order = order_target_value_(security, value) if order is not None and order.status == OrderStatus.held and order.filled == order.amount: return True return False # 交易模块 - 平仓 def close_position(position): security = position.security order = order_target_value_(security, 0) if order is not None and order.status == OrderStatus.held and order.filled == order.amount: return True return False # 交易模块 - 调整仓位 def adjust_position(context, security, target_value): if security not in context.portfolio.positions: if target_value > 0: return open_position(security, target_value) else: position = context.portfolio.positions[security] if target_value == 0: return close_position(position) else: return order_target_value_(security, target_value) return False # 过滤模块 - 过滤停牌基金 def filter_paused_fund(context, security): current_data = get_current_data() return not current_data[security].paused # 过滤模块 - 过滤新成立基金 def filter_new_fund(context, security): yesterday = context.previous_date return not yesterday - get_security_info(security).start_date < datetime.timedelta(days=30) # 过滤成立不足30天的基金