Browse Source

There — that should have gotten most of core away from using finally.

Jesper Noehr 15 years ago
parent
commit
998b9e1fcf

+ 10 - 3
celery/backends/amqp.py

@@ -96,11 +96,18 @@ class AMQPBackend(BaseDictBackend):
         consumer = self._consumer_for_task_id(task_id, connection)
         consumer.register_callback(callback)
 
+        did_exc = None
+
         try:
             consumer.iterconsume().next()
-        finally:
-            consumer.backend.channel.queue_delete(routing_key)
-            consumer.close()
+        except Exception, e:
+            did_exc = e
+
+        consumer.backend.channel.queue_delete(routing_key)
+        consumer.close()
+
+        if did_exc:
+            raise did_exc
 
         self._cache[task_id] = results[0]
         return results[0]

+ 9 - 2
celery/beat.py

@@ -186,6 +186,8 @@ class ClockService(object):
         if embedded_process:
             platform.set_process_title("celerybeat")
 
+        did_exc = None
+
         try:
             while True:
                 if self._shutdown.isSet():
@@ -196,8 +198,13 @@ class ClockService(object):
                 time.sleep(interval)
         except (KeyboardInterrupt, SystemExit):
             self.sync()
-        finally:
-            self.sync()
+        except Exception, e:
+            did_exc = e
+
+        self.sync()
+        
+        if did_exc:
+            raise did_exc
 
     def sync(self):
         if self._schedule is not None and not self._in_sync:

+ 8 - 2
celery/events.py

@@ -54,10 +54,16 @@ class EventDispatcher(object):
             return
 
         self._lock.acquire()
+        did_exc = None
         try:
             self.publisher.send(Event(type, hostname=self.hostname))
-        finally:
-            self._lock.release()
+        except Exception, e:
+            did_exc = e
+
+        self._lock.release()
+        
+        if did_exc:
+            raise did_exc
 
     def close(self):
         """Close the event dispatcher."""

+ 16 - 4
celery/execute/__init__.py

@@ -76,11 +76,17 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     exchange = options.get("exchange")
 
     publish = publisher or task.get_publisher(connection, exchange=exchange)
+    did_exc = None
     try:
         task_id = publish.delay_task(task.name, args, kwargs, task_id=task_id,
                                      countdown=countdown, eta=eta, **options)
-    finally:
-        publisher or publish.close()
+    except Exception, e:
+        did_exc = e
+
+    publisher or publish.close()
+
+    if did_exc:
+        raise did_exc
 
     return task.AsyncResult(task_id)
 
@@ -92,11 +98,17 @@ def send_task(name, args=None, kwargs=None, countdown=None, eta=None,
 
     exchange = options.get("exchange")
     publish = publisher or TaskPublisher(connection, exchange=exchange)
+    did_exc = None
     try:
         task_id = publish.delay_task(name, args, kwargs, task_id=task_id,
                                      countdown=countdown, eta=eta, **options)
-    finally:
-        publisher or publish.close()
+    except Exception, e:
+        did_exc = e
+
+    publisher or publish.close()
+    
+    if did_exc:
+        raise did_exc
 
     return result_cls(task_id)
 

+ 7 - 2
celery/loaders/djangoapp.py

@@ -62,12 +62,17 @@ def autodiscover():
     if _RACE_PROTECTION:
         return
     _RACE_PROTECTION = True
+    did_exc = None
     try:
         return filter(None, [find_related_module(app, "tasks")
                                 for app in settings.INSTALLED_APPS])
-    finally:
-        _RACE_PROTECTION = False
+    except Exception, e:
+        did_exc = e
 
+    _RACE_PROTECTION = False
+
+    if did_exc:
+        raise did_exc
 
 def find_related_module(app, related_name):
     """Given an application name and a module name, tries to find that

+ 16 - 4
celery/log.py

@@ -79,6 +79,7 @@ def emergency_error(logfile, message):
     some other reason."""
     closefh = noop
     logfile = logfile or sys.__stderr__
+    did_exc = None
     if hasattr(logfile, "write"):
         logfh = logfile
     else:
@@ -89,9 +90,14 @@ def emergency_error(logfile, message):
                         "asctime": time.asctime(),
                         "pid": os.getpid(),
                         "message": message})
-    finally:
-        closefh()
+    except Exception, e:
+        did_exc = e
 
+    closefh()
+
+    if did_exc:
+        raise did_exc
+        
 
 def redirect_stdouts_to_logger(logger, loglevel=None):
     """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
@@ -134,14 +140,20 @@ class LoggingProxy(object):
 
                 def handleError(self, record):
                     exc_info = sys.exc_info()
+                    did_exc = None
                     try:
                         traceback.print_exception(exc_info[0], exc_info[1],
                                                   exc_info[2], None,
                                                   sys.__stderr__)
                     except IOError:
                         pass    # see python issue 5971
-                    finally:
-                        del(exc_info)
+                    except Exception, e:
+                        did_exc = e
+
+                    del(exc_info)
+                    
+                    if did_exc:
+                        raise did_exc
 
             handler.handleError = WithSafeHandleError().handleError
 

+ 8 - 2
celery/messaging.py

@@ -136,11 +136,17 @@ def with_connection(fun):
         kwargs["connection"] = conn = connection or \
                 establish_connection(connect_timeout=timeout)
         close_connection = not connection and conn.close or noop
+        did_exc = None
 
         try:
             return fun(*args, **kwargs)
-        finally:
-            close_connection()
+        except Exception, e:
+            did_exc = e
+
+        close_connection()
+
+        if did_exc:
+            raise did_exc
 
     return _inner
 

+ 10 - 3
celery/task/base.py

@@ -551,12 +551,19 @@ class TaskSet(object):
         taskset_id = gen_unique_id()
         conn = self.task.establish_connection(connect_timeout=connect_timeout)
         publisher = self.task.get_publisher(connection=conn)
+        did_exc = None
         try:
             subtasks = [self.apply_part(arglist, taskset_id, publisher)
                             for arglist in self.arguments]
-        finally:
-            publisher.close()
-            conn.close()
+        except Exception, e:
+            did_exc = e
+
+        publisher.close()
+        conn.close()
+
+        if did_exc:
+            raise did_exc
+
         result = TaskSetResult(taskset_id, subtasks)
 
         return result

+ 17 - 4
celery/task/control.py

@@ -15,10 +15,16 @@ def discard_all(connection=None,
 
     """
     consumers = get_consumer_set(connection=connection)
+    did_exc = None
     try:
         return consumers.discard_all()
-    finally:
-        consumers.close()
+    except Exception, e:
+        did_exc = e
+
+    consumers.close()
+
+    if did_exc:
+        raise did_exc
 
 
 def revoke(task_id, destination=None, connection=None,
@@ -80,7 +86,14 @@ def broadcast(command, arguments=None, destination=None, connection=None,
     arguments = arguments or {}
 
     broadcast = BroadcastPublisher(connection)
+    did_exc = None
     try:
         broadcast.send(command, arguments, destination=destination)
-    finally:
-        broadcast.close()
+    except Exception, e:
+        did_exc = e
+
+    broadcast.close()
+    
+    if did_exc:
+        raise did_exc
+

+ 8 - 2
celery/utils/patch.py

@@ -8,6 +8,7 @@ def _patch_logger_class():
 
     from multiprocessing.process import current_process
     logging._acquireLock()
+    did_exc = None
     try:
         OldLoggerClass = logging.getLoggerClass()
         if not getattr(OldLoggerClass, '_process_aware', False):
@@ -20,9 +21,14 @@ def _patch_logger_class():
                     record.processName = current_process()._name
                     return record
             logging.setLoggerClass(ProcessAwareLogger)
-    finally:
-        logging._releaseLock()
+    except Exception, e:
+        did_exc = e
 
+    logging._releaseLock()
+
+    if did_exc:
+        raise did_exc
+        
 
 def ensure_process_aware_logger():
     global _process_aware

+ 8 - 2
celery/worker/__init__.py

@@ -165,14 +165,20 @@ class WorkController(object):
     def start(self):
         """Starts the workers main loop."""
         self._state = "RUN"
+        did_exc = None
 
         try:
             for component in self.components:
                 self.logger.debug("Starting thread %s..." % \
                         component.__class__.__name__)
                 component.start()
-        finally:
-            self.stop()
+        except Exception, e:
+            did_exc = e
+
+        self.stop()
+
+        if did_exc:
+            raise did_exc
 
     def process_task(self, wrapper):
         """Process task by sending it to the pool of workers."""

+ 9 - 2
celery/worker/heartbeat.py

@@ -46,10 +46,17 @@ class Heart(threading.Thread):
                 break
             sleep(1)
 
+        did_exc = None
+
         try:
             dispatch("worker-offline")
-        finally:
-            self._stopped.set()
+        except Exception, e:
+            did_exc = e
+
+        self._stopped.set()
+
+        if did_exc:
+            raise did_exc
 
     def stop(self):
         """Gracefully shutdown the thread."""

+ 8 - 2
celery/worker/job.py

@@ -123,10 +123,16 @@ class WorkerTaskTrace(TaskTrace):
 
 def execute_and_trace(task_name, *args, **kwargs):
     platform.set_mp_process_title("celeryd", info=task_name)
+    did_exc = None
     try:
         return WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
-    finally:
-        platform.set_mp_process_title("celeryd")
+    except Exception, e:
+        did_exc = e
+
+    platform.set_mp_process_title("celeryd")
+    
+    if did_exc:
+        raise did_exc
 
 
 class TaskWrapper(object):