|
@@ -15,6 +15,7 @@ import warnings
|
|
|
from collections import defaultdict, deque
|
|
|
from copy import deepcopy
|
|
|
from operator import attrgetter
|
|
|
+from functools import wraps
|
|
|
|
|
|
from amqp import promise
|
|
|
from billiard.util import register_after_fork
|
|
@@ -38,7 +39,6 @@ from celery.utils.dispatch import Signal
|
|
|
from celery.utils.functional import first, maybe_list
|
|
|
from celery.utils.imports import instantiate, symbol_by_name
|
|
|
from celery.utils.objects import FallbackContext, mro_lookup
|
|
|
-from celery.contrib.autoretry import autoretry
|
|
|
|
|
|
from .annotations import prepare as prepare_annotations
|
|
|
from .defaults import DEFAULTS, find_deprecated_settings
|
|
@@ -284,11 +284,19 @@ class Celery(object):
|
|
|
self._tasks[task.name] = task
|
|
|
task.bind(self) # connects task to this app
|
|
|
|
|
|
- autoretry_on = options.get('autoretry_on')
|
|
|
- retry_kwargs = options.get('retry_kwargs')
|
|
|
+ autoretry_for = tuple(options.get('autoretry_for', ()))
|
|
|
+ retry_kwargs = options.get('retry_kwargs', {})
|
|
|
|
|
|
- if autoretry_on:
|
|
|
- task = autoretry(autoretry_on, retry_kwargs)(task)
|
|
|
+ if autoretry_for and not hasattr(task, '_orig_run'):
|
|
|
+
|
|
|
+ @wraps(task.run)
|
|
|
+ def run(*args, **kwargs):
|
|
|
+ try:
|
|
|
+ return task._orig_run(*args, **kwargs)
|
|
|
+ except autoretry_for as exc:
|
|
|
+ raise task.retry(exc=exc, **retry_kwargs)
|
|
|
+
|
|
|
+ task._orig_run, task.run = task.run, run
|
|
|
else:
|
|
|
task = self._tasks[name]
|
|
|
return task
|