bootsteps.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.bootsteps
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. The boot-steps!
  6. """
  7. from __future__ import absolute_import
  8. from collections import defaultdict
  9. from importlib import import_module
  10. from threading import Event
  11. from celery.datastructures import DependencyGraph
  12. from celery.utils.imports import instantiate
  13. from celery.utils.log import get_logger
  14. from celery.utils.threads import default_socket_timeout
  15. try:
  16. from greenlet import GreenletExit
  17. IGNORE_ERRORS = (GreenletExit, )
  18. except ImportError: # pragma: no cover
  19. IGNORE_ERRORS = ()
  20. #: Default socket timeout at shutdown.
  21. SHUTDOWN_SOCKET_TIMEOUT = 5.0
  22. #: States
  23. RUN = 0x1
  24. CLOSE = 0x2
  25. TERMINATE = 0x3
  26. logger = get_logger(__name__)
  27. def qualname(c):
  28. return '.'.join([c.namespace.name, c.name.capitalize()])
  29. class Namespace(object):
  30. """A namespace containing bootsteps.
  31. Every step must belong to a namespace.
  32. When step classes are created they are added to the
  33. mapping of unclaimed steps. The steps will be
  34. claimed when the namespace they belong to is created.
  35. :keyword name: Set the name of this namespace.
  36. :keyword app: Set the Celery app for this namespace.
  37. """
  38. name = None
  39. state = None
  40. started = 0
  41. _unclaimed = defaultdict(dict)
  42. def __init__(self, name=None, app=None, on_start=None,
  43. on_close=None, on_stopped=None):
  44. self.app = app
  45. self.name = name or self.name
  46. self.on_start = on_start
  47. self.on_close = on_close
  48. self.on_stopped = on_stopped
  49. self.services = []
  50. self.shutdown_complete = Event()
  51. def start(self, parent):
  52. self.state = RUN
  53. if self.on_start:
  54. self.on_start()
  55. for i, step in enumerate(parent.steps):
  56. if step:
  57. logger.debug('Starting %s...', qualname(step))
  58. self.started = i + 1
  59. print('STARTING: %r' % (step.start, ))
  60. step.start(parent)
  61. logger.debug('%s OK!', qualname(step))
  62. def close(self, parent):
  63. if self.on_close:
  64. self.on_close()
  65. for step in parent.steps:
  66. close = getattr(step, 'close', None)
  67. if close:
  68. close(parent)
  69. def restart(self, parent, description='Restarting', attr='stop'):
  70. with default_socket_timeout(SHUTDOWN_SOCKET_TIMEOUT): # Issue 975
  71. for step in reversed(parent.steps):
  72. if step:
  73. logger.debug('%s %s...', description, qualname(step))
  74. fun = getattr(step, attr, None)
  75. if fun:
  76. fun(parent)
  77. def stop(self, parent, close=True, terminate=False):
  78. what = 'Terminating' if terminate else 'Stopping'
  79. if self.state in (CLOSE, TERMINATE):
  80. return
  81. self.close(parent)
  82. if self.state != RUN or self.started != len(parent.steps):
  83. # Not fully started, can safely exit.
  84. self.state = TERMINATE
  85. self.shutdown_complete.set()
  86. return
  87. self.state = CLOSE
  88. self.restart(parent, what, 'terminate' if terminate else 'stop')
  89. if self.on_stopped:
  90. self.on_stopped()
  91. self.state = TERMINATE
  92. self.shutdown_complete.set()
  93. def join(self, timeout=None):
  94. try:
  95. # Will only get here if running green,
  96. # makes sure all greenthreads have exited.
  97. self.shutdown_complete.wait(timeout=timeout)
  98. except IGNORE_ERRORS:
  99. pass
  100. def modules(self):
  101. """Subclasses can override this to return a
  102. list of modules to import before steps are claimed."""
  103. return []
  104. def load_modules(self):
  105. """Will load the steps modules this namespace depends on."""
  106. for m in self.modules():
  107. self.import_module(m)
  108. def apply(self, parent, **kwargs):
  109. """Apply the steps in this namespace to an object.
  110. This will apply the ``__init__`` and ``include`` methods
  111. of each steps with the object as argument.
  112. For :class:`StartStopStep` the services created
  113. will also be added the the objects ``steps`` attribute.
  114. """
  115. self._debug('Loading modules.')
  116. self.load_modules()
  117. self._debug('Claiming steps.')
  118. self.steps = self._claim()
  119. self._debug('Building boot step graph.')
  120. self.boot_steps = [self.bind_step(name, parent, **kwargs)
  121. for name in self._finalize_boot_steps()]
  122. self._debug('New boot order: {%s}',
  123. ', '.join(c.name for c in self.boot_steps))
  124. for step in self.boot_steps:
  125. step.include(parent)
  126. return self
  127. def bind_step(self, name, parent, **kwargs):
  128. """Bind step to parent object and this namespace."""
  129. comp = self[name](parent, **kwargs)
  130. comp.namespace = self
  131. return comp
  132. def import_module(self, module):
  133. return import_module(module)
  134. def __getitem__(self, name):
  135. return self.steps[name]
  136. def _find_last(self):
  137. for C in self.steps.itervalues():
  138. if C.last:
  139. return C
  140. def _finalize_boot_steps(self):
  141. G = self.graph = DependencyGraph((C.name, C.requires)
  142. for C in self.steps.itervalues())
  143. last = self._find_last()
  144. if last:
  145. for obj in G:
  146. if obj != last.name:
  147. G.add_edge(last.name, obj)
  148. return G.topsort()
  149. def _claim(self):
  150. return self._unclaimed[self.name]
  151. def _debug(self, msg, *args):
  152. return logger.debug('[%s] ' + msg,
  153. *(self.name.capitalize(), ) + args)
  154. def _prepare_requires(req):
  155. if not isinstance(req, basestring):
  156. req = req.name
  157. return req
  158. class StepType(type):
  159. """Metaclass for steps."""
  160. def __new__(cls, name, bases, attrs):
  161. abstract = attrs.pop('abstract', False)
  162. if not abstract:
  163. try:
  164. cname = attrs['name']
  165. except KeyError:
  166. raise NotImplementedError('Steps must be named')
  167. namespace = attrs.get('namespace', None)
  168. if not namespace:
  169. attrs['namespace'], _, attrs['name'] = cname.partition('.')
  170. attrs['requires'] = tuple(_prepare_requires(req)
  171. for req in attrs.get('requires', ()))
  172. cls = super(StepType, cls).__new__(cls, name, bases, attrs)
  173. if not abstract:
  174. Namespace._unclaimed[cls.namespace][cls.name] = cls
  175. return cls
  176. class Step(object):
  177. """A Bootstep.
  178. The :meth:`__init__` method is called when the step
  179. is bound to a parent object, and can as such be used
  180. to initialize attributes in the parent object at
  181. parent instantiation-time.
  182. """
  183. __metaclass__ = StepType
  184. #: The name of the step, or the namespace
  185. #: and the name of the step separated by dot.
  186. name = None
  187. #: List of other steps that that must be started before this step.
  188. #: Note that all dependencies must be in the same namespace.
  189. requires = ()
  190. #: can be used to specify the namespace,
  191. #: if the name does not include it.
  192. namespace = None
  193. #: if set the step will not be registered,
  194. #: but can be used as a base class.
  195. abstract = True
  196. #: Optional obj created by the :meth:`create` method.
  197. #: This is used by :class:`StartStopStep` to keep the
  198. #: original service object.
  199. obj = None
  200. #: This flag is reserved for the workers Consumer,
  201. #: since it is required to always be started last.
  202. #: There can only be one object marked with lsat
  203. #: in every namespace.
  204. last = False
  205. #: This provides the default for :meth:`include_if`.
  206. enabled = True
  207. def __init__(self, parent, **kwargs):
  208. pass
  209. def create(self, parent):
  210. """Create the step."""
  211. pass
  212. def include_if(self, parent):
  213. """An optional predicate that decided whether this
  214. step should be created."""
  215. return self.enabled
  216. def instantiate(self, qualname, *args, **kwargs):
  217. return instantiate(qualname, *args, **kwargs)
  218. def include(self, parent):
  219. if self.include_if(parent):
  220. self.obj = self.create(parent)
  221. return True
  222. class StartStopStep(Step):
  223. abstract = True
  224. def start(self, parent):
  225. if self.obj:
  226. return self.obj.start()
  227. def stop(self, parent):
  228. if self.obj:
  229. return self.obj.stop()
  230. def close(self, parent):
  231. pass
  232. def terminate(self, parent):
  233. self.stop(parent)
  234. def include(self, parent):
  235. if super(StartStopStep, self).include(parent):
  236. parent.steps.append(self)