Browse Source

retry should forward group, chord and timeouts

Ask Solem 12 years ago
parent
commit
9b4ee41406
2 changed files with 7 additions and 9 deletions
  1. 2 3
      celery/app/amqp.py
  2. 5 6
      celery/app/task.py

+ 2 - 3
celery/app/amqp.py

@@ -180,7 +180,7 @@ class TaskProducer(Producer):
             errbacks=None, mandatory=None, priority=None, immediate=None,
             routing_key=None, serializer=None, delivery_mode=None,
             compression=None, reply_to=None, timeout=None, soft_timeout=None,
-            **kwargs):
+            timeouts=None, **kwargs):
         """Send task message."""
         retry = self.retry if retry is None else retry
         # merge default and custom policy
@@ -202,7 +202,6 @@ class TaskProducer(Producer):
             expires = now + timedelta(seconds=expires)
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
-        timeouts = (timeout, soft_timeout)
 
         body = {'task': task_name,
                 'id': task_id,
@@ -215,7 +214,7 @@ class TaskProducer(Producer):
                 'callbacks': callbacks,
                 'errbacks': errbacks,
                 'reply_to': reply_to,
-                'timeouts': timeouts}
+                'timeouts': timeouts or (timeout, soft_timeout)}
         group_id = group_id or taskset_id
         if group_id:
             body['taskset'] = group_id

+ 5 - 6
celery/app/task.py

@@ -498,15 +498,14 @@ class Task(object):
         request = self.request if request is None else request
         args = request.args if args is None else args
         kwargs = request.kwargs if kwargs is None else kwargs
-        delivery_info = request.delivery_info or {}
-        options = {
+        options = dict({
             'task_id': request.id,
             'link': request.callbacks,
             'link_error': request.errbacks,
-            'exchange': delivery_info.get('exchange'),
-            'routing_key': delivery_info.get('routing_key'),
-            'expires': delivery_info.get('expires'),
-        }
+            'group_id': request.taskset,
+            'chord': request.chord,
+            'timeouts': request.timeouts,
+        }, **request.delivery_info or {})
         return self.subtask(args, kwargs, options, type=self, **extra_options)
 
     def retry(self, args=None, kwargs=None, exc=None, throw=True,