signal.py 11 KB


  1. # -*- coding: utf-8 -*-
  2. """Implementation of the Observer pattern."""
  3. from __future__ import absolute_import, unicode_literals
  4. import sys
  5. import threading
  6. import weakref
  7. import warnings
  8. from celery.exceptions import CDeprecationWarning
  9. from celery.five import python_2_unicode_compatible, range, text_t
  10. from celery.local import PromiseProxy, Proxy
  11. from celery.utils.functional import fun_accepts_kwargs
  12. from celery.utils.log import get_logger
  13. try:
  14. from weakref import WeakMethod
  15. except ImportError:
  16. from .weakref_backports import WeakMethod # noqa
  17. __all__ = ['Signal']
  18. PY3 = sys.version_info[0] >= 3
  19. logger = get_logger(__name__)
  20. def _make_id(target): # pragma: no cover
  21. if isinstance(target, Proxy):
  22. target = target._get_current_object()
  23. if isinstance(target, (bytes, text_t)):
  24. # see Issue #2475
  25. return target
  26. if hasattr(target, '__func__'):
  27. return (id(target.__self__), id(target.__func__))
  28. return id(target)
# Key produced by ``_make_id`` for ``None``.  ``Signal._live_receivers``
# treats receivers stored under this sender key as matching every sender.
NONE_ID = _make_id(None)
# Unique sentinel stored in ``Signal.sender_receivers_cache`` to record that
# a sender has no receivers (distinct from "no cache entry", which is None).
NO_RECEIVERS = object()
  31. @python_2_unicode_compatible
  32. class Signal(object): # pragma: no cover
  33. """Create new signal.
  34. Keyword Arguments:
  35. providing_args (List): A list of the arguments this signal can pass
  36. along in a :meth:`send` call.
  37. use_caching (bool): Enable receiver cache.
  38. name (str): Name of signal, used for debugging purposes.
  39. """
  40. #: Holds a dictionary of
  41. #: ``{receiverkey (id): weakref(receiver)}`` mappings.
  42. receivers = None
  43. def __init__(self, providing_args=None, use_caching=True, name=None):
  44. self.receivers = []
  45. self.providing_args = set(
  46. providing_args if providing_args is not None else [])
  47. self.lock = threading.Lock()
  48. self.use_caching = use_caching
  49. self.name = name
  50. # For convenience we create empty caches even if they are not used.
  51. # A note about caching: if use_caching is defined, then for each
  52. # distinct sender we cache the receivers that sender has in
  53. # 'sender_receivers_cache'. The cache is cleaned when .connect() or
  54. # .disconnect() is called and populated on .send().
  55. self.sender_receivers_cache = (
  56. weakref.WeakKeyDictionary() if use_caching else {}
  57. )
  58. self._dead_receivers = False
  59. def _connect_proxy(self, fun, sender, weak, dispatch_uid):
  60. return self.connect(
  61. fun, sender=sender._get_current_object(),
  62. weak=weak, dispatch_uid=dispatch_uid,
  63. )
  64. def connect(self, *args, **kwargs):
  65. """Connect receiver to sender for signal.
  66. Arguments:
  67. receiver (Callable): A function or an instance method which is to
  68. receive signals. Receivers must be hashable objects.
  69. if weak is :const:`True`, then receiver must be
  70. weak-referenceable.
  71. Receivers must be able to accept keyword arguments.
  72. If receivers have a `dispatch_uid` attribute, the receiver will
  73. not be added if another receiver already exists with that
  74. `dispatch_uid`.
  75. sender (Any): The sender to which the receiver should respond.
  76. Must either be a Python object, or :const:`None` to
  77. receive events from any sender.
  78. weak (bool): Whether to use weak references to the receiver.
  79. By default, the module will attempt to use weak references to
  80. the receiver objects. If this parameter is false, then strong
  81. references will be used.
  82. dispatch_uid (Hashable): An identifier used to uniquely identify a
  83. particular instance of a receiver. This will usually be a
  84. string, though it may be anything hashable.
  85. """
  86. def _handle_options(sender=None, weak=True, dispatch_uid=None):
  87. def _connect_signal(fun):
  88. self._connect_signal(fun, sender, weak, dispatch_uid)
  89. return fun
  90. return _connect_signal
  91. if args and callable(args[0]):
  92. return _handle_options(*args[1:], **kwargs)(args[0])
  93. return _handle_options(*args, **kwargs)
  94. def _connect_signal(self, receiver, sender, weak, dispatch_uid):
  95. assert callable(receiver), 'Signal receivers must be callable'
  96. if not fun_accepts_kwargs(receiver):
  97. raise ValueError(
  98. 'Signal receiver must accept keyword arguments.')
  99. if isinstance(sender, PromiseProxy):
  100. sender.__then__(
  101. self._connect_proxy, receiver, sender, weak, dispatch_uid,
  102. )
  103. return receiver
  104. if dispatch_uid:
  105. lookup_key = (dispatch_uid, _make_id(sender))
  106. else:
  107. lookup_key = (_make_id(receiver), _make_id(sender))
  108. if weak:
  109. ref = weakref.ref
  110. receiver_object = receiver
  111. # Check for bound methods
  112. try:
  113. receiver.__self__
  114. receiver.__func__
  115. except AttributeError:
  116. pass
  117. else:
  118. ref = WeakMethod
  119. receiver_object = receiver.__self__
  120. if PY3:
  121. receiver = ref(receiver)
  122. weakref.finalize(receiver_object, self._remove_receiver)
  123. else:
  124. receiver = ref(receiver, self._remove_receiver)
  125. with self.lock:
  126. self._clear_dead_receivers()
  127. for r_key, _ in self.receivers:
  128. if r_key == lookup_key:
  129. break
  130. else:
  131. self.receivers.append((lookup_key, receiver))
  132. self.sender_receivers_cache.clear()
  133. return receiver
  134. def disconnect(self, receiver=None, sender=None, weak=None,
  135. dispatch_uid=None):
  136. """Disconnect receiver from sender for signal.
  137. If weak references are used, disconnect needn't be called.
  138. The receiver will be removed from dispatch automatically.
  139. Arguments:
  140. receiver (Callable): The registered receiver to disconnect.
  141. May be none if `dispatch_uid` is specified.
  142. sender (Any): The registered sender to disconnect.
  143. weak (bool): The weakref state to disconnect.
  144. dispatch_uid (Hashable): The unique identifier of the receiver
  145. to disconnect.
  146. """
  147. if weak is not None:
  148. warnings.warn(
  149. 'Passing `weak` to disconnect has no effect.',
  150. CDeprecationWarning, stacklevel=2)
  151. if dispatch_uid:
  152. lookup_key = (dispatch_uid, _make_id(sender))
  153. else:
  154. lookup_key = (_make_id(receiver), _make_id(sender))
  155. disconnected = False
  156. with self.lock:
  157. self._clear_dead_receivers()
  158. for index in range(len(self.receivers)):
  159. (r_key, _) = self.receivers[index]
  160. if r_key == lookup_key:
  161. disconnected = True
  162. del self.receivers[index]
  163. break
  164. self.sender_receivers_cache.clear()
  165. return disconnected
  166. def has_listeners(self, sender=None):
  167. return bool(self._live_receivers(sender))
  168. def send(self, sender, **named):
  169. """Send signal from sender to all connected receivers.
  170. If any receiver raises an error, the error propagates back through
  171. send, terminating the dispatch loop, so it is quite possible to not
  172. have all receivers called if a raises an error.
  173. Arguments:
  174. sender (Any): The sender of the signal.
  175. Either a specific object or :const:`None`.
  176. **named (Any): Named arguments which will be passed to receivers.
  177. Returns:
  178. List: of tuple pairs: `[(receiver, response), … ]`.
  179. """
  180. responses = []
  181. if not self.receivers or \
  182. self.sender_receivers_cache.get(sender) is NO_RECEIVERS:
  183. return responses
  184. for receiver in self._live_receivers(sender):
  185. try:
  186. response = receiver(signal=self, sender=sender, **named)
  187. except Exception as exc: # pylint: disable=broad-except
  188. if not hasattr(exc, '__traceback__'):
  189. exc.__traceback__ = sys.exc_info()[2]
  190. logger.exception(
  191. 'Signal handler %r raised: %r', receiver, exc)
  192. responses.append((receiver, exc))
  193. else:
  194. responses.append((receiver, response))
  195. return responses
  196. send_robust = send # Compat with Django interface.
  197. def _clear_dead_receivers(self):
  198. # Warning: caller is assumed to hold self.lock
  199. if self._dead_receivers:
  200. self._dead_receivers = False
  201. new_receivers = []
  202. for r in self.receivers:
  203. if isinstance(r[1], weakref.ReferenceType) and r[1]() is None:
  204. continue
  205. new_receivers.append(r)
  206. self.receivers = new_receivers
  207. def _live_receivers(self, sender):
  208. """Filter sequence of receivers to get resolved, live receivers.
  209. This checks for weak references and resolves them, then returning only
  210. live receivers.
  211. """
  212. receivers = None
  213. if self.use_caching and not self._dead_receivers:
  214. receivers = self.sender_receivers_cache.get(sender)
  215. # We could end up here with NO_RECEIVERS even if we do check this
  216. # case in .send() prior to calling _Live_receivers() due to
  217. # concurrent .send() call.
  218. if receivers is NO_RECEIVERS:
  219. return []
  220. if receivers is None:
  221. with self.lock:
  222. self._clear_dead_receivers()
  223. senderkey = _make_id(sender)
  224. receivers = []
  225. for (receiverkey, r_senderkey), receiver in self.receivers:
  226. if r_senderkey == NONE_ID or r_senderkey == senderkey:
  227. receivers.append(receiver)
  228. if self.use_caching:
  229. if not receivers:
  230. self.sender_receivers_cache[sender] = NO_RECEIVERS
  231. else:
  232. # Note: we must cache the weakref versions.
  233. self.sender_receivers_cache[sender] = receivers
  234. non_weak_receivers = []
  235. for receiver in receivers:
  236. if isinstance(receiver, weakref.ReferenceType):
  237. # Dereference the weak reference.
  238. receiver = receiver()
  239. if receiver is not None:
  240. non_weak_receivers.append(receiver)
  241. else:
  242. non_weak_receivers.append(receiver)
  243. return non_weak_receivers
  244. def _remove_receiver(self, receiver=None):
  245. """Remove dead receivers from connections."""
  246. # Mark that the self..receivers first has dead weakrefs. If so,
  247. # we will clean those up in connect, disconnect and _live_receivers
  248. # while holding self.lock. Note that doing the cleanup here isn't a
  249. # good idea, _remove_receiver() will be called as a side effect of
  250. # garbage collection, and so the call can happen wh ile we are already
  251. # holding self.lock.
  252. self._dead_receivers = True
  253. def __repr__(self):
  254. """``repr(signal)``."""
  255. return '<{0}: {1} providing_args={2!r}>'.format(
  256. type(self).__name__, self.name, self.providing_args)
  257. def __str__(self):
  258. """``str(signal)``."""
  259. return repr(self)