abstract.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.abstract
  4. ~~~~~~~~~~~~~~~
  5. Implements components and boot-steps.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from collections import defaultdict
  11. from importlib import import_module
  12. from .datastructures import DependencyGraph
  13. from .utils import instantiate
  14. class Namespace(object):
  15. """A namespace containing components.
  16. Every component must belong to a namespace.
  17. When component classes are created they are added to the
  18. mapping of unclaimed components. The components will be
  19. claimed when the namespace they belong to is created.
  20. :keyword name: Set the name of this namespace.
  21. :keyword app: Set the Celery app for this namespace.
  22. """
  23. name = None
  24. _unclaimed = defaultdict(dict)
  25. _started_count = 0
  26. def __init__(self, name=None, app=None, logger=None):
  27. self.app = app
  28. self.name = name or self.name
  29. self.logger = logger or self.app.log.get_default_logger()
  30. self.services = []
  31. def modules(self):
  32. """Subclasses can override this to return a
  33. list of modules to import before components are claimed."""
  34. return []
  35. def load_modules(self):
  36. """Will load the component modules this namespace depends on."""
  37. for m in self.modules():
  38. self.import_module(m)
  39. def apply(self, parent, **kwargs):
  40. """Apply the components in this namespace to an object.
  41. This will apply the ``__init__`` and ``include`` methods
  42. of each components with the object as argument.
  43. For ``StartStopComponents`` the services created
  44. will also be added the the objects ``components`` attribute.
  45. """
  46. self._debug("Loading modules.")
  47. self.load_modules()
  48. self._debug("Claiming components.")
  49. self.components = self._claim()
  50. self._debug("Building boot step graph.")
  51. self.boot_steps = [self.bind_component(name, parent, **kwargs)
  52. for name in self._finalize_boot_steps()]
  53. self._debug("New boot order: %r", [c.name for c in self.boot_steps])
  54. for component in self.boot_steps:
  55. component.include(parent)
  56. return self
  57. def bind_component(self, name, parent, **kwargs):
  58. """Bind component to parent object and this namespace."""
  59. comp = self[name](parent, **kwargs)
  60. comp.namespace = self
  61. return comp
  62. def import_module(self, module):
  63. return import_module(module)
  64. def __getitem__(self, name):
  65. return self.components[name]
  66. def _find_last(self):
  67. for C in self.components.itervalues():
  68. if C.last:
  69. return C
  70. def _finalize_boot_steps(self):
  71. G = self.graph = DependencyGraph((C.name, C.requires)
  72. for C in self.components.itervalues())
  73. last = self._find_last()
  74. if last:
  75. for obj in G:
  76. if obj != last.name:
  77. G.add_edge(last.name, obj)
  78. return G.topsort()
  79. def _claim(self):
  80. return self._unclaimed[self.name]
  81. def _debug(self, msg, *args):
  82. return self.logger.debug("[%s] " + msg,
  83. *(self.name.capitalize(), ) + args)
  84. class ComponentType(type):
  85. """Metaclass for components."""
  86. def __new__(cls, name, bases, attrs):
  87. abstract = attrs.pop("abstract", False)
  88. if not abstract:
  89. try:
  90. cname = attrs["name"]
  91. except KeyError:
  92. raise NotImplementedError("Components must be named")
  93. namespace = attrs.get("namespace", None)
  94. if not namespace:
  95. attrs["namespace"], _, attrs["name"] = cname.partition('.')
  96. cls = super(ComponentType, cls).__new__(cls, name, bases, attrs)
  97. if not abstract:
  98. Namespace._unclaimed[cls.namespace][cls.name] = cls
  99. return cls
  100. class Component(object):
  101. """A component.
  102. The :meth:`__init__` method is called when the component
  103. is bound to a parent object, and can as such be used
  104. to initialize attributes in the parent object at
  105. parent instantiation-time.
  106. """
  107. __metaclass__ = ComponentType
  108. #: The name of the component, or the namespace
  109. #: and the name of the component separated by dot.
  110. name = None
  111. #: List of component names this component depends on.
  112. #: Note that the dependencies must be in the same namespace.
  113. requires = ()
  114. #: can be used to specify the namespace,
  115. #: if the name does not include it.
  116. namespace = None
  117. #: if set the component will not be registered,
  118. #: but can be used as a component base class.
  119. abstract = True
  120. #: Optional obj created by the :meth:`create` method.
  121. #: This is used by StartStopComponents to keep the
  122. #: original service object.
  123. obj = None
  124. #: This flag is reserved for the workers Consumer,
  125. #: since it is required to always be started last.
  126. #: There can only be one object marked with lsat
  127. #: in every namespace.
  128. last = False
  129. #: This provides the default for :meth:`include_if`.
  130. enabled = True
  131. def __init__(self, parent, **kwargs):
  132. pass
  133. def create(self, parent):
  134. """Create the component."""
  135. pass
  136. def include_if(self, parent):
  137. """An optional predicate that decided whether this
  138. component should be created."""
  139. return self.enabled
  140. def instantiate(self, qualname, *args, **kwargs):
  141. return instantiate(qualname, *args, **kwargs)
  142. def include(self, parent):
  143. if self.include_if(parent):
  144. self.obj = self.create(parent)
  145. return True
  146. class StartStopComponent(Component):
  147. abstract = True
  148. terminable = False
  149. def start(self):
  150. return self.obj.start()
  151. def stop(self):
  152. return self.obj.stop()
  153. def terminate(self):
  154. if self.terminable:
  155. return self.obj.terminate()
  156. return self.obj.stop()
  157. def include(self, parent):
  158. if super(StartStopComponent, self).include(parent):
  159. parent.components.append(self.obj)