Преглед на файлове

Merge branch '3.1' of github.com:celery/celery into 3.1

Ask Solem преди 9 години
родител
ревизия
8a4245282a

+ 2 - 1
celery/backends/amqp.py

@@ -180,7 +180,8 @@ class AMQPBackend(BaseBackend):
                 raise self.BacklogLimitExceeded(task_id)
 
             if latest:
-                payload = self._cache[task_id] = latest.payload
+                payload = self._cache[task_id] = \
+                    self.meta_from_decoded(latest.payload)
                 latest.requeue()
                 return payload
             else:

+ 7 - 4
celery/backends/mongodb.py

@@ -104,8 +104,13 @@ class MongoBackend(BaseBackend):
             if pymongo.version_tuple >= (3, ):
                 return {'maxPoolSize': self.max_pool_size}
             else:  # pragma: no cover
-                return {'max_pool_size': max_pool_size,
-                        'auto_start_request': False}
+                options = {
+                    'max_pool_size': self.max_pool_size,
+                    'auto_start_request': False
+                }
+                if detect_environment() != 'default':
+                    options['use_greenlets'] = True
+                return options
 
     def _get_connection(self):
         """Connect to the MongoDB server."""
@@ -124,8 +129,6 @@ class MongoBackend(BaseBackend):
                 url = 'mongodb://{0}:{1}'.format(url, self.port)
             if url == 'mongodb://':
                 url = url + 'localhost'
-            if detect_environment() != 'default':
-                self.options['use_greenlets'] = True
             self._connection = MongoClient(host=url, **self.options)
 
         return self._connection

+ 57 - 6
celery/tests/backends/test_amqp.py

@@ -1,5 +1,6 @@
 from __future__ import absolute_import
 
+import json
 import pickle
 import socket
 
@@ -129,7 +130,7 @@ class test_AMQPBackend(AppCase):
         self.assertState(b.get_task_meta(uuid()), states.PENDING)
 
     @contextmanager
-    def _result_context(self):
+    def _result_context(self, serializer='pickle'):
         results = Queue()
 
         class Message(object):
@@ -139,9 +140,13 @@ class test_AMQPBackend(AppCase):
             def __init__(self, **merge):
                 self.payload = dict({'status': states.STARTED,
                                      'result': None}, **merge)
-                self.body = pickle.dumps(self.payload)
-                self.content_type = 'application/x-python-serialize'
-                self.content_encoding = 'binary'
+                if serializer == 'json':
+                    self.body = json.dumps(self.payload)
+                    self.content_type = 'application/json'
+                else:
+                    self.body = pickle.dumps(self.payload)
+                    self.content_type = 'application/x-python-serialize'
+                    self.content_encoding = 'binary'
 
             def ack(self, *args, **kwargs):
                 self.acked += 1
@@ -176,6 +181,7 @@ class test_AMQPBackend(AppCase):
             Queue = MockBinding
 
         backend = MockBackend(self.app, max_cached_results=100)
+        backend.serializer = serializer
         backend._republish = Mock()
 
         yield results, backend, Message
@@ -200,8 +206,10 @@ class test_AMQPBackend(AppCase):
                 results.put(state_message)
             r1 = backend.get_task_meta(tid)
             self.assertDictContainsSubset(
-                {'status': states.FAILURE, 'seq': 3}, r1,
-                'FFWDs to the last state',
+                {
+                    'status': states.FAILURE,
+                    'seq': 3
+                }, r1, 'FFWDs to the last state',
             )
 
             # Caches last known state.
@@ -221,6 +229,49 @@ class test_AMQPBackend(AppCase):
                 'Returns cache if no new states',
             )
 
+    def test_poll_result_for_json_serializer(self):
+        with self._result_context(serializer='json') as \
+                (results, backend, Message):
+            tid = uuid()
+            # FFWD's to the latest state.
+            state_messages = [
+                Message(task_id=tid, status=states.RECEIVED, seq=1),
+                Message(task_id=tid, status=states.STARTED, seq=2),
+                Message(task_id=tid, status=states.FAILURE, seq=3,
+                        result={
+                            'exc_type': 'RuntimeError',
+                            'exc_message': 'Mock'
+                        }),
+                ]
+            for state_message in state_messages:
+                results.put(state_message)
+            r1 = backend.get_task_meta(tid)
+            self.assertDictContainsSubset(
+                {
+                    'status': states.FAILURE,
+                    'seq': 3
+                }, r1, 'FFWDs to the last state',
+            )
+            self.assertEquals(type(r1['result']).__name__, 'RuntimeError')
+            self.assertEqual(str(r1['result']), 'Mock')
+
+            # Caches last known state.
+            tid = uuid()
+            results.put(Message(task_id=tid))
+            backend.get_task_meta(tid)
+            self.assertIn(tid, backend._cache, 'Caches last known state')
+
+            self.assertTrue(state_messages[-1].requeued)
+
+            # Returns cache if no new states.
+            results.queue.clear()
+            assert not results.qsize()
+            backend._cache[tid] = 'hello'
+            self.assertEqual(
+                backend.get_task_meta(tid), 'hello',
+                'Returns cache if no new states',
+                )
+
     def test_wait_for(self):
         b = self.create_backend()
 

+ 40 - 1
celery/tests/backends/test_mongodb.py

@@ -254,7 +254,7 @@ class test_MongoBackend(AppCase):
         mock_database.__getitem__.assert_called_once_with(MONGODB_COLLECTION)
         mock_collection.find_one.assert_called_once_with(
             {'_id': sentinel.taskset_id})
-        self.assertEqual(
+        self.assertItemsEqual(
             ['date_done', 'result', 'task_id'],
             list(ret_val.keys()),
         )
@@ -324,3 +324,42 @@ class test_MongoBackend(AppCase):
         with self.assertRaises(ImproperlyConfigured):
             x._get_database()
         db.authenticate.assert_called_with('jerry', 'cere4l')
+
+    @patch('celery.backends.mongodb.detect_environment')
+    def test_prepare_client_options_for_ver_2(self, m_detect_env):
+        m_detect_env.return_value = 'default'
+        with patch('pymongo.version_tuple', new=(2, 6, 3)):
+            options = self.backend._prepare_client_options()
+            self.assertDictEqual(options, {
+                'max_pool_size': self.backend.max_pool_size,
+                'auto_start_request': False
+            })
+
+    @patch('celery.backends.mongodb.detect_environment')
+    def test_prepare_client_options_for_ver_2_with_gevent(self, m_detect_env):
+        m_detect_env.return_value = 'gevent'
+        with patch('pymongo.version_tuple', new=(2, 6, 3)):
+            options = self.backend._prepare_client_options()
+            self.assertDictEqual(options, {
+                'max_pool_size': self.backend.max_pool_size,
+                'auto_start_request': False,
+                'use_greenlets': True
+            })
+
+    @patch('celery.backends.mongodb.detect_environment')
+    def test_prepare_client_options_for_ver_3(self, m_detect_env):
+        m_detect_env.return_value = 'default'
+        with patch('pymongo.version_tuple', new=(3, 0, 3)):
+            options = self.backend._prepare_client_options()
+            self.assertDictEqual(options, {
+                'maxPoolSize': self.backend.max_pool_size
+            })
+
+    @patch('celery.backends.mongodb.detect_environment')
+    def test_prepare_client_options_for_ver_3_with_gevent(self, m_detect_env):
+        m_detect_env.return_value = 'gevent'
+        with patch('pymongo.version_tuple', new=(3, 0, 3)):
+            options = self.backend._prepare_client_options()
+            self.assertDictEqual(options, {
+                'maxPoolSize': self.backend.max_pool_size
+            })

+ 1 - 1
celery/tests/backends/test_redis.py

@@ -251,7 +251,7 @@ class test_RedisBackend(AppCase):
         self.assertTrue(b.client.lrange.call_count)
         gkey = b.get_key_for_group('group_id', '.j')
         b.client.delete.assert_called_with(gkey)
-        b.client.expire.assert_called_witeh(gkey, 86400)
+        b.client.expire.assert_called_with(gkey, 86400)
 
     def test_process_cleanup(self):
         self.Backend(app=self.app, new_join=True).process_cleanup()

+ 1 - 1
celery/tests/utils/test_mail.py

@@ -46,7 +46,7 @@ class test_Mailer(Case):
         mailer = Mailer(use_ssl=False, use_tls=False)
         mailer._send(msg)
 
-        client.sendmail.assert_called_With(msg.sender, msg.to, str(msg))
+        client.sendmail.assert_called_with(msg.sender, msg.to, str(msg))
 
         client.quit.side_effect = SSLError()
         mailer._send(msg)