threads.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.utils.threads
  4. ~~~~~~~~~~~~~~~~~~~~
  5. Threading utilities.
  6. """
  7. from __future__ import absolute_import, print_function
  8. import os
  9. import socket
  10. import sys
  11. import threading
  12. import traceback
  13. from contextlib import contextmanager
  14. from celery.local import Proxy
  15. from celery.utils.compat import THREAD_TIMEOUT_MAX
  16. USE_FAST_LOCALS = os.environ.get('USE_FAST_LOCALS')
  17. class bgThread(threading.Thread):
  18. def __init__(self, name=None, **kwargs):
  19. super(bgThread, self).__init__()
  20. self._is_shutdown = threading.Event()
  21. self._is_stopped = threading.Event()
  22. self.daemon = True
  23. self.name = name or self.__class__.__name__
  24. def body(self):
  25. raise NotImplementedError('subclass responsibility')
  26. def on_crash(self, msg, *fmt, **kwargs):
  27. print(msg.format(*fmt), file=sys.stderr)
  28. exc_info = sys.exc_info()
  29. try:
  30. traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
  31. None, sys.stderr)
  32. finally:
  33. del(exc_info)
  34. def run(self):
  35. body = self.body
  36. shutdown_set = self._is_shutdown.is_set
  37. try:
  38. while not shutdown_set():
  39. try:
  40. body()
  41. except Exception as exc:
  42. try:
  43. self.on_crash('{0!r} crashed: {1!r}', self.name, exc)
  44. self._set_stopped()
  45. finally:
  46. os._exit(1) # exiting by normal means won't work
  47. finally:
  48. self._set_stopped()
  49. def _set_stopped(self):
  50. try:
  51. self._is_stopped.set()
  52. except TypeError: # pragma: no cover
  53. # we lost the race at interpreter shutdown,
  54. # so gc collected built-in modules.
  55. pass
  56. def stop(self):
  57. """Graceful shutdown."""
  58. self._is_shutdown.set()
  59. self._is_stopped.wait()
  60. if self.is_alive():
  61. self.join(THREAD_TIMEOUT_MAX)
  62. try:
  63. from greenlet import getcurrent as get_ident
  64. except ImportError: # pragma: no cover
  65. try:
  66. from thread import get_ident # noqa
  67. except ImportError: # pragma: no cover
  68. try:
  69. from dummy_thread import get_ident # noqa
  70. except ImportError: # pragma: no cover
  71. from _thread import get_ident # noqa
  72. def release_local(local):
  73. """Releases the contents of the local for the current context.
  74. This makes it possible to use locals without a manager.
  75. Example::
  76. >>> loc = Local()
  77. >>> loc.foo = 42
  78. >>> release_local(loc)
  79. >>> hasattr(loc, 'foo')
  80. False
  81. With this function one can release :class:`Local` objects as well
  82. as :class:`StackLocal` objects. However it is not possible to
  83. release data held by proxies that way, one always has to retain
  84. a reference to the underlying local object in order to be able
  85. to release it.
  86. .. versionadded:: 0.6.1
  87. """
  88. local.__release_local__()
  89. class Local(object):
  90. __slots__ = ('__storage__', '__ident_func__')
  91. def __init__(self):
  92. object.__setattr__(self, '__storage__', {})
  93. object.__setattr__(self, '__ident_func__', get_ident)
  94. def __iter__(self):
  95. return iter(self.__storage__.items())
  96. def __call__(self, proxy):
  97. """Create a proxy for a name."""
  98. return Proxy(self, proxy)
  99. def __release_local__(self):
  100. self.__storage__.pop(self.__ident_func__(), None)
  101. def __getattr__(self, name):
  102. try:
  103. return self.__storage__[self.__ident_func__()][name]
  104. except KeyError:
  105. raise AttributeError(name)
  106. def __setattr__(self, name, value):
  107. ident = self.__ident_func__()
  108. storage = self.__storage__
  109. try:
  110. storage[ident][name] = value
  111. except KeyError:
  112. storage[ident] = {name: value}
  113. def __delattr__(self, name):
  114. try:
  115. del self.__storage__[self.__ident_func__()][name]
  116. except KeyError:
  117. raise AttributeError(name)
  118. class _LocalStack(object):
  119. """This class works similar to a :class:`Local` but keeps a stack
  120. of objects instead. This is best explained with an example::
  121. >>> ls = LocalStack()
  122. >>> ls.push(42)
  123. >>> ls.top
  124. 42
  125. >>> ls.push(23)
  126. >>> ls.top
  127. 23
  128. >>> ls.pop()
  129. 23
  130. >>> ls.top
  131. 42
  132. They can be force released by using a :class:`LocalManager` or with
  133. the :func:`release_local` function but the correct way is to pop the
  134. item from the stack after using. When the stack is empty it will
  135. no longer be bound to the current context (and as such released).
  136. By calling the stack without arguments it returns a proxy that
  137. resolves to the topmost item on the stack.
  138. """
  139. def __init__(self):
  140. self._local = Local()
  141. def __release_local__(self):
  142. self._local.__release_local__()
  143. def _get__ident_func__(self):
  144. return self._local.__ident_func__
  145. def _set__ident_func__(self, value):
  146. object.__setattr__(self._local, '__ident_func__', value)
  147. __ident_func__ = property(_get__ident_func__, _set__ident_func__)
  148. del _get__ident_func__, _set__ident_func__
  149. def __call__(self):
  150. def _lookup():
  151. rv = self.top
  152. if rv is None:
  153. raise RuntimeError('object unbound')
  154. return rv
  155. return Proxy(_lookup)
  156. def push(self, obj):
  157. """Pushes a new item to the stack"""
  158. rv = getattr(self._local, 'stack', None)
  159. if rv is None:
  160. self._local.stack = rv = []
  161. rv.append(obj)
  162. return rv
  163. def pop(self):
  164. """Removes the topmost item from the stack, will return the
  165. old value or `None` if the stack was already empty.
  166. """
  167. stack = getattr(self._local, 'stack', None)
  168. if stack is None:
  169. return None
  170. elif len(stack) == 1:
  171. release_local(self._local)
  172. return stack[-1]
  173. else:
  174. return stack.pop()
  175. def __len__(self):
  176. stack = getattr(self._local, 'stack', None)
  177. return len(stack) if stack else 0
  178. @property
  179. def stack(self):
  180. """get_current_worker_task uses this to find
  181. the original task that was executed by the worker."""
  182. stack = getattr(self._local, 'stack', None)
  183. if stack is not None:
  184. return stack
  185. return []
  186. @property
  187. def top(self):
  188. """The topmost item on the stack. If the stack is empty,
  189. `None` is returned.
  190. """
  191. try:
  192. return self._local.stack[-1]
  193. except (AttributeError, IndexError):
  194. return None
  195. class LocalManager(object):
  196. """Local objects cannot manage themselves. For that you need a local
  197. manager. You can pass a local manager multiple locals or add them
  198. later by appending them to `manager.locals`. Everytime the manager
  199. cleans up it, will clean up all the data left in the locals for this
  200. context.
  201. The `ident_func` parameter can be added to override the default ident
  202. function for the wrapped locals.
  203. """
  204. def __init__(self, locals=None, ident_func=None):
  205. if locals is None:
  206. self.locals = []
  207. elif isinstance(locals, Local):
  208. self.locals = [locals]
  209. else:
  210. self.locals = list(locals)
  211. if ident_func is not None:
  212. self.ident_func = ident_func
  213. for local in self.locals:
  214. object.__setattr__(local, '__ident_func__', ident_func)
  215. else:
  216. self.ident_func = get_ident
  217. def get_ident(self):
  218. """Return the context identifier the local objects use internally
  219. for this context. You cannot override this method to change the
  220. behavior but use it to link other context local objects (such as
  221. SQLAlchemy's scoped sessions) to the Werkzeug locals."""
  222. return self.ident_func()
  223. def cleanup(self):
  224. """Manually clean up the data in the locals for this context.
  225. Call this at the end of the request or use `make_middleware()`.
  226. """
  227. for local in self.locals:
  228. release_local(local)
  229. def __repr__(self):
  230. return '<{0} storages: {1}>'.format(
  231. self.__class__.__name__, len(self.locals))
  232. @contextmanager
  233. def default_socket_timeout(timeout):
  234. prev = socket.getdefaulttimeout()
  235. socket.setdefaulttimeout(timeout)
  236. yield
  237. socket.setdefaulttimeout(prev)
  238. class _FastLocalStack(threading.local):
  239. def __init__(self):
  240. self.stack = []
  241. self.push = self.stack.append
  242. self.pop = self.stack.pop
  243. @property
  244. def top(self):
  245. try:
  246. return self.stack[-1]
  247. except (AttributeError, IndexError):
  248. return None
  249. def __len__(self):
  250. return len(self.stack)
  251. if USE_FAST_LOCALS:
  252. LocalStack = _FastLocalStack
  253. else:
  254. # - See #706
  255. # since each thread has its own greenlet we can just use those as
  256. # identifiers for the context. If greenlets are not available we
  257. # fall back to the current thread ident.
  258. LocalStack = _LocalStack # noqa