Browse Source

Catch exceptions occuring in Mediator+Pool callbacks. Thanks to Christopher Hesse.

Ask Solem 14 years ago
parent
commit
6970fffe95
2 changed files with 25 additions and 3 deletions
  1. 15 2
      celery/concurrency/processes/__init__.py
  2. 10 1
      celery/worker/controllers.py

+ 15 - 2
celery/concurrency/processes/__init__.py

@@ -3,6 +3,8 @@
 Process Pools.
 
 """
+import traceback
+
 from time import sleep, time
 
 from celery import log
@@ -136,9 +138,20 @@ class TaskPool(object):
             if isinstance(ret_value.exception, (
                     SystemExit, KeyboardInterrupt)):
                 raise ret_value.exception
-            [errback(ret_value) for errback in errbacks]
+            [self.safe_apply_callback(errback, ret_value)
+                    for errback in errbacks]
         else:
-            [callback(ret_value) for callback in callbacks]
+            [self.safe_apply_callback(callback, ret_value)
+                    for callback in callbacks]
+
+    def safe_apply_callback(self, fun, *args):
+        try:
+            fun(*args)
+        except:
+            self.logger.error("Pool callback raised exception: %s" % (
+                traceback.format_exc(), ))
+
+
 
     @property
     def info(self):

+ 10 - 1
celery/worker/controllers.py

@@ -4,6 +4,7 @@ Worker Controller Threads
 
 """
 import threading
+import traceback
 from Queue import Empty as QueueEmpty
 
 from celery.app import app_or_default
@@ -33,6 +34,7 @@ class Mediator(threading.Thread):
         self._shutdown = threading.Event()
         self._stopped = threading.Event()
         self.setDaemon(True)
+        self.setName(self.__class__.__name__)
 
     def move(self):
         try:
@@ -47,7 +49,14 @@ class Mediator(threading.Thread):
         self.logger.debug(
             "Mediator: Running callback for task: %s[%s]" % (
                 task.task_name, task.task_id))
-        self.callback(task)                 # execute
+
+        try:
+            self.callback(task)
+        except Exception, exc:
+            self.logger.error("Mediator callback raised exception %r\n%s" % (
+                exc, traceback.format_exc()))
+
+
 
     def run(self):
         while not self._shutdown.isSet():