"""
Thread-safe shared state bus for loosely coupled communication between
the three models.
"""

import threading
import time
from collections import deque
from datetime import datetime
from typing import Any, Dict, List, Optional


class SharedState:
    """Named-channel publish/read bus guarded by a single re-entrant lock.

    Each channel keeps its most recent payload plus a bounded history of
    past payloads. Channels are created lazily on first publish.
    """

    def __init__(self, history_size: int = 10):
        """Create an empty bus.

        Args:
            history_size: Maximum number of history entries retained per
                channel; older entries are discarded once it is exceeded.
        """
        self._lock = threading.RLock()
        self._history_size = history_size
        self._channels: Dict[str, _Channel] = {}

    def publish(self, channel: str, data: dict) -> None:
        """Store ``data`` as the latest value of ``channel``.

        The channel is created if it does not exist yet. The payload is
        stored by reference, not copied.
        """
        with self._lock:
            if channel not in self._channels:
                self._channels[channel] = _Channel(self._history_size)
            self._channels[channel].write(data)

    def read(
        self, channel: str, max_age_seconds: Optional[float] = None
    ) -> Optional[dict]:
        """Return the latest payload of ``channel``.

        Args:
            channel: Channel name.
            max_age_seconds: When given, a payload older than this many
                seconds is treated as absent.

        Returns:
            The most recently published dict (the same object that was
            published), or ``None`` if the channel does not exist, has no
            value, or its value is older than ``max_age_seconds``.
        """
        with self._lock:
            ch = self._channels.get(channel)
            if ch is None:
                return None
            return ch.read(max_age_seconds)

    def read_history(self, channel: str, limit: int = 10) -> List[dict]:
        """Return up to ``limit`` of the most recent history entries.

        Args:
            channel: Channel name.
            limit: Maximum number of entries to return. Values ``<= 0``
                yield an empty list.

        Returns:
            A new list of ``{"data": ..., "ts": ...}`` entries ordered
            oldest to newest, or an empty list for an unknown channel.
        """
        # Guard explicitly: entries[-0:] would return the whole list.
        if limit <= 0:
            return []
        with self._lock:
            ch = self._channels.get(channel)
            if ch is None:
                return []
            # Snapshot under the lock so concurrent publishes cannot
            # mutate the deque while it is being copied.
            entries = list(ch.history)
        return entries[-limit:]


class _Channel:
    """Single channel: latest payload, its timestamp, and bounded history.

    Not synchronized on its own; ``SharedState`` holds its lock around
    every call.
    """

    __slots__ = ("latest", "updated_at", "history")

    def __init__(self, history_size: int):
        """Create an empty channel retaining at most ``history_size`` entries."""
        self.latest: Optional[dict] = None
        # time.time() value of the last write, or None before any write.
        self.updated_at: Optional[float] = None
        self.history: deque = deque(maxlen=history_size)

    def write(self, data: dict) -> None:
        """Record ``data`` as the latest payload and append it to history."""
        self.latest = data
        self.updated_at = time.time()
        self.history.append({"data": data, "ts": self.updated_at})

    def read(self, max_age_seconds: Optional[float] = None) -> Optional[dict]:
        """Return the latest payload, or ``None`` if absent or too old.

        Args:
            max_age_seconds: When given, a payload written more than this
                many seconds ago (by ``time.time()``) is treated as absent.
        """
        if self.latest is None:
            return None
        if max_age_seconds is not None and self.updated_at is not None:
            if time.time() - self.updated_at > max_age_seconds:
                return None
        return self.latest