| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- import sys
- import os
- # 添加当前目录到Python路径
- sys.path.append(os.path.dirname(__file__))
- from typing import Any, Iterator
- from rag_base import RAGBase
- from docx import Document
- from docx.document import Document as DocumentType
- from docx.oxml.table import CT_Tbl
- from docx.oxml.text.paragraph import CT_P
- from docx.table import Table
- from docx.text.paragraph import Paragraph
- import math
class DocxReader(RAGBase):
    """Docx document processor: reads a .docx file and exposes its text.

    Paragraphs and tables are extracted in the order they appear in the
    document body. Tables are rendered as Markdown-style pipe tables. The
    extracted blocks are stored in ``self.content`` and can be streamed in
    batches with ``text_generator`` or saved with ``write``.
    """

    # Number of content blocks joined into one chunk by ``text_generator``.
    _BLOCKS_PER_BATCH = 500

    def __init__(self):
        # Document object returned by docx.Document(); None until read().
        self.docx_file = None
        # List of extracted block strings built by process(); None until then.
        self.content = None

    def read(self, read_path):
        """Open the document at *read_path* and extract its content.

        Args:
            read_path: Path of the file handed to ``docx.Document``.
        """
        # Existence check defined outside this class (presumably on RAGBase);
        # its return value is not used here.
        self.is_path_exist(read_path)
        self.docx_file = Document(read_path)
        self.process()

    def write(self, write_path: str, mode='w', encoding='utf-8', *args, **kwargs):
        """Write every extracted block to *write_path*, then reset the reader.

        Each block is followed by a newline. Table blocks already contain
        embedded newlines, so one block may span several lines of output.

        Args:
            write_path: Destination file path.
            mode: Mode passed to ``open``.
            encoding: Text encoding passed to ``open``.
            *args: Accepted for interface compatibility; unused.
            **kwargs: Accepted for interface compatibility; unused.

        Raises:
            ValueError: If no content has been loaded (``read`` not called, or
                the reader was already reset).
        """
        # Validate before opening: opening with mode 'w' truncates the target,
        # so failing afterwards would leave an empty file behind.
        if self.content is None:
            raise ValueError("No content to write; call read() first.")
        with open(write_path, mode, encoding=encoding) as f:
            f.writelines(line + '\n' for line in self.content)
        # Drop the loaded document and content after a successful write.
        self.reset()

    def reset(self):
        """Return the reader to its freshly constructed state."""
        self.__init__()

    def process(self):
        """Extract all non-empty paragraphs and tables into ``self.content``."""
        content = []  # paragraph and table text, in document order
        for block in self.iter_block_items(self.docx_file):
            if isinstance(block, Paragraph):
                text = self.process_text_content(block)
            elif isinstance(block, Table):
                text = self.process_table_content(block)
            else:
                continue
            # Skip blocks that are empty after cleaning.
            if text:
                content.append(text)
        self.content = content

    def text_generator(self, *args: Any, **kwargs: Any) -> Iterator[str]:
        """Yield the extracted content in batches of newline-joined blocks.

        Long documents are split into groups of ``_BLOCKS_PER_BATCH`` blocks;
        each iteration yields one group joined with ``'\\n'``.

        Args:
            *args: Accepted for interface compatibility; unused.
            **kwargs: Accepted for interface compatibility; unused.

        Yields:
            One string per batch of blocks.

        Raises:
            ValueError: On first iteration, if no content has been loaded.
        """
        if self.content is None:
            raise ValueError("No content loaded; call read() first.")
        step = self._BLOCKS_PER_BATCH
        for start in range(0, len(self.content), step):
            yield '\n'.join(self.content[start:start + step])

    @staticmethod
    def iter_block_items(parent: DocumentType) -> Iterator[Any]:
        """Yield the paragraphs and tables of a document in document order.

        Args:
            parent: The document whose body children are walked.

        Yields:
            A ``Paragraph`` for each paragraph element and a ``Table`` for
            each table element directly under the body; other children are
            skipped.

        Raises:
            ValueError: On first iteration, if *parent* is not a document.
        """
        if not isinstance(parent, DocumentType):
            raise ValueError("something's not right")
        for child in parent.element.body.iterchildren():
            if isinstance(child, CT_P):
                yield Paragraph(child, parent)
            elif isinstance(child, CT_Tbl):
                yield Table(child, parent)

    @staticmethod
    def process_text_content(block: Paragraph) -> str:
        """Return the paragraph's text with surrounding whitespace removed.

        Raises:
            ValueError: If *block* is not a ``Paragraph``.
        """
        if not isinstance(block, Paragraph):
            raise ValueError("值类型错误, 应为Paragraph.", block)
        return block.text.strip()

    @staticmethod
    def process_table_content(block: Table) -> str:
        """Render a table as Markdown-style pipe-table text.

        Every row becomes ``| cell | cell |``. A ``---`` separator row is
        inserted right after the first row so that it acts as the header.

        Args:
            block: The table to render.

        Returns:
            The table lines joined with ``'\\n'``; an empty string when the
            table has no rows.

        Raises:
            ValueError: If *block* is not a ``Table``.
        """
        if not isinstance(block, Table):
            raise ValueError("值类型错误, 应为Table.", block)
        table_lines = []
        for i, row in enumerate(block.rows):
            # Clean cell text so it cannot break the table layout: pipes are
            # replaced with a look-alike character and newlines with spaces.
            row_content = [
                cell.text.replace('|', '‖').replace('\n', ' ').strip()
                for cell in row.cells
            ]
            table_lines.append('| ' + ' | '.join(row_content) + ' |')
            # Header separator goes directly after the first row.
            if i == 0:
                table_lines.append(
                    '| ' + ' | '.join('---' for _ in row_content) + ' |'
                )
        return '\n'.join(table_lines)

    def process_picture_content(self):
        """Placeholder for picture handling; currently does nothing."""
        pass
if __name__ == '__main__':
    # Manual smoke test: read a document, print the length of every text
    # chunk, then dump the extracted content to a Markdown file.
    # NOTE(review): the input path ends in .pdf although DocxReader opens it
    # with docx.Document — confirm this is the intended test file.
    docx_reader = DocxReader()
    docx_reader.read(r'D:\code\tem_1103\RAG资料库—开发\污水处理项目方案\药剂单页汇总.pdf')
    for chunk in docx_reader.text_generator():
        print(len(chunk))
    docx_reader.write('docx_test.md', 'w', 'utf-8')
|