base.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.loaders.base
  4. ~~~~~~~~~~~~~~~~~~~
  5. Loader base class.
  6. """
  7. from __future__ import absolute_import
  8. import anyjson
  9. import importlib
  10. import os
  11. import re
  12. import sys
  13. from datetime import datetime
  14. from kombu.utils import cached_property
  15. from kombu.utils.encoding import safe_str
  16. from celery.datastructures import DictAttribute
  17. from celery.exceptions import ImproperlyConfigured
  18. from celery.utils.imports import (
  19. import_from_cwd, symbol_by_name, NotAPackage, find_module,
  20. )
  21. from celery.utils.functional import maybe_list
  22. ERROR_ENVVAR_NOT_SET = """\
  23. The environment variable %r is not set,
  24. and as such the configuration could not be loaded.
  25. Please set this variable and make it point to
  26. a configuration module."""
  27. CONFIG_INVALID_NAME = """
  28. Error: Module '%(module)s' doesn't exist, or it's not a valid \
  29. Python module name.
  30. """
  31. CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """
  32. Did you mean '%(suggest)s'?
  33. """
  34. class BaseLoader(object):
  35. """The base class for loaders.
  36. Loaders handles,
  37. * Reading celery client/worker configurations.
  38. * What happens when a task starts?
  39. See :meth:`on_task_init`.
  40. * What happens when the worker starts?
  41. See :meth:`on_worker_init`.
  42. * What happens when the worker shuts down?
  43. See :meth:`on_worker_shutdown`.
  44. * What modules are imported to find tasks?
  45. """
  46. builtin_modules = frozenset()
  47. configured = False
  48. error_envvar_not_set = ERROR_ENVVAR_NOT_SET
  49. override_backends = {}
  50. worker_initialized = False
  51. _conf = None
  52. def __init__(self, app=None, **kwargs):
  53. from celery.app import app_or_default
  54. self.app = app_or_default(app)
  55. self.task_modules = set()
  56. def now(self, utc=True):
  57. if utc:
  58. return datetime.utcnow()
  59. return datetime.now()
  60. def on_task_init(self, task_id, task):
  61. """This method is called before a task is executed."""
  62. pass
  63. def on_process_cleanup(self):
  64. """This method is called after a task is executed."""
  65. pass
  66. def on_worker_init(self):
  67. """This method is called when the worker (:program:`celery worker`)
  68. starts."""
  69. pass
  70. def on_worker_shutdown(self):
  71. """This method is called when the worker (:program:`celery worker`)
  72. shuts down."""
  73. pass
  74. def on_worker_process_init(self):
  75. """This method is called when a child process starts."""
  76. pass
  77. def import_task_module(self, module):
  78. self.task_modules.add(module)
  79. return self.import_from_cwd(module)
  80. def import_module(self, module, package=None):
  81. return importlib.import_module(module, package=package)
  82. def import_from_cwd(self, module, imp=None, package=None):
  83. return import_from_cwd(
  84. module,
  85. self.import_module if imp is None else imp,
  86. package=package,
  87. )
  88. def import_default_modules(self):
  89. return [
  90. self.import_task_module(m) for m in (
  91. tuple(self.builtin_modules) +
  92. tuple(maybe_list(self.app.conf.CELERY_IMPORTS)) +
  93. tuple(maybe_list(self.app.conf.CELERY_INCLUDE))
  94. )
  95. ]
  96. def init_worker(self):
  97. if not self.worker_initialized:
  98. self.worker_initialized = True
  99. self.import_default_modules()
  100. self.on_worker_init()
  101. def shutdown_worker(self):
  102. self.on_worker_shutdown()
  103. def init_worker_process(self):
  104. self.on_worker_process_init()
  105. def config_from_envvar(self, variable_name, silent=False):
  106. module_name = os.environ.get(variable_name)
  107. if not module_name:
  108. if silent:
  109. return False
  110. raise ImproperlyConfigured(self.error_envvar_not_set % module_name)
  111. return self.config_from_object(module_name, silent=silent)
  112. def config_from_object(self, obj, silent=False):
  113. if isinstance(obj, basestring):
  114. try:
  115. if '.' in obj:
  116. obj = symbol_by_name(obj, imp=self.import_from_cwd)
  117. else:
  118. obj = self.import_from_cwd(obj)
  119. except (ImportError, AttributeError):
  120. if silent:
  121. return False
  122. raise
  123. if not hasattr(obj, '__getitem__'):
  124. obj = DictAttribute(obj)
  125. self._conf = obj
  126. return True
  127. def _import_config_module(self, name):
  128. try:
  129. self.find_module(name)
  130. except NotAPackage:
  131. if name.endswith('.py'):
  132. raise NotAPackage, NotAPackage(CONFIG_WITH_SUFFIX % {
  133. 'module': name, 'suggest': name[:-3]}), sys.exc_info()[2]
  134. raise NotAPackage, NotAPackage(
  135. CONFIG_INVALID_NAME % {'module': name}), sys.exc_info()[2]
  136. else:
  137. return self.import_from_cwd(name)
  138. def find_module(self, module):
  139. return find_module(module)
  140. def cmdline_config_parser(
  141. self, args, namespace='celery',
  142. re_type=re.compile(r'\((\w+)\)'),
  143. extra_types={'json': anyjson.loads},
  144. override_types={'tuple': 'json',
  145. 'list': 'json',
  146. 'dict': 'json'}):
  147. from celery.app.defaults import Option, NAMESPACES
  148. namespace = namespace.upper()
  149. typemap = dict(Option.typemap, **extra_types)
  150. def getarg(arg):
  151. """Parse a single configuration definition from
  152. the command line."""
  153. ## find key/value
  154. # ns.key=value|ns_key=value (case insensitive)
  155. key, value = arg.split('=', 1)
  156. key = key.upper().replace('.', '_')
  157. ## find namespace.
  158. # .key=value|_key=value expands to default namespace.
  159. if key[0] == '_':
  160. ns, key = namespace, key[1:]
  161. else:
  162. # find namespace part of key
  163. ns, key = key.split('_', 1)
  164. ns_key = (ns and ns + '_' or '') + key
  165. # (type)value makes cast to custom type.
  166. cast = re_type.match(value)
  167. if cast:
  168. type_ = cast.groups()[0]
  169. type_ = override_types.get(type_, type_)
  170. value = value[len(cast.group()):]
  171. value = typemap[type_](value)
  172. else:
  173. try:
  174. value = NAMESPACES[ns][key].to_python(value)
  175. except ValueError, exc:
  176. # display key name in error message.
  177. raise ValueError('%r: %s' % (ns_key, exc))
  178. return ns_key, value
  179. return dict(map(getarg, args))
  180. def mail_admins(self, subject, body, fail_silently=False,
  181. sender=None, to=None, host=None, port=None,
  182. user=None, password=None, timeout=None,
  183. use_ssl=False, use_tls=False):
  184. message = self.mail.Message(sender=sender, to=to,
  185. subject=safe_str(subject),
  186. body=safe_str(body))
  187. mailer = self.mail.Mailer(host=host, port=port,
  188. user=user, password=password,
  189. timeout=timeout, use_ssl=use_ssl,
  190. use_tls=use_tls)
  191. mailer.send(message, fail_silently=fail_silently)
  192. def read_configuration(self):
  193. try:
  194. custom_config = os.environ['CELERY_CONFIG_MODULE']
  195. except KeyError:
  196. pass
  197. else:
  198. usercfg = self._import_config_module(custom_config)
  199. return DictAttribute(usercfg)
  200. return {}
  201. @property
  202. def conf(self):
  203. """Loader configuration."""
  204. if self._conf is None:
  205. self._conf = self.read_configuration()
  206. return self._conf
  207. @cached_property
  208. def mail(self):
  209. return self.import_module('celery.utils.mail')