# data_update.py (17 KB)
  1. """
  2. 数据更新服务
  3. 负责定期更新期货数据
  4. """
  5. import logging
  6. import threading
  7. import yaml
  8. from pathlib import Path
  9. from apscheduler.schedulers.background import BackgroundScheduler
  10. from apscheduler.triggers.cron import CronTrigger
  11. from app.database.db_manager import db
  12. from app.models.future_info import FutureInfo, FutureDaily
  13. from app.services.data_scraper import FutureDataScraper
  14. from retrying import retry
  15. import os
  16. logger = logging.getLogger(__name__)
  17. class DataUpdateService:
  18. """
  19. 数据更新服务
  20. 用于定期更新数据库中的期货数据
  21. """
  22. _instance = None
  23. _lock = threading.Lock()
  24. def __new__(cls, app=None):
  25. with cls._lock:
  26. if cls._instance is None:
  27. cls._instance = super().__new__(cls)
  28. cls._instance.app = None
  29. cls._instance.scraper = None # 延迟初始化爬虫
  30. cls._instance.scheduler = None # 延迟初始化调度器
  31. cls._instance.config = None
  32. cls._instance._initialized = False
  33. return cls._instance
  34. def __init__(self, app=None):
  35. with self._lock:
  36. if not self._initialized:
  37. logger.info("初始化数据更新服务...")
  38. self.app = app
  39. self.config = None # 延迟加载配置
  40. self._initialized = True
  41. if app is not None:
  42. self.init_app(app)
  43. def _load_config(self):
  44. """
  45. 加载配置文件,优先从配置服务读取定时任务配置
  46. """
  47. config_path = Path("config.yaml")
  48. # 尝试从配置服务读取定时任务配置
  49. try:
  50. from app.services.config_service import get_int_config, get_str_config
  51. morning_hour = get_int_config('schedule_morning_hour', 9)
  52. morning_minute = get_int_config('schedule_morning_minute', 0)
  53. afternoon_hour = get_int_config('schedule_afternoon_hour', 15)
  54. afternoon_minute = get_int_config('schedule_afternoon_minute', 0)
  55. config_from_service = {
  56. "data_update": {
  57. "schedule": [
  58. {"hour": str(morning_hour), "minute": str(morning_minute)},
  59. {"hour": str(afternoon_hour), "minute": str(afternoon_minute)}
  60. ]
  61. }
  62. }
  63. logger.info(f"从配置服务加载定时任务配置: {morning_hour}:{morning_minute:02d}, {afternoon_hour}:{afternoon_minute:02d}")
  64. return config_from_service
  65. except Exception as e:
  66. logger.warning(f"从配置服务读取定时配置失败,尝试读取配置文件: {e}")
  67. # 如果配置服务读取失败,回退到配置文件
  68. if not config_path.exists():
  69. # 创建默认配置
  70. default_config = {
  71. "data_update": {
  72. "schedule": [
  73. {"hour": "9", "minute": "0"},
  74. {"hour": "15", "minute": "0"}
  75. ]
  76. }
  77. }
  78. try:
  79. config_filename = get_str_config('config_filename', 'config.yaml')
  80. config_path = Path(config_filename)
  81. except:
  82. pass
  83. with open(config_path, "w", encoding="utf-8") as f:
  84. yaml.dump(default_config, f, allow_unicode=True)
  85. return default_config
  86. with open(config_path, "r", encoding="utf-8") as f:
  87. return yaml.safe_load(f)
  88. def init_app(self, app):
  89. """
  90. 初始化应用
  91. Args:
  92. app: Flask应用实例
  93. """
  94. logger.info(f"初始化应用到数据更新服务,应用ID: {id(app)}")
  95. self.app = app
  96. # 在有Flask应用上下文时加载配置和初始化组件
  97. with app.app_context():
  98. # 延迟加载配置
  99. if self.config is None:
  100. self.config = self._load_config()
  101. logger.info("数据更新服务配置加载完成")
  102. # 延迟初始化爬虫
  103. if self.scraper is None:
  104. self.scraper = FutureDataScraper()
  105. logger.info("数据爬虫初始化完成")
  106. # 在主进程中初始化调度器
  107. if not self.scheduler:
  108. self.scheduler = BackgroundScheduler()
  109. if not app.debug or os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
  110. logger.info("在主进程中启动调度器...")
  111. self.start_scheduler()
  112. def _ensure_scraper_initialized(self):
  113. """确保数据爬虫已初始化"""
  114. if self.scraper is None:
  115. logger.info("延迟初始化数据爬虫...")
  116. self.scraper = FutureDataScraper()
  117. def _ensure_config_initialized(self):
  118. """确保配置已加载"""
  119. if self.config is None:
  120. logger.info("延迟加载数据更新服务配置...")
  121. self.config = self._load_config()
  122. def update_future_daily_data(self):
  123. """
  124. 更新期货每日数据
  125. 从网站爬取最新的期货数据,存入future_daily表,然后更新future_info表
  126. """
  127. try:
  128. if not self.app:
  129. logger.error("应用上下文未设置,无法更新数据")
  130. return False
  131. with self.app.app_context():
  132. logger.info("开始更新期货每日数据...")
  133. # 确保爬虫已初始化
  134. self._ensure_scraper_initialized()
  135. # 更新future_daily表
  136. records_count = self.scraper.update_future_daily(db.session, FutureDaily)
  137. if records_count > 0:
  138. # 根据future_daily表更新future_info表
  139. updated_count = self.scraper.update_future_info_from_daily(db.session, FutureInfo, FutureDaily)
  140. logger.info(f"期货数据更新完成: 爬取{records_count}条记录,更新{updated_count}条期货基础信息")
  141. return True
  142. else:
  143. logger.warning("期货数据更新失败: 未能爬取任何数据")
  144. return False
  145. except Exception as e:
  146. logger.error(f"期货数据更新出错: {str(e)}")
  147. raise
  148. def manual_update(self):
  149. """
  150. 手动触发数据更新
  151. Returns:
  152. dict: 更新结果
  153. """
  154. try:
  155. logger.info("开始手动更新数据...")
  156. with self.app.app_context():
  157. # 确保爬虫已初始化
  158. self._ensure_scraper_initialized()
  159. # 1. 更新future_daily表
  160. logger.info("正在更新future_daily表...")
  161. records_count = self.scraper.update_future_daily(db.session, FutureDaily)
  162. if records_count > 0:
  163. # 2. 更新future_info表
  164. logger.info("正在根据future_daily表更新future_info表...")
  165. updated_count = self.scraper.update_future_info_from_daily(db.session, FutureInfo, FutureDaily)
  166. return {
  167. 'code': 0,
  168. 'msg': f'数据更新成功:新增{records_count}条每日数据,更新{updated_count}条期货信息',
  169. 'data': {
  170. 'daily_count': records_count,
  171. 'info_count': updated_count
  172. }
  173. }
  174. else:
  175. logger.warning("未能获取到新的每日数据")
  176. return {
  177. 'code': 1,
  178. 'msg': '未能获取到新的数据,可能是网络问题或数据源未更新',
  179. 'data': None
  180. }
  181. except Exception as e:
  182. error_msg = f"手动更新数据失败: {str(e)}"
  183. logger.error(error_msg)
  184. return {
  185. 'code': 1,
  186. 'msg': error_msg,
  187. 'data': None
  188. }
  189. def start_scheduler(self):
  190. """
  191. 启动定时任务调度器
  192. """
  193. if not self.scheduler:
  194. logger.warning("调度器未初始化,无法启动")
  195. return
  196. if self.scheduler.running:
  197. logger.info(f"调度器已经在运行中,scheduler_id: {id(self.scheduler)}")
  198. return
  199. try:
  200. logger.info(f"开始启动调度器,scheduler_id: {id(self.scheduler)}")
  201. # 获取调度器配置,优先从配置服务读取
  202. try:
  203. from app.services.config_service import get_str_config, get_int_config, get_bool_config
  204. scheduler_timezone = get_str_config('scheduler_timezone', 'Asia/Shanghai')
  205. scheduler_max_instances = get_int_config('scheduler_max_instances', 1)
  206. scheduler_coalesce = get_bool_config('scheduler_coalesce', True)
  207. scheduler_misfire_grace_time = get_int_config('scheduler_misfire_grace_time', 60)
  208. logger.info(f"从配置服务读取调度器配置: timezone={scheduler_timezone}, max_instances={scheduler_max_instances}")
  209. except Exception as e:
  210. logger.warning(f"从配置服务读取调度器配置失败,使用默认配置: {e}")
  211. # 确保配置已加载
  212. self._ensure_config_initialized()
  213. scheduler_config = self.config.get("data_update", {}).get("scheduler", {})
  214. scheduler_timezone = scheduler_config.get("timezone", "Asia/Shanghai")
  215. scheduler_max_instances = scheduler_config.get("max_instances", 1)
  216. scheduler_coalesce = scheduler_config.get("coalesce", True)
  217. scheduler_misfire_grace_time = scheduler_config.get("misfire_grace_time", 60)
  218. # 配置调度器
  219. self.scheduler.configure(
  220. timezone=scheduler_timezone,
  221. max_instances=scheduler_max_instances,
  222. coalesce=scheduler_coalesce,
  223. misfire_grace_time=scheduler_misfire_grace_time
  224. )
  225. # 从配置文件读取定时设置
  226. # 确保配置已加载
  227. self._ensure_config_initialized()
  228. schedule_config = self.config.get("data_update", {}).get("schedule", [])
  229. logger.info(f"读取到的定时配置: {schedule_config}")
  230. # 添加定时任务
  231. for schedule in schedule_config:
  232. try:
  233. hour = schedule.get("hour", "*")
  234. minute = schedule.get("minute", "0")
  235. job_id = f"update_future_data_{hour}_{minute}"
  236. # 创建触发器
  237. trigger = CronTrigger(
  238. hour=hour,
  239. minute=minute,
  240. timezone=scheduler_timezone # 使用前面获取的配置
  241. )
  242. # 如果启用重试,创建重试装饰器
  243. if schedule.get("retry", False):
  244. # 从配置服务获取重试配置
  245. try:
  246. from app.services.config_service import get_int_config
  247. max_retries = get_int_config('retry_max_attempts', 3)
  248. retry_delay = get_int_config('retry_delay_seconds', 300)
  249. except Exception as e:
  250. logger.warning(f"获取重试配置失败,使用默认值: {e}")
  251. max_retries = schedule.get("max_retries", 3)
  252. retry_delay = schedule.get("retry_delay", 300)
  253. @retry(
  254. stop_max_attempt_number=max_retries + 1,
  255. wait_fixed=retry_delay * 1000, # 毫秒
  256. retry_on_exception=lambda e: isinstance(e, Exception)
  257. )
  258. def wrapped_task():
  259. return self.update_future_daily_data()
  260. task_func = wrapped_task
  261. else:
  262. task_func = self.update_future_daily_data
  263. # 添加任务
  264. job = self.scheduler.add_job(
  265. task_func,
  266. trigger=trigger,
  267. id=job_id,
  268. replace_existing=True,
  269. max_instances=1,
  270. coalesce=True
  271. )
  272. # 安全地获取下次运行时间
  273. next_run = job.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if job and hasattr(job, 'next_run_time') and job.next_run_time else '未知'
  274. logger.info(f"添加定时任务: {job_id}, 下次运行时间: {next_run}")
  275. except Exception as e:
  276. logger.error(f"添加定时任务失败: {str(e)}")
  277. # 启动调度器
  278. if not self.scheduler.running:
  279. self.scheduler.start()
  280. logger.info(f"数据更新调度器已启动,当前状态: {self.get_scheduler_status()}")
  281. except Exception as e:
  282. logger.error(f"启动调度器失败: {str(e)}")
  283. # 确保调度器被正确关闭
  284. if self.scheduler and self.scheduler.running:
  285. try:
  286. self.scheduler.shutdown()
  287. except Exception as shutdown_error:
  288. logger.warning(f"关闭失败的调度器时出错: {str(shutdown_error)}")
  289. def stop_scheduler(self):
  290. """
  291. 停止定时任务调度器
  292. """
  293. if not self.scheduler:
  294. logger.debug("调度器未初始化,无需停止")
  295. return
  296. if not self.scheduler.running:
  297. # logger.debug(f"调度器未在运行,scheduler_id: {id(self.scheduler)}")
  298. return
  299. logger.info(f"开始停止调度器,scheduler_id: {id(self.scheduler)}")
  300. try:
  301. self.scheduler.shutdown()
  302. logger.info("数据更新调度器已停止")
  303. except Exception as e:
  304. logger.warning(f"停止调度器时出错: {str(e)}")
  305. def get_scheduler_status(self):
  306. """
  307. 获取调度器状态
  308. """
  309. if not self.scheduler:
  310. return {
  311. "status": "未初始化",
  312. "jobs": []
  313. }
  314. try:
  315. jobs = []
  316. if self.scheduler.running:
  317. for job in self.scheduler.get_jobs():
  318. jobs.append({
  319. "id": job.id,
  320. "next_run_time": job.next_run_time.strftime("%Y-%m-%d %H:%M:%S") if job.next_run_time else None,
  321. "trigger": str(job.trigger)
  322. })
  323. return {
  324. "status": "运行中" if self.scheduler.running else "已停止",
  325. "jobs": jobs
  326. }
  327. except Exception as e:
  328. logger.warning(f"获取调度器状态时出错: {str(e)}")
  329. return {
  330. "status": "获取状态失败",
  331. "jobs": []
  332. }
# Module-level service instance shared by the rest of the application.
# It is created without a Flask app, so no config is loaded and no scheduler
# exists until init_app() is called on it.
data_update_service = DataUpdateService()
  335. def init_data_update_service(app):
  336. """
  337. 初始化数据更新服务
  338. Args:
  339. app: Flask应用实例
  340. """
  341. data_update_service.init_app(app)
  342. # 设置应用标记,用于跟踪是否已经初始化
  343. app._future_data_initialized = False
  344. # 在Flask 2.2+中,before_first_request被移除,使用after_request替代
  345. @app.after_request
  346. def after_request_handler(response):
  347. # 检查是否需要初始化数据
  348. if not app._future_data_initialized:
  349. app._future_data_initialized = True
  350. # 在后台线程中执行数据更新
  351. thread = threading.Thread(target=data_update_service.update_future_daily_data)
  352. thread.daemon = True
  353. thread.start()
  354. return response
  355. # 启动定时任务调度器
  356. data_update_service.start_scheduler()
  357. # 应用关闭时停止调度器
  358. @app.teardown_appcontext
  359. def stop_scheduler(exception=None):
  360. data_update_service.stop_scheduler()