base.py 9.2 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.loaders.base
  4. ~~~~~~~~~~~~~~~~~~~
  5. Loader base class.
  6. """
  7. from __future__ import absolute_import
  8. import imp as _imp
  9. import importlib
  10. import os
  11. import re
  12. import sys
  13. from datetime import datetime
  14. from kombu.utils import json
  15. from kombu.utils import cached_property
  16. from kombu.utils.encoding import safe_str
  17. from celery import signals
  18. from celery.datastructures import DictAttribute, force_mapping
  19. from celery.five import reraise, string_t
  20. from celery.utils.functional import maybe_list
  21. from celery.utils.imports import (
  22. import_from_cwd, symbol_by_name, NotAPackage, find_module,
  23. )
  24. __all__ = ['BaseLoader']
  25. _RACE_PROTECTION = False
  26. CONFIG_INVALID_NAME = """\
  27. Error: Module '{module}' doesn't exist, or it's not a valid \
  28. Python module name.
  29. """
  30. CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """\
  31. Did you mean '{suggest}'?
  32. """
  33. unconfigured = object()
  34. class BaseLoader(object):
  35. """The base class for loaders.
  36. Loaders handles,
  37. * Reading celery client/worker configurations.
  38. * What happens when a task starts?
  39. See :meth:`on_task_init`.
  40. * What happens when the worker starts?
  41. See :meth:`on_worker_init`.
  42. * What happens when the worker shuts down?
  43. See :meth:`on_worker_shutdown`.
  44. * What modules are imported to find tasks?
  45. """
  46. builtin_modules = frozenset()
  47. configured = False
  48. override_backends = {}
  49. worker_initialized = False
  50. _conf = unconfigured
  51. def __init__(self, app, **kwargs):
  52. self.app = app
  53. self.task_modules = set()
  54. def now(self, utc=True):
  55. if utc:
  56. return datetime.utcnow()
  57. return datetime.now()
  58. def on_task_init(self, task_id, task):
  59. """This method is called before a task is executed."""
  60. pass
  61. def on_process_cleanup(self):
  62. """This method is called after a task is executed."""
  63. pass
  64. def on_worker_init(self):
  65. """This method is called when the worker (:program:`celery worker`)
  66. starts."""
  67. pass
  68. def on_worker_shutdown(self):
  69. """This method is called when the worker (:program:`celery worker`)
  70. shuts down."""
  71. pass
  72. def on_worker_process_init(self):
  73. """This method is called when a child process starts."""
  74. pass
  75. def import_task_module(self, module):
  76. self.task_modules.add(module)
  77. return self.import_from_cwd(module)
  78. def import_module(self, module, package=None):
  79. return importlib.import_module(module, package=package)
  80. def import_from_cwd(self, module, imp=None, package=None):
  81. return import_from_cwd(
  82. module,
  83. self.import_module if imp is None else imp,
  84. package=package,
  85. )
  86. def import_default_modules(self):
  87. signals.import_modules.send(sender=self.app)
  88. return [
  89. self.import_task_module(m) for m in (
  90. tuple(self.builtin_modules) +
  91. tuple(maybe_list(self.app.conf.imports)) +
  92. tuple(maybe_list(self.app.conf.include))
  93. )
  94. ]
  95. def init_worker(self):
  96. if not self.worker_initialized:
  97. self.worker_initialized = True
  98. self.import_default_modules()
  99. self.on_worker_init()
  100. def shutdown_worker(self):
  101. self.on_worker_shutdown()
  102. def init_worker_process(self):
  103. self.on_worker_process_init()
  104. def config_from_object(self, obj, silent=False):
  105. if isinstance(obj, string_t):
  106. try:
  107. obj = self._smart_import(obj, imp=self.import_from_cwd)
  108. except (ImportError, AttributeError):
  109. if silent:
  110. return False
  111. raise
  112. self._conf = force_mapping(obj)
  113. return True
  114. def _smart_import(self, path, imp=None):
  115. imp = self.import_module if imp is None else imp
  116. if ':' in path:
  117. # Path includes attribute so can just jump here.
  118. # e.g. ``os.path:abspath``.
  119. return symbol_by_name(path, imp=imp)
  120. # Not sure if path is just a module name or if it includes an
  121. # attribute name (e.g. ``os.path``, vs, ``os.path.abspath``).
  122. try:
  123. return imp(path)
  124. except ImportError:
  125. # Not a module name, so try module + attribute.
  126. return symbol_by_name(path, imp=imp)
  127. def _import_config_module(self, name):
  128. try:
  129. self.find_module(name)
  130. except NotAPackage:
  131. if name.endswith('.py'):
  132. reraise(NotAPackage, NotAPackage(CONFIG_WITH_SUFFIX.format(
  133. module=name, suggest=name[:-3])), sys.exc_info()[2])
  134. reraise(NotAPackage, NotAPackage(CONFIG_INVALID_NAME.format(
  135. module=name)), sys.exc_info()[2])
  136. else:
  137. return self.import_from_cwd(name)
  138. def find_module(self, module):
  139. return find_module(module)
  140. def cmdline_config_parser(
  141. self, args, namespace='celery',
  142. re_type=re.compile(r'\((\w+)\)'),
  143. extra_types={'json': json.loads},
  144. override_types={'tuple': 'json',
  145. 'list': 'json',
  146. 'dict': 'json'}):
  147. from celery.app.defaults import Option, NAMESPACES
  148. namespace = namespace and namespace.lower()
  149. typemap = dict(Option.typemap, **extra_types)
  150. def getarg(arg):
  151. """Parse a single configuration definition from
  152. the command-line."""
  153. # ## find key/value
  154. # ns.key=value|ns_key=value (case insensitive)
  155. key, value = arg.split('=', 1)
  156. key = key.lower().replace('.', '_')
  157. # ## find namespace.
  158. # .key=value|_key=value expands to default namespace.
  159. if key[0] == '_':
  160. ns, key = namespace, key[1:]
  161. else:
  162. # find namespace part of key
  163. ns, key = key.split('_', 1)
  164. ns_key = (ns and ns + '_' or '') + key
  165. # (type)value makes cast to custom type.
  166. cast = re_type.match(value)
  167. if cast:
  168. type_ = cast.groups()[0]
  169. type_ = override_types.get(type_, type_)
  170. value = value[len(cast.group()):]
  171. value = typemap[type_](value)
  172. else:
  173. try:
  174. value = NAMESPACES[ns.lower()][key].to_python(value)
  175. except ValueError as exc:
  176. # display key name in error message.
  177. raise ValueError('{0!r}: {1}'.format(ns_key, exc))
  178. return ns_key, value
  179. return dict(getarg(arg) for arg in args)
  180. def mail_admins(self, subject, body, fail_silently=False,
  181. sender=None, to=None, host=None, port=None,
  182. user=None, password=None, timeout=None,
  183. use_ssl=False, use_tls=False, charset='us-ascii'):
  184. message = self.mail.Message(sender=sender, to=to,
  185. subject=safe_str(subject),
  186. body=safe_str(body),
  187. charset=charset)
  188. mailer = self.mail.Mailer(host=host, port=port,
  189. user=user, password=password,
  190. timeout=timeout, use_ssl=use_ssl,
  191. use_tls=use_tls)
  192. mailer.send(message, fail_silently=fail_silently)
  193. def read_configuration(self, env='CELERY_CONFIG_MODULE'):
  194. try:
  195. custom_config = os.environ[env]
  196. except KeyError:
  197. pass
  198. else:
  199. if custom_config:
  200. usercfg = self._import_config_module(custom_config)
  201. return DictAttribute(usercfg)
  202. def autodiscover_tasks(self, packages, related_name='tasks'):
  203. self.task_modules.update(
  204. mod.__name__ for mod in autodiscover_tasks(packages or (),
  205. related_name) if mod)
  206. @property
  207. def conf(self):
  208. """Loader configuration."""
  209. if self._conf is unconfigured:
  210. self._conf = self.read_configuration()
  211. return self._conf
  212. @cached_property
  213. def mail(self):
  214. return self.import_module('celery.utils.mail')
  215. def autodiscover_tasks(packages, related_name='tasks'):
  216. global _RACE_PROTECTION
  217. if _RACE_PROTECTION:
  218. return ()
  219. _RACE_PROTECTION = True
  220. try:
  221. return [find_related_module(pkg, related_name) for pkg in packages]
  222. finally:
  223. _RACE_PROTECTION = False
  224. def find_related_module(package, related_name):
  225. """Given a package name and a module name, tries to find that
  226. module."""
  227. # Django 1.7 allows for speciying a class name in INSTALLED_APPS.
  228. # (Issue #2248).
  229. try:
  230. importlib.import_module(package)
  231. except ImportError:
  232. package, _, _ = package.rpartition('.')
  233. if not package:
  234. raise
  235. try:
  236. pkg_path = importlib.import_module(package).__path__
  237. except AttributeError:
  238. return
  239. try:
  240. _imp.find_module(related_name, pkg_path)
  241. except ImportError:
  242. return
  243. return importlib.import_module('{0}.{1}'.format(package, related_name))