
celeryd now survives a restart of the AMQP server! Automatically re-establish the AMQP broker connection if it's lost. New settings:
AMQP_CONNECTION_RETRY + AMQP_CONNECTION_MAX_RETRIES. This requires the master
branch of carrot.

Ask Solem 16 years ago
parent
commit
584204ced6
4 changed files with 146 additions and 7 deletions
  1. celery/conf.py (+30, -0)
  2. celery/utils.py (+56, -0)
  3. celery/worker/__init__.py (+52, -6)
  4. celery/worker/controllers.py (+8, -1)

+ 30 - 0
celery/conf.py

@@ -18,6 +18,8 @@ DEFAULT_STATISTICS = False
 DEFAULT_STATISTICS_COLLECT_INTERVAL = 60 * 5
 DEFAULT_ALWAYS_EAGER = False
 DEFAULT_TASK_RESULT_EXPIRES = timedelta(days=5)
+DEFAULT_AMQP_CONNECTION_RETRY = True
+DEFAULT_AMQP_CONNECTION_MAX_RETRIES = 100
 
 """
 .. data:: LOG_LEVELS
@@ -211,3 +213,31 @@ TASK_RESULT_EXPIRES = getattr(settings, "CELERY_TASK_RESULT_EXPIRES",
 # Make sure TASK_RESULT_EXPIRES is a timedelta.
 if isinstance(TASK_RESULT_EXPIRES, int):
     TASK_RESULT_EXPIRES = timedelta(seconds=TASK_RESULT_EXPIRES)
+
+"""
+.. data:: AMQP_CONNECTION_RETRY
+
+Automatically try to re-establish the connection to the AMQP broker if
+it's lost. The time between retries is increased for each retry, and retrying
+is not given up until :data:`AMQP_CONNECTION_MAX_RETRIES` is exceeded.
+
+On by default.
+
+"""
+AMQP_CONNECTION_RETRY = getattr(settings, "AMQP_CONNECTION_RETRY",
+                                DEFAULT_AMQP_CONNECTION_RETRY)
+
+"""
+.. data:: AMQP_CONNECTION_MAX_RETRIES
+
+Maximum number of retries before we give up re-establishing a connection
+to the AMQP broker.
+
+If this is set to ``0`` or ``None``, we will retry forever.
+
+Default is ``100`` retries.
+
+"""
+AMQP_CONNECTION_MAX_RETRIES = getattr(settings,
+                                      "AMQP_CONNECTION_MAX_RETRIES",
+                                      DEFAULT_AMQP_CONNECTION_MAX_RETRIES)
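
Both settings are read from the Django settings object, so a project-level value takes precedence over the built-in default. A minimal sketch of how a project might override them in its settings.py (the setting names come from the getattr() calls above; the values are illustrative, not the defaults):

    # settings.py -- illustrative overrides for the new connection-retry settings.
    AMQP_CONNECTION_RETRY = True          # keep retrying when the broker goes away
    AMQP_CONNECTION_MAX_RETRIES = 20      # 0 or None means retry forever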

+ 56 - 0
celery/utils.py

@@ -4,6 +4,10 @@ Utility functions
 
 """
 import uuid
+import time
+from itertools import repeat
+
+noop = lambda *args, **kwargs: None
 
 
 def chunks(it, n):
@@ -47,5 +51,57 @@ def mitemgetter(*keys):
 
 
 def get_full_cls_name(cls):
+    """With a class, get its full module and class name."""
     return ".".join([cls.__module__,
                      cls.__name__])
+
+def repeatlast(it):
+    """Iterate over all elements in the iterator, and when its exhausted
+    yield the last value infinitely."""
+    for item in it:
+        yield item
+    for item in repeat(item):
+        yield item
+
+
+def retry_over_time(fun, catch, args=[], kwargs={}, errback=noop,
+        max_retries=None, interval_start=2, interval_step=2, interval_max=30):
+    """Retry the function over and over until max retries is exceeded.
+
+    For each retry we sleep for a while before trying again; this interval
+    is increased for every retry until ``interval_max`` seconds is reached.
+
+    :param fun: The function to try
+    :param catch: Exceptions to catch, can be either tuple or a single
+        exception class.
+    :keyword args: Positional arguments passed on to the function.
+    :keyword kwargs: Keyword arguments passed on to the function.
+    :keyword errback: Callback for when an exception in ``catch`` is raised.
+        The callback must take two arguments: ``exc`` and ``interval``, where
+        ``exc`` is the exception instance, and ``interval`` is the time in
+        seconds to sleep next.
+    :keyword max_retries: Maximum number of retries before we give up.
+        If this is not set, we will retry forever.
+    :keyword interval_start: The initial number of seconds to sleep
+        between retries.
+    :keyword interval_step: By how much the interval is increased for each
+        retry.
+    :keyword interval_max: Maximum number of seconds to sleep between retries.
+
+    """
+    retries = 0
+    interval_range = xrange(interval_start,
+                            interval_max + interval_start,
+                            interval_step)
+
+    for interval in repeatlast(interval_range):
+        try:
+            retval = fun(*args, **kwargs)
+        except catch, exc:
+            if max_retries and retries > max_retries:
+                raise
+            errback(exc, interval)
+            retries += 1
+            time.sleep(interval)
+        else:
+            return retval
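
As a usage sketch, ``retry_over_time`` can wrap any callable that raises the caught exception type. Only the signature comes from the code above; the connect target and the printed message below are illustrative:

    import socket
    from celery.utils import retry_over_time

    def connect():
        # Hypothetical connect step; any callable raising socket.error works.
        return socket.create_connection(("localhost", 5672))

    def errback(exc, interval):
        print "Connect failed (%s), retrying in %d seconds..." % (exc, interval)

    # Sleeps 2, 4, 6, ... seconds (capped at 30) between attempts and re-raises
    # the last socket.error once max_retries has been exceeded.
    connection = retry_over_time(connect, socket.error,
                                 errback=errback, max_retries=10)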

+ 52 - 6
celery/worker/__init__.py

@@ -11,11 +11,14 @@ from celery.worker.job import TaskWrapper
 from celery.registry import NotRegistered
 from celery.messaging import get_consumer_set
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
+from celery.conf import AMQP_CONNECTION_RETRY, AMQP_CONNECTION_MAX_RETRIES
 from celery.log import setup_logger
 from celery.pool import TaskPool
+from celery.utils import retry_over_time
 from Queue import Queue
 import traceback
 import logging
+import socket
 
 
 class AMQPListener(object):
@@ -48,8 +51,25 @@ class AMQPListener(object):
         self.logger = logger
 
     def start(self):
-        """Start processing AMQP messages."""
-        task_consumer = self.reset_connection()
+        """Start the consumer.
+        
+        If the connection is lost, it tries to re-establish the connection
+        over time and restart consuming messages.
+
+        """
+
+        while True:
+            self.reset_connection()
+            try:
+                self.consume_messages()
+            except (socket.error,
+                    self.amqp_connection.AMQPConnectionException):
+                self.logger.error("AMQPListener: Connection to broker lost. "
+                                + "Trying to re-establish connection...")
+
+    def consume_messages(self):
+        """Consume messages forever (or until an exception is raised)."""
+        task_consumer = self.task_consumer
 
         self.logger.debug("AMQPListener: Starting message consumer...")
         it = task_consumer.iterconsume(limit=None)
@@ -58,7 +78,7 @@ class AMQPListener(object):
 
         while True:
             it.next()
-
+        
     def stop(self):
         """Stop processing AMQP messages and close the connection
         to the broker."""
@@ -109,10 +129,36 @@ class AMQPListener(object):
         self.logger.debug(
                 "AMQPListener: Re-establishing connection to the broker...")
         self.close_connection()
-        self.amqp_connection = DjangoAMQPConnection()
+        self.amqp_connection = self._open_connection()
         self.task_consumer = get_consumer_set(connection=self.amqp_connection)
         self.task_consumer.register_callback(self.receive_message)
-        return self.task_consumer
+
+    def _open_connection(self):
+        """Retries connecting to the AMQP broker over time.
+
+    See :func:`celery.utils.retry_over_time`.
+
+        """
+        
+        def _connection_error_handler(exc, interval):
+            """Callback handler for connection errors."""
+            self.logger.error("AMQP Listener: Connection Error: %s. " % exc
+                     + "Trying again in %d seconds..." % interval)
+
+        def _establish_connection():
+            """Establish a connection to the AMQP broker."""
+            conn = DjangoAMQPConnection()
+            connected = conn.connection # Connection is established lazily.
+            return conn
+
+        if not AMQP_CONNECTION_RETRY:
+            return _establish_connection()
+
+        conn = retry_over_time(_establish_connection, socket.error,
+                               errback=_connection_error_handler,
+                               max_retries=AMQP_CONNECTION_MAX_RETRIES)
+        self.logger.debug("AMQPListener: Connection Established.")
+        return conn
 
 
 class WorkController(object):
@@ -233,7 +279,7 @@ class WorkController(object):
             except Exception, exc:
                 self.logger.critical("Internal error %s: %s\n%s" % (
                                 exc.__class__, exc, traceback.format_exc()))
-        except (SystemExit, KeyboardInterrupt):
+        except:
             self.stop()
 
     def process_task(self, task):
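
With the defaults used here (``interval_start=2``, ``interval_step=2``, ``interval_max=30`` from ``retry_over_time``, and ``AMQP_CONNECTION_MAX_RETRIES = 100``), the total time celeryd keeps trying before giving up can be estimated with a back-of-the-envelope calculation; this sketch ignores the retry counter's off-by-one and the time spent in the connection attempts themselves:

    # Intervals grow 2, 4, ..., 30 seconds and then stay at 30.
    intervals = range(2, 32, 2)                        # 15 steps, 240 seconds total
    total = sum(intervals) + (100 - len(intervals)) * 30
    print total, "seconds, ~%.1f minutes" % (total / 60.0)   # 2790 seconds, ~46.5 minutes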

+ 8 - 1
celery/worker/controllers.py

@@ -7,7 +7,9 @@ from celery.backends import default_periodic_status_backend
 from Queue import Empty as QueueEmpty
 from datetime import datetime
 from multiprocessing import get_logger
+import traceback
 import threading
+import socket
 import time
 
 
@@ -120,7 +122,12 @@ class PeriodicWorkController(BackgroundThread):
     def on_iteration(self):
         logger = get_logger()
         logger.debug("PeriodicWorkController: Running periodic tasks...")
-        self.run_periodic_tasks()
+        try:
+            self.run_periodic_tasks()
+        except Exception, exc:
+            logger.error(
+                "PeriodicWorkController got exception: %s\n%s" % (
+                    exc, traceback.format_exc()))
         logger.debug("PeriodicWorkController: Processing hold queue...")
         self.process_hold_queue()
         logger.debug("PeriodicWorkController: Going to sleep...")