base.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.loaders.base
  4. ~~~~~~~~~~~~~~~~~~~
  5. Loader base class.
  6. """
  7. from __future__ import absolute_import
  8. import anyjson
  9. import imp
  10. import importlib
  11. import os
  12. import re
  13. import sys
  14. from datetime import datetime
  15. from kombu.utils import cached_property
  16. from kombu.utils.encoding import safe_str
  17. from celery.five import reraise, string_t
  18. from celery.utils.datastructures import DictAttribute
  19. from celery.utils.functional import maybe_list
  20. from celery.utils.imports import (
  21. import_from_cwd, symbol_by_name, NotAPackage, find_module,
  22. )
  23. _RACE_PROTECTION = False
  24. CONFIG_INVALID_NAME = """\
  25. Error: Module '{module}' doesn't exist, or it's not a valid \
  26. Python module name.
  27. """
  28. CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """\
  29. Did you mean '{suggest}'?
  30. """
  31. class BaseLoader(object):
  32. """The base class for loaders.
  33. Loaders handles,
  34. * Reading celery client/worker configurations.
  35. * What happens when a task starts?
  36. See :meth:`on_task_init`.
  37. * What happens when the worker starts?
  38. See :meth:`on_worker_init`.
  39. * What happens when the worker shuts down?
  40. See :meth:`on_worker_shutdown`.
  41. * What modules are imported to find tasks?
  42. """
  43. builtin_modules = frozenset()
  44. configured = False
  45. override_backends = {}
  46. worker_initialized = False
  47. _conf = None
  48. def __init__(self, app=None, **kwargs):
  49. from celery.app import app_or_default
  50. self.app = app_or_default(app)
  51. self.task_modules = set()
  52. def now(self, utc=True):
  53. if utc:
  54. return datetime.utcnow()
  55. return datetime.now()
  56. def on_task_init(self, task_id, task):
  57. """This method is called before a task is executed."""
  58. pass
  59. def on_process_cleanup(self):
  60. """This method is called after a task is executed."""
  61. pass
  62. def on_worker_init(self):
  63. """This method is called when the worker (:program:`celery worker`)
  64. starts."""
  65. pass
  66. def on_worker_shutdown(self):
  67. """This method is called when the worker (:program:`celery worker`)
  68. shuts down."""
  69. pass
  70. def on_worker_process_init(self):
  71. """This method is called when a child process starts."""
  72. pass
  73. def import_task_module(self, module):
  74. self.task_modules.add(module)
  75. return self.import_from_cwd(module)
  76. def import_module(self, module, package=None):
  77. return importlib.import_module(module, package=package)
  78. def import_from_cwd(self, module, imp=None, package=None):
  79. return import_from_cwd(
  80. module,
  81. self.import_module if imp is None else imp,
  82. package=package,
  83. )
  84. def import_default_modules(self):
  85. return [
  86. self.import_task_module(m) for m in (
  87. tuple(self.builtin_modules) +
  88. tuple(maybe_list(self.app.conf.CELERY_IMPORTS)) +
  89. tuple(maybe_list(self.app.conf.CELERY_INCLUDE))
  90. )
  91. ]
  92. def init_worker(self):
  93. if not self.worker_initialized:
  94. self.worker_initialized = True
  95. self.import_default_modules()
  96. self.on_worker_init()
  97. def shutdown_worker(self):
  98. self.on_worker_shutdown()
  99. def init_worker_process(self):
  100. self.on_worker_process_init()
  101. def config_from_object(self, obj, silent=False):
  102. if isinstance(obj, string_t):
  103. try:
  104. if '.' in obj:
  105. obj = symbol_by_name(obj, imp=self.import_from_cwd)
  106. else:
  107. obj = self.import_from_cwd(obj)
  108. except (ImportError, AttributeError):
  109. if silent:
  110. return False
  111. raise
  112. if not hasattr(obj, '__getitem__'):
  113. obj = DictAttribute(obj)
  114. self._conf = obj
  115. return True
  116. def _import_config_module(self, name):
  117. try:
  118. self.find_module(name)
  119. except NotAPackage:
  120. if name.endswith('.py'):
  121. reraise(NotAPackage, NotAPackage(CONFIG_WITH_SUFFIX.format(
  122. module=name, suggest=name[:-3])), sys.exc_info()[2])
  123. reraise(NotAPackage, NotAPackage(CONFIG_INVALID_NAME.format(
  124. module=name)), sys.exc_info()[2])
  125. else:
  126. return self.import_from_cwd(name)
  127. def find_module(self, module):
  128. return find_module(module)
  129. def cmdline_config_parser(
  130. self, args, namespace='celery',
  131. re_type=re.compile(r'\((\w+)\)'),
  132. extra_types={'json': anyjson.loads},
  133. override_types={'tuple': 'json',
  134. 'list': 'json',
  135. 'dict': 'json'}):
  136. from celery.app.defaults import Option, NAMESPACES
  137. namespace = namespace.upper()
  138. typemap = dict(Option.typemap, **extra_types)
  139. def getarg(arg):
  140. """Parse a single configuration definition from
  141. the command-line."""
  142. ## find key/value
  143. # ns.key=value|ns_key=value (case insensitive)
  144. key, value = arg.split('=', 1)
  145. key = key.upper().replace('.', '_')
  146. ## find namespace.
  147. # .key=value|_key=value expands to default namespace.
  148. if key[0] == '_':
  149. ns, key = namespace, key[1:]
  150. else:
  151. # find namespace part of key
  152. ns, key = key.split('_', 1)
  153. ns_key = (ns and ns + '_' or '') + key
  154. # (type)value makes cast to custom type.
  155. cast = re_type.match(value)
  156. if cast:
  157. type_ = cast.groups()[0]
  158. type_ = override_types.get(type_, type_)
  159. value = value[len(cast.group()):]
  160. value = typemap[type_](value)
  161. else:
  162. try:
  163. value = NAMESPACES[ns][key].to_python(value)
  164. except ValueError as exc:
  165. # display key name in error message.
  166. raise ValueError('{0!r}: {1}'.format(ns_key, exc))
  167. return ns_key, value
  168. return dict(getarg(arg) for arg in args)
  169. def mail_admins(self, subject, body, fail_silently=False,
  170. sender=None, to=None, host=None, port=None,
  171. user=None, password=None, timeout=None,
  172. use_ssl=False, use_tls=False):
  173. message = self.mail.Message(sender=sender, to=to,
  174. subject=safe_str(subject),
  175. body=safe_str(body))
  176. mailer = self.mail.Mailer(host=host, port=port,
  177. user=user, password=password,
  178. timeout=timeout, use_ssl=use_ssl,
  179. use_tls=use_tls)
  180. mailer.send(message, fail_silently=fail_silently)
  181. def read_configuration(self, env='CELERY_CONFIG_MODULE'):
  182. try:
  183. custom_config = os.environ[env]
  184. except KeyError:
  185. pass
  186. else:
  187. usercfg = self._import_config_module(custom_config)
  188. return DictAttribute(usercfg)
  189. return {}
  190. def autodiscover_tasks(self, packages, related_name='tasks'):
  191. self.task_modules.update(
  192. mod.__name__ for mod in autodiscover_tasks(packages,
  193. related_name) if mod)
  194. @property
  195. def conf(self):
  196. """Loader configuration."""
  197. if self._conf is None:
  198. self._conf = self.read_configuration()
  199. return self._conf
  200. @cached_property
  201. def mail(self):
  202. return self.import_module('celery.utils.mail')
  203. def autodiscover_tasks(packages, related_name='tasks'):
  204. global _RACE_PROTECTION
  205. if _RACE_PROTECTION:
  206. return
  207. _RACE_PROTECTION = True
  208. try:
  209. return [find_related_module(pkg, related_name) for pkg in packages]
  210. finally:
  211. _RACE_PROTECTION = False
  212. def find_related_module(package, related_name):
  213. """Given a package name and a module name, tries to find that
  214. module."""
  215. try:
  216. pkg_path = importlib.import_module(package).__path__
  217. except AttributeError:
  218. return
  219. try:
  220. imp.find_module(related_name, pkg_path)
  221. except ImportError:
  222. return
  223. return importlib.import_module('{0}.{1}'.format(package, related_name))