signal.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. # -*- coding: utf-8 -*-
  2. """Implementation of the Observer pattern."""
  3. from __future__ import absolute_import, unicode_literals
  4. import weakref
  5. from celery.five import python_2_unicode_compatible, range, text_t
  6. from celery.local import PromiseProxy, Proxy
  7. from celery.utils.log import get_logger
  8. from . import saferef
# Public API of this module.
__all__ = ['Signal']

# Module-level logger; ``Signal.send`` uses it to report receiver exceptions.
logger = get_logger(__name__)

# Reference wrapper types that ``Signal._live_receivers`` must call
# (dereference) to obtain the actual receiver object.
WEAKREF_TYPES = (weakref.ReferenceType, saferef.BoundMethodWeakref)
  12. def _make_id(target): # pragma: no cover
  13. if isinstance(target, Proxy):
  14. target = target._get_current_object()
  15. if isinstance(target, (bytes, text_t)):
  16. # see Issue #2475
  17. return target
  18. if hasattr(target, '__func__'):
  19. return (id(target.__self__), id(target.__func__))
  20. return id(target)
  21. @python_2_unicode_compatible
  22. class Signal(object): # pragma: no cover
  23. """Observer pattern implementation.
  24. Arguments:
  25. providing_args (List): A list of the arguments this signal can pass
  26. along in a :meth:`send` call.
  27. """
  28. #: Holds a dictionary of
  29. #: ``{receiverkey (id): weakref(receiver)}`` mappings.
  30. receivers = None
  31. def __init__(self, providing_args=None):
  32. self.receivers = []
  33. self.providing_args = set(
  34. providing_args if providing_args is not None else [])
  35. def _connect_proxy(self, fun, sender, weak, dispatch_uid):
  36. return self.connect(
  37. fun, sender=sender._get_current_object(),
  38. weak=weak, dispatch_uid=dispatch_uid,
  39. )
  40. def connect(self, *args, **kwargs):
  41. """Connect receiver to sender for signal.
  42. Arguments:
  43. receiver (Callable): A function or an instance method which is to
  44. receive signals. Receivers must be hashable objects.
  45. if weak is :const:`True`, then receiver must be
  46. weak-referenceable (more precisely :func:`saferef.safe_ref()`
  47. must be able to create a reference to the receiver).
  48. Receivers must be able to accept keyword arguments.
  49. If receivers have a `dispatch_uid` attribute, the receiver will
  50. not be added if another receiver already exists with that
  51. `dispatch_uid`.
  52. sender (Any): The sender to which the receiver should respond.
  53. Must either be of type :class:`Signal`, or :const:`None` to
  54. receive events from any sender.
  55. weak (bool): Whether to use weak references to the receiver.
  56. By default, the module will attempt to use weak references to
  57. the receiver objects. If this parameter is false, then strong
  58. references will be used.
  59. dispatch_uid (Hashable): An identifier used to uniquely identify a
  60. particular instance of a receiver. This will usually be a
  61. string, though it may be anything hashable.
  62. """
  63. def _handle_options(sender=None, weak=True, dispatch_uid=None):
  64. def _connect_signal(fun):
  65. receiver = fun
  66. if isinstance(sender, PromiseProxy):
  67. sender.__then__(
  68. self._connect_proxy, fun, sender, weak, dispatch_uid,
  69. )
  70. return fun
  71. if dispatch_uid:
  72. lookup_key = (dispatch_uid, _make_id(sender))
  73. else:
  74. lookup_key = (_make_id(receiver), _make_id(sender))
  75. if weak:
  76. receiver = saferef.safe_ref(
  77. receiver, on_delete=self._remove_receiver,
  78. )
  79. for r_key, _ in self.receivers:
  80. if r_key == lookup_key:
  81. break
  82. else:
  83. self.receivers.append((lookup_key, receiver))
  84. return fun
  85. return _connect_signal
  86. if args and callable(args[0]):
  87. return _handle_options(*args[1:], **kwargs)(args[0])
  88. return _handle_options(*args, **kwargs)
  89. def disconnect(self, receiver=None, sender=None, weak=True,
  90. dispatch_uid=None):
  91. """Disconnect receiver from sender for signal.
  92. If weak references are used, disconnect needn't be called.
  93. The receiver will be removed from dispatch automatically.
  94. Arguments:
  95. receiver (Callable): The registered receiver to disconnect.
  96. May be none if `dispatch_uid` is specified.
  97. sender (Any): The registered sender to disconnect.
  98. weak (bool): The weakref state to disconnect.
  99. dispatch_uid (Hashable): The unique identifier of the receiver
  100. to disconnect.
  101. """
  102. if dispatch_uid:
  103. lookup_key = (dispatch_uid, _make_id(sender))
  104. else:
  105. lookup_key = (_make_id(receiver), _make_id(sender))
  106. for index in range(len(self.receivers)):
  107. (r_key, _) = self.receivers[index]
  108. if r_key == lookup_key:
  109. del self.receivers[index]
  110. break
  111. def send(self, sender, **named):
  112. """Send signal from sender to all connected receivers.
  113. If any receiver raises an error, the error propagates back through
  114. send, terminating the dispatch loop, so it is quite possible to not
  115. have all receivers called if a raises an error.
  116. Arguments:
  117. sender (Any): The sender of the signal.
  118. Either a specific object or :const:`None`.
  119. **named (Any): Named arguments which will be passed to receivers.
  120. Returns:
  121. List: of tuple pairs: `[(receiver, response), … ]`.
  122. """
  123. responses = []
  124. if not self.receivers:
  125. return responses
  126. for receiver in self._live_receivers(_make_id(sender)):
  127. try:
  128. response = receiver(signal=self, sender=sender, **named)
  129. except Exception as exc: # pylint: disable=broad-except
  130. logger.exception(
  131. 'Signal handler %r raised: %r', receiver, exc)
  132. else:
  133. responses.append((receiver, response))
  134. return responses
  135. def _live_receivers(self, senderkey):
  136. """Filter sequence of receivers to get resolved, live receivers.
  137. This checks for weak references and resolves them, then returning only
  138. live receivers.
  139. """
  140. none_senderkey = _make_id(None)
  141. receivers = []
  142. for (_, r_senderkey), receiver in self.receivers:
  143. if r_senderkey == none_senderkey or r_senderkey == senderkey:
  144. if isinstance(receiver, WEAKREF_TYPES):
  145. # Dereference the weak reference.
  146. receiver = receiver()
  147. if receiver is not None:
  148. receivers.append(receiver)
  149. else:
  150. receivers.append(receiver)
  151. return receivers
  152. def _remove_receiver(self, receiver):
  153. """Remove dead receivers from connections."""
  154. to_remove = []
  155. for key, connected_receiver in self.receivers:
  156. if connected_receiver == receiver:
  157. to_remove.append(key)
  158. for key in to_remove:
  159. for idx, (r_key, _) in enumerate(self.receivers):
  160. if r_key == key:
  161. del self.receivers[idx]
  162. def __repr__(self):
  163. """``repr(signal)``."""
  164. return '<Signal: {0}>'.format(type(self).__name__)
  165. def __str__(self):
  166. """``str(signal)``."""
  167. return repr(self)