Ver código fonte

Merge branch 'runeh/master'

Ask Solem 15 anos atrás
pai
commit
f0b3d5b78e

+ 5 - 0
celery/backends/base.py

@@ -36,6 +36,11 @@ class BaseBackend(object):
         raise NotImplementedError(
                 "store_result is not supported by this backend.")
 
+    def mark_as_running(self, task_id):
+        """Mark a task as currently running"""
+        print "Should mark as running now"
+        return self.store_result(task_id, None, status=states.RUNNING)
+
     def mark_as_done(self, task_id, result):
         """Mark task as successfully executed."""
         return self.store_result(task_id, result, status=states.SUCCESS)

+ 2 - 0
celery/conf.py

@@ -62,6 +62,7 @@ _DEFAULTS = {
     "CELERY_EVENT_ROUTING_KEY": "celeryevent",
     "CELERY_RESULT_EXCHANGE": "celeryresults",
     "CELERY_MAX_CACHED_RESULTS": 5000,
+    "CELERY_TRACK_RUNNING": False,
 }
 
 _DEPRECATION_FMT = """
@@ -91,6 +92,7 @@ CELERY_CACHE_BACKEND = _get("CELERY_CACHE_BACKEND")
 TASK_SERIALIZER = _get("CELERY_TASK_SERIALIZER")
 TASK_RESULT_EXPIRES = _get("CELERY_TASK_RESULT_EXPIRES")
 IGNORE_RESULT = _get("CELERY_IGNORE_RESULT")
+TRACK_RUNNING = _get("CELERY_TRACK_RUNNING")
 # Make sure TASK_RESULT_EXPIRES is a timedelta.
 if isinstance(TASK_RESULT_EXPIRES, int):
     TASK_RESULT_EXPIRES = timedelta(seconds=TASK_RESULT_EXPIRES)

+ 5 - 1
celery/result.py

@@ -69,7 +69,7 @@ class BaseAsyncResult(object):
 
     def ready(self):
         """Returns ``True`` if the task executed successfully, or raised
-        an exception. If the task is still pending, or is waiting for retry
+        an exception. If the task is still running, pending, or is waiting for retry
         then ``False`` is returned.
 
         :rtype: bool
@@ -128,6 +128,10 @@ class BaseAsyncResult(object):
 
                 The task is waiting for execution.
 
+            *RUNNING*
+
+                The task is executing
+
             *RETRY*
 
                 The task is to be retried, possibly because of failure.

+ 9 - 2
celery/states.py

@@ -4,6 +4,10 @@
 
     Task is waiting for execution or unknown.
 
+.. data:: RUNNING
+
+    Task is curretly executing on a worker
+
 .. data:: SUCCESS
 
     Task has been successfully executed.
@@ -18,6 +22,7 @@
 
 """
 PENDING = "PENDING"
+RUNNING = "RUNNING"
 SUCCESS = "SUCCESS"
 FAILURE = "FAILURE"
 RETRY = "RETRY"
@@ -42,7 +47,9 @@ RETRY = "RETRY"
 
 """
 READY_STATES = frozenset([SUCCESS, FAILURE])
-UNREADY_STATES = frozenset([PENDING, RETRY])
+UNREADY_STATES = frozenset([PENDING, RUNNING, RETRY])
 EXCEPTION_STATES = frozenset([RETRY, FAILURE])
 
-ALL_STATES = frozenset([PENDING, SUCCESS, FAILURE, RETRY])
+ALL_STATES = frozenset([PENDING, RUNNING, SUCCESS, FAILURE, RETRY])
+
+

+ 10 - 0
celery/task/base.py

@@ -153,6 +153,15 @@ class Task(object):
         If ``True`` the task is automatically registered in the task
         registry, which is the default behaviour.
 
+    .. attribute:: track_running
+        If ``True`` the task will report its status as "running" to the
+        broker when the task is executed by a worker.
+        The default value is ``False`` as the normal behaviour is to not
+        report that level of granularity. Tasks are either pending, finished,
+        or waiting to be retried. Having a "running" status can be useful for
+        when there are long running tasks and there is a need to report which
+        task is currently running.
+
 
     The resulting class is callable, which if called will apply the
     :meth:`run` method.
@@ -179,6 +188,7 @@ class Task(object):
     backend = default_backend
     exchange_type = conf.DEFAULT_EXCHANGE_TYPE
     delivery_mode = conf.DEFAULT_DELIVERY_MODE
+    track_running = conf.TRACK_RUNNING
 
     MaxRetriesExceededError = MaxRetriesExceededError
 

+ 2 - 0
celery/worker/job.py

@@ -100,6 +100,8 @@ class WorkerTaskTrace(TaskTrace):
         """Execute, trace and store the result of the task."""
         self.loader.on_task_init(self.task_id, self.task)
         self.task.backend.process_cleanup()
+        if self.task.track_running:
+            self.task.backend.mark_as_running(self.task_id)
         return super(WorkerTaskTrace, self).execute()
 
     def handle_success(self, retval, *args):

+ 1 - 2
examples/celery_http_gateway/urls.py

@@ -1,7 +1,6 @@
 from django.conf.urls.defaults import *
 
 from celery.task.builtins import PingTask
-from celery.views import task_view
 from celery import views as celery_views
 
 # Uncomment the next two lines to enable the admin:
@@ -10,7 +9,7 @@ from celery import views as celery_views
 
 urlpatterns = patterns("",
     url(r'^apply/(?P<task_name>.+?)/', celery_views.apply),
-    url(r'^ping/', task_view(PingTask)),
+    url(r'^ping/', celery_views.task_view(PingTask)),
     url(r'^(?P<task_id>[\w\d\-]+)/done/?$', celery_views.is_task_successful,
         name="celery-is_task_successful"),
     url(r'^(?P<task_id>[\w\d\-]+)/status/?$', celery_views.task_status,