Browse Source

Tests passing

Ask Solem 12 years ago
parent
commit
b605c03cd6

+ 4 - 4
celery/app/routes.py

@@ -58,9 +58,9 @@ class Router(object):
             # things (like the routing_key): great for topic exchanges.
             queue = route.pop('queue', None)
 
-        if queue:  # expand config from configured queue.
+        if queue:
             try:
-                _Q = self.queues[queue]  # noqa
+                Q = self.queues[queue]  # noqa
             except KeyError:
                 if not self.create_missing:
                     raise QueueNotFound(
@@ -68,9 +68,9 @@ class Router(object):
                 for key in 'exchange', 'routing_key':
                     if route.get(key) is None:
                         route[key] = queue
-                self.app.amqp.queues.add(queue, **route)
+                Q = self.app.amqp.queues.add(queue, **route)
             # needs to be declared by publisher
-            route['queue'] = queue
+            route['queue'] = Q
         return route
 
     def lookup_route(self, task, args=None, kwargs=None):

+ 17 - 3
celery/canvas.py

@@ -20,7 +20,7 @@ from kombu.utils import cached_property, fxrange, kwdict, reprcall, uuid
 from celery import current_app
 from celery.local import Proxy
 from celery.utils.compat import chain_from_iterable
-from celery.result import GroupResult
+from celery.result import AsyncResult, GroupResult
 from celery.utils.functional import (
     maybe_list, is_list, regen,
     chunks as _chunks,
@@ -135,7 +135,7 @@ class Signature(dict):
             tid = opts['task_id']
         except KeyError:
             tid = opts['task_id'] = _id or uuid()
-        return self.type.AsyncResult(tid)
+        return self.AsyncResult(tid)
 
     def replace(self, args=None, kwargs=None, options=None):
         s = self.clone()
@@ -156,7 +156,7 @@ class Signature(dict):
     def apply_async(self, args=(), kwargs={}, **options):
         # For callbacks: extra args are prepended to the stored args.
         args, kwargs, options = self._merge(args, kwargs, options)
-        return self.type.apply_async(args, kwargs, **options)
+        return self._apply_async(args, kwargs, **options)
 
     def append_to_list_option(self, key, value):
         items = self.options.setdefault(key, [])
@@ -204,6 +204,20 @@ class Signature(dict):
     @cached_property
     def type(self):
         return self._type or current_app.tasks[self['task']]
+
+    @cached_property
+    def AsyncResult(self):
+        try:
+            return self.type.AsyncResult
+        except KeyError:  # task not registered
+            return AsyncResult
+
+    @cached_property
+    def _apply_async(self):
+        try:
+            return self.type.apply_async
+        except KeyError:
+            return current_app.send_task
     task = _getitem_property('task')
     args = _getitem_property('args')
     kwargs = _getitem_property('kwargs')

+ 2 - 2
celery/tests/app/test_amqp.py

@@ -62,7 +62,7 @@ class test_PublisherPool(AppCase):
             delattr(self.app, '_pool')
         except AttributeError:
             pass
-        self.app.amqp.__dict__.pop('producer_pool', None)
+        self.app.amqp._producer_pool = None
         try:
             pool = self.app.amqp.producer_pool
             self.assertEqual(pool.limit, self.app.pool.limit)
@@ -84,7 +84,7 @@ class test_PublisherPool(AppCase):
             delattr(self.app, '_pool')
         except AttributeError:
             pass
-        self.app.amqp.__dict__.pop('producer_pool', None)
+        self.app.amqp._producer_pool = None
         try:
             pool = self.app.amqp.producer_pool
             self.assertEqual(pool.limit, self.app.pool.limit)

+ 17 - 19
celery/tests/app/test_routes.py

@@ -3,6 +3,7 @@ from __future__ import with_statement
 
 from functools import wraps
 
+from kombu import Exchange
 from kombu.utils.functional import maybe_promise
 
 from celery import current_app
@@ -59,12 +60,7 @@ d_queue = {'exchange': current_app.conf.CELERY_DEFAULT_EXCHANGE,
 
 
 class RouteCase(Case):
-
-    def assertAnswer(self, answer, expected):
-        self.assertEqual(answer['exchange'].name, expected['exchange'])
-        self.assertEqual(answer['routing_key'], expected['routing_key'])
-        if 'queue' in expected:
-            self.assertEqual(answer['queue'], expected['queue'])
+    pass
 
 
 class test_MapRoute(RouteCase):
@@ -73,7 +69,10 @@ class test_MapRoute(RouteCase):
     def test_route_for_task_expanded_route(self):
         expand = E(current_app.amqp.queues)
         route = routes.MapRoute({mytask.name: {'queue': 'foo'}})
-        self.assertAnswer(expand(route.route_for_task(mytask.name)), a_queue)
+        self.assertEqual(
+            expand(route.route_for_task(mytask.name))['queue'].name,
+            'foo',
+        )
         self.assertIsNone(route.route_for_task('celery.awesome'))
 
     @with_queues(foo=a_queue, bar=b_queue)
@@ -103,8 +102,8 @@ class test_lookup_route(RouteCase):
         R = routes.prepare(({mytask.name: {'queue': 'bar'}},
                             {mytask.name: {'queue': 'foo'}}))
         router = Router(R, current_app.amqp.queues)
-        self.assertAnswer(router.route({}, mytask.name,
-                          args=[1, 2], kwargs={}), b_queue)
+        self.assertEqual(router.route({}, mytask.name,
+                         args=[1, 2], kwargs={})['queue'].name, 'bar')
 
     @with_queues()
     def test_expands_queue_in_options(self):
@@ -118,17 +117,16 @@ class test_lookup_route(RouteCase):
                               'immediate': False},
                              mytask.name,
                              args=[1, 2], kwargs={})
-        self.assertDictContainsSubset({'routing_key': 'testq',
-                                       'immediate': False},
-                                       route)
-        self.assertEqual(route['exchange'].name, 'testq')
-        self.assertIn('queue', route)
+        self.assertEqual(route['queue'].name, 'testq')
+        self.assertEqual(route['queue'].exchange, Exchange('testq'))
+        self.assertEqual(route['queue'].routing_key, 'testq')
+        self.assertEqual(route['immediate'], False)
 
     @with_queues(foo=a_queue, bar=b_queue)
     def test_expand_destination_string(self):
         x = Router({}, current_app.amqp.queues)
         dest = x.expand_destination('foo')
-        self.assertEqual(dest['exchange'].name, 'fooexchange')
+        self.assertEqual(dest['queue'].name, 'foo')
 
     @with_queues(foo=a_queue, bar=b_queue, **{
         current_app.conf.CELERY_DEFAULT_QUEUE: d_queue})
@@ -136,10 +134,10 @@ class test_lookup_route(RouteCase):
         R = routes.prepare(({'celery.xaza': {'queue': 'bar'}},
                             {mytask.name: {'queue': 'foo'}}))
         router = Router(R, current_app.amqp.queues)
-        self.assertAnswer(router.route({}, mytask.name,
-                          args=[1, 2], kwargs={}), a_queue)
-        self.assertAnswer(router.route({}, 'celery.poza'),
-                dict(d_queue, queue=current_app.conf.CELERY_DEFAULT_QUEUE))
+        self.assertEqual(router.route({}, mytask.name,
+                          args=[1, 2], kwargs={})['queue'].name, 'foo')
+        self.assertEqual(router.route({}, 'celery.poza')['queue'].name,
+                current_app.conf.CELERY_DEFAULT_QUEUE)
 
 
 class test_prepare(Case):

+ 1 - 2
celery/tests/bin/test_celeryd_detach.py

@@ -86,8 +86,7 @@ class test_Command(Case):
         x.execute_from_commandline(self.argv)
         self.assertTrue(exit.called)
         detach.assert_called_with(path=x.execv_path, uid=None, gid=None,
-            umask=0, working_directory=None, fake=False,
-            logfile='/var/log', pidfile='celeryd.pid',
+            umask=0, fake=False, logfile='/var/log', pidfile='celeryd.pid',
             argv=['-m', 'celery.bin.celeryd', '-c', '1', '-lDEBUG',
                   '--logfile=/var/log', '--pidfile=celeryd.pid',
                   '--', '.disable_rate_limits=1'],

+ 3 - 2
celery/tests/worker/test_worker.py

@@ -280,7 +280,7 @@ class test_Consumer(Case):
         l.receive_message(m.decode(), m)
         self.assertTrue(warn.call_count)
 
-    @patch('celery.utils.timer2.to_timestamp')
+    @patch('celery.worker.consumer.to_timestamp')
     def test_receive_message_eta_OverflowError(self, to_timestamp):
         to_timestamp.side_effect = OverflowError()
         l = MyKombuConsumer(self.ready_queue, timer=self.timer)
@@ -291,10 +291,11 @@ class test_Consumer(Case):
         l.event_dispatcher = Mock()
         l.pidbox_node = MockNode()
         l.update_strategies()
+        l.qos = Mock()
 
         l.receive_message(m.decode(), m)
+        self.assertTrue(to_timestamp.called)
         self.assertTrue(m.acknowledged)
-        self.assertTrue(to_timestamp.call_count)
 
     @patch('celery.worker.consumer.error')
     def test_receive_message_InvalidTaskError(self, error):