|
@@ -503,7 +503,12 @@ class BaseTask(object):
|
|
|
if delivery_info:
|
|
|
options.setdefault("exchange", delivery_info.get("exchange"))
|
|
|
options.setdefault("routing_key", delivery_info.get("routing_key"))
|
|
|
- countdown = options.setdefault("countdown", self.default_retry_delay)
|
|
|
+
|
|
|
+ if options.has_key("eta"):
|
|
|
+ eta = options["eta"]
|
|
|
+ else:
|
|
|
+ countdown = options.setdefault("countdown", self.default_retry_delay)
|
|
|
+
|
|
|
options.update({"retries": request.retries + 1,
|
|
|
"task_id": request.id})
|
|
|
|
|
@@ -519,7 +524,10 @@ class BaseTask(object):
|
|
|
|
|
|
self.apply_async(args=args, kwargs=kwargs, **options)
|
|
|
if throw:
|
|
|
- raise RetryTaskError("Retry in %d seconds" % (countdown, ), exc)
|
|
|
+ if options.has_key("eta"):
|
|
|
+ raise RetryTaskError("Retry at %s" % str(eta), exc)
|
|
|
+ else:
|
|
|
+ raise RetryTaskError("Retry in %d seconds" % (countdown, ), exc)
|
|
|
|
|
|
@classmethod
|
|
|
def apply(self, args=None, kwargs=None, **options):
|