Browse Source

Merge branch 'master' of github.com:ask/celery

Ask Solem 13 years ago
parent
commit
247a68ef80
3 changed files with 17 additions and 5 deletions
  1. 13 3
      celery/app/builtins.py
  2. 2 0
      celery/app/task.py
  3. 2 2
      celery/task/trace.py

+ 13 - 3
celery/app/builtins.py

@@ -170,9 +170,19 @@ def add_chain_task(app):
         def apply_async(self, args=(), kwargs={}, **options):
             if self.app.conf.CELERY_ALWAYS_EAGER:
                 return self.apply(args, kwargs, **options)
-            tasks = [maybe_subtask(task).clone(task_id=uuid(), **kwargs)
-                        for task in kwargs["tasks"]]
+            options.pop("publisher", None)
+            taskset_id = options.pop("taskset_id", None)
+            chord = options.pop("chord", None)
+            tasks = [maybe_subtask(t).clone(
+                        task_id=options.pop("task_id", uuid()),
+                        **options
+                    )
+                    for t in kwargs["tasks"]]
             reduce(lambda a, b: a.link(b), tasks)
+            if taskset_id:
+                tasks[-1].set(taskset_id=taskset_id)
+            if chord:
+                tasks[-1].set(chord=chord)
             tasks[0].apply_async()
             results = [task.type.AsyncResult(task.options["task_id"])
                             for task in tasks]
@@ -203,7 +213,7 @@ def add_chord_task(app):
         app = _app
         name = "celery.chord"
         accept_magic_kwargs = False
-        ignore_result = True
+        ignore_result = False
 
         def run(self, header, body, interval=1, max_retries=None,
                 propagate=False, eager=False, **kwargs):

+ 2 - 0
celery/app/task.py

@@ -344,9 +344,11 @@ class Task(object):
 
     def __call__(self, *args, **kwargs):
         _task_stack.push(self)
+        self.push_request()
         try:
             return self.run(*args, **kwargs)
         finally:
+            self.pop_request()
             _task_stack.pop()
 
     # - tasks are pickled into the name of the task only, and the reciever

+ 2 - 2
celery/task/trace.py

@@ -241,12 +241,12 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                     [subtask(errback).apply_async((uuid, ))
                         for errback in task_request.errbacks or []]
                 else:
-                    if publish_result:
-                        store_result(uuid, retval, SUCCESS)
                     # callback tasks must be applied before the result is
                     # stored, so that result.children is populated.
                     [subtask(callback).apply_async((retval, ))
                         for callback in task_request.callbacks or []]
+                    if publish_result:
+                        store_result(uuid, retval, SUCCESS)
                     if task_on_success:
                         task_on_success(retval, uuid, args, kwargs)
                     if success_receivers: