|
@@ -1,18 +1,25 @@
|
|
# -*- coding: utf-8 -*-
|
|
# -*- coding: utf-8 -*-
|
|
"""Implementation of the Observer pattern."""
|
|
"""Implementation of the Observer pattern."""
|
|
from __future__ import absolute_import, unicode_literals
|
|
from __future__ import absolute_import, unicode_literals
|
|
|
|
+import sys
|
|
|
|
+import threading
|
|
import weakref
|
|
import weakref
|
|
|
|
+import warnings
|
|
|
|
+from celery.exceptions import CDeprecationWarning
|
|
from celery.five import python_2_unicode_compatible, range, text_t
|
|
from celery.five import python_2_unicode_compatible, range, text_t
|
|
from celery.local import PromiseProxy, Proxy
|
|
from celery.local import PromiseProxy, Proxy
|
|
|
|
+from celery.utils.functional import fun_accepts_kwargs
|
|
from celery.utils.log import get_logger
|
|
from celery.utils.log import get_logger
|
|
-from . import saferef
|
|
|
|
|
|
+try:
|
|
|
|
+ from weakref import WeakMethod
|
|
|
|
+except ImportError:
|
|
|
|
+ from .weakref_backports import WeakMethod # noqa
|
|
|
|
|
|
__all__ = ['Signal']
|
|
__all__ = ['Signal']
|
|
|
|
|
|
|
|
+PY3 = sys.version_info[0] >= 3
|
|
logger = get_logger(__name__)
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
-WEAKREF_TYPES = (weakref.ReferenceType, saferef.BoundMethodWeakref)
|
|
|
|
-
|
|
|
|
|
|
|
|
def _make_id(target): # pragma: no cover
|
|
def _make_id(target): # pragma: no cover
|
|
if isinstance(target, Proxy):
|
|
if isinstance(target, Proxy):
|
|
@@ -23,25 +30,42 @@ def _make_id(target): # pragma: no cover
|
|
if hasattr(target, '__func__'):
|
|
if hasattr(target, '__func__'):
|
|
return (id(target.__self__), id(target.__func__))
|
|
return (id(target.__self__), id(target.__func__))
|
|
return id(target)
|
|
return id(target)
|
|
|
|
+NONE_ID = _make_id(None)
|
|
|
|
+
|
|
|
|
+NO_RECEIVERS = object()
|
|
|
|
|
|
|
|
|
|
@python_2_unicode_compatible
|
|
@python_2_unicode_compatible
|
|
class Signal(object): # pragma: no cover
|
|
class Signal(object): # pragma: no cover
|
|
- """Observer pattern implementation.
|
|
|
|
|
|
+ """Create new signal.
|
|
|
|
|
|
- Arguments:
|
|
|
|
|
|
+ Keyword Arguments:
|
|
providing_args (List): A list of the arguments this signal can pass
|
|
providing_args (List): A list of the arguments this signal can pass
|
|
along in a :meth:`send` call.
|
|
along in a :meth:`send` call.
|
|
|
|
+ use_caching (bool): Enable receiver cache.
|
|
|
|
+ name (str): Name of signal, used for debugging purposes.
|
|
"""
|
|
"""
|
|
|
|
|
|
#: Holds a dictionary of
|
|
#: Holds a dictionary of
|
|
#: ``{receiverkey (id): weakref(receiver)}`` mappings.
|
|
#: ``{receiverkey (id): weakref(receiver)}`` mappings.
|
|
receivers = None
|
|
receivers = None
|
|
|
|
|
|
- def __init__(self, providing_args=None):
|
|
|
|
|
|
+ def __init__(self, providing_args=None, use_caching=True, name=None):
|
|
self.receivers = []
|
|
self.receivers = []
|
|
self.providing_args = set(
|
|
self.providing_args = set(
|
|
providing_args if providing_args is not None else [])
|
|
providing_args if providing_args is not None else [])
|
|
|
|
+ self.lock = threading.Lock()
|
|
|
|
+ self.use_caching = use_caching
|
|
|
|
+ self.name = name
|
|
|
|
+ # For convenience we create empty caches even if they are not used.
|
|
|
|
+ # A note about caching: if use_caching is defined, then for each
|
|
|
|
+ # distinct sender we cache the receivers that sender has in
|
|
|
|
+ # 'sender_receivers_cache'. The cache is cleaned when .connect() or
|
|
|
|
+ # .disconnect() is called and populated on .send().
|
|
|
|
+ self.sender_receivers_cache = (
|
|
|
|
+ weakref.WeakKeyDictionary() if use_caching else {}
|
|
|
|
+ )
|
|
|
|
+ self._dead_receivers = False
|
|
|
|
|
|
def _connect_proxy(self, fun, sender, weak, dispatch_uid):
|
|
def _connect_proxy(self, fun, sender, weak, dispatch_uid):
|
|
return self.connect(
|
|
return self.connect(
|
|
@@ -57,8 +81,7 @@ class Signal(object): # pragma: no cover
|
|
receive signals. Receivers must be hashable objects.
|
|
receive signals. Receivers must be hashable objects.
|
|
|
|
|
|
if weak is :const:`True`, then receiver must be
|
|
if weak is :const:`True`, then receiver must be
|
|
- weak-referenceable (more precisely :func:`saferef.safe_ref()`
|
|
|
|
- must be able to create a reference to the receiver).
|
|
|
|
|
|
+ weak-referenceable.
|
|
|
|
|
|
Receivers must be able to accept keyword arguments.
|
|
Receivers must be able to accept keyword arguments.
|
|
|
|
|
|
@@ -67,7 +90,7 @@ class Signal(object): # pragma: no cover
|
|
`dispatch_uid`.
|
|
`dispatch_uid`.
|
|
|
|
|
|
sender (Any): The sender to which the receiver should respond.
|
|
sender (Any): The sender to which the receiver should respond.
|
|
- Must either be of type :class:`Signal`, or :const:`None` to
|
|
|
|
|
|
+ Must either be a Python object, or :const:`None` to
|
|
receive events from any sender.
|
|
receive events from any sender.
|
|
|
|
|
|
weak (bool): Whether to use weak references to the receiver.
|
|
weak (bool): Whether to use weak references to the receiver.
|
|
@@ -82,39 +105,61 @@ class Signal(object): # pragma: no cover
|
|
def _handle_options(sender=None, weak=True, dispatch_uid=None):
|
|
def _handle_options(sender=None, weak=True, dispatch_uid=None):
|
|
|
|
|
|
def _connect_signal(fun):
|
|
def _connect_signal(fun):
|
|
- receiver = fun
|
|
|
|
-
|
|
|
|
- if isinstance(sender, PromiseProxy):
|
|
|
|
- sender.__then__(
|
|
|
|
- self._connect_proxy, fun, sender, weak, dispatch_uid,
|
|
|
|
- )
|
|
|
|
- return fun
|
|
|
|
-
|
|
|
|
- if dispatch_uid:
|
|
|
|
- lookup_key = (dispatch_uid, _make_id(sender))
|
|
|
|
- else:
|
|
|
|
- lookup_key = (_make_id(receiver), _make_id(sender))
|
|
|
|
-
|
|
|
|
- if weak:
|
|
|
|
- receiver = saferef.safe_ref(
|
|
|
|
- receiver, on_delete=self._remove_receiver,
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
- for r_key, _ in self.receivers:
|
|
|
|
- if r_key == lookup_key:
|
|
|
|
- break
|
|
|
|
- else:
|
|
|
|
- self.receivers.append((lookup_key, receiver))
|
|
|
|
-
|
|
|
|
|
|
+ self._connect_signal(fun, sender, weak, dispatch_uid)
|
|
return fun
|
|
return fun
|
|
-
|
|
|
|
return _connect_signal
|
|
return _connect_signal
|
|
|
|
|
|
if args and callable(args[0]):
|
|
if args and callable(args[0]):
|
|
return _handle_options(*args[1:], **kwargs)(args[0])
|
|
return _handle_options(*args[1:], **kwargs)(args[0])
|
|
return _handle_options(*args, **kwargs)
|
|
return _handle_options(*args, **kwargs)
|
|
|
|
|
|
- def disconnect(self, receiver=None, sender=None, weak=True,
|
|
|
|
|
|
+ def _connect_signal(self, receiver, sender, weak, dispatch_uid):
|
|
|
|
+ assert callable(receiver), 'Signal receivers must be callable'
|
|
|
|
+ if not fun_accepts_kwargs(receiver):
|
|
|
|
+ raise ValueError(
|
|
|
|
+ 'Signal receiver must accept keyword arguments.')
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ if isinstance(sender, PromiseProxy):
|
|
|
|
+ sender.__then__(
|
|
|
|
+ self._connect_proxy, receiver, sender, weak, dispatch_uid,
|
|
|
|
+ )
|
|
|
|
+ return receiver
|
|
|
|
+
|
|
|
|
+ if dispatch_uid:
|
|
|
|
+ lookup_key = (dispatch_uid, _make_id(sender))
|
|
|
|
+ else:
|
|
|
|
+ lookup_key = (_make_id(receiver), _make_id(sender))
|
|
|
|
+
|
|
|
|
+ if weak:
|
|
|
|
+ ref = weakref.ref
|
|
|
|
+ receiver_object = receiver
|
|
|
|
+ # Check for bound methods
|
|
|
|
+ try:
|
|
|
|
+ receiver.__self__
|
|
|
|
+ receiver.__func__
|
|
|
|
+ except AttributeError:
|
|
|
|
+ pass
|
|
|
|
+ else:
|
|
|
|
+ ref = WeakMethod
|
|
|
|
+ receiver_object = receiver.__self__
|
|
|
|
+ if PY3:
|
|
|
|
+ receiver = ref(receiver)
|
|
|
|
+ weakref.finalize(receiver_object, self._remove_receiver)
|
|
|
|
+ else:
|
|
|
|
+ receiver = ref(receiver, self._remove_receiver)
|
|
|
|
+
|
|
|
|
+ with self.lock:
|
|
|
|
+ self._clear_dead_receivers()
|
|
|
|
+ for r_key, _ in self.receivers:
|
|
|
|
+ if r_key == lookup_key:
|
|
|
|
+ break
|
|
|
|
+ else:
|
|
|
|
+ self.receivers.append((lookup_key, receiver))
|
|
|
|
+ self.sender_receivers_cache.clear()
|
|
|
|
+ return receiver
|
|
|
|
+
|
|
|
|
+ def disconnect(self, receiver=None, sender=None, weak=None,
|
|
dispatch_uid=None):
|
|
dispatch_uid=None):
|
|
"""Disconnect receiver from sender for signal.
|
|
"""Disconnect receiver from sender for signal.
|
|
|
|
|
|
@@ -132,16 +177,29 @@ class Signal(object): # pragma: no cover
|
|
dispatch_uid (Hashable): The unique identifier of the receiver
|
|
dispatch_uid (Hashable): The unique identifier of the receiver
|
|
to disconnect.
|
|
to disconnect.
|
|
"""
|
|
"""
|
|
|
|
+ if weak is not None:
|
|
|
|
+ warnings.warn(
|
|
|
|
+ 'Passing `weak` to disconnect has no effect.',
|
|
|
|
+ CDeprecationWarning, stacklevel=2)
|
|
if dispatch_uid:
|
|
if dispatch_uid:
|
|
lookup_key = (dispatch_uid, _make_id(sender))
|
|
lookup_key = (dispatch_uid, _make_id(sender))
|
|
else:
|
|
else:
|
|
lookup_key = (_make_id(receiver), _make_id(sender))
|
|
lookup_key = (_make_id(receiver), _make_id(sender))
|
|
|
|
|
|
- for index in range(len(self.receivers)):
|
|
|
|
- (r_key, _) = self.receivers[index]
|
|
|
|
- if r_key == lookup_key:
|
|
|
|
- del self.receivers[index]
|
|
|
|
- break
|
|
|
|
|
|
+ disconnected = False
|
|
|
|
+ with self.lock:
|
|
|
|
+ self._clear_dead_receivers()
|
|
|
|
+ for index in range(len(self.receivers)):
|
|
|
|
+ (r_key, _) = self.receivers[index]
|
|
|
|
+ if r_key == lookup_key:
|
|
|
|
+ disconnected = True
|
|
|
|
+ del self.receivers[index]
|
|
|
|
+ break
|
|
|
|
+ self.sender_receivers_cache.clear()
|
|
|
|
+ return disconnected
|
|
|
|
+
|
|
|
|
+ def has_listeners(self, sender=None):
|
|
|
|
+ return bool(self._live_receivers(sender))
|
|
|
|
|
|
def send(self, sender, **named):
|
|
def send(self, sender, **named):
|
|
"""Send signal from sender to all connected receivers.
|
|
"""Send signal from sender to all connected receivers.
|
|
@@ -159,53 +217,88 @@ class Signal(object): # pragma: no cover
|
|
List: of tuple pairs: `[(receiver, response), … ]`.
|
|
List: of tuple pairs: `[(receiver, response), … ]`.
|
|
"""
|
|
"""
|
|
responses = []
|
|
responses = []
|
|
- if not self.receivers:
|
|
|
|
|
|
+ if not self.receivers or \
|
|
|
|
+ self.sender_receivers_cache.get(sender) is NO_RECEIVERS:
|
|
return responses
|
|
return responses
|
|
|
|
|
|
- for receiver in self._live_receivers(_make_id(sender)):
|
|
|
|
|
|
+ for receiver in self._live_receivers(sender):
|
|
try:
|
|
try:
|
|
response = receiver(signal=self, sender=sender, **named)
|
|
response = receiver(signal=self, sender=sender, **named)
|
|
except Exception as exc: # pylint: disable=broad-except
|
|
except Exception as exc: # pylint: disable=broad-except
|
|
|
|
+ if not hasattr(exc, '__traceback__'):
|
|
|
|
+ exc.__traceback__ = sys.exc_info()[2]
|
|
logger.exception(
|
|
logger.exception(
|
|
'Signal handler %r raised: %r', receiver, exc)
|
|
'Signal handler %r raised: %r', receiver, exc)
|
|
|
|
+ responses.append((receiver, exc))
|
|
else:
|
|
else:
|
|
responses.append((receiver, response))
|
|
responses.append((receiver, response))
|
|
return responses
|
|
return responses
|
|
-
|
|
|
|
- def _live_receivers(self, senderkey):
|
|
|
|
|
|
+ send_robust = send # Compat with Django interface.
|
|
|
|
+
|
|
|
|
+ def _clear_dead_receivers(self):
|
|
|
|
+ # Warning: caller is assumed to hold self.lock
|
|
|
|
+ if self._dead_receivers:
|
|
|
|
+ self._dead_receivers = False
|
|
|
|
+ new_receivers = []
|
|
|
|
+ for r in self.receivers:
|
|
|
|
+ if isinstance(r[1], weakref.ReferenceType) and r[1]() is None:
|
|
|
|
+ continue
|
|
|
|
+ new_receivers.append(r)
|
|
|
|
+ self.receivers = new_receivers
|
|
|
|
+
|
|
|
|
+ def _live_receivers(self, sender):
|
|
"""Filter sequence of receivers to get resolved, live receivers.
|
|
"""Filter sequence of receivers to get resolved, live receivers.
|
|
|
|
|
|
This checks for weak references and resolves them, then returning only
|
|
This checks for weak references and resolves them, then returning only
|
|
live receivers.
|
|
live receivers.
|
|
"""
|
|
"""
|
|
- none_senderkey = _make_id(None)
|
|
|
|
- receivers = []
|
|
|
|
-
|
|
|
|
- for (_, r_senderkey), receiver in self.receivers:
|
|
|
|
- if r_senderkey == none_senderkey or r_senderkey == senderkey:
|
|
|
|
- if isinstance(receiver, WEAKREF_TYPES):
|
|
|
|
- # Dereference the weak reference.
|
|
|
|
- receiver = receiver()
|
|
|
|
- if receiver is not None:
|
|
|
|
|
|
+ receivers = None
|
|
|
|
+ if self.use_caching and not self._dead_receivers:
|
|
|
|
+ receivers = self.sender_receivers_cache.get(sender)
|
|
|
|
+ # We could end up here with NO_RECEIVERS even if we do check this
|
|
|
|
+ # case in .send() prior to calling _Live_receivers() due to
|
|
|
|
+ # concurrent .send() call.
|
|
|
|
+ if receivers is NO_RECEIVERS:
|
|
|
|
+ return []
|
|
|
|
+ if receivers is None:
|
|
|
|
+ with self.lock:
|
|
|
|
+ self._clear_dead_receivers()
|
|
|
|
+ senderkey = _make_id(sender)
|
|
|
|
+ receivers = []
|
|
|
|
+ for (receiverkey, r_senderkey), receiver in self.receivers:
|
|
|
|
+ if r_senderkey == NONE_ID or r_senderkey == senderkey:
|
|
receivers.append(receiver)
|
|
receivers.append(receiver)
|
|
- else:
|
|
|
|
- receivers.append(receiver)
|
|
|
|
- return receivers
|
|
|
|
|
|
+ if self.use_caching:
|
|
|
|
+ if not receivers:
|
|
|
|
+ self.sender_receivers_cache[sender] = NO_RECEIVERS
|
|
|
|
+ else:
|
|
|
|
+ # Note: we must cache the weakref versions.
|
|
|
|
+ self.sender_receivers_cache[sender] = receivers
|
|
|
|
+ non_weak_receivers = []
|
|
|
|
+ for receiver in receivers:
|
|
|
|
+ if isinstance(receiver, weakref.ReferenceType):
|
|
|
|
+ # Dereference the weak reference.
|
|
|
|
+ receiver = receiver()
|
|
|
|
+ if receiver is not None:
|
|
|
|
+ non_weak_receivers.append(receiver)
|
|
|
|
+ else:
|
|
|
|
+ non_weak_receivers.append(receiver)
|
|
|
|
+ return non_weak_receivers
|
|
|
|
|
|
- def _remove_receiver(self, receiver):
|
|
|
|
|
|
+ def _remove_receiver(self, receiver=None):
|
|
"""Remove dead receivers from connections."""
|
|
"""Remove dead receivers from connections."""
|
|
- to_remove = []
|
|
|
|
- for key, connected_receiver in self.receivers:
|
|
|
|
- if connected_receiver == receiver:
|
|
|
|
- to_remove.append(key)
|
|
|
|
- for key in to_remove:
|
|
|
|
- for idx, (r_key, _) in enumerate(self.receivers):
|
|
|
|
- if r_key == key:
|
|
|
|
- del self.receivers[idx]
|
|
|
|
|
|
+ # Mark that the self..receivers first has dead weakrefs. If so,
|
|
|
|
+ # we will clean those up in connect, disconnect and _live_receivers
|
|
|
|
+ # while holding self.lock. Note that doing the cleanup here isn't a
|
|
|
|
+ # good idea, _remove_receiver() will be called as a side effect of
|
|
|
|
+ # garbage collection, and so the call can happen wh ile we are already
|
|
|
|
+ # holding self.lock.
|
|
|
|
+ self._dead_receivers = True
|
|
|
|
|
|
def __repr__(self):
|
|
def __repr__(self):
|
|
"""``repr(signal)``."""
|
|
"""``repr(signal)``."""
|
|
- return '<Signal: {0}>'.format(type(self).__name__)
|
|
|
|
|
|
+ return '<{0}: {1} providing_args={2!r}>'.format(
|
|
|
|
+ type(self).__name__, self.name, self.providing_args)
|
|
|
|
|
|
def __str__(self):
|
|
def __str__(self):
|
|
"""``str(signal)``."""
|
|
"""``str(signal)``."""
|