registry.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. """celery.registry"""
  2. import inspect
  3. from celery.exceptions import NotRegistered
  4. from celery.utils.compat import UserDict
  5. class TaskRegistry(UserDict):
  6. NotRegistered = NotRegistered
  7. def __init__(self):
  8. self.data = {}
  9. def regular(self):
  10. """Get all regular task types."""
  11. return self.filter_types("regular")
  12. def periodic(self):
  13. """Get all periodic task types."""
  14. return self.filter_types("periodic")
  15. def register(self, task):
  16. """Register a task in the task registry.
  17. The task will be automatically instantiated if not already an
  18. instance.
  19. """
  20. task = inspect.isclass(task) and task() or task
  21. self.data[task.name] = task
  22. def unregister(self, name):
  23. """Unregister task by name.
  24. :param name: name of the task to unregister, or a
  25. :class:`celery.task.base.Task` with a valid `name` attribute.
  26. :raises celery.exceptions.NotRegistered: if the task has not
  27. been registered.
  28. """
  29. try:
  30. # Might be a task class
  31. name = name.name
  32. except AttributeError:
  33. pass
  34. self.pop(name)
  35. def filter_types(self, type):
  36. """Return all tasks of a specific type."""
  37. return dict((task_name, task) for task_name, task in self.data.items()
  38. if task.type == type)
  39. def __getitem__(self, key):
  40. try:
  41. return UserDict.__getitem__(self, key)
  42. except KeyError:
  43. raise self.NotRegistered(key)
  44. def pop(self, key, *args):
  45. try:
  46. return UserDict.pop(self, key, *args)
  47. except KeyError:
  48. raise self.NotRegistered(key)
  49. #: Global task registry.
  50. tasks = TaskRegistry()
  51. def _unpickle_task(name):
  52. return tasks[name]