Преглед на файлове

Task: Properly forward all execution options to retried task. Closes #3297

Ask Solem преди 8 години
родител
ревизия
14c843b765
променени са 2 файла, в които са добавени 25 реда и са изтрити 13 реда
  1. 23 13
      celery/app/task.py
  2. 2 0
      docs/internals/protocol.rst

+ 23 - 13
celery/app/task.py

@@ -112,6 +112,27 @@ class Context(object):
     def __repr__(self):
         return '<Context: {0!r}>'.format(vars(self))
 
+    def as_execution_options(self):
+        limit_hard, limit_soft = self.timelimit or (None, None)
+        return {
+            'task_id': self.id,
+            'root_id': self.root_id,
+            'parent_id': self.parent_id,
+            'group_id': self.group,
+            'chord': self.chord,
+            'chain': self.chain,
+            'link': self.callbacks,
+            'link_error': self.errbacks,
+            'expires': self.expires,
+            'soft_time_limit': limit_soft,
+            'time_limit': limit_hard,
+            'reply_to': self.reply_to,
+            'headers': self.headers,
+            'retries': self.retries,
+            'reply_to': self.reply_to,
+            'origin': self.origin,
+        }
+
     @property
     def children(self):
         # children must be an empy list for every thread
@@ -528,25 +549,14 @@ class Task(object):
         request = self.request if request is None else request
         args = request.args if args is None else args
         kwargs = request.kwargs if kwargs is None else kwargs
-        limit_hard, limit_soft = request.timelimit or (None, None)
-        options = {
-            'task_id': request.id,
-            'link': request.callbacks,
-            'link_error': request.errbacks,
-            'group_id': request.group,
-            'chord': request.chord,
-            'soft_time_limit': limit_soft,
-            'time_limit': limit_hard,
-            'reply_to': request.reply_to,
-            'headers': request.headers,
-        }
+        options = request.as_execution_options()
         options.update(
             {'queue': queue} if queue else (request.delivery_info or {}),
         )
         return self.signature(
             args, kwargs, options, type=self, **extra_options
         )
-    subtask_from_request = signature_from_request
+    subtask_from_request = signature_from_request  # XXX compat
 
     def retry(self, args=None, kwargs=None, exc=None, throw=True,
               eta=None, countdown=None, max_retries=None, **options):

+ 2 - 0
docs/internals/protocol.rst

@@ -125,6 +125,8 @@ Changes from version 1
 
     - Java/C, etc. can use a Thrift/protobuf document as the body
 
+- ``origin`` is the name of the node sending the task.
+
 - Dispatches to actor based on ``task``, ``meth`` headers
 
     ``meth`` is unused by python, but may be used in the future