shared_state.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. """
  2. 线程安全的共享状态总线,用于三模型之间的松耦合通信。
  3. """
  4. import threading
  5. import time
  6. from collections import deque
  7. from datetime import datetime
  8. from typing import Any, Dict, List, Optional
  9. class SharedState:
  10. def __init__(self, history_size: int = 10):
  11. self._lock = threading.RLock()
  12. self._history_size = history_size
  13. self._channels: Dict[str, _Channel] = {}
  14. def publish(self, channel: str, data: dict) -> None:
  15. with self._lock:
  16. if channel not in self._channels:
  17. self._channels[channel] = _Channel(self._history_size)
  18. self._channels[channel].write(data)
  19. def read(self, channel: str, max_age_seconds: Optional[float] = None) -> Optional[dict]:
  20. with self._lock:
  21. ch = self._channels.get(channel)
  22. if ch is None:
  23. return None
  24. return ch.read(max_age_seconds)
  25. def read_history(self, channel: str, limit: int = 10) -> List[dict]:
  26. with self._lock:
  27. ch = self._channels.get(channel)
  28. if ch is None:
  29. return []
  30. return list(ch.history)
  31. class _Channel:
  32. __slots__ = ("latest", "updated_at", "history")
  33. def __init__(self, history_size: int):
  34. self.latest: Optional[dict] = None
  35. self.updated_at: Optional[float] = None
  36. self.history: deque = deque(maxlen=history_size)
  37. def write(self, data: dict) -> None:
  38. self.latest = data
  39. self.updated_at = time.time()
  40. self.history.append({"data": data, "ts": self.updated_at})
  41. def read(self, max_age_seconds: Optional[float] = None) -> Optional[dict]:
  42. if self.latest is None:
  43. return None
  44. if max_age_seconds is not None and self.updated_at is not None:
  45. if time.time() - self.updated_at > max_age_seconds:
  46. return None
  47. return self.latest