# docx_reader.py (4.4 KB)
  1. import sys
  2. import os
# Add the directory containing this file to the Python import path.
  4. sys.path.append(os.path.dirname(__file__))
  5. from typing import Any, Iterator
  6. from rag_base import RAGBase
  7. from docx import Document
  8. from docx.document import Document as DocumentType
  9. from docx.oxml.table import CT_Tbl
  10. from docx.oxml.text.paragraph import CT_P
  11. from docx.table import Table
  12. from docx.text.paragraph import Paragraph
  13. import math
  14. class DocxReader(RAGBase):
  15. """Docx格式文档处理器,实现Docx文档的读写和常用操作"""
  16. def __init__(self):
  17. self.docx_file = None
  18. self.content = None
  19. def read(self, read_path):
  20. self.is_path_exist(read_path) # 检查文件是否存在
  21. docx_file = Document(read_path)
  22. self.docx_file = docx_file
  23. self.process()
  24. def write(self, write_path: str, mode='w', encoding='utf-8', *args, **kwargs):
  25. # 逐行写入文件
  26. with open(write_path, mode, encoding=encoding) as f:
  27. for line in self.content:
  28. f.write(line + '\n')
  29. # 重置
  30. self.reset()
  31. def reset(self):
  32. self.__init__()
  33. def process(self):
  34. content = [] # 全部文本的段落和表格
  35. for block in self.iter_block_items(self.docx_file):
  36. if isinstance(block, Paragraph):
  37. # 处理段落
  38. block = self.process_text_content(block)
  39. if block:
  40. content.append(block)
  41. elif isinstance(block, Table):
  42. # 处理表格
  43. block = self.process_table_content(block)
  44. if block:
  45. content.append(block)
  46. self.content = content
  47. def text_generator(self, *args: Any, **kwargs: Any) -> Iterator[str]:
  48. """每次迭代返回一个文本段落"""
  49. # 对于长文本每500个段落为1组,分批处理
  50. const_num = 500
  51. for i in range(math.ceil(len(self.content) / const_num)):
  52. content_group = self.content[i*const_num:i*const_num + const_num:1]
  53. yield '\n'.join(content_group)
  54. @staticmethod
  55. def iter_block_items(parent: DocumentType):
  56. """
  57. 生成一个文档中所有块级元素(段落和表格)的生成器,按它们在文档中出现的顺序
  58. """
  59. if isinstance(parent, DocumentType):
  60. parent_elm = parent.element.body
  61. else:
  62. raise ValueError("something's not right")
  63. for child in parent_elm.iterchildren():
  64. if isinstance(child, CT_P):
  65. yield Paragraph(child, parent) # 产生段落
  66. elif isinstance(child, CT_Tbl):
  67. yield Table(child, parent) # 产生表格
  68. @staticmethod
  69. def process_text_content(block:Paragraph)-> str:
  70. if isinstance(block, Paragraph):
  71. # 处理段落
  72. text = block.text.strip()
  73. return text
  74. else:
  75. raise ValueError("值类型错误, 应为Paragraph.",block)
  76. @staticmethod
  77. def process_table_content(block:Table)-> str:
  78. if not isinstance(block, Table):
  79. raise ValueError("值类型错误, 应为Table.",block)
  80. table_lines = []
  81. for i, row in enumerate(block.rows):
  82. # 清理单元格内容,移除可能干扰Markdown的字符
  83. row_content = []
  84. for cell in row.cells:
  85. cell_text = cell.text.replace('|', '‖').replace('\n', ' ') # 转义管道符,替换换行
  86. row_content.append(cell_text.strip())
  87. # 构建表格行
  88. table_line = '| ' + ' | '.join(row_content) + ' |'
  89. table_lines.append(table_line)
  90. # 添加表头分隔线(在第二行后)
  91. if i == 0 and len(table_lines) == 1:
  92. header_separator = '| ' + ' | '.join(['---' for _ in row.cells]) + ' |'
  93. table_lines.append(header_separator)
  94. return '\n'.join(table_lines)
  95. def process_picture_content(self):
  96. pass
  97. if __name__ == '__main__':
  98. """测试代码"""
  99. docx_reader = DocxReader()
  100. docx_reader.read(r'D:\code\tem_1103\RAG资料库—开发\污水处理项目方案\药剂单页汇总.pdf')
  101. for i in docx_reader.text_generator():
  102. print(len(i))
  103. docx_reader.write('docx_test.md','w', 'utf-8')