trade_logic.py 14 KB


  1. # app/services/trade_logic.py
  2. """
  3. Business logic related to TradeRecord generation and synchronization.
  4. """
  5. from app import db
  6. from app.models.trade import TradeRecord
  7. from app.models.transaction import TransactionRecord
  8. from datetime import datetime
  9. import traceback
  10. def generate_trade_from_transactions(transactions):
  11. """从交易记录生成交易汇总记录对象 (但不写入数据库)"""
  12. if not transactions:
  13. # print("没有交易记录可用于生成汇总")
  14. return None
  15. # print(f"从{len(transactions)}条交易记录尝试生成交易汇总")
  16. open_trans = None
  17. close_trans = []
  18. # 找到开仓交易
  19. for trans in transactions:
  20. if trans.position_type is None:
  21. print(f" 警告: 交易ID={trans.id}缺少position_type")
  22. continue
  23. if trans.position_type in [0, 2]: # 开多 or 开空
  24. if not open_trans: # 只取第一个开仓交易
  25. open_trans = trans
  26. elif trans.position_type in [1, 3]: # 平多 or 平空
  27. close_trans.append(trans)
  28. else:
  29. print(f" 警告: 交易ID={trans.id}的仓位类型{trans.position_type}无效")
  30. if not open_trans:
  31. # print(" 没有找到有效的开仓交易")
  32. return None
  33. # 确保开仓和平仓交易匹配
  34. valid_close_trans = []
  35. for trans in close_trans:
  36. if (open_trans.position_type == 0 and trans.position_type == 1) or \
  37. (open_trans.position_type == 2 and trans.position_type == 3):
  38. valid_close_trans.append(trans)
  39. close_trans = valid_close_trans
  40. # 计算平均售价和收益
  41. total_close_amount = sum(t.price * t.volume for t in close_trans if t.price is not None and t.volume is not None)
  42. total_close_volume = sum(t.volume for t in close_trans if t.volume is not None)
  43. average_sale_price = total_close_amount / total_close_volume if total_close_volume > 0 else None
  44. # 计算收益
  45. single_profit = None
  46. if average_sale_price is not None and open_trans.contract_multiplier is not None and open_trans.price is not None and total_close_volume is not None:
  47. try:
  48. if open_trans.position_type == 0: # 多头
  49. single_profit = (average_sale_price - open_trans.price) * total_close_volume * open_trans.contract_multiplier
  50. else: # 空头
  51. single_profit = (open_trans.price - average_sale_price) * total_close_volume * open_trans.contract_multiplier
  52. except TypeError as e:
  53. print(f" 计算收益时发生类型错误: {e}. Open price: {open_trans.price}, Avg sale price: {average_sale_price}, Vol: {total_close_volume}, Multiplier: {open_trans.contract_multiplier}")
  54. single_profit = None
  55. # 计算投资额 (开仓成本)
  56. investment_amount = None
  57. if open_trans.price is not None and open_trans.volume is not None and open_trans.contract_multiplier is not None:
  58. try:
  59. investment_amount = open_trans.price * open_trans.volume * open_trans.contract_multiplier
  60. except TypeError:
  61. print(f" 计算投资额时发生类型错误: Price: {open_trans.price}, Vol: {open_trans.volume}, Multiplier: {open_trans.contract_multiplier}")
  62. investment_amount = None
  63. # 计算投资收益率
  64. investment_profit_rate = single_profit / investment_amount if single_profit is not None and investment_amount and investment_amount != 0 else None
  65. # 计算持仓天数
  66. close_time = max(t.transaction_time for t in close_trans if t.transaction_time) if close_trans else None
  67. holding_days = (close_time - open_trans.transaction_time).days if close_time and open_trans.transaction_time else None
  68. # 计算年化收益率
  69. annual_profit_rate = investment_profit_rate * 365 / holding_days if investment_profit_rate is not None and holding_days and holding_days > 0 else None
  70. # 创建交易汇总记录对象
  71. try:
  72. roll_trade_main_id = getattr(open_trans, 'roll_id', None)
  73. trade = TradeRecord(
  74. roll_trade_main_id=roll_trade_main_id,
  75. contract_code=open_trans.contract_code,
  76. name=open_trans.name,
  77. account=open_trans.account,
  78. strategy_id=open_trans.strategy_ids,
  79. strategy_name=open_trans.strategy_name,
  80. position_type=0 if open_trans.position_type == 0 else 1,
  81. candle_pattern_id=open_trans.candle_pattern_ids,
  82. candle_pattern=open_trans.candle_pattern,
  83. open_time=open_trans.transaction_time,
  84. close_time=close_time,
  85. position_volume=open_trans.volume,
  86. contract_multiplier=open_trans.contract_multiplier,
  87. past_position_cost=investment_amount,
  88. average_sale_price=average_sale_price,
  89. single_profit=single_profit,
  90. investment_profit=single_profit,
  91. investment_profit_rate=investment_profit_rate,
  92. holding_days=holding_days,
  93. annual_profit_rate=annual_profit_rate,
  94. trade_type=open_trans.trade_type,
  95. confidence_index=open_trans.confidence_index,
  96. similarity_evaluation=open_trans.similarity_evaluation,
  97. long_trend_ids=getattr(open_trans, 'long_trend_ids', None),
  98. long_trend_name=getattr(open_trans, 'long_trend_name', None),
  99. mid_trend_ids=getattr(open_trans, 'mid_trend_ids', None),
  100. mid_trend_name=getattr(open_trans, 'mid_trend_name', None)
  101. )
  102. return trade
  103. except Exception as e:
  104. print(f" 创建交易汇总记录对象时出错: {str(e)}")
  105. print(traceback.format_exc())
  106. return None
  107. def update_trade_record(trade_id):
  108. """
  109. 根据关联的 TransactionRecords 重新计算并更新 TradeRecord。
  110. 如果计算结果有效,则更新或创建 TradeRecord。
  111. 如果计算结果无效(例如,没有开仓交易),则删除现有的 TradeRecord。
  112. """
  113. if trade_id is None:
  114. print(" update_trade_record 收到 None trade_id。跳过。")
  115. return {"code": 1, "msg": "trade_id 为空"}
  116. # print(f"正在更新 trade_id: {trade_id} 的 TradeRecord")
  117. try:
  118. existing_trade = TradeRecord.query.get(trade_id) # 使用 get 获取主键
  119. transactions = TransactionRecord.query.filter_by(trade_id=trade_id)\
  120. .order_by(TransactionRecord.transaction_time)\
  121. .all()
  122. if not transactions:
  123. # print(f" 未找到 trade_id {trade_id} 的交易记录。")
  124. if existing_trade:
  125. # print(f" 正在删除现有的 TradeRecord {trade_id} (因无交易记录)。")
  126. db.session.delete(existing_trade)
  127. # else:
  128. # print(f" 无需删除,TradeRecord {trade_id} 不存在。")
  129. # db.session.commit() # 移除此处的 commit
  130. return {"code": 0, "msg": f"已删除无交易记录的 TradeRecord {trade_id}"}
  131. # 根据交易记录生成理论状态
  132. generated_trade_obj = generate_trade_from_transactions(transactions)
  133. if generated_trade_obj:
  134. # print(f" 为 {trade_id} 生成了有效的交易数据。")
  135. if existing_trade:
  136. # print(f" 正在更新现有的 TradeRecord {trade_id}。")
  137. # 从生成的对象更新现有记录的字段
  138. existing_trade.roll_trade_main_id = generated_trade_obj.roll_trade_main_id
  139. existing_trade.contract_code = generated_trade_obj.contract_code
  140. existing_trade.name = generated_trade_obj.name
  141. existing_trade.account = generated_trade_obj.account
  142. existing_trade.strategy_id = generated_trade_obj.strategy_id
  143. existing_trade.strategy_name = generated_trade_obj.strategy_name
  144. existing_trade.position_type = generated_trade_obj.position_type
  145. existing_trade.candle_pattern_id = generated_trade_obj.candle_pattern_id
  146. existing_trade.candle_pattern = generated_trade_obj.candle_pattern
  147. existing_trade.open_time = generated_trade_obj.open_time
  148. existing_trade.close_time = generated_trade_obj.close_time
  149. existing_trade.position_volume = generated_trade_obj.position_volume
  150. existing_trade.contract_multiplier = generated_trade_obj.contract_multiplier
  151. existing_trade.past_position_cost = generated_trade_obj.past_position_cost
  152. existing_trade.average_sale_price = generated_trade_obj.average_sale_price
  153. existing_trade.single_profit = generated_trade_obj.single_profit
  154. existing_trade.investment_profit = generated_trade_obj.investment_profit
  155. existing_trade.investment_profit_rate = generated_trade_obj.investment_profit_rate
  156. existing_trade.holding_days = generated_trade_obj.holding_days
  157. existing_trade.annual_profit_rate = generated_trade_obj.annual_profit_rate
  158. existing_trade.trade_type = generated_trade_obj.trade_type
  159. existing_trade.confidence_index = generated_trade_obj.confidence_index
  160. existing_trade.similarity_evaluation = generated_trade_obj.similarity_evaluation
  161. existing_trade.long_trend_ids = generated_trade_obj.long_trend_ids
  162. existing_trade.long_trend_name = generated_trade_obj.long_trend_name
  163. existing_trade.mid_trend_ids = generated_trade_obj.mid_trend_ids
  164. existing_trade.mid_trend_name = generated_trade_obj.mid_trend_name
  165. else:
  166. # print(f" 正在为 trade_id {trade_id} 创建新的 TradeRecord。")
  167. generated_trade_obj.id = trade_id # 显式设置 ID
  168. db.session.add(generated_trade_obj)
  169. else:
  170. # 无法从交易记录生成有效的交易
  171. # print(f" 为 {trade_id} 生成了无效/不完整的交易数据。")
  172. if existing_trade:
  173. # print(f" 正在删除现有的 TradeRecord {trade_id} (因数据无效/不完整)。")
  174. db.session.delete(existing_trade)
  175. # else:
  176. # print(f" 无需删除,TradeRecord {trade_id} 不存在。")
  177. # db.session.commit() # 移除此处的 commit
  178. # print(f" 成功提交 TradeRecord {trade_id} 的更改。")
  179. return {"code": 0, "msg": f"成功更新 TradeRecord {trade_id}"}
  180. except Exception as e:
  181. db.session.rollback()
  182. print(f" 处理 TradeRecord {trade_id} 时出错: {e}")
  183. print(traceback.format_exc())
  184. return {"code": 1, "msg": f"处理 TradeRecord {trade_id} 时出错: {str(e)}"}
  185. def sync_trades_after_import(trade_ids):
  186. """
  187. 为给定的 trade_id 列表同步 TradeRecords。
  188. 为每个唯一的 trade_id 调用 update_trade_record。
  189. """
  190. if not trade_ids:
  191. print("未提供用于同步的 trade ID。")
  192. return
  193. valid_trade_ids = set()
  194. for tid in trade_ids:
  195. if tid is not None:
  196. try:
  197. valid_trade_ids.add(int(tid))
  198. except (ValueError, TypeError):
  199. print(f" 跳过无效的 trade_id: {tid}")
  200. if not valid_trade_ids:
  201. print("过滤后未找到有效的 trade ID。")
  202. return
  203. print(f"正在为 {len(valid_trade_ids)} 个唯一的 trade ID 同步 TradeRecords...")
  204. errors = []
  205. success_count = 0
  206. for trade_id in valid_trade_ids:
  207. try:
  208. update_trade_record(trade_id)
  209. success_count += 1
  210. except Exception as e:
  211. error_msg = f"同步 trade_id {trade_id} 时发生严重错误: {e}"
  212. print(f" {error_msg}")
  213. print(traceback.format_exc())
  214. errors.append(error_msg)
  215. if not errors:
  216. try:
  217. db.session.commit()
  218. print(" 成功提交所有数据库更改。")
  219. except Exception as e:
  220. db.session.rollback()
  221. commit_error_msg = f"提交数据库事务时发生严重错误: {e}"
  222. print(f" {commit_error_msg}")
  223. print(traceback.format_exc())
  224. errors.append(commit_error_msg)
  225. else:
  226. db.session.rollback()
  227. print(" 检测到错误,正在回滚数据库更改。")
  228. sync_status = "同步完成。"
  229. if errors:
  230. sync_status = f"同步完成,但有 {len(errors)} 个错误。"
  231. print(f"同步期间的错误: {errors}")
  232. print(sync_status)
  233. # 返回同步结果
  234. return {'code': 1 if errors else 0, 'msg': sync_status, 'errors': errors, 'success_count': success_count}
  235. def sync_all_trades_from_transactions():
  236. """
  237. 从所有 TransactionRecords 中同步 TradeRecords,并清理孤立的 TradeRecords。
  238. """
  239. print("开始从所有交易记录中全面同步交易汇总...")
  240. try:
  241. # 从 TransactionRecord 获取所有非空的、唯一的 trade_id
  242. transaction_trade_ids = {item[0] for item in db.session.query(TransactionRecord.trade_id).distinct() if item[0] is not None}
  243. print(f" 从交易记录中找到 {len(transaction_trade_ids)} 个唯一的 trade ID。")
  244. # 从 TradeRecord 获取所有 ID
  245. trade_record_ids = {item[0] for item in db.session.query(TradeRecord.id).distinct() if item[0] is not None}
  246. print(f" 从交易汇总表中找到 {len(trade_record_ids)} 个唯一的 ID。")
  247. # 合并所有需要检查的 ID
  248. all_ids_to_sync = transaction_trade_ids.union(trade_record_ids)
  249. if not all_ids_to_sync:
  250. print(" 数据库中没有任何交易或交易汇总记录可供同步。")
  251. return {'code': 0, 'msg': '没有需要同步的交易。', 'errors': [], 'success_count': 0}
  252. print(f" 共计需要同步 {len(all_ids_to_sync)} 个唯一的 ID。")
  253. # 使用现有的同步逻辑
  254. result = sync_trades_after_import(list(all_ids_to_sync))
  255. print("全面同步完成。")
  256. return result
  257. except Exception as e:
  258. db.session.rollback()
  259. error_msg = f"全面同步期间发生严重错误: {e}"
  260. print(f" {error_msg}")
  261. print(traceback.format_exc())
  262. return {'code': 1, 'msg': error_msg, 'errors': [error_msg], 'success_count': 0}