| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- """
- 线程安全的共享状态总线,用于三模型之间的松耦合通信。
- """
- import threading
- import time
- from collections import deque
- from datetime import datetime
- from typing import Any, Dict, List, Optional
- class SharedState:
- def __init__(self, history_size: int = 10):
- self._lock = threading.RLock()
- self._history_size = history_size
- self._channels: Dict[str, _Channel] = {}
- def publish(self, channel: str, data: dict) -> None:
- with self._lock:
- if channel not in self._channels:
- self._channels[channel] = _Channel(self._history_size)
- self._channels[channel].write(data)
- def read(self, channel: str, max_age_seconds: Optional[float] = None) -> Optional[dict]:
- with self._lock:
- ch = self._channels.get(channel)
- if ch is None:
- return None
- return ch.read(max_age_seconds)
- def read_history(self, channel: str, limit: int = 10) -> List[dict]:
- with self._lock:
- ch = self._channels.get(channel)
- if ch is None:
- return []
- return list(ch.history)
- class _Channel:
- __slots__ = ("latest", "updated_at", "history")
- def __init__(self, history_size: int):
- self.latest: Optional[dict] = None
- self.updated_at: Optional[float] = None
- self.history: deque = deque(maxlen=history_size)
- def write(self, data: dict) -> None:
- self.latest = data
- self.updated_at = time.time()
- self.history.append({"data": data, "ts": self.updated_at})
- def read(self, max_age_seconds: Optional[float] = None) -> Optional[dict]:
- if self.latest is None:
- return None
- if max_age_seconds is not None and self.updated_at is not None:
- if time.time() - self.updated_at > max_age_seconds:
- return None
- return self.latest
|