Browse Source

Fixes for group/chord and using json serializer

Ask Solem 12 years ago
parent
commit
3077d7f0f1
2 changed files with 7 additions and 6 deletions
  1. 2 1
      celery/app/amqp.py
  2. 5 5
      celery/app/builtins.py

+ 2 - 1
celery/app/amqp.py

@@ -148,7 +148,7 @@ class TaskProducer(Producer):
         self.queues = self.app.amqp.queues  # shortcut
         super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
 
-    def delay_task(self, task_name, task_args=None, task_kwargs=None,
+    def publish_task(self, task_name, task_args=None, task_kwargs=None,
             countdown=None, eta=None, task_id=None, group_id=None,
             taskset_id=None,  # compat alias to group_id
             expires=None, exchange=None, exchange_type=None,
@@ -213,6 +213,7 @@ class TaskProducer(Producer):
                                                expires=expires,
                                                queue=queue)
         return task_id
+    delay_task = publish_task   # XXX Compat
 
 
 class TaskPublisher(TaskProducer):

+ 5 - 5
celery/app/builtins.py

@@ -155,13 +155,13 @@ def add_group_task(app):
             if self.app.conf.CELERY_ALWAYS_EAGER:
                 return self.apply(args, kwargs, **options)
             tasks, result, gid = self.prepare(options, **kwargs)
-            super(Group, self).apply_async(
-                    (list(tasks), result, gid), **options)
+            super(Group, self).apply_async((
+                list(tasks), result.serializable(), gid), **options)
             return result
 
         def apply(self, args=(), kwargs={}, **options):
-            tasks, result, gid = self.prepare(options, **kwargs)
-            return super(Group, self).apply((tasks, result, gid), **options)
+            return super(Group, self).apply(
+                    self.prepare(options, **kwargs), **options)
     return Group
 
 
@@ -226,7 +226,7 @@ def add_chord_task(app):
         def run(self, header, body, interval=1, max_retries=None,
                 propagate=False, eager=False, **kwargs):
             if not isinstance(header, group):
-                header = group(header)
+                header = group(map(maybe_subtask, header))
             r = []
             group_id = uuid()
             for task in header.tasks: