Browse Source

Move meta store to celery.backends and delay task now returns Job object.

Ask Solem 16 years ago
parent
commit
7f37744c4f

+ 55 - 0
celery/backends/__init__.py

@@ -0,0 +1,55 @@
+from functools import partial
+from django.conf import settings
+import sys
+
+DEFAULT_BACKEND = "database"
+CELERY_BACKEND = getattr(settings, "CELERY_BACKEND", DEFAULT_BACKEND)
+
+
class BaseJob(object):
    """Handle around a ``task_id``, querying a backend for task state.

    The heavy lifting is delegated to the ``backend`` instance; this class
    only remembers which task it refers to.
    """

    def __init__(self, task_id, backend):
        self.task_id = task_id
        self.backend = backend

    def __str__(self):
        return self.task_id

    def __repr__(self):
        return "<Job: %s>" % self.task_id

    def is_done(self):
        """Return ``True`` when the task has finished executing."""
        return self.backend.is_done(self.task_id)

    def wait_for(self):
        """Block until the task result is available, then return it."""
        return self.backend.wait_for(self.task_id)

    @property
    def status(self):
        """The task's current status string, as reported by the backend."""
        return self.backend.get_status(self.task_id)

    @property
    def result(self):
        """The task result when done, otherwise ``None``."""
        return (self.backend.get_result(self.task_id)
                if self.status == "DONE" else None)
+
+
+
def get_backend_cls(backend):
    """Resolve a backend name to its ``Backend`` class.

    A plain name (no dot) is looked up inside the ``celery.backends``
    package; a dotted path is imported as-is.  The imported module must
    expose a ``Backend`` attribute.

    Raises ``ImportError`` if the module cannot be imported and
    ``AttributeError`` if it defines no ``Backend`` class.
    """
    # Idiom fix: membership test instead of ``backend.find(".") == -1``.
    if "." not in backend:
        backend = "celery.backends.%s" % backend
    # ``__import__`` returns the top-level package for dotted names, so the
    # leaf module is fetched from sys.modules instead.
    __import__(backend)
    backend_module = sys.modules[backend]
    return getattr(backend_module, "Backend")
+
# Resolve the backend class configured via CELERY_BACKEND once at import
# time, and share a single backend instance module-wide.
get_default_backend_cls = partial(get_backend_cls, CELERY_BACKEND)
DefaultBackend = get_default_backend_cls()
default_backend = DefaultBackend()

class Job(BaseJob):
    """A :class:`BaseJob` bound to the module-wide default backend."""

    def __init__(self, task_id):
        super(Job, self).__init__(task_id, backend=default_backend)
+
+

+ 46 - 0
celery/backends/base.py

@@ -0,0 +1,46 @@
+import time
+
+
class TimeOutError(Exception):
    """The operation has timed out."""


class BaseBackend(object):
    """Base class for task meta stores (backends).

    Concrete backends must implement :meth:`mark_as_done`,
    :meth:`get_status` and :meth:`get_result`.
    """

    TimeOutError = TimeOutError

    # Seconds to sleep between polls in wait_for().
    POLL_INTERVAL = 0.5

    def __init__(self):
        pass

    def mark_as_done(self, task_id, result):
        """Mark the task as successfully executed, storing ``result``."""
        raise NotImplementedError(
                "Backends must implement the mark_as_done method")

    def get_status(self, task_id):
        """Return the current status string for ``task_id``."""
        raise NotImplementedError(
                "Backends must implement the get_status method")

    def prepare_result(self, result):
        """Normalize a result for storage: ``None`` becomes ``True``."""
        if result is None:
            return True
        return result

    def get_result(self, task_id):
        """Return the stored result for ``task_id``."""
        raise NotImplementedError(
                "Backends must implement the get_result method")

    def is_done(self, task_id):
        """Return ``True`` if task with ``task_id`` has been executed."""
        return self.get_status(task_id) == "DONE"

    def cleanup(self):
        """Purge expired task metadata (no-op by default)."""
        pass

    def wait_for(self, task_id, timeout=None):
        """Poll until the task is done, then return its result.

        :param timeout: seconds to wait before raising
            :exc:`TimeOutError`; ``None`` means wait forever.
        """
        time_start = time.time()
        while True:
            status = self.get_status(task_id)
            if status == "DONE":
                return self.get_result(task_id)
            if timeout and time.time() > time_start + timeout:
                raise self.TimeOutError(
                        "Timed out while waiting for task %s" % (task_id, ))
            # BUG FIX: the original loop polled with no delay, busy-spinning
            # a full CPU while waiting for the task to finish.
            time.sleep(self.POLL_INTERVAL)

+ 42 - 0
celery/backends/cache.py

@@ -0,0 +1,42 @@
+from django.core.cache import cache
+from celery.backends.base import BaseBackend
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+
+
class Backend(BaseBackend):
    """Task meta store backed by the Django cache framework.

    Metadata of finished ("DONE") tasks is additionally memoized in a
    per-process dict so repeated lookups skip the cache round-trip.
    """

    def __init__(self, *args, **kwargs):
        super(Backend, self).__init__(*args, **kwargs)
        self._cache = {}  # task_id -> meta dict, memoized once DONE

    def _cache_key(self, task_id):
        """Cache key under which the task's meta dict is stored."""
        return "celery-task-meta-%s" % task_id

    def mark_as_done(self, task_id, result):
        """Mark task as done (executed)."""
        result = self.prepare_result(result)
        meta = {"status": "DONE", "result": pickle.dumps(result)}
        cache.set(self._cache_key(task_id), meta)

    def get_status(self, task_id):
        """Return the task's status ("PENDING" or "DONE")."""
        # BUG FIX: was ``self._get_task_meta_for(self, task_id)`` — passed
        # ``self`` twice, raising TypeError on every call.
        return self._get_task_meta_for(task_id)["status"]

    def get_result(self, task_id):
        """Return the task's (unpickled) result, or None while pending."""
        # BUG FIX: same double-``self`` defect as get_status.
        return self._get_task_meta_for(task_id)["result"]

    def is_done(self, task_id):
        return self.get_status(task_id) == "DONE"

    def _get_task_meta_for(self, task_id):
        """Fetch (and memoize, once done) the meta dict for ``task_id``."""
        if task_id in self._cache:
            return self._cache[task_id]
        meta = cache.get(self._cache_key(task_id))
        if not meta:
            return {"status": "PENDING", "result": None}
        meta["result"] = pickle.loads(meta.get("result", None))
        if meta.get("status") == "DONE":
            self._cache[task_id] = meta
        return meta

+ 37 - 0
celery/backends/database.py

@@ -0,0 +1,37 @@
+from celery.models import TaskMeta
+from celery.backends.base import BaseBackend
+
+
class Backend(BaseBackend):
    """Task meta store persisted through the ``TaskMeta`` Django model."""

    def __init__(self, *args, **kwargs):
        super(Backend, self).__init__(*args, **kwargs)
        # Per-process memo of finished tasks; avoids re-querying the DB.
        self._cache = {}

    def mark_as_done(self, task_id, result):
        """Mark task as done (executed)."""
        result = self.prepare_result(result)
        return TaskMeta.objects.mark_as_done(task_id, result)

    def is_done(self, task_id):
        """Returns ``True`` if task with ``task_id`` has been executed."""
        return self.get_status(task_id) == "DONE"

    def get_status(self, task_id):
        """Get the status of a task."""
        return self._get_task_meta_for(task_id).status

    def get_result(self, task_id):
        """Get the result for a task."""
        return self._get_task_meta_for(task_id).result

    def _get_task_meta_for(self, task_id):
        """Return the ``TaskMeta`` row, memoizing it once the task is done."""
        cached = self._cache.get(task_id)
        if cached is not None:
            return cached
        meta = TaskMeta.objects.get_task(task_id)
        if meta.status == "DONE":
            self._cache[task_id] = meta
        return meta

    def cleanup(self):
        """Delete expired task metadata."""
        TaskMeta.objects.delete_expired()

+ 64 - 0
celery/backends/tyrant.py

@@ -0,0 +1,64 @@
+from django.core.exceptions import ImproperlyConfigured
+
+try:
+    import pytyrant
+except ImportError:
+    raise ImproperlyConfigured(
+            "The Tokyo Tyrant backend requires the pytyrant library.")
+
+from celery.backends.base import BaseBackend
+from django.conf import settings
+from carrot.messaging import serialize, deserialize
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+
# Connection settings for the Tokyo Tyrant server, read from Django settings.
TT_HOST = getattr(settings, "TT_HOST", None)
TT_PORT = getattr(settings, "TT_PORT", None)

# Fail fast at import time when the backend is selected but unconfigured.
if not TT_HOST or not TT_PORT:
    raise ImproperlyConfigured(
            "To use the Tokyo Tyrant backend, you have to "
            "set the TT_HOST and TT_PORT settings in your settings.py")
else:
    def get_server():
        """Open a connection to the configured Tokyo Tyrant server."""
        # NOTE(review): opens a new connection on every call — confirm
        # pytyrant handles are cheap enough for this, or cache one.
        return pytyrant.PyTyrant.open(TT_HOST, TT_PORT)
+    
+
class Backend(BaseBackend):
    """Tokyo Cabinet based task backend store."""

    def __init__(self, *args, **kwargs):
        super(Backend, self).__init__(*args, **kwargs)
        self._cache = {}  # task_id -> meta dict, memoized once DONE

    def _cache_key(self, task_id):
        """Tyrant key under which the task's meta dict is stored."""
        return "celery-task-meta-%s" % task_id

    def mark_as_done(self, task_id, result):
        """Mark task as done (executed)."""
        result = self.prepare_result(result)
        meta = {"status": "DONE", "result": pickle.dumps(result)}
        get_server()[self._cache_key(task_id)] = serialize(meta)

    def get_status(self, task_id):
        """Return the task's status ("PENDING" or "DONE")."""
        # BUG FIX: was ``self._get_task_meta_for(self, task_id)`` — passed
        # ``self`` twice, raising TypeError on every call.
        return self._get_task_meta_for(task_id)["status"]

    def get_result(self, task_id):
        """Return the task's (unpickled) result, or None while pending."""
        # BUG FIX: same double-``self`` defect as get_status.
        return self._get_task_meta_for(task_id)["result"]

    def is_done(self, task_id):
        return self.get_status(task_id) == "DONE"

    def _get_task_meta_for(self, task_id):
        """Fetch, deserialize and (once done) memoize the task meta dict."""
        if task_id in self._cache:
            return self._cache[task_id]
        meta = get_server().get(self._cache_key(task_id))
        if not meta:
            return {"status": "PENDING", "result": None}
        meta = deserialize(meta)
        meta["result"] = pickle.loads(meta.get("result", None))
        if meta.get("status") == "DONE":
            self._cache[task_id] = meta
        return meta

+ 8 - 30
celery/task.py

@@ -6,7 +6,7 @@ from celery.messaging import TaskPublisher, TaskConsumer
 from celery.models import TaskMeta
 from django.core.cache import cache
 from datetime import timedelta
-from celery.models import RetryTask
+from celery.backends import Job, default_backend
 import uuid
 import traceback
 
@@ -26,7 +26,7 @@ def delay_task(task_name, *args, **kwargs):
     publisher = TaskPublisher(connection=DjangoAMQPConnection())
     task_id = publisher.delay_task(task_name, *args, **kwargs)
     publisher.close()
-    return task_id
+    return Job(task_id)
 
 
 def discard_all():
@@ -44,35 +44,14 @@ def discard_all():
     return discarded_count
 
 
-def gen_task_done_cache_key(task_id):
-    """Generate a cache key for marking a task as done."""
-    return "celery-task-done-marker-%s" % task_id
-
-
 def mark_as_done(task_id, result):
-    """Mark task as done (executed).
-
-    if ``settings.TASK_META_USE_DB`` is ``True``, this will
-    use the :class:`celery.models.TaskMeta` model, if not memcached
-    is used.
-
-    """
-    if result is None:
-        result = True
-    if TASK_META_USE_DB:
-        TaskMeta.objects.mark_as_done(task_id)
-    else:
-        cache_key = gen_task_done_cache_key(task_id)
-        cache.set(cache_key, result)
+    """Mark task as done (executed)."""
+    return default_backend.mark_as_done(task_id, result)
 
 
 def is_done(task_id):
     """Returns ``True`` if task with ``task_id`` has been executed."""
-    if TASK_META_USE_DB:
-        return TaskMeta.objects.is_done(task_id)
-    else:
-        cache_key = gen_task_done_cache_key(task_id)
-        return bool(cache.get(cache_key))
+    return default_backend.is_done(task_id)
 
 
 class Task(object):
@@ -158,7 +137,7 @@ class Task(object):
         self.get_publisher().requeue_task(self.name, task_id, args, kwargs)
 
     def retry(self, task_id, args, kwargs):
-        RetryTask.objects.add(self.name, task_id, args, kwargs)
+        retry_queue.put(self.name, task_id, args, kwargs)
 
     def handle_exception(self, exception, retry_args, retry_kwargs):
         pass
@@ -282,6 +261,5 @@ class DeleteExpiredTaskMetaTask(PeriodicTask):
     def run(self, **kwargs):
         logger = self.get_logger(**kwargs)
         logger.info("Deleting expired task meta objects...")
-        TaskMeta.objects.delete_expired()
-if TASK_META_USE_DB:
-    tasks.register(DeleteExpiredTaskMetaTask)
+        default_backend.cleanup()
+tasks.register(DeleteExpiredTaskMetaTask)