Explorar o código

revert "test ci fix", get_many_on_message test added

Ilya Georgievsky %!s(int64=10) %!d(string=hai) anos
pai
achega
60c11b681e
Modificáronse 2 ficheiros con 33 adicións e 0 borrados
  1. 30 0
      celery/tests/backends/test_amqp.py
  2. 3 0
      requirements/test-ci.txt

+ 30 - 0
celery/tests/backends/test_amqp.py

@@ -294,6 +294,36 @@ class test_AMQPBackend(AppCase):
             b.store_result(tids[0], i, states.PENDING)
             list(b.get_many(tids, timeout=0.01))
 
+    def test_get_many_on_message(self):
+        b = self.create_backend(max_cached_results=10)
+
+        tids = []
+        for i in range(10):
+            tid = uuid()
+            b.store_result(tid, '', states.PENDING)
+            b.store_result(tid, 'comment_%i_1' % i, states.STARTED)
+            b.store_result(tid, 'comment_%i_2' % i, states.STARTED)
+            b.store_result(tid, 'final result %i' % i, states.SUCCESS)
+            tids.append(tid)
+
+
+        expected_messages = {}
+        for i, _tid in enumerate(tids):
+            expected_messages[_tid] = []
+            expected_messages[_tid].append( (states.PENDING, '') )
+            expected_messages[_tid].append( (states.STARTED, 'comment_%i_1' % i) )
+            expected_messages[_tid].append( (states.STARTED, 'comment_%i_2' % i) )
+            expected_messages[_tid].append( (states.SUCCESS, 'final result %i' % i) )
+
+        on_message_results = {}
+        def on_message(body):
+            if not body['task_id'] in on_message_results:
+                on_message_results[body['task_id']] = []
+            on_message_results[body['task_id']].append( (body['status'], body['result']) )
+
+        b.get_many(tids, timeout=1, on_message=on_message)
+        self.assertEqual(sorted(on_message_results), sorted(expected_messages))
+
     def test_get_many_raises_outer_block(self):
 
         class Backend(AMQPBackend):

+ 3 - 0
requirements/test-ci.txt

@@ -1,4 +1,7 @@
 coverage>=3.0
 coveralls
 redis
+#riak >=2.0
+#pymongo
+#SQLAlchemy
 PyOpenSSL