|
@@ -480,6 +480,22 @@ class Task(object):
|
|
|
parent.request.children.append(result)
|
|
|
return result
|
|
|
|
|
|
+ def subtask_from_request(self, request=None, args=None, kwargs=None,
|
|
|
+ **extra_options):
|
|
|
+
|
|
|
+ request = self.request if request is None else request
|
|
|
+ args = request.args if args is None else args
|
|
|
+ kwargs = request.kwargs if kwargs is None else kwargs
|
|
|
+ delivery_info = request.delivery_info or {}
|
|
|
+ options = {
|
|
|
+ 'task_id': request.id,
|
|
|
+ 'link': request.callbacks,
|
|
|
+ 'link_error': request.errbacks,
|
|
|
+ 'exchange': delivery_info.get('exchange'),
|
|
|
+ 'routing_key': delivery_info.get('routing_key')
|
|
|
+ }
|
|
|
+ return self.subtask(args, kwargs, options, **extra_options)
|
|
|
+
|
|
|
def retry(self, args=None, kwargs=None, exc=None, throw=True,
|
|
|
eta=None, countdown=None, max_retries=None, **options):
|
|
|
"""Retry the task.
|
|
@@ -526,44 +542,31 @@ class Task(object):
|
|
|
|
|
|
"""
|
|
|
request = self.request
|
|
|
+ retries = request.retries + 1
|
|
|
max_retries = self.max_retries if max_retries is None else max_retries
|
|
|
- args = request.args if args is None else args
|
|
|
- kwargs = request.kwargs if kwargs is None else kwargs
|
|
|
- delivery_info = request.delivery_info
|
|
|
|
|
|
# Not in worker or emulated by (apply/always_eager),
|
|
|
# so just raise the original exception.
|
|
|
if request.called_directly:
|
|
|
- maybe_reraise()
|
|
|
+ maybe_reraise() # raise orig stack if PyErr_Occurred
|
|
|
raise exc or RetryTaskError('Task can be retried', None)
|
|
|
|
|
|
- if delivery_info:
|
|
|
- options.setdefault('exchange', delivery_info.get('exchange'))
|
|
|
- options.setdefault('routing_key', delivery_info.get('routing_key'))
|
|
|
-
|
|
|
if not eta and countdown is None:
|
|
|
countdown = self.default_retry_delay
|
|
|
|
|
|
- options.update({'retries': request.retries + 1,
|
|
|
- 'task_id': request.id,
|
|
|
- 'countdown': countdown,
|
|
|
- 'eta': eta,
|
|
|
- 'link': request.callbacks,
|
|
|
- 'link_error': request.errbacks})
|
|
|
+ S = self.subtask_from_request(request, args, kwargs,
|
|
|
+ countdown=countdown, eta=eta, retries=retries)
|
|
|
|
|
|
- if max_retries is not None and options['retries'] > max_retries:
|
|
|
+ if max_retries is not None and retries > max_retries:
|
|
|
if exc:
|
|
|
maybe_reraise()
|
|
|
raise self.MaxRetriesExceededError(
|
|
|
"""Can't retry %s[%s] args:%s kwargs:%s""" % (
|
|
|
- self.name, options['task_id'], args, kwargs))
|
|
|
+ self.name, request.id, S.args, S.kwargs))
|
|
|
|
|
|
# If task was executed eagerly using apply(),
|
|
|
# then the retry must also be executed eagerly.
|
|
|
- if request.is_eager:
|
|
|
- self.apply(args=args, kwargs=kwargs, **options).get()
|
|
|
- else:
|
|
|
- self.apply_async(args=args, kwargs=kwargs, **options)
|
|
|
+ S.apply() if request.is_eager else S.apply_async()
|
|
|
ret = RetryTaskError(exc=exc, when=eta or countdown)
|
|
|
if throw:
|
|
|
raise ret
|