Browse Source

Worker: Keep count of all tasks accepted (index for sum total_count)

Ask Solem 11 years ago
parent
commit
3af8a8124d
2 changed files with 7 additions and 3 deletions
  1. 2 2
      celery/worker/heartbeat.py
  2. 5 1
      celery/worker/state.py

+ 2 - 2
celery/worker/heartbeat.py

@@ -12,7 +12,7 @@ from __future__ import absolute_import
 from celery.five import values
 from celery.utils.sysinfo import load_average
 
-from .state import SOFTWARE_INFO, active_requests, total_count
+from .state import SOFTWARE_INFO, active_requests, all_total_count
 
 __all__ = ['Heart']
 
@@ -40,7 +40,7 @@ class Heart(object):
     def _send(self, event):
         return self.eventer.send(event, freq=self.interval,
                                  active=len(active_requests),
-                                 processed=sum(values(total_count)),
+                                 processed=all_total_count[0],
                                  loadavg=load_average(),
                                  **SOFTWARE_INFO)
 

+ 5 - 1
celery/worker/state.py

@@ -51,6 +51,9 @@ active_requests = set()
 #: count of tasks accepted by the worker, sorted by type.
 total_count = Counter()
 
+#: count of all tasks accepted by the worker
+all_total_count = [0]
+
 #: the list of currently revoked tasks.  Persistent if statedb set.
 revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
 
@@ -68,10 +71,11 @@ def maybe_shutdown():
         raise SystemTerminate()
 
 
-def task_accepted(request):
+def task_accepted(request, _all_total_count=all_total_count):
     """Updates global state when a task has been accepted."""
     active_requests.add(request)
     total_count[request.name] += 1
+    all_total_count[0] += 1
 
 
 def task_ready(request):