import sys import os # 添加当前目录到Python路径 sys.path.append(os.path.dirname(__file__)) from rag_base import RAGBase from typing import List, Any, Iterator import pdfplumber import math class PdfReader(RAGBase): """ pdf读取器,读取处理pdf,整理为markdown格式的文本 """ def __init__(self): self.page_content = [] # 每一页的文本内容 def reset(self): self.__init__() def read(self, read_path: str, password:str=None): """ 读取pdf Args: read_path: pdf文件路径 password: 密码 Returns: """ # 文件存在性检查 self.is_path_exist(read_path) # 位置元组:x0, top, x1, bottom # x0:表示该元素矩形区域左边缘到页面最左侧的横向距离 # top:表示该元素矩形区域上边缘到页面最顶部的纵向距离 # x1:表示该元素矩形区域右边缘到页面最左侧的横向距离 # bottom:表示该元素矩形区域下边缘到页面最顶部的纵向距离 with pdfplumber.open(read_path, password=password) as pdf: # 如果pdf页数太多,应该选择分页处理, for page_num, page in enumerate(pdf.pages): # 处理一页pdf内容 one_page_text = self.process(page) # 一页文本 if one_page_text: self.page_content.append(one_page_text) def process(self, page)->List[dict]: """ 处理每页pdf,输出文本 Args: page: Returns: """ # 提取页面中的所有文本行,并获取其位置信息,每行文本是一个list元素 lines = page.extract_text_lines(keep_blank_chars=True, extra_attrs=["fontname", "size"]) # 先剔除页码行或数字孤行 lines = [_ for _ in lines if not self.is_page_number(_['text'])] # 处理空白页 if len(lines) == 0: return '' # 按照top值对行进行排序(升序) self.quick_sort(lines, 0, len(lines) - 1) # 提取页面中的表格,并获取每个表格的位置信息 tables = page.find_tables() # 使用 find_tables() 检测表格 table_data = [] if tables: for table in tables: # 处理表格文本 table_text = self.process_table(table) # 获取表格的边界框 (x0, top, x1, bottom) bbox = table.bbox table_data.append({ 'text': table_text, 'x0': bbox[0], # 表格的位置信息 'top': bbox[1], # 表格的位置信息 'x1': bbox[2], # 表格的位置信息 'bottom': bbox[3], # 表格的位置信息 }) # 将表格内容插入行文本,首先我们需要先剔除行文本中的重复表格 # 规则:如果行文本中的top介于表格位置的top和bottom之间,应该被剔除 new_lines = [] table_count = len(table_data) for line in lines: flag = True for j in range(table_count): # 剔除冗余表 if (line.get('top') >= table_data[j].get('top')) and (line.get('top') <= table_data[j].get('bottom')): flag = False break if flag: new_lines.append(line) # 将表格内容插入新建立的行文本 new_lines += table_data self.quick_sort(new_lines, 0, len(new_lines) - 1) # 页面文本融合 #page_text = self.join_(new_lines) page_text = new_lines return page_text @staticmethod def process_table(table)->str: """ 处理表格,输出表格文本 Args: table: pdfplumber.table.Table Returns: 表格文本 """ # 提取表格数据 extracted_table = table.extract() # 处理非str值 for i in range(len(extracted_table)): for j in range(len(extracted_table[i])): extracted_table[i][j] = str(extracted_table[i][j]).strip() extracted_table[i][j] = extracted_table[i][j].replace('\n','') extracted_table[i][j] = extracted_table[i][j].replace('None','') # 将上述二维list表格处理为纯markdown文本 table_text = ['|' + '|'.join(extracted_table[0]) + '|', '|---' * len(extracted_table[0]) + '|'] for table_line in extracted_table[1:]: for i in range(len(table_line)): table_line[i] = table_line[i].replace('\n', ' ').strip() table_text.append('|' + ' | '.join(table_line) + '|') table_text = '\n'.join(table_text) # 融合为最终大文本 return table_text def write(self, write_path: str, mode='w', encoding='utf-8', *args, **kwargs): # 逐页保存 with open(write_path, mode, encoding=encoding) as f: for page in self.page_content: f.write(self.join_(page)) # 融合在此处,更省内存 self.reset() def join_(self, page_list)->str: """ 将每一页pdf提取的文字和表格进行融合,输出一个文本text Args: page_list: pdf提取的文本和表格 Returns: 拼接后的text """ text = '' text_tem_list = page_list.copy() # 启发式分段,根据文本位置分析 # 左侧页边距统计 statistic_dict = {} for i in text_tem_list: d = str(int(i.get('x0'))) # 当前元素左侧页边距 if d not in statistic_dict.keys(): statistic_dict[d] = 1 else: statistic_dict[d] += 1 # 处理缩进都不一样的页 if max(statistic_dict.values()) == min(statistic_dict.values()): for i in text_tem_list: i['text'] = i['text'] + '\n' else: # 处理正文页 max_key = max(statistic_dict, key=statistic_dict.get) page_left_distance = int(max_key) + 1 # 正文非段首文字页边距 for i in text_tem_list: if self.is_title(i['text']) : # 正则匹配到标题行 # 为标题行增加回车符 i['text'] = i['text'] + '\n' elif i['x0'] > page_left_distance: # 段首增加回车符 i['text'] = '\n' + i['text'] # 融合所有行 text = ''.join([_.get('text') for _ in text_tem_list]) # 空格替换 text = text.replace(' ', '') # 回车符替换 text = text.replace('\n\n\n', '\n') text = text.replace('\n\n', '\n') return text def text_generator(self, *args: Any, **kwargs: Any) -> Iterator[str]: """每次迭代返回一页文本""" # 每100页为1组 const_num = 100 for i in range(math.ceil(len(self.page_content) / const_num)): # 融合行文本 group_content =[self.join_(page_cont) for page_cont in self.page_content[i*const_num:i*const_num + const_num:1]] group_content = ''.join(group_content) yield group_content @staticmethod def quick_sort_part(arr: list[dict], low: int, high: int): """ 快速排序内层函数 Args: arr: 待排序数组, 结构如同list[dict], 每个dict包括{text, x0 top x1 bottom}, 以top值进行排序 low: 左边界 high: 右边界 Returns:排序后的基准值索引 """ if low >= high: return None # 设定基准值 left, right = low, high pivot = arr[low].get('top') # 右边放大数,左边放小数 while left < right: # 做一趟排序 # 先从右面开始找比基准值小的数 while left < right and arr[right].get('top') >= pivot: right -= 1 # 在右面找到了比基准值小的数,执行一次交换 if left < right: arr[left], arr[right] = arr[right], arr[left] left += 1 # 在左面开始找比基准值大的数 while left < right and arr[left].get('top') <= pivot: left += 1 # 在左面找到了大于基准值的数, 执行一次交换 if left < right: arr[left], arr[right] = arr[right], arr[left] right -= 1 return left # 返回基准值索引 def quick_sort(self, arr: list[dict], low: int, high: int): """ 快排序外层函数 Args: arr: 待排序数组 low: 左侧索引 high: 右侧索引 Returns: """ if low >= high: return # 先排一趟 mid = self.quick_sort_part(arr, low, high) # 排左边 self.quick_sort(arr, low, mid-1) # 排右边 self.quick_sort(arr, mid+1, high) if __name__ == '__main__': path = r'D:\code\tem_1103\(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).pdf' #path = r'D:\code\rag_tools\RAG资料库\项目模板-资料整理-to\1、工艺包资料\方案模板、PPT\TP-0 工艺技术包汇总及说明PPT-V2.pdf' #path = r'D:\code\rag_tools\RAG资料库\工艺数据相关\RO计算2.pdf' reader = PdfReader() reader.read(path) for i in reader.text_generator(): print(i) reader.write('pdf_test.md', mode='w')