base.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. from __future__ import annotations
  2. import collections.abc as c
  3. import sys
  4. import typing as t
  5. import weakref
  6. from collections import defaultdict
  7. from contextlib import contextmanager
  8. from functools import cached_property
  9. from inspect import iscoroutinefunction
  10. from ._utilities import make_id
  11. from ._utilities import make_ref
  12. from ._utilities import Symbol
  13. F = t.TypeVar("F", bound=c.Callable[..., t.Any])
  14. ANY = Symbol("ANY")
  15. """Symbol for "any sender"."""
  16. ANY_ID = 0
  17. class Signal:
  18. """A notification emitter.
  19. :param doc: The docstring for the signal.
  20. """
  21. ANY = ANY
  22. """An alias for the :data:`~blinker.ANY` sender symbol."""
  23. set_class: type[set[t.Any]] = set
  24. """The set class to use for tracking connected receivers and senders.
  25. Python's ``set`` is unordered. If receivers must be dispatched in the order
  26. they were connected, an ordered set implementation can be used.
  27. .. versionadded:: 1.7
  28. """
  29. @cached_property
  30. def receiver_connected(self) -> Signal:
  31. """Emitted at the end of each :meth:`connect` call.
  32. The signal sender is the signal instance, and the :meth:`connect`
  33. arguments are passed through: ``receiver``, ``sender``, and ``weak``.
  34. .. versionadded:: 1.2
  35. """
  36. return Signal(doc="Emitted after a receiver connects.")
  37. @cached_property
  38. def receiver_disconnected(self) -> Signal:
  39. """Emitted at the end of each :meth:`disconnect` call.
  40. The sender is the signal instance, and the :meth:`disconnect` arguments
  41. are passed through: ``receiver`` and ``sender``.
  42. This signal is emitted **only** when :meth:`disconnect` is called
  43. explicitly. This signal cannot be emitted by an automatic disconnect
  44. when a weakly referenced receiver or sender goes out of scope, as the
  45. instance is no longer be available to be used as the sender for this
  46. signal.
  47. An alternative approach is available by subscribing to
  48. :attr:`receiver_connected` and setting up a custom weakref cleanup
  49. callback on weak receivers and senders.
  50. .. versionadded:: 1.2
  51. """
  52. return Signal(doc="Emitted after a receiver disconnects.")
  53. def __init__(self, doc: str | None = None) -> None:
  54. if doc:
  55. self.__doc__ = doc
  56. self.receivers: dict[
  57. t.Any, weakref.ref[c.Callable[..., t.Any]] | c.Callable[..., t.Any]
  58. ] = {}
  59. """The map of connected receivers. Useful to quickly check if any
  60. receivers are connected to the signal: ``if s.receivers:``. The
  61. structure and data is not part of the public API, but checking its
  62. boolean value is.
  63. """
  64. self.is_muted: bool = False
  65. self._by_receiver: dict[t.Any, set[t.Any]] = defaultdict(self.set_class)
  66. self._by_sender: dict[t.Any, set[t.Any]] = defaultdict(self.set_class)
  67. self._weak_senders: dict[t.Any, weakref.ref[t.Any]] = {}
  68. def connect(self, receiver: F, sender: t.Any = ANY, weak: bool = True) -> F:
  69. """Connect ``receiver`` to be called when the signal is sent by
  70. ``sender``.
  71. :param receiver: The callable to call when :meth:`send` is called with
  72. the given ``sender``, passing ``sender`` as a positional argument
  73. along with any extra keyword arguments.
  74. :param sender: Any object or :data:`ANY`. ``receiver`` will only be
  75. called when :meth:`send` is called with this sender. If ``ANY``, the
  76. receiver will be called for any sender. A receiver may be connected
  77. to multiple senders by calling :meth:`connect` multiple times.
  78. :param weak: Track the receiver with a :mod:`weakref`. The receiver will
  79. be automatically disconnected when it is garbage collected. When
  80. connecting a receiver defined within a function, set to ``False``,
  81. otherwise it will be disconnected when the function scope ends.
  82. """
  83. receiver_id = make_id(receiver)
  84. sender_id = ANY_ID if sender is ANY else make_id(sender)
  85. if weak:
  86. self.receivers[receiver_id] = make_ref(
  87. receiver, self._make_cleanup_receiver(receiver_id)
  88. )
  89. else:
  90. self.receivers[receiver_id] = receiver
  91. self._by_sender[sender_id].add(receiver_id)
  92. self._by_receiver[receiver_id].add(sender_id)
  93. if sender is not ANY and sender_id not in self._weak_senders:
  94. # store a cleanup for weakref-able senders
  95. try:
  96. self._weak_senders[sender_id] = make_ref(
  97. sender, self._make_cleanup_sender(sender_id)
  98. )
  99. except TypeError:
  100. pass
  101. if "receiver_connected" in self.__dict__ and self.receiver_connected.receivers:
  102. try:
  103. self.receiver_connected.send(
  104. self, receiver=receiver, sender=sender, weak=weak
  105. )
  106. except TypeError:
  107. # TODO no explanation or test for this
  108. self.disconnect(receiver, sender)
  109. raise
  110. return receiver
  111. def connect_via(self, sender: t.Any, weak: bool = False) -> c.Callable[[F], F]:
  112. """Connect the decorated function to be called when the signal is sent
  113. by ``sender``.
  114. The decorated function will be called when :meth:`send` is called with
  115. the given ``sender``, passing ``sender`` as a positional argument along
  116. with any extra keyword arguments.
  117. :param sender: Any object or :data:`ANY`. ``receiver`` will only be
  118. called when :meth:`send` is called with this sender. If ``ANY``, the
  119. receiver will be called for any sender. A receiver may be connected
  120. to multiple senders by calling :meth:`connect` multiple times.
  121. :param weak: Track the receiver with a :mod:`weakref`. The receiver will
  122. be automatically disconnected when it is garbage collected. When
  123. connecting a receiver defined within a function, set to ``False``,
  124. otherwise it will be disconnected when the function scope ends.=
  125. .. versionadded:: 1.1
  126. """
  127. def decorator(fn: F) -> F:
  128. self.connect(fn, sender, weak)
  129. return fn
  130. return decorator
  131. @contextmanager
  132. def connected_to(
  133. self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY
  134. ) -> c.Generator[None, None, None]:
  135. """A context manager that temporarily connects ``receiver`` to the
  136. signal while a ``with`` block executes. When the block exits, the
  137. receiver is disconnected. Useful for tests.
  138. :param receiver: The callable to call when :meth:`send` is called with
  139. the given ``sender``, passing ``sender`` as a positional argument
  140. along with any extra keyword arguments.
  141. :param sender: Any object or :data:`ANY`. ``receiver`` will only be
  142. called when :meth:`send` is called with this sender. If ``ANY``, the
  143. receiver will be called for any sender.
  144. .. versionadded:: 1.1
  145. """
  146. self.connect(receiver, sender=sender, weak=False)
  147. try:
  148. yield None
  149. finally:
  150. self.disconnect(receiver)
  151. @contextmanager
  152. def muted(self) -> c.Generator[None, None, None]:
  153. """A context manager that temporarily disables the signal. No receivers
  154. will be called if the signal is sent, until the ``with`` block exits.
  155. Useful for tests.
  156. """
  157. self.is_muted = True
  158. try:
  159. yield None
  160. finally:
  161. self.is_muted = False
  162. def send(
  163. self,
  164. sender: t.Any | None = None,
  165. /,
  166. *,
  167. _async_wrapper: c.Callable[
  168. [c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]]], c.Callable[..., t.Any]
  169. ]
  170. | None = None,
  171. **kwargs: t.Any,
  172. ) -> list[tuple[c.Callable[..., t.Any], t.Any]]:
  173. """Call all receivers that are connected to the given ``sender``
  174. or :data:`ANY`. Each receiver is called with ``sender`` as a positional
  175. argument along with any extra keyword arguments. Return a list of
  176. ``(receiver, return value)`` tuples.
  177. The order receivers are called is undefined, but can be influenced by
  178. setting :attr:`set_class`.
  179. If a receiver raises an exception, that exception will propagate up.
  180. This makes debugging straightforward, with an assumption that correctly
  181. implemented receivers will not raise.
  182. :param sender: Call receivers connected to this sender, in addition to
  183. those connected to :data:`ANY`.
  184. :param _async_wrapper: Will be called on any receivers that are async
  185. coroutines to turn them into sync callables. For example, could run
  186. the receiver with an event loop.
  187. :param kwargs: Extra keyword arguments to pass to each receiver.
  188. .. versionchanged:: 1.7
  189. Added the ``_async_wrapper`` argument.
  190. """
  191. if self.is_muted:
  192. return []
  193. results = []
  194. for receiver in self.receivers_for(sender):
  195. if iscoroutinefunction(receiver):
  196. if _async_wrapper is None:
  197. raise RuntimeError("Cannot send to a coroutine function.")
  198. result = _async_wrapper(receiver)(sender, **kwargs)
  199. else:
  200. result = receiver(sender, **kwargs)
  201. results.append((receiver, result))
  202. return results
  203. async def send_async(
  204. self,
  205. sender: t.Any | None = None,
  206. /,
  207. *,
  208. _sync_wrapper: c.Callable[
  209. [c.Callable[..., t.Any]], c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]]
  210. ]
  211. | None = None,
  212. **kwargs: t.Any,
  213. ) -> list[tuple[c.Callable[..., t.Any], t.Any]]:
  214. """Await all receivers that are connected to the given ``sender``
  215. or :data:`ANY`. Each receiver is called with ``sender`` as a positional
  216. argument along with any extra keyword arguments. Return a list of
  217. ``(receiver, return value)`` tuples.
  218. The order receivers are called is undefined, but can be influenced by
  219. setting :attr:`set_class`.
  220. If a receiver raises an exception, that exception will propagate up.
  221. This makes debugging straightforward, with an assumption that correctly
  222. implemented receivers will not raise.
  223. :param sender: Call receivers connected to this sender, in addition to
  224. those connected to :data:`ANY`.
  225. :param _sync_wrapper: Will be called on any receivers that are sync
  226. callables to turn them into async coroutines. For example,
  227. could call the receiver in a thread.
  228. :param kwargs: Extra keyword arguments to pass to each receiver.
  229. .. versionadded:: 1.7
  230. """
  231. if self.is_muted:
  232. return []
  233. results = []
  234. for receiver in self.receivers_for(sender):
  235. if not iscoroutinefunction(receiver):
  236. if _sync_wrapper is None:
  237. raise RuntimeError("Cannot send to a non-coroutine function.")
  238. result = await _sync_wrapper(receiver)(sender, **kwargs)
  239. else:
  240. result = await receiver(sender, **kwargs)
  241. results.append((receiver, result))
  242. return results
  243. def has_receivers_for(self, sender: t.Any) -> bool:
  244. """Check if there is at least one receiver that will be called with the
  245. given ``sender``. A receiver connected to :data:`ANY` will always be
  246. called, regardless of sender. Does not check if weakly referenced
  247. receivers are still live. See :meth:`receivers_for` for a stronger
  248. search.
  249. :param sender: Check for receivers connected to this sender, in addition
  250. to those connected to :data:`ANY`.
  251. """
  252. if not self.receivers:
  253. return False
  254. if self._by_sender[ANY_ID]:
  255. return True
  256. if sender is ANY:
  257. return False
  258. return make_id(sender) in self._by_sender
  259. def receivers_for(
  260. self, sender: t.Any
  261. ) -> c.Generator[c.Callable[..., t.Any], None, None]:
  262. """Yield each receiver to be called for ``sender``, in addition to those
  263. to be called for :data:`ANY`. Weakly referenced receivers that are not
  264. live will be disconnected and skipped.
  265. :param sender: Yield receivers connected to this sender, in addition
  266. to those connected to :data:`ANY`.
  267. """
  268. # TODO: test receivers_for(ANY)
  269. if not self.receivers:
  270. return
  271. sender_id = make_id(sender)
  272. if sender_id in self._by_sender:
  273. ids = self._by_sender[ANY_ID] | self._by_sender[sender_id]
  274. else:
  275. ids = self._by_sender[ANY_ID].copy()
  276. for receiver_id in ids:
  277. receiver = self.receivers.get(receiver_id)
  278. if receiver is None:
  279. continue
  280. if isinstance(receiver, weakref.ref):
  281. strong = receiver()
  282. if strong is None:
  283. self._disconnect(receiver_id, ANY_ID)
  284. continue
  285. yield strong
  286. else:
  287. yield receiver
  288. def disconnect(self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY) -> None:
  289. """Disconnect ``receiver`` from being called when the signal is sent by
  290. ``sender``.
  291. :param receiver: A connected receiver callable.
  292. :param sender: Disconnect from only this sender. By default, disconnect
  293. from all senders.
  294. """
  295. sender_id: c.Hashable
  296. if sender is ANY:
  297. sender_id = ANY_ID
  298. else:
  299. sender_id = make_id(sender)
  300. receiver_id = make_id(receiver)
  301. self._disconnect(receiver_id, sender_id)
  302. if (
  303. "receiver_disconnected" in self.__dict__
  304. and self.receiver_disconnected.receivers
  305. ):
  306. self.receiver_disconnected.send(self, receiver=receiver, sender=sender)
  307. def _disconnect(self, receiver_id: c.Hashable, sender_id: c.Hashable) -> None:
  308. if sender_id == ANY_ID:
  309. if self._by_receiver.pop(receiver_id, None) is not None:
  310. for bucket in self._by_sender.values():
  311. bucket.discard(receiver_id)
  312. self.receivers.pop(receiver_id, None)
  313. else:
  314. self._by_sender[sender_id].discard(receiver_id)
  315. self._by_receiver[receiver_id].discard(sender_id)
  316. def _make_cleanup_receiver(
  317. self, receiver_id: c.Hashable
  318. ) -> c.Callable[[weakref.ref[c.Callable[..., t.Any]]], None]:
  319. """Create a callback function to disconnect a weakly referenced
  320. receiver when it is garbage collected.
  321. """
  322. def cleanup(ref: weakref.ref[c.Callable[..., t.Any]]) -> None:
  323. # If the interpreter is shutting down, disconnecting can result in a
  324. # weird ignored exception. Don't call it in that case.
  325. if not sys.is_finalizing():
  326. self._disconnect(receiver_id, ANY_ID)
  327. return cleanup
  328. def _make_cleanup_sender(
  329. self, sender_id: c.Hashable
  330. ) -> c.Callable[[weakref.ref[t.Any]], None]:
  331. """Create a callback function to disconnect all receivers for a weakly
  332. referenced sender when it is garbage collected.
  333. """
  334. assert sender_id != ANY_ID
  335. def cleanup(ref: weakref.ref[t.Any]) -> None:
  336. self._weak_senders.pop(sender_id, None)
  337. for receiver_id in self._by_sender.pop(sender_id, ()):
  338. self._by_receiver[receiver_id].discard(sender_id)
  339. return cleanup
  340. def _cleanup_bookkeeping(self) -> None:
  341. """Prune unused sender/receiver bookkeeping. Not threadsafe.
  342. Connecting & disconnecting leaves behind a small amount of bookkeeping
  343. data. Typical workloads using Blinker, for example in most web apps,
  344. Flask, CLI scripts, etc., are not adversely affected by this
  345. bookkeeping.
  346. With a long-running process performing dynamic signal routing with high
  347. volume, e.g. connecting to function closures, senders are all unique
  348. object instances. Doing all of this over and over may cause memory usage
  349. to grow due to extraneous bookkeeping. (An empty ``set`` for each stale
  350. sender/receiver pair.)
  351. This method will prune that bookkeeping away, with the caveat that such
  352. pruning is not threadsafe. The risk is that cleanup of a fully
  353. disconnected receiver/sender pair occurs while another thread is
  354. connecting that same pair. If you are in the highly dynamic, unique
  355. receiver/sender situation that has lead you to this method, that failure
  356. mode is perhaps not a big deal for you.
  357. """
  358. for mapping in (self._by_sender, self._by_receiver):
  359. for ident, bucket in list(mapping.items()):
  360. if not bucket:
  361. mapping.pop(ident, None)
  362. def _clear_state(self) -> None:
  363. """Disconnect all receivers and senders. Useful for tests."""
  364. self._weak_senders.clear()
  365. self.receivers.clear()
  366. self._by_sender.clear()
  367. self._by_receiver.clear()
  368. class NamedSignal(Signal):
  369. """A named generic notification emitter. The name is not used by the signal
  370. itself, but matches the key in the :class:`Namespace` that it belongs to.
  371. :param name: The name of the signal within the namespace.
  372. :param doc: The docstring for the signal.
  373. """
  374. def __init__(self, name: str, doc: str | None = None) -> None:
  375. super().__init__(doc)
  376. #: The name of this signal.
  377. self.name: str = name
  378. def __repr__(self) -> str:
  379. base = super().__repr__()
  380. return f"{base[:-1]}; {self.name!r}>" # noqa: E702
  381. class Namespace(dict[str, NamedSignal]):
  382. """A dict mapping names to signals."""
  383. def signal(self, name: str, doc: str | None = None) -> NamedSignal:
  384. """Return the :class:`NamedSignal` for the given ``name``, creating it
  385. if required. Repeated calls with the same name return the same signal.
  386. :param name: The name of the signal.
  387. :param doc: The docstring of the signal.
  388. """
  389. if name not in self:
  390. self[name] = NamedSignal(name, doc)
  391. return self[name]
  392. class _PNamespaceSignal(t.Protocol):
  393. def __call__(self, name: str, doc: str | None = None) -> NamedSignal: ...
  394. default_namespace: Namespace = Namespace()
  395. """A default :class:`Namespace` for creating named signals. :func:`signal`
  396. creates a :class:`NamedSignal` in this namespace.
  397. """
  398. signal: _PNamespaceSignal = default_namespace.signal
  399. """Return a :class:`NamedSignal` in :data:`default_namespace` with the given
  400. ``name``, creating it if required. Repeated calls with the same name return the
  401. same signal.
  402. """