# -*- coding: utf-8 -*-
"""
    celery.bootsteps
    ~~~~~~~~~~~~~~~~

    A directed acyclic graph of reusable components.

"""
from __future__ import absolute_import, unicode_literals

from collections import deque
from threading import Event

from kombu.common import ignore_errors
from kombu.utils import symbol_by_name
from kombu.utils.encoding import bytes_to_str

from .datastructures import DependencyGraph, GraphFormatter
from .five import values, with_metaclass
from .utils.imports import instantiate, qualname
from .utils.log import get_logger

try:
    from greenlet import GreenletExit
    IGNORE_ERRORS = (GreenletExit,)
except ImportError:  # pragma: no cover
    IGNORE_ERRORS = ()

__all__ = ['Blueprint', 'Step', 'StartStopStep', 'ConsumerStep']

#: States
RUN = 0x1
CLOSE = 0x2
TERMINATE = 0x3

logger = get_logger(__name__)
debug = logger.debug


def _pre(ns, fmt):
    """Prefix the log format ``fmt`` with the alias of ``ns``."""
    return '| {0}: {1}'.format(ns.alias, fmt)


def _label(s):
    """Return the last dotted component of ``s.name``."""
    return s.name.rsplit('.', 1)[-1]


class StepFormatter(GraphFormatter):
    """Graph formatter for :class:`Blueprint`."""

    blueprint_prefix = '⧉'
    conditional_prefix = '∘'
    blueprint_scheme = {
        'shape': 'parallelogram',
        'color': 'slategray4',
        'fillcolor': 'slategray3',
    }

    def label(self, step):
        """Return the prefixed label for ``step``.

        A falsy ``step`` is returned unchanged.

        """
        return step and '{0}{1}'.format(
            self._get_prefix(step),
            bytes_to_str(
                (step.label or _label(step)).encode('utf-8', 'ignore')),
        )

    def _get_prefix(self, step):
        """Return the prefix marking ``last`` or ``conditional`` steps."""
        if step.last:
            return self.blueprint_prefix
        if step.conditional:
            return self.conditional_prefix
        return ''

    def node(self, obj, **attrs):
        """Draw ``obj``, using the blueprint scheme if it is ``last``."""
        scheme = self.blueprint_scheme if obj.last else self.node_scheme
        return self.draw_node(obj, scheme, attrs)

    def edge(self, a, b, **attrs):
        """Draw the edge ``a -> b``, restyled when ``a`` is ``last``."""
        if a.last:
            attrs.update(arrowhead='none', color='darkseagreen3')
        return self.draw_edge(a, b, self.edge_scheme, attrs)


class Blueprint(object):
    """Blueprint containing bootsteps that can be applied to objects.

    :keyword steps: List of steps.
    :keyword name: Set explicit name for this blueprint.
    :keyword app: Set the Celery app for this blueprint.
    :keyword on_start: Optional callback applied after blueprint start.
    :keyword on_close: Optional callback applied before blueprint close.
    :keyword on_stopped: Optional callback applied after blueprint stopped.

    """
    GraphFormatter = StepFormatter

    name = None
    state = None
    started = 0
    default_steps = set()
    state_to_name = {
        0: 'initializing',
        RUN: 'running',
        CLOSE: 'closing',
        TERMINATE: 'terminating',
    }

    def __init__(self, steps=None, name=None, app=None,
                 on_start=None, on_close=None, on_stopped=None):
        self.app = app
        self.name = name or self.name or qualname(type(self))
        self.types = set(steps or []) | set(self.default_steps)
        self.on_start = on_start
        self.on_close = on_close
        self.on_stopped = on_stopped
        self.shutdown_complete = Event()
        self.steps = {}

    def start(self, parent):
        """Start every step in ``parent.steps``, in order.

        Sets the state to :data:`RUN` and calls ``on_start`` (if set)
        before the first step is started.  ``started`` is updated to
        the 1-based position of the step currently being started.

        """
        self.state = RUN
        if self.on_start:
            self.on_start()
        for i, step in enumerate(s for s in parent.steps if s is not None):
            self._debug('Starting %s', step.alias)
            self.started = i + 1
            step.start(parent)
            debug('^-- substep ok')

    def human_state(self):
        """Return the name of the current state."""
        return self.state_to_name[self.state or 0]

    def info(self, parent):
        """Return the merged ``info()`` dicts of ``parent.steps``."""
        info = {}
        for step in parent.steps:
            # Skip empty slots, as start() does.
            if step is not None:
                info.update(step.info(parent) or {})
        return info

    def close(self, parent):
        """Call ``on_close`` (if set), then ``close`` on every step.

        Steps are visited in start order, not reversed.

        """
        if self.on_close:
            self.on_close()
        self.send_all(parent, 'close', 'closing', reverse=False)

    def restart(self, parent, method='stop',
                description='restarting', propagate=False):
        """Call ``method`` on every step, in reverse order."""
        self.send_all(parent, method, description, propagate=propagate)

    def send_all(self, parent, method,
                 description=None, reverse=True, propagate=True, args=()):
        """Call ``method(parent, *args)`` on every step that defines it.

        :param parent: Object whose ``steps`` attribute is iterated.
        :param method: Name of the step method to call.
        :keyword description: Text used in log messages, defaults to
            ``method`` with underscores replaced by spaces.
        :keyword reverse: Iterate the steps in reverse order.
        :keyword propagate: If true, errors raised by a step are
            re-raised, otherwise they are logged and the remaining
            steps are still visited.
        :keyword args: Extra positional arguments for ``method``.

        """
        description = description or method.replace('_', ' ')
        steps = reversed(parent.steps) if reverse else parent.steps
        for step in steps:
            if step:
                fun = getattr(step, method, None)
                if fun is not None:
                    self._debug('%s %s...',
                                description.capitalize(), step.alias)
                    try:
                        fun(parent, *args)
                    except Exception as exc:
                        if propagate:
                            raise
                        logger.error(
                            'Error on %s %s: %r',
                            description, step.alias, exc, exc_info=1,
                        )

    def stop(self, parent, close=True, terminate=False):
        """Stop (or terminate) all steps and mark shutdown as complete.

        Does nothing if the blueprint is already closing or terminating.
        If the blueprint was never fully started, only the state and
        the ``shutdown_complete`` event are updated.

        """
        what = 'terminating' if terminate else 'stopping'
        if self.state in (CLOSE, TERMINATE):
            return

        if self.state != RUN or self.started != len(parent.steps):
            # Not fully started, can safely exit.
            self.state = TERMINATE
            self.shutdown_complete.set()
            return
        self.close(parent)
        self.state = CLOSE

        self.restart(
            parent, 'terminate' if terminate else 'stop',
            description=what, propagate=False,
        )

        if self.on_stopped:
            self.on_stopped()
        self.state = TERMINATE
        self.shutdown_complete.set()

    def join(self, timeout=None):
        """Wait for ``shutdown_complete`` to be set."""
        try:
            # Will only get here if running green,
            # makes sure all greenthreads have exited.
            self.shutdown_complete.wait(timeout=timeout)
        except IGNORE_ERRORS:
            pass

    def apply(self, parent, **kwargs):
        """Apply the steps in this blueprint to an object.

        This will apply the ``__init__`` and ``include`` methods
        of each step, with the object as argument::

            step = Step(obj)
            ...
            step.include(obj)

        For :class:`StartStopStep` the services created
        will also be added to the objects ``steps`` attribute.

        """
        self._debug('Preparing bootsteps.')
        order = self.order = []
        steps = self.steps = self.claim_steps()

        self._debug('Building graph...')
        for S in self._finalize_steps(steps):
            step = S(parent, **kwargs)
            # Replace the step class with its instance.
            steps[step.name] = step
            order.append(step)
        self._debug('New boot order: {%s}',
                    ', '.join(s.alias for s in self.order))
        for step in order:
            step.include(parent)
        return self

    def connect_with(self, other):
        """Merge the graph of ``other`` into the graph of this blueprint.

        Also adds an edge from the type of the first step of ``other``
        to the type of the last step of this blueprint.

        """
        self.graph.adjacent.update(other.graph.adjacent)
        self.graph.add_edge(type(other.order[0]), type(self.order[-1]))

    def __getitem__(self, name):
        return self.steps[name]

    def _find_last(self):
        """Return the first step marked ``last``, or :const:`None`."""
        return next((C for C in values(self.steps) if C.last), None)

    def _firstpass(self, steps):
        """Resolve ``requires`` to symbols and add missing dependencies.

        Dependencies that are not already known are added to ``steps``.

        """
        for step in values(steps):
            step.requires = [symbol_by_name(dep) for dep in step.requires]
        stream = deque(step.requires for step in values(steps))
        while stream:
            for node in stream.popleft():
                node = symbol_by_name(node)
                if node.name not in self.steps:
                    steps[node.name] = node
                stream.append(node.requires)

    def _finalize_steps(self, steps):
        """Build the dependency graph and return the sorted steps.

        :raises KeyError: if the sort fails with a :exc:`KeyError`,
            which is reported as an unknown bootstep.

        """
        last = self._find_last()
        self._firstpass(steps)
        it = ((C, C.requires) for C in values(steps))
        G = self.graph = DependencyGraph(
            it, formatter=self.GraphFormatter(root=last),
        )
        if last:
            # Add an edge from the ``last`` step to every other step.
            for obj in G:
                if obj != last:
                    G.add_edge(last, obj)
        try:
            return G.topsort()
        except KeyError as exc:
            raise KeyError('unknown bootstep: %s' % exc)

    def claim_steps(self):
        """Return a ``name -> step class`` dict of all known steps."""
        return dict(self.load_step(step) for step in self._all_steps())

    def _all_steps(self):
        """Return the union of ``types`` and the steps the app lists."""
        return self.types | self.app.steps[self.name.lower()]

    def load_step(self, step):
        """Resolve ``step`` and return a ``(name, step)`` tuple."""
        step = symbol_by_name(step)
        return step.name, step

    def _debug(self, msg, *args):
        return debug(_pre(self, msg), *args)

    @property
    def alias(self):
        return _label(self)


class StepType(type):
    """Metaclass for steps."""

    def __new__(cls, name, bases, attrs):
        module = attrs.get('__module__')
        qname = '{0}.{1}'.format(module, name) if module else name
        # ``name`` falls back to the qualified name when not set.
        attrs.update(
            __qualname__=qname,
            name=attrs.get('name') or qname,
        )
        return super(StepType, cls).__new__(cls, name, bases, attrs)

    def __str__(self):
        return self.name

    def __repr__(self):
        return 'step:{0.name}{{{0.requires!r}}}'.format(self)


@with_metaclass(StepType)
class Step(object):
    """A Bootstep.

    The :meth:`__init__` method is called when the step
    is bound to a parent object, and can as such be used
    to initialize attributes in the parent object at
    parent instantiation-time.

    """

    #: Optional step name, will use qualname if not specified.
    name = None

    #: Optional short name used for graph outputs and in logs.
    label = None

    #: Set this to true if the step is enabled based on some condition.
    conditional = False

    #: List of other steps that must be started before this step.
    #: Note that all dependencies must be in the same blueprint.
    requires = ()

    #: This flag is reserved for the workers Consumer,
    #: since it is required to always be started last.
    #: There can only be one object marked last
    #: in every blueprint.
    last = False

    #: This provides the default for :meth:`include_if`.
    enabled = True

    def __init__(self, parent, **kwargs):
        pass

    def include_if(self, parent):
        """An optional predicate that decides whether this
        step should be created."""
        return self.enabled

    def instantiate(self, name, *args, **kwargs):
        """Instantiate ``name`` with the given arguments."""
        return instantiate(name, *args, **kwargs)

    def _should_include(self, parent):
        """Return ``(included, created)`` for ``parent``.

        :meth:`create` is only called when :meth:`include_if` is true.

        """
        if self.include_if(parent):
            return True, self.create(parent)
        return False, None

    def include(self, parent):
        """Create the step if it should be included.

        Returns whether the step was included.

        """
        return self._should_include(parent)[0]

    def create(self, parent):
        """Create the step."""
        pass

    def __repr__(self):
        return '<step: {0.alias}>'.format(self)

    @property
    def alias(self):
        return self.label or _label(self)

    def info(self, obj):
        pass


class StartStopStep(Step):
    """Step that forwards start/stop/terminate to a created object."""

    #: Optional obj created by the :meth:`create` method.
    #: This is used by :class:`StartStopStep` to keep the
    #: original service object.
    obj = None

    def start(self, parent):
        if self.obj:
            return self.obj.start()

    def stop(self, parent):
        if self.obj:
            return self.obj.stop()

    def close(self, parent):
        pass

    def terminate(self, parent):
        if self.obj:
            # Fall back to ``stop`` if the object has no ``terminate``.
            return getattr(self.obj, 'terminate', self.obj.stop)()

    def include(self, parent):
        """Create the step and register it in ``parent.steps``.

        Returns whether the step was included.

        """
        inc, ret = self._should_include(parent)
        if inc:
            self.obj = ret
            parent.steps.append(self)
        return inc


class ConsumerStep(StartStopStep):
    """Step that starts and closes the consumers it provides."""

    requires = ('celery.worker.consumer:Connection',)
    consumers = None

    def get_consumers(self, channel):
        """Return the consumers for ``channel``; must be overridden."""
        raise NotImplementedError('missing get_consumers')

    def start(self, c):
        channel = c.connection.channel()
        self.consumers = self.get_consumers(channel)
        for consumer in self.consumers or []:
            consumer.consume()

    def stop(self, c):
        self._close(c, True)

    def shutdown(self, c):
        self._close(c, False)

    def _close(self, c, cancel_consumers=True):
        """Optionally cancel the consumers, then close their channels.

        Each channel is closed once, even if several consumers share it.

        """
        channels = set()
        for consumer in self.consumers or []:
            if cancel_consumers:
                ignore_errors(c.connection, consumer.cancel)
            if consumer.channel:
                channels.add(consumer.channel)
        for channel in channels:
            ignore_errors(c.connection, channel.close)