jiyuhang 2 месяцев назад
Родитель
Сommit
6ae69f9416

+ 36 - 7
app/algorithm/monitor/monitor_mnger.py

@@ -2,6 +2,9 @@ import threading
 import time
 import psutil
 from .monitor_util import Monitor
+from .webskt_mnger import WebMnger
+import signal
+import sys
 
 class MonitorManager:
     """监控管理器,负责管理多个进程的监控任务"""
@@ -11,7 +14,31 @@ class MonitorManager:
         self.monitor_lock = threading.Lock()
         self.interval = interval # 监测间隔默认2秒
         self.timeout = min(interval, 5) # 监测间隔默认2秒
-
+        # websocket管理器
+        self.websocket_manager = WebMnger()  #  保证MonitorManager是单例,那么WebMnger也是单例
+        self.websocket_thread = self.websocket_manager.start_server()
+        if self.websocket_thread.is_alive():
+            print("WebSocket服务器已启动: ")
+        signal.signal(signal.SIGINT, self._signal_handler)
+
+    def __del__(self):
+        """Finalizer: run clean_up() and report, rather than raise, any failure."""
+        try:
+            self.clean_up()
+        except Exception as exc:
+            print(f"销毁资源出现异常: {exc}")
+
+    def _signal_handler(self, signum, frame):
+        """Handle SIGINT (Ctrl+C): release resources, then exit the process.
+
+        Args:
+            signum: Number of the received signal (unused).
+            frame: Stack frame that was interrupted (unused).
+
+        Raises:
+            SystemExit: Always. Exit status is 0 after a clean shutdown and
+                1 when clean_up() failed.
+        """
+        print("收到退出信号,正在清理资源...")
+        exit_code = 0
+        try:
+            self.clean_up()
+        except Exception as e:
+            # A failing cleanup must not prevent the process from exiting.
+            print(f"销毁资源出现异常: {e}")
+            exit_code = 1
+        finally:
+            sys.exit(exit_code)
+
+    def clean_up(self):
+        """Release resources: stop the WebSocket server, then all monitors.
+
+        clear_all() runs even when stopping the WebSocket server raises, so a
+        failure there cannot leave the monitoring threads behind. The original
+        exception, if any, still propagates to the caller.
+        """
+        try:
+            # Shut down the websocket manager.
+            self.websocket_manager.stop_server()
+        finally:
+            # Stop this manager's own monitoring threads.
+            self.clear_all()
+
     def start_monitoring(self, pid:int, interval:int=2):
         """开始监控指定PID的进程, 成功返回true,失败返回false"""
         try:
@@ -56,8 +83,10 @@ class MonitorManager:
             return False
     def clear_all(self):
         """清除所有进程的监控数据"""
-        for k, v in self.monitoring_data.items():
-            self.clear_pid(int(k))
+        # Snapshot the keys while holding monitor_lock, then release it before
+        # calling clear_pid(): clear_pid() acquires the same lock itself.
+        with self.monitor_lock:
+            pids = list(self.monitoring_data.keys())
+        # Iterating the snapshot keeps the loop independent of later changes
+        # to monitoring_data.
+        for pid in pids:
+            self.clear_pid(int(pid))
     def clear_pid(self, pid:int):
         """清除指定进程的监控数据"""
         with self.monitor_lock:
@@ -85,7 +114,6 @@ class MonitorManager:
             return  False
 
 
-
     def __monitor_worker(self, pid:int, interval:int, event):
         """监控工作线程"""
         # 在子线程中创建监视器实例,往后只有子线程自己使用这个监视器,没有静态条件
@@ -105,8 +133,9 @@ class MonitorManager:
                         # 更新监控数据
                         print(f'>>>worker:正在追踪进程{pid}.')
                         monitor.update()
-                        # 存储数据点
-                        pass
+                        # 使用websockets广播监控数据点
+                        self.websocket_manager.broadcast_sync(monitor.data)
+
                     else:
                         # 如果停止监视,那么该子线程会被销毁
                         break
@@ -125,7 +154,7 @@ class MonitorManager:
         """搜索pid是否正在被监测,搜索到就代表正在被监测,返回True,反之亦然"""
         with self.monitor_lock:
             is_exiting =  str(pid) in self.monitoring_data
-            is_running = self.monitoring_data.get(str(pid), False) if is_exiting else False
+            is_running = self.monitoring_data[str(pid)]["is_running"] if is_exiting else False
         return is_exiting and is_running  # 存在且正在运行才认为搜索成功
 
     def get_monitoring_data(self):

+ 33 - 31
app/algorithm/monitor/monitor_util.py

@@ -17,7 +17,7 @@ class Monitor:
         self.curr_system_net_io_counter = None  # 系统网络读写计数器
         self.data_points=[] # 数据点
         self.max_count = 1
-        self.usage = self.__init_usage
+        self.__usage = self.__init_usage
         self.usage_map = {
             "pid": '进程名称',
             "now_time": '当前时间',
@@ -48,11 +48,11 @@ class Monitor:
             self.process = None
         if self.process:
             print(f"成功附加到进程: {self.process.name()} (PID: {pid})")
-            self.usage["name"] = self.process.name()
-            self.usage["pid"] = f'{self.process.pid}'
-            self.usage["pwd"] = self.process.cwd()
-            self.usage["exe"] = self.process.exe()
-            self.usage["start_time"] = datetime.fromtimestamp(self.process.create_time()).strftime('%Y-%m-%d %H:%M:%S')
+            self.__usage["name"] = self.process.name()
+            self.__usage["pid"] = f'{self.process.pid}'
+            self.__usage["pwd"] = self.process.cwd()
+            self.__usage["exe"] = self.process.exe()
+            self.__usage["start_time"] = datetime.fromtimestamp(self.process.create_time()).strftime('%Y-%m-%d %H:%M:%S')
 
 
     @property
@@ -83,28 +83,28 @@ class Monitor:
         rss_mb = mem_info.rss / (1024 * 1024)  # 转换为MB  物理内存
         vms_mb = mem_info.vms / (1024 * 1024)  # 虚拟内存
         memory_usage_percent = self.process.memory_percent()
-        self.usage["rss"] = f'{rss_mb:.2f}MB'  # 物理内存
-        self.usage["vms"] = f'{vms_mb:.2f}MB'  # 虚拟内存
-        self.usage["mem_pct"] = f'{memory_usage_percent:.2f}%'  # 占用百分比
+        self.__usage["rss"] = f'{rss_mb:.2f}MB'  # 物理内存
+        self.__usage["vms"] = f'{vms_mb:.2f}MB'  # 虚拟内存
+        self.__usage["mem_pct"] = f'{memory_usage_percent:.2f}%'  # 占用百分比
 
     def __get_cpu_usage_info(self):
         # 获取CPU使用率,设置采样间隔为1秒
         cpu_usage = self.process.cpu_percent(interval=1.0)
-        self.usage["cpu_pct"] = f'{cpu_usage:.2f}%'
+        # Stored under "cpu_pct" as a preformatted string with two decimals
+        # and a trailing '%', not as a number.
+        self.__usage["cpu_pct"] = f'{cpu_usage:.2f}%'
 
     def __get_disk_io_info(self):
         """统计磁盘读写速度"""
         new_io_counter = self.process.io_counters()
         read_bytes_diff = (new_io_counter.read_bytes - self.curr_disk_io_counter.read_bytes) / (1024 ** 2)
         write_bytes_diff = (new_io_counter.write_bytes - self.curr_disk_io_counter.write_bytes) / (1024 ** 2)
-        if not self.usage.get('now_time', None):
+        # 'now_time' must already be present in the current record; it is the
+        # reference point for the elapsed time computed below.
+        if not self.__usage.get('now_time', None):
             raise RuntimeError('存在逻辑错误, 时间获取前被使用')
-        time_diff = (datetime.now()-datetime.strptime(self.usage['now_time'], '%Y-%m-%d %H:%M:%S')).total_seconds()
+        # now_time is parsed back from a string with one-second resolution, so
+        # the elapsed time (and therefore the rates) is only approximate.
+        # NOTE(review): the byte deltas span the time since curr_disk_io_counter
+        # was last stored, while time_diff is measured from now_time - confirm
+        # that both cover the same interval.
+        time_diff = (datetime.now() - datetime.strptime(self.__usage['now_time'], '%Y-%m-%d %H:%M:%S')).total_seconds()
         # 计算速率MB/秒
         read_speed_mB = (read_bytes_diff / time_diff) if time_diff > 0 else 0
         write_speed_mB = (write_bytes_diff / time_diff) if time_diff > 0 else 0
-        self.usage['disk_read'] = f'{read_speed_mB:.2f} MB/s'
-        self.usage['disk_write'] = f'{write_speed_mB:.2f} MB/s'
+        # Rates are stored as preformatted strings ("<value> MB/s").
+        self.__usage['disk_read'] = f'{read_speed_mB:.2f} MB/s'
+        self.__usage['disk_write'] = f'{write_speed_mB:.2f} MB/s'
         self.curr_disk_io_counter = new_io_counter
 
     def __get_net_io_info(self):
@@ -112,13 +112,13 @@ class Monitor:
         new_net_io_counter = psutil.net_io_counters()
         sent_bytes_mB = (new_net_io_counter.bytes_sent - self.curr_system_net_io_counter.bytes_sent) / (1024 ** 2)
         recv_bytes_mB = (new_net_io_counter.bytes_recv - self.curr_system_net_io_counter.bytes_recv) / (1024 ** 2)
-        if not self.usage.get('now_time', None):
+        if not self.__usage.get('now_time', None):
             raise RuntimeError('存在逻辑错误, 时间获取前被使用')
-        time_diff = (datetime.now()-datetime.strptime(self.usage['now_time'], '%Y-%m-%d %H:%M:%S')).total_seconds()
+        time_diff = (datetime.now() - datetime.strptime(self.__usage['now_time'], '%Y-%m-%d %H:%M:%S')).total_seconds()
         sys_net_send_speed = (sent_bytes_mB / time_diff ) if time_diff > 0 else 0
         sys_net_recv_speed = (recv_bytes_mB / time_diff ) if time_diff > 0 else 0
-        self.usage['sys_net_send'] = f'{sys_net_send_speed:.2f} MB/s'
-        self.usage['sys_net_recv'] = f'{sys_net_recv_speed:.2f} MB/s'
+        self.__usage['sys_net_send'] = f'{sys_net_send_speed:.2f} MB/s'
+        self.__usage['sys_net_recv'] = f'{sys_net_recv_speed:.2f} MB/s'
         self.curr_system_net_io_counter = new_net_io_counter
 
     def __get_gpu_usage_info(self):
@@ -152,11 +152,11 @@ class Monitor:
                         except:
                             power_watts = 0
                         device_id = f'gpu_{i}_'
-                        self.usage[device_id+'mem'] = memory_mb  # 显存 MB
-                        self.usage[device_id+'rate_gpu'] = gpu_util.gpu if gpu_util else 0  # GPU利用率 %
-                        self.usage[device_id+'rate_mem'] = gpu_util.memory if gpu_util else 0  # 显存利用率 %
-                        self.usage[device_id+'temperature'] = temperature  # 显卡温度 °C
-                        self.usage[device_id+'power'] = power_watts  # 显卡功耗 W
+                        self.__usage[device_id + 'mem'] = memory_mb  # 显存 MB
+                        self.__usage[device_id + 'rate_gpu'] = gpu_util.gpu if gpu_util else 0  # GPU利用率 %
+                        self.__usage[device_id + 'rate_mem'] = gpu_util.memory if gpu_util else 0  # 显存利用率 %
+                        self.__usage[device_id + 'temperature'] = temperature  # 显卡温度 °C
+                        self.__usage[device_id + 'power'] = power_watts  # 显卡功耗 W
                         # print(f"GPU {i}:")
                         # print(f"  内存使用: {memory_mb:.2f} MB")
                         # print(f"  GPU 利用率: {gpu_util.gpu if gpu_util else 0}%")
@@ -176,10 +176,10 @@ class Monitor:
         # 时间间隔
         time.sleep(self.interval)
         # 初始化记录
-        self.usage = self.__init_usage
+        self.__usage = self.__init_usage
         # 刷新时间
-        self.usage['now_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-        self.usage['status'] = self.process.status()
+        self.__usage['now_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+        self.__usage['status'] = self.process.status()
         # 刷新内存使用情况
         self.__get_memory_usage_info()
         # 刷新cpu使用情况
@@ -190,14 +190,14 @@ class Monitor:
         self.__get_net_io_info()
         # 获取GPU使用情况
         self.__get_gpu_usage_info()
-        print(self.usage)
+        print(self.__usage)
         if output_file:
-            self.data_points.append(self.usage.copy())
+            self.data_points.append(self.__usage.copy())
             if len(self.data_points) >= self.max_count:  # 如果达到100个数据点,则保存到CSV文件中,并清空数据点
                 # 检查文件是否存在,以决定是否需要写入标题行
                 file_exists = os.path.isfile(output_file)
                 with open(output_file, 'a', newline='', encoding='utf-8') as csvfile:
-                    writer = csv.DictWriter(csvfile, fieldnames=self.usage.keys())
+                    writer = csv.DictWriter(csvfile, fieldnames=self.__usage.keys())
                     # 如果文件不存在,写入标题行
                     print(f'psutil 保存文件:{output_file}')
                     if not file_exists:
@@ -206,14 +206,16 @@ class Monitor:
                     writer.writerows(self.data_points)
                 # 清空数据点列表
                 self.data_points = []
-
+    @property
+    def data(self):
+        """Return the dict that holds the current usage record (not a copy)."""
+        current_record = self.__usage
+        return current_record
     def start(self, interval=1, output_file=None):
+        # Keep calling update() for as long as the monitored pid exists.
+        # Does nothing when no process is attached (self.process is falsy).
+        # `interval` is accepted but not used inside this method.
+        # `output_file` is passed straight through to update().
         if self.process:
             while psutil.pid_exists(self.pid):
                 try:
                     with self.process.oneshot():
                         self.update(output_file=output_file)
-                        print(self.usage)
+                        print(self.__usage)
                 except Exception as e:
+                    # Any exception ends the loop; the error itself is not reported.
                     break
 def main():

+ 230 - 0
app/algorithm/monitor/webskt_mnger.py

@@ -0,0 +1,230 @@
+import threading
+import asyncio
+import websockets
+import time
+import json
+
+class WebMnger:
+    """Run a WebSocket server on a dedicated thread and broadcast to its clients.
+
+    start_server() spawns a daemon thread that owns a private asyncio event
+    loop. Other threads reach that loop only through
+    asyncio.run_coroutine_threadsafe (see broadcast_sync and stop_server).
+    """
+
+    def __init__(self):
+        # All currently registered client connections.
+        self.connected_clients = set()
+        # Thread lock: connected_clients is touched from several threads.
+        self.lock = threading.Lock()
+        # Server object returned by websockets.serve(); None while stopped.
+        self.server = None
+        self.event_loop = None
+        self.server_thread = None
+        self.host = "localhost"
+        self.port = 8765
+        # Set by the server thread once start-up has succeeded or failed.
+        self.server_start_event = threading.Event()
+        # True while the server is being stopped.
+        self._stopping = False
+
+    def __del__(self):
+        """Finalizer: stop the server, reporting instead of raising on failure."""
+        try:
+            self.stop_server()
+        except Exception as e:
+            # Finalizers may run on a partially built object or during
+            # interpreter shutdown; never let an error escape from here.
+            print(f"WebSocket >>> 停止服务失败: {e}")
+
+    async def __handle_client(self, websocket):
+        """Serve one client: register it, echo an acknowledgement per message.
+
+        The client is always removed from connected_clients when the handler
+        ends, whatever the reason (normal close, error, or server shutdown).
+        """
+        print("新客户端连接")
+        try:
+            # Register the client.
+            with self.lock:
+                self.connected_clients.add(websocket)
+            # Process incoming messages until the peer closes or we stop.
+            async for message in websocket:
+                if self._stopping:
+                    break
+                print(f"收到消息:{message}")
+                await websocket.send(f"服务器已收到你的消息:{message}")
+        except websockets.exceptions.ConnectionClosed:
+            print(f"客户端 {websocket.remote_address} 断开连接")
+        except Exception as e:
+            print(f"处理客户端 {websocket.remote_address} 时出错: {e}")
+        finally:
+            # Unregister here so closed connections do not pile up in the set.
+            with self.lock:
+                self.connected_clients.discard(websocket)
+
+    def __run_server(self):
+        """Thread target: create the event loop, start serving, run until stopped."""
+        loop = asyncio.new_event_loop()
+        self.event_loop = loop
+        asyncio.set_event_loop(loop)
+        try:
+            async def server_continue():
+                return await websockets.serve(self.__handle_client,
+                                              self.host,
+                                              self.port,
+                                              ping_interval=20,  # heartbeat interval
+                                              ping_timeout=10  # heartbeat timeout
+                                              )
+            self.server = loop.run_until_complete(server_continue())
+            print("WebSocket >>> 事件循环创建成功")
+            self.server_start_event.set()
+            loop.run_forever()
+            print("WebSocket >>> 事件循环已关闭")
+        except Exception as e:
+            print(f"WebSocket >>> 启动服务失败: {e}")
+            # Wake start_server() immediately instead of letting it time out;
+            # it tells success from failure by checking self.server.
+            self.server_start_event.set()
+        finally:
+            # Release the loop's resources once it is no longer running.
+            loop.close()
+
+    def start_server(self):
+        """Start the server thread if it is not already running.
+
+        Returns:
+            The running server thread, or None when start-up failed or did
+            not complete within 5 seconds.
+        """
+        if self.server and self.server_thread and self.server_thread.is_alive():
+            print("WebSocket >>> 服务器已经在运行")
+            return self.server_thread
+        # Reset state left over from a previous run so a restart behaves
+        # like a first start (handlers check _stopping for every message).
+        self.server_start_event.clear()
+        self._stopping = False
+        self.server = None
+        server_thread = threading.Thread(target=self.__run_server, daemon=True)
+        server_thread.start()
+        signalled = self.server_start_event.wait(timeout=5)
+        if signalled and self.server is not None and server_thread.is_alive():
+            print("WebSocket >>> 服务器已启动")
+            self.server_thread = server_thread
+            return server_thread
+        print("WebSocket >>> 启动服务失败")
+        return None
+
+    def stop_server(self):
+        """Close all clients, stop the server and its loop, join the thread.
+
+        Safe to call when the server was never started or is already stopped.
+        """
+        if self.server is None and self.event_loop is None and self.server_thread is None:
+            return
+        # Tell the client handlers to stop processing further messages.
+        self._stopping = True
+        loop = self.event_loop
+        loop_running = loop is not None and loop.is_running()
+        # Close client connections from a snapshot of the set.
+        with self.lock:
+            clients = self.connected_clients.copy()
+        if loop_running:
+            for client in clients:
+                try:
+                    # Closing must happen on the loop thread; wait briefly for it.
+                    future = asyncio.run_coroutine_threadsafe(client.close(), loop)
+                    future.result(timeout=2.0)
+                except Exception as e:
+                    print(f"websockets >>> 关闭连接失败: {e}")
+        else:
+            # Without a running loop the connections cannot be closed gracefully.
+            print("事件循环未运行,无法异步关闭客户端连接")
+
+        # Forget the clients whether or not the graceful close succeeded.
+        with self.lock:
+            self.connected_clients.clear()
+
+        if loop_running and loop.is_running():
+            # Future.result() raises concurrent.futures.TimeoutError, which is
+            # a different class from asyncio.TimeoutError before Python 3.11.
+            from concurrent.futures import TimeoutError as FutureTimeoutError
+
+            async def close_server_and_loop():
+                if self.server:
+                    self.server.close()
+                    await self.server.wait_closed()
+
+            future = asyncio.run_coroutine_threadsafe(close_server_and_loop(), loop)
+            try:
+                future.result(timeout=5)  # wait at most 5 seconds
+            except FutureTimeoutError:
+                print("websockets >>> 停止服务器超时,强制停止事件循环")
+            except Exception as e:
+                print(f"WebSocket >>> 停止服务失败: {e}")
+            if loop.is_running():
+                loop.call_soon_threadsafe(loop.stop)
+
+        # Wait for the server thread to finish.
+        if self.server_thread:
+            self.server_thread.join(timeout=5)
+        print("WebSocket >>> 服务器已停止")
+        self.event_loop = None
+        self.server_thread = None
+        self.server = None
+
+    def broadcast_sync(self, message):
+        """Broadcast ``message`` from any thread and wait up to 2 s for it.
+
+        Returns:
+            The result of the broadcast coroutine (None) on completion, or
+            False when the loop is not running or the wait timed out.
+        """
+        loop = self.event_loop
+        if loop is None or not loop.is_running():
+            print("broadcast>>>: 事件循环未运行,无法广播")
+            return False
+        # See stop_server(): this is the exception Future.result() raises.
+        from concurrent.futures import TimeoutError as FutureTimeoutError
+
+        # Hand the coroutine to the loop thread and wait for its result.
+        future = asyncio.run_coroutine_threadsafe(self.__broadcast(message), loop)
+        try:
+            return future.result(timeout=2.0)
+        except FutureTimeoutError:
+            print("broadcast>>>: 广播操作超时")
+            return False
+
+    async def __broadcast(self, message):
+        """Send ``message`` to every open client; runs on the server loop.
+
+        dict and list payloads are serialized to JSON, any other non-str
+        value is converted with str(). Clients that are no longer open are
+        dropped from connected_clients.
+        """
+        if isinstance(message, (dict, list)):
+            message = json.dumps(message, ensure_ascii=False, separators=(',', ':'))
+        elif not isinstance(message, str):
+            message = str(message)
+        print("broadcast>>>: 开始组播消息:", message)
+        with self.lock:
+            if not self.connected_clients:
+                print("broadcast>>>: 没有建立任何连接")
+                return
+            clients = self.connected_clients.copy()
+
+        tasks = []
+        disconnected_clients = []
+        for client in clients:
+            if client.state == websockets.State.OPEN:
+                try:
+                    tasks.append(asyncio.create_task(client.send(message)))
+                except Exception as e:
+                    print(f"broadcast>>>: 发送消息失败: {e}")
+            else:
+                disconnected_clients.append(client)
+        print(f"broadcast>>>: 组播任务数量:{len(tasks)}")
+        # Send to all clients concurrently.
+        if tasks:
+            try:
+                done, pending = await asyncio.wait(
+                    tasks,
+                    timeout=10.0,  # give up on slow clients after 10 seconds
+                    return_when=asyncio.ALL_COMPLETED
+                )
+
+                # Cancel whatever did not finish in time.
+                print(f"broadcast>>>: 未完成的任务数量:{len(pending)}, 已经完成的任务数量:{len(done)}")
+                for task in pending:
+                    task.cancel()
+
+                # Report sends that finished with an error.
+                for task in done:
+                    if task.cancelled():
+                        # exception() would raise CancelledError here.
+                        continue
+                    error = task.exception()
+                    if error:
+                        print(f"broadcast>>>: 消息发送失败: {error}")
+
+            except asyncio.TimeoutError:
+                print("broadcast>>>: 广播消息超时")
+            except Exception as e:
+                print(f"broadcast>>>: 广播过程中发生错误: {e}")
+
+        # Drop clients that were found closed.
+        if disconnected_clients:
+            with self.lock:
+                for client in disconnected_clients:
+                    self.connected_clients.discard(client)
+
+
+async def main():
+    """Manual smoke test: start the server, broadcast once, then stop it."""
+    manager = WebMnger()
+    manager.start_server()
+    # Wait 20 seconds before broadcasting.
+    await asyncio.sleep(20)
+    # Test broadcast; broadcast_sync hands the message to the manager's event loop.
+    manager.broadcast_sync("这是来自服务器的组播测试消息")
+    # Keep the server running a little longer.
+    await asyncio.sleep(2)
+    manager.stop_server()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
+
+

+ 19 - 0
tem.py

@@ -0,0 +1,19 @@
+import asyncio
+
+
+async def say_after(delay: int, what: str):
+    """Sleep for ``delay`` seconds, then print ``what``."""
+    await asyncio.sleep(delay)
+    print(what)
+
+async def main():
+    """Run two say_after tasks concurrently and print the finished tasks."""
+    hello_task = asyncio.create_task(say_after(5, 'hello'))
+    world_task = asyncio.create_task(say_after(5, 'world'))
+    # Printed right away: the tasks only start running at the first await.
+    print("finished")
+    for task in (hello_task, world_task):
+        await task
+    print("done")
+    for task in (hello_task, world_task):
+        print(task)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 15 - 0
websocket_server.py

@@ -0,0 +1,15 @@
+import websockets
+import asyncio
+
+async def echo(websocket):
+    """Print every message received on ``websocket``; nothing is sent back."""
+    async for incoming in websocket:
+        print(f"收到消息:{incoming}")
+
+async def main():
+    """Serve ``echo`` on ws://localhost:8765 until the server is closed."""
+    ws_server = await websockets.serve(echo, "localhost", 8765)
+    print("WebSocket服务器已启动")
+    await ws_server.wait_closed()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 46 - 0
websockets_client.py

@@ -0,0 +1,46 @@
+import asyncio
+import websockets
+
+async def simple_client(link=0):
+    """Connect to the local server and run a sender and a receiver side by side.
+
+    Args:
+        link: Label identifying this client in the printed output.
+
+    The function returns when either task finishes. The other task is
+    cancelled and awaited, and an exception raised by a finished task is
+    re-raised so the handlers below can report it.
+    """
+    url = "ws://localhost:8765"
+    try:
+        async with websockets.connect(url) as websocket:
+            print(f"{link}已连接到服务器")
+            # Two concurrent tasks: one sends, one receives.
+            send_task = asyncio.create_task(send_messages(websocket, link))
+            receive_task = asyncio.create_task(receive_messages(websocket, link))
+
+            # Wait until either task finishes (or fails).
+            done, pending = await asyncio.wait(
+                [send_task, receive_task],
+                return_when=asyncio.FIRST_COMPLETED
+            )
+
+            # Cancel the unfinished task and wait for the cancellation to
+            # complete so it is not left running while the socket closes.
+            for task in pending:
+                task.cancel()
+            await asyncio.gather(*pending, return_exceptions=True)
+
+            # Retrieve results so a failure in a finished task is raised here
+            # instead of being dropped as a "never retrieved" exception.
+            for task in done:
+                task.result()
+    except ConnectionRefusedError:
+        print("无法连接到服务器,请确保服务器正在运行")
+    except websockets.exceptions.ConnectionClosed:
+        print("连接异常断开")
+
+# Task that sends messages
+async def send_messages(websocket, link):
+    """Send a greeting naming ``link`` every 3 seconds until an error occurs."""
+    pause_seconds = 3
+    while True:
+        await websocket.send(f"Hello, server! i am {link}")
+        print(f"发送消息:Hello, server! i am {link}")
+        await asyncio.sleep(pause_seconds)
+
+# Task that receives messages
+async def receive_messages(websocket, link):
+    """Print each server message until the stream ends or the connection closes."""
+    try:
+        async for incoming in websocket:
+            print(f"客户端 {link} 收到服务器消息: {incoming}")
+    except websockets.exceptions.ConnectionClosed:
+        print(f"客户端 {link} 连接已关闭")
+
+
+async def main():
+    """Run two clients (labelled 1 and 2) concurrently."""
+    clients = (simple_client(1), simple_client(2))
+    await asyncio.gather(*clients)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())