Преглед на файлове

Tasks can now raise celery.exceptions.Ignore to not update any state

Ask Solem преди 12 години
родител
ревизия
46ab71ff74
променени са 4 файла, в които са добавени 26 реда и са изтрити 7 реда
  1. 4 0
      celery/exceptions.py
  2. 1 0
      celery/states.py
  3. 4 1
      celery/task/trace.py
  4. 17 6
      celery/worker/job.py

+ 4 - 0
celery/exceptions.py

@@ -25,6 +25,10 @@ class SecurityError(Exception):
     """
 
 
+class Ignore(Exception):
+    """A task can raise this to ignore doing state updates."""
+
+
 class SystemTerminate(SystemExit):
     """Signals that the worker should terminate."""
 

+ 1 - 0
celery/states.py

@@ -126,6 +126,7 @@ SUCCESS = 'SUCCESS'
 FAILURE = 'FAILURE'
 REVOKED = 'REVOKED'
 RETRY = 'RETRY'
+IGNORED = 'IGNORED'
 
 READY_STATES = frozenset([SUCCESS, FAILURE, REVOKED])
 UNREADY_STATES = frozenset([PENDING, RECEIVED, STARTED, RETRY])

+ 4 - 1
celery/task/trace.py

@@ -29,7 +29,7 @@ from celery._state import _task_stack
 from celery.app import set_default_app
 from celery.app.task import Task as BaseTask, Context
 from celery.datastructures import ExceptionInfo
-from celery.exceptions import RetryTaskError
+from celery.exceptions import Ignore, RetryTaskError
 from celery.utils.serialization import get_pickleable_exception
 from celery.utils.log import get_logger
 
@@ -43,6 +43,7 @@ send_success = signals.task_success.send
 success_receivers = signals.task_success.receivers
 STARTED = states.STARTED
 SUCCESS = states.SUCCESS
+IGNORED = states.IGNORED
 RETRY = states.RETRY
 FAILURE = states.FAILURE
 EXCEPTION_STATES = states.EXCEPTION_STATES
@@ -222,6 +223,8 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                 try:
                     R = retval = fun(*args, **kwargs)
                     state = SUCCESS
+                except Ignore, exc:
+                    I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
                 except RetryTaskError, exc:
                     I = Info(RETRY, exc)
                     state, retval = I.state, I.retval

+ 17 - 6
celery/worker/job.py

@@ -23,7 +23,7 @@ from celery import exceptions
 from celery import signals
 from celery.app import app_or_default
 from celery.datastructures import ExceptionInfo
-from celery.exceptions import TaskRevokedError
+from celery.exceptions import Ignore, TaskRevokedError
 from celery.platforms import signals as _signals
 from celery.task.trace import (
     trace_task,
@@ -64,8 +64,9 @@ class Request(object):
                  'eventer', 'connection_errors',
                  'task', 'eta', 'expires',
                  'request_dict', 'acknowledged', 'success_msg',
-                 'error_msg', 'retry_msg', 'time_start', 'worker_pid',
-                 '_already_revoked', '_terminate_on_ack', '_tzlocal')
+                 'error_msg', 'retry_msg', 'ignore_msg',
+                 'time_start', 'worker_pid', '_already_revoked',
+                 '_terminate_on_ack', '_tzlocal')
 
     #: Format string used to log task success.
     success_msg = """\
@@ -82,6 +83,10 @@ class Request(object):
         Task %(name)s[%(id)s] INTERNAL ERROR: %(exc)s
     """
 
+    ignored_msg = """\
+        Task %(name)s[%(id)s] ignored
+    """
+
     #: Format string used to log task retry.
     retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""
 
@@ -380,9 +385,15 @@ class Request(object):
                          traceback=traceback)
 
         if internal:
-            format = self.internal_error_msg
-            description = 'INTERNAL ERROR'
-            severity = logging.CRITICAL
+            if isinstance(einfo.exception, Ignore):
+                format = self.ignored_msg
+                description = 'ignored'
+                severity = logging.INFO
+                exc_info = None
+            else:
+                format = self.internal_error_msg
+                description = 'INTERNAL ERROR'
+                severity = logging.CRITICAL
 
         context = {
             'hostname': self.hostname,