base.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. # -*- coding: utf-8 -*-
  2. """Loader base class."""
  3. from __future__ import absolute_import, unicode_literals
  4. import imp as _imp
  5. import importlib
  6. import os
  7. import re
  8. import sys
  9. from datetime import datetime
  10. from kombu.utils import json
  11. from celery import signals
  12. from celery.utils.collections import DictAttribute, force_mapping
  13. from celery.utils.functional import maybe_list
  14. from celery.utils.imports import (
  15. import_from_cwd, symbol_by_name, NotAPackage, find_module,
  16. )
  17. __all__ = ['BaseLoader']
  18. _RACE_PROTECTION = False
  19. CONFIG_INVALID_NAME = """\
  20. Error: Module '{module}' doesn't exist, or it's not a valid \
  21. Python module name.
  22. """
  23. CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """\
  24. Did you mean '{suggest}'?
  25. """
  26. unconfigured = object()
  27. class BaseLoader:
  28. """The base class for loaders.
  29. Loaders handles,
  30. * Reading celery client/worker configurations.
  31. * What happens when a task starts?
  32. See :meth:`on_task_init`.
  33. * What happens when the worker starts?
  34. See :meth:`on_worker_init`.
  35. * What happens when the worker shuts down?
  36. See :meth:`on_worker_shutdown`.
  37. * What modules are imported to find tasks?
  38. """
  39. builtin_modules = frozenset()
  40. configured = False
  41. override_backends = {}
  42. worker_initialized = False
  43. _conf = unconfigured
  44. def __init__(self, app, **kwargs):
  45. self.app = app
  46. self.task_modules = set()
  47. def now(self, utc=True):
  48. if utc:
  49. return datetime.utcnow()
  50. return datetime.now()
  51. def on_task_init(self, task_id, task):
  52. """This method is called before a task is executed."""
  53. pass
  54. def on_process_cleanup(self):
  55. """This method is called after a task is executed."""
  56. pass
  57. def on_worker_init(self):
  58. """This method is called when the worker (:program:`celery worker`)
  59. starts."""
  60. pass
  61. def on_worker_shutdown(self):
  62. """This method is called when the worker (:program:`celery worker`)
  63. shuts down."""
  64. pass
  65. def on_worker_process_init(self):
  66. """This method is called when a child process starts."""
  67. pass
  68. def import_task_module(self, module):
  69. self.task_modules.add(module)
  70. return self.import_from_cwd(module)
  71. def import_module(self, module, package=None):
  72. return importlib.import_module(module, package=package)
  73. def import_from_cwd(self, module, imp=None, package=None):
  74. return import_from_cwd(
  75. module,
  76. self.import_module if imp is None else imp,
  77. package=package,
  78. )
  79. def import_default_modules(self):
  80. signals.import_modules.send(sender=self.app)
  81. return [
  82. self.import_task_module(m) for m in (
  83. tuple(self.builtin_modules) +
  84. tuple(maybe_list(self.app.conf.imports)) +
  85. tuple(maybe_list(self.app.conf.include))
  86. )
  87. ]
  88. def init_worker(self):
  89. if not self.worker_initialized:
  90. self.worker_initialized = True
  91. self.import_default_modules()
  92. self.on_worker_init()
  93. def shutdown_worker(self):
  94. self.on_worker_shutdown()
  95. def init_worker_process(self):
  96. self.on_worker_process_init()
  97. def config_from_object(self, obj, silent=False):
  98. if isinstance(obj, str):
  99. try:
  100. obj = self._smart_import(obj, imp=self.import_from_cwd)
  101. except (ImportError, AttributeError):
  102. if silent:
  103. return False
  104. raise
  105. self._conf = force_mapping(obj)
  106. return True
  107. def _smart_import(self, path, imp=None):
  108. imp = self.import_module if imp is None else imp
  109. if ':' in path:
  110. # Path includes attribute so can just jump here.
  111. # e.g. ``os.path:abspath``.
  112. return symbol_by_name(path, imp=imp)
  113. # Not sure if path is just a module name or if it includes an
  114. # attribute name (e.g. ``os.path``, vs, ``os.path.abspath``).
  115. try:
  116. return imp(path)
  117. except ImportError:
  118. # Not a module name, so try module + attribute.
  119. return symbol_by_name(path, imp=imp)
  120. def _import_config_module(self, name):
  121. try:
  122. self.find_module(name)
  123. except NotAPackage:
  124. if name.endswith('.py'):
  125. raise NotAPackage(CONFIG_WITH_SUFFIX.format(
  126. module=name, suggest=name[:-3])).with_traceback(sys.exc_info()[2])
  127. raise NotAPackage(CONFIG_INVALID_NAME.format(
  128. module=name)).with_traceback(sys.exc_info()[2])
  129. else:
  130. return self.import_from_cwd(name)
  131. def find_module(self, module):
  132. return find_module(module)
  133. def cmdline_config_parser(
  134. self, args, namespace='celery',
  135. re_type=re.compile(r'\((\w+)\)'),
  136. extra_types={'json': json.loads},
  137. override_types={'tuple': 'json',
  138. 'list': 'json',
  139. 'dict': 'json'}):
  140. from celery.app.defaults import Option, NAMESPACES
  141. namespace = namespace and namespace.lower()
  142. typemap = dict(Option.typemap, **extra_types)
  143. def getarg(arg):
  144. """Parse a single configuration definition from
  145. the command-line."""
  146. # ## find key/value
  147. # ns.key=value|ns_key=value (case insensitive)
  148. key, value = arg.split('=', 1)
  149. key = key.lower().replace('.', '_')
  150. # ## find name-space.
  151. # .key=value|_key=value expands to default name-space.
  152. if key[0] == '_':
  153. ns, key = namespace, key[1:]
  154. else:
  155. # find name-space part of key
  156. ns, key = key.split('_', 1)
  157. ns_key = (ns and ns + '_' or '') + key
  158. # (type)value makes cast to custom type.
  159. cast = re_type.match(value)
  160. if cast:
  161. type_ = cast.groups()[0]
  162. type_ = override_types.get(type_, type_)
  163. value = value[len(cast.group()):]
  164. value = typemap[type_](value)
  165. else:
  166. try:
  167. value = NAMESPACES[ns.lower()][key].to_python(value)
  168. except ValueError as exc:
  169. # display key name in error message.
  170. raise ValueError('{0!r}: {1}'.format(ns_key, exc))
  171. return ns_key, value
  172. return dict(getarg(arg) for arg in args)
  173. def read_configuration(self, env='CELERY_CONFIG_MODULE'):
  174. try:
  175. custom_config = os.environ[env]
  176. except KeyError:
  177. pass
  178. else:
  179. if custom_config:
  180. usercfg = self._import_config_module(custom_config)
  181. return DictAttribute(usercfg)
  182. def autodiscover_tasks(self, packages, related_name='tasks'):
  183. self.task_modules.update(
  184. mod.__name__ for mod in autodiscover_tasks(packages or (),
  185. related_name) if mod)
  186. @property
  187. def conf(self):
  188. """Loader configuration."""
  189. if self._conf is unconfigured:
  190. self._conf = self.read_configuration()
  191. return self._conf
  192. def autodiscover_tasks(packages, related_name='tasks'):
  193. global _RACE_PROTECTION
  194. if _RACE_PROTECTION:
  195. return ()
  196. _RACE_PROTECTION = True
  197. try:
  198. return [find_related_module(pkg, related_name) for pkg in packages]
  199. finally:
  200. _RACE_PROTECTION = False
  201. def find_related_module(package, related_name):
  202. """Given a package name and a module name, tries to find that
  203. module."""
  204. # Django 1.7 allows for speciying a class name in INSTALLED_APPS.
  205. # (Issue #2248).
  206. try:
  207. importlib.import_module(package)
  208. except ImportError:
  209. package, _, _ = package.rpartition('.')
  210. if not package:
  211. raise
  212. try:
  213. pkg_path = importlib.import_module(package).__path__
  214. except AttributeError:
  215. return
  216. try:
  217. _imp.find_module(related_name, pkg_path)
  218. except ImportError:
  219. return
  220. return importlib.import_module('{0}.{1}'.format(package, related_name))