import sys
import os

# Add this file's directory to the Python path. This must run before the
# `rag_base` import below (presumably rag_base lives next to this file).
sys.path.append(os.path.dirname(__file__))

from typing import Any, Iterator
from rag_base import RAGBase
from docx import Document
from docx.document import Document as DocumentType
from docx.oxml.table import CT_Tbl
from docx.oxml.text.paragraph import CT_P
from docx.table import Table
from docx.text.paragraph import Paragraph
import math


class DocxReader(RAGBase):
    """Docx document handler: reads a .docx file and exposes its text.

    Paragraphs are kept as stripped plain text and tables are rendered as
    Markdown tables, both in document order. The result is held in
    ``self.content`` (a list of strings, or ``None`` when nothing is loaded).
    """

    def __init__(self):
        # NOTE(review): RAGBase.__init__ is not invoked here -- confirm the
        # base class needs no initialisation of its own.
        self.docx_file = None  # python-docx Document of the last read(), or None
        self.content = None    # list[str] built by process(), or None

    def read(self, read_path):
        """Load the document at ``read_path`` and extract its content.

        Args:
            read_path: Path of the .docx file to open.
        """
        # Existence check; is_path_exist is not defined in this file
        # (presumably inherited from RAGBase). Its return value is not used.
        self.is_path_exist(read_path)
        self.docx_file = Document(read_path)
        self.process()

    def write(self, write_path: str, mode='w', encoding='utf-8', *args, **kwargs):
        """Write the extracted content to ``write_path``, one block per line.

        After a successful write the reader is reset, so ``read()`` must be
        called again before the next ``write()``.

        Args:
            write_path: Destination file path.
            mode: Mode passed to ``open`` (text mode expected).
            encoding: Text encoding passed to ``open``.
            *args, **kwargs: Accepted for interface compatibility; unused.

        Raises:
            ValueError: If no content is loaded (``read()`` was not called,
                or the reader was already reset by a previous ``write()``).
        """
        # Fail before opening the file so that an existing target is not
        # truncated when there is nothing to write.
        if self.content is None:
            raise ValueError("no content to write; call read() first")

        with open(write_path, mode, encoding=encoding) as f:
            f.writelines(line + '\n' for line in self.content)

        # Drop the loaded document and its content.
        self.reset()

    def reset(self):
        """Return the reader to its freshly-constructed state."""
        self.__init__()

    def process(self):
        """Convert ``self.docx_file`` into a list of text blocks.

        Paragraphs and tables are visited in document order; blocks whose
        converted text is empty are skipped. The result is stored in
        ``self.content``.
        """
        content = []  # text of every non-empty paragraph and table
        for block in self.iter_block_items(self.docx_file):
            if isinstance(block, Paragraph):
                text = self.process_text_content(block)
            elif isinstance(block, Table):
                text = self.process_table_content(block)
            else:
                continue
            if text:
                content.append(text)
        self.content = content

    def text_generator(self, *args: Any, batch_size: int = 500, **kwargs: Any) -> Iterator[str]:
        """Yield the content in batches, each joined by newlines.

        Long documents are processed in groups of ``batch_size`` blocks.
        Nothing is yielded when no content is loaded.

        Args:
            *args, **kwargs: Accepted for interface compatibility; unused.
            batch_size: Number of content blocks per yielded string
                (keyword-only, default 500).

        Yields:
            One string per batch: the batch's blocks joined with ``'\\n'``.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1. As this is a
                generator, the error surfaces on the first iteration.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        content = self.content or []
        for start in range(0, len(content), batch_size):
            yield '\n'.join(content[start:start + batch_size])

    @staticmethod
    def iter_block_items(parent: DocumentType):
        """Yield every block-level item (paragraph or table) of a document.

        Items are produced in the order in which they appear in the
        document body; other child elements are ignored.

        Args:
            parent: The python-docx document to walk.

        Yields:
            ``Paragraph`` or ``Table`` objects wrapping the body's children.

        Raises:
            ValueError: If ``parent`` is not a python-docx document.
        """
        if not isinstance(parent, DocumentType):
            # NOTE(review): message kept as-is for compatibility; consider
            # making it more descriptive.
            raise ValueError("something's not right")

        for child in parent.element.body.iterchildren():
            if isinstance(child, CT_P):
                yield Paragraph(child, parent)  # paragraph
            elif isinstance(child, CT_Tbl):
                yield Table(child, parent)  # table

    @staticmethod
    def process_text_content(block: Paragraph) -> str:
        """Return the paragraph's text with surrounding whitespace removed.

        Raises:
            ValueError: If ``block`` is not a ``Paragraph``.
        """
        if not isinstance(block, Paragraph):
            raise ValueError("值类型错误, 应为Paragraph.", block)
        return block.text.strip()

    @staticmethod
    def process_table_content(block: Table) -> str:
        """Render a table as a Markdown table.

        Each row becomes ``| a | b | ... |``. A ``---`` separator row is
        inserted after the first row, which is therefore treated as the
        header. A table without rows yields an empty string.

        Raises:
            ValueError: If ``block`` is not a ``Table``.
        """
        if not isinstance(block, Table):
            raise ValueError("值类型错误, 应为Table.", block)

        table_lines = []
        for i, row in enumerate(block.rows):
            cells = row.cells
            # Sanitise cell text so it cannot break the Markdown table:
            # pipes are replaced by '‖' and line breaks by spaces.
            row_content = [
                cell.text.replace('|', '‖').replace('\n', ' ').strip()
                for cell in cells
            ]
            table_lines.append('| ' + ' | '.join(row_content) + ' |')

            # Header separator goes right after the first row.
            if i == 0:
                table_lines.append('| ' + ' | '.join(['---'] * len(cells)) + ' |')

        return '\n'.join(table_lines)

    def process_picture_content(self):
        """Placeholder for picture handling; not implemented."""
        pass


if __name__ == '__main__':
    # Test code
    docx_reader = DocxReader()
    # NOTE(review): this path ends in .pdf although the reader opens it with
    # python-docx's Document() -- confirm the intended test file.
    docx_reader.read(r'D:\code\tem_1103\RAG资料库—开发\污水处理项目方案\药剂单页汇总.pdf')
    for chunk in docx_reader.text_generator():
        print(len(chunk))
    docx_reader.write('docx_test.md', 'w', 'utf-8')