Forráskód Böngészése

Chord join must use with allow_join_result()

Ask Solem 11 éve
szülő
commit
eb5d05ce9a
4 módosított fájl, 22 hozzáadás és 8 törlés
  1. 3 2
      celery/app/builtins.py
  2. 3 2
      celery/backends/base.py
  3. 12 1
      celery/result.py
  4. 4 3
      examples/resultgraph/tasks.py

+ 3 - 2
celery/app/builtins.py

@@ -66,7 +66,7 @@ def add_unlock_chord_task(app):
     """
     from celery.canvas import signature
     from celery.exceptions import ChordError
-    from celery.result import result_from_tuple
+    from celery.result import allow_join_result, result_from_tuple
 
     default_propagate = app.conf.CELERY_CHORD_PROPAGATES
 
@@ -95,7 +95,8 @@ def add_unlock_chord_task(app):
         if deps.ready():
             callback = signature(callback, app=app)
             try:
-                ret = j(propagate=propagate)
+                with allow_join_result():
+                    ret = j(propagate=propagate)
             except Exception as exc:
                 try:
                     culprit = next(deps._failed_join_report())

+ 3 - 2
celery/backends/base.py

@@ -469,7 +469,7 @@ class KeyValueStoreBackend(BaseBackend):
         if not self.implements_incr:
             return
         from celery import maybe_signature
-        from celery.result import GroupResult
+        from celery.result import GroupResult, allow_join_result
         app = self.app
         if propagate is None:
             propagate = self.app.conf.CELERY_CHORD_PROPAGATES
@@ -502,7 +502,8 @@ class KeyValueStoreBackend(BaseBackend):
             callback = maybe_signature(task.request.chord, app=self.app)
             j = deps.join_native if deps.supports_native_join else deps.join
             try:
-                ret = j(propagate=propagate)
+                with allow_join_result():
+                    ret = j(propagate=propagate)
             except Exception as exc:
                 try:
                     culprit = next(deps._failed_join_report())

+ 12 - 1
celery/result.py

@@ -11,6 +11,7 @@ from __future__ import absolute_import
 import time
 
 from collections import deque
+from contextlib import contextmanager
 from copy import copy
 
 from kombu.utils import cached_property
@@ -18,7 +19,7 @@ from kombu.utils.compat import OrderedDict
 
 from . import current_app
 from . import states
-from ._state import task_join_will_block
+from ._state import _set_task_join_will_block, task_join_will_block
 from .app import app_or_default
 from .datastructures import DependencyGraph, GraphFormatter
 from .exceptions import IncompleteStream, TimeoutError
@@ -39,6 +40,16 @@ def assert_will_not_block():
         raise RuntimeError(E_WOULDBLOCK)
 
 
+@contextmanager
+def allow_join_result():
+    reset_value = task_join_will_block()
+    _set_task_join_will_block(False)
+    try:
+        yield
+    finally:
+        _set_task_join_will_block(reset_value)
+
+
 class ResultBase(object):
     """Base class for all results"""
 

+ 4 - 3
examples/resultgraph/tasks.py

@@ -20,7 +20,7 @@
 
 
 from celery import chord, group, task, signature, uuid
-from celery.result import AsyncResult, ResultSet
+from celery.result import AsyncResult, ResultSet, allow_join_result
 from collections import deque
 
 
@@ -79,8 +79,9 @@ def unlock_graph(result, callback,
     if result.ready():
         second_level_res = result.get()
         if second_level_res.ready():
-            signature(callback).delay(list(joinall(
-                second_level_res, propagate=propagate)))
+            with allow_join_result():
+                signature(callback).delay(list(joinall(
+                    second_level_res, propagate=propagate)))
     else:
         unlock_graph.retry(countdown=interval, max_retries=max_retries)