Преглед на файлове

Fixes chord bug when processing IGNORED tasks

Should not run on_chord_part_return, task_after_return,
nor send_postrun when task was IGNORED or RETRIED
Vlad преди 12 години
родител
ревизия
1a4e3e2fcc
променени са 1 файла, в които са добавени 14 реда и са изтрити 10 реда
  1. 14 10
      celery/task/trace.py

+ 14 - 10
celery/task/trace.py

@@ -47,6 +47,7 @@ IGNORED = states.IGNORED
 RETRY = states.RETRY
 RETRY = states.RETRY
 FAILURE = states.FAILURE
 FAILURE = states.FAILURE
 EXCEPTION_STATES = states.EXCEPTION_STATES
 EXCEPTION_STATES = states.EXCEPTION_STATES
+IGNORE_STATES = frozenset([IGNORED, RETRY])
 
 
 #: set by :func:`setup_worker_optimizations`
 #: set by :func:`setup_worker_optimizations`
 _tasks = None
 _tasks = None
@@ -142,7 +143,8 @@ class TraceInfo(object):
 
 
 
 
 def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
 def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
-                 Info=TraceInfo, eager=False, propagate=False):
+                 Info=TraceInfo, eager=False, propagate=False,
+                 IGNORE_STATES=IGNORE_STATES):
     """Builts a function that tracing the tasks execution; catches all
     """Builts a function that tracing the tasks execution; catches all
     exceptions, and saves the state and result of the task execution
     exceptions, and saves the state and result of the task execution
     to the result backend.
     to the result backend.
@@ -227,7 +229,8 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                     state = SUCCESS
                     state = SUCCESS
                 except Ignore, exc:
                 except Ignore, exc:
                     I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
                     I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
-                except RetryTaskError, exc:
+                    state, retval = I.state, I.retval
+                except RetryTaskError,  exc:
                     I = Info(RETRY, exc)
                     I = Info(RETRY, exc)
                     state, retval = I.state, I.retval
                     state, retval = I.state, I.retval
                     R = I.handle_error_state(task, eager=eager)
                     R = I.handle_error_state(task, eager=eager)
@@ -264,14 +267,15 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                         send_success(sender=task, result=retval)
                         send_success(sender=task, result=retval)
 
 
                 # -* POST *-
                 # -* POST *-
-                if task_request.chord:
-                    on_chord_part_return(task)
-                if task_after_return:
-                    task_after_return(state, retval, uuid, args, kwargs, None)
-                if postrun_receivers:
-                    send_postrun(sender=task, task_id=uuid, task=task,
-                                 args=args, kwargs=kwargs,
-                                 retval=retval, state=state)
+                if state not in IGNORE_STATES:
+                    if task_request.chord:
+                        on_chord_part_return(task)
+                    if task_after_return:
+                        task_after_return(state, retval, uuid, args, kwargs, None)
+                    if postrun_receivers:
+                        send_postrun(sender=task, task_id=uuid, task=task,
+                                     args=args, kwargs=kwargs,
+                                     retval=retval, state=state)
             finally:
             finally:
                 pop_task()
                 pop_task()
                 pop_request()
                 pop_request()