Selaa lähdekoodia

Propagate to ApplyResult.terminate()

Ask Solem 11 vuotta sitten
vanhempi
commit
712c9afdc6
2 muutettua tiedostoa jossa 11 lisäystä ja 4 poistoa
  1. 3 2
      celery/concurrency/asynpool.py
  2. 8 2
      celery/worker/job.py

+ 3 - 2
celery/concurrency/asynpool.py

@@ -544,8 +544,9 @@ class AsynPool(_pool.Pool):
 
         def _put_back(job):
             # puts back at the end of the queue
-            if job._terminated or job.correlation_id in revoked_tasks:
-                job._set_terminated('process already gone')
+            if job._terminated is not None or \
+                    job.correlation_id in revoked_tasks:
+                job._set_terminated(job._terminated)
             else:
                 # XXX linear lookup, should find a better way,
                 # but this happens rarely and is here to protect against races.

+ 8 - 2
celery/worker/job.py

@@ -15,6 +15,7 @@ import sys
 
 from billiard.einfo import ExceptionInfo
 from datetime import datetime
+from weakref import ref
 
 from kombu.utils import kwdict, reprcall
 from kombu.utils.encoding import safe_repr, safe_str
@@ -89,7 +90,7 @@ class Request(object):
             'hostname', 'eventer', 'connection_errors', 'task', 'eta',
             'expires', 'request_dict', 'acknowledged', 'on_reject',
             'utc', 'time_start', 'worker_pid', '_already_revoked',
-            '_terminate_on_ack',
+            '_terminate_on_ack', '_apply_result',
             '_tzlocal', '__weakref__', '__dict__',
         )
 
@@ -257,6 +258,7 @@ class Request(object):
             timeout=timeout,
             correlation_id=uuid,
         )
+        self._apply_result = ref(result)
         return result
 
     def execute(self, loglevel=None, logfile=None):
@@ -295,12 +297,16 @@ class Request(object):
                 return True
 
     def terminate(self, pool, signal=None):
+        signal = _signals.signum(signal or 'TERM')
         if self.time_start:
-            signal = _signals.signum(signal or 'TERM')
             pool.terminate_job(self.worker_pid, signal)
             self._announce_revoked('terminated', True, signal, False)
         else:
             self._terminate_on_ack = pool, signal
+        if self._apply_result is not None:
+            obj = self._apply_result()  # is a weakref
+            if obj is not None:
+                obj.terminate(signal)
 
     def _announce_revoked(self, reason, terminated, signum, expired):
         task_ready(self)