base.py 8.5 KB


  1. # -*- coding: utf-8 -*-
  2. """Loader base class."""
  3. from __future__ import absolute_import, unicode_literals
  4. import imp as _imp
  5. import importlib
  6. import os
  7. import re
  8. import sys
  9. from datetime import datetime
  10. from kombu.utils import json
  11. from kombu.utils.objects import cached_property
  12. from celery import signals
  13. from celery.five import reraise, string_t
  14. from celery.utils.collections import DictAttribute, force_mapping
  15. from celery.utils.functional import maybe_list
  16. from celery.utils.imports import (
  17. import_from_cwd, symbol_by_name, NotAPackage, find_module,
  18. )
  19. __all__ = ('BaseLoader',)
  20. _RACE_PROTECTION = False
  21. CONFIG_INVALID_NAME = """\
  22. Error: Module '{module}' doesn't exist, or it's not a valid \
  23. Python module name.
  24. """
  25. CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """\
  26. Did you mean '{suggest}'?
  27. """
  28. unconfigured = object()
  29. class BaseLoader(object):
  30. """Base class for loaders.
  31. Loaders handles,
  32. * Reading celery client/worker configurations.
  33. * What happens when a task starts?
  34. See :meth:`on_task_init`.
  35. * What happens when the worker starts?
  36. See :meth:`on_worker_init`.
  37. * What happens when the worker shuts down?
  38. See :meth:`on_worker_shutdown`.
  39. * What modules are imported to find tasks?
  40. """
  41. builtin_modules = frozenset()
  42. configured = False
  43. override_backends = {}
  44. worker_initialized = False
  45. _conf = unconfigured
  46. def __init__(self, app, **kwargs):
  47. self.app = app
  48. self.task_modules = set()
  49. def now(self, utc=True):
  50. if utc:
  51. return datetime.utcnow()
  52. return datetime.now()
  53. def on_task_init(self, task_id, task):
  54. """Called before a task is executed."""
  55. pass
  56. def on_process_cleanup(self):
  57. """Called after a task is executed."""
  58. pass
  59. def on_worker_init(self):
  60. """Called when the worker (:program:`celery worker`) starts."""
  61. pass
  62. def on_worker_shutdown(self):
  63. """Called when the worker (:program:`celery worker`) shuts down."""
  64. pass
  65. def on_worker_process_init(self):
  66. """Called when a child process starts."""
  67. pass
  68. def import_task_module(self, module):
  69. self.task_modules.add(module)
  70. return self.import_from_cwd(module)
  71. def import_module(self, module, package=None):
  72. return importlib.import_module(module, package=package)
  73. def import_from_cwd(self, module, imp=None, package=None):
  74. return import_from_cwd(
  75. module,
  76. self.import_module if imp is None else imp,
  77. package=package,
  78. )
  79. def import_default_modules(self):
  80. responses = signals.import_modules.send(sender=self.app)
  81. # Prior to this point loggers are not yet set up properly, need to
  82. # check responses manually and reraised exceptions if any, otherwise
  83. # they'll be silenced, making it incredibly difficult to debug.
  84. for _, response in responses:
  85. if isinstance(response, Exception):
  86. raise response
  87. return [self.import_task_module(m) for m in self.default_modules]
  88. def init_worker(self):
  89. if not self.worker_initialized:
  90. self.worker_initialized = True
  91. self.import_default_modules()
  92. self.on_worker_init()
  93. def shutdown_worker(self):
  94. self.on_worker_shutdown()
  95. def init_worker_process(self):
  96. self.on_worker_process_init()
  97. def config_from_object(self, obj, silent=False):
  98. if isinstance(obj, string_t):
  99. try:
  100. obj = self._smart_import(obj, imp=self.import_from_cwd)
  101. except (ImportError, AttributeError):
  102. if silent:
  103. return False
  104. raise
  105. self._conf = force_mapping(obj)
  106. return True
  107. def _smart_import(self, path, imp=None):
  108. imp = self.import_module if imp is None else imp
  109. if ':' in path:
  110. # Path includes attribute so can just jump
  111. # here (e.g., ``os.path:abspath``).
  112. return symbol_by_name(path, imp=imp)
  113. # Not sure if path is just a module name or if it includes an
  114. # attribute name (e.g., ``os.path``, vs, ``os.path.abspath``).
  115. try:
  116. return imp(path)
  117. except ImportError:
  118. # Not a module name, so try module + attribute.
  119. return symbol_by_name(path, imp=imp)
  120. def _import_config_module(self, name):
  121. try:
  122. self.find_module(name)
  123. except NotAPackage:
  124. if name.endswith('.py'):
  125. reraise(NotAPackage, NotAPackage(CONFIG_WITH_SUFFIX.format(
  126. module=name, suggest=name[:-3])), sys.exc_info()[2])
  127. reraise(NotAPackage, NotAPackage(CONFIG_INVALID_NAME.format(
  128. module=name)), sys.exc_info()[2])
  129. else:
  130. return self.import_from_cwd(name)
  131. def find_module(self, module):
  132. return find_module(module)
  133. def cmdline_config_parser(
  134. self, args, namespace='celery',
  135. re_type=re.compile(r'\((\w+)\)'),
  136. extra_types={'json': json.loads},
  137. override_types={'tuple': 'json',
  138. 'list': 'json',
  139. 'dict': 'json'}):
  140. from celery.app.defaults import Option, NAMESPACES
  141. namespace = namespace and namespace.lower()
  142. typemap = dict(Option.typemap, **extra_types)
  143. def getarg(arg):
  144. """Parse single configuration from command-line."""
  145. # ## find key/value
  146. # ns.key=value|ns_key=value (case insensitive)
  147. key, value = arg.split('=', 1)
  148. key = key.lower().replace('.', '_')
  149. # ## find name-space.
  150. # .key=value|_key=value expands to default name-space.
  151. if key[0] == '_':
  152. ns, key = namespace, key[1:]
  153. else:
  154. # find name-space part of key
  155. ns, key = key.split('_', 1)
  156. ns_key = (ns and ns + '_' or '') + key
  157. # (type)value makes cast to custom type.
  158. cast = re_type.match(value)
  159. if cast:
  160. type_ = cast.groups()[0]
  161. type_ = override_types.get(type_, type_)
  162. value = value[len(cast.group()):]
  163. value = typemap[type_](value)
  164. else:
  165. try:
  166. value = NAMESPACES[ns.lower()][key].to_python(value)
  167. except ValueError as exc:
  168. # display key name in error message.
  169. raise ValueError('{0!r}: {1}'.format(ns_key, exc))
  170. return ns_key, value
  171. return dict(getarg(arg) for arg in args)
  172. def read_configuration(self, env='CELERY_CONFIG_MODULE'):
  173. try:
  174. custom_config = os.environ[env]
  175. except KeyError:
  176. pass
  177. else:
  178. if custom_config:
  179. usercfg = self._import_config_module(custom_config)
  180. return DictAttribute(usercfg)
  181. def autodiscover_tasks(self, packages, related_name='tasks'):
  182. self.task_modules.update(
  183. mod.__name__ for mod in autodiscover_tasks(packages or (),
  184. related_name) if mod)
  185. @cached_property
  186. def default_modules(self):
  187. return (
  188. tuple(self.builtin_modules) +
  189. tuple(maybe_list(self.app.conf.imports)) +
  190. tuple(maybe_list(self.app.conf.include))
  191. )
  192. @property
  193. def conf(self):
  194. """Loader configuration."""
  195. if self._conf is unconfigured:
  196. self._conf = self.read_configuration()
  197. return self._conf
  198. def autodiscover_tasks(packages, related_name='tasks'):
  199. global _RACE_PROTECTION
  200. if _RACE_PROTECTION:
  201. return ()
  202. _RACE_PROTECTION = True
  203. try:
  204. return [find_related_module(pkg, related_name) for pkg in packages]
  205. finally:
  206. _RACE_PROTECTION = False
  207. def find_related_module(package, related_name):
  208. """Find module in package."""
  209. # Django 1.7 allows for speciying a class name in INSTALLED_APPS.
  210. # (Issue #2248).
  211. try:
  212. importlib.import_module(package)
  213. except ImportError:
  214. package, _, _ = package.rpartition('.')
  215. if not package:
  216. raise
  217. try:
  218. pkg_path = importlib.import_module(package).__path__
  219. except AttributeError:
  220. return
  221. try:
  222. _imp.find_module(related_name, pkg_path)
  223. except ImportError:
  224. return
  225. return importlib.import_module('{0}.{1}'.format(package, related_name))