Browse Source

Adds Reject exception to reject or requeue the message. Closes #1383

Ask Solem 11 years ago
parent
commit
67e221a024
7 changed files with 71 additions and 29 deletions
  1. 27 21
      celery/app/task.py
  2. 6 2
      celery/app/trace.py
  3. 12 0
      celery/exceptions.py
  4. 1 0
      celery/states.py
  5. 3 1
      celery/worker/consumer.py
  6. 18 3
      celery/worker/job.py
  7. 4 2
      celery/worker/strategy.py

+ 27 - 21
celery/app/task.py

@@ -16,7 +16,7 @@ from celery import current_app
 from celery import states
 from celery._state import _task_stack
 from celery.canvas import subtask
-from celery.exceptions import MaxRetriesExceededError, RetryTaskError
+from celery.exceptions import MaxRetriesExceededError, RetryTaskError, Reject
 from celery.five import class_property, items, with_metaclass
 from celery.result import EagerResult
 from celery.utils import gen_task_name, fun_takes_kwargs, uuid, maybe_reraise
@@ -577,26 +577,32 @@ class Task(object):
         if not eta and countdown is None:
             countdown = self.default_retry_delay
 
-        S = self.subtask_from_request(
-            request, args, kwargs,
-            countdown=countdown, eta=eta, retries=retries,
-            **options
-        )
-
-        if max_retries is not None and retries > max_retries:
-            if exc:
-                maybe_reraise()
-            raise self.MaxRetriesExceededError(
-                "Can't retry {0}[{1}] args:{2} kwargs:{3}".format(
-                    self.name, request.id, S.args, S.kwargs))
-
-        # If task was executed eagerly using apply(),
-        # then the retry must also be executed eagerly.
-        S.apply().get() if request.is_eager else S.apply_async()
-        ret = RetryTaskError(exc=exc, when=eta or countdown)
-        if throw:
-            raise ret
-        return ret
+        is_eager = request.is_eager
+        try:
+            S = self.subtask_from_request(
+                request, args, kwargs,
+                countdown=countdown, eta=eta, retries=retries,
+                **options
+            )
+
+            if max_retries is not None and retries > max_retries:
+                if exc:
+                    maybe_reraise()
+                raise self.MaxRetriesExceededError(
+                    "Can't retry {0}[{1}] args:{2} kwargs:{3}".format(
+                        self.name, request.id, S.args, S.kwargs))
+
+            # If task was executed eagerly using apply(),
+            # then the retry must also be executed eagerly.
+            S.apply().get() if is_eager else S.apply_async()
+            ret = RetryTaskError(exc=exc, when=eta or countdown)
+            if throw:
+                raise ret
+            return ret
+        except Exception as exc:
+            if is_eager:
+                raise
+            raise Reject(exc, requeue=True)
 
     def apply(self, args=None, kwargs=None,
               link=None, link_error=None, **options):

+ 6 - 2
celery/app/trace.py

@@ -29,7 +29,7 @@ from celery import states, signals
 from celery._state import _task_stack
 from celery.app import set_default_app
 from celery.app.task import Task as BaseTask, Context
-from celery.exceptions import Ignore, RetryTaskError
+from celery.exceptions import Ignore, RetryTaskError, Reject
 from celery.utils.log import get_logger
 from celery.utils.objects import mro_lookup
 from celery.utils.serialization import (
@@ -48,10 +48,11 @@ send_success = signals.task_success.send
 STARTED = states.STARTED
 SUCCESS = states.SUCCESS
 IGNORED = states.IGNORED
+REJECTED = states.REJECTED
 RETRY = states.RETRY
 FAILURE = states.FAILURE
 EXCEPTION_STATES = states.EXCEPTION_STATES
-IGNORE_STATES = frozenset([IGNORED, RETRY])
+IGNORE_STATES = frozenset([IGNORED, RETRY, REJECTED])
 
 #: set by :func:`setup_worker_optimizations`
 _tasks = None
@@ -210,6 +211,9 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                 try:
                     R = retval = fun(*args, **kwargs)
                     state = SUCCESS
+                except Reject as exc:
+                    I, R = Info(REJECTED, exc), ExceptionInfo(internal=True)
+                    state, retval = I.state, I.retval
                 except Ignore as exc:
                     I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
                     state, retval = I.state, I.retval

+ 12 - 0
celery/exceptions.py

@@ -40,6 +40,18 @@ class Ignore(Exception):
     """A task can raise this to ignore doing state updates."""
 
 
+class Reject(Exception):
+    """A task can raise this if it wants to reject/requeue the message."""
+
+    def __init__(self, reason=None, requeue=False):
+        self.reason = reason
+        self.requeue = requeue
+        super(Reject, self).__init__(reason, requeue)
+
+    def __repr__(self):
+        return 'reject requeue=%s: %s' % (self.requeue, self.reason)
+
+
 class SystemTerminate(SystemExit):
     """Signals that the worker should terminate."""
 

+ 1 - 0
celery/states.py

@@ -142,6 +142,7 @@ REVOKED = 'REVOKED'
 #: Task is waiting for retry.
 RETRY = 'RETRY'
 IGNORED = 'IGNORED'
+REJECTED = 'REJECTED'
 
 READY_STATES = frozenset([SUCCESS, FAILURE, REVOKED])
 UNREADY_STATES = frozenset([PENDING, RECEIVED, STARTED, RETRY])

+ 3 - 1
celery/worker/consumer.py

@@ -409,7 +409,9 @@ class Consumer(object):
                 return on_unknown_message(body, message)
 
             try:
-                strategies[name](message, body, message.ack_log_error)
+                strategies[name](message, body,
+                                 message.ack_log_error,
+                                 message.reject_log_error)
             except KeyError as exc:
                 on_unknown_task(body, message, exc)
             except InvalidTaskError as exc:

+ 18 - 3
celery/worker/job.py

@@ -24,7 +24,7 @@ from celery.app.trace import trace_task, trace_task_ret
 from celery.exceptions import (
     Ignore, TaskRevokedError, InvalidTaskError,
     SoftTimeLimitExceeded, TimeLimitExceeded,
-    WorkerLostError, Terminated, RetryTaskError,
+    WorkerLostError, Terminated, RetryTaskError, Reject,
 )
 from celery.five import items, monotonic
 from celery.platforms import signals as _signals
@@ -99,13 +99,17 @@ class Request(object):
         Task %(name)s[%(id)s] ignored
     """
 
+    rejected_msg = """\
+        Task %(name)s[%(id)s] %(exc)s
+    """
+
     #: Format string used to log task retry.
     retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""
 
     def __init__(self, body, on_ack=noop,
                  hostname=None, eventer=None, app=None,
                  connection_errors=None, request_dict=None,
-                 delivery_info=None, task=None, **opts):
+                 delivery_info=None, task=None, on_reject=noop, **opts):
         self.app = app
         name = self.name = body['task']
         self.id = body['id']
@@ -417,7 +421,13 @@ class Request(object):
         if internal:
             if isinstance(einfo.exception, MemoryError):
                 raise MemoryError('Process got: %s' % (einfo.exception, ))
-            if isinstance(einfo.exception, Ignore):
+            elif isinstance(einfo.exception, Reject):
+                format = self.rejected_msg
+                description = 'rejected'
+                severity = logging.WARN
+                exc_info = einfo
+                self.reject(requeue=einfo.exception.requeue)
+            elif isinstance(einfo.exception, Ignore):
                 format = self.ignored_msg
                 description = 'ignored'
                 severity = logging.INFO
@@ -456,6 +466,11 @@ class Request(object):
             self.on_ack(logger, self.connection_errors)
             self.acknowledged = True
 
+    def reject(self, requeue=False):
+        if not self.acknowledged:
+            self.on_reject(logger, self.connection_errors, requeue)
+            self.acknowledged = True
+
     def repr_result(self, result, maxlen=46):
         # 46 is the length needed to fit
         #     'the quick brown fox jumps over the lazy dog' :)

+ 4 - 2
celery/worker/strategy.py

@@ -41,8 +41,10 @@ def default(task, app, consumer,
     handle = consumer.on_task_request
     limit_task = consumer._limit_task
 
-    def task_message_handler(message, body, ack, to_timestamp=to_timestamp):
-        req = Req(body, on_ack=ack, app=app, hostname=hostname,
+    def task_message_handler(message, body, ack, reject,
+                             to_timestamp=to_timestamp):
+        req = Req(body, on_ack=ack, on_reject=reject,
+                  app=app, hostname=hostname,
                   eventer=eventer, task=task,
                   connection_errors=connection_errors,
                   delivery_info=message.delivery_info)