فهرست منبع

Added experimental support for a "running" status on tasks

Rune Halvorsen 15 سال پیش
والد
کامیت
4fa458f23f
4فایلهای تغییر یافته به همراه20 افزوده شده و 3 حذف شده
  1. 5 0
      celery/backends/base.py
  2. 5 1
      celery/result.py
  3. 9 2
      celery/states.py
  4. 1 0
      celery/worker/job.py

+ 5 - 0
celery/backends/base.py

@@ -36,6 +36,11 @@ class BaseBackend(object):
         raise NotImplementedError(
         raise NotImplementedError(
                 "store_result is not supported by this backend.")
                 "store_result is not supported by this backend.")
 
 
+    def mark_as_running(self, task_id):
+        """Mark a task as currently running"""
+        print "Should mark as running now"
+        return self.store_result(task_id, None, status=states.RUNNING)
+
     def mark_as_done(self, task_id, result):
     def mark_as_done(self, task_id, result):
         """Mark task as successfully executed."""
         """Mark task as successfully executed."""
         return self.store_result(task_id, result, status=states.SUCCESS)
         return self.store_result(task_id, result, status=states.SUCCESS)

+ 5 - 1
celery/result.py

@@ -69,7 +69,7 @@ class BaseAsyncResult(object):
 
 
     def ready(self):
     def ready(self):
         """Returns ``True`` if the task executed successfully, or raised
         """Returns ``True`` if the task executed successfully, or raised
-        an exception. If the task is still pending, or is waiting for retry
+        an exception. If the task is still running, pending, or is waiting for retry
         then ``False`` is returned.
         then ``False`` is returned.
 
 
         :rtype: bool
         :rtype: bool
@@ -128,6 +128,10 @@ class BaseAsyncResult(object):
 
 
                 The task is waiting for execution.
                 The task is waiting for execution.
 
 
+            *RUNNING*
+
+                The task is executing
+
             *RETRY*
             *RETRY*
 
 
                 The task is to be retried, possibly because of failure.
                 The task is to be retried, possibly because of failure.

+ 9 - 2
celery/states.py

@@ -4,6 +4,10 @@
 
 
     Task is waiting for execution or unknown.
     Task is waiting for execution or unknown.
 
 
+.. data:: RUNNING
+
+    Task is curretly executing on a worker
+
 .. data:: SUCCESS
 .. data:: SUCCESS
 
 
     Task has been successfully executed.
     Task has been successfully executed.
@@ -18,6 +22,7 @@
 
 
 """
 """
 PENDING = "PENDING"
 PENDING = "PENDING"
+RUNNING = "RUNNING"
 SUCCESS = "SUCCESS"
 SUCCESS = "SUCCESS"
 FAILURE = "FAILURE"
 FAILURE = "FAILURE"
 RETRY = "RETRY"
 RETRY = "RETRY"
@@ -42,7 +47,9 @@ RETRY = "RETRY"
 
 
 """
 """
 READY_STATES = frozenset([SUCCESS, FAILURE])
 READY_STATES = frozenset([SUCCESS, FAILURE])
-UNREADY_STATES = frozenset([PENDING, RETRY])
+UNREADY_STATES = frozenset([PENDING, RUNNING, RETRY])
 EXCEPTION_STATES = frozenset([RETRY, FAILURE])
 EXCEPTION_STATES = frozenset([RETRY, FAILURE])
 
 
-ALL_STATES = frozenset([PENDING, SUCCESS, FAILURE, RETRY])
+ALL_STATES = frozenset([PENDING, RUNNING, SUCCESS, FAILURE, RETRY])
+
+

+ 1 - 0
celery/worker/job.py

@@ -100,6 +100,7 @@ class WorkerTaskTrace(TaskTrace):
         """Execute, trace and store the result of the task."""
         """Execute, trace and store the result of the task."""
         self.loader.on_task_init(self.task_id, self.task)
         self.loader.on_task_init(self.task_id, self.task)
         self.task.backend.process_cleanup()
         self.task.backend.process_cleanup()
+        self.task.backend.mark_as_running(self.task_id)
         return super(WorkerTaskTrace, self).execute()
         return super(WorkerTaskTrace, self).execute()
 
 
     def handle_success(self, retval, *args):
     def handle_success(self, retval, *args):