base.py 7.9 KB


  1. # -*- coding: utf-8 -*-
  2. """Loader base class."""
  3. import imp as _imp
  4. import importlib
  5. import os
  6. import re
  7. import sys
  8. from datetime import datetime
  9. from kombu.utils import json
  10. from celery import signals
  11. from celery.utils.collections import DictAttribute, force_mapping
  12. from celery.utils.functional import maybe_list
  13. from celery.utils.imports import (
  14. import_from_cwd, symbol_by_name, NotAPackage, find_module,
  15. )
  16. __all__ = ['BaseLoader']
  17. _RACE_PROTECTION = False
  18. CONFIG_INVALID_NAME = """\
  19. Error: Module '{module}' doesn't exist, or it's not a valid \
  20. Python module name.
  21. """
  22. CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """\
  23. Did you mean '{suggest}'?
  24. """
  25. unconfigured = object()
  26. class BaseLoader:
  27. """Base class for loaders.
  28. Loaders handles,
  29. * Reading celery client/worker configurations.
  30. * What happens when a task starts?
  31. See :meth:`on_task_init`.
  32. * What happens when the worker starts?
  33. See :meth:`on_worker_init`.
  34. * What happens when the worker shuts down?
  35. See :meth:`on_worker_shutdown`.
  36. * What modules are imported to find tasks?
  37. """
  38. builtin_modules = frozenset()
  39. configured = False
  40. override_backends = {}
  41. worker_initialized = False
  42. _conf = unconfigured
  43. def __init__(self, app, **kwargs):
  44. self.app = app
  45. self.task_modules = set()
  46. def now(self, utc=True):
  47. if utc:
  48. return datetime.utcnow()
  49. return datetime.now()
  50. def on_task_init(self, task_id, task):
  51. """Called before a task is executed."""
  52. pass
  53. def on_process_cleanup(self):
  54. """Called after a task is executed."""
  55. pass
  56. def on_worker_init(self):
  57. """Called when the worker (:program:`celery worker`) starts."""
  58. pass
  59. def on_worker_shutdown(self):
  60. """Called when the worker (:program:`celery worker`) shuts down."""
  61. pass
  62. def on_worker_process_init(self):
  63. """Called when a child process starts."""
  64. pass
  65. def import_task_module(self, module):
  66. self.task_modules.add(module)
  67. return self.import_from_cwd(module)
  68. def import_module(self, module, package=None):
  69. return importlib.import_module(module, package=package)
  70. def import_from_cwd(self, module, imp=None, package=None):
  71. return import_from_cwd(
  72. module,
  73. self.import_module if imp is None else imp,
  74. package=package,
  75. )
  76. def import_default_modules(self):
  77. signals.import_modules.send(sender=self.app)
  78. return [
  79. self.import_task_module(m) for m in (
  80. tuple(self.builtin_modules) +
  81. tuple(maybe_list(self.app.conf.imports)) +
  82. tuple(maybe_list(self.app.conf.include))
  83. )
  84. ]
  85. def init_worker(self):
  86. if not self.worker_initialized:
  87. self.worker_initialized = True
  88. self.import_default_modules()
  89. self.on_worker_init()
  90. def shutdown_worker(self):
  91. self.on_worker_shutdown()
  92. def init_worker_process(self):
  93. self.on_worker_process_init()
  94. def config_from_object(self, obj, silent=False):
  95. if isinstance(obj, str):
  96. try:
  97. obj = self._smart_import(obj, imp=self.import_from_cwd)
  98. except (ImportError, AttributeError):
  99. if silent:
  100. return False
  101. raise
  102. self._conf = force_mapping(obj)
  103. return True
  104. def _smart_import(self, path, imp=None):
  105. imp = self.import_module if imp is None else imp
  106. if ':' in path:
  107. # Path includes attribute so can just jump
  108. # here (e.g., ``os.path:abspath``).
  109. return symbol_by_name(path, imp=imp)
  110. # Not sure if path is just a module name or if it includes an
  111. # attribute name (e.g., ``os.path``, vs, ``os.path.abspath``).
  112. try:
  113. return imp(path)
  114. except ImportError:
  115. # Not a module name, so try module + attribute.
  116. return symbol_by_name(path, imp=imp)
  117. def _import_config_module(self, name):
  118. try:
  119. self.find_module(name)
  120. except NotAPackage:
  121. if name.endswith('.py'):
  122. raise NotAPackage(CONFIG_WITH_SUFFIX.format(
  123. module=name, suggest=name[:-3])
  124. ).with_traceback(sys.exc_info()[2])
  125. raise NotAPackage(CONFIG_INVALID_NAME.format(
  126. module=name)).with_traceback(sys.exc_info()[2])
  127. else:
  128. return self.import_from_cwd(name)
  129. def find_module(self, module):
  130. return find_module(module)
  131. def cmdline_config_parser(
  132. self, args, namespace='celery',
  133. re_type=re.compile(r'\((\w+)\)'),
  134. extra_types={'json': json.loads},
  135. override_types={'tuple': 'json',
  136. 'list': 'json',
  137. 'dict': 'json'}):
  138. from celery.app.defaults import Option, NAMESPACES
  139. namespace = namespace and namespace.lower()
  140. typemap = dict(Option.typemap, **extra_types)
  141. def getarg(arg):
  142. """Parse single configuration from command-line."""
  143. # ## find key/value
  144. # ns.key=value|ns_key=value (case insensitive)
  145. key, value = arg.split('=', 1)
  146. key = key.lower().replace('.', '_')
  147. # ## find name-space.
  148. # .key=value|_key=value expands to default name-space.
  149. if key[0] == '_':
  150. ns, key = namespace, key[1:]
  151. else:
  152. # find name-space part of key
  153. ns, key = key.split('_', 1)
  154. ns_key = (ns and ns + '_' or '') + key
  155. # (type)value makes cast to custom type.
  156. cast = re_type.match(value)
  157. if cast:
  158. type_ = cast.groups()[0]
  159. type_ = override_types.get(type_, type_)
  160. value = value[len(cast.group()):]
  161. value = typemap[type_](value)
  162. else:
  163. try:
  164. value = NAMESPACES[ns.lower()][key].to_python(value)
  165. except ValueError as exc:
  166. # display key name in error message.
  167. raise ValueError('{0!r}: {1}'.format(ns_key, exc))
  168. return ns_key, value
  169. return dict(getarg(arg) for arg in args)
  170. def read_configuration(self, env='CELERY_CONFIG_MODULE'):
  171. try:
  172. custom_config = os.environ[env]
  173. except KeyError:
  174. pass
  175. else:
  176. if custom_config:
  177. usercfg = self._import_config_module(custom_config)
  178. return DictAttribute(usercfg)
  179. def autodiscover_tasks(self, packages, related_name='tasks'):
  180. self.task_modules.update(
  181. mod.__name__ for mod in autodiscover_tasks(packages or (),
  182. related_name) if mod)
  183. @property
  184. def conf(self):
  185. """Loader configuration."""
  186. if self._conf is unconfigured:
  187. self._conf = self.read_configuration()
  188. return self._conf
  189. def autodiscover_tasks(packages, related_name='tasks'):
  190. global _RACE_PROTECTION
  191. if _RACE_PROTECTION:
  192. return ()
  193. _RACE_PROTECTION = True
  194. try:
  195. return [find_related_module(pkg, related_name) for pkg in packages]
  196. finally:
  197. _RACE_PROTECTION = False
  198. def find_related_module(package, related_name):
  199. """Find module in package."""
  200. # Django 1.7 allows for speciying a class name in INSTALLED_APPS.
  201. # (Issue #2248).
  202. try:
  203. importlib.import_module(package)
  204. except ImportError:
  205. package, _, _ = package.rpartition('.')
  206. if not package:
  207. raise
  208. try:
  209. pkg_path = importlib.import_module(package).__path__
  210. except AttributeError:
  211. return
  212. try:
  213. _imp.find_module(related_name, pkg_path)
  214. except ImportError:
  215. return
  216. return importlib.import_module('{0}.{1}'.format(package, related_name))