|  | @@ -2,6 +2,7 @@ import socket
 | 
	
		
			
				|  |  |  import sys
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  from datetime import timedelta
 | 
	
		
			
				|  |  | +from Queue import Empty, Queue
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  from celery import states
 | 
	
		
			
				|  |  |  from celery.app import app_or_default
 | 
	
	
		
			
				|  | @@ -123,8 +124,15 @@ class test_AMQPBackend(unittest.TestCase):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_poll_result(self):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        results = Queue()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        class Message(object):
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            def __init__(self, **merge):
 | 
	
		
			
				|  |  | +                self.payload = dict({"status": states.STARTED,
 | 
	
		
			
				|  |  | +                                     "result": None}, **merge)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          class MockBinding(object):
 | 
	
		
			
				|  |  | -            get_returns = [True]
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              def __init__(self, *args, **kwargs):
 | 
	
		
			
				|  |  |                  pass
 | 
	
	
		
			
				|  | @@ -136,23 +144,37 @@ class test_AMQPBackend(unittest.TestCase):
 | 
	
		
			
				|  |  |                  pass
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              def get(self, no_ack=False):
 | 
	
		
			
				|  |  | -                if self.get_returns[0]:
 | 
	
		
			
				|  |  | -                    class Object(object):
 | 
	
		
			
				|  |  | -                        payload = {"status": "STARTED",
 | 
	
		
			
				|  |  | -                                "result": None}
 | 
	
		
			
				|  |  | -                    return Object()
 | 
	
		
			
				|  |  | +                try:
 | 
	
		
			
				|  |  | +                    return results.get(block=False)
 | 
	
		
			
				|  |  | +                except Empty:
 | 
	
		
			
				|  |  | +                    pass
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          class MockBackend(AMQPBackend):
 | 
	
		
			
				|  |  |              Queue = MockBinding
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          backend = MockBackend()
 | 
	
		
			
				|  |  | -        backend.poll(gen_unique_id())
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # FFWD's to the latest state.
 | 
	
		
			
				|  |  | +        results.put(Message(status=states.RECEIVED, seq=1))
 | 
	
		
			
				|  |  | +        results.put(Message(status=states.STARTED, seq=2))
 | 
	
		
			
				|  |  | +        results.put(Message(status=states.FAILURE, seq=3))
 | 
	
		
			
				|  |  | +        r1 = backend.poll(gen_unique_id())
 | 
	
		
			
				|  |  | +        self.assertDictContainsSubset({"status": states.FAILURE,
 | 
	
		
			
				|  |  | +                                       "seq": 3}, r1,
 | 
	
		
			
				|  |  | +                                       "FFWDs to the last state")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # Caches last known state.
 | 
	
		
			
				|  |  | +        results.put(Message())
 | 
	
		
			
				|  |  |          uuid = gen_unique_id()
 | 
	
		
			
				|  |  |          backend.poll(uuid)
 | 
	
		
			
				|  |  | -        self.assertIn(uuid, backend._cache)
 | 
	
		
			
				|  |  | -        MockBinding.get_returns[0] = False
 | 
	
		
			
				|  |  | +        self.assertIn(uuid, backend._cache, "Caches last known state")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # Returns cache if no new states.
 | 
	
		
			
				|  |  | +        results.queue.clear()
 | 
	
		
			
				|  |  | +        assert not results.qsize()
 | 
	
		
			
				|  |  |          backend._cache[uuid] = "hello"
 | 
	
		
			
				|  |  | -        self.assertEqual(backend.poll(uuid), "hello")
 | 
	
		
			
				|  |  | +        self.assertEqual(backend.poll(uuid), "hello",
 | 
	
		
			
				|  |  | +                         "Returns cache if no new states")
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_wait_for(self):
 | 
	
		
			
				|  |  |          b = self.create_backend()
 |