Browse Source

celerybeat now retries establishing the connection. Closes #419. Thanks to dannyadair

Ask Solem 14 years ago
parent
commit
8ac05a37e2
2 changed files with 15 additions and 5 deletions
  1. 11 1
      celery/beat.py
  2. 4 4
      celery/worker/consumer.py

+ 11 - 1
celery/beat.py

@@ -276,13 +276,23 @@ class Scheduler(object):
     def set_schedule(self, schedule):
         self.data = schedule
 
+    def _ensure_connected(self):
+        # callback called for each retry while the connection
+        # can't be established.
+        def _error_handler(exc, interval):
+            self.logger.error("Celerybeat: Connection error: %s. " % exc
+                            + "Trying again in %s seconds..." % interval)
+
+        return self.connection.ensure_connection(_error_handler,
+                    self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
+
     @cached_property
     def connection(self):
         return self.app.broker_connection()
 
     @cached_property
     def publisher(self):
-        return self.Publisher(connection=self.connection)
+        return self.Publisher(connection=self._ensure_connected())
 
     @property
     def schedule(self):

+ 4 - 4
celery/worker/consumer.py

@@ -580,9 +580,9 @@ class Consumer(object):
 
         """
 
-        def _connection_error_handler(exc, interval):
-            # Callback called for each retry when the connection
-            # can't be established.
+        # Callback called for each retry while the connection
+        # can't be established.
+        def __error_handler(exc, interval):
             self.logger.error("Consumer: Connection Error: %s. " % exc
                             + "Trying again in %d seconds..." % interval)
 
@@ -594,7 +594,7 @@ class Consumer(object):
             conn.connect()
             return conn
 
-        return conn.ensure_connection(_connection_error_handler,
+        return conn.ensure_connection(_error_handler,
                     self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
 
     def stop(self):