base.py 6.8 KB


# -*- coding: utf-8 -*-
"""
    celery.loaders.base
    ~~~~~~~~~~~~~~~~~~~

    Loader base class.

"""
  7. from __future__ import absolute_import
  8. import anyjson
  9. import importlib
  10. import os
  11. import re
  12. from datetime import datetime
  13. from kombu.utils.encoding import safe_str
  14. from celery.datastructures import DictAttribute
  15. from celery.exceptions import ImproperlyConfigured
  16. from celery.utils import cached_property
  17. from celery.utils.imports import import_from_cwd, symbol_by_name
  18. from celery.utils.functional import maybe_list
  19. BUILTIN_MODULES = frozenset()
  20. ERROR_ENVVAR_NOT_SET = (
  21. """The environment variable %r is not set,
  22. and as such the configuration could not be loaded.
  23. Please set this variable and make it point to
  24. a configuration module.""")
  25. class BaseLoader(object):
  26. """The base class for loaders.
  27. Loaders handles,
  28. * Reading celery client/worker configurations.
  29. * What happens when a task starts?
  30. See :meth:`on_task_init`.
  31. * What happens when the worker starts?
  32. See :meth:`on_worker_init`.
  33. * What happens when the worker shuts down?
  34. See :meth:`on_worker_shutdown`.
  35. * What modules are imported to find tasks?
  36. """
  37. builtin_modules = BUILTIN_MODULES
  38. configured = False
  39. error_envvar_not_set = ERROR_ENVVAR_NOT_SET
  40. override_backends = {}
  41. worker_initialized = False
  42. _conf = None
  43. def __init__(self, app=None, **kwargs):
  44. from celery.app import app_or_default
  45. self.app = app_or_default(app)
  46. self.task_modules = set()
  47. def now(self, utc=True):
  48. if utc:
  49. return datetime.utcnow()
  50. return datetime.now()
  51. def on_task_init(self, task_id, task):
  52. """This method is called before a task is executed."""
  53. pass
  54. def on_process_cleanup(self):
  55. """This method is called after a task is executed."""
  56. pass
  57. def on_worker_init(self):
  58. """This method is called when the worker (:program:`celery worker`)
  59. starts."""
  60. pass
  61. def on_worker_shutdown(self):
  62. """This method is called when the worker (:program:`celery worker`)
  63. shuts down."""
  64. pass
  65. def on_worker_process_init(self):
  66. """This method is called when a child process starts."""
  67. pass
  68. def import_task_module(self, module):
  69. self.task_modules.add(module)
  70. return self.import_from_cwd(module)
  71. def import_module(self, module, package=None):
  72. return importlib.import_module(module, package=package)
  73. def import_from_cwd(self, module, imp=None, package=None):
  74. return import_from_cwd(module,
  75. self.import_module if imp is None else imp,
  76. package=package)
  77. def import_default_modules(self):
  78. return [self.import_task_module(m)
  79. for m in set(maybe_list(self.app.conf.CELERY_IMPORTS))
  80. | set(maybe_list(self.app.conf.CELERY_INCLUDE))
  81. | self.builtin_modules]
  82. def init_worker(self):
  83. if not self.worker_initialized:
  84. self.worker_initialized = True
  85. self.import_default_modules()
  86. self.on_worker_init()
  87. def shutdown_worker(self):
  88. self.on_worker_shutdown()
  89. def init_worker_process(self):
  90. self.on_worker_process_init()
  91. def config_from_envvar(self, variable_name, silent=False):
  92. module_name = os.environ.get(variable_name)
  93. if not module_name:
  94. if silent:
  95. return False
  96. raise ImproperlyConfigured(self.error_envvar_not_set % module_name)
  97. return self.config_from_object(module_name, silent=silent)
  98. def config_from_object(self, obj, silent=False):
  99. if isinstance(obj, basestring):
  100. try:
  101. if '.' in obj:
  102. obj = symbol_by_name(obj, imp=self.import_from_cwd)
  103. else:
  104. obj = self.import_from_cwd(obj)
  105. except (ImportError, AttributeError):
  106. if silent:
  107. return False
  108. raise
  109. if not hasattr(obj, '__getitem__'):
  110. obj = DictAttribute(obj)
  111. self._conf = obj
  112. return True
  113. def cmdline_config_parser(self, args, namespace='celery',
  114. re_type=re.compile(r'\((\w+)\)'),
  115. extra_types={'json': anyjson.loads},
  116. override_types={'tuple': 'json',
  117. 'list': 'json',
  118. 'dict': 'json'}):
  119. from celery.app.defaults import Option, NAMESPACES
  120. namespace = namespace.upper()
  121. typemap = dict(Option.typemap, **extra_types)
  122. def getarg(arg):
  123. """Parse a single configuration definition from
  124. the command line."""
  125. ## find key/value
  126. # ns.key=value|ns_key=value (case insensitive)
  127. key, value = arg.split('=', 1)
  128. key = key.upper().replace('.', '_')
  129. ## find namespace.
  130. # .key=value|_key=value expands to default namespace.
  131. if key[0] == '_':
  132. ns, key = namespace, key[1:]
  133. else:
  134. # find namespace part of key
  135. ns, key = key.split('_', 1)
  136. ns_key = (ns and ns + '_' or '') + key
  137. # (type)value makes cast to custom type.
  138. cast = re_type.match(value)
  139. if cast:
  140. type_ = cast.groups()[0]
  141. type_ = override_types.get(type_, type_)
  142. value = value[len(cast.group()):]
  143. value = typemap[type_](value)
  144. else:
  145. try:
  146. value = NAMESPACES[ns][key].to_python(value)
  147. except ValueError as exc:
  148. # display key name in error message.
  149. raise ValueError('%r: %s' % (ns_key, exc))
  150. return ns_key, value
  151. return dict(map(getarg, args))
  152. def mail_admins(self, subject, body, fail_silently=False,
  153. sender=None, to=None, host=None, port=None,
  154. user=None, password=None, timeout=None,
  155. use_ssl=False, use_tls=False):
  156. message = self.mail.Message(sender=sender, to=to,
  157. subject=safe_str(subject),
  158. body=safe_str(body))
  159. mailer = self.mail.Mailer(host=host, port=port,
  160. user=user, password=password,
  161. timeout=timeout, use_ssl=use_ssl,
  162. use_tls=use_tls)
  163. mailer.send(message, fail_silently=fail_silently)
  164. def read_configuration(self):
  165. return {}
  166. @property
  167. def conf(self):
  168. """Loader configuration."""
  169. if self._conf is None:
  170. self._conf = self.read_configuration()
  171. return self._conf
  172. @cached_property
  173. def mail(self):
  174. return self.import_module('celery.utils.mail')