"""
Data update service.

Periodically refreshes the futures data stored in the database.
"""
import atexit
import logging
import os
import threading
from pathlib import Path

import yaml
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from retrying import retry

from app.database.db_manager import db
from app.models.future_info import FutureInfo, FutureDaily
from app.services.data_scraper import FutureDataScraper

logger = logging.getLogger(__name__)


class DataUpdateService:
    """
    Process-wide singleton that keeps the futures tables up to date.

    It owns a ``FutureDataScraper`` and an APScheduler ``BackgroundScheduler``;
    both are created lazily, once a Flask application has been attached via
    ``init_app``.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, app=None):
        """Return the shared instance, creating it on first use."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance.app = None
                cls._instance.scraper = None  # created lazily
                cls._instance.scheduler = None  # created lazily
                cls._instance.config = None
                cls._instance._initialized = False
            return cls._instance

    def __init__(self, app=None):
        """
        Initialise the singleton once; optionally attach a Flask app.

        Args:
            app: Flask application instance, or None to attach one later
                through ``init_app``.
        """
        with self._lock:
            if not self._initialized:
                logger.info("初始化数据更新服务...")
                self.app = app
                self.config = None  # loaded lazily
                self._initialized = True
            # __init__ runs on every DataUpdateService(...) call because
            # __new__ always returns the same object. init_app only fills in
            # components that are still missing, so repeating it is harmless.
            if app is not None:
                self.init_app(app)

    def _load_config(self):
        """
        Load the schedule configuration.

        The configuration service is tried first. If that fails, the YAML
        file is read; when the file does not exist, a default configuration
        is written to disk and returned.

        Returns:
            dict: configuration with a ``data_update`` section. An empty dict
            is returned when the YAML file does not contain a mapping.
        """
        config_path = Path("config.yaml")

        # Preferred source: the configuration service.
        try:
            from app.services.config_service import get_int_config

            morning_hour = get_int_config('schedule_morning_hour', 9)
            morning_minute = get_int_config('schedule_morning_minute', 0)
            afternoon_hour = get_int_config('schedule_afternoon_hour', 15)
            afternoon_minute = get_int_config('schedule_afternoon_minute', 0)
            config_from_service = {
                "data_update": {
                    "schedule": [
                        {"hour": str(morning_hour), "minute": str(morning_minute)},
                        {"hour": str(afternoon_hour), "minute": str(afternoon_minute)},
                    ]
                }
            }
            logger.info(f"从配置服务加载定时任务配置: {morning_hour}:{morning_minute:02d}, {afternoon_hour}:{afternoon_minute:02d}")
            return config_from_service
        except Exception as e:
            logger.warning(f"从配置服务读取定时配置失败,尝试读取配置文件: {e}")

        # Fallback: the YAML file.
        if not config_path.exists():
            default_config = {
                "data_update": {
                    "schedule": [
                        {"hour": "9", "minute": "0"},
                        {"hour": "15", "minute": "0"},
                    ]
                }
            }
            # The service may name a different file to create. It has most
            # likely just failed above, so this lookup is best-effort only.
            try:
                from app.services.config_service import get_str_config

                config_path = Path(get_str_config('config_filename', 'config.yaml'))
            except Exception as e:
                logger.debug(f"无法从配置服务获取配置文件名,使用默认路径 {config_path}: {e}")
            with open(config_path, "w", encoding="utf-8") as f:
                yaml.dump(default_config, f, allow_unicode=True)
            return default_config

        with open(config_path, "r", encoding="utf-8") as f:
            loaded = yaml.safe_load(f)
        # safe_load returns None for an empty file; callers expect a mapping.
        if not isinstance(loaded, dict):
            logger.warning(f"配置文件 {config_path} 内容为空或格式无效,使用空配置")
            return {}
        return loaded

    def init_app(self, app):
        """
        Attach a Flask application and initialise the lazy components.

        Args:
            app: Flask application instance
        """
        logger.info(f"初始化应用到数据更新服务,应用ID: {id(app)}")
        self.app = app

        # Load configuration and build components inside an app context.
        with app.app_context():
            if self.config is None:
                self.config = self._load_config()
                logger.info("数据更新服务配置加载完成")

            if self.scraper is None:
                self.scraper = FutureDataScraper()
                logger.info("数据爬虫初始化完成")

            if not self.scheduler:
                self.scheduler = BackgroundScheduler()

            # In debug mode, start only when WERKZEUG_RUN_MAIN is 'true'.
            # NOTE(review): presumably this avoids a second scheduler in the
            # reloader's parent process — confirm.
            if not app.debug or os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
                logger.info("在主进程中启动调度器...")
                self.start_scheduler()

    def _ensure_scraper_initialized(self):
        """Create the scraper if it has not been created yet."""
        if self.scraper is None:
            logger.info("延迟初始化数据爬虫...")
            self.scraper = FutureDataScraper()

    def _ensure_config_initialized(self):
        """Load the configuration if it has not been loaded yet."""
        if self.config is None:
            logger.info("延迟加载数据更新服务配置...")
            self.config = self._load_config()

    def _data_update_section(self):
        """Return the ``data_update`` config section, or {} when absent or null."""
        self._ensure_config_initialized()
        return (self.config or {}).get("data_update") or {}

    def update_future_daily_data(self):
        """
        Refresh the daily futures data.

        Scrapes the latest data into the future_daily table, then refreshes
        future_info from it.

        Returns:
            bool: True when at least one record was scraped, False when no
            app is attached or nothing was scraped.

        Raises:
            Exception: any error raised while updating is logged and re-raised.
        """
        try:
            if not self.app:
                logger.error("应用上下文未设置,无法更新数据")
                return False

            with self.app.app_context():
                logger.info("开始更新期货每日数据...")
                self._ensure_scraper_initialized()

                # Refresh future_daily first.
                records_count = self.scraper.update_future_daily(db.session, FutureDaily)
                if records_count > 0:
                    # Then derive future_info from future_daily.
                    updated_count = self.scraper.update_future_info_from_daily(db.session, FutureInfo, FutureDaily)
                    logger.info(f"期货数据更新完成: 爬取{records_count}条记录,更新{updated_count}条期货基础信息")
                    return True

                logger.warning("期货数据更新失败: 未能爬取任何数据")
                return False
        except Exception as e:
            logger.error(f"期货数据更新出错: {str(e)}")
            raise

    def manual_update(self):
        """
        Trigger a data update on demand.

        Returns:
            dict: ``{'code', 'msg', 'data'}``. ``code`` is 0 on success and 1
            on failure; ``data`` holds ``daily_count`` and ``info_count`` on
            success and is None otherwise. Errors are reported in the result
            rather than raised.
        """
        try:
            logger.info("开始手动更新数据...")

            # Same guard as update_future_daily_data: without an attached app
            # no application context can be opened.
            if not self.app:
                logger.error("应用上下文未设置,无法更新数据")
                return {
                    'code': 1,
                    'msg': '应用上下文未设置,无法更新数据',
                    'data': None
                }

            with self.app.app_context():
                self._ensure_scraper_initialized()

                # 1. Refresh future_daily.
                logger.info("正在更新future_daily表...")
                records_count = self.scraper.update_future_daily(db.session, FutureDaily)

                if records_count > 0:
                    # 2. Refresh future_info from future_daily.
                    logger.info("正在根据future_daily表更新future_info表...")
                    updated_count = self.scraper.update_future_info_from_daily(db.session, FutureInfo, FutureDaily)
                    return {
                        'code': 0,
                        'msg': f'数据更新成功:新增{records_count}条每日数据,更新{updated_count}条期货信息',
                        'data': {
                            'daily_count': records_count,
                            'info_count': updated_count
                        }
                    }

                logger.warning("未能获取到新的每日数据")
                return {
                    'code': 1,
                    'msg': '未能获取到新的数据,可能是网络问题或数据源未更新',
                    'data': None
                }
        except Exception as e:
            error_msg = f"手动更新数据失败: {str(e)}"
            logger.error(error_msg)
            return {
                'code': 1,
                'msg': error_msg,
                'data': None
            }

    def _get_scheduler_settings(self):
        """
        Resolve scheduler options: configuration service first, file config second.

        Returns:
            dict: keys ``timezone``, ``max_instances``, ``coalesce`` and
            ``misfire_grace_time``.
        """
        try:
            from app.services.config_service import get_str_config, get_int_config, get_bool_config

            scheduler_timezone = get_str_config('scheduler_timezone', 'Asia/Shanghai')
            scheduler_max_instances = get_int_config('scheduler_max_instances', 1)
            scheduler_coalesce = get_bool_config('scheduler_coalesce', True)
            scheduler_misfire_grace_time = get_int_config('scheduler_misfire_grace_time', 60)
            logger.info(f"从配置服务读取调度器配置: timezone={scheduler_timezone}, max_instances={scheduler_max_instances}")
            return {
                "timezone": scheduler_timezone,
                "max_instances": scheduler_max_instances,
                "coalesce": scheduler_coalesce,
                "misfire_grace_time": scheduler_misfire_grace_time,
            }
        except Exception as e:
            logger.warning(f"从配置服务读取调度器配置失败,使用默认配置: {e}")

        scheduler_config = self._data_update_section().get("scheduler") or {}
        return {
            "timezone": scheduler_config.get("timezone", "Asia/Shanghai"),
            "max_instances": scheduler_config.get("max_instances", 1),
            "coalesce": scheduler_config.get("coalesce", True),
            "misfire_grace_time": scheduler_config.get("misfire_grace_time", 60),
        }

    def _build_task(self, schedule):
        """
        Return the callable to schedule for one schedule entry.

        When the entry sets ``retry``, the update is wrapped so that any
        exception triggers a retry after a fixed delay.
        """
        if not schedule.get("retry", False):
            return self.update_future_daily_data

        try:
            from app.services.config_service import get_int_config

            max_retries = get_int_config('retry_max_attempts', 3)
            retry_delay = get_int_config('retry_delay_seconds', 300)
        except Exception as e:
            logger.warning(f"获取重试配置失败,使用默认值: {e}")
            max_retries = schedule.get("max_retries", 3)
            retry_delay = schedule.get("retry_delay", 300)

        @retry(
            stop_max_attempt_number=max_retries + 1,  # first attempt + retries
            wait_fixed=retry_delay * 1000,  # milliseconds
            retry_on_exception=lambda e: isinstance(e, Exception),
        )
        def wrapped_task():
            return self.update_future_daily_data()

        return wrapped_task

    def start_scheduler(self):
        """
        Configure the scheduler, register the cron jobs and start it.

        Does nothing when the scheduler has not been created or is already
        running. A failing schedule entry is logged and skipped; any other
        failure is logged and a scheduler that did start is shut down again.
        """
        if not self.scheduler:
            logger.warning("调度器未初始化,无法启动")
            return

        if self.scheduler.running:
            logger.info(f"调度器已经在运行中,scheduler_id: {id(self.scheduler)}")
            return

        try:
            logger.info(f"开始启动调度器,scheduler_id: {id(self.scheduler)}")

            settings = self._get_scheduler_settings()
            scheduler_timezone = settings["timezone"]

            # APScheduler reads max_instances / coalesce / misfire_grace_time
            # from the "job_defaults" mapping; passed as top-level options
            # they are ignored.
            self.scheduler.configure(
                timezone=scheduler_timezone,
                job_defaults={
                    "max_instances": settings["max_instances"],
                    "coalesce": settings["coalesce"],
                    "misfire_grace_time": settings["misfire_grace_time"],
                },
            )

            schedule_config = self._data_update_section().get("schedule") or []
            logger.info(f"读取到的定时配置: {schedule_config}")

            for schedule in schedule_config:
                try:
                    hour = schedule.get("hour", "*")
                    minute = schedule.get("minute", "0")
                    job_id = f"update_future_data_{hour}_{minute}"

                    trigger = CronTrigger(
                        hour=hour,
                        minute=minute,
                        timezone=scheduler_timezone,
                    )

                    # Jobs pin max_instances / coalesce explicitly; only
                    # misfire_grace_time comes from the scheduler defaults.
                    job = self.scheduler.add_job(
                        self._build_task(schedule),
                        trigger=trigger,
                        id=job_id,
                        replace_existing=True,
                        max_instances=1,
                        coalesce=True,
                    )

                    # The job may not expose next_run_time here, so read it
                    # defensively.
                    next_run_time = getattr(job, 'next_run_time', None) if job else None
                    next_run = next_run_time.strftime('%Y-%m-%d %H:%M:%S') if next_run_time else '未知'
                    logger.info(f"添加定时任务: {job_id}, 下次运行时间: {next_run}")
                except Exception as e:
                    logger.error(f"添加定时任务失败: {str(e)}")

            if not self.scheduler.running:
                self.scheduler.start()
                logger.info(f"数据更新调度器已启动,当前状态: {self.get_scheduler_status()}")
        except Exception as e:
            logger.error(f"启动调度器失败: {str(e)}")
            # Do not leave a half-started scheduler running.
            if self.scheduler and self.scheduler.running:
                try:
                    self.scheduler.shutdown()
                except Exception as shutdown_error:
                    logger.warning(f"关闭失败的调度器时出错: {str(shutdown_error)}")

    def stop_scheduler(self):
        """Shut the scheduler down if it exists and is running."""
        if not self.scheduler:
            logger.debug("调度器未初始化,无需停止")
            return

        if not self.scheduler.running:
            return

        logger.info(f"开始停止调度器,scheduler_id: {id(self.scheduler)}")
        try:
            self.scheduler.shutdown()
            logger.info("数据更新调度器已停止")
        except Exception as e:
            logger.warning(f"停止调度器时出错: {str(e)}")

    def get_scheduler_status(self):
        """
        Describe the scheduler state.

        Returns:
            dict: ``status`` (a status label) and ``jobs``, a list of
            ``{'id', 'next_run_time', 'trigger'}`` entries that is only
            filled while the scheduler is running.
        """
        if not self.scheduler:
            return {
                "status": "未初始化",
                "jobs": []
            }

        try:
            jobs = []
            if self.scheduler.running:
                jobs = [
                    {
                        "id": job.id,
                        "next_run_time": job.next_run_time.strftime("%Y-%m-%d %H:%M:%S") if job.next_run_time else None,
                        "trigger": str(job.trigger)
                    }
                    for job in self.scheduler.get_jobs()
                ]

            return {
                "status": "运行中" if self.scheduler.running else "已停止",
                "jobs": jobs
            }
        except Exception as e:
            logger.warning(f"获取调度器状态时出错: {str(e)}")
            return {
                "status": "获取状态失败",
                "jobs": []
            }


# Module-level service instance
data_update_service = DataUpdateService()


def init_data_update_service(app):
    """
    Wire the data update service into a Flask application.

    Attaches the app, triggers one background update after the first
    response, starts the scheduler and registers a shutdown hook.

    Args:
        app: Flask application instance
    """
    data_update_service.init_app(app)

    # Flag on the app that records whether the initial update was triggered.
    app._future_data_initialized = False

    # after_request stands in for before_first_request, which newer Flask
    # versions no longer provide.
    @app.after_request
    def after_request_handler(response):
        if not app._future_data_initialized:
            app._future_data_initialized = True
            # Run the update in a background thread so the response is not
            # delayed.
            thread = threading.Thread(target=data_update_service.update_future_daily_data)
            thread.daemon = True
            thread.start()
        return response

    # NOTE(review): init_app already starts the scheduler, except in the
    # debug reloader's parent process; this unconditional call bypasses that
    # guard. Kept so behaviour is unchanged — confirm whether it is intended.
    data_update_service.start_scheduler()

    # Stop the scheduler when the process exits. A teardown_appcontext hook
    # would run every time an application context is popped (after each
    # request), which would shut the scheduler down after the first request.
    atexit.register(data_update_service.stop_scheduler)