Ask Solem 8 years ago
parent
commit
49909c910e

+ 2 - 2
celery/app/amqp.py

@@ -394,9 +394,9 @@ class AMQP(object):
         kwargs = kwargs or {}
         kwargs = kwargs or {}
         utc = self.utc
         utc = self.utc
         if not isinstance(args, (list, tuple)):
         if not isinstance(args, (list, tuple)):
-            raise ValueError('task args must be a list or tuple')
+            raise TypeError('task args must be a list or tuple')
         if not isinstance(kwargs, Mapping):
         if not isinstance(kwargs, Mapping):
-            raise ValueError('task keyword arguments must be a mapping')
+            raise TypeError('task keyword arguments must be a mapping')
         if countdown:  # convert countdown to ETA
         if countdown:  # convert countdown to ETA
             self._verify_seconds(countdown, 'countdown')
             self._verify_seconds(countdown, 'countdown')
             now = now or self.app.now()
             now = now or self.app.now()

+ 75 - 40
celery/app/control.py

@@ -57,7 +57,7 @@ class Inspect(object):
 
 
     app = None
     app = None
 
 
-    def __init__(self, destination=None, timeout=1, callback=None,
+    def __init__(self, destination=None, timeout=1.0, callback=None,
                  connection=None, app=None, limit=None):
                  connection=None, app=None, limit=None):
         self.app = app or self.app
         self.app = app or self.app
         self.destination = destination
         self.destination = destination
@@ -113,13 +113,17 @@ class Inspect(object):
         return self._request('registered', taskinfoitems=taskinfoitems)
         return self._request('registered', taskinfoitems=taskinfoitems)
     registered_tasks = registered
     registered_tasks = registered
 
 
-    def ping(self):
+    def ping(self, destination=None):
         return self._request('ping')
         return self._request('ping')
 
 
     def active_queues(self):
     def active_queues(self):
         return self._request('active_queues')
         return self._request('active_queues')
 
 
-    def query_task(self, ids):
+    def query_task(self, *ids):
+        # signature used be unary: query_task(ids=[id1, id2])
+        # we need this to preserve backward compatibility.
+        if len(ids) == 1 and isinstance(ids[0], (list, tuple)):
+            ids = ids[0]
         return self._request('query_task', ids=ids)
         return self._request('query_task', ids=ids)
 
 
     def conf(self, with_defaults=False):
     def conf(self, with_defaults=False):
@@ -179,9 +183,12 @@ class Control(object):
     discard_all = purge
     discard_all = purge
 
 
     def election(self, id, topic, action=None, connection=None):
     def election(self, id, topic, action=None, connection=None):
-        self.broadcast('election', connection=connection, arguments={
-            'id': id, 'topic': topic, 'action': action,
-        })
+        self.broadcast(
+            'election', connection=connection, destination=None,
+            arguments={
+                'id': id, 'topic': topic, 'action': action,
+            },
+        )
 
 
     def revoke(self, task_id, destination=None, terminate=False,
     def revoke(self, task_id, destination=None, terminate=False,
                signal=TERM_SIGNAME, **kwargs):
                signal=TERM_SIGNAME, **kwargs):
@@ -200,10 +207,11 @@ class Control(object):
         See Also:
         See Also:
             :meth:`broadcast` for supported keyword arguments.
             :meth:`broadcast` for supported keyword arguments.
         """
         """
-        return self.broadcast('revoke', destination=destination,
-                              arguments={'task_id': task_id,
-                                         'terminate': terminate,
-                                         'signal': signal}, **kwargs)
+        return self.broadcast('revoke', destination=destination, arguments={
+            'task_id': task_id,
+            'terminate': terminate,
+            'signal': signal,
+        }, **kwargs)
 
 
     def terminate(self, task_id,
     def terminate(self, task_id,
                   destination=None, signal=TERM_SIGNAME, **kwargs):
                   destination=None, signal=TERM_SIGNAME, **kwargs):
@@ -217,7 +225,7 @@ class Control(object):
             task_id,
             task_id,
             destination=destination, terminate=True, signal=signal, **kwargs)
             destination=destination, terminate=True, signal=signal, **kwargs)
 
 
-    def ping(self, destination=None, timeout=1, **kwargs):
+    def ping(self, destination=None, timeout=1.0, **kwargs):
         """Ping all (or specific) workers.
         """Ping all (or specific) workers.
 
 
         Returns:
         Returns:
@@ -226,8 +234,9 @@ class Control(object):
         See Also:
         See Also:
             :meth:`broadcast` for supported keyword arguments.
             :meth:`broadcast` for supported keyword arguments.
         """
         """
-        return self.broadcast('ping', reply=True, destination=destination,
-                              timeout=timeout, **kwargs)
+        return self.broadcast(
+            'ping', reply=True, arguments={}, destination=destination,
+            timeout=timeout, **kwargs)
 
 
     def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
     def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
         """Tell workers to set a new rate limit for task by type.
         """Tell workers to set a new rate limit for task by type.
@@ -242,13 +251,18 @@ class Control(object):
         See Also:
         See Also:
             :meth:`broadcast` for supported keyword arguments.
             :meth:`broadcast` for supported keyword arguments.
         """
         """
-        return self.broadcast('rate_limit', destination=destination,
-                              arguments={'task_name': task_name,
-                                         'rate_limit': rate_limit},
-                              **kwargs)
-
-    def add_consumer(self, queue, exchange=None, exchange_type='direct',
-                     routing_key=None, options=None, **kwargs):
+        return self.broadcast(
+            'rate_limit',
+            destination=destination,
+            arguments={
+                'task_name': task_name,
+                'rate_limit': rate_limit,
+            },
+            **kwargs)
+
+    def add_consumer(self, queue,
+                     exchange=None, exchange_type='direct', routing_key=None,
+                     options=None, destination=None, **kwargs):
         """Tell all (or specific) workers to start consuming from a new queue.
         """Tell all (or specific) workers to start consuming from a new queue.
 
 
         Only the queue name is required as if only the queue is specified
         Only the queue name is required as if only the queue is specified
@@ -273,23 +287,28 @@ class Control(object):
         """
         """
         return self.broadcast(
         return self.broadcast(
             'add_consumer',
             'add_consumer',
-            arguments=dict({'queue': queue, 'exchange': exchange,
-                            'exchange_type': exchange_type,
-                            'routing_key': routing_key}, **options or {}),
+            destination=destination,
+            arguments=dict({
+                'queue': queue,
+                'exchange': exchange,
+                'exchange_type': exchange_type,
+                'routing_key': routing_key,
+            }, **options or {}),
             **kwargs
             **kwargs
         )
         )
 
 
-    def cancel_consumer(self, queue, **kwargs):
+    def cancel_consumer(self, queue, destination=None, **kwargs):
         """Tell all (or specific) workers to stop consuming from ``queue``.
         """Tell all (or specific) workers to stop consuming from ``queue``.
 
 
         See Also:
         See Also:
             Supports the same arguments as :meth:`broadcast`.
             Supports the same arguments as :meth:`broadcast`.
         """
         """
         return self.broadcast(
         return self.broadcast(
-            'cancel_consumer', arguments={'queue': queue}, **kwargs
-        )
+            'cancel_consumer', destination=destination,
+            arguments={'queue': queue}, **kwargs)
 
 
-    def time_limit(self, task_name, soft=None, hard=None, **kwargs):
+    def time_limit(self, task_name, soft=None, hard=None,
+                   destination=None, **kwargs):
         """Tell workers to set time limits for a task by type.
         """Tell workers to set time limits for a task by type.
 
 
         Arguments:
         Arguments:
@@ -300,8 +319,13 @@ class Control(object):
         """
         """
         return self.broadcast(
         return self.broadcast(
             'time_limit',
             'time_limit',
-            arguments={'task_name': task_name,
-                       'hard': hard, 'soft': soft}, **kwargs)
+            arguments={
+                'task_name': task_name,
+                'hard': hard,
+                'soft': soft,
+            },
+            destination=destination,
+            **kwargs)
 
 
     def enable_events(self, destination=None, **kwargs):
     def enable_events(self, destination=None, **kwargs):
         """Tell all (or specific) workers to enable events.
         """Tell all (or specific) workers to enable events.
@@ -309,7 +333,8 @@ class Control(object):
         See Also:
         See Also:
             Supports the same arguments as :meth:`broadcast`.
             Supports the same arguments as :meth:`broadcast`.
         """
         """
-        return self.broadcast('enable_events', {}, destination, **kwargs)
+        return self.broadcast(
+            'enable_events', arguments={}, destination=destination, **kwargs)
 
 
     def disable_events(self, destination=None, **kwargs):
     def disable_events(self, destination=None, **kwargs):
         """Tell all (or specific) workers to disable events.
         """Tell all (or specific) workers to disable events.
@@ -317,7 +342,8 @@ class Control(object):
         See Also:
         See Also:
             Supports the same arguments as :meth:`broadcast`.
             Supports the same arguments as :meth:`broadcast`.
         """
         """
-        return self.broadcast('disable_events', {}, destination, **kwargs)
+        return self.broadcast(
+            'disable_events', arguments={}, destination=destination, **kwargs)
 
 
     def pool_grow(self, n=1, destination=None, **kwargs):
     def pool_grow(self, n=1, destination=None, **kwargs):
         """Tell all (or specific) workers to grow the pool by ``n``.
         """Tell all (or specific) workers to grow the pool by ``n``.
@@ -325,7 +351,8 @@ class Control(object):
         See Also:
         See Also:
             Supports the same arguments as :meth:`broadcast`.
             Supports the same arguments as :meth:`broadcast`.
         """
         """
-        return self.broadcast('pool_grow', {'n': n}, destination, **kwargs)
+        return self.broadcast(
+            'pool_grow', arguments={'n': n}, destination=destination, **kwargs)
 
 
     def pool_shrink(self, n=1, destination=None, **kwargs):
     def pool_shrink(self, n=1, destination=None, **kwargs):
         """Tell all (or specific) workers to shrink the pool by ``n``.
         """Tell all (or specific) workers to shrink the pool by ``n``.
@@ -333,7 +360,9 @@ class Control(object):
         See Also:
         See Also:
             Supports the same arguments as :meth:`broadcast`.
             Supports the same arguments as :meth:`broadcast`.
         """
         """
-        return self.broadcast('pool_shrink', {'n': n}, destination, **kwargs)
+        return self.broadcast(
+            'pool_shrink', arguments={'n': n},
+            destination=destination, **kwargs)
 
 
     def autoscale(self, max, min, destination=None, **kwargs):
     def autoscale(self, max, min, destination=None, **kwargs):
         """Change worker(s) autoscale setting.
         """Change worker(s) autoscale setting.
@@ -342,7 +371,8 @@ class Control(object):
             Supports the same arguments as :meth:`broadcast`.
             Supports the same arguments as :meth:`broadcast`.
         """
         """
         return self.broadcast(
         return self.broadcast(
-            'autoscale', {'max': max, 'min': min}, destination, **kwargs)
+            'autoscale', arguments={'max': max, 'min': min},
+            destination=destination, **kwargs)
 
 
     def shutdown(self, destination=None, **kwargs):
     def shutdown(self, destination=None, **kwargs):
         """Shutdown worker(s).
         """Shutdown worker(s).
@@ -351,7 +381,7 @@ class Control(object):
             Supports the same arguments as :meth:`broadcast`
             Supports the same arguments as :meth:`broadcast`
         """
         """
         return self.broadcast(
         return self.broadcast(
-            'shutdown', {}, destination, **kwargs)
+            'shutdown', arguments={}, destination=destination, **kwargs)
 
 
     def pool_restart(self, modules=None, reload=False, reloader=None,
     def pool_restart(self, modules=None, reload=False, reloader=None,
                      destination=None, **kwargs):
                      destination=None, **kwargs):
@@ -360,7 +390,7 @@ class Control(object):
         Keyword Arguments:
         Keyword Arguments:
             modules (Sequence[str]): List of modules to reload.
             modules (Sequence[str]): List of modules to reload.
             reload (bool): Flag to enable module reloading.  Default is False.
             reload (bool): Flag to enable module reloading.  Default is False.
-            reloader (Any):  Function to reload a module.
+            reloader (Any): Function to reload a module.
             destination (Sequence[str]): List of worker names to send this
             destination (Sequence[str]): List of worker names to send this
                 command to.
                 command to.
 
 
@@ -369,8 +399,12 @@ class Control(object):
         """
         """
         return self.broadcast(
         return self.broadcast(
             'pool_restart',
             'pool_restart',
-            {'modules': modules, 'reload': reload, 'reloader': reloader},
-            destination, **kwargs)
+            arguments={
+                'modules': modules,
+                'reload': reload,
+                'reloader': reloader,
+            },
+            destination=destination, **kwargs)
 
 
     def heartbeat(self, destination=None, **kwargs):
     def heartbeat(self, destination=None, **kwargs):
         """Tell worker(s) to send a heartbeat immediately.
         """Tell worker(s) to send a heartbeat immediately.
@@ -378,10 +412,11 @@ class Control(object):
         See Also:
         See Also:
             Supports the same arguments as :meth:`broadcast`
             Supports the same arguments as :meth:`broadcast`
         """
         """
-        return self.broadcast('heartbeat', {}, destination, **kwargs)
+        return self.broadcast(
+            'heartbeat', arguments={}, destination=destination, **kwargs)
 
 
     def broadcast(self, command, arguments=None, destination=None,
     def broadcast(self, command, arguments=None, destination=None,
-                  connection=None, reply=False, timeout=1, limit=None,
+                  connection=None, reply=False, timeout=1.0, limit=None,
                   callback=None, channel=None, **extra_kwargs):
                   callback=None, channel=None, **extra_kwargs):
         """Broadcast a control command to the celery workers.
         """Broadcast a control command to the celery workers.
 
 

+ 39 - 0
t/unit/app/test_amqp.py

@@ -85,6 +85,14 @@ class test_Queues:
         assert isinstance(q['foo'], Queue)
         assert isinstance(q['foo'], Queue)
         assert q['foo'].routing_key == 'rk'
         assert q['foo'].routing_key == 'rk'
 
 
+    def test_setitem_adds_default_exchange(self):
+        q = Queues(default_exchange=Exchange('bar'))
+        assert q.default_exchange
+        queue = Queue('foo', exchange=None)
+        queue.exchange = None
+        q['foo'] = queue
+        assert q['foo'].exchange == q.default_exchange
+
     @pytest.mark.parametrize('ha_policy,qname,q,qargs,expected', [
     @pytest.mark.parametrize('ha_policy,qname,q,qargs,expected', [
         (None, 'xyz', 'xyz', None, None),
         (None, 'xyz', 'xyz', None, None),
         (None, 'xyz', 'xyz', {'x-foo': 'bar'}, {'x-foo': 'bar'}),
         (None, 'xyz', 'xyz', {'x-foo': 'bar'}, {'x-foo': 'bar'}),
@@ -181,6 +189,25 @@ class test_default_queues:
         assert queue.routing_key == rkey or name
         assert queue.routing_key == rkey or name
 
 
 
 
+class test_AMQP_proto1:
+
+    def test_kwargs_must_be_mapping(self):
+        with pytest.raises(TypeError):
+            self.app.amqp.as_task_v1(uuid(), 'foo', kwargs=[1, 2])
+
+    def test_args_must_be_list(self):
+        with pytest.raises(TypeError):
+            self.app.amqp.as_task_v1(uuid(), 'foo', args='abc')
+
+    def test_countdown_negative(self):
+        with pytest.raises(ValueError):
+            self.app.amqp.as_task_v1(uuid(), 'foo', countdown=-1232132323123)
+
+    def test_as_task_message_without_utc(self):
+        self.app.amqp.utc = False
+        self.app.amqp.as_task_v1(uuid(), 'foo', countdown=30, expires=40)
+
+
 class test_AMQP:
 class test_AMQP:
 
 
     def setup(self):
     def setup(self):
@@ -188,6 +215,18 @@ class test_AMQP:
             uuid(), 'foo', create_sent_event=True,
             uuid(), 'foo', create_sent_event=True,
         )
         )
 
 
+    def test_kwargs_must_be_mapping(self):
+        with pytest.raises(TypeError):
+            self.app.amqp.as_task_v2(uuid(), 'foo', kwargs=[1, 2])
+
+    def test_args_must_be_list(self):
+        with pytest.raises(TypeError):
+            self.app.amqp.as_task_v2(uuid(), 'foo', args='abc')
+
+    def test_countdown_negative(self):
+        with pytest.raises(ValueError):
+            self.app.amqp.as_task_v2(uuid(), 'foo', countdown=-1232132323123)
+
     def test_Queues__with_ha_policy(self):
     def test_Queues__with_ha_policy(self):
         x = self.app.amqp.Queues({}, ha_policy='all')
         x = self.app.amqp.Queues({}, ha_policy='all')
         assert x.ha_policy == 'all'
         assert x.ha_policy == 'all'

+ 18 - 0
t/unit/app/test_app.py

@@ -72,6 +72,19 @@ class test_App:
     def setup(self):
     def setup(self):
         self.app.add_defaults(deepcopy(self.CELERY_TEST_CONFIG))
         self.app.add_defaults(deepcopy(self.CELERY_TEST_CONFIG))
 
 
+    @patch('celery.app.base.set_default_app')
+    def test_set_default(self, set_default_app):
+        self.app.set_default()
+        set_default_app.assert_called_with(self.app)
+
+    @patch('celery.security.setup_security')
+    def test_setup_security(self, setup_security):
+        self.app.setup_security(
+            {'json'}, 'key', 'cert', 'store', 'digest', 'serializer')
+        setup_security.assert_called_with(
+            {'json'}, 'key', 'cert', 'store', 'digest', 'serializer',
+            app=self.app)
+
     def test_task_autofinalize_disabled(self):
     def test_task_autofinalize_disabled(self):
         with self.Celery('xyzibari', autofinalize=False) as app:
         with self.Celery('xyzibari', autofinalize=False) as app:
             @app.task
             @app.task
@@ -882,6 +895,11 @@ class test_App:
         self.app.select_queues({'foo', 'bar'})
         self.app.select_queues({'foo', 'bar'})
         self.app.amqp.queues.select.assert_called_with({'foo', 'bar'})
         self.app.amqp.queues.select.assert_called_with({'foo', 'bar'})
 
 
+    def test_Beat(self):
+        from celery.apps.beat import Beat
+        beat = self.app.Beat()
+        assert isinstance(beat, Beat)
+
 
 
 class test_defaults:
 class test_defaults:
 
 

+ 4 - 0
t/unit/app/test_backends.py

@@ -35,3 +35,7 @@ class test_backends:
             sbn.side_effect = ValueError()
             sbn.side_effect = ValueError()
             with pytest.raises(ImproperlyConfigured):
             with pytest.raises(ImproperlyConfigured):
                 backends.by_name('xxx.xxx:foo', app.loader)
                 backends.by_name('xxx.xxx:foo', app.loader)
+
+    def test_backend_can_not_be_module(self, app):
+        with pytest.raises(ImproperlyConfigured):
+            backends.by_name(pytest, app.loader)

+ 348 - 157
t/unit/app/test_control.py

@@ -2,13 +2,13 @@ from __future__ import absolute_import, unicode_literals
 
 
 import pytest
 import pytest
 
 
-from kombu.pidbox import Mailbox
-from vine.utils import wraps
+from case import Mock
 
 
 from celery import uuid
 from celery import uuid
 from celery.app import control
 from celery.app import control
 from celery.exceptions import DuplicateNodenameWarning
 from celery.exceptions import DuplicateNodenameWarning
 from celery.five import items
 from celery.five import items
+from celery.utils.collections import LimitedSet
 
 
 
 
 def _info_for_commandclass(type_):
 def _info_for_commandclass(type_):
@@ -36,35 +36,6 @@ def test_inspect_implements_all_commands(app):
             assert getattr(inspect, name)
             assert getattr(inspect, name)
 
 
 
 
-class MockMailbox(Mailbox):
-    sent = []
-
-    def _publish(self, command, *args, **kwargs):
-        self.__class__.sent.append(command)
-
-    def close(self):
-        pass
-
-    def _collect(self, *args, **kwargs):
-        pass
-
-
-class Control(control.Control):
-    Mailbox = MockMailbox
-
-
-def with_mock_broadcast(fun):
-
-    @wraps(fun)
-    def _resets(*args, **kwargs):
-        MockMailbox.sent = []
-        try:
-            return fun(*args, **kwargs)
-        finally:
-            MockMailbox.sent = []
-    return _resets
-
-
 class test_flatten_reply:
 class test_flatten_reply:
 
 
     def test_flatten_reply(self):
     def test_flatten_reply(self):
@@ -85,12 +56,12 @@ class test_flatten_reply:
 class test_inspect:
 class test_inspect:
 
 
     def setup(self):
     def setup(self):
-        self.c = Control(app=self.app)
-        self.prev, self.app.control = self.app.control, self.c
-        self.i = self.c.inspect()
+        self.app.control.broadcast = Mock(name='broadcast')
+        self.app.control.broadcast.return_value = {}
+        self.inspect = self.app.control.inspect()
 
 
     def test_prepare_reply(self):
     def test_prepare_reply(self):
-        reply = self.i._prepare([
+        reply = self.inspect._prepare([
             {'w1': {'ok': 1}},
             {'w1': {'ok': 1}},
             {'w2': {'ok': 1}},
             {'w2': {'ok': 1}},
         ])
         ])
@@ -99,201 +70,421 @@ class test_inspect:
             'w2': {'ok': 1},
             'w2': {'ok': 1},
         }
         }
 
 
-        i = self.c.inspect(destination='w1')
+        i = self.app.control.inspect(destination='w1')
         assert i._prepare([{'w1': {'ok': 1}}]) == {'ok': 1}
         assert i._prepare([{'w1': {'ok': 1}}]) == {'ok': 1}
 
 
-    @with_mock_broadcast
+    def assert_broadcast_called(self, command,
+                                destination=None,
+                                callback=None,
+                                connection=None,
+                                limit=None,
+                                timeout=None,
+                                reply=True,
+                                **arguments):
+        self.app.control.broadcast.assert_called_with(
+            command,
+            arguments=arguments,
+            destination=destination or self.inspect.destination,
+            callback=callback or self.inspect.callback,
+            connection=connection or self.inspect.connection,
+            limit=limit if limit is not None else self.inspect.limit,
+            timeout=timeout if timeout is not None else self.inspect.timeout,
+            reply=reply,
+        )
+
     def test_active(self):
     def test_active(self):
-        self.i.active()
-        assert 'active' in MockMailbox.sent
+        self.inspect.active()
+        self.assert_broadcast_called('active')
 
 
-    @with_mock_broadcast
     def test_clock(self):
     def test_clock(self):
-        self.i.clock()
-        assert 'clock' in MockMailbox.sent
+        self.inspect.clock()
+        self.assert_broadcast_called('clock')
 
 
-    @with_mock_broadcast
     def test_conf(self):
     def test_conf(self):
-        self.i.conf()
-        assert 'conf' in MockMailbox.sent
+        self.inspect.conf()
+        self.assert_broadcast_called('conf', with_defaults=False)
+
+    def test_conf__with_defaults(self):
+        self.inspect.conf(with_defaults=True)
+        self.assert_broadcast_called('conf', with_defaults=True)
 
 
-    @with_mock_broadcast
     def test_hello(self):
     def test_hello(self):
-        self.i.hello('george@vandelay.com')
-        assert 'hello' in MockMailbox.sent
+        self.inspect.hello('george@vandelay.com')
+        self.assert_broadcast_called(
+            'hello', from_node='george@vandelay.com', revoked=None)
+
+    def test_hello__with_revoked(self):
+        revoked = LimitedSet(100)
+        for i in range(100):
+            revoked.add('id{0}'.format(i))
+        self.inspect.hello('george@vandelay.com', revoked=revoked._data)
+        self.assert_broadcast_called(
+            'hello', from_node='george@vandelay.com', revoked=revoked._data)
 
 
-    @with_mock_broadcast
     def test_memsample(self):
     def test_memsample(self):
-        self.i.memsample()
-        assert 'memsample' in MockMailbox.sent
+        self.inspect.memsample()
+        self.assert_broadcast_called('memsample')
 
 
-    @with_mock_broadcast
     def test_memdump(self):
     def test_memdump(self):
-        self.i.memdump()
-        assert 'memdump' in MockMailbox.sent
+        self.inspect.memdump()
+        self.assert_broadcast_called('memdump', samples=10)
+
+    def test_memdump__samples_specified(self):
+        self.inspect.memdump(samples=303)
+        self.assert_broadcast_called('memdump', samples=303)
 
 
-    @with_mock_broadcast
     def test_objgraph(self):
     def test_objgraph(self):
-        self.i.objgraph()
-        assert 'objgraph' in MockMailbox.sent
+        self.inspect.objgraph()
+        self.assert_broadcast_called(
+            'objgraph', num=200, type='Request', max_depth=10)
 
 
-    @with_mock_broadcast
     def test_scheduled(self):
     def test_scheduled(self):
-        self.i.scheduled()
-        assert 'scheduled' in MockMailbox.sent
+        self.inspect.scheduled()
+        self.assert_broadcast_called('scheduled')
 
 
-    @with_mock_broadcast
     def test_reserved(self):
     def test_reserved(self):
-        self.i.reserved()
-        assert 'reserved' in MockMailbox.sent
+        self.inspect.reserved()
+        self.assert_broadcast_called('reserved')
 
 
-    @with_mock_broadcast
     def test_stats(self):
     def test_stats(self):
-        self.i.stats()
-        assert 'stats' in MockMailbox.sent
+        self.inspect.stats()
+        self.assert_broadcast_called('stats')
 
 
-    @with_mock_broadcast
     def test_revoked(self):
     def test_revoked(self):
-        self.i.revoked()
-        assert 'revoked' in MockMailbox.sent
-
-    @with_mock_broadcast
-    def test_tasks(self):
-        self.i.registered()
-        assert 'registered' in MockMailbox.sent
+        self.inspect.revoked()
+        self.assert_broadcast_called('revoked')
+
+    def test_registered(self):
+        self.inspect.registered()
+        self.assert_broadcast_called('registered', taskinfoitems=())
+
+    def test_registered__taskinfoitems(self):
+        self.inspect.registered('rate_limit', 'time_limit')
+        self.assert_broadcast_called(
+            'registered',
+            taskinfoitems=('rate_limit', 'time_limit'),
+        )
 
 
-    @with_mock_broadcast
     def test_ping(self):
     def test_ping(self):
-        self.i.ping()
-        assert 'ping' in MockMailbox.sent
+        self.inspect.ping()
+        self.assert_broadcast_called('ping')
 
 
-    @with_mock_broadcast
     def test_active_queues(self):
     def test_active_queues(self):
-        self.i.active_queues()
-        assert 'active_queues' in MockMailbox.sent
+        self.inspect.active_queues()
+        self.assert_broadcast_called('active_queues')
+
+    def test_query_task(self):
+        self.inspect.query_task('foo', 'bar')
+        self.assert_broadcast_called('query_task', ids=('foo', 'bar'))
+
+    def test_query_task__compat_single_list_argument(self):
+        self.inspect.query_task(['foo', 'bar'])
+        self.assert_broadcast_called('query_task', ids=['foo', 'bar'])
+
+    def test_query_task__scalar(self):
+        self.inspect.query_task('foo')
+        self.assert_broadcast_called('query_task', ids=('foo',))
 
 
-    @with_mock_broadcast
     def test_report(self):
     def test_report(self):
-        self.i.report()
-        assert 'report' in MockMailbox.sent
+        self.inspect.report()
+        self.assert_broadcast_called('report')
+
+
+class test_Control_broadcast:
+
+    def setup(self):
+        self.app.control.mailbox = Mock(name='mailbox')
+
+    def test_broadcast(self):
+        self.app.control.broadcast('foobarbaz', arguments={'foo': 2})
+        self.app.control.mailbox.assert_called()
+        self.app.control.mailbox()._broadcast.assert_called_with(
+            'foobarbaz', {'foo': 2}, None, False, 1.0, None, None,
+            channel=None,
+        )
+
+    def test_broadcast_limit(self):
+        self.app.control.broadcast(
+            'foobarbaz1', arguments=None, limit=None, destination=[1, 2, 3],
+        )
+        self.app.control.mailbox.assert_called()
+        self.app.control.mailbox()._broadcast.assert_called_with(
+            'foobarbaz1', {}, [1, 2, 3], False, 1.0, None, None,
+            channel=None,
+        )
 
 
 
 
-class test_Broadcast:
+class test_Control:
 
 
     def setup(self):
     def setup(self):
-        self.control = Control(app=self.app)
-        self.app.control = self.control
+        self.app.control.broadcast = Mock(name='broadcast')
+        self.app.control.broadcast.return_value = {}
 
 
         @self.app.task(shared=False)
         @self.app.task(shared=False)
         def mytask():
         def mytask():
             pass
             pass
         self.mytask = mytask
         self.mytask = mytask
 
 
-    def test_purge(self):
-        self.control.purge()
+    def assert_control_called_with_args(self, name, destination=None,
+                                        _options=None, **args):
+        self.app.control.broadcast.assert_called_with(
+            name, destination=destination, arguments=args, **_options or {})
 
 
-    @with_mock_broadcast
-    def test_broadcast(self):
-        self.control.broadcast('foobarbaz', arguments=[])
-        assert 'foobarbaz' in MockMailbox.sent
+    def test_purge(self):
+        self.app.amqp.TaskConsumer = Mock(name='TaskConsumer')
+        self.app.control.purge()
+        self.app.amqp.TaskConsumer().purge.assert_called_with()
 
 
-    @with_mock_broadcast
-    def test_broadcast_limit(self):
-        self.control.broadcast(
-            'foobarbaz1', arguments=[], limit=None, destination=[1, 2, 3],
+    def test_rate_limit(self):
+        self.app.control.rate_limit(self.mytask.name, '100/m')
+        self.assert_control_called_with_args(
+            'rate_limit',
+            destination=None,
+            task_name=self.mytask.name,
+            rate_limit='100/m',
         )
         )
-        assert 'foobarbaz1' in MockMailbox.sent
-
-    @with_mock_broadcast
-    def test_broadcast_validate(self):
-        with pytest.raises(ValueError):
-            self.control.broadcast('foobarbaz2',
-                                   destination='foo')
 
 
-    @with_mock_broadcast
-    def test_rate_limit(self):
-        self.control.rate_limit(self.mytask.name, '100/m')
-        assert 'rate_limit' in MockMailbox.sent
+    def test_rate_limit__with_destination(self):
+        self.app.control.rate_limit(
+            self.mytask.name, '100/m', 'a@w.com', limit=100)
+        self.assert_control_called_with_args(
+            'rate_limit',
+            destination='a@w.com',
+            task_name=self.mytask.name,
+            rate_limit='100/m',
+            _options={'limit': 100},
+        )
 
 
-    @with_mock_broadcast
     def test_time_limit(self):
     def test_time_limit(self):
-        self.control.time_limit(self.mytask.name, soft=10, hard=20)
-        assert 'time_limit' in MockMailbox.sent
+        self.app.control.time_limit(self.mytask.name, soft=10, hard=20)
+        self.assert_control_called_with_args(
+            'time_limit',
+            destination=None,
+            task_name=self.mytask.name,
+            soft=10,
+            hard=20,
+        )
+
+    def test_time_limit__with_destination(self):
+        self.app.control.time_limit(
+            self.mytask.name, soft=10, hard=20,
+            destination='a@q.com', limit=99,
+        )
+        self.assert_control_called_with_args(
+            'time_limit',
+            destination='a@q.com',
+            task_name=self.mytask.name,
+            soft=10,
+            hard=20,
+            _options={'limit': 99},
+        )
 
 
-    @with_mock_broadcast
     def test_add_consumer(self):
     def test_add_consumer(self):
-        self.control.add_consumer('foo')
-        assert 'add_consumer' in MockMailbox.sent
+        self.app.control.add_consumer('foo')
+        self.assert_control_called_with_args(
+            'add_consumer',
+            destination=None,
+            queue='foo',
+            exchange=None,
+            exchange_type='direct',
+            routing_key=None,
+        )
+
+    def test_add_consumer__with_options_and_dest(self):
+        self.app.control.add_consumer(
+            'foo', 'ex', 'topic', 'rkey', destination='a@q.com', limit=78)
+        self.assert_control_called_with_args(
+            'add_consumer',
+            destination='a@q.com',
+            queue='foo',
+            exchange='ex',
+            exchange_type='topic',
+            routing_key='rkey',
+            _options={'limit': 78},
+        )
 
 
-    @with_mock_broadcast
     def test_cancel_consumer(self):
     def test_cancel_consumer(self):
-        self.control.cancel_consumer('foo')
-        assert 'cancel_consumer' in MockMailbox.sent
+        self.app.control.cancel_consumer('foo')
+        self.assert_control_called_with_args(
+            'cancel_consumer',
+            destination=None,
+            queue='foo',
+        )
+
+    def test_cancel_consumer__with_destination(self):
+        self.app.control.cancel_consumer(
+            'foo', destination='w1@q.com', limit=3)
+        self.assert_control_called_with_args(
+            'cancel_consumer',
+            destination='w1@q.com',
+            queue='foo',
+            _options={'limit': 3},
+        )
 
 
-    @with_mock_broadcast
     def test_shutdown(self):
     def test_shutdown(self):
-        self.control.shutdown()
-        assert 'shutdown' in MockMailbox.sent
+        self.app.control.shutdown()
+        self.assert_control_called_with_args('shutdown', destination=None)
+
+    def test_shutdown__with_destination(self):
+        self.app.control.shutdown(destination='a@q.com', limit=3)
+        self.assert_control_called_with_args(
+            'shutdown', destination='a@q.com', _options={'limit': 3})
 
 
-    @with_mock_broadcast
     def test_heartbeat(self):
     def test_heartbeat(self):
-        self.control.heartbeat()
-        assert 'heartbeat' in MockMailbox.sent
+        self.app.control.heartbeat()
+        self.assert_control_called_with_args('heartbeat', destination=None)
+
+    def test_heartbeat__with_destination(self):
+        self.app.control.heartbeat(destination='a@q.com', limit=3)
+        self.assert_control_called_with_args(
+            'heartbeat', destination='a@q.com', _options={'limit': 3})
 
 
-    @with_mock_broadcast
     def test_pool_restart(self):
     def test_pool_restart(self):
-        self.control.pool_restart()
-        assert 'pool_restart' in MockMailbox.sent
+        self.app.control.pool_restart()
+        self.assert_control_called_with_args(
+            'pool_restart',
+            destination=None,
+            modules=None,
+            reload=False,
+            reloader=None)
 
 
-    @with_mock_broadcast
     def test_terminate(self):
     def test_terminate(self):
-        self.control.terminate('124')
-        assert 'revoke' in MockMailbox.sent
+        self.app.control.revoke = Mock(name='revoke')
+        self.app.control.terminate('124')
+        self.app.control.revoke.assert_called_with(
+            '124', destination=None,
+            terminate=True,
+            signal=control.TERM_SIGNAME,
+        )
 
 
-    @with_mock_broadcast
     def test_enable_events(self):
     def test_enable_events(self):
-        self.control.enable_events()
-        assert 'enable_events' in MockMailbox.sent
+        self.app.control.enable_events()
+        self.assert_control_called_with_args('enable_events', destination=None)
+
+    def test_enable_events_with_destination(self):
+        self.app.control.enable_events(destination='a@q.com', limit=3)
+        self.assert_control_called_with_args(
+            'enable_events', destination='a@q.com', _options={'limit': 3})
 
 
-    @with_mock_broadcast
     def test_disable_events(self):
     def test_disable_events(self):
-        self.control.disable_events()
-        assert 'disable_events' in MockMailbox.sent
+        self.app.control.disable_events()
+        self.assert_control_called_with_args(
+            'disable_events', destination=None)
 
 
-    @with_mock_broadcast
-    def test_revoke(self):
-        self.control.revoke('foozbaaz')
-        assert 'revoke' in MockMailbox.sent
+    def test_disable_events_with_destination(self):
+        self.app.control.disable_events(destination='a@q.com', limit=3)
+        self.assert_control_called_with_args(
+            'disable_events', destination='a@q.com', _options={'limit': 3})
 
 
-    @with_mock_broadcast
     def test_ping(self):
     def test_ping(self):
-        self.control.ping()
-        assert 'ping' in MockMailbox.sent
+        self.app.control.ping()
+        self.assert_control_called_with_args(
+            'ping', destination=None,
+            _options={'timeout': 1.0, 'reply': True})
+
+    def test_ping_with_destination(self):
+        self.app.control.ping(destination='a@q.com', limit=3)
+        self.assert_control_called_with_args(
+            'ping',
+            destination='a@q.com',
+            _options={
+                'limit': 3,
+                'timeout': 1.0,
+                'reply': True,
+            })
+
+    def test_revoke(self):
+        self.app.control.revoke('foozbaaz')
+        self.assert_control_called_with_args(
+            'revoke',
+            destination=None,
+            task_id='foozbaaz',
+            signal=control.TERM_SIGNAME,
+            terminate=False,
+        )
+
+    def test_revoke__with_options(self):
+        self.app.control.revoke(
+            'foozbaaz',
+            destination='a@q.com',
+            terminate=True,
+            signal='KILL',
+            limit=404,
+        )
+        self.assert_control_called_with_args(
+            'revoke',
+            destination='a@q.com',
+            task_id='foozbaaz',
+            signal='KILL',
+            terminate=True,
+            _options={'limit': 404},
+        )
 
 
-    @with_mock_broadcast
     def test_election(self):
     def test_election(self):
-        self.control.election('some_id', 'topic', 'action')
-        assert 'election' in MockMailbox.sent
+        self.app.control.election('some_id', 'topic', 'action')
+        self.assert_control_called_with_args(
+            'election',
+            destination=None,
+            topic='topic',
+            action='action',
+            id='some_id',
+            _options={'connection': None},
+        )
+
+    def test_autoscale(self):
+        self.app.control.autoscale(300, 10)
+        self.assert_control_called_with_args(
+            'autoscale', max=300, min=10, destination=None)
+
+    def test_autoscale__with_options(self):
+        self.app.control.autoscale(300, 10, destination='a@q.com', limit=39)
+        self.assert_control_called_with_args(
+            'autoscale', max=300, min=10,
+            destination='a@q.com',
+            _options={'limit': 39}
+        )
 
 
-    @with_mock_broadcast
     def test_pool_grow(self):
     def test_pool_grow(self):
-        self.control.pool_grow(2)
-        assert 'pool_grow' in MockMailbox.sent
+        self.app.control.pool_grow(2)
+        self.assert_control_called_with_args(
+            'pool_grow', n=2, destination=None)
+
+    def test_pool_grow__with_options(self):
+        self.app.control.pool_grow(2, destination='a@q.com', limit=39)
+        self.assert_control_called_with_args(
+            'pool_grow', n=2,
+            destination='a@q.com',
+            _options={'limit': 39}
+        )
 
 
-    @with_mock_broadcast
     def test_pool_shrink(self):
     def test_pool_shrink(self):
-        self.control.pool_shrink(2)
-        assert 'pool_shrink' in MockMailbox.sent
+        self.app.control.pool_shrink(2)
+        self.assert_control_called_with_args(
+            'pool_shrink', n=2, destination=None)
+
+    def test_pool_shrink__with_options(self):
+        self.app.control.pool_shrink(2, destination='a@q.com', limit=39)
+        self.assert_control_called_with_args(
+            'pool_shrink', n=2,
+            destination='a@q.com',
+            _options={'limit': 39}
+        )
 
 
-    @with_mock_broadcast
     def test_revoke_from_result(self):
     def test_revoke_from_result(self):
+        self.app.control.revoke = Mock(name='revoke')
         self.app.AsyncResult('foozbazzbar').revoke()
         self.app.AsyncResult('foozbazzbar').revoke()
-        assert 'revoke' in MockMailbox.sent
+        self.app.control.revoke.assert_called_with(
+            'foozbazzbar',
+            connection=None, reply=False, signal=None,
+            terminate=False, timeout=None)
 
 
-    @with_mock_broadcast
     def test_revoke_from_resultset(self):
     def test_revoke_from_resultset(self):
-        r = self.app.GroupResult(uuid(),
-                                 [self.app.AsyncResult(x)
-                                  for x in [uuid() for i in range(10)]])
+        self.app.control.revoke = Mock(name='revoke')
+        uuids = [uuid() for _ in range(10)]
+        r = self.app.GroupResult(
+            uuid(), [self.app.AsyncResult(x) for x in uuids])
         r.revoke()
         r.revoke()
-        assert 'revoke' in MockMailbox.sent
+        self.app.control.revoke.assert_called_with(
+            uuids,
+            connection=None, reply=False, signal=None,
+            terminate=False, timeout=None)

+ 1 - 1
t/unit/tasks/test_tasks.py

@@ -334,7 +334,7 @@ class test_tasks(TasksCase):
             self.increment_counter.apply_async([], 'str')
             self.increment_counter.apply_async([], 'str')
 
 
     def test_task_args_must_be_list(self):
     def test_task_args_must_be_list(self):
-        with pytest.raises(ValueError):
+        with pytest.raises(TypeError):
             self.increment_counter.apply_async('s', {})
             self.increment_counter.apply_async('s', {})
 
 
     def test_regular_task(self):
     def test_regular_task(self):