Jelajahi Sumber

Adds support for the new billiard REMAP_SIGTERM envvar. Closes #2839

Requires celery/billiard@6b4ff8470a22e8d98f4219bc2828cdcae4381473
Ask Solem 9 tahun lalu
induk
melakukan
7612d78ead

+ 3 - 1
celery/app/control.py

@@ -11,6 +11,8 @@ from __future__ import absolute_import
 
 import warnings
 
+from billiard.common import TERM_SIGNAME
+
 from kombu.pidbox import Mailbox
 from kombu.utils import cached_property
 
@@ -151,7 +153,7 @@ class Control(object):
         })
 
     def revoke(self, task_id, destination=None, terminate=False,
-               signal='SIGTERM', **kwargs):
+               signal=TERM_SIGNAME, **kwargs):
         """Tell all (or specific) workers to revoke a task by id.
 
         If a task is revoked, the workers will ignore the task and

+ 5 - 1
celery/concurrency/prefork.py

@@ -10,6 +10,7 @@ from __future__ import absolute_import
 
 import os
 
+from billiard.common import REMAP_SIGTERM, TERM_SIGNAME
 from billiard import forking_enable
 from billiard.pool import RUN, CLOSE, Pool as BlockingPool
 
@@ -32,7 +33,10 @@ WORKER_SIGRESET = {
 }
 
 #: List of signals to ignore when a child process starts.
-WORKER_SIGIGNORE = {'SIGINT'}
+if REMAP_SIGTERM:
+    WORKER_SIGIGNORE = {'SIGINT', TERM_SIGNAME}
+else:
+    WORKER_SIGIGNORE = {'SIGINT'}
 
 logger = get_logger(__name__)
 warning, debug = logger.warning, logger.debug

+ 2 - 1
celery/worker/control.py

@@ -11,6 +11,7 @@ from __future__ import absolute_import
 import io
 import tempfile
 
+from billiard.common import TERM_SIGNAME
 from kombu.utils.encoding import safe_repr
 
 from celery.exceptions import WorkerShutdown
@@ -73,7 +74,7 @@ def revoke(state, task_id, terminate=False, signal=None, **kwargs):
 
     revoked.update(task_ids)
     if terminate:
-        signum = _signals.signum(signal or 'TERM')
+        signum = _signals.signum(signal or TERM_SIGNAME)
         # reserved_requests changes size during iteration
         # so need to consume the items first, then terminate after.
         requests = set(_find_requests_by_id(

+ 2 - 1
celery/worker/request.py

@@ -15,6 +15,7 @@ import sys
 from datetime import datetime
 from weakref import ref
 
+from billiard.common import TERM_SIGNAME
 from kombu.utils.encoding import safe_repr, safe_str
 
 from celery import signals
@@ -234,7 +235,7 @@ class Request(object):
                 return True
 
     def terminate(self, pool, signal=None):
-        signal = _signals.signum(signal or 'TERM')
+        signal = _signals.signum(signal or TERM_SIGNAME)
         if self.time_start:
             pool.terminate_job(self.worker_pid, signal)
             self._announce_revoked('terminated', True, signal, False)