threads.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. # -*- coding: utf-8 -*-
  2. """Threading primitives and utilities."""
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. import os
  5. import socket
  6. import sys
  7. import threading
  8. import traceback
  9. from contextlib import contextmanager
  10. from celery.local import Proxy
  11. from celery.five import THREAD_TIMEOUT_MAX, items, python_2_unicode_compatible
  12. try:
  13. from greenlet import getcurrent as get_ident
  14. except ImportError: # pragma: no cover
  15. try:
  16. from _thread import get_ident # noqa
  17. except ImportError:
  18. try:
  19. from thread import get_ident # noqa
  20. except ImportError: # pragma: no cover
  21. try:
  22. from _dummy_thread import get_ident # noqa
  23. except ImportError:
  24. from dummy_thread import get_ident # noqa
# Names exported by a star-import of this module.
__all__ = [
    'bgThread', 'Local', 'LocalStack', 'LocalManager',
    'get_ident', 'default_socket_timeout',
]
  29. USE_FAST_LOCALS = os.environ.get('USE_FAST_LOCALS')
  30. PY3 = sys.version_info[0] == 3
  31. @contextmanager
  32. def default_socket_timeout(timeout):
  33. """Context temporarily setting the default socket timeout."""
  34. prev = socket.getdefaulttimeout()
  35. socket.setdefaulttimeout(timeout)
  36. yield
  37. socket.setdefaulttimeout(prev)
  38. class bgThread(threading.Thread):
  39. """Background service thread."""
  40. def __init__(self, name=None, **kwargs):
  41. super(bgThread, self).__init__()
  42. self._is_shutdown = threading.Event()
  43. self._is_stopped = threading.Event()
  44. self.daemon = True
  45. self.name = name or self.__class__.__name__
  46. def body(self):
  47. raise NotImplementedError()
  48. def on_crash(self, msg, *fmt, **kwargs):
  49. print(msg.format(*fmt), file=sys.stderr)
  50. traceback.print_exc(None, sys.stderr)
  51. def run(self):
  52. body = self.body
  53. shutdown_set = self._is_shutdown.is_set
  54. try:
  55. while not shutdown_set():
  56. try:
  57. body()
  58. except Exception as exc: # pylint: disable=broad-except
  59. try:
  60. self.on_crash('{0!r} crashed: {1!r}', self.name, exc)
  61. self._set_stopped()
  62. finally:
  63. os._exit(1) # exiting by normal means won't work
  64. finally:
  65. self._set_stopped()
  66. def _set_stopped(self):
  67. try:
  68. self._is_stopped.set()
  69. except TypeError: # pragma: no cover
  70. # we lost the race at interpreter shutdown,
  71. # so gc collected built-in modules.
  72. pass
  73. def stop(self):
  74. """Graceful shutdown."""
  75. self._is_shutdown.set()
  76. self._is_stopped.wait()
  77. if self.is_alive():
  78. self.join(THREAD_TIMEOUT_MAX)
  79. def release_local(local):
  80. """Release the contents of the local for the current context.
  81. This makes it possible to use locals without a manager.
  82. With this function one can release :class:`Local` objects as well as
  83. :class:`StackLocal` objects. However it's not possible to
  84. release data held by proxies that way, one always has to retain
  85. a reference to the underlying local object in order to be able
  86. to release it.
  87. Example:
  88. >>> loc = Local()
  89. >>> loc.foo = 42
  90. >>> release_local(loc)
  91. >>> hasattr(loc, 'foo')
  92. False
  93. """
  94. local.__release_local__()
  95. class Local(object):
  96. """Local object."""
  97. __slots__ = ('__storage__', '__ident_func__')
  98. def __init__(self):
  99. object.__setattr__(self, '__storage__', {})
  100. object.__setattr__(self, '__ident_func__', get_ident)
  101. def __iter__(self):
  102. return iter(items(self.__storage__))
  103. def __call__(self, proxy):
  104. """Create a proxy for a name."""
  105. return Proxy(self, proxy)
  106. def __release_local__(self):
  107. self.__storage__.pop(self.__ident_func__(), None)
  108. def __getattr__(self, name):
  109. try:
  110. return self.__storage__[self.__ident_func__()][name]
  111. except KeyError:
  112. raise AttributeError(name)
  113. def __setattr__(self, name, value):
  114. ident = self.__ident_func__()
  115. storage = self.__storage__
  116. try:
  117. storage[ident][name] = value
  118. except KeyError:
  119. storage[ident] = {name: value}
  120. def __delattr__(self, name):
  121. try:
  122. del self.__storage__[self.__ident_func__()][name]
  123. except KeyError:
  124. raise AttributeError(name)
  125. class _LocalStack(object):
  126. """Local stack.
  127. This class works similar to a :class:`Local` but keeps a stack
  128. of objects instead. This is best explained with an example::
  129. >>> ls = LocalStack()
  130. >>> ls.push(42)
  131. >>> ls.top
  132. 42
  133. >>> ls.push(23)
  134. >>> ls.top
  135. 23
  136. >>> ls.pop()
  137. 23
  138. >>> ls.top
  139. 42
  140. They can be force released by using a :class:`LocalManager` or with
  141. the :func:`release_local` function but the correct way is to pop the
  142. item from the stack after using. When the stack is empty it will
  143. no longer be bound to the current context (and as such released).
  144. By calling the stack without arguments it will return a proxy that
  145. resolves to the topmost item on the stack.
  146. """
  147. def __init__(self):
  148. self._local = Local()
  149. def __release_local__(self):
  150. self._local.__release_local__()
  151. def _get__ident_func__(self):
  152. return self._local.__ident_func__
  153. def _set__ident_func__(self, value):
  154. object.__setattr__(self._local, '__ident_func__', value)
  155. __ident_func__ = property(_get__ident_func__, _set__ident_func__)
  156. del _get__ident_func__, _set__ident_func__
  157. def __call__(self):
  158. def _lookup():
  159. rv = self.top
  160. if rv is None:
  161. raise RuntimeError('object unbound')
  162. return rv
  163. return Proxy(_lookup)
  164. def push(self, obj):
  165. """Push a new item to the stack."""
  166. rv = getattr(self._local, 'stack', None)
  167. if rv is None:
  168. # pylint: disable=assigning-non-slot
  169. # This attribute is defined now.
  170. self._local.stack = rv = []
  171. rv.append(obj)
  172. return rv
  173. def pop(self):
  174. """Remove the topmost item from the stack.
  175. Note:
  176. Will return the old value or `None` if the stack was already empty.
  177. """
  178. stack = getattr(self._local, 'stack', None)
  179. if stack is None:
  180. return None
  181. elif len(stack) == 1:
  182. release_local(self._local)
  183. return stack[-1]
  184. else:
  185. return stack.pop()
  186. def __len__(self):
  187. stack = getattr(self._local, 'stack', None)
  188. return len(stack) if stack else 0
  189. @property
  190. def stack(self):
  191. # get_current_worker_task uses this to find
  192. # the original task that was executed by the worker.
  193. stack = getattr(self._local, 'stack', None)
  194. if stack is not None:
  195. return stack
  196. return []
  197. @property
  198. def top(self):
  199. """The topmost item on the stack.
  200. Note:
  201. If the stack is empty, :const:`None` is returned.
  202. """
  203. try:
  204. return self._local.stack[-1]
  205. except (AttributeError, IndexError):
  206. return None
  207. @python_2_unicode_compatible
  208. class LocalManager(object):
  209. """Local objects cannot manage themselves.
  210. For that you need a local manager.
  211. You can pass a local manager multiple locals or add them
  212. later by appending them to ``manager.locals``. Every time the manager
  213. cleans up, it will clean up all the data left in the locals for this
  214. context.
  215. The ``ident_func`` parameter can be added to override the default ident
  216. function for the wrapped locals.
  217. """
  218. def __init__(self, locals=None, ident_func=None):
  219. if locals is None:
  220. self.locals = []
  221. elif isinstance(locals, Local):
  222. self.locals = [locals]
  223. else:
  224. self.locals = list(locals)
  225. if ident_func is not None:
  226. self.ident_func = ident_func
  227. for local in self.locals:
  228. object.__setattr__(local, '__ident_func__', ident_func)
  229. else:
  230. self.ident_func = get_ident
  231. def get_ident(self):
  232. """Return context identifier.
  233. This is the indentifer the local objects use internally
  234. for this context. You cannot override this method to change the
  235. behavior but use it to link other context local objects (such as
  236. SQLAlchemy's scoped sessions) to the Werkzeug locals.
  237. """
  238. return self.ident_func()
  239. def cleanup(self):
  240. """Manually clean up the data in the locals for this context.
  241. Call this at the end of the request or use ``make_middleware()``.
  242. """
  243. for local in self.locals:
  244. release_local(local)
  245. def __repr__(self):
  246. return '<{0} storages: {1}>'.format(
  247. self.__class__.__name__, len(self.locals))
  248. class _FastLocalStack(threading.local):
  249. def __init__(self):
  250. self.stack = []
  251. self.push = self.stack.append
  252. self.pop = self.stack.pop
  253. super(_FastLocalStack, self).__init__()
  254. @property
  255. def top(self):
  256. try:
  257. return self.stack[-1]
  258. except (AttributeError, IndexError):
  259. return None
  260. def __len__(self):
  261. return len(self.stack)
  262. if USE_FAST_LOCALS: # pragma: no cover
  263. LocalStack = _FastLocalStack
  264. else:
  265. # - See #706
  266. # since each thread has its own greenlet we can just use those as
  267. # identifiers for the context. If greenlets aren't available we
  268. # fall back to the current thread ident.
  269. LocalStack = _LocalStack # noqa