from Reader.docx_reader import DocxReader
from Reader.pdf_reader import PdfReader
import threading
import time
import os
from models import deepseek_online


class Reader:
    """Multi-threaded document-processing pipeline.

    File paths are queued via :meth:`add_task`, consumed by worker threads
    (:meth:`consumer`) that read .docx/.pdf files, feed each text chunk to
    an LLM, and write the outputs to ``chat_output_path``. A guard thread
    periodically reports worker liveness and progress.
    """

    def __init__(self, workers: int = 2, task_max_count: int = 100):
        # Pending tasks (file paths awaiting processing).
        self.task_list = []
        # Finished tasks (currently only read for its length).
        self.finish_list = []
        # Mutex guarding the lists; note: NOT reentrant.
        self.__lock = threading.Lock()
        # Condition variable built on the same lock for
        # producer/consumer signalling.
        self.__condition = threading.Condition(self.__lock)
        # Maximum number of queued tasks.
        self.__list_max_limit = task_max_count
        # Consumer thread handles.
        self.__threads = []
        # Number of consumer threads.
        self.workers = workers
        # Running flag observed by consumer and guard loops.
        self.__is_running = False
        # Guard (watchdog) thread handle; created in start().
        self.__guard_thread = None
        self.__guard_period = 6  # guard polling period, seconds
        # Directory for pre-processed intermediate documents.
        self.tem_save_path = './tem_files'
        # Directory for LLM output files.
        self.chat_output_path = './chat_output'

    def start(self):
        """Create output directories, spawn the consumer threads and the
        guard thread, and mark the pipeline as running.

        Returns:
            None
        """
        # Create the directories up front so later writes cannot fail on a
        # missing path. makedirs(..., exist_ok=True) avoids the
        # check-then-create race of os.path.exists + os.mkdir.
        os.makedirs(self.tem_save_path, exist_ok=True)
        os.makedirs(self.chat_output_path, exist_ok=True)
        # Flip the running flag under the lock so consumers observe it.
        with self.__condition:
            self.__is_running = True
        for i in range(self.workers):
            print(f'start: 准备开启consumer线程:{i}')
            worker = threading.Thread(target=self.consumer)
            self.__threads.append(worker)
            worker.start()
        print("start: 准备开启守卫线程")
        self.__start_guard()

    def stop(self):
        """Stop the pipeline: clear the running flag, wake all waiting
        consumers, then join every worker and the guard thread.

        Returns:
            None
        """
        with self.__condition:
            self.__is_running = False
            # Wake every consumer blocked in wait() so it can observe the
            # cleared flag and exit.
            self.__condition.notify_all()
            print('stop: 关闭时唤醒全部线程')
        for t in self.__threads:
            if t.is_alive():
                t.join()
        print(f'stop: 等待守护线程结束, 预计时间{self.__guard_period}s')
        # Guard thread may never have been created if stop() is called
        # before start() — avoid AttributeError on None.
        if self.__guard_thread is not None and self.__guard_thread.is_alive():
            self.__guard_thread.join()
        print(f'stop: 所有线程均已退出')

    def __guard(self):
        """Watchdog loop: periodically report consumer liveness and task
        progress until the pipeline stops.

        Returns:
            None
        """
        prefixed = f'guard({threading.get_ident()}):'
        print(prefixed + "启动")
        while self.__is_running:
            running_thread = 0
            for t in self.__threads:
                if t.is_alive():
                    running_thread += 1
                else:
                    print(prefixed + f"{t.ident}未存活")
            if running_thread != self.workers:
                print(prefixed + "有线程挂了!应该再启动一个")
                # Restarting dead workers is not implemented yet.
                pass
            else:
                task_num = len(self.task_list)
                # Progress is inferred from the number of files already
                # written to the output directory.
                finish_num = len(os.listdir(self.chat_output_path))
                print(prefixed + f"消费者线程运行数量: {running_thread}, 已处理任务数量:{finish_num}, 剩余任务数量: {task_num}")
            time.sleep(self.__guard_period)

    def __start_guard(self):
        """Spawn the guard (watchdog) thread."""
        self.__guard_thread = threading.Thread(target=self.__guard)
        self.__guard_thread.start()

    def add_task(self, task: str) -> bool:
        """Append a task to the task list.

        Args:
            task: Path of the file to be processed.

        Returns:
            True on success, False if the queue is full.
        """
        flag = False
        with self.__condition:
            # Reject the task if the queue has reached its limit.
            if len(self.task_list) < self.__list_max_limit:
                self.task_list.append(task)
                flag = True
                # Wake exactly one waiting consumer for the new task.
                self.__condition.notify()
                print('add_task: 添加1个任务,唤醒1个线程')
        return flag

    def get_task_list_len(self) -> int:
        """Return the current length of the task list (thread-safe)."""
        with self.__lock:
            task_list_len = len(self.task_list)
        return task_list_len

    def get_finish_list_len(self) -> int:
        """Return the current length of the finish list (thread-safe)."""
        with self.__lock:
            finish_list_len = len(self.finish_list)
        return finish_list_len

    def consumer(self):
        """Worker loop: wait for tasks, pop one under the lock, then
        process it outside the lock. Exits when the pipeline stops.

        Returns:
            None
        """
        thread_id = threading.get_ident()
        prefixed = f'consumer({thread_id}):'
        print(prefixed + '开启消费者线程')
        while self.__is_running:
            with self.__condition:  # acquire the lock
                # Re-check the predicate after every wakeup (spurious
                # wakeups / competing consumers).
                while len(self.task_list) == 0 and self.__is_running:
                    print(prefixed + '消费者进入等待')
                    self.__condition.wait()
                    print(prefixed + '消费者被唤醒')
                if not self.__is_running:
                    break
                # Woken with work available: take the oldest task.
                data = self.task_list.pop(0)
            # Lock released here; do the slow work without holding it.
            print(prefixed + '正在处理:', data)
            self.process(data)
            print(prefixed + '处理完成:', data)
        print(prefixed + '关闭消费者线程')

    def process(self, file_path: str):
        """Process one document: read it, run each text chunk through the
        LLM, save the outputs, then persist the pre-processed document.

        Args:
            file_path: Path of the .docx or .pdf file to process.

        Returns:
            None (unknown file types are reported and skipped).
        """
        file_base_name = os.path.basename(file_path)  # file name with extension
        file_name, file_extension = os.path.splitext(file_base_name)
        final_path = os.path.join(self.tem_save_path, file_name + '.md')

        # Step 0: build the LLM inference model. The model itself is used
        # across threads; the online API is assumed safe per instance.
        model = deepseek_online.Model_()

        # Step 1: choose a reader per file type. Each call gets its own
        # reader instance so threads do not share parser state.
        if file_extension == '.docx':
            generator = DocxReader()
            generator.read(file_path)
        elif file_extension == '.pdf':
            generator = PdfReader()
            generator.read(file_path)
        else:
            print("未知的文件类型:", file_extension)
            return

        # Step 2: iterate the document generator and run the model on
        # each text chunk.
        for idx, c in enumerate(generator.text_generator()):
            llm_output = ''
            try:
                llm_output = model.chat_stream(c)
            except Exception as e:
                # Best-effort: an inference failure writes an empty file
                # rather than aborting the whole document.
                print("LLM推理时发生意外错误:", e)
            # Step 3: save the LLM output. Build each chunk's path from the
            # base name; the previous str.replace-on-previous-path approach
            # accumulated suffixes ('name-1-2.md', ...) on later chunks.
            if idx > 0:
                llm_output_path = os.path.join(
                    self.chat_output_path, f'{file_name}-{idx}.md')
            else:
                llm_output_path = os.path.join(
                    self.chat_output_path, file_name + '.md')
            with open(llm_output_path, 'w', encoding='utf-8') as f_llm:
                f_llm.write(llm_output)

        # Step 4: write the pre-processed document and reset reader state.
        generator.write(final_path, 'w', 'utf-8')


if __name__ == '__main__':
    # Test code (kept from the original file)
    # r = Reader()
    # worker_iter_times = 2
    # def worker(word: str):
    #     for i in range(worker_iter_times):
    #         print('i want to say: ,', word)
    #         r.add_task(str(i))
    # workers = 10
    # threads = [threading.Thread(target=worker, kwargs={"word": str(i)}) for i in range(workers)]
    # for t in threads:
    #     t.start()
    # for t in threads:
    #     t.join()
    # print(f'now task list count is {worker_iter_times*workers}: ', r.get_task_list_len())
    # del threads

    # part 2
    # r = Reader()
    # r.start()
    # r.add_task(r'D:\code\rag_tools\终版-(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).docx')
    # print(r.task_list)
    # r.add_task(r'D:\code\rag_tools\(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).pdf')
    # print(r.task_list)
    # for i in range(10):
    #     r.add_task('a')
    # r.stop()

    # part 3: test the process function
    r = Reader()
    # r.process(r'D:\code\rag_tools\uploads\终版-(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).docx')
    # r.process(r'D:\code\rag_tools\uploads\(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).pdf')
    r.process(r"D:\code\tem_1103\RAG资料库—开发\污水处理项目方案\TP-1-3 BioSecure生物安全性保障工艺技术包技术方案模板-V1.docx")