Bläddra i källkod

AMQP Backend: Support the x-expires argument for queues (using the CELERY_AMQP_TASK_RESULT_EXPIRES setting).

This must be an integer describing the time which after the result queue
should be deleted if unused.
Ask Solem 14 år sedan
förälder
incheckning
6d7d4b4453
2 ändrade filer med 7 tillägg och 6 borttagningar
  1. 1 0
      celery/app/defaults.py
  2. 6 6
      celery/backends/amqp.py

+ 1 - 0
celery/app/defaults.py

@@ -88,6 +88,7 @@ NAMESPACES = {
         "SEND_TASK_ERROR_EMAILS": Option(False, type="bool"),
         "STORE_ERRORS_EVEN_IF_IGNORED": Option(False, type="bool"),
         "TASK_RESULT_EXPIRES": Option(timedelta(days=1), type="int"),
+        "AMQP_TASK_RESULT_EXPIRES": Option(type="int"),
         "TASK_ERROR_WHITELIST": Option((), type="tuple"),
         "TASK_SERIALIZER": Option("pickle"),
         "TRACK_STARTED": Option(False, type="bool"),

+ 6 - 6
celery/backends/amqp.py

@@ -30,11 +30,8 @@ class ResultConsumer(Consumer):
     no_ack = True
     auto_delete = False
 
-    def __init__(self, connection, task_id, expires=None, **kwargs):
+    def __init__(self, connection, task_id, **kwargs):
         routing_key = task_id.replace("-", "")
-        if expires is not None:
-            pass
-            #self.queue_arguments = {"x-expires": expires}
         super(ResultConsumer, self).__init__(connection,
                 queue=routing_key, routing_key=routing_key, **kwargs)
 
@@ -57,6 +54,7 @@ class AMQPBackend(BaseDictBackend):
         super(AMQPBackend, self).__init__(**kwargs)
         conf = self.app.conf
         self._connection = connection
+        self.queue_arguments = {}
         self.exchange = exchange or conf.CELERY_RESULT_EXCHANGE
         self.exchange_type = exchange_type or conf.CELERY_RESULT_EXCHANGE_TYPE
         if persistent is None:
@@ -66,11 +64,13 @@ class AMQPBackend(BaseDictBackend):
         self.auto_delete = auto_delete
         self.expires = expires
         if self.expires is None:
-            self.expires = conf.CELERY_TASK_RESULT_EXPIRES
+            self.expires = conf.CELERY_AMQP_TASK_RESULT_EXPIRES
         if isinstance(self.expires, timedelta):
             self.expires = timeutils.timedelta_seconds(self.expires)
         if self.expires is not None:
             self.expires = int(self.expires)
+            # WARNING: Requires RabbitMQ 2.1.0 or higher.
+            self.queue_arguments["x-expires"] = self.expires
 
     def _create_publisher(self, task_id, connection):
         delivery_mode = self.persistent and 2 or 1
@@ -92,7 +92,7 @@ class AMQPBackend(BaseDictBackend):
                               exchange_type=self.exchange_type,
                               durable=self.persistent,
                               auto_delete=self.auto_delete,
-                              expires=self.expires)
+                              queue_arguments=self.queue_arguments)
 
     def store_result(self, task_id, result, status, traceback=None,
             max_retries=20, retry_delay=0.2):