Selaa lähdekoodia

Add test for ResultSet that includes a deliberate error (#4746)

* Add test for ResultSet that includes a deliberate error

* Use ValueError rather than StandardError in integration test

* Stop attempting to cache results in ResultSet

There are various good reasons:

- the population of the cache itself fails if any if the underlying AsyncResult
  failed, causing the calls to get or join_native to unexpectedly die;
- the cache is only used by get, not by other methods such as join or join_native

* Fix flake8 line length warning

* Further flake8 fixes

* Fixes merge error

* Fix flake8 whitespace errors
Derek Harland 6 vuotta sitten
vanhempi
commit
b472fbccd0
3 muutettua tiedostoa jossa 18 lisäystä ja 6 poistoa
  1. 0 4
      celery/result.py
  2. 6 0
      t/integration/tasks.py
  3. 12 2
      t/integration/test_canvas.py

+ 0 - 4
celery/result.py

@@ -497,7 +497,6 @@ class ResultSet(ResultBase):
 
     def __init__(self, results, app=None, ready_barrier=None, **kwargs):
         self._app = app
-        self._cache = None
         self.results = results
         self.on_ready = promise(args=(self,))
         self._on_full = ready_barrier or barrier(results)
@@ -516,7 +515,6 @@ class ResultSet(ResultBase):
 
     def _on_ready(self):
         if self.backend.is_async:
-            self._cache = [r.get() for r in self.results]
             self.on_ready()
 
     def remove(self, result):
@@ -662,8 +660,6 @@ class ResultSet(ResultBase):
         in addition it uses :meth:`join_native` if available for the
         current result backend.
         """
-        if self._cache is not None:
-            return self._cache
         return (self.join_native if self.supports_native_join else self.join)(
             timeout=timeout, propagate=propagate,
             interval=interval, callback=callback, no_ack=no_ack,

+ 6 - 0
t/integration/tasks.py

@@ -23,6 +23,12 @@ def add(x, y):
     return x + y
 
 
+@shared_task
+def raise_error():
+    """Deliberately raise an error."""
+    raise ValueError("deliberate error")
+
+
 @shared_task(ignore_result=True)
 def add_ignore_result(x, y):
     """Add two numbers."""

+ 12 - 2
t/integration/test_canvas.py

@@ -12,8 +12,8 @@ from .conftest import flaky, get_active_redis_channels, get_redis_connection
 from .tasks import (add, add_chord_to_chord, add_replaced, add_to_all,
                     add_to_all_to_chord, build_chain_inside_task, chord_error,
                     collect_ids, delayed_sum, delayed_sum_with_soft_guard,
-                    fail, identity, ids, print_unicode, redis_echo,
-                    second_order_replace1, tsum)
+                    fail, identity, ids, print_unicode, raise_error,
+                    redis_echo, second_order_replace1, tsum)
 
 TIMEOUT = 120
 
@@ -213,6 +213,16 @@ class test_result_set:
         rs = ResultSet([add.delay(1, 1), add.delay(2, 2)])
         assert rs.get(timeout=TIMEOUT) == [2, 4]
 
+    @flaky
+    def test_result_set_error(self, manager):
+        assert manager.inspect().ping()
+
+        rs = ResultSet([raise_error.delay(), add.delay(1, 1)])
+        rs.get(timeout=TIMEOUT, propagate=False)
+
+        assert rs.results[0].failed()
+        assert rs.results[1].successful()
+
 
 class test_group: