base.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. from __future__ import absolute_import
  2. import importlib
  3. import os
  4. import re
  5. import warnings
  6. from anyjson import deserialize
  7. from kombu.utils import cached_property
  8. from celery.datastructures import DictAttribute
  9. from celery.exceptions import ImproperlyConfigured
  10. from celery.utils import get_cls_by_name
  11. from celery.utils import import_from_cwd as _import_from_cwd
  12. BUILTIN_MODULES = frozenset(["celery.task"])
  13. ERROR_ENVVAR_NOT_SET = (
  14. """The environment variable %r is not set,
  15. and as such the configuration could not be loaded.
  16. Please set this variable and make it point to
  17. a configuration module.""")
  18. class BaseLoader(object):
  19. """The base class for loaders.
  20. Loaders handles,
  21. * Reading celery client/worker configurations.
  22. * What happens when a task starts?
  23. See :meth:`on_task_init`.
  24. * What happens when the worker starts?
  25. See :meth:`on_worker_init`.
  26. * What modules are imported to find tasks?
  27. """
  28. builtin_modules = BUILTIN_MODULES
  29. configured = False
  30. error_envvar_not_set = ERROR_ENVVAR_NOT_SET
  31. override_backends = {}
  32. worker_initialized = False
  33. _conf = None
  34. def __init__(self, app=None, **kwargs):
  35. from celery.app import app_or_default
  36. self.app = app_or_default(app)
  37. def on_task_init(self, task_id, task):
  38. """This method is called before a task is executed."""
  39. pass
  40. def on_process_cleanup(self):
  41. """This method is called after a task is executed."""
  42. pass
  43. def on_worker_init(self):
  44. """This method is called when the worker (:program:`celeryd`)
  45. starts."""
  46. pass
  47. def import_task_module(self, module):
  48. return self.import_from_cwd(module)
  49. def import_module(self, module):
  50. return importlib.import_module(module)
  51. def import_from_cwd(self, module, imp=None):
  52. return _import_from_cwd(module,
  53. self.import_module if imp is None else imp)
  54. def import_default_modules(self):
  55. imports = set(list(self.conf.get("CELERY_IMPORTS") or ()))
  56. return [self.import_task_module(module)
  57. for module in imports | self.builtin_modules]
  58. def init_worker(self):
  59. if not self.worker_initialized:
  60. self.worker_initialized = True
  61. self.on_worker_init()
  62. def config_from_envvar(self, variable_name, silent=False):
  63. module_name = os.environ.get(variable_name)
  64. if not module_name:
  65. if silent:
  66. return False
  67. raise ImproperlyConfigured(self.error_envvar_not_set % module_name)
  68. return self.config_from_object(module_name, silent=silent)
  69. def config_from_object(self, obj, silent=False):
  70. if isinstance(obj, basestring):
  71. try:
  72. if "." in obj:
  73. obj = get_cls_by_name(obj, imp=self.import_from_cwd)
  74. else:
  75. obj = self.import_from_cwd(obj)
  76. except (ImportError, AttributeError):
  77. if silent:
  78. return False
  79. raise
  80. if not hasattr(obj, "__getitem__"):
  81. obj = DictAttribute(obj)
  82. self._conf = obj
  83. return True
  84. def cmdline_config_parser(self, args, namespace="celery",
  85. re_type=re.compile(r"\((\w+)\)"),
  86. extra_types={"json": deserialize},
  87. override_types={"tuple": "json",
  88. "list": "json",
  89. "dict": "json"}):
  90. from celery.app.defaults import Option, NAMESPACES
  91. namespace = namespace.upper()
  92. typemap = dict(Option.typemap, **extra_types)
  93. def getarg(arg):
  94. """Parse a single configuration definition from
  95. the command line."""
  96. ## find key/value
  97. # ns.key=value|ns_key=value (case insensitive)
  98. key, value = arg.split('=', 1)
  99. key = key.upper().replace(".", "_")
  100. ## find namespace.
  101. # .key=value|_key=value expands to default namespace.
  102. if key[0] == '_':
  103. ns, key = namespace, key[1:]
  104. else:
  105. # find namespace part of key
  106. ns, key = key.split('_', 1)
  107. ns_key = (ns and ns + "_" or "") + key
  108. # (type)value makes cast to custom type.
  109. cast = re_type.match(value)
  110. if cast:
  111. type_ = cast.groups()[0]
  112. type_ = override_types.get(type_, type_)
  113. value = value[len(cast.group()):]
  114. value = typemap[type_](value)
  115. else:
  116. try:
  117. value = NAMESPACES[ns][key].to_python(value)
  118. except ValueError, exc:
  119. # display key name in error message.
  120. raise ValueError("%r: %s" % (ns_key, exc))
  121. return ns_key, value
  122. return dict(map(getarg, args))
  123. def mail_admins(self, subject, body, fail_silently=False,
  124. sender=None, to=None, host=None, port=None,
  125. user=None, password=None, timeout=None, use_ssl=False):
  126. try:
  127. message = self.mail.Message(sender=sender, to=to,
  128. subject=subject, body=body)
  129. mailer = self.mail.Mailer(host=host, port=port,
  130. user=user, password=password,
  131. timeout=timeout, use_ssl=use_ssl)
  132. mailer.send(message)
  133. except Exception, exc:
  134. if not fail_silently:
  135. raise
  136. warnings.warn(self.mail.SendmailWarning(
  137. "Mail could not be sent: %r %r" % (
  138. exc, {"To": to, "Subject": subject})))
  139. @property
  140. def conf(self):
  141. """Loader configuration."""
  142. if self._conf is None:
  143. self._conf = self.read_configuration()
  144. return self._conf
  145. @cached_property
  146. def mail(self):
  147. return self.import_module("celery.utils.mail")