|  | @@ -11,11 +11,14 @@ from celery.worker.job import TaskWrapper
 | 
	
		
			
				|  |  |  from celery.registry import NotRegistered
 | 
	
		
			
				|  |  |  from celery.messaging import get_consumer_set
 | 
	
		
			
				|  |  |  from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
 | 
	
		
			
				|  |  | +from celery.conf import AMQP_CONNECTION_RETRY, AMQP_CONNECTION_MAX_RETRIES
 | 
	
		
			
				|  |  |  from celery.log import setup_logger
 | 
	
		
			
				|  |  |  from celery.pool import TaskPool
 | 
	
		
			
				|  |  | +from celery.utils import retry_over_time
 | 
	
		
			
				|  |  |  from Queue import Queue
 | 
	
		
			
				|  |  |  import traceback
 | 
	
		
			
				|  |  |  import logging
 | 
	
		
			
				|  |  | +import socket
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  class AMQPListener(object):
 | 
	
	
		
			
				|  | @@ -48,8 +51,25 @@ class AMQPListener(object):
 | 
	
		
			
				|  |  |          self.logger = logger
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def start(self):
 | 
	
		
			
				|  |  | -        """Start processing AMQP messages."""
 | 
	
		
			
				|  |  | -        task_consumer = self.reset_connection()
 | 
	
		
			
				|  |  | +        """Start the consumer.
 | 
	
		
			
				|  |  | +        
 | 
	
		
			
				|  |  | +        If the connection is lost, it tries to re-establish the connection
 | 
	
		
			
				|  |  | +        over time and restart consuming messages.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        while True:
 | 
	
		
			
				|  |  | +            self.reset_connection()
 | 
	
		
			
				|  |  | +            try:
 | 
	
		
			
				|  |  | +                self.consume_messages()
 | 
	
		
			
				|  |  | +            except (socket.error,
 | 
	
		
			
				|  |  | +                    self.amqp_connection.AMQPConnectionException):
 | 
	
		
			
				|  |  | +                self.logger.error("AMQPListener: Connection to broker lost. "
 | 
	
		
			
				|  |  | +                                + "Trying to re-establish connection...")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def consume_messages(self):
 | 
	
		
			
				|  |  | +        """Consume messages forever (or until an exception is raised)."""
 | 
	
		
			
				|  |  | +        task_consumer = self.task_consumer
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          self.logger.debug("AMQPListener: Starting message consumer...")
 | 
	
		
			
				|  |  |          it = task_consumer.iterconsume(limit=None)
 | 
	
	
		
			
				|  | @@ -58,7 +78,7 @@ class AMQPListener(object):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          while True:
 | 
	
		
			
				|  |  |              it.next()
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | +        
 | 
	
		
			
				|  |  |      def stop(self):
 | 
	
		
			
				|  |  |          """Stop processing AMQP messages and close the connection
 | 
	
		
			
				|  |  |          to the broker."""
 | 
	
	
		
			
				|  | @@ -109,10 +129,36 @@ class AMQPListener(object):
 | 
	
		
			
				|  |  |          self.logger.debug(
 | 
	
		
			
				|  |  |                  "AMQPListener: Re-establishing connection to the broker...")
 | 
	
		
			
				|  |  |          self.close_connection()
 | 
	
		
			
				|  |  | -        self.amqp_connection = DjangoAMQPConnection()
 | 
	
		
			
				|  |  | +        self.amqp_connection = self._open_connection()
 | 
	
		
			
				|  |  |          self.task_consumer = get_consumer_set(connection=self.amqp_connection)
 | 
	
		
			
				|  |  |          self.task_consumer.register_callback(self.receive_message)
 | 
	
		
			
				|  |  | -        return self.task_consumer
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def _open_connection(self):
 | 
	
		
			
				|  |  | +        """Retries connecting to the AMQP broker over time.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        See :func:`carrot.utils.retry_over_time`.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        
 | 
	
		
			
				|  |  | +        def _connection_error_handler(exc, interval):
 | 
	
		
			
				|  |  | +            """Callback handler for connection errors."""
 | 
	
		
			
				|  |  | +            self.logger.error("AMQP Listener: Connection Error: %s. " % exc
 | 
	
		
			
				|  |  | +                     + "Trying again in %d seconds..." % interval)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        def _establish_connection():
 | 
	
		
			
				|  |  | +            """Establish a connection to the AMQP broker."""
 | 
	
		
			
				|  |  | +            conn = DjangoAMQPConnection()
 | 
	
		
			
				|  |  | +            connected = conn.connection # Connection is established lazily.
 | 
	
		
			
				|  |  | +            return conn
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if not AMQP_CONNECTION_RETRY:
 | 
	
		
			
				|  |  | +            return _establish_connection()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        conn = retry_over_time(_establish_connection, socket.error,
 | 
	
		
			
				|  |  | +                               errback=_connection_error_handler,
 | 
	
		
			
				|  |  | +                               max_retries=AMQP_CONNECTION_MAX_RETRIES)
 | 
	
		
			
				|  |  | +        self.logger.debug("AMQPListener: Connection Established.")
 | 
	
		
			
				|  |  | +        return conn
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  class WorkController(object):
 | 
	
	
		
			
				|  | @@ -233,7 +279,7 @@ class WorkController(object):
 | 
	
		
			
				|  |  |              except Exception, exc:
 | 
	
		
			
				|  |  |                  self.logger.critical("Internal error %s: %s\n%s" % (
 | 
	
		
			
				|  |  |                                  exc.__class__, exc, traceback.format_exc()))
 | 
	
		
			
				|  |  | -        except (SystemExit, KeyboardInterrupt):
 | 
	
		
			
				|  |  | +        except:
 | 
	
		
			
				|  |  |              self.stop()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def process_task(self, task):
 |