data_scraper.py 29 KB


  1. """
  2. 数据爬取服务
  3. 用于从外部网站获取期货数据
  4. """
  5. import requests
  6. from bs4 import BeautifulSoup
  7. import pandas as pd
  8. import logging
  9. from datetime import datetime
  10. import re
  11. logger = logging.getLogger(__name__)
  12. class FutureDataScraper:
  13. """期货数据爬取类"""
  14. def __init__(self, url=None):
  15. """初始化爬虫"""
  16. # 从配置服务获取数据源URL
  17. if url is None:
  18. try:
  19. from app.services.config_service import get_str_config
  20. url = get_str_config('data_source_url', 'http://121.37.80.177/fees.html')
  21. except Exception as e:
  22. logger.warning(f"获取配置失败,使用默认URL: {e}")
  23. url = "http://121.37.80.177/fees.html"
  24. self.url = url
  25. logger.info(f"期货数据爬虫初始化,URL: {self.url}")
  26. def fetch_future_daily(self):
  27. """
  28. 从网页爬取期货每日数据
  29. 返回一个包含期货数据的DataFrame
  30. """
  31. try:
  32. # 检查当前时间是否在交易时间内
  33. current_time = datetime.now().time()
  34. # 从配置服务获取交易时间
  35. try:
  36. from app.services.config_service import get_str_config
  37. trading_start = get_str_config('trading_start_time', '09:00')
  38. trading_end = get_str_config('trading_end_time', '17:00')
  39. except Exception as e:
  40. logger.warning(f"获取交易时间配置失败,使用默认值: {e}")
  41. trading_start = '09:00'
  42. trading_end = '17:00'
  43. is_trading_hours = (
  44. current_time >= datetime.strptime(trading_start, '%H:%M').time() and
  45. current_time <= datetime.strptime(trading_end, '%H:%M').time()
  46. )
  47. logger.debug(f"当前时间: {current_time}, 交易时间: {trading_start}-{trading_end}, 是否在交易时间内: {is_trading_hours}")
  48. # 发送HTTP请求
  49. logger.debug(f"开始请求URL: {self.url}")
  50. response = requests.get(self.url)
  51. response.raise_for_status() # 如果请求失败则抛出异常
  52. # 尝试多种编码方式
  53. try:
  54. from app.services.config_service import get_str_config
  55. encoding_list = get_str_config('encoding_attempts', 'gb2312,gbk,gb18030,utf-8')
  56. encodings = [enc.strip() for enc in encoding_list.split(',')]
  57. except Exception as e:
  58. logger.warning(f"获取编码配置失败,使用默认值: {e}")
  59. encodings = ['gb2312', 'gbk', 'gb18030', 'utf-8']
  60. html_text = None
  61. for encoding in encodings:
  62. try:
  63. html_text = response.content.decode(encoding)
  64. print(f'成功使用 {encoding} 编码解析')
  65. break
  66. except UnicodeDecodeError:
  67. continue
  68. if html_text is None:
  69. html_text = response.content.decode('utf-8', errors='ignore')
  70. print('使用UTF-8忽略错误模式解析')
  71. # 使用BeautifulSoup解析HTML
  72. soup = BeautifulSoup(html_text, 'html.parser')
  73. # 查找表格
  74. table = soup.find('table')
  75. if not table:
  76. logger.error("未找到数据表格")
  77. logger.debug(f"页面内容前100字符: {html_text[:100]}")
  78. return None
  79. logger.debug("找到数据表格,开始解析")
  80. # 解析表格数据
  81. headers = []
  82. header_row = table.find('tr')
  83. if header_row:
  84. headers = [th.text.strip() for th in header_row.find_all('th')]
  85. logger.debug(f"表头: {headers}")
  86. # 检查表头是否包含预期的中文字段
  87. if not any(('交易所' in h or '合约' in h) for h in headers):
  88. logger.warning("表头可能存在编码问题,没有找到预期的中文字段")
  89. else:
  90. logger.warning("未找到表头行")
  91. rows = []
  92. data_rows = table.find_all('tr')[1:] # 跳过表头行
  93. logger.debug(f"找到 {len(data_rows)} 行数据")
  94. # 用于存储每个品种的所有合约数据
  95. product_contracts = {}
  96. # 查找持仓量列的位置
  97. open_interest_col = -1
  98. for i, header in enumerate(headers):
  99. if '持仓量' in header:
  100. open_interest_col = i
  101. break
  102. # 如果没找到持仓量列,使用经验位置
  103. if open_interest_col == -1:
  104. try:
  105. from app.services.config_service import get_int_config
  106. default_col = get_int_config('open_interest_column_position', 21)
  107. except Exception as e:
  108. logger.warning(f"获取持仓量列配置失败,使用默认值: {e}")
  109. default_col = 21
  110. open_interest_col = default_col if len(headers) > default_col else len(headers) - 2
  111. logger.debug(f"未找到持仓量列,使用经验位置 {open_interest_col} (配置默认值: {default_col})")
  112. else:
  113. logger.debug(f"找到持仓量列位置: {open_interest_col}")
  114. # 第一遍遍历:收集所有合约数据
  115. for row in data_rows:
  116. cols = row.find_all('td')
  117. if not cols:
  118. continue
  119. # 获取合约代码和品种代码
  120. contract_code = cols[1].text.strip()
  121. product_code = ''.join([c for c in contract_code if c.isalpha()]).upper()
  122. # 获取持仓量(用于判断主力合约)
  123. open_interest = 0
  124. if open_interest_col >= 0 and len(cols) > open_interest_col:
  125. oi_text = cols[open_interest_col].text.strip()
  126. try:
  127. # 清理持仓量文本,移除逗号等
  128. import re
  129. oi_clean = re.sub(r'[^\d]', '', oi_text)
  130. if oi_clean:
  131. open_interest = int(oi_clean)
  132. except (ValueError, TypeError):
  133. open_interest = 0
  134. # 将合约信息存储到对应品种的列表中
  135. if product_code not in product_contracts:
  136. product_contracts[product_code] = []
  137. product_contracts[product_code].append((contract_code, open_interest))
  138. # 确定主力合约(根据持仓量)
  139. product_main_contracts = {}
  140. for product_code, contracts in product_contracts.items():
  141. if contracts:
  142. # 按持仓量排序,取持仓量最大的合约
  143. main_contract = max(contracts, key=lambda x: x[1])[0]
  144. max_oi = max(contracts, key=lambda x: x[1])[1]
  145. product_main_contracts[product_code] = main_contract
  146. # logger.debug(f"根据持仓量确定主力合约: {product_code} {main_contract} (持仓量: {max_oi})")
  147. logger.debug(f"识别出 {len(product_main_contracts)} 个主力合约")
  148. # 第二遍遍历:处理所有数据行
  149. for row in data_rows:
  150. cols = row.find_all('td')
  151. if not cols:
  152. continue
  153. # 获取合约代码和品种代码
  154. contract_code = cols[1].text.strip()
  155. product_code = ''.join([c for c in contract_code if c.isalpha()]).upper()
  156. # 判断是否为主力合约
  157. is_main = False
  158. if product_code in product_main_contracts:
  159. if contract_code == product_main_contracts[product_code]:
  160. is_main = True
  161. # logger.debug(f"标记主力合约: {product_code} {contract_code}")
  162. # 收集行数据
  163. row_data = [col.text.strip() for col in cols]
  164. row_data.append(is_main) # 添加主力合约标志
  165. rows.append(row_data)
  166. # 创建DataFrame
  167. headers.append('is_main_contract') # 添加主力合约标志列
  168. # 确保所有行的长度与表头一致
  169. for i, row in enumerate(rows):
  170. if len(row) != len(headers):
  171. logger.warning(f"第{i+1}行数据长度({len(row)})与表头长度({len(headers)})不一致,进行调整")
  172. # 如果行长度不够,用空字符串填充
  173. if len(row) < len(headers):
  174. row.extend([''] * (len(headers) - len(row)))
  175. # 如果行长度超过,截断
  176. elif len(row) > len(headers):
  177. row = row[:len(headers)]
  178. rows[i] = row
  179. df = pd.DataFrame(rows, columns=headers)
  180. # 记录日志
  181. logger.debug(f"成功获取期货数据,共{len(df)}条记录")
  182. logger.debug(f"数据前5行: {df.head()}")
  183. return df
  184. except Exception as e:
  185. logger.error(f"获取期货数据失败: {str(e)}", exc_info=True)
  186. return None
  187. def update_future_daily(self, db_session, FutureDaily):
  188. """
  189. 更新数据库中的future_daily表
  190. 参数:
  191. db_session: 数据库会话
  192. FutureDaily: 期货日数据模型类
  193. 返回:
  194. 更新的记录数量
  195. """
  196. try:
  197. # 获取数据
  198. df = self.fetch_future_daily()
  199. if df is None or df.empty:
  200. logger.error("无法更新期货日数据: 未获取到数据")
  201. return 0
  202. # 打印表头查看具体的字段名
  203. logger.debug(f"表格的字段名: {list(df.columns)}")
  204. # 清空当前数据表
  205. db_session.query(FutureDaily).delete()
  206. # 创建新记录
  207. records = []
  208. update_time = datetime.now()
  209. # 遍历DataFrame中的每一行
  210. for idx, row in df.iterrows():
  211. try:
  212. # 提取合约代码和品种代码
  213. contract_code = row.get('合约代码', '')
  214. if not contract_code:
  215. logger.warning(f"第{idx+1}行没有合约代码,跳过")
  216. continue
  217. # 提取品种代码(合约代码中的字母部分)
  218. product_code = ''.join([c for c in contract_code if c.isalpha()])
  219. # 记录处理的行号和关键字段
  220. # logger.debug(f"处理第{idx+1}行,合约代码: {contract_code}, 产品代码: {product_code}")
  221. # 创建新记录
  222. record = FutureDaily(
  223. exchange=row.get('交易所', ''),
  224. contract_code=contract_code,
  225. contract_name=row.get('合约名称', ''),
  226. product_code=product_code,
  227. product_name=row.get('品种名称', ''),
  228. contract_multiplier=self._safe_float(row.get('合约乘数', 0)),
  229. price_tick=self._safe_float(row.get('最小跳动', 0)),
  230. open_fee_rate=self._safe_float(row.get('开仓费率', 0)),
  231. open_fee=self._safe_float(row.get('开仓费用/手', 0)),
  232. close_fee_rate=self._safe_float(row.get('平仓费率', 0)),
  233. close_fee=self._safe_float(row.get('平仓费用/手', 0)),
  234. close_today_fee_rate=self._safe_float(row.get('平今费率', 0)),
  235. close_today_fee=self._safe_float(row.get('平今费用/手', 0)),
  236. long_margin_rate=self._safe_float(row.get('做多保证金率', 0)),
  237. long_margin_fee=self._safe_float(row.get('做多保证金/手', 0)),
  238. short_margin_rate=self._safe_float(row.get('做空保证金率', 0)),
  239. short_margin_fee=self._safe_float(row.get('做空保证金/手', 0)),
  240. latest_price=self._safe_float(row.get('最新价', 0)),
  241. open_interest=self._safe_int(row.get('持仓量', 0)),
  242. volume=self._safe_int(row.get('成交量', 0)),
  243. is_main_contract=row.get('is_main_contract', False),
  244. update_time=update_time
  245. )
  246. if row.get('is_main_contract', False):
  247. logger.debug(f"保存主力合约到数据库: {product_code} {contract_code}")
  248. records.append(record)
  249. except Exception as e:
  250. logger.error(f"解析期货日数据行失败(行号:{idx+1}): {str(e)}", exc_info=True)
  251. continue
  252. # 批量添加记录
  253. if records:
  254. db_session.add_all(records)
  255. db_session.commit()
  256. logger.debug(f"成功更新期货日数据,共{len(records)}条记录")
  257. return len(records)
  258. else:
  259. logger.warning("无期货日数据可更新")
  260. return 0
  261. except Exception as e:
  262. logger.error(f"更新期货日数据失败: {str(e)}", exc_info=True)
  263. db_session.rollback()
  264. return 0
  265. def _normalize_contract_code(self, contract_code):
  266. """
  267. 标准化合约代码格式
  268. 例如:将 'AP505' 转换为 'AP2505',同时确保字母部分为大写
  269. """
  270. try:
  271. if not contract_code:
  272. return contract_code
  273. # 提取字母部分和数字部分,并将字母转换为大写
  274. letters = ''.join(c for c in contract_code if c.isalpha()).upper()
  275. numbers = ''.join(c for c in contract_code if c.isdigit())
  276. # 如果数字部分是3位数,在前面加上2
  277. if len(numbers) == 3:
  278. numbers = '2' + numbers
  279. return letters + numbers
  280. except Exception as e:
  281. logger.error(f"合约代码格式转换失败: {str(e)}")
  282. return contract_code
  283. def update_future_info_from_daily(self, db_session, FutureInfo, FutureDaily):
  284. """
  285. 根据future_daily表更新future_info表的数据
  286. 参数:
  287. db_session: 数据库会话
  288. FutureInfo: 期货基础信息模型类
  289. FutureDaily: 期货日数据模型类
  290. 返回:
  291. 更新的记录数量
  292. """
  293. try:
  294. # 获取当前数据库中的所有期货基础信息
  295. # 将contract_letter转换为大写用于统一比较
  296. futures = {f.contract_letter.upper(): f for f in db_session.query(FutureInfo).all()}
  297. logger.debug(f"从future_info表获取到{len(futures)}个期货品种")
  298. # 获取最新的future_daily数据的所有产品代码
  299. product_data = {}
  300. # 记录每个品种的主力合约
  301. product_main_contracts = {}
  302. main_contract_records = db_session.query(FutureDaily).filter(FutureDaily.is_main_contract == True).all()
  303. logger.debug(f"查询到{len(main_contract_records)}条主力合约记录")
  304. for daily in main_contract_records:
  305. product_code_upper = daily.product_code.upper()
  306. if product_code_upper not in product_main_contracts:
  307. product_main_contracts[product_code_upper] = self._normalize_contract_code(daily.contract_code)
  308. # logger.debug(f"从数据库获取主力合约: {product_code_upper} -> {daily.contract_code}")
  309. logger.debug(f"整理得到{len(product_main_contracts)}个主力合约: {product_main_contracts}")
  310. # 如果没有找到主力合约,尝试根据持仓量重新识别
  311. if not product_main_contracts:
  312. logger.debug("未找到主力合约,尝试根据持仓量重新识别...")
  313. # 按品种分组,找出每个品种持仓量最大的合约
  314. product_contracts = {}
  315. for daily in db_session.query(FutureDaily).all():
  316. product_code_upper = daily.product_code.upper()
  317. if product_code_upper not in product_contracts:
  318. product_contracts[product_code_upper] = []
  319. product_contracts[product_code_upper].append((daily.contract_code, daily.open_interest or 0))
  320. # 确定每个品种的主力合约
  321. for product_code, contracts in product_contracts.items():
  322. if contracts:
  323. # 按持仓量排序,取持仓量最大的合约
  324. main_contract = max(contracts, key=lambda x: x[1])[0]
  325. max_oi = max(contracts, key=lambda x: x[1])[1]
  326. product_main_contracts[product_code] = self._normalize_contract_code(main_contract)
  327. logger.debug(f"根据持仓量识别主力合约: {product_code} {main_contract} (持仓量: {max_oi})")
  328. logger.debug(f"重新识别出 {len(product_main_contracts)} 个主力合约")
  329. # 只获取主力合约的数据用于更新future_info表
  330. main_daily_records = db_session.query(FutureDaily).filter(FutureDaily.is_main_contract == True).all()
  331. logger.debug(f"查询主力合约数据: 找到{len(main_daily_records)}条记录")
  332. for daily in main_daily_records:
  333. product_code_upper = daily.product_code.upper()
  334. product_data[product_code_upper] = daily
  335. # logger.debug(f"获取主力合约数据: {product_code_upper} -> {daily.contract_code}")
  336. logger.debug(f"从future_daily表获取到{len(product_data)}个主力合约的数据: {list(product_data.keys())}")
  337. # 更新计数器
  338. updated_count = 0
  339. not_found_count = 0
  340. # 更新期货基础信息
  341. logger.debug(f"开始匹配和更新期货基础信息: future_info有{len(futures)}个品种,product_data有{len(product_data)}个品种")
  342. for contract_letter, future in futures.items():
  343. contract_letter_upper = contract_letter.upper()
  344. # logger.debug(f"检查期货品种: {contract_letter_upper} -> 是否在product_data中: {contract_letter_upper in product_data}")
  345. if contract_letter_upper in product_data:
  346. daily = product_data[contract_letter_upper]
  347. # 记录更新前的值
  348. old_values = {
  349. 'exchange': future.exchange,
  350. 'contract_multiplier': future.contract_multiplier,
  351. 'long_margin_rate': future.long_margin_rate,
  352. 'short_margin_rate': future.short_margin_rate,
  353. 'open_fee': future.open_fee,
  354. 'close_fee': future.close_fee,
  355. 'close_today_rate': future.close_today_rate,
  356. 'close_today_fee': future.close_today_fee,
  357. 'current_main_contract': future.current_main_contract
  358. }
  359. # 更新字段
  360. future.exchange = daily.exchange
  361. future.contract_multiplier = daily.contract_multiplier
  362. future.long_margin_rate = daily.long_margin_rate
  363. future.short_margin_rate = daily.short_margin_rate
  364. future.open_fee = daily.open_fee
  365. future.close_fee = daily.close_fee
  366. future.close_today_rate = daily.close_today_fee_rate
  367. future.close_today_fee = daily.close_today_fee
  368. # 根据当前价格和保证金率计算保证金金额
  369. if hasattr(daily, 'latest_price') and daily.latest_price and future.contract_multiplier:
  370. if future.long_margin_rate:
  371. future.long_margin_amount = daily.latest_price * future.contract_multiplier * future.long_margin_rate
  372. if future.short_margin_rate:
  373. future.short_margin_amount = daily.latest_price * future.contract_multiplier * future.short_margin_rate
  374. # 更新主力合约
  375. if contract_letter_upper in product_main_contracts:
  376. future.current_main_contract = product_main_contracts[contract_letter_upper]
  377. # 检查是否有实际更新
  378. has_changes = False
  379. changes = []
  380. for field, old_value in old_values.items():
  381. new_value = getattr(future, field)
  382. if old_value != new_value:
  383. has_changes = True
  384. changes.append(f"{field}: {old_value} -> {new_value}")
  385. if has_changes:
  386. logger.debug(f"更新期货 {future.contract_letter} ({future.name}): {', '.join(changes)}")
  387. updated_count += 1
  388. else:
  389. not_found_count += 1
  390. logger.warning(f"未找到期货 {future.contract_letter} ({future.name}) 的每日数据")
  391. # 更新不在future_info中的主力合约
  392. for product_code, contract_code in product_main_contracts.items():
  393. if product_code not in futures:
  394. # 创建新的期货基础信息记录
  395. daily = product_data.get(product_code)
  396. if daily:
  397. future_info = FutureInfo(
  398. contract_letter=product_code,
  399. name=daily.product_name,
  400. market=0, # 默认为国内市场
  401. exchange=daily.exchange,
  402. contract_multiplier=daily.contract_multiplier,
  403. long_margin_rate=daily.long_margin_rate,
  404. short_margin_rate=daily.short_margin_rate,
  405. open_fee=daily.open_fee,
  406. close_fee=daily.close_fee,
  407. close_today_rate=daily.close_today_fee_rate,
  408. close_today_fee=daily.close_today_fee,
  409. # 计算保证金金额
  410. long_margin_amount=(daily.latest_price * daily.contract_multiplier * daily.long_margin_rate) if daily.latest_price and daily.long_margin_rate else None,
  411. short_margin_amount=(daily.latest_price * daily.contract_multiplier * daily.short_margin_rate) if daily.latest_price and daily.short_margin_rate else None,
  412. current_main_contract=contract_code
  413. )
  414. db_session.add(future_info)
  415. updated_count += 1
  416. logger.debug(f"新增期货品种 {product_code} ({daily.product_name})")
  417. # 提交更改
  418. db_session.commit()
  419. logger.debug(f"根据期货日数据成功更新{updated_count}条期货基础信息,{not_found_count}个期货未找到对应数据")
  420. return updated_count
  421. except Exception as e:
  422. logger.error(f"根据期货日数据更新期货基础信息失败: {str(e)}")
  423. db_session.rollback()
  424. return 0
  425. def update_future_info(self, db_session, FutureInfo):
  426. """
  427. 更新数据库中的期货基础信息 (直接从网页获取)
  428. 参数:
  429. db_session: 数据库会话
  430. FutureInfo: 期货基础信息模型类
  431. 返回:
  432. 更新的记录数量
  433. """
  434. try:
  435. # 获取数据
  436. df = self.fetch_future_daily()
  437. if df is None or df.empty:
  438. logger.error("无法更新期货基础信息: 未获取到数据")
  439. return 0
  440. # 获取当前数据库中的所有期货基础信息
  441. futures = {f.contract_letter: f for f in db_session.query(FutureInfo).all()}
  442. # 更新计数器
  443. updated_count = 0
  444. # 遍历DataFrame中的每一行
  445. for _, row in df.iterrows():
  446. try:
  447. # 提取合约字母
  448. contract_code = row.get('合约代码', '')
  449. if not contract_code:
  450. continue
  451. # 假设合约代码的前1-2位是合约字母
  452. contract_letter = ''.join([c for c in contract_code if c.isalpha()])
  453. # 如果合约字母在数据库中存在,则更新相应字段
  454. if contract_letter in futures:
  455. future = futures[contract_letter]
  456. # 更新字段
  457. future.exchange = row.get('交易所', '')
  458. future.contract_multiplier = self._safe_float(row.get('合约乘数', 0))
  459. future.long_margin_rate = self._safe_float(row.get('做多保证金率', 0))
  460. future.short_margin_rate = self._safe_float(row.get('做空保证金率', 0))
  461. future.open_fee = self._safe_float(row.get('开仓费用/手', 0))
  462. future.close_fee = self._safe_float(row.get('平仓费用/手', 0))
  463. future.close_today_rate = self._safe_float(row.get('平今费率', 0))
  464. future.close_today_fee = self._safe_float(row.get('平今费用/手', 0))
  465. # 更新保证金金额字段(从网站的"做多1手保证金"和"做空1手保证金")
  466. future.long_margin_amount = self._safe_float(row.get('做多1手保证金', 0))
  467. future.short_margin_amount = self._safe_float(row.get('做空1手保证金', 0))
  468. # 如果网站字段名不同,尝试其他可能的字段名
  469. if future.long_margin_amount is None:
  470. future.long_margin_amount = self._safe_float(row.get('1手做多保证金', 0))
  471. if future.short_margin_amount is None:
  472. future.short_margin_amount = self._safe_float(row.get('1手做空保证金', 0))
  473. # 如果是主连合约,更新主力合约字段
  474. if row.get('is_main_contract', False):
  475. future.current_main_contract = contract_code
  476. updated_count += 1
  477. except Exception as e:
  478. logger.error(f"更新单个期货信息失败: {str(e)}")
  479. continue
  480. # 提交更改
  481. db_session.commit()
  482. logger.debug(f"成功更新{updated_count}条期货基础信息")
  483. return updated_count
  484. except Exception as e:
  485. logger.error(f"更新期货基础信息失败: {str(e)}")
  486. db_session.rollback()
  487. return 0
  488. def _safe_float(self, value):
  489. """安全地转换为浮点数"""
  490. try:
  491. if pd.isna(value):
  492. return None
  493. return float(value)
  494. except (ValueError, TypeError):
  495. return None
  496. def _safe_int(self, value):
  497. """安全地转换为整数"""
  498. try:
  499. if pd.isna(value):
  500. return None
  501. return int(value)
  502. except (ValueError, TypeError):
  503. return None