Kaynağa Gözat

Refer worker request info to absolute time (#3684)

* Add Régis Behmo to the list of contributors

* Report absolute time on inspect
Régis B 7 yıl önce
ebeveyn
işleme
fd6e475826
3 değiştirilmiş dosya ile 11 ekleme ve 2 silme
  1. 1 0
      CONTRIBUTORS.txt
  2. 4 2
      celery/worker/request.py
  3. 6 0
      t/unit/worker/test_request.py

+ 1 - 0
CONTRIBUTORS.txt

@@ -255,3 +255,4 @@ Arpan Shah, 2017/09/12
 Tobias 'rixx' Kunze, 2017/08/20
 Mikhail Wolfson, 2017/12/11
 Alex Garel, 2018/01/04
+Régis Behmo 2018/01/20

+ 4 - 2
celery/worker/request.py

@@ -9,6 +9,7 @@ from __future__ import absolute_import, unicode_literals
 import logging
 import sys
 from datetime import datetime
+from time import time
 from weakref import ref
 
 from billiard.common import TERM_SIGNAME
@@ -20,7 +21,7 @@ from celery.app.trace import trace_task, trace_task_ret
 from celery.exceptions import (Ignore, InvalidTaskError, Reject, Retry,
                                TaskRevokedError, Terminated,
                                TimeLimitExceeded, WorkerLostError)
-from celery.five import python_2_unicode_compatible, string
+from celery.five import monotonic, python_2_unicode_compatible, string
 from celery.platforms import signals as _signals
 from celery.utils.functional import maybe, noop
 from celery.utils.log import get_logger
@@ -287,7 +288,8 @@ class Request(object):
     def on_accepted(self, pid, time_accepted):
         """Handler called when task is accepted by worker pool."""
         self.worker_pid = pid
-        self.time_start = time_accepted
+        # Convert monotonic time_accepted to absolute time
+        self.time_start = time() - (monotonic() - time_accepted)
         task_accepted(self)
         if not self.task.acks_late:
             self.acknowledge()

+ 6 - 0
t/unit/worker/test_request.py

@@ -7,6 +7,7 @@ import signal
 import socket
 import sys
 from datetime import datetime, timedelta
+from time import time
 
 import pytest
 from billiard.einfo import ExceptionInfo
@@ -516,6 +517,11 @@ class test_Request(RequestCase):
             job.on_accepted(pid=314, time_accepted=monotonic())
             pool.terminate_job.assert_called_with(314, signum)
 
+    def test_on_accepted_time_start(self):
+        job = self.xRequest()
+        job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
+        assert time() - job.time_start < 1
+
     def test_on_success_acks_early(self):
         job = self.xRequest()
         job.time_start = 1