Browse Source

Added support for expiration of AMQP results (requires RabbitMQ 2.1.0's support for the (signed-int)"x-expires" argument to queue_declare)

The new configuration option CELERY_AMQP_TASK_RESULT_EXPIRES sets the expiry
time in seconds (can be int or float)

    CELERY_AMQP_TASK_RESULT_EXPIRES = 30 * 60  # 30 mins
    CELERY_AMQP_TASK_RESULT_EXPIRES = 0.80     # 800 ms

Closes #194.
Ask Solem 14 years ago
parent
commit
86eb07feff
2 changed files with 8 additions and 6 deletions
  1. 6 6
      celery/backends/amqp.py
  2. 2 0
      celery/conf.py

+ 6 - 6
celery/backends/amqp.py

@@ -40,11 +40,8 @@ class ResultConsumer(Consumer):
     no_ack = True
     no_ack = True
     auto_delete = False
     auto_delete = False
 
 
-    def __init__(self, connection, task_id, expires=None, **kwargs):
+    def __init__(self, connection, task_id, **kwargs):
         routing_key = task_id.replace("-", "")
         routing_key = task_id.replace("-", "")
-        if expires is not None:
-            pass
-            #self.queue_arguments = {"x-expires": expires}
         super(ResultConsumer, self).__init__(connection,
         super(ResultConsumer, self).__init__(connection,
                 queue=routing_key, routing_key=routing_key, **kwargs)
                 queue=routing_key, routing_key=routing_key, **kwargs)
 
 
@@ -65,6 +62,7 @@ class AMQPBackend(BaseDictBackend):
             persistent=None, serializer=None, auto_delete=None,
             persistent=None, serializer=None, auto_delete=None,
             expires=None, **kwargs):
             expires=None, **kwargs):
         self._connection = connection
         self._connection = connection
+        self.queue_arguments = {}
         self.exchange = exchange
         self.exchange = exchange
         self.exchange_type = exchange_type
         self.exchange_type = exchange_type
         self.persistent = persistent
         self.persistent = persistent
@@ -72,11 +70,13 @@ class AMQPBackend(BaseDictBackend):
         self.auto_delete = auto_delete
         self.auto_delete = auto_delete
         self.expires = expires
         self.expires = expires
         if self.expires is None:
         if self.expires is None:
-            self.expires = conf.TASK_RESULT_EXPIRES
+            self.expires = conf.AMQP_TASK_RESULT_EXPIRES
         if isinstance(self.expires, timedelta):
         if isinstance(self.expires, timedelta):
             self.expires = timeutils.timedelta_seconds(self.expires)
             self.expires = timeutils.timedelta_seconds(self.expires)
         if self.expires is not None:
         if self.expires is not None:
             self.expires = int(self.expires)
             self.expires = int(self.expires)
+            # WARNING: Requries RabbitMQ 2.1.0 or higher.
+            self.queue_arguments["x-expires"] = self.expires
         super(AMQPBackend, self).__init__(**kwargs)
         super(AMQPBackend, self).__init__(**kwargs)
 
 
     def _create_publisher(self, task_id, connection):
     def _create_publisher(self, task_id, connection):
@@ -98,7 +98,7 @@ class AMQPBackend(BaseDictBackend):
                               exchange_type=self.exchange_type,
                               exchange_type=self.exchange_type,
                               durable=self.persistent,
                               durable=self.persistent,
                               auto_delete=self.auto_delete,
                               auto_delete=self.auto_delete,
-                              expires=self.expires)
+                              queue_arguments=self.queue_arguments)
 
 
     def store_result(self, task_id, result, status, traceback=None,
     def store_result(self, task_id, result, status, traceback=None,
             max_retries=20, retry_delay=0.2):
             max_retries=20, retry_delay=0.2):

+ 2 - 0
celery/conf.py

@@ -31,6 +31,7 @@ _DEFAULTS = {
     "CELERY_ALWAYS_EAGER": False,
     "CELERY_ALWAYS_EAGER": False,
     "CELERY_EAGER_PROPAGATES_EXCEPTIONS": False,
     "CELERY_EAGER_PROPAGATES_EXCEPTIONS": False,
     "CELERY_TASK_RESULT_EXPIRES": timedelta(days=1),
     "CELERY_TASK_RESULT_EXPIRES": timedelta(days=1),
+    "CELERY_AMQP_TASK_RESULT_EXPIRES": None,
     "CELERY_SEND_EVENTS": False,
     "CELERY_SEND_EVENTS": False,
     "CELERY_IGNORE_RESULT": False,
     "CELERY_IGNORE_RESULT": False,
     "CELERY_STORE_ERRORS_EVEN_IF_IGNORED": False,
     "CELERY_STORE_ERRORS_EVEN_IF_IGNORED": False,
@@ -128,6 +129,7 @@ def prepare(m, source=settings, defaults=_DEFAULTS):
     m.CACHE_BACKEND_OPTIONS = _get("CELERY_CACHE_BACKEND_OPTIONS") or {}
     m.CACHE_BACKEND_OPTIONS = _get("CELERY_CACHE_BACKEND_OPTIONS") or {}
     m.TASK_SERIALIZER = _get("CELERY_TASK_SERIALIZER")
     m.TASK_SERIALIZER = _get("CELERY_TASK_SERIALIZER")
     m.TASK_RESULT_EXPIRES = _get("CELERY_TASK_RESULT_EXPIRES")
     m.TASK_RESULT_EXPIRES = _get("CELERY_TASK_RESULT_EXPIRES")
+    m.AMQP_TASK_RESULT_EXPIRES = _get("CELERY_AMQP_TASK_RESULT_EXPIRES")
     m.IGNORE_RESULT = _get("CELERY_IGNORE_RESULT")
     m.IGNORE_RESULT = _get("CELERY_IGNORE_RESULT")
     m.TRACK_STARTED = _get("CELERY_TRACK_STARTED")
     m.TRACK_STARTED = _get("CELERY_TRACK_STARTED")
     m.ACKS_LATE = _get("CELERY_ACKS_LATE")
     m.ACKS_LATE = _get("CELERY_ACKS_LATE")