瀏覽代碼

New constants for task status states in celery.states

Ask Solem 15 年之前
父節點
當前提交
727345ded6

+ 3 - 2
celery/backends/amqp.py

@@ -2,6 +2,7 @@
 from carrot.messaging import Consumer, Publisher
 
 from celery import conf
+from celery import states
 from celery.messaging import establish_connection
 from celery.backends.base import BaseBackend
 
@@ -79,7 +80,7 @@ class AMQPBackend(BaseBackend):
 
     def is_successful(self, task_id):
         """Returns ``True`` if task with ``task_id`` has been executed."""
-        return self.get_status(task_id) == "SUCCESS"
+        return self.get_status(task_id) == states.SUCCESS
 
     def get_status(self, task_id):
         """Get the status of a task."""
@@ -118,7 +119,7 @@ class AMQPBackend(BaseBackend):
     def get_result(self, task_id):
         """Get the result for a task."""
         result = self._get_task_meta_for(task_id)
-        if result["status"] == "FAILURE":
+        if result["status"] in states.EXCEPTION_STATES:
             return self.exception_to_python(result["result"])
         else:
             return result["result"]

+ 13 - 20
celery/backends/base.py

@@ -6,25 +6,22 @@ from billiard.serialization import get_pickled_exception
 from billiard.serialization import get_pickleable_exception
 
 from celery.exceptions import TimeoutError
-
-READY_STATES = frozenset(["SUCCESS", "FAILURE"])
-UNREADY_STATES = frozenset(["PENDING", "RETRY"])
-EXCEPTION_STATES = frozenset(["RETRY", "FAILURE"])
+from celery import states
 
 
 class BaseBackend(object):
     """The base backend class. All backends should inherit from this."""
 
-    READY_STATES = READY_STATES
-    UNREADY_STATES = UNREADY_STATES
-    EXCEPTION_STATES = EXCEPTION_STATES
+    READY_STATES = states.READY_STATES
+    UNREADY_STATES = states.UNREADY_STATES
+    EXCEPTION_STATES = states.EXCEPTION_STATES
 
     TimeoutError = TimeoutError
 
     capabilities = []
 
     def encode_result(self, result, status):
-        if status == "SUCCESS":
+        if status == states.SUCCESS:
             return self.prepare_value(result)
         elif status in self.EXCEPTION_STATES:
             return self.prepare_exception(result)
@@ -36,17 +33,17 @@ class BaseBackend(object):
 
     def mark_as_done(self, task_id, result):
         """Mark task as successfully executed."""
-        return self.store_result(task_id, result, status="SUCCESS")
+        return self.store_result(task_id, result, status=states.SUCCESS)
 
     def mark_as_failure(self, task_id, exc, traceback=None):
         """Mark task as executed with failure. Stores the execption."""
-        return self.store_result(task_id, exc, status="FAILURE",
+        return self.store_result(task_id, exc, status=states.FAILURE,
                                  traceback=traceback)
 
     def mark_as_retry(self, task_id, exc, traceback=None):
         """Mark task as being retries. Stores the current
         exception (if any)."""
-        return self.store_result(task_id, exc, status="RETRY",
+        return self.store_result(task_id, exc, status=states.RETRY,
                                  traceback=traceback)
 
     def prepare_exception(self, exc):
@@ -78,7 +75,7 @@ class BaseBackend(object):
 
     def is_successful(self, task_id):
         """Returns ``True`` if the task was successfully executed."""
-        return self.get_status(task_id) == "SUCCESS"
+        return self.get_status(task_id) == states.SUCCESS
 
     def cleanup(self):
         """Backend cleanup. Is run by
@@ -102,9 +99,9 @@ class BaseBackend(object):
 
         while True:
             status = self.get_status(task_id)
-            if status == "SUCCESS":
+            if status == states.SUCCESS:
                 return self.get_result(task_id)
-            elif status == "FAILURE":
+            elif status == states.FAILURE:
                 raise self.get_result(task_id)
             # avoid hammering the CPU checking status.
             time.sleep(sleep_inbetween)
@@ -169,18 +166,14 @@ class KeyValueStoreBackend(BaseBackend):
         meta = self._get_task_meta_for(task_id)
         return meta["traceback"]
 
-    def is_successful(self, task_id):
-        """Returns ``True`` if the task executed successfully."""
-        return self.get_status(task_id) == "SUCCESS"
-
     def _get_task_meta_for(self, task_id):
         """Get task metadata for a task by id."""
         if task_id in self._cache:
             return self._cache[task_id]
         meta = self.get(self.get_cache_key_for_task(task_id))
         if not meta:
-            return {"status": "PENDING", "result": None}
+            return {"status": states.PENDING, "result": None}
         meta = pickle.loads(str(meta))
-        if meta.get("status") == "SUCCESS":
+        if meta.get("status") == states.SUCCESS:
             self._cache[task_id] = meta
         return meta

+ 3 - 6
celery/backends/database.py

@@ -1,3 +1,4 @@
+from celery import states
 from celery.models import TaskMeta, TaskSetMeta
 from celery.backends.base import BaseBackend
 
@@ -23,10 +24,6 @@ class DatabaseBackend(BaseBackend):
         TaskSetMeta.objects.store_result(taskset_id, result)
         return result
 
-    def is_successful(self, task_id):
-        """Returns ``True`` if task with ``task_id`` has been executed."""
-        return self.get_status(task_id) == "SUCCESS"
-
     def get_status(self, task_id):
         """Get the status of a task."""
         return self._get_task_meta_for(task_id).status
@@ -38,7 +35,7 @@ class DatabaseBackend(BaseBackend):
     def get_result(self, task_id):
         """Get the result for a task."""
         meta = self._get_task_meta_for(task_id)
-        if meta.status == "FAILURE":
+        if meta.status in states.EXCEPTION_STATES:
             return self.exception_to_python(meta.result)
         else:
             return meta.result
@@ -48,7 +45,7 @@ class DatabaseBackend(BaseBackend):
         if task_id in self._cache:
             return self._cache[task_id]
         meta = TaskMeta.objects.get_task(task_id)
-        if meta.status == "SUCCESS":
+        if meta.status == states.SUCCESS:
             self._cache[task_id] = meta
         return meta
 

+ 6 - 9
celery/backends/mongodb.py

@@ -8,9 +8,10 @@ try:
 except ImportError:
     pymongo = None
 
+from celery import conf
+from celery import states
 from celery.backends.base import BaseBackend
 from celery.loaders import load_settings
-from celery.conf import TASK_RESULT_EXPIRES
 
 
 class Bunch:
@@ -113,10 +114,6 @@ class MongoBackend(BaseBackend):
 
         taskmeta_collection.save(meta, safe=True)
 
-    def is_successful(self, task_id):
-        """Returns ``True`` if the task executed successfully."""
-        return self.get_status(task_id) == "SUCCESS"
-
     def get_status(self, task_id):
         """Get status of a task."""
         return self._get_task_meta_for(task_id)["status"]
@@ -129,7 +126,7 @@ class MongoBackend(BaseBackend):
     def get_result(self, task_id):
         """Get the result for a task."""
         meta = self._get_task_meta_for(task_id)
-        if meta["status"] == "FAILURE":
+        if meta["status"] in states.EXCEPTION_STATES:
             return self.exception_to_python(meta["result"])
         else:
             return meta["result"]
@@ -143,7 +140,7 @@ class MongoBackend(BaseBackend):
         taskmeta_collection = db[self.mongodb_taskmeta_collection]
         obj = taskmeta_collection.find_one({"_id": task_id})
         if not obj:
-            return {"status": "PENDING", "result": None}
+            return {"status": states.PENDING, "result": None}
 
         meta = {
             "task_id": obj["_id"],
@@ -152,7 +149,7 @@ class MongoBackend(BaseBackend):
             "date_done": obj["date_done"],
             "traceback": pickle.loads(str(obj["traceback"])),
         }
-        if meta["status"] == "SUCCESS":
+        if meta["status"] == states.SUCCESS:
             self._cache[task_id] = meta
 
         return meta
@@ -163,6 +160,6 @@ class MongoBackend(BaseBackend):
         taskmeta_collection = db[self.mongodb_taskmeta_collection]
         taskmeta_collection.remove({
                 "date_done": {
-                    "$lt": datetime.now() - TASK_RESULT_EXPIRES,
+                    "$lt": datetime.now() - conf.TASK_RESULT_EXPIRES,
                  }
         })

+ 9 - 8
celery/execute/trace.py

@@ -1,6 +1,7 @@
 import sys
 import traceback
 
+from celery import states
 from celery import signals
 from celery.registry import tasks
 from celery.exceptions import RetryTaskError
@@ -8,7 +9,7 @@ from celery.datastructures import ExceptionInfo
 
 
 class TraceInfo(object):
-    def __init__(self, status="PENDING", retval=None, exc_info=None):
+    def __init__(self, status=states.PENDING, retval=None, exc_info=None):
         self.status = status
         self.retval = retval
         self.exc_info = exc_info
@@ -25,13 +26,13 @@ class TraceInfo(object):
         """Trace the execution of a function, calling the appropiate callback
         if the function raises retry, an failure or returned successfully."""
         try:
-            return cls("SUCCESS", retval=fun(*args, **kwargs))
+            return cls(states.SUCCESS, retval=fun(*args, **kwargs))
         except (SystemExit, KeyboardInterrupt):
             raise
         except RetryTaskError, exc:
-            return cls("RETRY", retval=exc, exc_info=sys.exc_info())
+            return cls(states.RETRY, retval=exc, exc_info=sys.exc_info())
         except Exception, exc:
-            return cls("FAILURE", retval=exc, exc_info=sys.exc_info())
+            return cls(states.FAILURE, retval=exc, exc_info=sys.exc_info())
 
 
 class TaskTrace(object):
@@ -42,11 +43,11 @@ class TaskTrace(object):
         self.args = args
         self.kwargs = kwargs
         self.task = task or tasks[self.task_name]
-        self.status = "PENDING"
+        self.status = states.PENDING
         self.strtb = None
-        self._trace_handlers = {"FAILURE": self.handle_failure,
-                                "RETRY": self.handle_retry,
-                                "SUCCESS": self.handle_success}
+        self._trace_handlers = {states.FAILURE: self.handle_failure,
+                                states.RETRY: self.handle_retry,
+                                states.SUCCESS: self.handle_success}
 
     def __call__(self):
         return self.execute()

+ 2 - 4
celery/managers.py

@@ -7,6 +7,8 @@ from django.db import models
 from django.db import transaction
 from django.db.models.query import QuerySet
 
+from celery import states
+
 
 def transaction_retry(max_retries=1):
     """Decorator for methods doing database operations.
@@ -96,10 +98,6 @@ class TaskManager(ResultManager):
         task, created = self.get_or_create(task_id=task_id)
         return task
 
-    def is_successful(self, task_id):
-        """Returns ``True`` if the task was executed successfully."""
-        return self.get_task(task_id).status == "SUCCESS"
-
     @transaction_retry(max_retries=2)
     def store_result(self, task_id, result, status, traceback=None):
         """Store the result and status of a task.

+ 3 - 8
celery/models.py

@@ -5,22 +5,17 @@ from django.utils.translation import ugettext_lazy as _
 from picklefield.fields import PickledObjectField
 
 from celery import conf
+from celery import states
 from celery.managers import TaskManager, TaskSetManager
 
-TASK_STATUS_PENDING = "PENDING"
-TASK_STATUS_RETRY = "RETRY"
-TASK_STATUS_FAILURE = "FAILURE"
-TASK_STATUS_SUCCESS = "SUCCESS"
-TASK_STATUSES = (TASK_STATUS_PENDING, TASK_STATUS_RETRY,
-                 TASK_STATUS_FAILURE, TASK_STATUS_SUCCESS)
-TASK_STATUSES_CHOICES = zip(TASK_STATUSES, TASK_STATUSES)
+TASK_STATUSES_CHOICES = zip(states.ALL_STATES, states.ALL_STATES)
 
 
 class TaskMeta(models.Model):
     """Task result/status."""
     task_id = models.CharField(_(u"task id"), max_length=255, unique=True)
     status = models.CharField(_(u"task status"), max_length=50,
-            default=TASK_STATUS_PENDING, choices=TASK_STATUSES_CHOICES)
+            default=states.PENDING, choices=TASK_STATUSES_CHOICES)
     result = PickledObjectField()
     date_done = models.DateTimeField(_(u"done at"), auto_now=True)
     traceback = models.TextField(_(u"traceback"), blank=True, null=True)

+ 8 - 7
celery/result.py

@@ -6,6 +6,7 @@ Asynchronous result types.
 import time
 from itertools import imap
 
+from celery import states
 from celery.utils import any, all
 from celery.backends import default_backend
 from celery.messaging import with_connection
@@ -256,10 +257,10 @@ class TaskSetResult(object):
                             for subtask in self.subtasks)
         while results:
             for task_id, pending_result in results.items():
-                if pending_result.status == "SUCCESS":
+                if pending_result.status == states.SUCCESS:
                     results.pop(task_id, None)
                     yield pending_result.result
-                elif pending_result.status == "FAILURE":
+                elif pending_result.status == states.FAILURE:
                     raise pending_result.result
 
     def join(self, timeout=None):
@@ -289,9 +290,9 @@ class TaskSetResult(object):
 
         while True:
             for position, pending_result in enumerate(self.subtasks):
-                if pending_result.status == "SUCCESS":
+                if pending_result.status == states.SUCCESS:
                     results[position] = pending_result.result
-                elif pending_result.status == "FAILURE":
+                elif pending_result.status == states.FAILURE:
                     raise pending_result.result
             if results.full():
                 # Make list copy, so the returned type is not a position
@@ -320,7 +321,7 @@ class EagerResult(BaseAsyncResult):
 
     def successful(self):
         """Returns ``True`` if the task executed without failure."""
-        return self.status == "SUCCESS"
+        return self.status == states.SUCCESS
 
     def ready(self):
         """Returns ``True`` if the task has been executed."""
@@ -328,9 +329,9 @@ class EagerResult(BaseAsyncResult):
 
     def wait(self, timeout=None):
         """Wait until the task has been executed and return its result."""
-        if self.status == "SUCCESS":
+        if self.status == states.SUCCESS:
             return self.result
-        elif self.status == "FAILURE":
+        elif self.status == states.FAILURE:
             raise self.result.exception
 
     def revoke(self):

+ 10 - 0
celery/states.py

@@ -0,0 +1,10 @@
+PENDING = "PENDING"
+SUCCESS = "SUCCESS"
+FAILURE = "FAILURE"
+RETRY = "RETRY"
+
+READY_STATES = frozenset([SUCCESS, FAILURE])
+UNREADY_STATES = frozenset([PENDING, RETRY])
+EXCEPTION_STATES = frozenset([RETRY, FAILURE])
+
+ALL_STATES = frozenset([PENDING, SUCCESS, FAILURE, RETRY])

+ 6 - 3
celery/tests/test_models.py

@@ -27,11 +27,14 @@ class TestModels(unittest.TestCase):
 
         self.assertEquals(TaskMeta.objects.get_task(m1.task_id).task_id,
                 m1.task_id)
-        self.assertFalse(TaskMeta.objects.is_successful(m1.task_id))
+        self.assertFalse(
+                TaskMeta.objects.get_task(m1.task_id).status == "SUCCESS")
         TaskMeta.objects.store_result(m1.task_id, True, status="SUCCESS")
         TaskMeta.objects.store_result(m2.task_id, True, status="SUCCESS")
-        self.assertTrue(TaskMeta.objects.is_successful(m1.task_id))
-        self.assertTrue(TaskMeta.objects.is_successful(m2.task_id))
+        self.assertTrue(
+                TaskMeta.objects.get_task(m1.task_id).status == "SUCCESS")
+        self.assertTrue(
+                TaskMeta.objects.get_task(m2.task_id).status == "SUCCESS")
 
         # Have to avoid save() because it applies the auto_now=True.
         TaskMeta.objects.filter(task_id=m1.task_id).update(