jiyuhang 3 miesięcy temu
commit
0a4fb2c90f
10 zmienionych plików z 1192 dodań i 0 usunięć
  1. 24 0
      .env
  2. 12 0
      .gitignore
  3. 118 0
      Reader/docx_reader.py
  4. 261 0
      Reader/pdf_reader.py
  5. 111 0
      Reader/rag_base.py
  6. 102 0
      main.py
  7. 0 0
      markdown_reader.py
  8. 318 0
      models/deepseek_online.py
  9. 246 0
      reader.py
  10. 0 0
      txt_reader.py

+ 24 - 0
.env

@@ -0,0 +1,24 @@
+# API KEY
+DEEPSEEK_KEY=sk-349578d2ddfc488fbf0792410967203f
+# API 地址
+DEEPSEEK_URL=https://api.deepseek.com/v1/chat/completions
+# 系统指令
+DEEPSEEK_SYSCMD="你是一名文本处理专家,专门用于优化RAG文本,以提升RAG架构的检索效果。用户将输入一个文本,可能存在markdown格式表格。你的任务是根据文本的语义进行分块,确保每个分块之间的语义内容尽可能差异化和独立。同时,为每个分块添加一个简洁的摘要概括。
+关键要求:
+  语义分块:基于文本的语义相似性进行分块,最大化块间差异,避免语义重叠。
+  表格的高级处理:
+    优化与融合:你被授权对表格进行“优化”处理,若文本中存在表格,则已预处理为markdown格式。核心原则是确保表格中的关键信息和数据关系必须被准确无误地保留和呈现。
+    处理方式:你可以选择:
+      保留:如果表格结构能最清晰地展示信息,则保留其markdown格式,并将其与相关的说明文字整合在同一分块内。
+      删减与转述:如果表格内容冗长或可读性差,你可以删减该表格,但必须将其核心信息、数据结论或关键行列内容,用简洁、连贯的叙述性文字重新组织,并无缝融入到分块的正文中。请务必保证转换后的事实准确性。
+  摘要添加:每个分块前放置一个简明摘要,概括块的核心内容。
+  标题添加:为文章生成一个合适的标题。
+  灵活性处理:如果有目录可以忽略目录部分,但你可以从目录中理解文本结构;允许适当调整段落的划分、对不通顺的语句进行内容适当调整,需要保证与原文意思一致和数据准确;如果文本较短或语义连贯,可将整个文本作为一个块。
+  长度保留原则:优化后的总体文本长度应与原始文本基本保持一致。仅允许移除重复表达、冗余修饰和空话,但必须保留所有事实、数据、逻辑关系和关键细节。
+  输出格式:标题单独一行,每个块的摘要和内容放置在同一个段落内,不同块之间用空行分隔。具体格式示例:
+标题
+块1摘要。块1内容
+块2摘要。块2内容
+...
+请严格遵循以上规则处理用户输入,并输出整理后的文本。
+"

+ 12 - 0
.gitignore

@@ -0,0 +1,12 @@
+__pycache__/
+*.pyc
+.idea/
+*.txt
+*.doc
+*.docx
+*.md
+*.ppt
+*.pptx
+*.pdf
+*.xlsx
+*.csv

+ 118 - 0
Reader/docx_reader.py

@@ -0,0 +1,118 @@
+import sys
+import os
+# 添加当前目录到Python路径
+sys.path.append(os.path.dirname(__file__))
+from typing import Any, Iterator
+from rag_base import RAGBase
+from docx import Document
+from docx.document import Document as DocumentType
+from docx.oxml.table import CT_Tbl
+from docx.oxml.text.paragraph import CT_P
+from docx.table import Table
+from docx.text.paragraph import Paragraph
+import math
+
+class DocxReader(RAGBase):
+    """Docx格式文档处理器,实现Docx文档的读写和常用操作"""
+    def __init__(self):
+        self.docx_file = None
+        self.content = None
+
+    def read(self, read_path):
+        self.is_path_exist(read_path) # 检查文件是否存在
+        docx_file = Document(read_path)
+        self.docx_file = docx_file
+        self.process()
+
+    def write(self, write_path: str, mode='w', encoding='utf-8', *args, **kwargs):
+
+        # 逐行写入文件
+        with open(write_path, mode, encoding=encoding) as f:
+            for line in self.content:
+                f.write(line + '\n')
+        # 重置
+        self.reset()
+
+    def reset(self):
+        self.__init__()
+
+    def process(self):
+        content = []  # 全部文本的段落和表格
+        for block in self.iter_block_items(self.docx_file):
+            if isinstance(block, Paragraph):
+                # 处理段落
+                block = self.process_text_content(block)
+                if block:
+                    content.append(block)
+            elif isinstance(block, Table):
+                # 处理表格
+                block = self.process_table_content(block)
+                if block:
+                    content.append(block)
+        self.content = content
+
+    def text_generator(self, *args: Any, **kwargs: Any) -> Iterator[str]:
+        """每次迭代返回一个文本段落"""
+        # 对于长文本每500个段落为1组,分批处理
+        const_num = 500
+        for i in range(math.ceil(len(self.content) / const_num)):
+            content_group = self.content[i*const_num:i*const_num + const_num:1]
+            yield '\n'.join(content_group)
+
+
+    @staticmethod
+    def iter_block_items(parent: DocumentType):
+        """
+        生成一个文档中所有块级元素(段落和表格)的生成器,按它们在文档中出现的顺序
+        """
+        if isinstance(parent, DocumentType):
+            parent_elm = parent.element.body
+        else:
+            raise ValueError("something's not right")
+
+        for child in parent_elm.iterchildren():
+            if isinstance(child, CT_P):
+                yield Paragraph(child, parent)  # 产生段落
+            elif isinstance(child, CT_Tbl):
+                yield Table(child, parent)  # 产生表格
+
+    @staticmethod
+    def process_text_content(block:Paragraph)-> str:
+        if isinstance(block, Paragraph):
+            # 处理段落
+            text = block.text.strip()
+            return text
+        else:
+            raise ValueError("值类型错误, 应为Paragraph.",block)
+
+    @staticmethod
+    def process_table_content(block:Table)-> str:
+        if not isinstance(block, Table):
+            raise ValueError("值类型错误, 应为Table.",block)
+        table_lines = []
+        for i, row in enumerate(block.rows):
+            # 清理单元格内容,移除可能干扰Markdown的字符
+            row_content = []
+            for cell in row.cells:
+                cell_text = cell.text.replace('|', '‖').replace('\n', ' ')  # 转义管道符,替换换行
+                row_content.append(cell_text.strip())
+            # 构建表格行
+            table_line = '| ' + ' | '.join(row_content) + ' |'
+            table_lines.append(table_line)
+            # 添加表头分隔线(在第二行后)
+            if i == 0 and len(table_lines) == 1:
+                header_separator = '| ' + ' | '.join(['---' for _ in row.cells]) + ' |'
+                table_lines.append(header_separator)
+        return '\n'.join(table_lines)
+
+    def process_picture_content(self):
+        pass
+
+
+if __name__ == '__main__':
+    """测试代码"""
+    docx_reader = DocxReader()
+    docx_reader.read(r'D:\code\tem_1103\RAG资料库—开发\污水处理项目方案\药剂单页汇总.pdf')
+    for i in docx_reader.text_generator():
+        print(len(i))
+    docx_reader.write('docx_test.md','w', 'utf-8')

+ 261 - 0
Reader/pdf_reader.py

@@ -0,0 +1,261 @@
+import sys
+import os
+# 添加当前目录到Python路径
+sys.path.append(os.path.dirname(__file__))
+from rag_base import RAGBase
+from typing import List, Any, Iterator
+import pdfplumber
+import math
+
+class PdfReader(RAGBase):
+    """
+    pdf读取器,读取处理pdf,整理为markdown格式的文本
+    """
+    def __init__(self):
+        self.page_content = []  # 每一页的文本内容
+
+    def reset(self):
+        self.__init__()
+
+
+    def read(self, read_path: str, password:str=None):
+        """
+        读取pdf
+        Args:
+            read_path: pdf文件路径
+            password: 密码
+
+        Returns:
+
+        """
+        # 文件存在性检查
+        self.is_path_exist(read_path)
+        # 位置元组:x0, top, x1, bottom
+        # x0:表示该元素矩形区域左边缘到页面最左侧的横向距离
+        # top:表示该元素矩形区域上边缘到页面最顶部的纵向距离
+        # x1:表示该元素矩形区域右边缘到页面最左侧的横向距离
+        # bottom:表示该元素矩形区域下边缘到页面最顶部的纵向距离
+        with pdfplumber.open(read_path, password=password) as pdf:
+            # 如果pdf页数太多,应该选择分页处理,
+            for page_num, page in enumerate(pdf.pages):
+                # 处理一页pdf内容
+                one_page_text = self.process(page)  # 一页文本
+                if one_page_text:
+                    self.page_content.append(one_page_text)
+
+    def process(self, page)->List[dict]:
+        """
+        处理每页pdf,输出文本
+        Args:
+            page:
+
+        Returns:
+
+        """
+        # 提取页面中的所有文本行,并获取其位置信息,每行文本是一个list元素
+        lines = page.extract_text_lines(keep_blank_chars=True,
+                                        extra_attrs=["fontname", "size"])
+        # 先剔除页码行或数字孤行
+        lines = [_ for _ in lines if not self.is_page_number(_['text'])]
+        # 处理空白页
+        if len(lines) == 0:
+            return ''
+
+        # 按照top值对行进行排序(升序)
+        self.quick_sort(lines, 0, len(lines) - 1)
+        # 提取页面中的表格,并获取每个表格的位置信息
+        tables = page.find_tables()  # 使用 find_tables() 检测表格
+        table_data = []
+        if tables:
+            for table in tables:
+                # 处理表格文本
+                table_text = self.process_table(table)
+                # 获取表格的边界框 (x0, top, x1, bottom)
+                bbox = table.bbox
+                table_data.append({
+                    'text': table_text,
+                    'x0': bbox[0],  # 表格的位置信息
+                    'top': bbox[1],  # 表格的位置信息
+                    'x1': bbox[2],  # 表格的位置信息
+                    'bottom': bbox[3],  # 表格的位置信息
+                })
+        # 将表格内容插入行文本,首先我们需要先剔除行文本中的重复表格
+        # 规则:如果行文本中的top介于表格位置的top和bottom之间,应该被剔除
+        new_lines = []
+        table_count = len(table_data)
+        for line in lines:
+            flag = True
+            for j in range(table_count):
+                # 剔除冗余表
+                if (line.get('top') >= table_data[j].get('top')) and (line.get('top') <= table_data[j].get('bottom')):
+                    flag = False
+                    break
+            if flag:
+                new_lines.append(line)
+        # 将表格内容插入新建立的行文本
+        new_lines += table_data
+        self.quick_sort(new_lines, 0, len(new_lines) - 1)
+        # 页面文本融合
+        #page_text = self.join_(new_lines)
+        page_text = new_lines
+        return page_text
+
+    @staticmethod
+    def process_table(table)->str:
+        """
+        处理表格,输出表格文本
+        Args:
+            table: pdfplumber.table.Table
+
+        Returns: 表格文本
+
+        """
+        # 提取表格数据
+        extracted_table = table.extract()
+        # 处理非str值
+        for i in range(len(extracted_table)):
+            for j in range(len(extracted_table[i])):
+                extracted_table[i][j] = str(extracted_table[i][j]).strip()
+                extracted_table[i][j] = extracted_table[i][j].replace('\n','')
+                extracted_table[i][j] = extracted_table[i][j].replace('None','')
+        # 将上述二维list表格处理为纯markdown文本
+        table_text = ['|' + '|'.join(extracted_table[0]) + '|', '|---' * len(extracted_table[0]) + '|']
+        for table_line in extracted_table[1:]:
+            for i in range(len(table_line)):
+                table_line[i] = table_line[i].replace('\n', ' ').strip()
+            table_text.append('|' + ' | '.join(table_line) + '|')
+        table_text = '\n'.join(table_text)  # 融合为最终大文本
+        return table_text
+
+    def write(self, write_path: str, mode='w', encoding='utf-8', *args, **kwargs):
+        # 逐页保存
+        with open(write_path, mode, encoding=encoding) as f:
+            for page in self.page_content:
+                f.write(self.join_(page))  # 融合在此处,更省内存
+        self.reset()
+
+    def join_(self, page_list)->str:
+        """
+        将每一页pdf提取的文字和表格进行融合,输出一个文本text
+        Args:
+            page_list: pdf提取的文本和表格
+
+        Returns: 拼接后的text
+
+        """
+        text = ''
+        text_tem_list = page_list.copy()
+        # 启发式分段,根据文本位置分析
+        # 左侧页边距统计
+        statistic_dict = {}
+        for i in text_tem_list:
+            d = str(int(i.get('x0')))  # 当前元素左侧页边距
+            if d not in statistic_dict.keys():
+                statistic_dict[d] = 1
+            else:
+                statistic_dict[d] += 1
+        # 处理缩进都不一样的页
+        if max(statistic_dict.values()) == min(statistic_dict.values()):
+            for i in text_tem_list:
+                i['text'] = i['text'] + '\n'
+        else:
+            # 处理正文页
+            max_key = max(statistic_dict, key=statistic_dict.get)
+            page_left_distance = int(max_key) + 1  # 正文非段首文字页边距
+            for i in text_tem_list:
+                if self.is_title(i['text']) :  # 正则匹配到标题行
+                    # 为标题行增加回车符
+                    i['text'] = i['text'] + '\n'
+                elif i['x0'] > page_left_distance:
+                    # 段首增加回车符
+                    i['text'] = '\n' + i['text']
+        # 融合所有行
+        text = ''.join([_.get('text') for _ in text_tem_list])
+        # 空格替换
+        text = text.replace(' ', '')
+        # 回车符替换
+        text = text.replace('\n\n\n', '\n')
+        text = text.replace('\n\n', '\n')
+        return text
+
+    def text_generator(self, *args: Any, **kwargs: Any) -> Iterator[str]:
+        """每次迭代返回一页文本"""
+        # 每100页为1组
+        const_num = 100
+        for i in range(math.ceil(len(self.page_content) / const_num)):
+            # 融合行文本
+            group_content =[self.join_(page_cont) for page_cont in self.page_content[i*const_num:i*const_num + const_num:1]]
+            group_content = ''.join(group_content)
+            yield group_content
+
+
+    @staticmethod
+    def quick_sort_part(arr: list[dict], low: int, high: int):
+        """
+        快速排序内层函数
+        Args:
+            arr: 待排序数组, 结构如同list[dict], 每个dict包括{text, x0 top x1 bottom}, 以top值进行排序
+            low: 左边界
+            high: 右边界
+
+        Returns:排序后的基准值索引
+
+        """
+        if low >= high:
+            return None
+
+        # 设定基准值
+        left, right = low, high
+        pivot = arr[low].get('top')
+        # 右边放大数,左边放小数
+        while left < right: # 做一趟排序
+            # 先从右面开始找比基准值小的数
+            while left < right and arr[right].get('top') >= pivot:
+                right -= 1
+            # 在右面找到了比基准值小的数,执行一次交换
+            if left < right:
+                arr[left], arr[right] = arr[right], arr[left]
+                left += 1
+
+            # 在左面开始找比基准值大的数
+            while left < right and arr[left].get('top') <= pivot:
+                left += 1
+            # 在左面找到了大于基准值的数, 执行一次交换
+            if left < right:
+                arr[left], arr[right] = arr[right], arr[left]
+                right -= 1
+        return left # 返回基准值索引
+
+    def quick_sort(self, arr: list[dict], low: int, high: int):
+        """
+        快排序外层函数
+        Args:
+            arr: 待排序数组
+            low: 左侧索引
+            high: 右侧索引
+
+        Returns:
+
+        """
+        if low >= high:
+            return
+
+        # 先排一趟
+        mid = self.quick_sort_part(arr, low, high)
+        # 排左边
+        self.quick_sort(arr, low, mid-1)
+        # 排右边
+        self.quick_sort(arr, mid+1, high)
+
+
+
+
+if __name__ == '__main__':
+    path = r'D:\code\tem_1103\(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).pdf'
+    #path = r'D:\code\rag_tools\RAG资料库\项目模板-资料整理-to\1、工艺包资料\方案模板、PPT\TP-0 工艺技术包汇总及说明PPT-V2.pdf'
+    #path = r'D:\code\rag_tools\RAG资料库\工艺数据相关\RO计算2.pdf'
+    reader = PdfReader()
+    reader.read(path)
+    for i in reader.text_generator():
+        print(i)
+    reader.write('pdf_test.md', mode='w')

+ 111 - 0
Reader/rag_base.py

@@ -0,0 +1,111 @@
+from abc import ABC, abstractmethod
+from typing import Iterator
+from typing import Any
+import os
+import re
+
+class RAGBase(ABC):
+
+    @abstractmethod
+    def read(self, read_path:str):
+        """
+        从地址读入文件
+        Args:
+            read_path: 需要读取的文件路径
+
+        Returns:
+
+        """
+        pass
+
+    @abstractmethod
+    def write(self, *args, **kwargs):
+        """
+        向地址写入文件
+        Returns:
+
+        """
+        pass
+
+    @abstractmethod
+    def process(self, *args: Any, **kwargs: Any) -> Any:
+        pass
+
+    @abstractmethod
+    def reset(self):
+        """
+        重置状态
+        Returns:
+
+        """
+        pass
+
+    @abstractmethod
+    def text_generator(self, *args: Any, **kwargs: Any) -> Iterator[str]:
+        """
+        文本生成器
+        Args:
+            *args:
+            **kwargs:
+
+        Returns:返回文本
+
+        """
+        pass
+    @staticmethod
+    def is_path_exist(path:str):
+        """
+        判断路径是否存在,如果不存在就抛出异常
+        Args:
+            path: 文件路径
+
+        Returns:
+
+        """
+        if not os.path.exists(path):
+            raise FileNotFoundError('文件不存在!', path)
+
+    @staticmethod
+    def is_title(text: str) -> bool:
+        """
+        判断文本中是否存在各种格式的标题, 检查开头
+        """
+        title_patterns = [
+            # 数字编号标题
+            r'^\d+(\.\d+)+\s*[\u4e00-\u9fa5a-zA-Z].*',
+            r'^\d+\s+[\u4e00-\u9fa5a-zA-Z]',
+            # 中文数字标题
+            r'^[一二三四五六七八九十]+、\s*[\u4e00-\u9fa5].*',
+            # 带括号的中文数字
+            r'^([一二三四五六七八九十]+)\s*[\u4e00-\u9fa5].*',
+            # 章节标题
+            r'^第[一二三四五六七八九十零百千\d]+[章节条]\s*[\u4e00-\u9fa5].*',
+            # 字母编号标题
+            r'^[A-Z]\.\s*[\u4e00-\u9fa5].*',
+            # 半个小括号组合, 1) a)
+            r'^[\d\w]+\)',
+            # 目录
+            r'^目\s*录',
+        ]
+        for pat in title_patterns:
+            if re.search(pat, text):
+                return True
+        return False
+
+    @staticmethod
+    def is_page_number(text: str) -> bool:
+        """
+        判断文本中是否存在各种格式的标题, 检查开头
+        """
+        patterns = [
+            # 纯页码
+            r'^第.+页(?!\S)',
+            # 仅包含数字
+            r'^\d+$',
+            # 仅包含罗马数字
+            '^M{0,3}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})$'
+        ]
+        for pat in patterns:
+            if re.search(pat, text):
+                return True
+        return False

+ 102 - 0
main.py

@@ -0,0 +1,102 @@
+import time
+from flask import Flask, request, jsonify
+import logging
+import atexit
+import os
+import sys
+import signal
+from reader import Reader
+
+class FlaskAppManager:
+    def __init__(self):
+        # 创建应用
+        self.app = Flask(__name__)
+        # 创建读取器(进程间独立,子线程间共享)
+        self.reader = Reader()
+        self.shutting_down = False
+        self.setup_handlers()
+        self.upload_dir = "./uploads"
+
+    def setup_handlers(self):
+        """设置所有退出处理程序"""
+        # 信号处理
+        signal.signal(signal.SIGINT, self.signal_handler)
+        signal.signal(signal.SIGTERM, self.signal_handler)
+
+        # atexit 处理
+        atexit.register(self.cleanup)
+
+    def signal_handler(self, signum, frame):
+        """信号处理函数"""
+        if self.shutting_down:
+            return  # 已经在关闭中,忽略重复信号
+
+        self.shutting_down = True
+        signal_name = "SIGINT" if signum == signal.SIGINT else "SIGTERM"
+        print(f"\n🚨 接收到 {signal_name} 信号,开始优雅关闭...")
+
+        self.cleanup()
+        sys.exit(0)  # 正常退出
+
+
+    def cleanup(self):
+        """退出Flask清理资源"""
+        if hasattr(self, '_cleaned'):
+            return
+
+        self._cleaned = True
+        print("🔧 清理资源中...")
+
+
+        # 停止 Reader
+        if hasattr(self.reader, 'stop'):
+            print("停止文件读取器...")
+            self.reader.stop()
+
+        print("✅ 资源清理完成")
+
+    def run(self,port=5000, debug=True, threaded=True):
+        """运行应用"""
+        # 启动Reader
+        self.reader.start()
+        print("🚀 启动 RAG Flask 应用...")
+        print("💡 提示: 按 Ctrl+C 可以优雅关闭应用")
+        # 创建文件目录
+        if not os.path.exists(self.upload_dir):
+            os.makedirs(self.upload_dir)
+
+        try:
+            self.app.run(port=port, debug=debug, threaded=threaded)
+        except Exception as e:
+            print(f"❌ 应用异常: {e}")
+        finally:
+            self.cleanup()
+
+# 创建任务管理器
+app_manager = FlaskAppManager()
+@app_manager.app.route('/api/v1/rag_upload', methods=["POST"])
+def rag_upload():
+    response = {"message": "None"}
+    # 接收文件
+    if 'file' not in request.files:
+        return {"message": "未上传文件"}, 400
+    file = request.files['file']
+    if file.filename == '':
+        return {"message": "空文件名"}, 400
+    # 提取文件类型
+    #
+    # 获取文件名(不包括路径)
+    filename = os.path.basename(file.filename)
+    file_path = os.path.join(app_manager.upload_dir, filename)
+    # 保存文件
+    file.save(file_path)
+    # 向读取器添加任务
+    if app_manager.reader.add_task(file_path):
+        response = ({"message":f"文件【{file.filename}】上传成功"}, 200)
+    else:
+        response = ({"message":f"文件【{file.filename}】上传失败"}, 400)
+    print('检查任务队列', app_manager.reader.task_list)
+    return response
+
+if __name__ == '__main__':
+    app_manager.run(port=5000, debug=False, threaded=True)

+ 0 - 0
markdown_reader.py


+ 318 - 0
models/deepseek_online.py

@@ -0,0 +1,318 @@
+import requests
+import os
+import json
+from dotenv import load_dotenv
+load_dotenv()
+
+class Model_:
+    def __init__(self):
+        self.base_url = os.getenv("DEEPSEEK_URL")
+        self.temperature = 0.5
+        self.max_tokens = 1024*6
+        self.headers = {
+            "Content-Type": "application/json",
+            "Authorization": f"Bearer {os.getenv("DEEPSEEK_KEY")}"
+        }
+    def __set_data(self, prompt, stream):
+
+        return {
+            "model": "deepseek-chat",
+            "messages": [
+                {"role": "system", "content": os.getenv("DEEPSEEK_SYSCMD")},
+                {"role": "user", "content": prompt}
+            ],
+            "stream": stream,
+            "temperature": self.temperature,
+            "max_tokens": self.max_tokens
+        }
+    def chat(self, prompt):
+        data = self.__set_data(prompt, stream=False)
+        response = requests.post(self.base_url, headers=self.headers, json=data, timeout=120)
+
+        if response.status_code == 200:
+            result = response.json()
+            return result['choices'][0]['message']['content']
+        else:
+            raise Exception(f"API错误: {response.status_code} - {response.text}")
+
+    def chat_stream(self, prompt):
+
+        data = self.__set_data(prompt, stream=True)
+        response = requests.post(self.base_url, headers=self.headers, json=data, timeout=60*4, stream=True)
+        if response.status_code != 200:
+            raise Exception(f"API错误: {response.status_code} - {response.text}")
+        # 处理流式响应
+        full_response = ""
+        for line in response.iter_lines():
+            if line:
+                # 解码字节串并移除前缀"data: "
+                decoded_line = line.decode('utf-8')
+                if decoded_line.startswith('data: '):
+                    json_str = decoded_line[6:]  # 移除"data: "前缀
+
+                    # 检查是否是结束标记
+                    if json_str == '[DONE]':
+                        break
+
+                    try:
+                        # 解析JSON数据
+                        chunk_data = json.loads(json_str)
+                        if 'choices' in chunk_data and len(chunk_data['choices']) > 0:
+                            delta = chunk_data['choices'][0].get('delta', {})
+                            if 'content' in delta:
+                                content = delta['content']
+                                print(content, end='', flush=True)  # 流式输出
+                                full_response += content
+                    except json.JSONDecodeError:
+                        continue
+
+        print()  # 最后换行
+        return full_response
+
+if __name__ == "__main__":
+    # 测试代码
+    promt = """沭阳县循环经济产业园污废水资源化项目
+技术及可行性分析方案
+金科环境股份有限公司
+二〇二四年一月
+目 录
+1 项目背景	2
+1.1 主要问题及需求	2
+1.1.1 经济发展、招商引资与水资源环境、生态容量的挑战	2
+1.1.2 水生态环境的挑战	2
+1.2 解决途径及必要性	2
+1.2.1 解决废水排放总量不足,释放环境容量的唯一途径	2
+1.2.2 提高开发区竞争力,促进区域经济可持续发展的需求	2
+1.2.3 扩宽工业企业供水的新思路	3
+1.2.4 对国家、地方、行业法律法规与政策的响应	3
+2 项目实施方案	4
+2.1 设计回用水产水规模及水质标准	4
+2.1.1 设计回用水产水规模	4
+2.1.2 设计水质标准	4
+2.2 建设实施方案亮点说明	5
+2.2.1 采用国内领先的“装配式”水厂建造方式	5
+2.2.2 数字化设计与建设(BIM技术)	6
+2.2.3 智慧生产、“无人”值守	7
+2.3 项目设计方案	7
+2.3.1 厂站设计方案	7
+2.3.1.1 项目选址及占地预估	7
+2.3.1.2 工艺确定及水量平衡图	8
+2.3.1.3 工艺主要设计参数	8
+2.3.1.4 预处理工艺 — 除锰单元设计	8
+2.3.2 厂外管线设计方案	10
+3 项目经济效益分析	12
+3.1 投资概算	12
+3.2 运营成本预估	12
+4 项目实施计划及建议	13
+4.1 项目实施计划建议	13
+4.2 项目实施时间安排(初步)	13
+项目背景
+主要问题及需求
+经济发展、招商引资与水资源环境、生态容量的挑战
+循环经济产业园污水处理厂(沂北扎下污水处理厂)设计处理规模 3 万吨/天,目前实际处理规模约1.8万吨/天,但排污口仅有2万吨/天排放容量,而桐昆集团的入驻(2024年7月该企业污水排放量预计约为7000吨/天,至2025年提高至1.2万吨/天)将增加排污口排放负荷及容量(约30km排污管道已建),污染物总量控制,制约了大型工业企业的引进和发展。
+水生态环境的挑战
+沭阳县共设1个国考水质监测断面和7个省考水质监测断面,对监测断面水环境影响较大的污染因子为氨氮、总磷。目前,沭阳县2座工业污水处理厂出水均按《城镇污水处理厂污染物排放标准》(GB18918-2002)一级A 排放标准,达标排放至新沂河,对水生态环境存在潜在污染。
+2022年9月22日,江苏省地方标准《城镇污水处理厂污染排放标准》(报批稿)公式,对于新建及现有城镇污水处理厂,排放标准将更加严格(准III 类、准IV 类水)。
+解决途径及必要性
+综上所述,沭阳县循环产业园污水再生回用及资源化项目迫在眉睫,项目的实施具有以下意义:
+解决废水排放总量不足,释放环境容量的唯一途径
+项目的实施,推进工业废水循环利用,提升工业水资源集约节约利用水平,提高再生水利用率,从根本上解决产业园区废水排放总量不足的问题,促进大型工业企业的引进和发展,特别是具有高品质用水需求的新型行业,如光伏、半导体,芯片…以及对水质要求较高且用水量较大的行业,如热电厂,印染…
+提高开发区竞争力,促进区域经济可持续发展的需求
+将尾水回用于企业,可以极大的削弱污染物排放对水体的污染,避免超标污染物对区域环境及受纳水体下游区域造成影响,对保护当地流域、水系不受污染,改善城市水环境质量有着积极的作用,使人民的生活环境质量逐步提高,为开发区营造良好的生产、生活环境,促进区域的可持续发展作出重要贡献。
+扩宽工业企业供水的新思路
+开发利用城市污水为拓宽供水来源提供了新的思路,是提高供水可靠性的重要手段。通过循环经济产业园污水处理厂尾水作为中水水源,经处理后作为工业企业生产用水的水源,实现了在工业企业内将常规水源(目前以地下水、河水为主)作为第二备用水源的可能性。
+对国家、地方、行业法律法规与政策的响应
+为深入贯彻国家生态文明思想,践行绿水青山就是金山银山理念,坚持“节水优先、空间均衡、系统治理、两手发力”的治水思路,2021年1月,经国务院同意,发展改革委、工业和信息化部等十部委印发了《关于推进污水资源化利用的指导意见》(发改环资〔2021〕13 号),提出“制定工业废水循环利用等实施方案,细化工作重点和主要任务,形成污水资源化利用‘1+N’政策体系”等要求。2021年12月,工业和信息化部等六部委又联合发布了《工业废水循环利用实施方案》(工信部联节〔2021〕213号),以进一步贯彻落实党中央、国务院关于污水资源化利用决策部署,加快推进工业废水循环利用,达到缓解水资源供需矛盾、减少水污染和保障水生态安全,提升工业水资源集约节约利用水平,促进经济社会全面绿色转型的目的。2022年7月,江苏省水利厅发布《江苏省水利厅关于典型地区再生水利用配置试点建设实施方案的复函》中同意宿迁市作为国家典型地区再生水利用配置试点城市之一。
+综上所述,沭阳污废水再生回用及资源化项目的建设实施对今后开发区的招商引资和经济发展至关重要,完全符合沭阳县的整体利益,响应了国家法律法规及环保政策,有效解决当地水污染问题,是政府为民办实事的一个重大举措,是十分必要和迫切的。
+项目实施方案
+设计回用水产水规模及水质标准
+设计回用水产水规模
+沐阳县循环经济产业园主要用水企业为7家,其中5家造纸厂,2家酒精厂,根据对各企业用水情况及再生水需求调查,汇总结果附下:
+| 序号 | 企业名称 | 行业 | 用水水源 | 现制水成本(元/吨水) | 用水量 (吨/天) | 用水水质 | 备注 |
+| --- | --- | --- | --- | --- | --- | --- | --- |
+| 1 | 江苏凯盛纸业 | 造纸 | 河水 | 0.5 | ~7000 | 满足河水水质即可 |  |
+| 2 | 江苏誉凯实业 | 造纸 | 河水 | 0.5 | ~7200 (一期约4000) | 满足河水水质即可 | 含二期 |
+| 3 | 沭阳宁沭纸业 | 造纸 | 河水 | 0.5 | ~1500 | 满足河水水质即可 |  |
+| 4 | 沭阳弘盛纸业 | 造纸 | 河水 | 0.5 | ~1000 | 满足河水水质即可 |  |
+| 5 | 沭阳俊达纸业 | 造纸 | 河水 | 0.5 | ~600 | 满足河水水质即可 |  |
+各企业用水情况及再生水需求调查汇总表
+说明:根据对企业调研,酒精厂主要生产食品级酒精,对再生水无需求,因此上表未统计酒精厂用水情况。
+因此,本再生回用工程拟分二期建设,其中一期设计回用供水规模为1.0万吨/天(含5%的管网漏损),土建预留远期扩容至1.5万吨/天回用供水规模。
+设计水质标准
+本再生回用工程水源取自循环经济产业园污水处理厂(沂北扎下污水处理厂)尾水,设计进水水质为《城镇污水处理厂污染物排放标准》(GB18918-2002)一级A 排放标准,设计回用水水质标准拟满足《城市污水再生利用 工业用水水质》、《城市污水再生利用 城市杂用水水质》基础上,同时须达到当地河水水质(再生水水质应与地表水水质相当,即可满足现有企业要求),主要水质指标如下,其中重点考核:全盐量、总硬度等,
+根据本项目的设计进出水水质预测分析,本项目回用水产水水质普遍优于现状地表水水质。
+| 检测项目 | 单位 | 设计进水水质 (污水处理厂尾水) | 设计产水水质 (水质预估) | 企业用水水质需求 (河水水质) |
+| --- | --- | --- | --- | --- |
+| pH | —— | 6.0~9.0 | 6.0~9.0 | 6.0 ~ 9.0 |
+| CODCr | mg/L | 50 | ≤5 | 约20 |
+| 氨氮 | mg/L | 5(8) | ≤1 | / |
+| 总氮 | mg/L | 15 | ≤3 | / |
+| 总磷 | mg/L | 0.5 | 0 | / |
+| SS | mg/L | 10 | 0 | / |
+| 全盐量 | mg/L | 1330 | ≤133.0 | 367 |
+| 钙 | mg/L | 86.0 | ≤ 8.6 | / |
+| 镁 | mg/L | 58.5 | ≤ 5.9 | / |
+| 总硬度 (以碳酸钙计) | mg/L | 458 | ≤46 | 244 |
+| 铁 | mg/L | 0.22 | 0 | 0.10 |
+| 锰 | mg/L | 1.92 | 0 | 0.02 |
+| 铝 | mg/L | 0.035 | 0 | / |
+| 总碱度 (以碳酸钙计) | mg/L | 131 | ≤6.6 | 200 |
+| 硫酸盐 | mg/L | 514 | ≤51.4 | 78.2 |
+| 氯化物 | mg/L | 324 | ≤32.4 | 68.0 |
+| 氟化物 | mg/L | 0.858 | ≤0.1 | / |
+各企业用水水质需求及设计进水出水质预测表
+说明:根据本项目的设计进出水水质,回用水产水水质优于现状地表水水质,为降低投资及运行成本,可采用勾兑方式,但经过计算,由于原水硫酸盐偏高,产水硫酸盐浓度是勾兑的限制因素(需进一步确认企业是否对硫酸盐有特殊需求),勾兑水量仅为500m3/d,此水量对设计供水规模影响较小,为保证用水水质的稳定达标,本项目暂不考虑勾兑方式。
+建设实施方案亮点说明
+采用国内领先的“装配式”水厂建造方式
+桐昆集团的入驻,根据企业的建设进度及污水排放情况,亟需释放环境容量,高品质再生水厂须在本年度七月份落地通水,若暂定二月初项目立项工作启动,整个项目的建设周期仅有5个月时间,时间已非常紧迫,如果采用传统的工程模式建设,工期时间长,报批手续繁琐,根本无法保证项目工期,进而影响企业的引进和发展。
+为了保证在有限的5个月时间内完成项目建设,大幅缩短工程建设周期,加快工程建设进度,确保系统尽早投产运行,推荐采用装配式水厂的建设模式,将全厂的设备、设施和构/建筑物集成为一个产品化的智能机组,采用装配式模块化设计,工厂预制、现场组装的方式,方便远期扩容。
+水厂采用模块化布置,尺寸约为33.6m×14.3m,配套水箱基础为33.6m×6.5m,配套预处理及进水缓冲池等为33.6m×16.0m,同时土建预留远期0.5万吨/天的扩产能力,土建占地面积仅需4亩。
+“装配式”再生水厂设计效果图
+数字化设计与建设(BIM技术)
+“装配式”再生水厂采用BIM设计,即建筑信息模型,以三维数字技术为基础,将水厂本身及水厂建造过程三维模型化和数据信息化。
+通过BIM技术的采用,可实现设计阶段的协同设计、可视化设计、管线综合设计,并进行工程量的统计以及成本控制。同时,在施工阶段可进行施工场地分析、可视化交底、施工模拟及施工安全管理,将设计与施工的衔接可视化、智能化,减少人为的失误,保证施工质量与工期。
+智慧生产、“无人”值守
+采用BIM技术设计制造,基于数字化建设及运营管理平台,同时移交给用户基于实体水厂的数字孪生水厂,高度自动化控制,配套数字虚拟巡检运营功能,实现少人或无人值守,可有效降低生产运营成本。
+项目设计方案
+厂站设计方案
+项目选址及占地预估
+根据本项目建设实施方案,本工程拟采用集成式装置,集成度高、占地面积小、安装速度快。为加快工程进度,减少征地,故拟在现状空地上实施本工程。
+水厂采用模块化布置,尺寸约为33.6m×14.3m,配套水箱基础为33.6m×6.5m,配套预处理及进水缓冲池等为33.6m×16.0m,同时土建预留远期0.5万吨/天的扩产能力,土建占地面积仅需4亩。
+沂北扎下污水处理厂附近空地(现沭阳县增塑剂厂),经现场调研,可完全满足本工程建设需要,仅需要少量平整工作,无拆迁安置及重新征地等问题。
+再生水厂选址现状图如下:
+拟建再生水厂规划位置现场图片
+工艺确定及水量平衡图
+本再生回用工程拟分二期建设,其中一期设计回用供水规模为1.0万吨/天(含5%的管网漏损),土建预留远期扩容至1.5万吨/天回用供水规模。
+工艺流程图及水量平衡图(一期项目)如下:
+工艺流程及水量平衡图
+说明:根据本项目的设计进水水质,除再生水厂主工艺流程外,增加预处理除锰工艺(预处理工艺的选择确认还需对污水处理厂出水水质持续检测)
+工艺主要设计参数
+预处理工艺 — 除锰单元设计
+污水厂尾水锰含量1.92mg/L(仅有一次检测数据),需考虑增加除锰预处理工艺,基于本项目建设周期短,工期紧的需求,设计锰砂过滤器作为预处理设备,安装施工方便,建设周期短。
+主要设计参数如下:
+| 序号 | 名称 | 技术参数 |
+| --- | --- | --- |
+| 1 | 净产水能力 | 14800 m3/d |
+| 2 | 滤速 | ≤8m/h |
+| 3 | 运行周期 | 12~24h |
+| 4 | 反洗方式 | 水洗 |
+| 5 | 反洗时间 | 10~15min |
+| 6 | 反洗强度 | 15~20L/m2.s |
+| 7 | 反洗耗水 | 1~3% |
+主工艺 – 再生水厂单元设计
+再生水厂采用装配式设计建设,含装配式超滤模块,装配式反渗透模块,装配式公辅模块及智慧化运营模块。
+主要设备包含:超滤膜通用平台,反渗透膜通用平台,膜污染控制系统,数字化建设管理平台,数字化运营管理平台、无人值守系统。以上组成部分构成一个完整的净化单元,其中膜污染控制系统可以有效防止膜污染、延长膜的使用寿命;数字化建设、运营管理平台及无人值守系统实现新水岛的高度自动化控制,降低水厂的运维人员工作强度、减少运营期间的人工巡检频率。主工艺系统的运行工况,如过滤、反洗、加药、维护性/恢复性化学清洗等均根据实时的进水水质、水温、水量,全自动控制。
+装配式超滤膜通用平台
+| 序号 | 名称 | 技术参数 |
+| --- | --- | --- |
+| 1 | 产水能力 | 13350 m3/d |
+| 2 | 过滤精度 | ≤0.02m |
+| 3 | 运行压差 | 20~100kpa |
+| 4 | 运行pH范围 | 2~12 |
+| 5 | 运行周期 | 30~90min |
+| 6 | 清洗方式 | 脉冲性水反洗 |
+| 7 | 运行通量范围 | 50~80LMH |
+| 8 | 清洗水强度 | 200~230LMH |
+| 9 | 系统回收率 | ≥90% |
+| 10 | 出水SDI | 稳定≤3 |
+| 11 | 出水浊度 | ≤0.1NTU |
+| 12 | 细菌去除率 | ≥99.99% |
+| 13 | 病毒去除率 | ≥99.99% |
+| 14 | 两虫去除率 | ≥99.99% |
+装配式反渗透膜通用平台
+| 序号 | 项目 | 技术参数 |
+| --- | --- | --- |
+| 1 | 产水能力 | 1.0万吨/天 |
+| 2 | 反渗透膜型式 | 卷式聚酰胺复合膜 |
+| 3 | 运行通量范围 | 18~22LMH |
+| 4 | 系统回收率 | ≥75%(15~25℃) |
+| 5 | 系统脱盐率 | ≥95%(15~25℃) |
+| 6 | 运行水温 | 10~25℃ |
+| 7 | 运行pH值 | 2~11 |
+| 8 | 化学清洗pH值 | 1~13 |
+| 9 | 运行压力 | ≤1.5Mpa(15~25℃) |
+| 10 | 进水余氯限值 | ≤0.1ppm |
+厂外管线设计方案
+本工程现阶段主要服务对象为凯盛纸业和誉凯实业(以上两家企业用水水量已超过1万吨/天),并为宁沭纸业、弘盛纸业及俊达纸业等潜在用水户预留扩建空间。再生水厂出水水质好,故采用专管输送,厂外管道按照再生水项目远期供水规模(1.5万吨/天)一次建成,经计算:
+取水管道管径DN600;
+外供水主管管径为DN400;
+往江苏凯盛纸业有限公司支管管径DN350;往江苏誉凯实业有限公司支管管径DN250,预留往沭阳宁沭纸业有限公司支管管径DN125;预留往沭阳弘盛纸业有限公司在誉凯实业公司的管线上直接接管,管径DN100;预留往沭阳俊达纸业有限公司的接口在主管道沿线,直接接管即可,管径DN80。管道拟采用牵引施工,管道材质采用PE100管,阻力系数小,连接方便,施工快捷。
+根据用水公司地址和沂北扎下污水处理厂现状位置,本工程主管道沿线主要通过循循环经一路,分支管管主要沿循环纬一路、循环纬二路等敷设。路径如下图所示:
+图 厂外管线路径示意图
+项目经济效益分析
+投资预算
+本项目投资预算约为4770万元,其中一类费用4150万元,二类费用620万元,投资预算估算表如下表所示:
+| 序号 | 名称 | 工程简要说明 | 金额 (万元) | 备注 |
+| --- | --- | --- | --- | --- |
+| 一类工程费用 | 一类工程费用 | 一类工程费用 | 一类工程费用 | 一类工程费用 |
+| 1 | 装配式再生水车间 | 装配式再生水车间 | 装配式再生水车间 | 装配式再生水车间 |
+| 1.1 | 预处理系统(锰砂过滤器) | 锰砂过滤器集配套设备,超滤膜通用平台,反渗透膜通用平台,膜污染控制系统,精确加药系统;高低压配电系统、中控系统,数字化建设管理平台,数字化运营管理平台系统; | 3,650 |  |
+| 1.2 | 新水岛 | 锰砂过滤器集配套设备,超滤膜通用平台,反渗透膜通用平台,膜污染控制系统,精确加药系统;高低压配电系统、中控系统,数字化建设管理平台,数字化运营管理平台系统; | 3,650 |  |
+| 1.3 | 土建工程 | 装配式模块化设计 | 3,650 |  |
+| 2 | 公用工程 | 公用工程 | 公用工程 | 公用工程 |
+| 2.1 | 厂区公用工程 |  | 100 |  |
+| 3 | 输送管网 |  | 400 | 预估 |
+| 4 | 合计 |  | 4,150 |  |
+| 二类工程、预备费费用 | 二类工程、预备费费用 | 二类工程、预备费费用 | 二类工程、预备费费用 | 二类工程、预备费费用 |
+| 1 | 二类工程、预备费费用 | 二类工程、预备费费用 | 620 | 按一类费用的15% |
+| 以上部分合计 | 以上部分合计 | 以上部分合计 | 4,770 |  |
+运营成本预估
+装机负荷计算
+系统装机负荷:1430KW,运行负荷1255KW。
+运营成本估算
+计算基础数据
+电价0.8元/kw.h考虑;
+药剂按市场价格计算。
+运营成本估算表
+具体运营成本估算如下表所示:
+运营成本估算表
+| 序号 | 项目 | 用量 | 用量 | 单价 | 单价 | 运行费用 |
+| --- | --- | --- | --- | --- | --- | --- |
+| 序号 | 项目 | 用量 | 用量 | 单价 | 单价 | 万元 |
+| 一 | 可变成本 |  |  |  |  |  |
+| 1 | 电费 |  |  |  |  |  |
+| 1.1 | 电费 | 539.88 | 104kWh/年 | 0.8 | 元/kWh | 431.90 |
+| 2 | 药剂费 |  |  |  |  |  |
+| 2.1 | PAC溶液(10%) | 53.29 | t/年 | 1000 | 元/t | 5.33 |
+| 2.2 | NaOH(30%) | 138.18 | t/年 | 900 | 元/t | 12.44 |
+| 2.3 | NaClO(10%) | 54.52 | t/年 | 1300 | 元/t | 7.09 |
+| 2.4 | 柠檬酸 | 18.39 | t/年 | 9500 | 元/t | 17.47 |
+| 2.5 | SBS | 17.63 | t/年 | 6700 | 元/t | 11.81 |
+| 2.6 | 阻垢剂 | 14.60 | t/年 | 40000 | 元/t | 58.40 |
+| 2.7 | 杀菌剂 | 5.79 | t/年 | 28000 | 元/t | 16.22 |
+| 2.8 | 反渗透清洗酸剂 | 7.20 | t/年 | 9000 | 元/t | 6.48 |
+| 2.9 | 反渗透清洗碱剂 | 7.20 | t/年 | 18000 | 元/t | 12.96 |
+| 二 | 固定成本 |  |  |  |  |  |
+| 1 | 固定费用 | 含人员工资,设备维修,日常管理等费用 | 含人员工资,设备维修,日常管理等费用 | 含人员工资,设备维修,日常管理等费用 | 含人员工资,设备维修,日常管理等费用 | 180 |
+| 三 | 运营费用合计(万元/年) | 运营费用合计(万元/年) | 运营费用合计(万元/年) | 运营费用合计(万元/年) | 运营费用合计(万元/年) | 760.10 |
+| 四 | 平均运营费用(元/吨水)(按照近期外供水量10000m3/d计) | 平均运营费用(元/吨水)(按照近期外供水量10000m3/d计) | 平均运营费用(元/吨水)(按照近期外供水量10000m3/d计) | 平均运营费用(元/吨水)(按照近期外供水量10000m3/d计) | 平均运营费用(元/吨水)(按照近期外供水量10000m3/d计) | 2.08 |
+项目实施计划及建议
+项目实施计划建议
+本项目实施计划可采取两种模式:
+(1)由社会资本方主要负责再生水厂的建设投资及后期运营
+|  | 企业方 | 政府方 | 备注 |
+| --- | --- | --- | --- |
+| 建设阶段 | 主投,负责循环经济产业园再生回用主体工程的投资及建设 | 负责配套管网的投资 |  |
+| 运营阶段 | 实现再生水商业化运作,向企业提供等价值的再生水,从而实现项目的投资利润 | 1. 协调相关企业,鼓励其使用再生水; 2. 通过政策鼓励或招商引资,再生水厂产水水量及售水价均满足(水量≥1万吨/天,售水水价≥4.0元/吨水),则无需政府财政补贴,否则仍需政府相关财政补贴; | 1)工业用水价为3.34元/吨水,高于目前售水水价; 2)政府不收取污水处理厂取水及废水处理成本,本项目废水拟回流至污水处理厂再利用; |
+(2)由政府主要负责再生水厂的建设投资及后期运营
+|  | 企业方 | 政府方 | 备注 |
+| --- | --- | --- | --- |
+| 建设阶段 | / | 负责再生水厂建设投资(含配套管网) |  |
+| 运营阶段 | 由具备相关经验的企业负责后期运营,并对运营成本担保 | / |  |
+项目实施时间安排(初步)
+桐昆集团的入驻,根据企业的建设进度及污水排放情况,亟需释放环境容量,高品质再生水厂须在本年度七月份落地通水,若暂定二月初项目立项工作启动,整个项目的建设周期仅有5个月时间,时间已非常紧迫,具体项目的实施时间初步安排如下:
+| 序号 | 工作节点 | 时间计划 | 备注 |
+| --- | --- | --- | --- |
+| 1 | 立项、可研、设计阶段 | 2024年02月01 ~ 03月15日 |  |
+| 2 | 设备采购及建设阶段 | 2024年03月15日~06月14日 |  |
+| 3 | 调试阶段 | 2024年06月15日~06月28日 |  |
+| 4 | 竣工验收 | 2024年06月29日~06月30日 |  |
+| 5 | 项目完成开始供水 | 2024年7月1日 |  |
+"""
+    model = Model_()
+    try:
+        response = model.chat_stream(promt)
+        #print("\n完整内容如下:")
+        #print(response)
+    except Exception as e:
+        print(f"错误: {e}")

+ 246 - 0
reader.py

@@ -0,0 +1,246 @@
+from Reader.docx_reader import DocxReader
+from Reader.pdf_reader import PdfReader
+import threading
+import time
+import os
+from models import deepseek_online
+
+class Reader:
+    def __init__(self, workers:int=2, task_max_count:int=100):
+        # 任务列表
+        self.task_list = []  # 任务列表
+        self.finish_list = []  # 完成列表
+        # 互斥锁,注意不可重入
+        self.__lock = threading.Lock()
+        # 条件变量
+        self.__condition = threading.Condition(self.__lock)
+        # 任务列表最大元素
+        self.__list_max_limit = task_max_count
+
+        # 开启线程
+        self.__threads = []
+        # 线程数量
+        self.workers = workers
+        # 运行状态
+        self.__is_running = False
+        # 守卫线程
+        self.__guard_thread = None
+        self.__guard_period = 6 # 守护周期,单位:秒
+        # 预处理文档保存路径
+        self.tem_save_path = './tem_files'
+        # 处理后的输出文件路径
+        self.chat_output_path = './chat_output'
+
+
+    def start(self):
+        """
+        启动
+        Returns:
+        """
+        # 先给他把路径创建上,免得用到的地方报错
+        # 创建临时路径,存放预处理文档
+        if not os.path.exists(self.tem_save_path):
+            os.mkdir(self.tem_save_path)
+        # 创建LLM输出路径
+        if not os.path.exists(self.chat_output_path):
+            os.mkdir(self.chat_output_path)
+
+        # 设置状态
+        with self.__condition:
+            # 开启状态
+            self.__is_running = True
+        for i in range(self.workers):
+            print(f'start: 准备开启consumer线程:{i}')
+            self.__threads.append(threading.Thread(target=self.consumer))
+            self.__threads[i].start()
+
+        print("start: 准备开启守卫线程")
+        self.__start_guard()
+
+
+    def stop(self):
+        """
+        关闭
+        Returns:
+
+        """
+        # 设置状态
+        with self.__condition:
+            # 关闭状态
+            self.__is_running = False
+            # 唤醒所有线程
+            self.__condition.notify_all()
+            print('stop: 关闭时唤醒全部线程')
+            # 释放锁
+        for i, t in enumerate(self.__threads):
+            if t.is_alive():
+                t.join()
+        print(f'stop: 等待守护线程结束, 预计时间{self.__guard_period}s')
+        if self.__guard_thread.is_alive():
+            self.__guard_thread.join()
+        print(f'stop: 所有线程均已退出')
+
+    def __guard(self):
+        """
+        守护函数
+        Returns:
+
+        """
+        prefixed = f'guard({threading.get_ident()}):'
+        print(prefixed + "启动")
+        while self.__is_running:
+            running_thread = 0
+            for t in self.__threads:
+                if t.is_alive():
+                    running_thread += 1
+                else:
+                    print(prefixed+f"{t.ident}未存活")
+            if running_thread != self.workers:
+                print(prefixed + "有线程挂了!应该再启动一个")
+                # 启动挂掉的线程
+                pass  # 暂不实现
+            else:
+                task_num = len(self.task_list)
+                finish_num = len(os.listdir(self.chat_output_path))  # 通过读取已处理文件的个数来确定
+                print(prefixed + f"消费者线程运行数量: {running_thread}, 已处理任务数量:{finish_num}, 剩余任务数量: {task_num}")
+            time.sleep(self.__guard_period)
+    def __start_guard(self):
+        self.__guard_thread = threading.Thread(target=self.__guard)
+        self.__guard_thread.start()
+
+    def add_task(self, task: str)->bool:
+        """
+        向任务列表添加任务
+        Args:
+            task: 待处理的文件路径
+        Returns: 成功返回True,失败返回False
+
+        """
+        flag = False
+        with self.__condition:
+            # 判断列表是否超限
+            if len(self.task_list) < self.__list_max_limit:
+                # 未超限添加新任务
+                self.task_list.append(task)
+                flag = True
+                # 唤醒正在等待的线程1个
+                self.__condition.notify()
+                print('add_task: 添加1个任务,唤醒1个线程')
+        return flag
+
+    def get_task_list_len(self)-> int:
+        """
+        获取任务列表长度
+        Returns: 整数
+
+        """
+        with self.__lock:
+            task_list_len = len(self.task_list)
+        return task_list_len
+
+    def get_finish_list_len(self)->int:
+        """
+        获取完成列表长度
+        Returns:
+
+        """
+        with self.__lock:
+            finish_list_len = len(self.finish_list)
+        return finish_list_len
+
+    def consumer(self):
+        """
+        任务处理函数
+        Returns:
+        """
+        thread_id = threading.get_ident()  # 线程id
+        prefixed = f'consumer({thread_id}):'
+        print(prefixed + '开启消费者线程')
+        while self.__is_running:
+            with self.__condition:
+                # 获取锁
+                while len(self.task_list) == 0 and self.__is_running:
+                    print(prefixed + '消费者进入等待')
+                    self.__condition.wait()  # 进入等待
+                    print(prefixed + '消费者被唤醒')
+                if not self.__is_running:
+                    break
+                # 被唤醒,从任务列表取数据
+                data = self.task_list.pop(0)
+                # 释放锁
+            # 处理数据,文件路径
+            print(prefixed + '正在处理:',data)
+            self.process(data)
+            print(prefixed + '处理完成:',data)
+        print(prefixed + '关闭消费者线程')
+
+    def process(self, file_path:str):
+
+        file_base_name = os.path.basename(file_path)  # 文件名称
+        file_dir_name = os.path.dirname(file_path)  # 文件路径名
+        file_name, file_extension = os.path.splitext(file_base_name)  # 文件名 + 后缀名
+        final_path = os.path.join(self.tem_save_path, file_name + '.md')
+        llm_output_path = os.path.join(self.chat_output_path, file_name + '.md')
+        # 步骤0:构建llm推理模型,注意模型也涉及多线程,在线api可以不管
+        model = deepseek_online.Model_()
+        # 步骤1:读取并处理文档
+        if file_extension == '.docx':
+            # 读取器设置, 保证每个线程都有自己的处理器,避免竞态
+            generator = DocxReader()  # docx读取器
+            generator.read(file_path)
+
+        elif file_extension == '.pdf':
+            # 读取器设置, 保证每个线程都有自己的处理器,避免竞态
+            generator = PdfReader() # pdf读取器
+            generator.read(file_path)
+        else:
+            print("未知的文件类型:", file_extension)
+            return
+        # 步骤2:进入文档生成器,调用模型处理
+        for idx, c in enumerate(generator.text_generator()):  # c 就是文本内容,str
+            llm_output = ''
+            try:
+                llm_output = model.chat_stream(c)
+            except Exception as e:
+                print("LLM推理时发生意外错误:",e)
+            # 步骤3:保存LLM输出
+            if idx > 0:
+                llm_output_path = llm_output_path.replace('.md', '-'+str(idx)+'.md')
+            with open(llm_output_path, 'w', encoding='utf-8') as f_llm:
+                f_llm.write(llm_output)
+        # 步骤4:写入文件,结束文档处理,重置docx读取器状态
+        generator.write(final_path, 'w', 'utf-8')
+
+if __name__ == '__main__':
+    # 测试代码
+    # r = Reader()
+    # worker_iter_times = 2
+    # def worker(word:str):
+    #     for i in range(worker_iter_times):
+    #         print('i want to say: ,', word)
+    #         r.add_task(str(i))
+    # workers = 10
+    # threads = [threading.Thread(target=worker, kwargs={"word": str(i)}) for i in range(workers)]
+    # for t in threads:
+    #     t.start()
+    # for t in threads:
+    #     t.join()
+    # print(f'now task list count is {worker_iter_times*workers}: ',r.get_task_list_len())
+    # del threads
+
+    # part 2
+    # r = Reader()
+    # r.start()
+    # r.add_task(r'D:\code\rag_tools\终版-(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).docx')
+    # print(r.task_list)
+    # r.add_task(r'D:\code\rag_tools\(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).pdf')
+    # print(r.task_list)
+    # for i in range(10):
+    #     r.add_task('a')
+    # r.stop()
+
+    # part3测试process函数
+    r = Reader()
+    # r.process(r'D:\code\rag_tools\uploads\终版-(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).docx')
+    # r.process(r'D:\code\rag_tools\uploads\(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).pdf')
+    r.process(r"D:\code\tem_1103\RAG资料库—开发\污水处理项目方案\TP-1-3 BioSecure生物安全性保障工艺技术包技术方案模板-V1.docx")

+ 0 - 0
txt_reader.py