# base.py (2.2 KB)
  1. import os
  2. import sys
  3. from importlib import import_module
# Modules that ``BaseLoader.import_default_modules`` always imports, in
# addition to whatever the ``CELERY_IMPORTS`` configuration attribute lists.
BUILTIN_MODULES = ["celery.task"]
  5. class BaseLoader(object):
  6. """The base class for loaders.
  7. Loaders handles to following things:
  8. * Reading celery client/worker configurations.
  9. * What happens when a task starts?
  10. See :meth:`on_task_init`.
  11. * What happens when the worker starts?
  12. See :meth:`on_worker_init`.
  13. * What modules are imported to find tasks?
  14. """
  15. _conf_cache = None
  16. worker_initialized = False
  17. override_backends = {}
  18. configured = False
  19. def on_task_init(self, task_id, task):
  20. """This method is called before a task is executed."""
  21. pass
  22. def on_process_cleanup(self):
  23. """This method is called after a task is executed."""
  24. pass
  25. def on_worker_init(self):
  26. """This method is called when the worker (``celeryd``) starts."""
  27. pass
  28. def import_task_module(self, module):
  29. return self.import_from_cwd(module)
  30. def import_module(self, module):
  31. return import_module(module)
  32. def import_default_modules(self):
  33. imports = getattr(self.conf, "CELERY_IMPORTS", None) or []
  34. imports = set(list(imports) + BUILTIN_MODULES)
  35. return map(self.import_task_module, imports)
  36. def init_worker(self):
  37. if not self.worker_initialized:
  38. self.worker_initialized = True
  39. self.on_worker_init()
  40. def import_from_cwd(self, module, imp=None):
  41. """Import module, but make sure it finds modules
  42. located in the current directory.
  43. Modules located in the current directory has
  44. precedence over modules located in ``sys.path``.
  45. """
  46. if imp is None:
  47. imp = self.import_module
  48. cwd = os.getcwd()
  49. if cwd in sys.path:
  50. return imp(module)
  51. sys.path.insert(0, cwd)
  52. try:
  53. return imp(module)
  54. finally:
  55. try:
  56. sys.path.remove(cwd)
  57. except ValueError: # pragma: no cover
  58. pass
  59. @property
  60. def conf(self):
  61. """Loader configuration."""
  62. if not self._conf_cache:
  63. self._conf_cache = self.read_configuration()
  64. return self._conf_cache