base.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.loaders.base
  4. ~~~~~~~~~~~~~~~~~~~
  5. Loader base class.
  6. :copyright: (c) 2009 - 2011 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. import importlib
  11. import os
  12. import re
  13. import warnings
  14. from anyjson import deserialize
  15. from ..datastructures import DictAttribute
  16. from ..exceptions import ImproperlyConfigured
  17. from ..utils import (cached_property, get_cls_by_name,
  18. import_from_cwd as _import_from_cwd)
  19. from ..utils.functional import maybe_list
  20. BUILTIN_MODULES = frozenset(["celery.task"])
  21. ERROR_ENVVAR_NOT_SET = (
  22. """The environment variable %r is not set,
  23. and as such the configuration could not be loaded.
  24. Please set this variable and make it point to
  25. a configuration module.""")
  26. class BaseLoader(object):
  27. """The base class for loaders.
  28. Loaders handles,
  29. * Reading celery client/worker configurations.
  30. * What happens when a task starts?
  31. See :meth:`on_task_init`.
  32. * What happens when the worker starts?
  33. See :meth:`on_worker_init`.
  34. * What modules are imported to find tasks?
  35. """
  36. builtin_modules = BUILTIN_MODULES
  37. configured = False
  38. error_envvar_not_set = ERROR_ENVVAR_NOT_SET
  39. override_backends = {}
  40. worker_initialized = False
  41. _conf = None
  42. def __init__(self, app=None, **kwargs):
  43. from ..app import app_or_default
  44. self.app = app_or_default(app)
  45. def on_task_init(self, task_id, task):
  46. """This method is called before a task is executed."""
  47. pass
  48. def on_process_cleanup(self):
  49. """This method is called after a task is executed."""
  50. pass
  51. def on_worker_init(self):
  52. """This method is called when the worker (:program:`celeryd`)
  53. starts."""
  54. pass
  55. def on_worker_process_init(self):
  56. """This method is called when a child process starts."""
  57. pass
  58. def import_task_module(self, module):
  59. return self.import_from_cwd(module)
  60. def import_module(self, module, package=None):
  61. return importlib.import_module(module, package=package)
  62. def import_from_cwd(self, module, imp=None, package=None):
  63. return _import_from_cwd(module,
  64. self.import_module if imp is None else imp,
  65. package=package)
  66. def import_default_modules(self):
  67. imports = set(maybe_list(self.conf.get("CELERY_IMPORTS") or ()))
  68. return [self.import_task_module(module)
  69. for module in imports | self.builtin_modules]
  70. def init_worker(self):
  71. if not self.worker_initialized:
  72. self.worker_initialized = True
  73. self.on_worker_init()
  74. def init_worker_process(self):
  75. self.on_worker_process_init()
  76. def config_from_envvar(self, variable_name, silent=False):
  77. module_name = os.environ.get(variable_name)
  78. if not module_name:
  79. if silent:
  80. return False
  81. raise ImproperlyConfigured(self.error_envvar_not_set % module_name)
  82. return self.config_from_object(module_name, silent=silent)
  83. def config_from_object(self, obj, silent=False):
  84. if isinstance(obj, basestring):
  85. try:
  86. if "." in obj:
  87. obj = get_cls_by_name(obj, imp=self.import_from_cwd)
  88. else:
  89. obj = self.import_from_cwd(obj)
  90. except (ImportError, AttributeError):
  91. if silent:
  92. return False
  93. raise
  94. if not hasattr(obj, "__getitem__"):
  95. obj = DictAttribute(obj)
  96. self._conf = obj
  97. return True
  98. def cmdline_config_parser(self, args, namespace="celery",
  99. re_type=re.compile(r"\((\w+)\)"),
  100. extra_types={"json": deserialize},
  101. override_types={"tuple": "json",
  102. "list": "json",
  103. "dict": "json"}):
  104. from ..app.defaults import Option, NAMESPACES
  105. namespace = namespace.upper()
  106. typemap = dict(Option.typemap, **extra_types)
  107. def getarg(arg):
  108. """Parse a single configuration definition from
  109. the command line."""
  110. ## find key/value
  111. # ns.key=value|ns_key=value (case insensitive)
  112. key, value = arg.split('=', 1)
  113. key = key.upper().replace(".", "_")
  114. ## find namespace.
  115. # .key=value|_key=value expands to default namespace.
  116. if key[0] == '_':
  117. ns, key = namespace, key[1:]
  118. else:
  119. # find namespace part of key
  120. ns, key = key.split('_', 1)
  121. ns_key = (ns and ns + "_" or "") + key
  122. # (type)value makes cast to custom type.
  123. cast = re_type.match(value)
  124. if cast:
  125. type_ = cast.groups()[0]
  126. type_ = override_types.get(type_, type_)
  127. value = value[len(cast.group()):]
  128. value = typemap[type_](value)
  129. else:
  130. try:
  131. value = NAMESPACES[ns][key].to_python(value)
  132. except ValueError, exc:
  133. # display key name in error message.
  134. raise ValueError("%r: %s" % (ns_key, exc))
  135. return ns_key, value
  136. return dict(map(getarg, args))
  137. def mail_admins(self, subject, body, fail_silently=False,
  138. sender=None, to=None, host=None, port=None,
  139. user=None, password=None, timeout=None,
  140. use_ssl=False, use_tls=False):
  141. try:
  142. message = self.mail.Message(sender=sender, to=to,
  143. subject=subject, body=body)
  144. mailer = self.mail.Mailer(host=host, port=port,
  145. user=user, password=password,
  146. timeout=timeout, use_ssl=use_ssl,
  147. use_tls=use_tls)
  148. mailer.send(message)
  149. except Exception, exc:
  150. if not fail_silently:
  151. raise
  152. warnings.warn(self.mail.SendmailWarning(
  153. "Mail could not be sent: %r %r" % (
  154. exc, {"To": to, "Subject": subject})))
  155. @property
  156. def conf(self):
  157. """Loader configuration."""
  158. if self._conf is None:
  159. self._conf = self.read_configuration()
  160. return self._conf
  161. @cached_property
  162. def mail(self):
  163. return self.import_module("celery.utils.mail")