| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 | 
							- # -*- coding: utf-8 -*-
 
- """
 
-     celery.worker.bootsteps
 
-     ~~~~~~~~~~~~~~~~~~~~~~~
 
-     The boot-steps!
 
- """
 
- from __future__ import absolute_import
 
- from collections import defaultdict
 
- from importlib import import_module
 
- from threading import Event
 
- from celery.datastructures import DependencyGraph
 
- from celery.utils.imports import instantiate
 
- from celery.utils.log import get_logger
 
- from celery.utils.threads import default_socket_timeout
 
- try:
 
-     from greenlet import GreenletExit
 
-     IGNORE_ERRORS = (GreenletExit, )
 
- except ImportError:  # pragma: no cover
 
-     IGNORE_ERRORS = ()
 
- #: Default socket timeout at shutdown.
 
- SHUTDOWN_SOCKET_TIMEOUT = 5.0
 
- #: States
 
- RUN = 0x1
 
- CLOSE = 0x2
 
- TERMINATE = 0x3
 
- logger = get_logger(__name__)
 
- def qualname(c):
 
-     return '.'.join([c.namespace.name, c.name.capitalize()])
 
- class Namespace(object):
 
-     """A namespace containing bootsteps.
 
-     Every step must belong to a namespace.
 
-     When step classes are created they are added to the
 
-     mapping of unclaimed steps.  The steps will be
 
-     claimed when the namespace they belong to is created.
 
-     :keyword name: Set the name of this namespace.
 
-     :keyword app: Set the Celery app for this namespace.
 
-     """
 
-     name = None
 
-     state = None
 
-     started = 0
 
-     _unclaimed = defaultdict(dict)
 
-     def __init__(self, name=None, app=None, on_start=None,
 
-             on_close=None, on_stopped=None):
 
-         self.app = app
 
-         self.name = name or self.name
 
-         self.on_start = on_start
 
-         self.on_close = on_close
 
-         self.on_stopped = on_stopped
 
-         self.services = []
 
-         self.shutdown_complete = Event()
 
-     def start(self, parent):
 
-         self.state = RUN
 
-         if self.on_start:
 
-             self.on_start()
 
-         for i, step in enumerate(parent.steps):
 
-             if step:
 
-                 logger.debug('Starting %s...', qualname(step))
 
-                 self.started = i + 1
 
-                 print('STARTING: %r' % (step.start, ))
 
-                 step.start(parent)
 
-                 logger.debug('%s OK!', qualname(step))
 
-     def close(self, parent):
 
-         if self.on_close:
 
-             self.on_close()
 
-         for step in parent.steps:
 
-             close = getattr(step, 'close', None)
 
-             if close:
 
-                 close(parent)
 
-     def restart(self, parent, description='Restarting', attr='stop'):
 
-         with default_socket_timeout(SHUTDOWN_SOCKET_TIMEOUT):  # Issue 975
 
-             for step in reversed(parent.steps):
 
-                 if step:
 
-                     logger.debug('%s %s...', description, qualname(step))
 
-                     fun = getattr(step, attr, None)
 
-                     if fun:
 
-                         fun(parent)
 
-     def stop(self, parent, close=True, terminate=False):
 
-         what = 'Terminating' if terminate else 'Stopping'
 
-         if self.state in (CLOSE, TERMINATE):
 
-             return
 
-         self.close(parent)
 
-         if self.state != RUN or self.started != len(parent.steps):
 
-             # Not fully started, can safely exit.
 
-             self.state = TERMINATE
 
-             self.shutdown_complete.set()
 
-             return
 
-         self.state = CLOSE
 
-         self.restart(parent, what, 'terminate' if terminate else 'stop')
 
-         if self.on_stopped:
 
-             self.on_stopped()
 
-         self.state = TERMINATE
 
-         self.shutdown_complete.set()
 
-     def join(self, timeout=None):
 
-         try:
 
-             # Will only get here if running green,
 
-             # makes sure all greenthreads have exited.
 
-             self.shutdown_complete.wait(timeout=timeout)
 
-         except IGNORE_ERRORS:
 
-             pass
 
-     def modules(self):
 
-         """Subclasses can override this to return a
 
-         list of modules to import before steps are claimed."""
 
-         return []
 
-     def load_modules(self):
 
-         """Will load the steps modules this namespace depends on."""
 
-         for m in self.modules():
 
-             self.import_module(m)
 
-     def apply(self, parent, **kwargs):
 
-         """Apply the steps in this namespace to an object.
 
-         This will apply the ``__init__`` and ``include`` methods
 
-         of each steps with the object as argument.
 
-         For :class:`StartStopStep` the services created
 
-         will also be added the the objects ``steps`` attribute.
 
-         """
 
-         self._debug('Loading modules.')
 
-         self.load_modules()
 
-         self._debug('Claiming steps.')
 
-         self.steps = self._claim()
 
-         self._debug('Building boot step graph.')
 
-         self.boot_steps = [self.bind_step(name, parent, **kwargs)
 
-                                 for name in self._finalize_boot_steps()]
 
-         self._debug('New boot order: {%s}',
 
-                 ', '.join(c.name for c in self.boot_steps))
 
-         for step in self.boot_steps:
 
-             step.include(parent)
 
-         return self
 
-     def bind_step(self, name, parent, **kwargs):
 
-         """Bind step to parent object and this namespace."""
 
-         comp = self[name](parent, **kwargs)
 
-         comp.namespace = self
 
-         return comp
 
-     def import_module(self, module):
 
-         return import_module(module)
 
-     def __getitem__(self, name):
 
-         return self.steps[name]
 
-     def _find_last(self):
 
-         for C in self.steps.itervalues():
 
-             if C.last:
 
-                 return C
 
-     def _finalize_boot_steps(self):
 
-         G = self.graph = DependencyGraph((C.name, C.requires)
 
-                             for C in self.steps.itervalues())
 
-         last = self._find_last()
 
-         if last:
 
-             for obj in G:
 
-                 if obj != last.name:
 
-                     G.add_edge(last.name, obj)
 
-         return G.topsort()
 
-     def _claim(self):
 
-         return self._unclaimed[self.name]
 
-     def _debug(self, msg, *args):
 
-         return logger.debug('[%s] ' + msg,
 
-                             *(self.name.capitalize(), ) + args)
 
- def _prepare_requires(req):
 
-     if not isinstance(req, basestring):
 
-         req = req.name
 
-     return req
 
- class StepType(type):
 
-     """Metaclass for steps."""
 
-     def __new__(cls, name, bases, attrs):
 
-         abstract = attrs.pop('abstract', False)
 
-         if not abstract:
 
-             try:
 
-                 cname = attrs['name']
 
-             except KeyError:
 
-                 raise NotImplementedError('Steps must be named')
 
-             namespace = attrs.get('namespace', None)
 
-             if not namespace:
 
-                 attrs['namespace'], _, attrs['name'] = cname.partition('.')
 
-         attrs['requires'] = tuple(_prepare_requires(req)
 
-                                     for req in attrs.get('requires', ()))
 
-         cls = super(StepType, cls).__new__(cls, name, bases, attrs)
 
-         if not abstract:
 
-             Namespace._unclaimed[cls.namespace][cls.name] = cls
 
-         return cls
 
- class Step(object):
 
-     """A Bootstep.
 
-     The :meth:`__init__` method is called when the step
 
-     is bound to a parent object, and can as such be used
 
-     to initialize attributes in the parent object at
 
-     parent instantiation-time.
 
-     """
 
-     __metaclass__ = StepType
 
-     #: The name of the step, or the namespace
 
-     #: and the name of the step separated by dot.
 
-     name = None
 
-     #: List of other steps that that must be started before this step.
 
-     #: Note that all dependencies must be in the same namespace.
 
-     requires = ()
 
-     #: can be used to specify the namespace,
 
-     #: if the name does not include it.
 
-     namespace = None
 
-     #: if set the step will not be registered,
 
-     #: but can be used as a base class.
 
-     abstract = True
 
-     #: Optional obj created by the :meth:`create` method.
 
-     #: This is used by :class:`StartStopStep` to keep the
 
-     #: original service object.
 
-     obj = None
 
-     #: This flag is reserved for the workers Consumer,
 
-     #: since it is required to always be started last.
 
-     #: There can only be one object marked with lsat
 
-     #: in every namespace.
 
-     last = False
 
-     #: This provides the default for :meth:`include_if`.
 
-     enabled = True
 
-     def __init__(self, parent, **kwargs):
 
-         pass
 
-     def create(self, parent):
 
-         """Create the step."""
 
-         pass
 
-     def include_if(self, parent):
 
-         """An optional predicate that decided whether this
 
-         step should be created."""
 
-         return self.enabled
 
-     def instantiate(self, qualname, *args, **kwargs):
 
-         return instantiate(qualname, *args, **kwargs)
 
-     def include(self, parent):
 
-         if self.include_if(parent):
 
-             self.obj = self.create(parent)
 
-             return True
 
- class StartStopStep(Step):
 
-     abstract = True
 
-     def start(self, parent):
 
-         if self.obj:
 
-             return self.obj.start()
 
-     def stop(self, parent):
 
-         if self.obj:
 
-             return self.obj.stop()
 
-     def close(self, parent):
 
-         pass
 
-     def terminate(self, parent):
 
-         self.stop(parent)
 
-     def include(self, parent):
 
-         if super(StartStopStep, self).include(parent):
 
-             parent.steps.append(self)
 
 
  |