bootsteps.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.bootsteps
  4. ~~~~~~~~~~~~~~~~
  5. The bootsteps!
  6. """
  7. from __future__ import absolute_import, unicode_literals
  8. from collections import deque
  9. from importlib import import_module
  10. from threading import Event
  11. from kombu.common import ignore_errors
  12. from kombu.utils import symbol_by_name
  13. from .datastructures import DependencyGraph, GraphFormatter
  14. from .five import values, with_metaclass
  15. from .utils.imports import instantiate, qualname
  16. from .utils.log import get_logger
  17. from .utils.threads import default_socket_timeout
  18. try:
  19. from greenlet import GreenletExit
  20. IGNORE_ERRORS = (GreenletExit, )
  21. except ImportError: # pragma: no cover
  22. IGNORE_ERRORS = ()
  23. #: Default socket timeout at shutdown.
  24. SHUTDOWN_SOCKET_TIMEOUT = 5.0
  25. #: States
  26. RUN = 0x1
  27. CLOSE = 0x2
  28. TERMINATE = 0x3
  29. logger = get_logger(__name__)
  30. debug = logger.debug
  31. def _pre(ns, fmt):
  32. return '| {0}: {1}'.format(ns.alias, fmt)
  33. def _label(s):
  34. return s.name.rsplit('.', 1)[-1]
  35. class StepFormatter(GraphFormatter):
  36. namespace_prefix = '⧉'
  37. conditional_prefix = '∘'
  38. namespace_scheme = {
  39. 'shape': 'parallelogram',
  40. 'color': 'slategray4',
  41. 'fillcolor': 'slategray3',
  42. }
  43. def label(self, step):
  44. return step and '{0}{1}'.format(
  45. self._get_prefix(step),
  46. (step.label or _label(step)).encode('utf-8', 'ignore'),
  47. )
  48. def _get_prefix(self, step):
  49. if step.last:
  50. return self.namespace_prefix
  51. if step.conditional:
  52. return self.conditional_prefix
  53. return ''
  54. def node(self, obj, **attrs):
  55. scheme = self.namespace_scheme if obj.last else self.node_scheme
  56. return self.draw_node(obj, scheme, attrs)
  57. def edge(self, a, b, **attrs):
  58. if a.last:
  59. attrs.update(arrowhead='none', color='darkseagreen3')
  60. return self.draw_edge(a, b, self.edge_scheme, attrs)
  61. class Namespace(object):
  62. """A namespace containing bootsteps.
  63. :keyword steps: List of steps.
  64. :keyword name: Set explicit name for this namespace.
  65. :keyword app: Set the Celery app for this namespace.
  66. :keyword on_start: Optional callback applied after namespace start.
  67. :keyword on_close: Optional callback applied before namespace close.
  68. :keyword on_stopped: Optional callback applied after namespace stopped.
  69. """
  70. GraphFormatter = StepFormatter
  71. name = None
  72. state = None
  73. started = 0
  74. default_steps = set()
  75. def __init__(self, steps=None, name=None, app=None,
  76. on_start=None, on_close=None, on_stopped=None):
  77. self.app = app
  78. self.name = name or self.name or qualname(type(self))
  79. self.types = set(steps or []) | set(self.default_steps)
  80. self.on_start = on_start
  81. self.on_close = on_close
  82. self.on_stopped = on_stopped
  83. self.shutdown_complete = Event()
  84. self.steps = {}
  85. def start(self, parent):
  86. self.state = RUN
  87. if self.on_start:
  88. self.on_start()
  89. for i, step in enumerate(filter(None, parent.steps)):
  90. self._debug('Starting %s', step.alias)
  91. self.started = i + 1
  92. step.start(parent)
  93. debug('^-- substep ok')
  94. def info(self, parent):
  95. info = {}
  96. for step in parent.steps:
  97. info.update(step.info(parent) or {})
  98. return info
  99. def close(self, parent):
  100. if self.on_close:
  101. self.on_close()
  102. for step in parent.steps:
  103. close = getattr(step, 'close', None)
  104. if close:
  105. close(parent)
  106. def restart(self, parent, description='Restarting', attr='stop'):
  107. with default_socket_timeout(SHUTDOWN_SOCKET_TIMEOUT): # Issue 975
  108. for step in reversed(parent.steps):
  109. if step:
  110. self._debug('%s %s...', description, step.alias)
  111. fun = getattr(step, attr, None)
  112. if fun:
  113. fun(parent)
  114. def stop(self, parent, close=True, terminate=False):
  115. what = 'Terminating' if terminate else 'Stopping'
  116. if self.state in (CLOSE, TERMINATE):
  117. return
  118. self.close(parent)
  119. if self.state != RUN or self.started != len(parent.steps):
  120. # Not fully started, can safely exit.
  121. self.state = TERMINATE
  122. self.shutdown_complete.set()
  123. return
  124. self.state = CLOSE
  125. self.restart(parent, what, 'terminate' if terminate else 'stop')
  126. if self.on_stopped:
  127. self.on_stopped()
  128. self.state = TERMINATE
  129. self.shutdown_complete.set()
  130. def join(self, timeout=None):
  131. try:
  132. # Will only get here if running green,
  133. # makes sure all greenthreads have exited.
  134. self.shutdown_complete.wait(timeout=timeout)
  135. except IGNORE_ERRORS:
  136. pass
  137. def apply(self, parent, **kwargs):
  138. """Apply the steps in this namespace to an object.
  139. This will apply the ``__init__`` and ``include`` methods
  140. of each steps with the object as argument.
  141. For :class:`StartStopStep` the services created
  142. will also be added the the objects ``steps`` attribute.
  143. """
  144. self._debug('Preparing bootsteps.')
  145. order = self.order = []
  146. steps = self.steps = self.claim_steps()
  147. self._debug('Building graph...')
  148. for S in self._finalize_steps(steps):
  149. step = S(parent, **kwargs)
  150. steps[step.name] = step
  151. order.append(step)
  152. self._debug('New boot order: {%s}',
  153. ', '.join(s.alias for s in self.order))
  154. for step in order:
  155. step.include(parent)
  156. return self
  157. def connect_with(self, other):
  158. self.graph.adjacent.update(other.graph.adjacent)
  159. self.graph.add_edge(type(other.order[0]), type(self.order[-1]))
  160. def import_module(self, module):
  161. return import_module(module)
  162. def __getitem__(self, name):
  163. return self.steps[name]
  164. def _find_last(self):
  165. for C in values(self.steps):
  166. if C.last:
  167. return C
  168. def _firstpass(self, steps):
  169. stream = deque(step.requires for step in values(steps))
  170. while stream:
  171. for node in stream.popleft():
  172. node = symbol_by_name(node)
  173. if node.name not in self.steps:
  174. steps[node.name] = node
  175. stream.append(node.requires)
  176. def _finalize_steps(self, steps):
  177. last = self._find_last()
  178. self._firstpass(steps)
  179. it = ((C, C.requires) for C in values(steps))
  180. G = self.graph = DependencyGraph(
  181. it, formatter=self.GraphFormatter(root=last),
  182. )
  183. if last:
  184. for obj in G:
  185. if obj != last:
  186. G.add_edge(last, obj)
  187. try:
  188. return G.topsort()
  189. except KeyError as exc:
  190. raise KeyError('unknown bootstep: %s' % exc)
  191. def claim_steps(self):
  192. return dict(self.load_step(step) for step in self._all_steps())
  193. def _all_steps(self):
  194. return self.types | self.app.steps[self.name.lower()]
  195. def load_step(self, step):
  196. step = symbol_by_name(step)
  197. return step.name, step
  198. def _debug(self, msg, *args):
  199. return debug(_pre(self, msg), *args)
  200. @property
  201. def alias(self):
  202. return _label(self)
  203. class StepType(type):
  204. """Metaclass for steps."""
  205. def __new__(cls, name, bases, attrs):
  206. module = attrs.get('__module__')
  207. qname = '{0}.{1}'.format(module, name) if module else name
  208. attrs.update(
  209. __qualname__=qname,
  210. name=attrs.get('name') or qname,
  211. requires=attrs.get('requires', ()),
  212. )
  213. return super(StepType, cls).__new__(cls, name, bases, attrs)
  214. def __str__(self):
  215. return self.name
  216. def __repr__(self):
  217. return 'step:{0.name}{{{0.requires!r}}}'.format(self)
  218. @with_metaclass(StepType)
  219. class Step(object):
  220. """A Bootstep.
  221. The :meth:`__init__` method is called when the step
  222. is bound to a parent object, and can as such be used
  223. to initialize attributes in the parent object at
  224. parent instantiation-time.
  225. """
  226. #: Optional step name, will use qualname if not specified.
  227. name = None
  228. #: Optional short name used for graph outputs and in logs.
  229. label = None
  230. #: Set this to true if the step is enabled based on some condition.
  231. conditional = False
  232. #: List of other steps that that must be started before this step.
  233. #: Note that all dependencies must be in the same namespace.
  234. requires = ()
  235. #: This flag is reserved for the workers Consumer,
  236. #: since it is required to always be started last.
  237. #: There can only be one object marked with lsat
  238. #: in every namespace.
  239. last = False
  240. #: This provides the default for :meth:`include_if`.
  241. enabled = True
  242. def __init__(self, parent, **kwargs):
  243. pass
  244. def include_if(self, parent):
  245. """An optional predicate that decided whether this
  246. step should be created."""
  247. return self.enabled
  248. def instantiate(self, name, *args, **kwargs):
  249. return instantiate(name, *args, **kwargs)
  250. def _should_include(self, parent):
  251. if self.include_if(parent):
  252. return True, self.create(parent)
  253. return False, None
  254. def include(self, parent):
  255. return self._should_include(parent)[0]
  256. def create(self, parent):
  257. """Create the step."""
  258. pass
  259. def __repr__(self):
  260. return '<step: {0.alias}>'.format(self)
  261. @property
  262. def alias(self):
  263. return self.label or _label(self)
  264. def info(self, obj):
  265. pass
  266. class StartStopStep(Step):
  267. #: Optional obj created by the :meth:`create` method.
  268. #: This is used by :class:`StartStopStep` to keep the
  269. #: original service object.
  270. obj = None
  271. def start(self, parent):
  272. if self.obj:
  273. return self.obj.start()
  274. def stop(self, parent):
  275. if self.obj:
  276. return self.obj.stop()
  277. def close(self, parent):
  278. pass
  279. def terminate(self, parent):
  280. self.stop(parent)
  281. def include(self, parent):
  282. inc, ret = self._should_include(parent)
  283. if inc:
  284. self.obj = ret
  285. parent.steps.append(self)
  286. return inc
  287. class ConsumerStep(StartStopStep):
  288. requires = ('Connection', )
  289. consumers = None
  290. def get_consumers(self, channel):
  291. raise NotImplementedError('missing get_consumers')
  292. def start(self, c):
  293. self.consumers = self.get_consumers(c.connection)
  294. for consumer in self.consumers or []:
  295. consumer.consume()
  296. def stop(self, c):
  297. for consumer in self.consumers or []:
  298. ignore_errors(c.connection, consumer.cancel)
  299. def shutdown(self, c):
  300. self.stop(c)
  301. for consumer in self.consumers or []:
  302. if consumer.channel:
  303. ignore_errors(c.connection, consumer.channel.close)