Forráskód Böngészése

Merge branch 'master' into statistics

Ask Solem 16 éve
szülő
commit
550429b1f7
4 módosított fájl, 19 hozzáadás és 2 törlés
  1. 1 1
      MANIFEST.in
  2. 3 0
      celery/bin/celeryd.py
  3. 9 1
      celery/pool.py
  4. 6 0
      celery/worker.py

+ 1 - 1
MANIFEST.in

@@ -1,5 +1,5 @@
 include AUTHORS
-include CHANGELOG
+include Changelog
 include README
 include MANIFEST.in
 include LICENSE

+ 3 - 0
celery/bin/celeryd.py

@@ -181,6 +181,9 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
     except Exception, e:
         emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
                             e.__class__, e, traceback.format_exc()))
+    except:
+        context.close()
+        raise
 
 
 OPTION_LIST = (

+ 9 - 1
celery/pool.py

@@ -55,10 +55,14 @@ class TaskPool(object):
         self._processes = {}
         self._pool = multiprocessing.Pool(processes=self.limit)
 
+    def terminate(self):
+        """Terminate the pool."""
+        self._pool.terminate()
+
     def _terminate_and_restart(self):
         """INTERNAL: Terminate and restart the pool."""
         try:
-            self._pool.terminate()
+            self.terminate()
         except OSError:
             pass
         self._start()
@@ -194,6 +198,10 @@ class TaskPool(object):
         been collected."""
 
         if isinstance(ret_value, ExceptionInfo):
+            if isinstance(ret_value.exception, KeyboardInterrupt) or \
+                    isinstance(ret_value.exception, SystemExit):
+                self.terminate()
+                raise ret_value.exception
             for errback in errbacks:
                 errback(ret_value, meta)
         else:

+ 6 - 0
celery/worker.py

@@ -98,6 +98,8 @@ def jail(task_id, task_name, func, args, kwargs):
                         for k, v in kwargs.items()])
     try:
         result = func(*args, **kwargs)
+    except (SystemExit, KeyboardInterrupt):
+        raise
     except Exception, exc:
         default_backend.mark_as_failure(task_id, exc)
         retval = ExceptionInfo(sys.exc_info())
@@ -459,3 +461,7 @@ class WorkController(object):
                 self.logger.critical("Message queue raised %s: %s\n%s" % (
                              exc.__class__, exc, traceback.format_exc()))
                 continue
+            except:
+                self.pool.terminate()
+                raise
+