| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- from Reader.docx_reader import DocxReader
- from Reader.pdf_reader import PdfReader
- import threading
- import time
- import os
- from models import deepseek_online
class Reader:
    """Multi-threaded document processing pipeline.

    Consumer (worker) threads take file paths from ``task_list``, extract the
    document text with a format-specific reader (.docx / .pdf), send each text
    chunk to the LLM and write every reply to ``chat_output_path``.  A guard
    thread periodically prints worker liveness and progress while running.
    """

    def __init__(self, workers: int = 2, task_max_count: int = 100):
        """
        Args:
            workers: number of consumer threads started by ``start()``.
            task_max_count: maximum number of pending tasks; ``add_task()``
                rejects new tasks once this many are queued.
        """
        # Pending tasks (file paths), consumed FIFO by the worker threads.
        self.task_list = []
        # File paths whose processing completed without raising.
        self.finish_list = []
        # Mutex shared with the condition variable; NOT re-entrant.
        self.__lock = threading.Lock()
        # Wakes consumers when a task is added or when stop() is called.
        self.__condition = threading.Condition(self.__lock)
        # Upper bound on len(task_list).
        self.__list_max_limit = task_max_count
        # Consumer threads created by start().
        self.__threads = []
        # Number of consumer threads to start.
        self.workers = workers
        # True between start() and stop(); every worker loop checks it.
        self.__is_running = False
        # Guard (monitoring) thread and its reporting period, in seconds.
        self.__guard_thread = None
        self.__guard_period = 6
        # Set by stop() so the guard exits without waiting out a full period.
        # A separate Event (not the condition) is used so that add_task()'s
        # notify() can never be consumed by the guard instead of a worker.
        self.__guard_stop = threading.Event()
        # Directory for pre-processed (extracted) documents.
        self.tem_save_path = './tem_files'
        # Directory for the LLM output files.
        self.chat_output_path = './chat_output'

    def start(self):
        """Create the working directories, then start the workers and the guard."""
        # Create the directories up front so nothing downstream fails on a
        # missing path. exist_ok avoids the exists()/mkdir() race, and
        # makedirs also creates missing parent directories.
        os.makedirs(self.tem_save_path, exist_ok=True)
        os.makedirs(self.chat_output_path, exist_ok=True)
        self.__guard_stop.clear()
        with self.__condition:
            self.__is_running = True
        for i in range(self.workers):
            print(f'start: 准备开启consumer线程:{i}')
            # Start the thread just created; indexing self.__threads[i] would
            # pick an old thread if start() is called again after stop().
            worker = threading.Thread(target=self.consumer)
            self.__threads.append(worker)
            worker.start()
        print("start: 准备开启守卫线程")
        self.__start_guard()

    def stop(self):
        """Stop the pipeline and wait for every thread to exit.

        Tasks still waiting in ``task_list`` are left unprocessed; a task that a
        worker is already processing is allowed to finish. Safe to call even if
        ``start()`` was never called.
        """
        with self.__condition:
            self.__is_running = False
            # Wake every waiting consumer so it observes the new state.
            self.__condition.notify_all()
            print('stop: 关闭时唤醒全部线程')
        # Let the guard leave its wait immediately instead of after a period.
        self.__guard_stop.set()
        for t in self.__threads:
            if t.is_alive():
                t.join()
        print(f'stop: 等待守护线程结束, 预计时间{self.__guard_period}s')
        if self.__guard_thread is not None and self.__guard_thread.is_alive():
            self.__guard_thread.join()
        # Forget the finished workers so a later start() begins from a clean
        # slate (the guard compares the live count against self.workers).
        self.__threads.clear()
        print(f'stop: 所有线程均已退出')

    def __guard(self):
        """Report worker liveness and progress every guard period until stopped."""
        prefixed = f'guard({threading.get_ident()}):'
        print(prefixed + "启动")
        while self.__is_running:
            running_thread = 0
            for t in self.__threads:
                if t.is_alive():
                    running_thread += 1
                else:
                    print(prefixed + f"{t.ident}未存活")
            if running_thread != self.workers:
                print(prefixed + "有线程挂了!应该再启动一个")
                # TODO: restart dead workers (not implemented yet).
            else:
                task_num = self.get_task_list_len()
                # Count completed tasks, not output files: one document can
                # produce several chunk files in chat_output_path.
                finish_num = self.get_finish_list_len()
                print(prefixed + f"消费者线程运行数量: {running_thread}, 已处理任务数量:{finish_num}, 剩余任务数量: {task_num}")
            # Wait one period, returning early once stop() sets the event.
            self.__guard_stop.wait(self.__guard_period)

    def __start_guard(self):
        """Create and start the guard thread."""
        self.__guard_thread = threading.Thread(target=self.__guard)
        self.__guard_thread.start()

    def add_task(self, task: str) -> bool:
        """Queue a file path for processing and wake one waiting worker.

        Args:
            task: path of the file to process.
        Returns:
            True if the task was queued, False if the queue already holds
            ``task_max_count`` tasks.
        """
        with self.__condition:
            if len(self.task_list) >= self.__list_max_limit:
                return False
            self.task_list.append(task)
            self.__condition.notify()
            print('add_task: 添加1个任务,唤醒1个线程')
            return True

    def get_task_list_len(self) -> int:
        """Return the number of pending tasks."""
        with self.__lock:
            return len(self.task_list)

    def get_finish_list_len(self) -> int:
        """Return the number of tasks that completed successfully."""
        with self.__lock:
            return len(self.finish_list)

    def consumer(self):
        """Worker loop: take tasks from ``task_list`` and process them until stop()."""
        thread_id = threading.get_ident()
        prefixed = f'consumer({thread_id}):'
        print(prefixed + '开启消费者线程')
        while self.__is_running:
            with self.__condition:
                while len(self.task_list) == 0 and self.__is_running:
                    print(prefixed + '消费者进入等待')
                    self.__condition.wait()
                    print(prefixed + '消费者被唤醒')
                if not self.__is_running:
                    break
                data = self.task_list.pop(0)
            # Process outside the lock so add_task() and other workers are
            # not blocked while a document is handled.
            print(prefixed + '正在处理:', data)
            try:
                self.process(data)
            except Exception as e:
                # Top-level boundary of the worker thread: one bad document
                # must not kill the worker (the guard cannot restart it).
                import traceback
                print(prefixed + '处理失败:', data, repr(e))
                traceback.print_exc()
                continue
            with self.__condition:
                self.finish_list.append(data)
            print(prefixed + '处理完成:', data)
        print(prefixed + '关闭消费者线程')

    def process(self, file_path: str):
        """Run one document through the reader -> LLM -> output pipeline.

        Writes the LLM reply for chunk 0 to ``<chat_output_path>/<name>.md``
        and for chunk ``idx`` > 0 to ``<chat_output_path>/<name>-<idx>.md``.
        The reader then writes its own output to ``<tem_save_path>/<name>.md``.
        Files with an unsupported extension are reported and skipped.
        Both directories must already exist (``start()`` creates them).

        Args:
            file_path: path of the .docx / .pdf file (extension is matched
                case-insensitively).
        """
        file_name, file_extension = os.path.splitext(os.path.basename(file_path))
        final_path = os.path.join(self.tem_save_path, file_name + '.md')
        # Step 1: pick the reader first, so an unsupported file does not
        # cost an LLM client.
        generator = self._make_reader(file_extension)
        if generator is None:
            print("未知的文件类型:", file_extension)
            return
        # Step 0: build the LLM client for this document.
        model = self._make_model()
        generator.read(file_path)
        # Step 2: send every text chunk to the model.
        for idx, chunk in enumerate(generator.text_generator()):
            llm_output = ''
            try:
                llm_output = model.chat_stream(chunk)
            except Exception as e:
                print("LLM推理时发生意外错误:", e)
            # Step 3: save this chunk's reply (an empty file if inference failed).
            with open(self._chunk_output_path(file_name, idx), 'w', encoding='utf-8') as f_llm:
                f_llm.write(llm_output)
        # Step 4: let the reader write its processed document.
        generator.write(final_path, 'w', 'utf-8')

    def _make_reader(self, file_extension: str):
        """Return a new reader for ``file_extension`` (case-insensitive), or None if unsupported."""
        # A fresh instance per call gives each worker thread its own reader,
        # so no reader state is shared between threads.
        extension = file_extension.lower()
        if extension == '.docx':
            return DocxReader()
        if extension == '.pdf':
            return PdfReader()
        return None

    def _make_model(self):
        """Build the LLM client used for one document."""
        # Original note: model thread-safety matters in general, but can be
        # ignored for the online API client.
        return deepseek_online.Model_()

    def _chunk_output_path(self, file_name: str, idx: int) -> str:
        """Return the LLM output path for chunk ``idx``: ``<name>.md`` for 0, else ``<name>-<idx>.md``."""
        # Built from the base name every time; the old str.replace on the
        # previous path accumulated suffixes (x-1-2.md) and rewrote any
        # '.md' elsewhere in the path.
        suffix = '' if idx == 0 else f'-{idx}'
        return os.path.join(self.chat_output_path, f'{file_name}{suffix}.md')
if __name__ == '__main__':
    # Manual smoke test: run documents through Reader.process() in the
    # current thread (no worker threads are started):
    #     python <this file> path/to/a.docx path/to/b.pdf
    # With no arguments the developer's sample document below is used.
    # For the threaded pipeline use start() / add_task() / stop() instead.
    import sys

    default_paths = [
        r"D:\code\tem_1103\RAG资料库—开发\污水处理项目方案\TP-1-3 BioSecure生物安全性保障工艺技术包技术方案模板-V1.docx",
    ]
    r = Reader()
    # process() writes into these directories but only start() creates
    # them, so create them here before calling process() directly.
    os.makedirs(r.tem_save_path, exist_ok=True)
    os.makedirs(r.chat_output_path, exist_ok=True)
    for path in sys.argv[1:] or default_paths:
        r.process(path)
|