tyrant.py 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. """celery.backends.tyrant"""
  2. from django.core.exceptions import ImproperlyConfigured
  3. try:
  4. import pytyrant
  5. except ImportError:
  6. raise ImproperlyConfigured(
  7. "The Tokyo Tyrant backend requires the pytyrant library.")
  8. from celery.backends.base import BaseBackend
  9. from django.conf import settings
  10. from carrot.messaging import serialize, deserialize
  11. try:
  12. import cPickle as pickle
  13. except ImportError:
  14. import pickle
  15. class Backend(BaseBackend):
  16. """Tokyo Cabinet based task backend store.
  17. .. attribute:: tyrant_host
  18. The hostname to the Tokyo Tyrant server.
  19. .. attribute:: tyrant_port
  20. The port to the Tokyo Tyrant server.
  21. """
  22. tyrant_host = None
  23. tyrant_port = None
  24. capabilities = ["ResultStore"]
  25. def __init__(self, tyrant_host=None, tyrant_port=None):
  26. """Initialize Tokyo Tyrant backend instance.
  27. Raises :class:`django.core.exceptions.ImproperlyConfigured` if
  28. :setting:`TT_HOST` or :setting:`TT_PORT` is not set.
  29. """
  30. self.tyrant_host = tyrant_host or \
  31. getattr(settings, "TT_HOST", self.tyrant_host)
  32. self.tyrant_port = tyrant_port or \
  33. getattr(settings, "TT_PORT", self.tyrant_port)
  34. if not self.tyrant_host or not self.tyrant_port:
  35. raise ImproperlyConfigured(
  36. "To use the Tokyo Tyrant backend, you have to "
  37. "set the TT_HOST and TT_PORT settings in your settings.py")
  38. super(Backend, self).__init__()
  39. self._cache = {}
  40. def get_server(self):
  41. """Get :class:`pytyrant.PyTyrant`` instance with the current
  42. server configuration."""
  43. return pytyrant.PyTyrant.open(self.tyrant_host, self.tyrant_port)
  44. def _cache_key(self, task_id):
  45. return "celery-task-meta-%s" % task_id
  46. def store_result(self, task_id, result, status):
  47. """Store task result and status."""
  48. if status == "DONE":
  49. result = self.prepare_result(result)
  50. elif status == "FAILURE":
  51. result = self.prepare_exception(result)
  52. meta = {"status": status, "result": pickle.dumps(result)}
  53. self.get_server()[self._cache_key(task_id)] = serialize(meta)
  54. def get_status(self, task_id):
  55. """Get the status for a task."""
  56. return self._get_task_meta_for(task_id)["status"]
  57. def get_result(self, task_id):
  58. """Get the result of a task."""
  59. meta = self._get_task_meta_for(task_id)
  60. if meta["status"] == "FAILURE":
  61. return self.exception_to_python(meta["result"])
  62. else:
  63. return meta["result"]
  64. def is_done(self, task_id):
  65. """Returns ``True`` if the task executed successfully."""
  66. return self.get_status(task_id) == "DONE"
  67. def _get_task_meta_for(self, task_id):
  68. if task_id in self._cache:
  69. return self._cache[task_id]
  70. meta = self.get_server().get(self._cache_key(task_id))
  71. if not meta:
  72. return {"status": "PENDING", "result": None}
  73. meta = deserialize(meta)
  74. meta["result"] = pickle.loads(meta.get("result", None))
  75. if meta.get("status") == "DONE":
  76. self._cache[task_id] = meta
  77. return meta