ソースを参照

Skip adding children for tasks called with task.__call__. Closes #701

Ask Solem 13 年 前
コミット
5a5ccd986b
4 ファイル変更20 行追加6 行削除
  1. 2 2
      celery/app/builtins.py
  2. 2 2
      celery/app/task.py
  3. 14 0
      celery/state.py
  4. 2 2
      celery/task/sets.py

+ 2 - 2
celery/app/builtins.py

@@ -4,7 +4,7 @@ from __future__ import with_statement
 
 from itertools import starmap
 
-from celery.state import get_current_task
+from celery.state import get_current_worker_task
 from celery.utils import uuid
 
 #: global list of functions defining tasks that should be
@@ -123,7 +123,7 @@ def add_group_task(app):
             with app.default_producer() as pub:
                 [subtask(task).apply_async(taskset_id=setid, publisher=pub)
                         for task in tasks]
-            parent = get_current_task()
+            parent = get_current_worker_task()
             if parent:
                 parent.request.children.append(result)
             return result

+ 2 - 2
celery/app/task.py

@@ -21,7 +21,7 @@ from kombu.utils import cached_property
 from celery import current_app
 from celery import states
 from celery.__compat__ import class_property
-from celery.state import get_current_task, _task_stack
+from celery.state import get_current_worker_task, _task_stack
 from celery.datastructures import ExceptionInfo
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
 from celery.result import EagerResult
@@ -581,7 +581,7 @@ class BaseTask(object):
                 publish.release()
 
         result = self.AsyncResult(task_id)
-        parent = get_current_task()
+        parent = get_current_worker_task()
         if parent:
             parent.request.children.append(result)
         return result

+ 14 - 0
celery/state.py

@@ -32,8 +32,22 @@ def get_current_app():
 
 
 def get_current_task():
+    """Currently executing task."""
     return _task_stack.top
 
 
+def get_current_worker_task():
+    """Currently executing task, that was applied by the worker.
+
+    This is used to differentiate between the actual task
+    executed by the worker and any task that was called within
+    a task (using ``task.__call__`` or ``task.apply``)
+
+    """
+    for task in reversed(_task_stack.stack):
+        if not task.request.called_directly:
+            return task
+
+
 current_app = Proxy(get_current_app)
 current_task = Proxy(get_current_task)

+ 2 - 2
celery/task/sets.py

@@ -2,7 +2,7 @@
 from __future__ import absolute_import
 from __future__ import with_statement
 
-from celery.state import get_current_task
+from celery.state import get_current_worker_task
 from celery.app import app_or_default
 from celery.canvas import subtask, maybe_subtask  # noqa
 from celery.utils import uuid
@@ -44,7 +44,7 @@ class TaskSet(list):
             results = self._async_results(setid, pub)
 
             result = app.TaskSetResult(setid, results)
-            parent = get_current_task()
+            parent = get_current_worker_task()
             if parent:
                 parent.request.children.append(result)
             return result