| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- import sys
- import os
- # Add this file's directory to the Python path so sibling modules (e.g. rag_base) can be imported
- sys.path.append(os.path.dirname(__file__))
- from rag_base import RAGBase
- from typing import List, Any, Iterator
- import pdfplumber
- import math
class PdfReader(RAGBase):
    """
    PDF reader: extracts the text lines and tables of every page and renders
    them as Markdown-style text.

    Typical use: call ``read`` to load a file, then either iterate over
    ``text_generator`` or call ``write`` to save the text.
    """

    def __init__(self):
        # One entry per non-blank page. Each entry is the list of line/table
        # dicts produced by ``process`` for that page.
        self.page_content = []

    def reset(self):
        """Drop everything read so far by re-running ``__init__``."""
        self.__init__()

    def read(self, read_path: str, password: str = None):
        """
        Read a PDF and append the processed content of each non-blank page
        to ``self.page_content``.

        Args:
            read_path: path of the PDF file.
            password: password forwarded to ``pdfplumber.open``.
        Returns:
            None
        """
        # File existence check. ``is_path_exist`` is not defined in this
        # class; presumably it is inherited from RAGBase - verify there.
        self.is_path_exist(read_path)
        # Position tuple used throughout: (x0, top, x1, bottom)
        #   x0:     distance from the page's left edge to the element's left edge
        #   top:    distance from the page's top edge to the element's top edge
        #   x1:     distance from the page's left edge to the element's right edge
        #   bottom: distance from the page's top edge to the element's bottom edge
        with pdfplumber.open(read_path, password=password) as pdf:
            for page in pdf.pages:
                one_page_content = self.process(page)
                # Blank pages come back empty and are skipped.
                if one_page_content:
                    self.page_content.append(one_page_content)

    def process(self, page) -> List[dict]:
        """
        Process one PDF page into a list of text-line and table entries
        ordered by their ``top`` coordinate.

        Args:
            page: a pdfplumber page object.
        Returns:
            A list of dicts, each with at least ``text``, ``x0``, ``top``,
            ``x1`` and ``bottom``. Table entries additionally carry
            ``is_table: True``. An empty list is returned for a page with
            no usable text lines.
        """
        # Extract every text line of the page together with its position.
        lines = page.extract_text_lines(keep_blank_chars=True,
                                        extra_attrs=["fontname", "size"])
        # Drop page-number lines / isolated numeric lines first.
        # ``is_page_number`` is not defined in this class (presumably
        # inherited from RAGBase).
        lines = [line for line in lines
                 if not self.is_page_number(line['text'])]
        # Blank page: return an empty list so the result matches the
        # declared return type (callers only test truthiness).
        if not lines:
            return []
        # Order the lines by ``top`` (ascending).
        self.quick_sort(lines, 0, len(lines) - 1)

        # Detect the tables on the page and record text + bounding box.
        table_data = []
        for table in page.find_tables():
            x0, top, x1, bottom = table.bbox
            table_data.append({
                'text': self.process_table(table),
                'x0': x0,
                'top': top,
                'x1': x1,
                'bottom': bottom,
                # Lets ``join_`` keep the table on its own lines.
                'is_table': True,
            })

        # The text inside a table is also reported as ordinary text lines.
        # Remove every line whose ``top`` lies within a table's vertical
        # extent so the table content is not emitted twice.
        new_lines = [
            line for line in lines
            if not any(
                tbl.get('top') <= line.get('top') <= tbl.get('bottom')
                for tbl in table_data
            )
        ]
        # Insert the tables and restore top-to-bottom order.
        new_lines += table_data
        self.quick_sort(new_lines, 0, len(new_lines) - 1)
        # Merging into a single string is deferred to ``join_``.
        return new_lines

    @staticmethod
    def process_table(table) -> str:
        """
        Render a detected table as Markdown table text.

        Args:
            table: pdfplumber.table.Table
        Returns:
            The Markdown text of the table (first row used as header), or
            an empty string when the table has no rows.
        """
        rows = table.extract()
        if not rows:
            return ''
        # Normalise every cell to a stripped, single-line string. Missing
        # cells (None) become ''. Only real None values are blanked, so
        # cell text that merely contains the word "None" is preserved.
        cleaned = [
            ['' if cell is None else str(cell).strip().replace('\n', '')
             for cell in row]
            for row in rows
        ]
        header = cleaned[0]
        table_text = ['|' + '|'.join(header) + '|',
                      '|---' * len(header) + '|']
        for row in cleaned[1:]:
            table_text.append('|' + ' | '.join(row) + '|')
        return '\n'.join(table_text)

    def write(self, write_path: str, mode='w', encoding='utf-8', *args, **kwargs):
        """
        Write the text of all pages read so far to a file, then reset the
        reader.

        Args:
            write_path: output file path.
            mode: file mode passed to ``open``.
            encoding: text encoding passed to ``open``.
            *args, **kwargs: accepted but not used by this implementation.
        """
        with open(write_path, mode, encoding=encoding) as f:
            # Pages are merged one at a time here instead of building the
            # whole document in memory first.
            for page in self.page_content:
                f.write(self.join_(page))
        self.reset()

    def join_(self, page_list) -> str:
        """
        Merge the text lines and tables of one page into a single string.

        Paragraph breaks are inferred heuristically from the left margin
        (``x0``) of each entry. The entries in ``page_list`` are not
        modified, so the same page can be joined repeatedly with identical
        results.

        Args:
            page_list: the entries of one page as produced by ``process``.
        Returns:
            The merged text; an empty string for an empty page.
        """
        if not page_list:
            return ''

        # Histogram of the (integer-truncated) left margins on this page.
        margin_counts = {}
        for item in page_list:
            margin = int(item.get('x0'))
            margin_counts[margin] = margin_counts.get(margin, 0) + 1

        # When every margin occurs equally often there is no dominant body
        # margin, so each entry is simply placed on its own line.
        no_dominant_margin = (max(margin_counts.values())
                              == min(margin_counts.values()))
        # Most frequent margin (+1 tolerance): left margin of body text
        # lines that do not start a paragraph.
        page_left_distance = max(margin_counts, key=margin_counts.get) + 1

        pieces = []
        for item in page_list:
            piece = item['text']
            if item.get('is_table'):
                # A Markdown table must not be glued to neighbouring text.
                piece = '\n' + piece + '\n'
            elif no_dominant_margin:
                piece = piece + '\n'
            elif self.is_title(piece):
                # Heading line: break after it. ``is_title`` is not defined
                # in this class (presumably inherited from RAGBase).
                piece = piece + '\n'
            elif item['x0'] > page_left_distance:
                # Indented further than body text: start of a paragraph.
                piece = '\n' + piece
            pieces.append(piece)

        text = ''.join(pieces)
        # Remove spaces.
        text = text.replace(' ', '')
        # Collapse consecutive line breaks.
        text = text.replace('\n\n\n', '\n')
        text = text.replace('\n\n', '\n')
        return text

    def text_generator(self, *args: Any, **kwargs: Any) -> Iterator[str]:
        """
        Yield the merged text in groups of up to 100 pages per iteration.

        Args:
            *args, **kwargs: accepted but not used by this implementation.
        Yields:
            The concatenated text of the next group of pages.
        """
        group_size = 100
        for start in range(0, len(self.page_content), group_size):
            group = self.page_content[start:start + group_size]
            yield ''.join(self.join_(page_cont) for page_cont in group)

    @staticmethod
    def quick_sort_part(arr: list[dict], low: int, high: int):
        """
        Partition step of quicksort on ``arr[low..high]`` keyed by ``top``.

        Kept for backward compatibility; ``quick_sort`` no longer uses it.

        Args:
            arr: list of dicts, each providing a ``top`` value.
            low: left bound (inclusive).
            high: right bound (inclusive).
        Returns:
            The final index of the pivot, or None when ``low >= high``.
        """
        if low >= high:
            return None
        left, right = low, high
        # The first element of the range is the pivot.
        pivot = arr[low].get('top')
        # Larger values go to the right, smaller ones to the left.
        while left < right:
            # From the right, look for a value smaller than the pivot.
            while left < right and arr[right].get('top') >= pivot:
                right -= 1
            if left < right:
                arr[left], arr[right] = arr[right], arr[left]
                left += 1
            # From the left, look for a value larger than the pivot.
            while left < right and arr[left].get('top') <= pivot:
                left += 1
            if left < right:
                arr[left], arr[right] = arr[right], arr[left]
                right -= 1
        return left

    def quick_sort(self, arr: list[dict], low: int, high: int):
        """
        Sort ``arr[low..high]`` in place by ascending ``top``.

        Uses the built-in stable sort rather than a recursive quicksort.
        Page lines usually arrive almost sorted, which is the worst case
        for a first-element pivot (quadratic time and recursion depth equal
        to the number of lines). Entries with equal ``top`` keep their
        original relative order.

        Args:
            arr: list of dicts, each providing a ``top`` value.
            low: left index (inclusive).
            high: right index (inclusive).
        Returns:
            None
        """
        if low >= high:
            return
        arr[low:high + 1] = sorted(arr[low:high + 1],
                                   key=lambda item: item.get('top'))
if __name__ == '__main__':
    # Manual smoke test: read a sample PDF, print the extracted text group
    # by group, then save it as Markdown.
    # Alternative sample inputs:
    #   r'D:\code\rag_tools\RAG资料库\项目模板-资料整理-to\1、工艺包资料\方案模板、PPT\TP-0 工艺技术包汇总及说明PPT-V2.pdf'
    #   r'D:\code\rag_tools\RAG资料库\工艺数据相关\RO计算2.pdf'
    path = r'D:\code\tem_1103\(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).pdf'
    reader = PdfReader()
    reader.read(path)
    for chunk in reader.text_generator():
        print(chunk)
    reader.write('pdf_test.md', mode='w')
|