init_trend_info.py 8.5 KB


  1. """
  2. 初始化trend_info表的脚本
  3. 从CSV文件中读取数据,按照规则生成存储趋势信息
  4. 根据规则:
  5. 1. 从"name"解析出对应的字段:
  6. 1.1 首先获得"name"的长度,如果是13,"category"一定是1;如果长度是4或5,"category"一定是2;如果长度是8"category"可能是0或1
  7. 1.2 如果是13,那么最后5个字符就是"extra_info",剩下的就是8个字符
  8. 1.3 如果长度是8,分析最后两个字符,如果是"上涨"或"下跌","category"是0,否则是1
  9. 1.4 针对8个字符的部分,每两个字符为一组,如果"category"是0,对应的分别是"time_range", "amplitude", "speed_type", "trend_type",然后根据内容找到id
  10. 1.5 针对8个字符的部分,每两个字符为一组,如果"category"是1,对应的分别是"time_range", "position", "amplitude", "trend_type",然后根据内容找到id
  11. """
  12. import os
  13. import csv
  14. import logging
  15. from pathlib import Path
  16. from app import db, create_app
  17. from app.models.dimension import (
  18. TrendInfo, DimTimeRange, DimAmplitude,
  19. DimPosition, DimSpeedType, DimTrendType, CandleInfo
  20. )
  21. import pandas as pd
  22. # 配置日志
  23. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  24. logger = logging.getLogger(__name__)
  25. def load_csv_data():
  26. """加载CSV文件数据"""
  27. csv_path = Path(__file__).parent.parent / 'config' / 'trend_info.csv'
  28. data = []
  29. if not csv_path.exists():
  30. logger.error(f"CSV文件不存在: {csv_path}")
  31. raise FileNotFoundError(f"CSV文件不存在: {csv_path}")
  32. try:
  33. with open(csv_path, 'r', encoding='utf-8') as file:
  34. reader = csv.DictReader(file)
  35. for row in reader:
  36. data.append(row)
  37. logger.info(f"从CSV文件成功加载了{len(data)}条记录")
  38. return data
  39. except Exception as e:
  40. logger.error(f"加载CSV文件时出错: {e}")
  41. raise
  42. def init_trend_info():
  43. """初始化trend_info表"""
  44. # 检查表中是否已有数据
  45. if db.session.query(TrendInfo).count() > 0:
  46. logger.info("trend_info表已有数据,跳过初始化")
  47. return
  48. # 加载CSV数据
  49. try:
  50. data = load_csv_data()
  51. except Exception as e:
  52. logger.error(f"加载CSV数据失败: {e}")
  53. return
  54. # 提前查询所有需要的维度数据
  55. time_ranges = {tr.name: tr for tr in db.session.query(DimTimeRange).all()}
  56. amplitudes = {a.name: a for a in db.session.query(DimAmplitude).all()}
  57. positions = {p.name: p for p in db.session.query(DimPosition).all()}
  58. speed_types = {s.name: s for s in db.session.query(DimSpeedType).all()}
  59. trend_types = {t.name: t for t in db.session.query(DimTrendType).all()}
  60. # 创建所有trend_info记录
  61. trend_records = []
  62. errors = []
  63. for i, row in enumerate(data, 1):
  64. try:
  65. name = row['name'].strip()
  66. if not name:
  67. logger.warning(f"第{i}行缺少name值,跳过")
  68. continue
  69. # 根据name的长度和特征确定category及其他字段
  70. category, time_range_id, amplitude_id, position_id, speed_type_id, trend_type_id, extra_info = parse_name(
  71. name, time_ranges, amplitudes, positions, speed_types, trend_types)
  72. # 创建TrendInfo对象
  73. trend_info = TrendInfo(
  74. category=category,
  75. name=name,
  76. time_range_id=time_range_id,
  77. position_id=position_id,
  78. amplitude_id=amplitude_id,
  79. speed_type_id=speed_type_id,
  80. trend_type_id=trend_type_id,
  81. extra_info=extra_info
  82. )
  83. trend_records.append(trend_info)
  84. except Exception as e:
  85. error_msg = f"处理第{i}行时出错: {e}, 行数据: {row}"
  86. logger.error(error_msg)
  87. errors.append(error_msg)
  88. if errors:
  89. logger.warning(f"初始化过程中有{len(errors)}个错误,请检查日志获取详情")
  90. try:
  91. # 批量添加记录并提交
  92. db.session.add_all(trend_records)
  93. db.session.commit()
  94. logger.info(f"成功初始化 {len(trend_records)} 条trend_info记录")
  95. except Exception as e:
  96. db.session.rollback()
  97. logger.error(f"提交数据到数据库时出错: {e}")
  98. raise
  99. def parse_name(name, time_ranges, amplitudes, positions, speed_types, trend_types):
  100. """
  101. 从name解析出category和其他字段的ID
  102. Args:
  103. name: 趋势名称
  104. time_ranges: 时间范围映射表 {name: object}
  105. amplitudes: 幅度范围映射表 {name: object}
  106. positions: 位置范围映射表 {name: object}
  107. speed_types: 速度类型映射表 {name: object}
  108. trend_types: 趋势类型映射表 {name: object}
  109. Returns:
  110. tuple: (category, time_range_id, amplitude_id, position_id, speed_type_id, trend_type_id, extra_info)
  111. """
  112. name_length = len(name)
  113. # 初始化所有字段为None
  114. category = None
  115. time_range_id = None
  116. amplitude_id = None
  117. position_id = None
  118. speed_type_id = None
  119. trend_type_id = None
  120. extra_info = None
  121. # 根据长度判断category
  122. if name_length == 13:
  123. # 长度为13,一定是category=1,带extra_info
  124. category = 1
  125. extra_info = name[-5:]
  126. main_part = name[:8]
  127. elif name_length in [4, 5]:
  128. # 长度为4或5,一定是category=2,其他字段为空
  129. category = 2
  130. return category, None, None, None, None, None, None
  131. elif name_length == 8:
  132. # 长度为8,需要进一步分析
  133. main_part = name
  134. if name.endswith('上涨') or name.endswith('下跌'):
  135. category = 0
  136. else:
  137. category = 1
  138. else:
  139. # 其他长度,出错
  140. raise ValueError(f"无法处理的名称长度: {name_length}, 名称: {name}")
  141. # 拆分main_part为4个两字符的部分
  142. parts = [main_part[i:i+2] for i in range(0, 8, 2)]
  143. if category == 0:
  144. # 对于category=0,顺序是:时间范围、幅度范围、速度类型、趋势类型
  145. time_range_name, amplitude_name, speed_type_name, trend_type_name = parts
  146. if time_range_name in time_ranges:
  147. time_range_id = time_ranges[time_range_name].id
  148. else:
  149. raise ValueError(f"找不到时间范围: {time_range_name}")
  150. if amplitude_name in amplitudes:
  151. amplitude_id = amplitudes[amplitude_name].id
  152. else:
  153. raise ValueError(f"找不到幅度范围: {amplitude_name}")
  154. if speed_type_name in speed_types:
  155. speed_type_id = speed_types[speed_type_name].id
  156. else:
  157. raise ValueError(f"找不到速度类型: {speed_type_name}")
  158. if trend_type_name in trend_types:
  159. trend_type_id = trend_types[trend_type_name].id
  160. else:
  161. raise ValueError(f"找不到趋势类型: {trend_type_name}")
  162. elif category == 1:
  163. # 对于category=1,顺序是:时间范围、位置范围、幅度范围、趋势类型(震荡)
  164. time_range_name, position_name, amplitude_name, trend_type_name = parts
  165. if time_range_name in time_ranges:
  166. time_range_id = time_ranges[time_range_name].id
  167. else:
  168. raise ValueError(f"找不到时间范围: {time_range_name}")
  169. if position_name in positions:
  170. position_id = positions[position_name].id
  171. else:
  172. raise ValueError(f"找不到位置范围: {position_name}")
  173. if amplitude_name in amplitudes:
  174. amplitude_id = amplitudes[amplitude_name].id
  175. else:
  176. raise ValueError(f"找不到幅度范围: {amplitude_name}")
  177. # 对于category=1,趋势类型固定为"震荡"
  178. if '震荡' in trend_types:
  179. trend_type_id = trend_types['震荡'].id
  180. else:
  181. raise ValueError("找不到趋势类型: 震荡")
  182. return category, time_range_id, amplitude_id, position_id, speed_type_id, trend_type_id, extra_info
  183. if __name__ == "__main__":
  184. logger.info("开始执行trend_info表初始化")
  185. app = create_app()
  186. with app.app_context():
  187. try:
  188. init_trend_info()
  189. logger.info("trend_info表初始化完成")
  190. except Exception as e:
  191. logger.error(f"初始化trend_info表时发生错误: {e}")
  192. raise