|
@@ -2,14 +2,16 @@
|
|
"""A directed acyclic graph of reusable components."""
|
|
"""A directed acyclic graph of reusable components."""
|
|
from collections import deque
|
|
from collections import deque
|
|
from threading import Event
|
|
from threading import Event
|
|
|
|
+from typing import Any, Callable, Mapping, Optional, Set, Sequence, Tuple
|
|
|
|
|
|
from kombu.common import ignore_errors
|
|
from kombu.common import ignore_errors
|
|
from kombu.utils import symbol_by_name
|
|
from kombu.utils import symbol_by_name
|
|
-from kombu.utils.encoding import bytes_to_str
|
|
|
|
|
|
|
|
|
|
+from .utils.abstract import AbstractApp
|
|
from .utils.graph import DependencyGraph, GraphFormatter
|
|
from .utils.graph import DependencyGraph, GraphFormatter
|
|
from .utils.imports import instantiate, qualname
|
|
from .utils.imports import instantiate, qualname
|
|
from .utils.log import get_logger
|
|
from .utils.log import get_logger
|
|
|
|
+from .utils.typing import Timeout
|
|
|
|
|
|
try:
|
|
try:
|
|
from greenlet import GreenletExit
|
|
from greenlet import GreenletExit
|
|
@@ -28,11 +30,155 @@ TERMINATE = 0x3
|
|
logger = get_logger(__name__)
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
|
|
|
|
-def _pre(ns, fmt):
|
|
|
|
|
|
+class StepType(type):
|
|
|
|
+ """Meta-class for steps."""
|
|
|
|
+
|
|
|
|
+ def __new__(cls, name, bases, attrs):
|
|
|
|
+ module = attrs.get('__module__')
|
|
|
|
+ qname = '{0}.{1}'.format(module, name) if module else name
|
|
|
|
+ attrs.update(
|
|
|
|
+ __qualname__=qname,
|
|
|
|
+ name=attrs.get('name') or qname,
|
|
|
|
+ )
|
|
|
|
+ return super().__new__(cls, name, bases, attrs)
|
|
|
|
+
|
|
|
|
+ def __str__(self):
|
|
|
|
+ return self.name
|
|
|
|
+
|
|
|
|
+ def __repr__(self):
|
|
|
|
+ return 'step:{0.name}{{{0.requires!r}}}'.format(self)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class Step(metaclass=StepType):
|
|
|
|
+ """A Bootstep.
|
|
|
|
+
|
|
|
|
+ The :meth:`__init__` method is called when the step
|
|
|
|
+ is bound to a parent object, and can as such be used
|
|
|
|
+ to initialize attributes in the parent object at
|
|
|
|
+ parent instantiation-time.
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ #: Optional step name, will use ``qualname`` if not specified.
|
|
|
|
+ name = None
|
|
|
|
+
|
|
|
|
+ #: Optional short name used for graph outputs and in logs.
|
|
|
|
+ label = None
|
|
|
|
+
|
|
|
|
+ #: Set this to true if the step is enabled based on some condition.
|
|
|
|
+ conditional = False
|
|
|
|
+
|
|
|
|
+ #: List of other steps that that must be started before this step.
|
|
|
|
+ #: Note that all dependencies must be in the same blueprint.
|
|
|
|
+ requires = ()
|
|
|
|
+
|
|
|
|
+ #: This flag is reserved for the workers Consumer,
|
|
|
|
+ #: since it is required to always be started last.
|
|
|
|
+ #: There can only be one object marked last
|
|
|
|
+ #: in every blueprint.
|
|
|
|
+ last = False
|
|
|
|
+
|
|
|
|
+ #: This provides the default for :meth:`include_if`.
|
|
|
|
+ enabled = True
|
|
|
|
+
|
|
|
|
+ def __init__(self, parent, **kwargs):
|
|
|
|
+ pass
|
|
|
|
+
|
|
|
|
+ def include_if(self, parent):
|
|
|
|
+ """An optional predicate that decides whether this
|
|
|
|
+ step should be created."""
|
|
|
|
+ return self.enabled
|
|
|
|
+
|
|
|
|
+ def instantiate(self, name, *args, **kwargs):
|
|
|
|
+ return instantiate(name, *args, **kwargs)
|
|
|
|
+
|
|
|
|
+ def _should_include(self, parent):
|
|
|
|
+ if self.include_if(parent):
|
|
|
|
+ return True, self.create(parent)
|
|
|
|
+ return False, None
|
|
|
|
+
|
|
|
|
+ def include(self, parent):
|
|
|
|
+ return self._should_include(parent)[0]
|
|
|
|
+
|
|
|
|
+ def create(self, parent):
|
|
|
|
+ """Create the step."""
|
|
|
|
+ pass
|
|
|
|
+
|
|
|
|
+ def __repr__(self):
|
|
|
|
+ return '<step: {0.alias}>'.format(self)
|
|
|
|
+
|
|
|
|
+ @property
|
|
|
|
+ def alias(self):
|
|
|
|
+ return self.label or _label(self)
|
|
|
|
+
|
|
|
|
+ def info(self, obj):
|
|
|
|
+ pass
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class StartStopStep(Step):
|
|
|
|
+
|
|
|
|
+ #: Optional obj created by the :meth:`create` method.
|
|
|
|
+ #: This is used by :class:`StartStopStep` to keep the
|
|
|
|
+ #: original service object.
|
|
|
|
+ obj = None
|
|
|
|
+
|
|
|
|
+ def start(self, parent):
|
|
|
|
+ if self.obj:
|
|
|
|
+ return self.obj.start()
|
|
|
|
+
|
|
|
|
+ def stop(self, parent):
|
|
|
|
+ if self.obj:
|
|
|
|
+ return self.obj.stop()
|
|
|
|
+
|
|
|
|
+ def close(self, parent):
|
|
|
|
+ pass
|
|
|
|
+
|
|
|
|
+ def terminate(self, parent):
|
|
|
|
+ if self.obj:
|
|
|
|
+ return getattr(self.obj, 'terminate', self.obj.stop)()
|
|
|
|
+
|
|
|
|
+ def include(self, parent):
|
|
|
|
+ inc, ret = self._should_include(parent)
|
|
|
|
+ if inc:
|
|
|
|
+ self.obj = ret
|
|
|
|
+ parent.steps.append(self)
|
|
|
|
+ return inc
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class ConsumerStep(StartStopStep):
|
|
|
|
+ requires = ('celery.worker.consumer:Connection',)
|
|
|
|
+ consumers = None
|
|
|
|
+
|
|
|
|
+ def get_consumers(self, channel):
|
|
|
|
+ raise NotImplementedError('missing get_consumers')
|
|
|
|
+
|
|
|
|
+ def start(self, c):
|
|
|
|
+ channel = c.connection.channel()
|
|
|
|
+ self.consumers = self.get_consumers(channel)
|
|
|
|
+ for consumer in self.consumers or []:
|
|
|
|
+ consumer.consume()
|
|
|
|
+
|
|
|
|
+ def stop(self, c):
|
|
|
|
+ self._close(c, True)
|
|
|
|
+
|
|
|
|
+ def shutdown(self, c):
|
|
|
|
+ self._close(c, False)
|
|
|
|
+
|
|
|
|
+ def _close(self, c, cancel_consumers=True):
|
|
|
|
+ channels = set()
|
|
|
|
+ for consumer in self.consumers or []:
|
|
|
|
+ if cancel_consumers:
|
|
|
|
+ ignore_errors(c.connection, consumer.cancel)
|
|
|
|
+ if consumer.channel:
|
|
|
|
+ channels.add(consumer.channel)
|
|
|
|
+ for channel in channels:
|
|
|
|
+ ignore_errors(c.connection, channel.close)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def _pre(ns: Step, fmt: str) -> str:
|
|
return '| {0}: {1}'.format(ns.alias, fmt)
|
|
return '| {0}: {1}'.format(ns.alias, fmt)
|
|
|
|
|
|
|
|
|
|
-def _label(s):
|
|
|
|
|
|
+def _label(s: Step) -> str:
|
|
return s.name.rsplit('.', 1)[-1]
|
|
return s.name.rsplit('.', 1)[-1]
|
|
|
|
|
|
|
|
|
|
@@ -47,25 +193,24 @@ class StepFormatter(GraphFormatter):
|
|
'fillcolor': 'slategray3',
|
|
'fillcolor': 'slategray3',
|
|
}
|
|
}
|
|
|
|
|
|
- def label(self, step):
|
|
|
|
|
|
+ def label(self, step: Step) -> str:
|
|
return step and '{0}{1}'.format(
|
|
return step and '{0}{1}'.format(
|
|
self._get_prefix(step),
|
|
self._get_prefix(step),
|
|
- bytes_to_str(
|
|
|
|
- (step.label or _label(step)).encode('utf-8', 'ignore')),
|
|
|
|
|
|
+ (step.label or _label(step)).encode('utf-8', 'ignore').decode(),
|
|
)
|
|
)
|
|
|
|
|
|
- def _get_prefix(self, step):
|
|
|
|
|
|
+ def _get_prefix(self, step: Step) -> str:
|
|
if step.last:
|
|
if step.last:
|
|
return self.blueprint_prefix
|
|
return self.blueprint_prefix
|
|
if step.conditional:
|
|
if step.conditional:
|
|
return self.conditional_prefix
|
|
return self.conditional_prefix
|
|
return ''
|
|
return ''
|
|
|
|
|
|
- def node(self, obj, **attrs):
|
|
|
|
|
|
+ def node(self, obj: Any, **attrs) -> str:
|
|
scheme = self.blueprint_scheme if obj.last else self.node_scheme
|
|
scheme = self.blueprint_scheme if obj.last else self.node_scheme
|
|
return self.draw_node(obj, scheme, attrs)
|
|
return self.draw_node(obj, scheme, attrs)
|
|
|
|
|
|
- def edge(self, a, b, **attrs):
|
|
|
|
|
|
+ def edge(self, a: Any, b: Any, **attrs) -> str:
|
|
if a.last:
|
|
if a.last:
|
|
attrs.update(arrowhead='none', color='darkseagreen3')
|
|
attrs.update(arrowhead='none', color='darkseagreen3')
|
|
return self.draw_edge(a, b, self.edge_scheme, attrs)
|
|
return self.draw_edge(a, b, self.edge_scheme, attrs)
|
|
@@ -85,19 +230,23 @@ class Blueprint:
|
|
"""
|
|
"""
|
|
GraphFormatter = StepFormatter
|
|
GraphFormatter = StepFormatter
|
|
|
|
|
|
- name = None
|
|
|
|
- state = None
|
|
|
|
- started = 0
|
|
|
|
- default_steps = set()
|
|
|
|
- state_to_name = {
|
|
|
|
|
|
+ name = None # type: Optional[str]
|
|
|
|
+ state = None # type: Optional[int]
|
|
|
|
+ started = 0 # type: int
|
|
|
|
+ default_steps = set() # type: Set[Union[str, Step]]
|
|
|
|
+ state_to_name = { # type: Mapping[int, str]
|
|
0: 'initializing',
|
|
0: 'initializing',
|
|
RUN: 'running',
|
|
RUN: 'running',
|
|
CLOSE: 'closing',
|
|
CLOSE: 'closing',
|
|
TERMINATE: 'terminating',
|
|
TERMINATE: 'terminating',
|
|
}
|
|
}
|
|
|
|
|
|
- def __init__(self, steps=None, name=None, app=None,
|
|
|
|
- on_start=None, on_close=None, on_stopped=None):
|
|
|
|
|
|
+ def __init__(self, steps: Optional[Sequence]=None,
|
|
|
|
+ name: Optional[str]=None,
|
|
|
|
+ app: Optional[AbstractApp]=None,
|
|
|
|
+ on_start: Optional[Callable[[], None]]=None,
|
|
|
|
+ on_close: Optional[Callable[[], None]]=None,
|
|
|
|
+ on_stopped: Optional[Callable[[], None]]=None) -> None:
|
|
self.app = app
|
|
self.app = app
|
|
self.name = name or self.name or qualname(type(self))
|
|
self.name = name or self.name or qualname(type(self))
|
|
self.types = set(steps or []) | set(self.default_steps)
|
|
self.types = set(steps or []) | set(self.default_steps)
|
|
@@ -105,9 +254,9 @@ class Blueprint:
|
|
self.on_close = on_close
|
|
self.on_close = on_close
|
|
self.on_stopped = on_stopped
|
|
self.on_stopped = on_stopped
|
|
self.shutdown_complete = Event()
|
|
self.shutdown_complete = Event()
|
|
- self.steps = {}
|
|
|
|
|
|
+ self.steps = {} # type: Mapping[str, Step]
|
|
|
|
|
|
- def start(self, parent):
|
|
|
|
|
|
+ def start(self, parent: Any) -> None:
|
|
self.state = RUN
|
|
self.state = RUN
|
|
if self.on_start:
|
|
if self.on_start:
|
|
self.on_start()
|
|
self.on_start()
|
|
@@ -117,26 +266,28 @@ class Blueprint:
|
|
step.start(parent)
|
|
step.start(parent)
|
|
logger.debug('^-- substep ok')
|
|
logger.debug('^-- substep ok')
|
|
|
|
|
|
- def human_state(self):
|
|
|
|
|
|
+ def human_state(self) -> str:
|
|
return self.state_to_name[self.state or 0]
|
|
return self.state_to_name[self.state or 0]
|
|
|
|
|
|
- def info(self, parent):
|
|
|
|
|
|
+ def info(self, parent: Any) -> Mapping[str, Any]:
|
|
info = {}
|
|
info = {}
|
|
for step in parent.steps:
|
|
for step in parent.steps:
|
|
info.update(step.info(parent) or {})
|
|
info.update(step.info(parent) or {})
|
|
return info
|
|
return info
|
|
|
|
|
|
- def close(self, parent):
|
|
|
|
|
|
+ def close(self, parent: Any) -> None:
|
|
if self.on_close:
|
|
if self.on_close:
|
|
self.on_close()
|
|
self.on_close()
|
|
self.send_all(parent, 'close', 'closing', reverse=False)
|
|
self.send_all(parent, 'close', 'closing', reverse=False)
|
|
|
|
|
|
- def restart(self, parent, method='stop',
|
|
|
|
- description='restarting', propagate=False):
|
|
|
|
|
|
+ def restart(self, parent: Any, method: str='stop',
|
|
|
|
+ description: str='restarting', propagate: bool=False) -> None:
|
|
self.send_all(parent, method, description, propagate=propagate)
|
|
self.send_all(parent, method, description, propagate=propagate)
|
|
|
|
|
|
- def send_all(self, parent, method,
|
|
|
|
- description=None, reverse=True, propagate=True, args=()):
|
|
|
|
|
|
+ def send_all(self, parent: Any, method: str,
|
|
|
|
+ description: Optional[str]=None,
|
|
|
|
+ reverse: bool=True, propagate: bool=True,
|
|
|
|
+ args: Sequence=()) -> None:
|
|
description = description or method.replace('_', ' ')
|
|
description = description or method.replace('_', ' ')
|
|
steps = reversed(parent.steps) if reverse else parent.steps
|
|
steps = reversed(parent.steps) if reverse else parent.steps
|
|
for step in steps:
|
|
for step in steps:
|
|
@@ -155,7 +306,8 @@ class Blueprint:
|
|
description, step.alias, exc, exc_info=1,
|
|
description, step.alias, exc, exc_info=1,
|
|
)
|
|
)
|
|
|
|
|
|
- def stop(self, parent, close=True, terminate=False):
|
|
|
|
|
|
+ def stop(self, parent: Any,
|
|
|
|
+ close: bool=True, terminate: bool=False) -> None:
|
|
what = 'terminating' if terminate else 'stopping'
|
|
what = 'terminating' if terminate else 'stopping'
|
|
if self.state in (CLOSE, TERMINATE):
|
|
if self.state in (CLOSE, TERMINATE):
|
|
return
|
|
return
|
|
@@ -178,7 +330,7 @@ class Blueprint:
|
|
self.state = TERMINATE
|
|
self.state = TERMINATE
|
|
self.shutdown_complete.set()
|
|
self.shutdown_complete.set()
|
|
|
|
|
|
- def join(self, timeout=None):
|
|
|
|
|
|
+ def join(self, timeout: Timeout=None) -> None:
|
|
try:
|
|
try:
|
|
# Will only get here if running green,
|
|
# Will only get here if running green,
|
|
# makes sure all greenthreads have exited.
|
|
# makes sure all greenthreads have exited.
|
|
@@ -186,7 +338,7 @@ class Blueprint:
|
|
except IGNORE_ERRORS:
|
|
except IGNORE_ERRORS:
|
|
pass
|
|
pass
|
|
|
|
|
|
- def apply(self, parent, **kwargs):
|
|
|
|
|
|
+ def apply(self, parent: Any, **kwargs) -> 'Blueprint':
|
|
"""Apply the steps in this blueprint to an object.
|
|
"""Apply the steps in this blueprint to an object.
|
|
|
|
|
|
This will apply the ``__init__`` and ``include`` methods
|
|
This will apply the ``__init__`` and ``include`` methods
|
|
@@ -214,17 +366,17 @@ class Blueprint:
|
|
step.include(parent)
|
|
step.include(parent)
|
|
return self
|
|
return self
|
|
|
|
|
|
- def connect_with(self, other):
|
|
|
|
|
|
+ def connect_with(self, other: 'Blueprint') -> None:
|
|
self.graph.adjacent.update(other.graph.adjacent)
|
|
self.graph.adjacent.update(other.graph.adjacent)
|
|
self.graph.add_edge(type(other.order[0]), type(self.order[-1]))
|
|
self.graph.add_edge(type(other.order[0]), type(self.order[-1]))
|
|
|
|
|
|
- def __getitem__(self, name):
|
|
|
|
|
|
+ def __getitem__(self, name: str) -> Step:
|
|
return self.steps[name]
|
|
return self.steps[name]
|
|
|
|
|
|
- def _find_last(self):
|
|
|
|
|
|
+ def _find_last(self) -> Optional[Step]:
|
|
return next((C for C in self.steps.values() if C.last), None)
|
|
return next((C for C in self.steps.values() if C.last), None)
|
|
|
|
|
|
- def _firstpass(self, steps):
|
|
|
|
|
|
+ def _firstpass(self, steps: Mapping[str, Step]) -> None:
|
|
for step in steps.values():
|
|
for step in steps.values():
|
|
step.requires = [symbol_by_name(dep) for dep in step.requires]
|
|
step.requires = [symbol_by_name(dep) for dep in step.requires]
|
|
stream = deque(step.requires for step in steps.values())
|
|
stream = deque(step.requires for step in steps.values())
|
|
@@ -235,7 +387,7 @@ class Blueprint:
|
|
steps[node.name] = node
|
|
steps[node.name] = node
|
|
stream.append(node.requires)
|
|
stream.append(node.requires)
|
|
|
|
|
|
- def _finalize_steps(self, steps):
|
|
|
|
|
|
+ def _finalize_steps(self, steps: Mapping[str, Step]) -> Sequence[Step]:
|
|
last = self._find_last()
|
|
last = self._find_last()
|
|
self._firstpass(steps)
|
|
self._firstpass(steps)
|
|
it = ((C, C.requires) for C in steps.values())
|
|
it = ((C, C.requires) for C in steps.values())
|
|
@@ -251,163 +403,19 @@ class Blueprint:
|
|
except KeyError as exc:
|
|
except KeyError as exc:
|
|
raise KeyError('unknown bootstep: %s' % exc)
|
|
raise KeyError('unknown bootstep: %s' % exc)
|
|
|
|
|
|
- def claim_steps(self):
|
|
|
|
|
|
+ def claim_steps(self) -> Mapping[str, Step]:
|
|
return dict(self.load_step(step) for step in self._all_steps())
|
|
return dict(self.load_step(step) for step in self._all_steps())
|
|
|
|
|
|
- def _all_steps(self):
|
|
|
|
|
|
+ def _all_steps(self) -> Set:
|
|
return self.types | self.app.steps[self.name.lower()]
|
|
return self.types | self.app.steps[self.name.lower()]
|
|
|
|
|
|
- def load_step(self, step):
|
|
|
|
|
|
+ def load_step(self, step: Step) -> Tuple[str, Step]:
|
|
step = symbol_by_name(step)
|
|
step = symbol_by_name(step)
|
|
return step.name, step
|
|
return step.name, step
|
|
|
|
|
|
- def _debug(self, msg, *args):
|
|
|
|
- return logger.debug(_pre(self, msg), *args)
|
|
|
|
|
|
+ def _debug(self, msg: str, *args) -> None:
|
|
|
|
+ logger.debug(_pre(self, msg), *args)
|
|
|
|
|
|
@property
|
|
@property
|
|
- def alias(self):
|
|
|
|
|
|
+ def alias(self) -> str:
|
|
return _label(self)
|
|
return _label(self)
|
|
-
|
|
|
|
-
|
|
|
|
-class StepType(type):
|
|
|
|
- """Meta-class for steps."""
|
|
|
|
-
|
|
|
|
- def __new__(cls, name, bases, attrs):
|
|
|
|
- module = attrs.get('__module__')
|
|
|
|
- qname = '{0}.{1}'.format(module, name) if module else name
|
|
|
|
- attrs.update(
|
|
|
|
- __qualname__=qname,
|
|
|
|
- name=attrs.get('name') or qname,
|
|
|
|
- )
|
|
|
|
- return super().__new__(cls, name, bases, attrs)
|
|
|
|
-
|
|
|
|
- def __str__(self):
|
|
|
|
- return self.name
|
|
|
|
-
|
|
|
|
- def __repr__(self):
|
|
|
|
- return 'step:{0.name}{{{0.requires!r}}}'.format(self)
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-class Step(metaclass=StepType):
|
|
|
|
- """A Bootstep.
|
|
|
|
-
|
|
|
|
- The :meth:`__init__` method is called when the step
|
|
|
|
- is bound to a parent object, and can as such be used
|
|
|
|
- to initialize attributes in the parent object at
|
|
|
|
- parent instantiation-time.
|
|
|
|
- """
|
|
|
|
-
|
|
|
|
- #: Optional step name, will use ``qualname`` if not specified.
|
|
|
|
- name = None
|
|
|
|
-
|
|
|
|
- #: Optional short name used for graph outputs and in logs.
|
|
|
|
- label = None
|
|
|
|
-
|
|
|
|
- #: Set this to true if the step is enabled based on some condition.
|
|
|
|
- conditional = False
|
|
|
|
-
|
|
|
|
- #: List of other steps that that must be started before this step.
|
|
|
|
- #: Note that all dependencies must be in the same blueprint.
|
|
|
|
- requires = ()
|
|
|
|
-
|
|
|
|
- #: This flag is reserved for the workers Consumer,
|
|
|
|
- #: since it is required to always be started last.
|
|
|
|
- #: There can only be one object marked last
|
|
|
|
- #: in every blueprint.
|
|
|
|
- last = False
|
|
|
|
-
|
|
|
|
- #: This provides the default for :meth:`include_if`.
|
|
|
|
- enabled = True
|
|
|
|
-
|
|
|
|
- def __init__(self, parent, **kwargs):
|
|
|
|
- pass
|
|
|
|
-
|
|
|
|
- def include_if(self, parent):
|
|
|
|
- """An optional predicate that decides whether this
|
|
|
|
- step should be created."""
|
|
|
|
- return self.enabled
|
|
|
|
-
|
|
|
|
- def instantiate(self, name, *args, **kwargs):
|
|
|
|
- return instantiate(name, *args, **kwargs)
|
|
|
|
-
|
|
|
|
- def _should_include(self, parent):
|
|
|
|
- if self.include_if(parent):
|
|
|
|
- return True, self.create(parent)
|
|
|
|
- return False, None
|
|
|
|
-
|
|
|
|
- def include(self, parent):
|
|
|
|
- return self._should_include(parent)[0]
|
|
|
|
-
|
|
|
|
- def create(self, parent):
|
|
|
|
- """Create the step."""
|
|
|
|
- pass
|
|
|
|
-
|
|
|
|
- def __repr__(self):
|
|
|
|
- return '<step: {0.alias}>'.format(self)
|
|
|
|
-
|
|
|
|
- @property
|
|
|
|
- def alias(self):
|
|
|
|
- return self.label or _label(self)
|
|
|
|
-
|
|
|
|
- def info(self, obj):
|
|
|
|
- pass
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-class StartStopStep(Step):
|
|
|
|
-
|
|
|
|
- #: Optional obj created by the :meth:`create` method.
|
|
|
|
- #: This is used by :class:`StartStopStep` to keep the
|
|
|
|
- #: original service object.
|
|
|
|
- obj = None
|
|
|
|
-
|
|
|
|
- def start(self, parent):
|
|
|
|
- if self.obj:
|
|
|
|
- return self.obj.start()
|
|
|
|
-
|
|
|
|
- def stop(self, parent):
|
|
|
|
- if self.obj:
|
|
|
|
- return self.obj.stop()
|
|
|
|
-
|
|
|
|
- def close(self, parent):
|
|
|
|
- pass
|
|
|
|
-
|
|
|
|
- def terminate(self, parent):
|
|
|
|
- if self.obj:
|
|
|
|
- return getattr(self.obj, 'terminate', self.obj.stop)()
|
|
|
|
-
|
|
|
|
- def include(self, parent):
|
|
|
|
- inc, ret = self._should_include(parent)
|
|
|
|
- if inc:
|
|
|
|
- self.obj = ret
|
|
|
|
- parent.steps.append(self)
|
|
|
|
- return inc
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-class ConsumerStep(StartStopStep):
|
|
|
|
- requires = ('celery.worker.consumer:Connection',)
|
|
|
|
- consumers = None
|
|
|
|
-
|
|
|
|
- def get_consumers(self, channel):
|
|
|
|
- raise NotImplementedError('missing get_consumers')
|
|
|
|
-
|
|
|
|
- def start(self, c):
|
|
|
|
- channel = c.connection.channel()
|
|
|
|
- self.consumers = self.get_consumers(channel)
|
|
|
|
- for consumer in self.consumers or []:
|
|
|
|
- consumer.consume()
|
|
|
|
-
|
|
|
|
- def stop(self, c):
|
|
|
|
- self._close(c, True)
|
|
|
|
-
|
|
|
|
- def shutdown(self, c):
|
|
|
|
- self._close(c, False)
|
|
|
|
-
|
|
|
|
- def _close(self, c, cancel_consumers=True):
|
|
|
|
- channels = set()
|
|
|
|
- for consumer in self.consumers or []:
|
|
|
|
- if cancel_consumers:
|
|
|
|
- ignore_errors(c.connection, consumer.cancel)
|
|
|
|
- if consumer.channel:
|
|
|
|
- channels.add(consumer.channel)
|
|
|
|
- for channel in channels:
|
|
|
|
- ignore_errors(c.connection, channel.close)
|
|
|