Browse Source

Try to fix integration tests

Ask Solem 8 years ago
parent
commit
2e7e619282
4 changed files with 20 additions and 7 deletions
  1. 4 1
      celery/contrib/testing/manager.py
  2. 4 0
      celery/contrib/testing/worker.py
  3. 9 5
      t/integration/test_canvas.py
  4. 3 1
      tox.ini

+ 4 - 1
celery/contrib/testing/manager.py

@@ -12,6 +12,7 @@ from kombu.utils.functional import retry_over_time
 
 from celery.exceptions import TimeoutError
 from celery.five import items
+from celery.result import ResultSet
 from celery.utils.text import truncate
 from celery.utils.time import humanize_seconds as _humanize_seconds
 
@@ -105,6 +106,8 @@ class ManagerMixin(object):
     def join(self, r, propagate=False, max_retries=10, **kwargs):
         if self.no_join:
             return
+        if not isinstance(r, ResultSet):
+            r = self.app.ResultSet([r])
         received = []
 
         def on_result(task_id, value):
@@ -125,7 +128,7 @@ class ManagerMixin(object):
                 self.remark('join: connection lost: {0!r}'.format(exc), '!')
         raise AssertionError('Test failed: Missing task results')
 
-    def inspect(self, timeout=1):
+    def inspect(self, timeout=3.0):
         return self.app.control.inspect(timeout=timeout)
 
     def query_tasks(self, ids, timeout=0.5):

+ 4 - 0
celery/contrib/testing/worker.py

@@ -93,6 +93,10 @@ def _start_worker_thread(app,
     """
     setup_app_for_worker(app, loglevel, logfile)
     assert 'celery.ping' in app.tasks
+    # Make sure we can connect to the broker
+    with app.connection() as conn:
+        conn.default_channel.queue_declare
+
     worker = WorkController(
         app=app,
         concurrency=concurrency,

+ 9 - 5
t/integration/xxx_canvas.py → t/integration/test_canvas.py

@@ -3,12 +3,14 @@ import pytest
 from celery import chain, group, uuid
 from .tasks import add, collect_ids, ids
 
+TIMEOUT = 30
+
 
 class test_chain:
 
     def test_simple_chain(self, manager):
         c = add.s(4, 4) | add.s(8) | add.s(16)
-        assert manager.join(c()) == 32
+        assert c().get(timeout=TIMEOUT) == 32
 
     def test_complex_chain(self, manager):
         c = (
@@ -18,13 +20,14 @@ class test_chain:
             group(add.s(i) for i in range(4))
         )
         res = c()
-        assert res.get() == [32, 33, 34, 35]
+        assert res.get(timeout=TIMEOUT) == [32, 33, 34, 35]
 
     def test_parent_ids(self, manager, num=10):
+        assert manager.inspect().ping()
         c = chain(ids.si(i) for i in range(num))
         c.freeze()
         res = c()
-        res.get(timeout=30)
+        res.get(timeout=TIMEOUT)
         self.assert_ids(res, num - 1)
 
     def assert_ids(self, res, size):
@@ -44,12 +47,13 @@ class test_chain:
 
 class test_group:
 
-    def test_parent_ids(self):
+    def test_parent_ids(self, manager):
+        assert manager.inspect().ping()
         g = ids.si(1) | ids.si(2) | group(ids.si(i) for i in range(2, 50))
         res = g()
         expected_root_id = res.parent.parent.id
         expected_parent_id = res.parent.id
-        values = res.get(timeout=30)
+        values = res.get(timeout=TIMEOUT)
 
         for i, r in enumerate(values):
             root_id, parent_id, value = r

+ 3 - 1
tox.ini

@@ -26,7 +26,9 @@ recreate = False
 commands =
     pip install -U -r{toxinidir}/requirements/dev.txt
     unit: py.test -xv
-    integration: py.test -xv t/integration
+    integration: py.test -xsv t/integration
+setenv =
+    WORKER_LOGLEVEL = debug
 
 basepython =
     2.7: python2.7