Browse Source

Set default AMQP connection timeout to 4. Also, now configurable via the
CELERY_AMQP_CONNECTION_TIMEOUT setting.

Ask Solem 16 years ago
parent
commit
d4dd459a56
3 changed files with 23 additions and 8 deletions
  1. 1 1
      celery/backends/base.py
  2. 10 0
      celery/conf.py
  3. 12 7
      celery/task.py

+ 1 - 1
celery/backends/base.py

@@ -175,7 +175,7 @@ class BaseBackend(object):
                 return self.get_result(task_id)
             elif status == "FAILURE":
                 raise self.get_result(task_id)
-            time.sleep(0.5) # avoid hammering the CPU checking status.
+            time.sleep(0.05) # avoid hammering the CPU checking status.
             timeout_timer.tick()
 
     def process_cleanup(self):

+ 10 - 0
celery/conf.py

@@ -12,6 +12,7 @@ DEFAULT_DAEMON_PID_FILE = "celeryd.pid"
 DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
 DEFAULT_DAEMON_LOG_LEVEL = "INFO"
 DEFAULT_DAEMON_LOG_FILE = "celeryd.log"
+DEFAULT_AMQP_CONNECTION_TIMEOUT = 4
 
 """
 .. data:: LOG_LEVELS
@@ -131,6 +132,15 @@ AMQP_CONSUMER_QUEUE = getattr(settings, "CELERY_AMQP_CONSUMER_QUEUE",
                               DEFAULT_AMQP_CONSUMER_QUEUE)
 
 """
+.. data:: AMQP_CONNECTION_TIMEOUT
+
+    The timeout in seconds before we give up establishing a connection
+    to the AMQP server.
+
+"""
+AMQP_CONNECTION_TIMEOUT = getattr(settings, "CELERY_AMQP_CONNECTION_TIMEOUT",
+                                  DEFAULT_AMQP_CONNECTION_TIMEOUT)
+"""
 .. data:: SEND_CELERY_TASK_ERROR_EMAILS
 
     If set to ``True``, errors in tasks will be sent to admins by e-mail.

+ 12 - 7
celery/task.py

@@ -4,6 +4,7 @@ Working with tasks and task sets.
 
 """
 from carrot.connection import DjangoAMQPConnection
+from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.log import setup_logger
 from celery.registry import tasks
@@ -16,7 +17,7 @@ import pickle
 
 def apply_async(task, args=None, kwargs=None, routing_key=None,
         immediate=None, mandatory=None, connection=None,
-        connect_timeout=None, priority=None):
+        connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None):
     """Run a task asynchronously by the celery daemon(s).
 
     :param task: The task to run (a callable object, or a :class:`Task`
@@ -100,7 +101,7 @@ def delay_task(task_name, *args, **kwargs):
     return apply_async(task, args, kwargs)
 
 
-def discard_all():
+def discard_all(connect_timeout=AMQP_CONNECTION_TIMEOUT):
     """Discard all waiting tasks.
 
     This will ignore all tasks waiting for execution, and they will
@@ -111,7 +112,7 @@ def discard_all():
     :rtype: int
 
     """
-    amqp_connection = DjangoAMQPConnection()
+    amqp_connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
     consumer = TaskConsumer(connection=amqp_connection)
     discarded_count = consumer.discard_all()
     amqp_connection.close()
@@ -231,7 +232,8 @@ class Task(object):
             >>> publisher.connection.close()
 
         """
-        return TaskPublisher(connection=DjangoAMQPConnection())
+        return TaskPublisher(connection=DjangoAMQPConnection(
+                                connect_timeout=AMQP_CONNECTION_TIMEOUT))
 
     def get_consumer(self):
         """Get a celery task message consumer.
@@ -246,7 +248,8 @@ class Task(object):
             >>> consumer.connection.close()
 
         """
-        return TaskConsumer(connection=DjangoAMQPConnection())
+        return TaskConsumer(connection=DjangoAMQPConnection(
+                                connect_timeout=AMQP_CONNECTION_TIMEOUT))
 
     @classmethod
     def delay(cls, *args, **kwargs):
@@ -326,7 +329,7 @@ class TaskSet(object):
         self.arguments = args
         self.total = len(args)
 
-    def run(self):
+    def run(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
         """Run all tasks in the taskset.
 
         :returns: A :class:`celery.result.TaskSetResult` instance.
@@ -357,7 +360,7 @@ class TaskSet(object):
 
         """
         taskset_id = str(uuid.uuid4())
-        conn = DjangoAMQPConnection()
+        conn = DjangoAMQPConnection(connect_timeout=connect_timeout)
         publisher = TaskPublisher(connection=conn)
         subtask_ids = [publisher.delay_task_in_set(task_name=self.task_name,
                                                    taskset_id=taskset_id,
@@ -573,3 +576,5 @@ class DeleteExpiredTaskMetaTask(PeriodicTask):
         logger.info("Deleting expired task meta objects...")
         default_backend.cleanup()
 tasks.register(DeleteExpiredTaskMetaTask)
+
+