# reader.py
  1. from Reader.docx_reader import DocxReader
  2. from Reader.pdf_reader import PdfReader
  3. import threading
  4. import time
  5. import os
  6. from models import deepseek_online
  7. class Reader:
  8. def __init__(self, workers:int=2, task_max_count:int=100):
  9. # 任务列表
  10. self.task_list = [] # 任务列表
  11. self.finish_list = [] # 完成列表
  12. # 互斥锁,注意不可重入
  13. self.__lock = threading.Lock()
  14. # 条件变量
  15. self.__condition = threading.Condition(self.__lock)
  16. # 任务列表最大元素
  17. self.__list_max_limit = task_max_count
  18. # 开启线程
  19. self.__threads = []
  20. # 线程数量
  21. self.workers = workers
  22. # 运行状态
  23. self.__is_running = False
  24. # 守卫线程
  25. self.__guard_thread = None
  26. self.__guard_period = 6 # 守护周期,单位:秒
  27. # 预处理文档保存路径
  28. self.tem_save_path = './tem_files'
  29. # 处理后的输出文件路径
  30. self.chat_output_path = './chat_output'
  31. def start(self):
  32. """
  33. 启动
  34. Returns:
  35. """
  36. # 先给他把路径创建上,免得用到的地方报错
  37. # 创建临时路径,存放预处理文档
  38. if not os.path.exists(self.tem_save_path):
  39. os.mkdir(self.tem_save_path)
  40. # 创建LLM输出路径
  41. if not os.path.exists(self.chat_output_path):
  42. os.mkdir(self.chat_output_path)
  43. # 设置状态
  44. with self.__condition:
  45. # 开启状态
  46. self.__is_running = True
  47. for i in range(self.workers):
  48. print(f'start: 准备开启consumer线程:{i}')
  49. self.__threads.append(threading.Thread(target=self.consumer))
  50. self.__threads[i].start()
  51. print("start: 准备开启守卫线程")
  52. self.__start_guard()
  53. def stop(self):
  54. """
  55. 关闭
  56. Returns:
  57. """
  58. # 设置状态
  59. with self.__condition:
  60. # 关闭状态
  61. self.__is_running = False
  62. # 唤醒所有线程
  63. self.__condition.notify_all()
  64. print('stop: 关闭时唤醒全部线程')
  65. # 释放锁
  66. for i, t in enumerate(self.__threads):
  67. if t.is_alive():
  68. t.join()
  69. print(f'stop: 等待守护线程结束, 预计时间{self.__guard_period}s')
  70. if self.__guard_thread.is_alive():
  71. self.__guard_thread.join()
  72. print(f'stop: 所有线程均已退出')
  73. def __guard(self):
  74. """
  75. 守护函数
  76. Returns:
  77. """
  78. prefixed = f'guard({threading.get_ident()}):'
  79. print(prefixed + "启动")
  80. while self.__is_running:
  81. running_thread = 0
  82. for t in self.__threads:
  83. if t.is_alive():
  84. running_thread += 1
  85. else:
  86. print(prefixed+f"{t.ident}未存活")
  87. if running_thread != self.workers:
  88. print(prefixed + "有线程挂了!应该再启动一个")
  89. # 启动挂掉的线程
  90. pass # 暂不实现
  91. else:
  92. task_num = len(self.task_list)
  93. finish_num = len(os.listdir(self.chat_output_path)) # 通过读取已处理文件的个数来确定
  94. print(prefixed + f"消费者线程运行数量: {running_thread}, 已处理任务数量:{finish_num}, 剩余任务数量: {task_num}")
  95. time.sleep(self.__guard_period)
  96. def __start_guard(self):
  97. self.__guard_thread = threading.Thread(target=self.__guard)
  98. self.__guard_thread.start()
  99. def add_task(self, task: str)->bool:
  100. """
  101. 向任务列表添加任务
  102. Args:
  103. task: 待处理的文件路径
  104. Returns: 成功返回True,失败返回False
  105. """
  106. flag = False
  107. with self.__condition:
  108. # 判断列表是否超限
  109. if len(self.task_list) < self.__list_max_limit:
  110. # 未超限添加新任务
  111. self.task_list.append(task)
  112. flag = True
  113. # 唤醒正在等待的线程1个
  114. self.__condition.notify()
  115. print('add_task: 添加1个任务,唤醒1个线程')
  116. return flag
  117. def get_task_list_len(self)-> int:
  118. """
  119. 获取任务列表长度
  120. Returns: 整数
  121. """
  122. with self.__lock:
  123. task_list_len = len(self.task_list)
  124. return task_list_len
  125. def get_finish_list_len(self)->int:
  126. """
  127. 获取完成列表长度
  128. Returns:
  129. """
  130. with self.__lock:
  131. finish_list_len = len(self.finish_list)
  132. return finish_list_len
  133. def consumer(self):
  134. """
  135. 任务处理函数
  136. Returns:
  137. """
  138. thread_id = threading.get_ident() # 线程id
  139. prefixed = f'consumer({thread_id}):'
  140. print(prefixed + '开启消费者线程')
  141. while self.__is_running:
  142. with self.__condition:
  143. # 获取锁
  144. while len(self.task_list) == 0 and self.__is_running:
  145. print(prefixed + '消费者进入等待')
  146. self.__condition.wait() # 进入等待
  147. print(prefixed + '消费者被唤醒')
  148. if not self.__is_running:
  149. break
  150. # 被唤醒,从任务列表取数据
  151. data = self.task_list.pop(0)
  152. # 释放锁
  153. # 处理数据,文件路径
  154. print(prefixed + '正在处理:',data)
  155. self.process(data)
  156. print(prefixed + '处理完成:',data)
  157. print(prefixed + '关闭消费者线程')
  158. def process(self, file_path:str):
  159. file_base_name = os.path.basename(file_path) # 文件名称
  160. file_dir_name = os.path.dirname(file_path) # 文件路径名
  161. file_name, file_extension = os.path.splitext(file_base_name) # 文件名 + 后缀名
  162. final_path = os.path.join(self.tem_save_path, file_name + '.md')
  163. llm_output_path = os.path.join(self.chat_output_path, file_name + '.md')
  164. # 步骤0:构建llm推理模型,注意模型也涉及多线程,在线api可以不管
  165. model = deepseek_online.Model_()
  166. # 步骤1:读取并处理文档
  167. if file_extension == '.docx':
  168. # 读取器设置, 保证每个线程都有自己的处理器,避免竞态
  169. generator = DocxReader() # docx读取器
  170. generator.read(file_path)
  171. elif file_extension == '.pdf':
  172. # 读取器设置, 保证每个线程都有自己的处理器,避免竞态
  173. generator = PdfReader() # pdf读取器
  174. generator.read(file_path)
  175. else:
  176. print("未知的文件类型:", file_extension)
  177. return
  178. # 步骤2:进入文档生成器,调用模型处理
  179. for idx, c in enumerate(generator.text_generator()): # c 就是文本内容,str
  180. llm_output = ''
  181. try:
  182. llm_output = model.chat_stream(c)
  183. except Exception as e:
  184. print("LLM推理时发生意外错误:",e)
  185. # 步骤3:保存LLM输出
  186. if idx > 0:
  187. llm_output_path = llm_output_path.replace('.md', '-'+str(idx)+'.md')
  188. with open(llm_output_path, 'w', encoding='utf-8') as f_llm:
  189. f_llm.write(llm_output)
  190. # 步骤4:写入文件,结束文档处理,重置docx读取器状态
  191. generator.write(final_path, 'w', 'utf-8')
  192. if __name__ == '__main__':
  193. # 测试代码
  194. # r = Reader()
  195. # worker_iter_times = 2
  196. # def worker(word:str):
  197. # for i in range(worker_iter_times):
  198. # print('i want to say: ,', word)
  199. # r.add_task(str(i))
  200. # workers = 10
  201. # threads = [threading.Thread(target=worker, kwargs={"word": str(i)}) for i in range(workers)]
  202. # for t in threads:
  203. # t.start()
  204. # for t in threads:
  205. # t.join()
  206. # print(f'now task list count is {worker_iter_times*workers}: ',r.get_task_list_len())
  207. # del threads
  208. # part 2
  209. # r = Reader()
  210. # r.start()
  211. # r.add_task(r'D:\code\rag_tools\终版-(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).docx')
  212. # print(r.task_list)
  213. # r.add_task(r'D:\code\rag_tools\(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).pdf')
  214. # print(r.task_list)
  215. # for i in range(10):
  216. # r.add_task('a')
  217. # r.stop()
  218. # part3测试process函数
  219. r = Reader()
  220. # r.process(r'D:\code\rag_tools\uploads\终版-(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).docx')
  221. # r.process(r'D:\code\rag_tools\uploads\(0119)沭阳县循环经济产业园污废水资源化项目方案简述(3).pdf')
  222. r.process(r"D:\code\tem_1103\RAG资料库—开发\污水处理项目方案\TP-1-3 BioSecure生物安全性保障工艺技术包技术方案模板-V1.docx")