| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 | 
							- from __future__ import absolute_import, unicode_literals
 
- import pickle
 
- import pytest
 
- from contextlib import contextmanager
 
- from datetime import timedelta
 
- from pickle import dumps, loads
 
- from case import Mock, mock
 
- from billiard.einfo import ExceptionInfo
 
- from celery import states
 
- from celery import uuid
 
- from celery.backends.amqp import AMQPBackend
 
- from celery.five import Empty, Queue, range
 
- from celery.result import AsyncResult
 
- class SomeClass(object):
 
-     def __init__(self, data):
 
-         self.data = data
 
- class test_AMQPBackend:
 
-     def setup(self):
 
-         self.app.conf.result_cache_max = 100
 
-     def create_backend(self, **opts):
 
-         opts = dict(dict(serializer='pickle', persistent=True), **opts)
 
-         return AMQPBackend(self.app, **opts)
 
-     def test_destination_for(self):
 
-         b = self.create_backend()
 
-         request = Mock()
 
-         assert b.destination_for('id', request) == (
 
-             b.rkey('id'), request.correlation_id,
 
-         )
 
-     def test_store_result__no_routing_key(self):
 
-         b = self.create_backend()
 
-         b.destination_for = Mock()
 
-         b.destination_for.return_value = None, None
 
-         b.store_result('id', None, states.SUCCESS)
 
-     def test_mark_as_done(self):
 
-         tb1 = self.create_backend(max_cached_results=1)
 
-         tb2 = self.create_backend(max_cached_results=1)
 
-         tid = uuid()
 
-         tb1.mark_as_done(tid, 42)
 
-         assert tb2.get_state(tid) == states.SUCCESS
 
-         assert tb2.get_result(tid) == 42
 
-         assert tb2._cache.get(tid)
 
-         assert tb2.get_result(tid), 42
 
-     @pytest.mark.usefixtures('depends_on_current_app')
 
-     def test_pickleable(self):
 
-         assert loads(dumps(self.create_backend()))
 
-     def test_revive(self):
 
-         tb = self.create_backend()
 
-         tb.revive(None)
 
-     def test_is_pickled(self):
 
-         tb1 = self.create_backend()
 
-         tb2 = self.create_backend()
 
-         tid2 = uuid()
 
-         result = {'foo': 'baz', 'bar': SomeClass(12345)}
 
-         tb1.mark_as_done(tid2, result)
 
-         # is serialized properly.
 
-         rindb = tb2.get_result(tid2)
 
-         assert rindb.get('foo') == 'baz'
 
-         assert rindb.get('bar').data == 12345
 
-     def test_mark_as_failure(self):
 
-         tb1 = self.create_backend()
 
-         tb2 = self.create_backend()
 
-         tid3 = uuid()
 
-         try:
 
-             raise KeyError('foo')
 
-         except KeyError as exception:
 
-             einfo = ExceptionInfo()
 
-             tb1.mark_as_failure(tid3, exception, traceback=einfo.traceback)
 
-             assert tb2.get_state(tid3) == states.FAILURE
 
-             assert isinstance(tb2.get_result(tid3), KeyError)
 
-             assert tb2.get_traceback(tid3) == einfo.traceback
 
-     def test_repair_uuid(self):
 
-         from celery.backends.amqp import repair_uuid
 
-         for i in range(10):
 
-             tid = uuid()
 
-             assert repair_uuid(tid.replace('-', '')) == tid
 
-     def test_expires_is_int(self):
 
-         b = self.create_backend(expires=48)
 
-         q = b._create_binding('x1y2z3')
 
-         assert q.expires == 48
 
-     def test_expires_is_float(self):
 
-         b = self.create_backend(expires=48.3)
 
-         q = b._create_binding('x1y2z3')
 
-         assert q.expires == 48.3
 
-     def test_expires_is_timedelta(self):
 
-         b = self.create_backend(expires=timedelta(minutes=1))
 
-         q = b._create_binding('x1y2z3')
 
-         assert q.expires == 60
 
-     @mock.sleepdeprived()
 
-     def test_store_result_retries(self):
 
-         iterations = [0]
 
-         stop_raising_at = [5]
 
-         def publish(*args, **kwargs):
 
-             if iterations[0] > stop_raising_at[0]:
 
-                 return
 
-             iterations[0] += 1
 
-             raise KeyError('foo')
 
-         backend = AMQPBackend(self.app)
 
-         from celery.app.amqp import Producer
 
-         prod, Producer.publish = Producer.publish, publish
 
-         try:
 
-             with pytest.raises(KeyError):
 
-                 backend.retry_policy['max_retries'] = None
 
-                 backend.store_result('foo', 'bar', 'STARTED')
 
-             with pytest.raises(KeyError):
 
-                 backend.retry_policy['max_retries'] = 10
 
-                 backend.store_result('foo', 'bar', 'STARTED')
 
-         finally:
 
-             Producer.publish = prod
 
-     def test_poll_no_messages(self):
 
-         b = self.create_backend()
 
-         assert b.get_task_meta(uuid())['status'] == states.PENDING
 
-     @contextmanager
 
-     def _result_context(self):
 
-         results = Queue()
 
-         class Message(object):
 
-             acked = 0
 
-             requeued = 0
 
-             def __init__(self, **merge):
 
-                 self.payload = dict({'status': states.STARTED,
 
-                                      'result': None}, **merge)
 
-                 self.properties = {'correlation_id': merge.get('task_id')}
 
-                 self.body = pickle.dumps(self.payload)
 
-                 self.content_type = 'application/x-python-serialize'
 
-                 self.content_encoding = 'binary'
 
-             def ack(self, *args, **kwargs):
 
-                 self.acked += 1
 
-             def requeue(self, *args, **kwargs):
 
-                 self.requeued += 1
 
-         class MockBinding(object):
 
-             def __init__(self, *args, **kwargs):
 
-                 self.channel = Mock()
 
-             def __call__(self, *args, **kwargs):
 
-                 return self
 
-             def declare(self):
 
-                 pass
 
-             def get(self, no_ack=False, accept=None):
 
-                 try:
 
-                     m = results.get(block=False)
 
-                     if m:
 
-                         m.accept = accept
 
-                     return m
 
-                 except Empty:
 
-                     pass
 
-             def is_bound(self):
 
-                 return True
 
-         class MockBackend(AMQPBackend):
 
-             Queue = MockBinding
 
-         backend = MockBackend(self.app, max_cached_results=100)
 
-         backend._republish = Mock()
 
-         yield results, backend, Message
 
-     def test_backlog_limit_exceeded(self):
 
-         with self._result_context() as (results, backend, Message):
 
-             for i in range(1001):
 
-                 results.put(Message(task_id='id', status=states.RECEIVED))
 
-             with pytest.raises(backend.BacklogLimitExceeded):
 
-                 backend.get_task_meta('id')
 
-     def test_poll_result(self):
 
-         with self._result_context() as (results, backend, Message):
 
-             tid = uuid()
 
-             # FFWD's to the latest state.
 
-             state_messages = [
 
-                 Message(task_id=tid, status=states.RECEIVED, seq=1),
 
-                 Message(task_id=tid, status=states.STARTED, seq=2),
 
-                 Message(task_id=tid, status=states.FAILURE, seq=3),
 
-             ]
 
-             for state_message in state_messages:
 
-                 results.put(state_message)
 
-             r1 = backend.get_task_meta(tid)
 
-             # FFWDs to the last state.
 
-             assert r1['status'] == states.FAILURE
 
-             assert r1['seq'] == 3
 
-             # Caches last known state.
 
-             tid = uuid()
 
-             results.put(Message(task_id=tid))
 
-             backend.get_task_meta(tid)
 
-             assert tid, backend._cache in 'Caches last known state'
 
-             assert state_messages[-1].requeued
 
-             # Returns cache if no new states.
 
-             results.queue.clear()
 
-             assert not results.qsize()
 
-             backend._cache[tid] = 'hello'
 
-             # returns cache if no new states.
 
-             assert backend.get_task_meta(tid) == 'hello'
 
-     def test_drain_events_decodes_exceptions_in_meta(self):
 
-         tid = uuid()
 
-         b = self.create_backend(serializer='json')
 
-         b.store_result(tid, RuntimeError('aap'), states.FAILURE)
 
-         result = AsyncResult(tid, backend=b)
 
-         with pytest.raises(Exception) as excinfo:
 
-             result.get()
 
-         assert excinfo.value.__class__.__name__ == 'RuntimeError'
 
-         assert str(excinfo.value) == 'aap'
 
-     def test_no_expires(self):
 
-         b = self.create_backend(expires=None)
 
-         app = self.app
 
-         app.conf.result_expires = None
 
-         b = self.create_backend(expires=None)
 
-         q = b._create_binding('foo')
 
-         assert q.expires is None
 
-     def test_process_cleanup(self):
 
-         self.create_backend().process_cleanup()
 
-     def test_reload_task_result(self):
 
-         with pytest.raises(NotImplementedError):
 
-             self.create_backend().reload_task_result('x')
 
-     def test_reload_group_result(self):
 
-         with pytest.raises(NotImplementedError):
 
-             self.create_backend().reload_group_result('x')
 
-     def test_save_group(self):
 
-         with pytest.raises(NotImplementedError):
 
-             self.create_backend().save_group('x', 'x')
 
-     def test_restore_group(self):
 
-         with pytest.raises(NotImplementedError):
 
-             self.create_backend().restore_group('x')
 
-     def test_delete_group(self):
 
-         with pytest.raises(NotImplementedError):
 
-             self.create_backend().delete_group('x')
 
 
  |