فهرست منبع

Fix retried tasks with expirations (#3790)

* Fix retried tasks with expirations

* Use string_t instead of basestring for python 3
Brendan MacDonell 8 سال پیش
والد
کامیت
6c0abdbc0e
3فایلهای تغییر یافته به همراه17 افزوده شده و 2 حذف شده
  1. 3 1
      celery/app/amqp.py
  2. 8 0
      t/integration/tasks.py
  3. 6 1
      t/integration/test_tasks.py

+ 3 - 1
celery/app/amqp.py

@@ -331,7 +331,9 @@ class AMQP(object):
                 now + timedelta(seconds=expires), tz=timezone,
             )
         eta = eta and eta.isoformat()
-        expires = expires and expires.isoformat()
+        # If we retry a task `expires` will already be ISO8601-formatted.
+        if not isinstance(expires, string_t):
+            expires = expires and expires.isoformat()
 
         if argsrepr is None:
             argsrepr = saferepr(args, self.argsrepr_maxsize)

+ 8 - 0
t/integration/tasks.py

@@ -55,3 +55,11 @@ def collect_ids(self, res, i):
 
     """
     return res, (self.request.root_id, self.request.parent_id, i)
+
+
+@shared_task(bind=True, expires=60.0, max_retries=1)
+def retry_once(self):
+    """Task that fails and is retried. Returns the number of retries."""
+    if self.request.retries:
+        return self.request.retries
+    raise self.retry(countdown=0.1)

+ 6 - 1
t/integration/test_tasks.py

@@ -1,7 +1,7 @@
 from __future__ import absolute_import, unicode_literals
 from celery import group
 from .conftest import flaky
-from .tasks import print_unicode, sleeping
+from .tasks import print_unicode, retry_once, sleeping
 
 
 class test_tasks:
@@ -12,6 +12,11 @@ class test_tasks:
         sleeping.delay(sleep)
         manager.assert_accepted([r1.id])
 
+    @flaky
+    def test_task_retried(self):
+        res = retry_once.delay()
+        assert res.get(timeout=10) == 1  # retried once
+
     @flaky
     def test_unicode_task(self, manager):
         manager.join(