threads.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.utils.threads
  4. ~~~~~~~~~~~~~~~~~~~~
  5. Threading utilities.
  6. """
  7. from __future__ import absolute_import, print_function, unicode_literals
  8. import os
  9. import socket
  10. import sys
  11. import threading
  12. import traceback
  13. from contextlib import contextmanager
  14. from celery.local import Proxy
  15. from celery.five import THREAD_TIMEOUT_MAX, items, python_2_unicode_compatible
  16. __all__ = [
  17. 'bgThread', 'Local', 'LocalStack', 'LocalManager',
  18. 'get_ident', 'default_socket_timeout',
  19. ]
  20. USE_FAST_LOCALS = os.environ.get('USE_FAST_LOCALS')
  21. PY3 = sys.version_info[0] == 3
  22. @contextmanager
  23. def default_socket_timeout(timeout):
  24. prev = socket.getdefaulttimeout()
  25. socket.setdefaulttimeout(timeout)
  26. yield
  27. socket.setdefaulttimeout(prev)
  28. class bgThread(threading.Thread):
  29. def __init__(self, name=None, **kwargs):
  30. super(bgThread, self).__init__()
  31. self._is_shutdown = threading.Event()
  32. self._is_stopped = threading.Event()
  33. self.daemon = True
  34. self.name = name or self.__class__.__name__
  35. def body(self):
  36. raise NotImplementedError('subclass responsibility')
  37. def on_crash(self, msg, *fmt, **kwargs):
  38. print(msg.format(*fmt), file=sys.stderr)
  39. exc_info = sys.exc_info()
  40. try:
  41. traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
  42. None, sys.stderr)
  43. finally:
  44. del(exc_info)
  45. def run(self):
  46. body = self.body
  47. shutdown_set = self._is_shutdown.is_set
  48. try:
  49. while not shutdown_set():
  50. try:
  51. body()
  52. except Exception as exc:
  53. try:
  54. self.on_crash('{0!r} crashed: {1!r}', self.name, exc)
  55. self._set_stopped()
  56. finally:
  57. os._exit(1) # exiting by normal means won't work
  58. finally:
  59. self._set_stopped()
  60. def _set_stopped(self):
  61. try:
  62. self._is_stopped.set()
  63. except TypeError: # pragma: no cover
  64. # we lost the race at interpreter shutdown,
  65. # so gc collected built-in modules.
  66. pass
  67. def stop(self):
  68. """Graceful shutdown."""
  69. self._is_shutdown.set()
  70. self._is_stopped.wait()
  71. if self.is_alive():
  72. self.join(THREAD_TIMEOUT_MAX)
  73. try:
  74. from greenlet import getcurrent as get_ident
  75. except ImportError: # pragma: no cover
  76. try:
  77. from _thread import get_ident # noqa
  78. except ImportError:
  79. try:
  80. from thread import get_ident # noqa
  81. except ImportError: # pragma: no cover
  82. try:
  83. from _dummy_thread import get_ident # noqa
  84. except ImportError:
  85. from dummy_thread import get_ident # noqa
  86. def release_local(local):
  87. """Releases the contents of the local for the current context.
  88. This makes it possible to use locals without a manager.
  89. Example::
  90. >>> loc = Local()
  91. >>> loc.foo = 42
  92. >>> release_local(loc)
  93. >>> hasattr(loc, 'foo')
  94. False
  95. With this function one can release :class:`Local` objects as well
  96. as :class:`StackLocal` objects. However it is not possible to
  97. release data held by proxies that way, one always has to retain
  98. a reference to the underlying local object in order to be able
  99. to release it.
  100. .. versionadded:: 0.6.1
  101. """
  102. local.__release_local__()
  103. class Local(object):
  104. __slots__ = ('__storage__', '__ident_func__')
  105. def __init__(self):
  106. object.__setattr__(self, '__storage__', {})
  107. object.__setattr__(self, '__ident_func__', get_ident)
  108. def __iter__(self):
  109. return iter(items(self.__storage__))
  110. def __call__(self, proxy):
  111. """Create a proxy for a name."""
  112. return Proxy(self, proxy)
  113. def __release_local__(self):
  114. self.__storage__.pop(self.__ident_func__(), None)
  115. def __getattr__(self, name):
  116. try:
  117. return self.__storage__[self.__ident_func__()][name]
  118. except KeyError:
  119. raise AttributeError(name)
  120. def __setattr__(self, name, value):
  121. ident = self.__ident_func__()
  122. storage = self.__storage__
  123. try:
  124. storage[ident][name] = value
  125. except KeyError:
  126. storage[ident] = {name: value}
  127. def __delattr__(self, name):
  128. try:
  129. del self.__storage__[self.__ident_func__()][name]
  130. except KeyError:
  131. raise AttributeError(name)
  132. class _LocalStack(object):
  133. """This class works similar to a :class:`Local` but keeps a stack
  134. of objects instead. This is best explained with an example::
  135. >>> ls = LocalStack()
  136. >>> ls.push(42)
  137. >>> ls.top
  138. 42
  139. >>> ls.push(23)
  140. >>> ls.top
  141. 23
  142. >>> ls.pop()
  143. 23
  144. >>> ls.top
  145. 42
  146. They can be force released by using a :class:`LocalManager` or with
  147. the :func:`release_local` function but the correct way is to pop the
  148. item from the stack after using. When the stack is empty it will
  149. no longer be bound to the current context (and as such released).
  150. By calling the stack without arguments it will return a proxy that
  151. resolves to the topmost item on the stack.
  152. """
  153. def __init__(self):
  154. self._local = Local()
  155. def __release_local__(self):
  156. self._local.__release_local__()
  157. def _get__ident_func__(self):
  158. return self._local.__ident_func__
  159. def _set__ident_func__(self, value):
  160. object.__setattr__(self._local, '__ident_func__', value)
  161. __ident_func__ = property(_get__ident_func__, _set__ident_func__)
  162. del _get__ident_func__, _set__ident_func__
  163. def __call__(self):
  164. def _lookup():
  165. rv = self.top
  166. if rv is None:
  167. raise RuntimeError('object unbound')
  168. return rv
  169. return Proxy(_lookup)
  170. def push(self, obj):
  171. """Pushes a new item to the stack"""
  172. rv = getattr(self._local, 'stack', None)
  173. if rv is None:
  174. self._local.stack = rv = []
  175. rv.append(obj)
  176. return rv
  177. def pop(self):
  178. """Remove the topmost item from the stack, will return the
  179. old value or `None` if the stack was already empty.
  180. """
  181. stack = getattr(self._local, 'stack', None)
  182. if stack is None:
  183. return None
  184. elif len(stack) == 1:
  185. release_local(self._local)
  186. return stack[-1]
  187. else:
  188. return stack.pop()
  189. def __len__(self):
  190. stack = getattr(self._local, 'stack', None)
  191. return len(stack) if stack else 0
  192. @property
  193. def stack(self):
  194. """get_current_worker_task uses this to find
  195. the original task that was executed by the worker."""
  196. stack = getattr(self._local, 'stack', None)
  197. if stack is not None:
  198. return stack
  199. return []
  200. @property
  201. def top(self):
  202. """The topmost item on the stack. If the stack is empty,
  203. `None` is returned.
  204. """
  205. try:
  206. return self._local.stack[-1]
  207. except (AttributeError, IndexError):
  208. return None
  209. @python_2_unicode_compatible
  210. class LocalManager(object):
  211. """Local objects cannot manage themselves. For that you need a local
  212. manager. You can pass a local manager multiple locals or add them
  213. later by appending them to ``manager.locals``. Every time the manager
  214. cleans up, it will clean up all the data left in the locals for this
  215. context.
  216. The ``ident_func`` parameter can be added to override the default ident
  217. function for the wrapped locals.
  218. """
  219. def __init__(self, locals=None, ident_func=None):
  220. if locals is None:
  221. self.locals = []
  222. elif isinstance(locals, Local):
  223. self.locals = [locals]
  224. else:
  225. self.locals = list(locals)
  226. if ident_func is not None:
  227. self.ident_func = ident_func
  228. for local in self.locals:
  229. object.__setattr__(local, '__ident_func__', ident_func)
  230. else:
  231. self.ident_func = get_ident
  232. def get_ident(self):
  233. """Return the context identifier the local objects use internally
  234. for this context. You cannot override this method to change the
  235. behavior but use it to link other context local objects (such as
  236. SQLAlchemy's scoped sessions) to the Werkzeug locals."""
  237. return self.ident_func()
  238. def cleanup(self):
  239. """Manually clean up the data in the locals for this context.
  240. Call this at the end of the request or use ``make_middleware()``.
  241. """
  242. for local in self.locals:
  243. release_local(local)
  244. def __repr__(self):
  245. return '<{0} storages: {1}>'.format(
  246. self.__class__.__name__, len(self.locals))
  247. class _FastLocalStack(threading.local):
  248. def __init__(self):
  249. self.stack = []
  250. self.push = self.stack.append
  251. self.pop = self.stack.pop
  252. @property
  253. def top(self):
  254. try:
  255. return self.stack[-1]
  256. except (AttributeError, IndexError):
  257. return None
  258. def __len__(self):
  259. return len(self.stack)
  260. if USE_FAST_LOCALS: # pragma: no cover
  261. LocalStack = _FastLocalStack
  262. else:
  263. # - See #706
  264. # since each thread has its own greenlet we can just use those as
  265. # identifiers for the context. If greenlets are not available we
  266. # fall back to the current thread ident.
  267. LocalStack = _LocalStack # noqa