|
@@ -2,6 +2,7 @@ import socket
|
|
|
import sys
|
|
|
|
|
|
from datetime import timedelta
|
|
|
+from Queue import Empty, Queue
|
|
|
|
|
|
from celery import states
|
|
|
from celery.app import app_or_default
|
|
@@ -123,8 +124,15 @@ class test_AMQPBackend(unittest.TestCase):
|
|
|
|
|
|
def test_poll_result(self):
|
|
|
|
|
|
+ results = Queue()
|
|
|
+
|
|
|
+ class Message(object):
|
|
|
+
|
|
|
+ def __init__(self, **merge):
|
|
|
+ self.payload = dict({"status": states.STARTED,
|
|
|
+ "result": None}, **merge)
|
|
|
+
|
|
|
class MockBinding(object):
|
|
|
- get_returns = [True]
|
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
pass
|
|
@@ -136,23 +144,37 @@ class test_AMQPBackend(unittest.TestCase):
|
|
|
pass
|
|
|
|
|
|
def get(self, no_ack=False):
|
|
|
- if self.get_returns[0]:
|
|
|
- class Object(object):
|
|
|
- payload = {"status": "STARTED",
|
|
|
- "result": None}
|
|
|
- return Object()
|
|
|
+ try:
|
|
|
+ return results.get(block=False)
|
|
|
+ except Empty:
|
|
|
+ pass
|
|
|
|
|
|
class MockBackend(AMQPBackend):
|
|
|
Queue = MockBinding
|
|
|
|
|
|
backend = MockBackend()
|
|
|
- backend.poll(gen_unique_id())
|
|
|
+
|
|
|
+ # FFWD's to the latest state.
|
|
|
+ results.put(Message(status=states.RECEIVED, seq=1))
|
|
|
+ results.put(Message(status=states.STARTED, seq=2))
|
|
|
+ results.put(Message(status=states.FAILURE, seq=3))
|
|
|
+ r1 = backend.poll(gen_unique_id())
|
|
|
+ self.assertDictContainsSubset({"status": states.FAILURE,
|
|
|
+ "seq": 3}, r1,
|
|
|
+ "FFWDs to the last state")
|
|
|
+
|
|
|
+ # Caches last known state.
|
|
|
+ results.put(Message())
|
|
|
uuid = gen_unique_id()
|
|
|
backend.poll(uuid)
|
|
|
- self.assertIn(uuid, backend._cache)
|
|
|
- MockBinding.get_returns[0] = False
|
|
|
+ self.assertIn(uuid, backend._cache, "Caches last known state")
|
|
|
+
|
|
|
+ # Returns cache if no new states.
|
|
|
+ results.queue.clear()
|
|
|
+ assert not results.qsize()
|
|
|
backend._cache[uuid] = "hello"
|
|
|
- self.assertEqual(backend.poll(uuid), "hello")
|
|
|
+ self.assertEqual(backend.poll(uuid), "hello",
|
|
|
+ "Returns cache if no new states")
|
|
|
|
|
|
def test_wait_for(self):
|
|
|
b = self.create_backend()
|