| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418 |
- """
- 数据更新服务
- 负责定期更新期货数据
- """
- import logging
- import threading
- import yaml
- from pathlib import Path
- from apscheduler.schedulers.background import BackgroundScheduler
- from apscheduler.triggers.cron import CronTrigger
- from app.database.db_manager import db
- from app.models.future_info import FutureInfo, FutureDaily
- from app.services.data_scraper import FutureDataScraper
- from retrying import retry
- import os
- logger = logging.getLogger(__name__)
- class DataUpdateService:
- """
- 数据更新服务
- 用于定期更新数据库中的期货数据
- """
- _instance = None
- _lock = threading.Lock()
-
- def __new__(cls, app=None):
- with cls._lock:
- if cls._instance is None:
- cls._instance = super().__new__(cls)
- cls._instance.app = None
- cls._instance.scraper = None # 延迟初始化爬虫
- cls._instance.scheduler = None # 延迟初始化调度器
- cls._instance.config = None
- cls._instance._initialized = False
- return cls._instance
-
- def __init__(self, app=None):
- with self._lock:
- if not self._initialized:
- logger.info("初始化数据更新服务...")
- self.app = app
- self.config = None # 延迟加载配置
- self._initialized = True
-
- if app is not None:
- self.init_app(app)
-
- def _load_config(self):
- """
- 加载配置文件,优先从配置服务读取定时任务配置
- """
- config_path = Path("config.yaml")
-
- # 尝试从配置服务读取定时任务配置
- try:
- from app.services.config_service import get_int_config, get_str_config
- morning_hour = get_int_config('schedule_morning_hour', 9)
- morning_minute = get_int_config('schedule_morning_minute', 0)
- afternoon_hour = get_int_config('schedule_afternoon_hour', 15)
- afternoon_minute = get_int_config('schedule_afternoon_minute', 0)
-
- config_from_service = {
- "data_update": {
- "schedule": [
- {"hour": str(morning_hour), "minute": str(morning_minute)},
- {"hour": str(afternoon_hour), "minute": str(afternoon_minute)}
- ]
- }
- }
- logger.info(f"从配置服务加载定时任务配置: {morning_hour}:{morning_minute:02d}, {afternoon_hour}:{afternoon_minute:02d}")
- return config_from_service
-
- except Exception as e:
- logger.warning(f"从配置服务读取定时配置失败,尝试读取配置文件: {e}")
-
- # 如果配置服务读取失败,回退到配置文件
- if not config_path.exists():
- # 创建默认配置
- default_config = {
- "data_update": {
- "schedule": [
- {"hour": "9", "minute": "0"},
- {"hour": "15", "minute": "0"}
- ]
- }
- }
- try:
- config_filename = get_str_config('config_filename', 'config.yaml')
- config_path = Path(config_filename)
- except:
- pass
-
- with open(config_path, "w", encoding="utf-8") as f:
- yaml.dump(default_config, f, allow_unicode=True)
- return default_config
-
- with open(config_path, "r", encoding="utf-8") as f:
- return yaml.safe_load(f)
-
- def init_app(self, app):
- """
- 初始化应用
-
- Args:
- app: Flask应用实例
- """
- logger.info(f"初始化应用到数据更新服务,应用ID: {id(app)}")
- self.app = app
-
- # 在有Flask应用上下文时加载配置和初始化组件
- with app.app_context():
- # 延迟加载配置
- if self.config is None:
- self.config = self._load_config()
- logger.info("数据更新服务配置加载完成")
-
- # 延迟初始化爬虫
- if self.scraper is None:
- self.scraper = FutureDataScraper()
- logger.info("数据爬虫初始化完成")
-
- # 在主进程中初始化调度器
- if not self.scheduler:
- self.scheduler = BackgroundScheduler()
- if not app.debug or os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
- logger.info("在主进程中启动调度器...")
- self.start_scheduler()
-
- def _ensure_scraper_initialized(self):
- """确保数据爬虫已初始化"""
- if self.scraper is None:
- logger.info("延迟初始化数据爬虫...")
- self.scraper = FutureDataScraper()
-
- def _ensure_config_initialized(self):
- """确保配置已加载"""
- if self.config is None:
- logger.info("延迟加载数据更新服务配置...")
- self.config = self._load_config()
-
- def update_future_daily_data(self):
- """
- 更新期货每日数据
-
- 从网站爬取最新的期货数据,存入future_daily表,然后更新future_info表
- """
- try:
- if not self.app:
- logger.error("应用上下文未设置,无法更新数据")
- return False
-
- with self.app.app_context():
- logger.info("开始更新期货每日数据...")
- # 确保爬虫已初始化
- self._ensure_scraper_initialized()
- # 更新future_daily表
- records_count = self.scraper.update_future_daily(db.session, FutureDaily)
- if records_count > 0:
- # 根据future_daily表更新future_info表
- updated_count = self.scraper.update_future_info_from_daily(db.session, FutureInfo, FutureDaily)
- logger.info(f"期货数据更新完成: 爬取{records_count}条记录,更新{updated_count}条期货基础信息")
- return True
- else:
- logger.warning("期货数据更新失败: 未能爬取任何数据")
- return False
- except Exception as e:
- logger.error(f"期货数据更新出错: {str(e)}")
- raise
-
- def manual_update(self):
- """
- 手动触发数据更新
-
- Returns:
- dict: 更新结果
- """
- try:
- logger.info("开始手动更新数据...")
- with self.app.app_context():
- # 确保爬虫已初始化
- self._ensure_scraper_initialized()
- # 1. 更新future_daily表
- logger.info("正在更新future_daily表...")
- records_count = self.scraper.update_future_daily(db.session, FutureDaily)
-
- if records_count > 0:
- # 2. 更新future_info表
- logger.info("正在根据future_daily表更新future_info表...")
- updated_count = self.scraper.update_future_info_from_daily(db.session, FutureInfo, FutureDaily)
-
- return {
- 'code': 0,
- 'msg': f'数据更新成功:新增{records_count}条每日数据,更新{updated_count}条期货信息',
- 'data': {
- 'daily_count': records_count,
- 'info_count': updated_count
- }
- }
- else:
- logger.warning("未能获取到新的每日数据")
- return {
- 'code': 1,
- 'msg': '未能获取到新的数据,可能是网络问题或数据源未更新',
- 'data': None
- }
-
- except Exception as e:
- error_msg = f"手动更新数据失败: {str(e)}"
- logger.error(error_msg)
- return {
- 'code': 1,
- 'msg': error_msg,
- 'data': None
- }
-
- def start_scheduler(self):
- """
- 启动定时任务调度器
- """
- if not self.scheduler:
- logger.warning("调度器未初始化,无法启动")
- return
-
- if self.scheduler.running:
- logger.info(f"调度器已经在运行中,scheduler_id: {id(self.scheduler)}")
- return
-
- try:
- logger.info(f"开始启动调度器,scheduler_id: {id(self.scheduler)}")
-
- # 获取调度器配置,优先从配置服务读取
- try:
- from app.services.config_service import get_str_config, get_int_config, get_bool_config
- scheduler_timezone = get_str_config('scheduler_timezone', 'Asia/Shanghai')
- scheduler_max_instances = get_int_config('scheduler_max_instances', 1)
- scheduler_coalesce = get_bool_config('scheduler_coalesce', True)
- scheduler_misfire_grace_time = get_int_config('scheduler_misfire_grace_time', 60)
-
- logger.info(f"从配置服务读取调度器配置: timezone={scheduler_timezone}, max_instances={scheduler_max_instances}")
- except Exception as e:
- logger.warning(f"从配置服务读取调度器配置失败,使用默认配置: {e}")
- # 确保配置已加载
- self._ensure_config_initialized()
- scheduler_config = self.config.get("data_update", {}).get("scheduler", {})
- scheduler_timezone = scheduler_config.get("timezone", "Asia/Shanghai")
- scheduler_max_instances = scheduler_config.get("max_instances", 1)
- scheduler_coalesce = scheduler_config.get("coalesce", True)
- scheduler_misfire_grace_time = scheduler_config.get("misfire_grace_time", 60)
-
- # 配置调度器
- self.scheduler.configure(
- timezone=scheduler_timezone,
- max_instances=scheduler_max_instances,
- coalesce=scheduler_coalesce,
- misfire_grace_time=scheduler_misfire_grace_time
- )
-
- # 从配置文件读取定时设置
- # 确保配置已加载
- self._ensure_config_initialized()
- schedule_config = self.config.get("data_update", {}).get("schedule", [])
- logger.info(f"读取到的定时配置: {schedule_config}")
-
- # 添加定时任务
- for schedule in schedule_config:
- try:
- hour = schedule.get("hour", "*")
- minute = schedule.get("minute", "0")
- job_id = f"update_future_data_{hour}_{minute}"
-
- # 创建触发器
- trigger = CronTrigger(
- hour=hour,
- minute=minute,
- timezone=scheduler_timezone # 使用前面获取的配置
- )
-
- # 如果启用重试,创建重试装饰器
- if schedule.get("retry", False):
- # 从配置服务获取重试配置
- try:
- from app.services.config_service import get_int_config
- max_retries = get_int_config('retry_max_attempts', 3)
- retry_delay = get_int_config('retry_delay_seconds', 300)
- except Exception as e:
- logger.warning(f"获取重试配置失败,使用默认值: {e}")
- max_retries = schedule.get("max_retries", 3)
- retry_delay = schedule.get("retry_delay", 300)
-
- @retry(
- stop_max_attempt_number=max_retries + 1,
- wait_fixed=retry_delay * 1000, # 毫秒
- retry_on_exception=lambda e: isinstance(e, Exception)
- )
- def wrapped_task():
- return self.update_future_daily_data()
-
- task_func = wrapped_task
- else:
- task_func = self.update_future_daily_data
-
- # 添加任务
- job = self.scheduler.add_job(
- task_func,
- trigger=trigger,
- id=job_id,
- replace_existing=True,
- max_instances=1,
- coalesce=True
- )
-
- # 安全地获取下次运行时间
- next_run = job.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if job and hasattr(job, 'next_run_time') and job.next_run_time else '未知'
- logger.info(f"添加定时任务: {job_id}, 下次运行时间: {next_run}")
-
- except Exception as e:
- logger.error(f"添加定时任务失败: {str(e)}")
-
- # 启动调度器
- if not self.scheduler.running:
- self.scheduler.start()
- logger.info(f"数据更新调度器已启动,当前状态: {self.get_scheduler_status()}")
-
- except Exception as e:
- logger.error(f"启动调度器失败: {str(e)}")
- # 确保调度器被正确关闭
- if self.scheduler and self.scheduler.running:
- try:
- self.scheduler.shutdown()
- except Exception as shutdown_error:
- logger.warning(f"关闭失败的调度器时出错: {str(shutdown_error)}")
-
- def stop_scheduler(self):
- """
- 停止定时任务调度器
- """
- if not self.scheduler:
- logger.debug("调度器未初始化,无需停止")
- return
-
- if not self.scheduler.running:
- # logger.debug(f"调度器未在运行,scheduler_id: {id(self.scheduler)}")
- return
-
- logger.info(f"开始停止调度器,scheduler_id: {id(self.scheduler)}")
- try:
- self.scheduler.shutdown()
- logger.info("数据更新调度器已停止")
- except Exception as e:
- logger.warning(f"停止调度器时出错: {str(e)}")
- def get_scheduler_status(self):
- """
- 获取调度器状态
- """
- if not self.scheduler:
- return {
- "status": "未初始化",
- "jobs": []
- }
-
- try:
- jobs = []
- if self.scheduler.running:
- for job in self.scheduler.get_jobs():
- jobs.append({
- "id": job.id,
- "next_run_time": job.next_run_time.strftime("%Y-%m-%d %H:%M:%S") if job.next_run_time else None,
- "trigger": str(job.trigger)
- })
-
- return {
- "status": "运行中" if self.scheduler.running else "已停止",
- "jobs": jobs
- }
- except Exception as e:
- logger.warning(f"获取调度器状态时出错: {str(e)}")
- return {
- "status": "获取状态失败",
- "jobs": []
- }
- # 创建服务实例
- data_update_service = DataUpdateService()
- def init_data_update_service(app):
- """
- 初始化数据更新服务
-
- Args:
- app: Flask应用实例
- """
- data_update_service.init_app(app)
-
- # 设置应用标记,用于跟踪是否已经初始化
- app._future_data_initialized = False
-
- # 在Flask 2.2+中,before_first_request被移除,使用after_request替代
- @app.after_request
- def after_request_handler(response):
- # 检查是否需要初始化数据
- if not app._future_data_initialized:
- app._future_data_initialized = True
- # 在后台线程中执行数据更新
- thread = threading.Thread(target=data_update_service.update_future_daily_data)
- thread.daemon = True
- thread.start()
- return response
-
- # 启动定时任务调度器
- data_update_service.start_scheduler()
-
- # 应用关闭时停止调度器
- @app.teardown_appcontext
- def stop_scheduler(exception=None):
- data_update_service.stop_scheduler()
|