tyrant.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. """celery.backends.tyrant"""
  2. from django.core.exceptions import ImproperlyConfigured
  3. try:
  4. import pytyrant
  5. except ImportError:
  6. raise ImproperlyConfigured(
  7. "The Tokyo Tyrant backend requires the pytyrant library.")
  8. from celery.backends.base import BaseBackend
  9. from django.conf import settings
  10. from carrot.messaging import serialize, deserialize
  11. try:
  12. import cPickle as pickle
  13. except ImportError:
  14. import pickle
  15. class Backend(BaseBackend):
  16. """Tokyo Cabinet based task backend store.
  17. .. attribute:: tyrant_host
  18. The hostname to the Tokyo Tyrant server.
  19. .. attribute:: tyrant_port
  20. The port to the Tokyo Tyrant server.
  21. """
  22. tyrant_host = None
  23. tyrant_port = None
  24. def __init__(self, tyrant_host=None, tyrant_port=None):
  25. """Initialize Tokyo Tyrant backend instance.
  26. Raises :class:`django.core.exceptions.ImproperlyConfigured` if
  27. :setting:`TT_HOST` or :setting:`TT_PORT` is not set.
  28. """
  29. self.tyrant_host = kwargs.get("tyrant_host",
  30. getattr(settings, "TT_HOST", self.tyrant_host))
  31. self.tyrant_port = kwargs.get("tyrant_port",
  32. getattr(settings, "TT_PORT", self.tyrant_port))
  33. if not self.tyrant_host or not self.tyrant_port:
  34. raise ImproperlyConfigured(
  35. "To use the Tokyo Tyrant backend, you have to "
  36. "set the TT_HOST and TT_PORT settings in your settings.py")
  37. super(Backend, self).__init__(*args, **kwargs)
  38. self._cache = {}
  39. def get_server(self):
  40. """Get :class:`pytyrant.PyTyrant`` instance with the current
  41. server configuration."""
  42. return pytyrant.PyTyrant.open(self.tyrant_host, self.tyrant_port)
  43. def _cache_key(self, task_id):
  44. return "celery-task-meta-%s" % task_id
  45. def store_result(self, task_id, result, status):
  46. """Store task result and status."""
  47. result = self.prepare_result(result)
  48. meta = {"status": status, "result": pickle.dumps(result)}
  49. self.get_server()[self._cache_key(task_id)] = serialize(meta)
  50. def get_status(self, task_id):
  51. """Get the status for a task."""
  52. return self._get_task_meta_for(self, task_id)["status"]
  53. def get_result(self, task_id):
  54. """Get the result of a task."""
  55. return self._get_task_meta_for(self, task_id)["result"]
  56. def is_done(self, task_id):
  57. """Returns ``True`` if the task executed successfully."""
  58. return self.get_status(task_id) == "DONE"
  59. def _get_task_meta_for(self, task_id):
  60. if task_id in self._cache:
  61. return self._cache[task_id]
  62. meta = self.get_server().get(self._cache_key(task_id))
  63. if not meta:
  64. return {"status": "PENDING", "result": None}
  65. meta = deserialize(meta)
  66. meta["result"] = pickle.loads(meta.get("result", None))
  67. if meta.get("status") == "DONE":
  68. self._cache[task_id] = meta
  69. return meta