registry.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. # -*- coding: utf-8 -*-
  2. """Registry of available tasks."""
  3. import inspect
  4. from importlib import import_module
  5. from celery._state import get_current_app
  6. from celery.exceptions import NotRegistered
  7. __all__ = ['TaskRegistry']
  8. class TaskRegistry(dict):
  9. NotRegistered = NotRegistered
  10. def __missing__(self, key):
  11. raise self.NotRegistered(key)
  12. def register(self, task):
  13. """Register a task in the task registry.
  14. The task will be automatically instantiated if not already an
  15. instance.
  16. """
  17. self[task.name] = inspect.isclass(task) and task() or task
  18. def unregister(self, name):
  19. """Unregister task by name.
  20. Arguments:
  21. name (str): name of the task to unregister, or a
  22. :class:`celery.task.base.Task` with a valid `name` attribute.
  23. Raises:
  24. celery.exceptions.NotRegistered: if the task is not registered.
  25. """
  26. try:
  27. self.pop(getattr(name, 'name', name))
  28. except KeyError:
  29. raise self.NotRegistered(name)
  30. # -- these methods are irrelevant now and will be removed in 4.0
  31. def regular(self):
  32. return self.filter_types('regular')
  33. def periodic(self):
  34. return self.filter_types('periodic')
  35. def filter_types(self, type):
  36. return {name: task for name, task in self.items()
  37. if getattr(task, 'type', 'regular') == type}
  38. def _unpickle_task(name):
  39. return get_current_app().tasks[name]
  40. def _unpickle_task_v2(name, module=None):
  41. if module:
  42. import_module(module)
  43. return get_current_app().tasks[name]