Преглед на файлове

fix issue in gevent when killing celery, fixes #911 and #936

gevent in version prior 1.0 does not work well with signals, and as it
doesn't monkey patch them, signal handlers must be specified using the
gevent.signal method.
Another issue is that the interface for handlers is not the same
with signal.signal and gevent.signal, as gevent.signal does not
call the handler will the signum and the frame.
When celery will depends on gevent 1.0 (still not stable), this fix
won't be necessary.
Thomas Meson преди 12 години
родител
ревизия
7f4761e2e8
променени са 2 файла, в които са добавени 18 реда и са изтрити 10 реда
  1. 10 9
      celery/apps/worker.py
  2. 8 1
      celery/concurrency/gevent.py

+ 10 - 9
celery/apps/worker.py

@@ -292,17 +292,17 @@ class Worker(configurated):
 def _shutdown_handler(worker, sig='TERM', how='Warm', exc=SystemExit,
 def _shutdown_handler(worker, sig='TERM', how='Warm', exc=SystemExit,
         callback=None):
         callback=None):
 
 
-    def _handle_request(signum, frame):
+    def _handle_request(*args):
         set_in_sighandler(True)
         set_in_sighandler(True)
         try:
         try:
             from celery.worker import state
             from celery.worker import state
             if current_process()._name == 'MainProcess':
             if current_process()._name == 'MainProcess':
                 if callback:
                 if callback:
                     callback(worker)
                     callback(worker)
-                safe_say('celeryd: %s shutdown (MainProcess)' % how)
-            if active_thread_count() > 1:
-                setattr(state, {'Warm': 'should_stop',
-                                'Cold': 'should_terminate'}[how], True)
+                    safe_say('celeryd: %s shutdown (MainProcess)' % how)
+                if active_thread_count() > 1:
+                    setattr(state, {'Warm': 'should_stop',
+                                    'Cold': 'should_terminate'}[how], True)
             else:
             else:
                 raise exc()
                 raise exc()
         finally:
         finally:
@@ -338,7 +338,7 @@ def _clone_current_worker():
 
 
 def install_worker_restart_handler(worker, sig='SIGHUP'):
 def install_worker_restart_handler(worker, sig='SIGHUP'):
 
 
-    def restart_worker_sig_handler(signum, frame):
+    def restart_worker_sig_handler(*args):
         """Signal handler restarting the current python program."""
         """Signal handler restarting the current python program."""
         set_in_sighandler(True)
         set_in_sighandler(True)
         safe_say('Restarting celeryd (%s)' % (' '.join(sys.argv), ))
         safe_say('Restarting celeryd (%s)' % (' '.join(sys.argv), ))
@@ -354,7 +354,7 @@ def install_cry_handler():
     if is_jython or is_pypy:  # pragma: no cover
     if is_jython or is_pypy:  # pragma: no cover
         return
         return
 
 
-    def cry_handler(signum, frame):
+    def cry_handler(*args):
         """Signal handler logging the stacktrace of all active threads."""
         """Signal handler logging the stacktrace of all active threads."""
         set_in_sighandler(True)
         set_in_sighandler(True)
         try:
         try:
@@ -367,10 +367,11 @@ def install_cry_handler():
 def install_rdb_handler(envvar='CELERY_RDBSIG',
 def install_rdb_handler(envvar='CELERY_RDBSIG',
                         sig='SIGUSR2'):  # pragma: no cover
                         sig='SIGUSR2'):  # pragma: no cover
 
 
-    def rdb_handler(signum, frame):
+    def rdb_handler(*args):
         """Signal handler setting a rdb breakpoint at the current frame."""
         """Signal handler setting a rdb breakpoint at the current frame."""
         set_in_sighandler(True)
         set_in_sighandler(True)
         try:
         try:
+            _, frame = args
             from celery.contrib import rdb
             from celery.contrib import rdb
             rdb.set_trace(frame)
             rdb.set_trace(frame)
         finally:
         finally:
@@ -381,7 +382,7 @@ def install_rdb_handler(envvar='CELERY_RDBSIG',
 
 
 def install_HUP_not_supported_handler(worker, sig='SIGHUP'):
 def install_HUP_not_supported_handler(worker, sig='SIGHUP'):
 
 
-    def warn_on_HUP_handler(signum, frame):
+    def warn_on_HUP_handler(*args):
         set_in_sighandler(True)
         set_in_sighandler(True)
         try:
         try:
             safe_say('%(sig)s not supported: Restarting with %(sig)s is '
             safe_say('%(sig)s not supported: Restarting with %(sig)s is '

+ 8 - 1
celery/concurrency/gevent.py

@@ -13,8 +13,15 @@ import os
 PATCHED = [0]
 PATCHED = [0]
 if not os.environ.get('GEVENT_NOPATCH') and not PATCHED[0]:
 if not os.environ.get('GEVENT_NOPATCH') and not PATCHED[0]:
     PATCHED[0] += 1
     PATCHED[0] += 1
-    from gevent import monkey
+    from gevent import monkey, version_info
     monkey.patch_all()
     monkey.patch_all()
+    if version_info[0] == 0:
+        # Signals are not working along gevent in version prior 1.0
+        # and they are not monkey patch by monkey.patch_all()
+        from gevent import signal as _gevent_signal
+        _signal = __import__('signal')
+        _signal.signal = _gevent_signal
+
 
 
 from time import time
 from time import time