annotations.py 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. from __future__ import absolute_import
  2. from celery.utils.functional import firstmethod, mpromise
  3. from celery.utils.imports import instantiate
  4. _first_match = firstmethod("annotate")
  5. _first_match_any = firstmethod("annotate_any")
  6. def resolve_all(anno, task):
  7. return filter(None, (_first_match(anno, task), _first_match_any(anno)))
  8. class MapAnnotation(dict):
  9. def annotate_any(self):
  10. try:
  11. return dict(self["*"])
  12. except KeyError:
  13. pass
  14. def annotate(self, task):
  15. try:
  16. return dict(self[task.name])
  17. except KeyError:
  18. pass
  19. def prepare(annotations):
  20. """Expands the :setting:`CELERY_ANNOTATIONS` setting."""
  21. def expand_annotation(annotation):
  22. if isinstance(annotation, dict):
  23. return MapAnnotation(annotation)
  24. elif isinstance(annotation, basestring):
  25. return mpromise(instantiate, annotation)
  26. return annotation
  27. if annotations is None:
  28. return ()
  29. elif not isinstance(annotations, (list, tuple)):
  30. annotations = (annotations, )
  31. return map(expand_annotation, annotations)