瀏覽代碼

try: finally is available in 2.4, just not try: except: finally

Ask Solem 15 年之前
父節點
當前提交
85fb35af30

+ 3 - 10
celery/backends/amqp.py

@@ -96,18 +96,11 @@ class AMQPBackend(BaseDictBackend):
         consumer = self._consumer_for_task_id(task_id, connection)
         consumer.register_callback(callback)
 
-        did_exc = None
-
         try:
             consumer.iterconsume().next()
-        except Exception, e:
-            did_exc = e
-
-        consumer.backend.channel.queue_delete(routing_key)
-        consumer.close()
-
-        if did_exc:
-            raise did_exc
+        finally:
+            consumer.backend.channel.queue_delete(routing_key)
+            consumer.close()
 
         self._cache[task_id] = results[0]
         return results[0]

+ 11 - 17
celery/beat.py

@@ -186,25 +186,19 @@ class ClockService(object):
         if embedded_process:
             platform.set_process_title("celerybeat")
 
-        did_exc = None
-
         try:
-            while True:
-                if self._shutdown.isSet():
-                    break
-                interval = self.scheduler.tick()
-                self.debug("ClockService: Waking up %s." % (
-                        humanize_seconds(interval, prefix="in ")))
-                time.sleep(interval)
-        except (KeyboardInterrupt, SystemExit):
+            try:
+                while True:
+                    if self._shutdown.isSet():
+                        break
+                    interval = self.scheduler.tick()
+                    self.debug("ClockService: Waking up %s." % (
+                            humanize_seconds(interval, prefix="in ")))
+                    time.sleep(interval)
+            except (KeyboardInterrupt, SystemExit):
+                self.sync()
+        finally:
             self.sync()
-        except Exception, e:
-            did_exc = e
-
-        self.sync()
-        
-        if did_exc:
-            raise did_exc
 
     def sync(self):
         if self._schedule is not None and not self._in_sync:

+ 2 - 8
celery/events.py

@@ -54,16 +54,10 @@ class EventDispatcher(object):
             return
 
         self._lock.acquire()
-        did_exc = None
         try:
             self.publisher.send(Event(type, hostname=self.hostname))
-        except Exception, e:
-            did_exc = e
-
-        self._lock.release()
-        
-        if did_exc:
-            raise did_exc
+        finally:
+            self._lock.release()
 
     def close(self):
         """Close the event dispatcher."""

+ 4 - 16
celery/execute/__init__.py

@@ -76,17 +76,11 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     exchange = options.get("exchange")
 
     publish = publisher or task.get_publisher(connection, exchange=exchange)
-    did_exc = None
     try:
         task_id = publish.delay_task(task.name, args, kwargs, task_id=task_id,
                                      countdown=countdown, eta=eta, **options)
-    except Exception, e:
-        did_exc = e
-
-    publisher or publish.close()
-
-    if did_exc:
-        raise did_exc
+    finally:
+        publisher or publish.close()
 
     return task.AsyncResult(task_id)
 
@@ -98,17 +92,11 @@ def send_task(name, args=None, kwargs=None, countdown=None, eta=None,
 
     exchange = options.get("exchange")
     publish = publisher or TaskPublisher(connection, exchange=exchange)
-    did_exc = None
     try:
         task_id = publish.delay_task(name, args, kwargs, task_id=task_id,
                                      countdown=countdown, eta=eta, **options)
-    except Exception, e:
-        did_exc = e
-
-    publisher or publish.close()
-    
-    if did_exc:
-        raise did_exc
+    finally:
+        publisher or publish.close()
 
     return result_cls(task_id)
 

+ 2 - 7
celery/loaders/djangoapp.py

@@ -62,17 +62,12 @@ def autodiscover():
     if _RACE_PROTECTION:
         return
     _RACE_PROTECTION = True
-    did_exc = None
     try:
         return filter(None, [find_related_module(app, "tasks")
                                 for app in settings.INSTALLED_APPS])
-    except Exception, e:
-        did_exc = e
+    finally:
+        _RACE_PROTECTION = False
 
-    _RACE_PROTECTION = False
-
-    if did_exc:
-        raise did_exc
 
 def find_related_module(app, related_name):
     """Given an application name and a module name, tries to find that

+ 11 - 21
celery/log.py

@@ -79,7 +79,6 @@ def emergency_error(logfile, message):
     some other reason."""
     closefh = noop
     logfile = logfile or sys.__stderr__
-    did_exc = None
     if hasattr(logfile, "write"):
         logfh = logfile
     else:
@@ -90,14 +89,9 @@ def emergency_error(logfile, message):
                         "asctime": time.asctime(),
                         "pid": os.getpid(),
                         "message": message})
-    except Exception, e:
-        did_exc = e
+    finally:
+        closefh()
 
-    closefh()
-
-    if did_exc:
-        raise did_exc
-        
 
 def redirect_stdouts_to_logger(logger, loglevel=None):
     """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
@@ -140,20 +134,16 @@ class LoggingProxy(object):
 
                 def handleError(self, record):
                     exc_info = sys.exc_info()
-                    did_exc = None
                     try:
-                        traceback.print_exception(exc_info[0], exc_info[1],
-                                                  exc_info[2], None,
-                                                  sys.__stderr__)
-                    except IOError:
-                        pass    # see python issue 5971
-                    except Exception, e:
-                        did_exc = e
-
-                    del(exc_info)
-                    
-                    if did_exc:
-                        raise did_exc
+                        try:
+                            traceback.print_exception(exc_info[0],
+                                                      exc_info[1],
+                                                      exc_info[2],
+                                                      None, sys.__stderr__)
+                        except IOError:
+                            pass    # see python issue 5971
+                    finally:
+                        del(exc_info)
 
             handler.handleError = WithSafeHandleError().handleError
 

+ 2 - 8
celery/messaging.py

@@ -136,17 +136,11 @@ def with_connection(fun):
         kwargs["connection"] = conn = connection or \
                 establish_connection(connect_timeout=timeout)
         close_connection = not connection and conn.close or noop
-        did_exc = None
 
         try:
             return fun(*args, **kwargs)
-        except Exception, e:
-            did_exc = e
-
-        close_connection()
-
-        if did_exc:
-            raise did_exc
+        finally:
+            close_connection()
 
     return _inner
 

+ 4 - 12
celery/task/base.py

@@ -551,22 +551,14 @@ class TaskSet(object):
         taskset_id = gen_unique_id()
         conn = self.task.establish_connection(connect_timeout=connect_timeout)
         publisher = self.task.get_publisher(connection=conn)
-        did_exc = None
         try:
             subtasks = [self.apply_part(arglist, taskset_id, publisher)
                             for arglist in self.arguments]
-        except Exception, e:
-            did_exc = e
+        finally:
+            publisher.close()
+            conn.close()
 
-        publisher.close()
-        conn.close()
-
-        if did_exc:
-            raise did_exc
-
-        result = TaskSetResult(taskset_id, subtasks)
-
-        return result
+        return TaskSetResult(taskset_id, subtasks)
 
     def apply_part(self, arglist, taskset_id, publisher):
         args, kwargs, opts = padlist(arglist, 3, default={})

+ 4 - 17
celery/task/control.py

@@ -15,16 +15,10 @@ def discard_all(connection=None,
 
     """
     consumers = get_consumer_set(connection=connection)
-    did_exc = None
     try:
         return consumers.discard_all()
-    except Exception, e:
-        did_exc = e
-
-    consumers.close()
-
-    if did_exc:
-        raise did_exc
+    finally:
+        consumers.close()
 
 
 def revoke(task_id, destination=None, connection=None,
@@ -86,14 +80,7 @@ def broadcast(command, arguments=None, destination=None, connection=None,
     arguments = arguments or {}
 
     broadcast = BroadcastPublisher(connection)
-    did_exc = None
     try:
         broadcast.send(command, arguments, destination=destination)
-    except Exception, e:
-        did_exc = e
-
-    broadcast.close()
-    
-    if did_exc:
-        raise did_exc
-
+    finally:
+        broadcast.close()

+ 2 - 8
celery/utils/patch.py

@@ -8,7 +8,6 @@ def _patch_logger_class():
 
     from multiprocessing.process import current_process
     logging._acquireLock()
-    did_exc = None
     try:
         OldLoggerClass = logging.getLoggerClass()
         if not getattr(OldLoggerClass, '_process_aware', False):
@@ -21,14 +20,9 @@ def _patch_logger_class():
                     record.processName = current_process()._name
                     return record
             logging.setLoggerClass(ProcessAwareLogger)
-    except Exception, e:
-        did_exc = e
+    finally:
+        logging._releaseLock()
 
-    logging._releaseLock()
-
-    if did_exc:
-        raise did_exc
-        
 
 def ensure_process_aware_logger():
     global _process_aware

+ 2 - 8
celery/worker/__init__.py

@@ -165,20 +165,14 @@ class WorkController(object):
     def start(self):
         """Starts the workers main loop."""
         self._state = "RUN"
-        did_exc = None
 
         try:
             for component in self.components:
                 self.logger.debug("Starting thread %s..." % \
                         component.__class__.__name__)
                 component.start()
-        except Exception, e:
-            did_exc = e
-
-        self.stop()
-
-        if did_exc:
-            raise did_exc
+        finally:
+            self.stop()
 
     def process_task(self, wrapper):
         """Process task by sending it to the pool of workers."""

+ 2 - 9
celery/worker/heartbeat.py

@@ -46,17 +46,10 @@ class Heart(threading.Thread):
                 break
             sleep(1)
 
-        did_exc = None
-
         try:
             dispatch("worker-offline")
-        except Exception, e:
-            did_exc = e
-
-        self._stopped.set()
-
-        if did_exc:
-            raise did_exc
+        finally:
+            self._stopped.set()
 
     def stop(self):
         """Gracefully shutdown the thread."""

+ 2 - 8
celery/worker/job.py

@@ -123,16 +123,10 @@ class WorkerTaskTrace(TaskTrace):
 
 def execute_and_trace(task_name, *args, **kwargs):
     platform.set_mp_process_title("celeryd", info=task_name)
-    did_exc = None
     try:
         return WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
-    except Exception, e:
-        did_exc = e
-
-    platform.set_mp_process_title("celeryd")
-    
-    if did_exc:
-        raise did_exc
+    finally:
+        platform.set_mp_process_title("celeryd")
 
 
 class TaskWrapper(object):