|
|
@@ -1,24 +1,35 @@
|
|
|
import os
|
|
|
from datetime import datetime
|
|
|
-from pdf2image import convert_from_path
|
|
|
+from pdf2image import convert_from_path, pdfinfo_from_path
|
|
|
from paddleocr import PaddleOCR
|
|
|
+from paddleocr import PPStructure
|
|
|
import re
|
|
|
import numpy as np
|
|
|
import logging
|
|
|
|
|
|
# 配置日志记录
|
|
|
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(filename)s - %(message)s')
|
|
|
|
|
|
-# 初始化 PaddleOCR,指定使用中文和英文模型,并开启表格识别
|
|
|
-# 注意:首次运行时会自动下载模型文件
|
|
|
+# 初始化 PaddleOCR,指定使用中文和英文模型
|
|
|
+# 主要用于非表格文本识别或作为备用
|
|
|
try:
|
|
|
- # 尝试CPU版本,如果需要GPU加速且环境支持,可改为 use_gpu=True
|
|
|
- ocr = PaddleOCR(use_angle_cls=True, lang='ch', use_gpu=False, show_log=False, use_space_char=True, table=True)
|
|
|
- logging.info("PaddleOCR 初始化成功 (CPU)。")
|
|
|
+ # 尝试CPU版本
|
|
|
+ ocr = PaddleOCR(use_angle_cls=True, lang='ch', use_gpu=False, show_log=False, use_space_char=True)
|
|
|
+ logging.info("PaddleOCR (基础 OCR) 初始化成功 (CPU)。")
|
|
|
except Exception as e:
|
|
|
logging.error(f"PaddleOCR 初始化失败: {e}")
|
|
|
ocr = None # 标记初始化失败
|
|
|
|
|
|
+# 初始化 PPStructure,专门用于表格识别
|
|
|
+try:
|
|
|
+ # layout=False 表示我们主要关心表格内容识别,而不是完整的文档布局分析
|
|
|
+ # 如果需要更复杂的文档结构分析,可以将 layout 设置为 True 或使用默认值
|
|
|
+ table_engine = PPStructure(lang='ch', show_log=False, layout=False)
|
|
|
+ logging.info("PPStructure (表格引擎) 初始化成功。")
|
|
|
+except Exception as e:
|
|
|
+ logging.error(f"PPStructure 初始化失败: {e}")
|
|
|
+ table_engine = None # 标记初始化失败
|
|
|
+
|
|
|
class PdfProcessor:
|
|
|
"""
|
|
|
处理单个 PDF 文件,提取记录日期和持仓信息。
|
|
|
@@ -33,7 +44,9 @@ class PdfProcessor:
|
|
|
self.pdf_path = pdf_path
|
|
|
self.record_date = self.extract_record_date()
|
|
|
if not ocr:
|
|
|
- logging.warning("PaddleOCR 未成功初始化,extract_holdings 将无法工作。")
|
|
|
+ logging.warning("PaddleOCR 未成功初始化,通用 OCR 功能可能受限。")
|
|
|
+ if not table_engine:
|
|
|
+ logging.warning("PPStructure 未成功初始化,extract_holdings 将无法进行表格识别。")
|
|
|
|
|
|
|
|
|
def extract_record_date(self):
|
|
|
@@ -64,144 +77,163 @@ class PdfProcessor:
|
|
|
def _extract_codes_from_table(self, table_html):
|
|
|
""" 从识别出的表格HTML中提取股票代码 """
|
|
|
codes = set() # 使用集合去重
|
|
|
- # 简单的HTML解析,提取<td>标签内容
|
|
|
- # 注意:PaddleOCR返回的HTML结构可能变化,这里需要健壮性处理
|
|
|
- # 找到<tbody>开始的部分
|
|
|
- tbody_match = re.search(r'<tbody.*?>(.*?)</tbody>', table_html, re.DOTALL | re.IGNORECASE)
|
|
|
- if not tbody_match:
|
|
|
- logging.warning("未能在表格HTML中找到 tbody。")
|
|
|
- return list(codes)
|
|
|
-
|
|
|
- tbody_content = tbody_match.group(1)
|
|
|
-
|
|
|
# 查找所有行 <tr>
|
|
|
- rows = re.findall(r'<tr.*?>(.*?)</tr>', tbody_content, re.DOTALL | re.IGNORECASE)
|
|
|
- if not rows:
|
|
|
- logging.warning("未能在表格 tbody 中找到任何行 <tr>。")
|
|
|
+ rows_html = re.findall(r'<tr.*?>(.*?)</tr>', table_html, re.DOTALL | re.IGNORECASE)
|
|
|
+ if not rows_html:
|
|
|
+ logging.warning("未能在表格 HTML 中找到任何行 <tr>。")
|
|
|
return list(codes)
|
|
|
|
|
|
- header_cells_text = []
|
|
|
- if rows:
|
|
|
- header_cells = re.findall(r'<td.*?>(.*?)</td>', rows[0], re.DOTALL | re.IGNORECASE)
|
|
|
- header_cells_text = [re.sub('<[^<]+?>', '', cell).strip() for cell in header_cells] # 清理HTML标签
|
|
|
-
|
|
|
- code_col_idx = self._find_code_column_index(header_cells_text)
|
|
|
-
|
|
|
- if code_col_idx == -1:
|
|
|
- logging.warning(f"未能在表头 {header_cells_text} 中找到'代码'列。")
|
|
|
+ header_row_idx = -1
|
|
|
+ code_col_idx = -1
|
|
|
+
|
|
|
+ # 遍历所有行以查找表头和代码列
|
|
|
+ for idx, row_content in enumerate(rows_html):
|
|
|
+ cells = re.findall(r'<td.*?>(.*?)</td>', row_content, re.DOTALL | re.IGNORECASE)
|
|
|
+ cells_text = [re.sub('<[^<]+?>', '', cell).strip() for cell in cells]
|
|
|
+
|
|
|
+ # 尝试在该行中查找代码列
|
|
|
+ current_code_col_idx = self._find_code_column_index(cells_text)
|
|
|
+ if current_code_col_idx != -1:
|
|
|
+ header_row_idx = idx
|
|
|
+ code_col_idx = current_code_col_idx
|
|
|
+ logging.info(f"在第 {idx+1} 行找到表头,'代码'列索引: {code_col_idx}")
|
|
|
+ break # 找到第一个包含代码列的行就认为是表头
|
|
|
+
|
|
|
+ # 如果未找到表头行或代码列
|
|
|
+ if header_row_idx == -1:
|
|
|
+ logging.warning("未能在任何行中找到包含'代码'列的表头。")
|
|
|
return list(codes)
|
|
|
- logging.info(f"找到'代码'列索引: {code_col_idx}")
|
|
|
-
|
|
|
|
|
|
- # 从第二行开始(跳过表头)处理数据行
|
|
|
- for row_html in rows[1:]:
|
|
|
- cells = re.findall(r'<td.*?>(.*?)</td>', row_html, re.DOTALL | re.IGNORECASE)
|
|
|
+ # 从表头的下一行开始处理数据行
|
|
|
+ for row_html_content in rows_html[header_row_idx + 1:]:
|
|
|
+ cells = re.findall(r'<td.*?>(.*?)</td>', row_html_content, re.DOTALL | re.IGNORECASE)
|
|
|
if len(cells) > code_col_idx:
|
|
|
# 提取目标列单元格内容并清理HTML标签
|
|
|
cell_content_raw = cells[code_col_idx]
|
|
|
cell_content = re.sub('<[^<]+?>', '', cell_content_raw).strip()
|
|
|
- # 清理常见的OCR错误,例如 'B' -> '8', 'O'/'o' -> '0'
|
|
|
+ # 清理常见的OCR错误,例如 'B' -> '8', 'O'/'o' -> '0'
|
|
|
cell_content = cell_content.replace('B', '8').replace('O', '0').replace('o', '0')
|
|
|
# 提取所有连续的6位数字作为潜在代码
|
|
|
potential_codes = re.findall(r'\b(\d{6})\b', cell_content)
|
|
|
for code in potential_codes:
|
|
|
# 验证是否是常见的A股代码开头
|
|
|
- if code.startswith(('0', '3', '6', '8')): # 增加科创板'8'开头
|
|
|
+ if code.startswith(('0', '1', '3', '5', '6', '8')): # 增加科创板'8'开头,'1'开头为转债,'5'为ETF
|
|
|
codes.add(code)
|
|
|
else:
|
|
|
logging.debug(f"忽略无效代码格式: {code} in cell '{cell_content}'")
|
|
|
-
|
|
|
else:
|
|
|
- logging.warning(f"行数据单元格数量 ({len(cells)}) 少于代码列索引 ({code_col_idx}),跳过此行: {row_html}")
|
|
|
+ # 如果行单元格数不足,记录警告,但继续处理下一行,以防表格结构不规则
|
|
|
+ logging.warning(f"行数据单元格数量 ({len(cells)}) 少于代码列索引 ({code_col_idx}),尝试跳过此行。 行内容: {row_html_content[:100]}...")
|
|
|
|
|
|
return list(codes)
|
|
|
|
|
|
|
|
|
def extract_holdings(self):
|
|
|
"""
|
|
|
- 解析 PDF 文件(特别是最后一页),提取持仓表格中的股票代码。
|
|
|
- 使用 pdf2image 和 PaddleOCR。
|
|
|
+ 解析 PDF 文件,从最后一个包含股票代码的表格里提取持仓代码。
|
|
|
+ 使用 pdf2image 转换页面,PPStructure 进行表格识别。
|
|
|
|
|
|
Returns:
|
|
|
list: 包含股票代码的列表。如果解析失败或找不到表格,返回空列表。
|
|
|
"""
|
|
|
- if not ocr:
|
|
|
- logging.error("PaddleOCR 未初始化,无法提取持仓。")
|
|
|
+ if not table_engine:
|
|
|
+ logging.error("PPStructure 未初始化,无法提取表格持仓。")
|
|
|
return []
|
|
|
if not self.pdf_path or not os.path.exists(self.pdf_path):
|
|
|
logging.error(f"PDF 文件路径无效或文件不存在: {self.pdf_path}")
|
|
|
return []
|
|
|
|
|
|
logging.info(f"开始处理文件: {self.pdf_path}")
|
|
|
- extracted_codes = []
|
|
|
|
|
|
try:
|
|
|
- # 1. PDF 转图片 (尝试只转最后一页以提高效率)
|
|
|
+ # 1. PDF 转图片 (处理后3页以提高找到表格的可能性)
|
|
|
# 注意:poppler路径可能需要根据系统配置调整
|
|
|
# 在Windows上,可能需要指定 poppler_path='C:/path/to/poppler/bin'
|
|
|
try:
|
|
|
- images = convert_from_path(self.pdf_path, first_page=None, last_page=1, dpi=300) # 先尝试只转第一页,如果需要可以改回最后一页
|
|
|
- if not images:
|
|
|
- images = convert_from_path(self.pdf_path, dpi=300) # 如果第一页失败,尝试转所有页
|
|
|
- # target_image = images[-1] # 假设表格在最后一页
|
|
|
- target_image = images[0] # 改为处理第一页,根据实际pdf调整
|
|
|
- logging.info(f"PDF 页面成功转换为图片。")
|
|
|
+ # 使用 pdfinfo_from_path 高效获取总页数
|
|
|
+ info = pdfinfo_from_path(self.pdf_path)
|
|
|
+ total_pages = info["Pages"]
|
|
|
+ # total_pages = len(convert_from_path(self.pdf_path, first_page=1, last_page=1, dpi=72)) # 低分辨率只获取页数
|
|
|
+ logging.info(f"PDF总页数: {total_pages}")
|
|
|
+
|
|
|
+ # 处理最后3页或全部页面(如果总页数少于3)
|
|
|
+ start_page = max(1, total_pages - 2) # 确保起始页至少是1
|
|
|
+ logging.info(f"开始处理PDF从第{start_page}页到第{total_pages}页")
|
|
|
+
|
|
|
+ images = convert_from_path(self.pdf_path, first_page=start_page, last_page=total_pages, dpi=300)
|
|
|
+ if not images:
|
|
|
+ logging.warning("无法转换后几页,尝试转换全部页面")
|
|
|
+ images = convert_from_path(self.pdf_path, dpi=300)
|
|
|
+
|
|
|
+ logging.info(f"成功转换了 {len(images)} 页PDF为图片")
|
|
|
except Exception as convert_err:
|
|
|
- logging.error(f"使用 pdf2image 转换 PDF 失败: {convert_err}")
|
|
|
- # 尝试直接用 PaddleOCR 处理 PDF 路径 (如果支持) - 通常不直接支持PDF
|
|
|
- # result = ocr.ocr(self.pdf_path, cls=True) # 这行可能无效,PaddleOCR主要处理图片
|
|
|
- return [] # 转换失败则无法继续
|
|
|
-
|
|
|
- # 将 PIL Image 转换为 numpy array (PaddleOCR 需要)
|
|
|
- img_np = np.array(target_image)
|
|
|
-
|
|
|
- # 2. 使用 PaddleOCR 进行识别 (包括表格)
|
|
|
- logging.info("开始使用 PaddleOCR 进行 OCR 和表格识别...")
|
|
|
- # result 是一个列表,每个元素对应一个检测到的文本框或表格
|
|
|
- # 对于表格,结构信息在 result[i][1][1] 中(通常是html格式)
|
|
|
- result = ocr.ocr(img_np, cls=True) # 尝试不开启表格识别,看文本识别效果
|
|
|
- logging.info(f"PaddleOCR 处理完成,获得 {len(result[0]) if result else 0} 个结果块。")
|
|
|
-
|
|
|
-
|
|
|
- # 3 & 4. 查找表格并提取代码
|
|
|
- found_codes_in_table = False
|
|
|
- if result and result[0]: # 检查结果是否为空
|
|
|
- # 优先查找PaddleOCR直接识别出的表格结构
|
|
|
- table_html_content = None
|
|
|
- for i, block in enumerate(result[0]):
|
|
|
- # block 结构: [[box], (text, confidence)] or for table [[box], ('html', table_html)]
|
|
|
- # 检查是否是表格识别结果
|
|
|
- # 新版PaddleOCR可能直接返回表格html在特定位置
|
|
|
- # 需要根据实际返回结果调整查找逻辑
|
|
|
- # 这里假设表格结果的文本部分是'<table>...'的html字符串
|
|
|
-
|
|
|
- # 尝试从文本块中查找可能的表格标记或内容
|
|
|
- text_content = block[1][0]
|
|
|
- if isinstance(text_content, str) and '<table' in text_content and '<td' in text_content:
|
|
|
- logging.info(f"在结果块 {i} 中找到疑似表格HTML。")
|
|
|
- table_html_content = text_content
|
|
|
- codes_from_html = self._extract_codes_from_table(table_html_content)
|
|
|
- if codes_from_html:
|
|
|
- extracted_codes.extend(codes_from_html)
|
|
|
- found_codes_in_table = True
|
|
|
- logging.info(f"从表格HTML中提取到 {len(codes_from_html)} 个代码。")
|
|
|
- # break # 找到一个表格就处理,假设只有一个目标表格
|
|
|
-
|
|
|
-
|
|
|
- # 如果没有直接找到表格HTML,尝试从所有识别文本中提取6位数字代码
|
|
|
- if not found_codes_in_table:
|
|
|
- logging.warning("未直接找到表格结构,尝试从所有文本中提取6位数字代码。")
|
|
|
- all_text = " ".join([block[1][0] for block in result[0] if isinstance(block[1][0], str)])
|
|
|
- # 清理常见的OCR错误
|
|
|
- all_text = all_text.replace('B', '8').replace('O', '0').replace('o', '0')
|
|
|
- potential_codes = re.findall(r'\b([0368]\d{5})\b', all_text) # 匹配以0,3,6,8开头的6位数字
|
|
|
- if potential_codes:
|
|
|
- extracted_codes.extend(list(set(potential_codes))) # 去重后添加
|
|
|
- logging.info(f"从文本中提取到 {len(set(potential_codes))} 个潜在代码。")
|
|
|
-
|
|
|
- if not extracted_codes:
|
|
|
- logging.warning(f"未能从文件 {os.path.basename(self.pdf_path)} 中提取到任何股票代码。")
|
|
|
-
|
|
|
+ logging.error(f"使用 pdf2image 转换 PDF 失败: {convert_err}")
|
|
|
+ return [] # 转换失败则无法继续
|
|
|
+
|
|
|
+ # 从后向前处理图片,优先处理最后一页
|
|
|
+ for page_idx, page_image in enumerate(reversed(images)):
|
|
|
+ page_num = total_pages - page_idx
|
|
|
+ logging.info(f"正在处理第 {page_num} 页")
|
|
|
+
|
|
|
+ # 将 PIL Image 转换为 numpy array (PPStructure/PaddleOCR 需要)
|
|
|
+ img_np = np.array(page_image)
|
|
|
+
|
|
|
+ # 2. 使用 PPStructure 进行表格识别
|
|
|
+ logging.info(f"开始对第 {page_num} 页使用 PPStructure 进行表格识别...")
|
|
|
+ try:
|
|
|
+ structure_result = table_engine(img_np)
|
|
|
+ logging.info(f"第 {page_num} 页 PPStructure 处理完成,获得 {len(structure_result)} 个结构元素。")
|
|
|
+ except Exception as struct_err:
|
|
|
+ logging.error(f"第 {page_num} 页 PPStructure 处理失败: {struct_err}")
|
|
|
+ structure_result = [] # 出错则结果为空
|
|
|
+
|
|
|
+ if structure_result:
|
|
|
+ page_tables_html = [] # 存储当前页面找到的所有表格HTML
|
|
|
+ # 遍历 PPStructure 返回的所有结构元素
|
|
|
+ for item in structure_result:
|
|
|
+ # 检查元素类型是否为 'table' (忽略大小写)
|
|
|
+ if item.get('type', '').lower() == 'table':
|
|
|
+ logging.info(f"在第 {page_num} 页找到一个 '{item['type']}' 类型的结构")
|
|
|
+ # 检查是否有识别结果 'res' 和 HTML内容 'html'
|
|
|
+ if 'res' in item and isinstance(item['res'], dict) and 'html' in item['res']:
|
|
|
+ table_html = item['res']['html']
|
|
|
+
|
|
|
+ # 新增检查
|
|
|
+ if isinstance(table_html, str) and "群内禁止任何形式的广告" in table_html:
|
|
|
+ logging.info(f"在第 {page_num} 页检测到包含特定广告语的假表格,跳过。")
|
|
|
+ continue # 跳过这个假表格
|
|
|
+
|
|
|
+ # 有些情况下可能缺少外层<table>标签,补上以兼容解析逻辑
|
|
|
+ if isinstance(table_html, str) and not table_html.strip().startswith('<table'):
|
|
|
+ table_html = f'<table>{table_html}</table>'
|
|
|
+ logging.debug("为表格HTML添加了外层 <table> 标签。")
|
|
|
+ page_tables_html.append(table_html)
|
|
|
+ else:
|
|
|
+ logging.warning(f"表格结构检测到,但缺少 'res' 或 'res.html' 内容: {item}")
|
|
|
+
|
|
|
+ # 如果当前页面通过 PPStructure 找到了表格
|
|
|
+ if page_tables_html:
|
|
|
+ logging.info(f"在第 {page_num} 页通过 PPStructure 找到 {len(page_tables_html)} 个表格结构。尝试提取代码...")
|
|
|
+ # 从最后一个检测到的表格开始尝试提取代码 (符合优先处理页面末尾表格的逻辑)
|
|
|
+ for table_html_content in reversed(page_tables_html):
|
|
|
+ codes_from_html = self._extract_codes_from_table(table_html_content)
|
|
|
+ if codes_from_html:
|
|
|
+ # 成功提取到代码,立即去重、排序并返回
|
|
|
+ final_codes = sorted(list(set(codes_from_html)))
|
|
|
+ logging.info(f"在第 {page_num} 页的表格中成功提取到 {len(final_codes)} 个代码,停止处理。代码: {final_codes if len(final_codes) < 10 else final_codes[:10] + ['...']}")
|
|
|
+ return final_codes
|
|
|
+ # 如果当前表格未提取到代码,继续尝试页面上的下一个表格(顺序向前)
|
|
|
+ # 如果当前页面的所有表格都尝试完仍未找到代码,则记录信息
|
|
|
+ logging.info(f"第 {page_num} 页检测到的表格均未成功提取到有效代码。")
|
|
|
+ else:
|
|
|
+ logging.info(f"第 {page_num} 页未通过 PPStructure 找到明确的表格结构。")
|
|
|
+ # 可选:在这里添加使用基础 OCR 结果进行关键词搜索的备用逻辑
|
|
|
+
|
|
|
+ # 循环继续处理上一页 (即 page_num 减小的方向)
|
|
|
+
|
|
|
+ # 将最终的返回和日志移到循环外部
|
|
|
+ logging.warning(f"在检查的页面范围内 ({start_page}-{total_pages}),未能从任何表格中提取到有效股票代码: {os.path.basename(self.pdf_path)}")
|
|
|
+ return [] # 返回空列表
|
|
|
|
|
|
except ImportError as ie:
|
|
|
logging.error(f"缺少必要的库: {ie}。请安装 paddleocr, paddlepaddle, pdf2image。")
|
|
|
@@ -210,11 +242,6 @@ class PdfProcessor:
|
|
|
logging.error(f"处理文件 {self.pdf_path} 时发生错误: {e}", exc_info=True) # exc_info=True 打印堆栈信息
|
|
|
return []
|
|
|
|
|
|
- # 返回去重后的列表
|
|
|
- final_codes = sorted(list(set(extracted_codes)))
|
|
|
- logging.info(f"文件 {os.path.basename(self.pdf_path)} 处理完毕,提取到 {len(final_codes)} 个唯一代码: {final_codes if len(final_codes) < 10 else final_codes[:10] + ['...']}")
|
|
|
- return final_codes
|
|
|
-
|
|
|
# # 本地测试 (取消注释以进行测试)
|
|
|
# if __name__ == '__main__':
|
|
|
# # 确保测试PDF文件存在于指定路径
|