123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512 |
- from __future__ import annotations
- import collections.abc as c
- import sys
- import typing as t
- import weakref
- from collections import defaultdict
- from contextlib import contextmanager
- from functools import cached_property
- from inspect import iscoroutinefunction
- from ._utilities import make_id
- from ._utilities import make_ref
- from ._utilities import Symbol
- F = t.TypeVar("F", bound=c.Callable[..., t.Any])
- ANY = Symbol("ANY")
- """Symbol for "any sender"."""
- ANY_ID = 0
- class Signal:
- """A notification emitter.
- :param doc: The docstring for the signal.
- """
- ANY = ANY
- """An alias for the :data:`~blinker.ANY` sender symbol."""
- set_class: type[set[t.Any]] = set
- """The set class to use for tracking connected receivers and senders.
- Python's ``set`` is unordered. If receivers must be dispatched in the order
- they were connected, an ordered set implementation can be used.
- .. versionadded:: 1.7
- """
- @cached_property
- def receiver_connected(self) -> Signal:
- """Emitted at the end of each :meth:`connect` call.
- The signal sender is the signal instance, and the :meth:`connect`
- arguments are passed through: ``receiver``, ``sender``, and ``weak``.
- .. versionadded:: 1.2
- """
- return Signal(doc="Emitted after a receiver connects.")
- @cached_property
- def receiver_disconnected(self) -> Signal:
- """Emitted at the end of each :meth:`disconnect` call.
- The sender is the signal instance, and the :meth:`disconnect` arguments
- are passed through: ``receiver`` and ``sender``.
- This signal is emitted **only** when :meth:`disconnect` is called
- explicitly. This signal cannot be emitted by an automatic disconnect
- when a weakly referenced receiver or sender goes out of scope, as the
- instance is no longer be available to be used as the sender for this
- signal.
- An alternative approach is available by subscribing to
- :attr:`receiver_connected` and setting up a custom weakref cleanup
- callback on weak receivers and senders.
- .. versionadded:: 1.2
- """
- return Signal(doc="Emitted after a receiver disconnects.")
- def __init__(self, doc: str | None = None) -> None:
- if doc:
- self.__doc__ = doc
- self.receivers: dict[
- t.Any, weakref.ref[c.Callable[..., t.Any]] | c.Callable[..., t.Any]
- ] = {}
- """The map of connected receivers. Useful to quickly check if any
- receivers are connected to the signal: ``if s.receivers:``. The
- structure and data is not part of the public API, but checking its
- boolean value is.
- """
- self.is_muted: bool = False
- self._by_receiver: dict[t.Any, set[t.Any]] = defaultdict(self.set_class)
- self._by_sender: dict[t.Any, set[t.Any]] = defaultdict(self.set_class)
- self._weak_senders: dict[t.Any, weakref.ref[t.Any]] = {}
- def connect(self, receiver: F, sender: t.Any = ANY, weak: bool = True) -> F:
- """Connect ``receiver`` to be called when the signal is sent by
- ``sender``.
- :param receiver: The callable to call when :meth:`send` is called with
- the given ``sender``, passing ``sender`` as a positional argument
- along with any extra keyword arguments.
- :param sender: Any object or :data:`ANY`. ``receiver`` will only be
- called when :meth:`send` is called with this sender. If ``ANY``, the
- receiver will be called for any sender. A receiver may be connected
- to multiple senders by calling :meth:`connect` multiple times.
- :param weak: Track the receiver with a :mod:`weakref`. The receiver will
- be automatically disconnected when it is garbage collected. When
- connecting a receiver defined within a function, set to ``False``,
- otherwise it will be disconnected when the function scope ends.
- """
- receiver_id = make_id(receiver)
- sender_id = ANY_ID if sender is ANY else make_id(sender)
- if weak:
- self.receivers[receiver_id] = make_ref(
- receiver, self._make_cleanup_receiver(receiver_id)
- )
- else:
- self.receivers[receiver_id] = receiver
- self._by_sender[sender_id].add(receiver_id)
- self._by_receiver[receiver_id].add(sender_id)
- if sender is not ANY and sender_id not in self._weak_senders:
- # store a cleanup for weakref-able senders
- try:
- self._weak_senders[sender_id] = make_ref(
- sender, self._make_cleanup_sender(sender_id)
- )
- except TypeError:
- pass
- if "receiver_connected" in self.__dict__ and self.receiver_connected.receivers:
- try:
- self.receiver_connected.send(
- self, receiver=receiver, sender=sender, weak=weak
- )
- except TypeError:
- # TODO no explanation or test for this
- self.disconnect(receiver, sender)
- raise
- return receiver
- def connect_via(self, sender: t.Any, weak: bool = False) -> c.Callable[[F], F]:
- """Connect the decorated function to be called when the signal is sent
- by ``sender``.
- The decorated function will be called when :meth:`send` is called with
- the given ``sender``, passing ``sender`` as a positional argument along
- with any extra keyword arguments.
- :param sender: Any object or :data:`ANY`. ``receiver`` will only be
- called when :meth:`send` is called with this sender. If ``ANY``, the
- receiver will be called for any sender. A receiver may be connected
- to multiple senders by calling :meth:`connect` multiple times.
- :param weak: Track the receiver with a :mod:`weakref`. The receiver will
- be automatically disconnected when it is garbage collected. When
- connecting a receiver defined within a function, set to ``False``,
- otherwise it will be disconnected when the function scope ends.=
- .. versionadded:: 1.1
- """
- def decorator(fn: F) -> F:
- self.connect(fn, sender, weak)
- return fn
- return decorator
- @contextmanager
- def connected_to(
- self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY
- ) -> c.Generator[None, None, None]:
- """A context manager that temporarily connects ``receiver`` to the
- signal while a ``with`` block executes. When the block exits, the
- receiver is disconnected. Useful for tests.
- :param receiver: The callable to call when :meth:`send` is called with
- the given ``sender``, passing ``sender`` as a positional argument
- along with any extra keyword arguments.
- :param sender: Any object or :data:`ANY`. ``receiver`` will only be
- called when :meth:`send` is called with this sender. If ``ANY``, the
- receiver will be called for any sender.
- .. versionadded:: 1.1
- """
- self.connect(receiver, sender=sender, weak=False)
- try:
- yield None
- finally:
- self.disconnect(receiver)
- @contextmanager
- def muted(self) -> c.Generator[None, None, None]:
- """A context manager that temporarily disables the signal. No receivers
- will be called if the signal is sent, until the ``with`` block exits.
- Useful for tests.
- """
- self.is_muted = True
- try:
- yield None
- finally:
- self.is_muted = False
- def send(
- self,
- sender: t.Any | None = None,
- /,
- *,
- _async_wrapper: c.Callable[
- [c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]]], c.Callable[..., t.Any]
- ]
- | None = None,
- **kwargs: t.Any,
- ) -> list[tuple[c.Callable[..., t.Any], t.Any]]:
- """Call all receivers that are connected to the given ``sender``
- or :data:`ANY`. Each receiver is called with ``sender`` as a positional
- argument along with any extra keyword arguments. Return a list of
- ``(receiver, return value)`` tuples.
- The order receivers are called is undefined, but can be influenced by
- setting :attr:`set_class`.
- If a receiver raises an exception, that exception will propagate up.
- This makes debugging straightforward, with an assumption that correctly
- implemented receivers will not raise.
- :param sender: Call receivers connected to this sender, in addition to
- those connected to :data:`ANY`.
- :param _async_wrapper: Will be called on any receivers that are async
- coroutines to turn them into sync callables. For example, could run
- the receiver with an event loop.
- :param kwargs: Extra keyword arguments to pass to each receiver.
- .. versionchanged:: 1.7
- Added the ``_async_wrapper`` argument.
- """
- if self.is_muted:
- return []
- results = []
- for receiver in self.receivers_for(sender):
- if iscoroutinefunction(receiver):
- if _async_wrapper is None:
- raise RuntimeError("Cannot send to a coroutine function.")
- result = _async_wrapper(receiver)(sender, **kwargs)
- else:
- result = receiver(sender, **kwargs)
- results.append((receiver, result))
- return results
- async def send_async(
- self,
- sender: t.Any | None = None,
- /,
- *,
- _sync_wrapper: c.Callable[
- [c.Callable[..., t.Any]], c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]]
- ]
- | None = None,
- **kwargs: t.Any,
- ) -> list[tuple[c.Callable[..., t.Any], t.Any]]:
- """Await all receivers that are connected to the given ``sender``
- or :data:`ANY`. Each receiver is called with ``sender`` as a positional
- argument along with any extra keyword arguments. Return a list of
- ``(receiver, return value)`` tuples.
- The order receivers are called is undefined, but can be influenced by
- setting :attr:`set_class`.
- If a receiver raises an exception, that exception will propagate up.
- This makes debugging straightforward, with an assumption that correctly
- implemented receivers will not raise.
- :param sender: Call receivers connected to this sender, in addition to
- those connected to :data:`ANY`.
- :param _sync_wrapper: Will be called on any receivers that are sync
- callables to turn them into async coroutines. For example,
- could call the receiver in a thread.
- :param kwargs: Extra keyword arguments to pass to each receiver.
- .. versionadded:: 1.7
- """
- if self.is_muted:
- return []
- results = []
- for receiver in self.receivers_for(sender):
- if not iscoroutinefunction(receiver):
- if _sync_wrapper is None:
- raise RuntimeError("Cannot send to a non-coroutine function.")
- result = await _sync_wrapper(receiver)(sender, **kwargs)
- else:
- result = await receiver(sender, **kwargs)
- results.append((receiver, result))
- return results
- def has_receivers_for(self, sender: t.Any) -> bool:
- """Check if there is at least one receiver that will be called with the
- given ``sender``. A receiver connected to :data:`ANY` will always be
- called, regardless of sender. Does not check if weakly referenced
- receivers are still live. See :meth:`receivers_for` for a stronger
- search.
- :param sender: Check for receivers connected to this sender, in addition
- to those connected to :data:`ANY`.
- """
- if not self.receivers:
- return False
- if self._by_sender[ANY_ID]:
- return True
- if sender is ANY:
- return False
- return make_id(sender) in self._by_sender
- def receivers_for(
- self, sender: t.Any
- ) -> c.Generator[c.Callable[..., t.Any], None, None]:
- """Yield each receiver to be called for ``sender``, in addition to those
- to be called for :data:`ANY`. Weakly referenced receivers that are not
- live will be disconnected and skipped.
- :param sender: Yield receivers connected to this sender, in addition
- to those connected to :data:`ANY`.
- """
- # TODO: test receivers_for(ANY)
- if not self.receivers:
- return
- sender_id = make_id(sender)
- if sender_id in self._by_sender:
- ids = self._by_sender[ANY_ID] | self._by_sender[sender_id]
- else:
- ids = self._by_sender[ANY_ID].copy()
- for receiver_id in ids:
- receiver = self.receivers.get(receiver_id)
- if receiver is None:
- continue
- if isinstance(receiver, weakref.ref):
- strong = receiver()
- if strong is None:
- self._disconnect(receiver_id, ANY_ID)
- continue
- yield strong
- else:
- yield receiver
- def disconnect(self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY) -> None:
- """Disconnect ``receiver`` from being called when the signal is sent by
- ``sender``.
- :param receiver: A connected receiver callable.
- :param sender: Disconnect from only this sender. By default, disconnect
- from all senders.
- """
- sender_id: c.Hashable
- if sender is ANY:
- sender_id = ANY_ID
- else:
- sender_id = make_id(sender)
- receiver_id = make_id(receiver)
- self._disconnect(receiver_id, sender_id)
- if (
- "receiver_disconnected" in self.__dict__
- and self.receiver_disconnected.receivers
- ):
- self.receiver_disconnected.send(self, receiver=receiver, sender=sender)
- def _disconnect(self, receiver_id: c.Hashable, sender_id: c.Hashable) -> None:
- if sender_id == ANY_ID:
- if self._by_receiver.pop(receiver_id, None) is not None:
- for bucket in self._by_sender.values():
- bucket.discard(receiver_id)
- self.receivers.pop(receiver_id, None)
- else:
- self._by_sender[sender_id].discard(receiver_id)
- self._by_receiver[receiver_id].discard(sender_id)
- def _make_cleanup_receiver(
- self, receiver_id: c.Hashable
- ) -> c.Callable[[weakref.ref[c.Callable[..., t.Any]]], None]:
- """Create a callback function to disconnect a weakly referenced
- receiver when it is garbage collected.
- """
- def cleanup(ref: weakref.ref[c.Callable[..., t.Any]]) -> None:
- # If the interpreter is shutting down, disconnecting can result in a
- # weird ignored exception. Don't call it in that case.
- if not sys.is_finalizing():
- self._disconnect(receiver_id, ANY_ID)
- return cleanup
- def _make_cleanup_sender(
- self, sender_id: c.Hashable
- ) -> c.Callable[[weakref.ref[t.Any]], None]:
- """Create a callback function to disconnect all receivers for a weakly
- referenced sender when it is garbage collected.
- """
- assert sender_id != ANY_ID
- def cleanup(ref: weakref.ref[t.Any]) -> None:
- self._weak_senders.pop(sender_id, None)
- for receiver_id in self._by_sender.pop(sender_id, ()):
- self._by_receiver[receiver_id].discard(sender_id)
- return cleanup
- def _cleanup_bookkeeping(self) -> None:
- """Prune unused sender/receiver bookkeeping. Not threadsafe.
- Connecting & disconnecting leaves behind a small amount of bookkeeping
- data. Typical workloads using Blinker, for example in most web apps,
- Flask, CLI scripts, etc., are not adversely affected by this
- bookkeeping.
- With a long-running process performing dynamic signal routing with high
- volume, e.g. connecting to function closures, senders are all unique
- object instances. Doing all of this over and over may cause memory usage
- to grow due to extraneous bookkeeping. (An empty ``set`` for each stale
- sender/receiver pair.)
- This method will prune that bookkeeping away, with the caveat that such
- pruning is not threadsafe. The risk is that cleanup of a fully
- disconnected receiver/sender pair occurs while another thread is
- connecting that same pair. If you are in the highly dynamic, unique
- receiver/sender situation that has lead you to this method, that failure
- mode is perhaps not a big deal for you.
- """
- for mapping in (self._by_sender, self._by_receiver):
- for ident, bucket in list(mapping.items()):
- if not bucket:
- mapping.pop(ident, None)
- def _clear_state(self) -> None:
- """Disconnect all receivers and senders. Useful for tests."""
- self._weak_senders.clear()
- self.receivers.clear()
- self._by_sender.clear()
- self._by_receiver.clear()
- class NamedSignal(Signal):
- """A named generic notification emitter. The name is not used by the signal
- itself, but matches the key in the :class:`Namespace` that it belongs to.
- :param name: The name of the signal within the namespace.
- :param doc: The docstring for the signal.
- """
- def __init__(self, name: str, doc: str | None = None) -> None:
- super().__init__(doc)
- #: The name of this signal.
- self.name: str = name
- def __repr__(self) -> str:
- base = super().__repr__()
- return f"{base[:-1]}; {self.name!r}>" # noqa: E702
- class Namespace(dict[str, NamedSignal]):
- """A dict mapping names to signals."""
- def signal(self, name: str, doc: str | None = None) -> NamedSignal:
- """Return the :class:`NamedSignal` for the given ``name``, creating it
- if required. Repeated calls with the same name return the same signal.
- :param name: The name of the signal.
- :param doc: The docstring of the signal.
- """
- if name not in self:
- self[name] = NamedSignal(name, doc)
- return self[name]
- class _PNamespaceSignal(t.Protocol):
- def __call__(self, name: str, doc: str | None = None) -> NamedSignal: ...
- default_namespace: Namespace = Namespace()
- """A default :class:`Namespace` for creating named signals. :func:`signal`
- creates a :class:`NamedSignal` in this namespace.
- """
- signal: _PNamespaceSignal = default_namespace.signal
- """Return a :class:`NamedSignal` in :data:`default_namespace` with the given
- ``name``, creating it if required. Repeated calls with the same name return the
- same signal.
- """
|