threads.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.utils.threads
  4. ~~~~~~~~~~~~~~~~~~~~
  5. Threading utilities.
  6. """
  7. from __future__ import absolute_import
  8. import os
  9. import sys
  10. import threading
  11. import traceback
  12. from kombu.syn import detect_environment
  13. _Thread = threading.Thread
  14. _Event = threading._Event
  15. active_count = (getattr(threading, 'active_count', None) or
  16. threading.activeCount)
  17. USE_PURE_LOCALS = os.environ.get("USE_PURE_LOCALS")
  18. class Event(_Event):
  19. if not hasattr(_Event, 'is_set'): # pragma: no cover
  20. is_set = _Event.isSet
  21. class Thread(_Thread):
  22. if not hasattr(_Thread, 'is_alive'): # pragma: no cover
  23. is_alive = _Thread.isAlive
  24. if not hasattr(_Thread, 'daemon'): # pragma: no cover
  25. daemon = property(_Thread.isDaemon, _Thread.setDaemon)
  26. if not hasattr(_Thread, 'name'): # pragma: no cover
  27. name = property(_Thread.getName, _Thread.setName)
  28. class bgThread(Thread):
  29. def __init__(self, name=None, **kwargs):
  30. super(bgThread, self).__init__()
  31. self._is_shutdown = Event()
  32. self._is_stopped = Event()
  33. self.daemon = True
  34. self.name = name or self.__class__.__name__
  35. def body(self):
  36. raise NotImplementedError('subclass responsibility')
  37. def on_crash(self, msg, *fmt, **kwargs):
  38. sys.stderr.write((msg + '\n') % fmt)
  39. exc_info = sys.exc_info()
  40. try:
  41. traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
  42. None, sys.stderr)
  43. finally:
  44. del(exc_info)
  45. def run(self):
  46. body = self.body
  47. shutdown_set = self._is_shutdown.is_set
  48. try:
  49. while not shutdown_set():
  50. try:
  51. body()
  52. except Exception, exc:
  53. try:
  54. self.on_crash('%r crashed: %r', self.name, exc)
  55. self._set_stopped()
  56. finally:
  57. os._exit(1) # exiting by normal means won't work
  58. finally:
  59. self._set_stopped()
  60. def _set_stopped(self):
  61. try:
  62. self._is_stopped.set()
  63. except TypeError: # pragma: no cover
  64. # we lost the race at interpreter shutdown,
  65. # so gc collected built-in modules.
  66. pass
  67. def stop(self):
  68. """Graceful shutdown."""
  69. self._is_shutdown.set()
  70. self._is_stopped.wait()
  71. if self.is_alive():
  72. self.join(1e100)
  73. if detect_environment() == 'default' and not USE_PURE_LOCALS:
  74. class LocalStack(threading.local):
  75. def __init__(self):
  76. self.stack = []
  77. self.push = self.stack.append
  78. self.pop = self.stack.pop
  79. @property
  80. def top(self):
  81. try:
  82. return self.stack[-1]
  83. except (AttributeError, IndexError):
  84. return None
  85. else:
  86. # See #706
  87. from celery.local import LocalStack # noqa