123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- # -*- coding: utf-8 -*-
- """
- celery.utils.threads
- ~~~~~~~~~~~~~~~~~~~~
- Threading utilities.
- """
- from __future__ import absolute_import
- import os
- import sys
- import threading
- import traceback
- from kombu.syn import detect_environment
- _Thread = threading.Thread
- _Event = threading._Event
- active_count = (getattr(threading, 'active_count', None) or
- threading.activeCount)
- USE_PURE_LOCALS = os.environ.get("USE_PURE_LOCALS")
- class Event(_Event):
- if not hasattr(_Event, 'is_set'): # pragma: no cover
- is_set = _Event.isSet
- class Thread(_Thread):
- if not hasattr(_Thread, 'is_alive'): # pragma: no cover
- is_alive = _Thread.isAlive
- if not hasattr(_Thread, 'daemon'): # pragma: no cover
- daemon = property(_Thread.isDaemon, _Thread.setDaemon)
- if not hasattr(_Thread, 'name'): # pragma: no cover
- name = property(_Thread.getName, _Thread.setName)
- class bgThread(Thread):
- def __init__(self, name=None, **kwargs):
- super(bgThread, self).__init__()
- self._is_shutdown = Event()
- self._is_stopped = Event()
- self.daemon = True
- self.name = name or self.__class__.__name__
- def body(self):
- raise NotImplementedError('subclass responsibility')
- def on_crash(self, msg, *fmt, **kwargs):
- sys.stderr.write((msg + '\n') % fmt)
- exc_info = sys.exc_info()
- try:
- traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
- None, sys.stderr)
- finally:
- del(exc_info)
- def run(self):
- body = self.body
- shutdown_set = self._is_shutdown.is_set
- try:
- while not shutdown_set():
- try:
- body()
- except Exception, exc:
- try:
- self.on_crash('%r crashed: %r', self.name, exc)
- self._set_stopped()
- finally:
- os._exit(1) # exiting by normal means won't work
- finally:
- self._set_stopped()
- def _set_stopped(self):
- try:
- self._is_stopped.set()
- except TypeError: # pragma: no cover
- # we lost the race at interpreter shutdown,
- # so gc collected built-in modules.
- pass
- def stop(self):
- """Graceful shutdown."""
- self._is_shutdown.set()
- self._is_stopped.wait()
- if self.is_alive():
- self.join(1e100)
- if detect_environment() == 'default' and not USE_PURE_LOCALS:
- class LocalStack(threading.local):
- def __init__(self):
- self.stack = []
- self.push = self.stack.append
- self.pop = self.stack.pop
- @property
- def top(self):
- try:
- return self.stack[-1]
- except (AttributeError, IndexError):
- return None
- else:
- # See #706
- from celery.local import LocalStack # noqa
|