소스 검색

Terminate pool at SystemExit

Ask Solem 16 년 전
부모
커밋
22491d7062
3개의 변경된 파일18개의 추가작업 그리고 1개의 파일을 삭제
  1. 3 0
      celery/bin/celeryd.py
  2. 9 1
      celery/pool.py
  3. 6 0
      celery/worker.py

+ 3 - 0
celery/bin/celeryd.py

@@ -165,6 +165,9 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
     except Exception, e:
     except Exception, e:
         emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
         emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
                             e.__class__, e, traceback.format_exc()))
                             e.__class__, e, traceback.format_exc()))
+    except:
+        context.close()
+        raise
 
 
 
 
 OPTION_LIST = (
 OPTION_LIST = (

+ 9 - 1
celery/pool.py

@@ -55,10 +55,14 @@ class TaskPool(object):
         self._processes = {}
         self._processes = {}
         self._pool = multiprocessing.Pool(processes=self.limit)
         self._pool = multiprocessing.Pool(processes=self.limit)
 
 
+    def terminate(self):
+        """Terminate the pool."""
+        self._pool.terminate()
+
     def _terminate_and_restart(self):
     def _terminate_and_restart(self):
         """INTERNAL: Terminate and restart the pool."""
         """INTERNAL: Terminate and restart the pool."""
         try:
         try:
-            self._pool.terminate()
+            self.terminate()
         except OSError:
         except OSError:
             pass
             pass
         self._start()
         self._start()
@@ -194,6 +198,10 @@ class TaskPool(object):
         been collected."""
         been collected."""
 
 
         if isinstance(ret_value, ExceptionInfo):
         if isinstance(ret_value, ExceptionInfo):
+            if isinstance(ret_value.exception, KeyboardInterrupt) or \
+                    isinstance(ret_value.exception, SystemExit):
+                self.terminate()
+                raise ret_value.exception
             for errback in errbacks:
             for errback in errbacks:
                 errback(ret_value, meta)
                 errback(ret_value, meta)
         else:
         else:

+ 6 - 0
celery/worker.py

@@ -95,6 +95,8 @@ def jail(task_id, func, args, kwargs):
                         for k, v in kwargs.items()])
                         for k, v in kwargs.items()])
     try:
     try:
         result = func(*args, **kwargs)
         result = func(*args, **kwargs)
+    except (SystemExit, KeyboardInterrupt):
+        raise
     except Exception, exc:
     except Exception, exc:
         default_backend.mark_as_failure(task_id, exc)
         default_backend.mark_as_failure(task_id, exc)
         return ExceptionInfo(sys.exc_info())
         return ExceptionInfo(sys.exc_info())
@@ -453,3 +455,7 @@ class WorkController(object):
                 self.logger.critical("Message queue raised %s: %s\n%s" % (
                 self.logger.critical("Message queue raised %s: %s\n%s" % (
                              exc.__class__, exc, traceback.format_exc()))
                              exc.__class__, exc, traceback.format_exc()))
                 continue
                 continue
+            except:
+                self.pool.terminate()
+                raise
+