Browse Source

celery.worker.state: Keep track of currently executing tasks

Ask Solem 15 years ago
parent
commit
ccdf31351d
2 changed files with 25 additions and 5 deletions
  1. 10 5
      celery/worker/job.py
  2. 15 0
      celery/worker/state.py

+ 10 - 5
celery/worker/job.py

@@ -4,15 +4,16 @@ import socket
 import warnings
 
 from celery import conf
-from celery import platform
 from celery import log
+from celery import platform
+from celery.datastructures import ExceptionInfo
+from celery.execute.trace import TaskTrace
+from celery.loaders import current_loader
+from celery.registry import tasks
 from celery.utils import noop, kwdict, fun_takes_kwargs
 from celery.utils.mail import mail_admins
+from celery.worker import state
 from celery.worker.revoke import revoked
-from celery.loaders import current_loader
-from celery.execute.trace import TaskTrace
-from celery.registry import tasks
-from celery.datastructures import ExceptionInfo
 
 # pep8.py borks on a inline signature separator and
 # says "trailing whitespace" ;)
@@ -361,6 +362,7 @@ class TaskRequest(object):
         return result
 
     def on_accepted(self):
+        state.task_accepted(self.task_name)
         if not self.task.acks_late:
             self.acknowledge()
         self.send_event("task-started", uuid=self.task_id)
@@ -368,6 +370,7 @@ class TaskRequest(object):
             self.task_name, self.task_id))
 
     def on_timeout(self, soft):
+        state.task_ready(self.task_name)
         if soft:
             self.logger.warning("Soft time limit exceeded for %s[%s]" % (
                 self.task_name, self.task_id))
@@ -383,6 +386,7 @@ class TaskRequest(object):
     def on_success(self, ret_value):
         """The handler used if the task was successfully processed (
         without raising an exception)."""
+        state.task_ready(self.task_name)
 
         if self.task.acks_late:
             self.acknowledge()
@@ -399,6 +403,7 @@ class TaskRequest(object):
 
     def on_failure(self, exc_info):
         """The handler used if the task raised an exception."""
+        state.task_ready(self.task_name)
 
         if self.task.acks_late:
             self.acknowledge()

+ 15 - 0
celery/worker/state.py

@@ -0,0 +1,15 @@
+from celery.utils.compat import defaultdict
+
+active = defaultdict(lambda: 0)
+total = defaultdict(lambda: 0)
+
+
+def task_accepted(task_name):
+    active[task_name] += 1
+    total[task_name] += 1
+
+
+def task_ready(task_name):
+    active[task_name] -= 1
+    if active[task_name] == 0:
+        active.pop(task_name, None)