Преглед на файлове

Worker pid is now sent with the task-accepted event.

In addition time_start (used e.g. to calculate the total execution time)
is now taken from what the worker reports, meaning more accuracy
and saving us a call to time.time() (yay ;)

Closes #277.  Should probably open up a new issue to display this
info in monitors (celeryev/djcelerymon)
Ask Solem преди 14 години
родител
ревизия
f0d761bce7
променени са 3 файла, в които са добавени 11 реда и са изтрити 9 реда
  1. 1 1
      celery/concurrency/processes/pool.py
  2. 5 3
      celery/tests/test_worker_job.py
  3. 5 5
      celery/worker/job.py

+ 1 - 1
celery/concurrency/processes/pool.py

@@ -863,7 +863,7 @@ class ApplyResult(object):
         self._time_accepted = time_accepted
         self._worker_pid = pid
         if self._accept_callback:
-            self._accept_callback()
+            self._accept_callback(pid, time_accepted)
         if self._ready:
             self._cache.pop(self._job, None)
 

+ 5 - 3
celery/tests/test_worker_job.py

@@ -1,7 +1,9 @@
 # -*- coding: utf-8 -*-
-import logging
 import anyjson
+import logging
+import os
 import sys
+import time
 
 from datetime import datetime, timedelta
 
@@ -272,14 +274,14 @@ class test_TaskRequest(unittest.TestCase):
 
     def test_on_accepted_acks_early(self):
         tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
-        tw.on_accepted()
+        tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
         self.assertTrue(tw.acknowledged)
 
     def test_on_accepted_acks_late(self):
         tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
         mytask.acks_late = True
         try:
-            tw.on_accepted()
+            tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
             self.assertFalse(tw.acknowledged)
         finally:
             mytask.acks_late = False

+ 5 - 5
celery/worker/job.py

@@ -398,15 +398,15 @@ class TaskRequest(object):
         if self.eventer:
             self.eventer.send(type, **fields)
 
-    def on_accepted(self):
+    def on_accepted(self, pid, time_accepted):
         """Handler called when task is accepted by worker pool."""
-        self.time_start = time.time()
+        self.time_start = time_accepted
         state.task_accepted(self)
         if not self.task.acks_late:
             self.acknowledge()
-        self.send_event("task-started", uuid=self.task_id)
-        self.logger.debug("Task accepted: %s[%s]" % (
-            self.task_name, self.task_id))
+        self.send_event("task-started", uuid=self.task_id, pid=pid)
+        self.logger.debug("Task accepted: %s[%s] pid:%r" % (
+            self.task_name, self.task_id, pid))
 
     def on_timeout(self, soft):
         """Handler called if the task times out."""