
100% coverage for celery.app.*

Ask Solem 12 years ago
parent
commit
2f376b1705

+ 2 - 2
celery/__init__.py

@@ -25,7 +25,7 @@ VERSION_BANNER = '{0} ({1})'.format(__version__, SERIES)
 # -eof meta-
 
 import os
-if os.environ.get('C_IMPDEBUG'):
+if os.environ.get('C_IMPDEBUG'):  # pragma: no cover
     import sys
     from .five import builtins
     real_import = builtins.__import__
@@ -40,7 +40,7 @@ if os.environ.get('C_IMPDEBUG'):
 
 STATICA_HACK = True
 globals()['kcah_acitats'[::-1].upper()] = False
-if STATICA_HACK:
+if STATICA_HACK:  # pragma: no cover
     # This is never executed, but tricks static analyzers (PyDev, PyCharm,
     # pylint, etc.) into knowing the types of these symbols, and what
     # they contain.
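
Note on the new `# pragma: no cover` markers: the import-debug hook and the static-analysis shim only run under conditions the test suite never triggers, so they are excluded from the coverage report rather than tested. A minimal sketch of the pattern, with an illustrative flag name rather than Celery's:

    import os
    import sys

    if os.environ.get('MY_DEBUG_FLAG'):  # pragma: no cover
        # Only taken when the environment variable is set; coverage.py
        # skips blocks whose header carries 'pragma: no cover' by default.
        print('debug hooks enabled', file=sys.stderr)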

+ 1 - 1
celery/__main__.py

@@ -51,5 +51,5 @@ def _compat_beat():
     main()
 
 
-if __name__ == '__main__':
+if __name__ == '__main__':  # pragma: no cover
     main()

+ 1 - 1
celery/_state.py

@@ -53,7 +53,7 @@ def _get_current_app():
     return _tls.current_app or default_app
 
 C_STRICT_APP = os.environ.get('C_STRICT_APP')
-if os.environ.get('C_STRICT_APP'):
+if os.environ.get('C_STRICT_APP'):  # pragma: no cover
     def get_current_app():
         import traceback
         print('-- USES CURRENT_APP', file=sys.stderr)  # noqa+
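
For context, the `C_STRICT_APP` branch installs a noisy `get_current_app` that prints '-- USES CURRENT_APP' plus a traceback whenever code falls back to the current-app proxy; being opt-in via an environment variable, it is likewise excluded from coverage. A rough usage sketch, assuming the variable is read at import time as the module-level `if` above suggests:

    import os
    os.environ['C_STRICT_APP'] = 'yes'  # must be set before celery is imported

    from celery import current_app
    current_app.conf  # resolving the proxy now reports the offending stack to stderr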

+ 2 - 3
celery/app/base.py

@@ -39,7 +39,7 @@ from .builtins import shared_task, load_shared_tasks
 from .defaults import DEFAULTS, find_deprecated_settings
 from .registry import TaskRegistry
 from .utils import (
-    AppPickler, Settings, bugreport, _unpickle_app, _unpickle_app_v2,
+    AppPickler, Settings, bugreport, _unpickle_app, _unpickle_app_v2, appstr,
 )
 
 _EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
@@ -456,8 +456,7 @@ class Celery(object):
         return attrgetter(path)(self)
 
     def __repr__(self):
-        return '<{0} {1}:0x{2:x}>'.format(
-            type(self).__name__, self.main or '__main__', id(self))
+        return '<{0} {1}>'.format(type(self).__name__, appstr(self))
 
     def __reduce__(self):
         if self._using_v1_reduce:

+ 7 - 6
celery/app/builtins.py

@@ -252,7 +252,7 @@ def add_chain_task(app):
                         res = Signature._freeze(task)
                         task = chord(task, body=next_step, task_id=res.task_id)
                     except IndexError:
-                        pass
+                        pass  # no callback, so keep as group
                 if prev_task:
                     # link previous task to this task.
                     prev_task.link(task)
@@ -352,7 +352,8 @@ def add_chord_task(app):
             opts.update(chord=body, group_id=group_id)
             return task_id
 
-        def apply_async(self, args=(), kwargs={}, task_id=None, **options):
+        def apply_async(self, args=(), kwargs={}, task_id=None,
+                        group_id=None, chord=None, **options):
             if self.app.conf.CELERY_ALWAYS_EAGER:
                 return self.apply(args, kwargs, **options)
             header = kwargs.pop('header')
@@ -360,10 +361,10 @@ def add_chord_task(app):
             header, body = (list(maybe_subtask(header)),
                             maybe_subtask(body))
             # forward certain options to body
-            for opt_name in ['group_id', 'chord']:
-                opt_value = options.pop(opt_name, None)
-                if opt_value:
-                    body.set(**{opt_name: opt_value})
+            if chord is not None:
+                body.options['chord'] = chord
+            if group_id is not None:
+                body.options['group_id'] = group_id
             [body.link(s) for s in options.pop('link', [])]
             [body.link_error(s) for s in options.pop('link_error', [])]
             callback_id = body.options.setdefault('task_id', task_id or uuid())
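
With this change `group_id` and `chord` become explicit keyword arguments that are copied directly onto the body signature's options, instead of being popped out of `**options` by name. A sketch of the effect, mirroring the new `test_forward_options` test below (`add` and `xsum` are the example tasks used in the test module):

    from celery import chord

    body = xsum.s()
    c = chord([add.s(i, i) for i in range(10)], body=body)

    c.apply_async(group_id='some_group_id')
    assert body.options['group_id'] == 'some_group_id'

    c.apply_async(chord='some_chord_id')
    assert body.options['chord'] == 'some_chord_id'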

+ 2 - 2
celery/app/control.py

@@ -269,7 +269,7 @@ class Control(object):
         Supports the same arguments as :meth:`broadcast`.
 
         """
-        return self.broadcast('pool_grow', {}, destination, **kwargs)
+        return self.broadcast('pool_grow', {'n': n}, destination, **kwargs)
 
     def pool_shrink(self, n=1, destination=None, **kwargs):
         """Tell all (or specific) workers to shrink the pool by ``n``.
@@ -277,7 +277,7 @@ class Control(object):
         Supports the same arguments as :meth:`broadcast`.
 
         """
-        return self.broadcast('pool_shrink', {}, destination, **kwargs)
+        return self.broadcast('pool_shrink', {'n': n}, destination, **kwargs)
 
     def broadcast(self, command, arguments=None, destination=None,
                   connection=None, reply=False, timeout=1, limit=None,
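
Previously the `n` argument was accepted but never sent, so every broadcast grew or shrank the pool by the worker-side default (one process) regardless of the value passed. The count now travels in the broadcast arguments. A small usage sketch (a running app and workers are assumed):

    from celery import current_app

    # ask all workers to add three pool processes, then remove one
    current_app.control.pool_grow(3)
    current_app.control.pool_shrink(1)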

+ 2 - 7
celery/app/routes.py

@@ -63,13 +63,8 @@ class Router(object):
             try:
                 Q = self.queues[queue]  # noqa
             except KeyError:
-                if not self.create_missing:
-                    raise QueueNotFound(
-                        'Queue {0!r} missing from CELERY_QUEUES'.format(queue))
-                for key in 'exchange', 'routing_key':
-                    if route.get(key) is None:
-                        route[key] = queue
-                Q = self.app.amqp.queues.add(queue, **route)
+                raise QueueNotFound(
+                    'Queue {0!r} missing from CELERY_QUEUES'.format(queue))
             # needs to be declared by publisher
             route['queue'] = Q
         return route
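
`Router` thus no longer fills in a default exchange/routing key and registers unknown queues itself; an unknown queue name now surfaces as `QueueNotFound` from this lookup. A hedged sketch of the caller-visible behaviour, assuming missing-queue creation is disabled so the error actually propagates ('video' is an example queue name):

    from celery.exceptions import QueueNotFound

    try:
        add.apply_async((2, 2), queue='video')
    except QueueNotFound as exc:
        print(exc)  # Queue 'video' missing from CELERY_QUEUES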

+ 26 - 8
celery/app/task.py

@@ -25,6 +25,7 @@ from celery.utils.mail import ErrorMail
 
 from .annotations import resolve_all as resolve_all_annotations
 from .registry import _unpickle_task_v2
+from .utils import appstr
 
 #: extracts attributes related to publishing a message from an object.
 extract_exec_options = mattrgetter(
@@ -33,6 +34,29 @@ extract_exec_options = mattrgetter(
     'immediate', 'mandatory',  # imm+man is deprecated
 )
 
+# We take __repr__ very seriously around here ;)
+R_BOUND_TASK = '<class {0.__name__} of {app}{flags}>'
+R_UNBOUND_TASK = '<unbound {0.__name__}{flags}>'
+R_SELF_TASK = '<@task {0.name} bound to other {0.__self__}>'
+R_INSTANCE = '<@task: {0.name} of {app}{flags}>'
+
+
+def _strflags(flags, default=''):
+    if flags:
+        return ' ({0})'.format(', '.join(flags))
+    return default
+
+
+def _reprtask(task, fmt=None, flags=None):
+    flags = list(flags) if flags is not None else []
+    flags.append('v2 compatible') if task.__v2_compat__ else None
+    if not fmt:
+        fmt = R_BOUND_TASK if task._app else R_UNBOUND_TASK
+    return fmt.format(
+        task, flags=_strflags(flags),
+        app=appstr(task._app) if task._app else None,
+    )
+
 
 class Context(object):
     # Default context
@@ -124,11 +148,7 @@ class TaskType(type):
         return instance.__class__
 
     def __repr__(cls):
-        if cls._app:
-            return '<class {0.__name__} of {0._app}>'.format(cls)
-        if cls.__v2_compat__:
-            return '<unbound {0.__name__} (v2 compatible)>'.format(cls)
-        return '<unbound {0.__name__}>'.format(cls)
+        return _reprtask(cls)
 
 
 @with_metaclass(TaskType)
@@ -782,9 +802,7 @@ class Task(object):
 
     def __repr__(self):
         """`repr(task)`"""
-        if self.__self__:
-            return '<bound task {0.name} of {0.__self__}>'.format(self)
-        return '<@task: {0.name}>'.format(self)
+        return _reprtask(self, R_SELF_TASK if self.__self__ else R_INSTANCE)
 
     def _get_request(self):
         """Get current request object."""

+ 5 - 0
celery/app/utils.py

@@ -37,6 +37,11 @@ HIDDEN_SETTINGS = re.compile(
 )
 
 
+def appstr(app):
+    """String used in __repr__ etc, to id app instances."""
+    return '{0}:0x{1:x}'.format(app.main or '__main__', id(app))
+
+
 class Settings(ConfigurationView):
     """Celery settings object."""
 

+ 1 - 1
celery/concurrency/processes.py

@@ -352,7 +352,7 @@ class AsynPool(_pool.Pool):
                 task = resq.recv()
             except (IOError, EOFError) as exc:
                 debug('got %r while flushing process %r',
-                        exc, proc, exc_info=1)
+                      exc, proc, exc_info=1)
                 break
             else:
                 if task is not None:

+ 2 - 2
celery/contrib/migrate.py

@@ -239,7 +239,7 @@ def prepare_queues(queues):
 def start_filter(app, conn, filter, limit=None, timeout=1.0,
                  ack_messages=False, tasks=None, queues=None,
                  callback=None, forever=False, on_declare_queue=None,
-                 consume_from=None, state=None, **kwargs):
+                 consume_from=None, state=None, accept=None, **kwargs):
     state = state or State()
     queues = prepare_queues(queues)
     consume_from = [_maybe_queue(app, q)
@@ -257,7 +257,7 @@ def start_filter(app, conn, filter, limit=None, timeout=1.0,
     def ack_message(body, message):
         message.ack()
 
-    consumer = app.amqp.TaskConsumer(conn, queues=consume_from)
+    consumer = app.amqp.TaskConsumer(conn, queues=consume_from, accept=accept)
 
     if tasks:
         filter = filter_callback(filter, tasks)
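
`start_filter` now takes an `accept` list of content types and passes it straight to the `TaskConsumer`, so messages in non-default serialization formats can be consumed during migration; the updated tests forward the same keyword through `migrate_tasks`. A hedged usage sketch (broker URLs are examples):

    from kombu import Connection
    from celery.contrib.migrate import migrate_tasks

    source = Connection('amqp://localhost//')
    dest = Connection('redis://localhost//')

    # accept json-encoded task messages while moving them between brokers
    migrate_tasks(source, dest, accept=['json'])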

+ 75 - 0
celery/tests/app/test_amqp.py

@@ -35,6 +35,38 @@ class test_TaskProducer(AppCase):
         prod.publish_task('tasks.add', (2, 2), {}, retry=False, chord=123)
         self.assertFalse(prod.connection.ensure.call_count)
 
+    def test_publish_custom_queue(self):
+        prod = self.app.amqp.TaskProducer(Mock())
+        self.app.amqp.queues['some_queue'] = Queue(
+            'xxx', Exchange('yyy'), 'zzz',
+        )
+        prod.channel.connection.client.declared_entities = set()
+        prod.publish = Mock()
+        prod.publish_task('tasks.add', (8, 8), {}, retry=False,
+                          queue='some_queue')
+        self.assertEqual(prod.publish.call_args[1]['exchange'], 'yyy')
+        self.assertEqual(prod.publish.call_args[1]['routing_key'], 'zzz')
+
+    def test_event_dispatcher(self):
+        prod = self.app.amqp.TaskProducer(Mock())
+        self.assertTrue(prod.event_dispatcher)
+        self.assertFalse(prod.event_dispatcher.enabled)
+
+
+class test_TaskConsumer(AppCase):
+
+    def test_accept_content(self):
+        with self.app.pool.acquire(block=True) as conn:
+            self.app.conf.CELERY_ACCEPT_CONTENT = ['application/json']
+            self.assertEqual(
+                self.app.amqp.TaskConsumer(conn).accept,
+                set(['application/json'])
+            )
+            self.assertEqual(
+                self.app.amqp.TaskConsumer(conn, accept=['json']).accept,
+                set(['application/json']),
+            )
+
 
 class test_compat_TaskPublisher(AppCase):
 
@@ -123,6 +155,49 @@ class test_Queues(AppCase):
         self.assertIsInstance(q['foo'], Queue)
         self.assertEqual(q['foo'].routing_key, 'rk')
 
+    def test_with_ha_policy(self):
+        qn = Queues(ha_policy=None, create_missing=False)
+        qn.add('xyz')
+        self.assertIsNone(qn['xyz'].queue_arguments)
+
+        qn.add('xyx', queue_arguments={'x-foo': 'bar'})
+        self.assertEqual(qn['xyx'].queue_arguments, {'x-foo': 'bar'})
+
+        q = Queues(ha_policy='all', create_missing=False)
+        q.add(Queue('foo'))
+        self.assertEqual(q['foo'].queue_arguments, {'x-ha-policy': 'all'})
+
+        qq = Queue('xyx2', queue_arguments={'x-foo': 'bari'})
+        q.add(qq)
+        self.assertEqual(q['xyx2'].queue_arguments, {
+            'x-ha-policy': 'all',
+            'x-foo': 'bari',
+        })
+
+        q2 = Queues(ha_policy=['A', 'B', 'C'], create_missing=False)
+        q2.add(Queue('foo'))
+        self.assertEqual(q2['foo'].queue_arguments, {
+            'x-ha-policy': 'nodes',
+            'x-ha-policy-params': ['A', 'B', 'C'],
+        })
+
+    def test_select_add(self):
+        q = Queues()
+        q.select_subset(['foo', 'bar'])
+        q.select_add('baz')
+        self.assertItemsEqual(q._consume_from.keys(), ['foo', 'bar', 'baz'])
+
+    def test_select_remove(self):
+        q = Queues()
+        q.select_subset(['foo', 'bar'])
+        q.select_remove('bar')
+        self.assertItemsEqual(q._consume_from.keys(), ['foo'])
+
+    def test_with_ha_policy_compat(self):
+        q = Queues(ha_policy='all')
+        q.add('bar')
+        self.assertEqual(q['bar'].queue_arguments, {'x-ha-policy': 'all'})
+
     def test_add_default_exchange(self):
         ex = Exchange('fff', 'fanout')
         q = Queues(default_exchange=ex)

+ 154 - 5
celery/tests/app/test_app.py

@@ -7,9 +7,10 @@ from pickle import loads, dumps
 
 from kombu import Exchange
 
-from celery import Celery
+from celery import Celery, shared_task, current_app
 from celery import app as _app
 from celery import _state
+from celery.app import base as _appbase
 from celery.app import defaults
 from celery.exceptions import ImproperlyConfigured
 from celery.five import items
@@ -18,8 +19,13 @@ from celery.platforms import pyimplementation
 from celery.utils.serialization import pickle
 
 from celery.tests import config
-from celery.tests.utils import (Case, mask_modules, platform_pyimp,
-                                sys_platform, pypy_version)
+from celery.tests.utils import (
+    Case,
+    mask_modules,
+    platform_pyimp,
+    sys_platform,
+    pypy_version,
+)
 from celery.utils import uuid
 from celery.utils.mail import ErrorMail
 
@@ -74,6 +80,100 @@ class test_App(Case):
         task = app.task(fun)
         self.assertEqual(task.name, app.main + '.fun')
 
+    def test_with_config_source(self):
+        app = Celery(set_as_current=False, config_source=ObjectConfig)
+        self.assertEqual(app.conf.FOO, 1)
+        self.assertEqual(app.conf.BAR, 2)
+
+    def test_task_windows_execv(self):
+        app = Celery(set_as_current=False)
+
+        prev, _appbase._EXECV = _appbase._EXECV, True
+        try:
+
+            @app.task()
+            def foo():
+                pass
+
+            self.assertTrue(foo._get_current_object())  # is proxy
+
+        finally:
+            _appbase._EXECV = prev
+        assert not _appbase._EXECV
+
+    def test_task_takes_no_args(self):
+        app = Celery(set_as_current=False)
+
+        with self.assertRaises(TypeError):
+            @app.task(1)
+            def foo():
+                pass
+
+    def test_add_defaults(self):
+        app = Celery(set_as_current=False)
+
+        self.assertFalse(app.configured)
+        _conf = {'FOO': 300}
+        conf = lambda: _conf
+        app.add_defaults(conf)
+        self.assertIn(conf, app._pending_defaults)
+        self.assertFalse(app.configured)
+        self.assertEqual(app.conf.FOO, 300)
+        self.assertTrue(app.configured)
+        self.assertFalse(app._pending_defaults)
+
+        # defaults not pickled
+        appr = loads(dumps(app))
+        with self.assertRaises(AttributeError):
+            appr.conf.FOO
+
+        # add more defaults after configured
+        conf2 = {'FOO': 'BAR'}
+        app.add_defaults(conf2)
+        self.assertEqual(app.conf.FOO, 'BAR')
+
+        self.assertIn(_conf, app.conf.defaults)
+        self.assertIn(conf2, app.conf.defaults)
+
+    def test_connection_or_acquire(self):
+
+        with self.app.connection_or_acquire(block=True):
+            self.assertTrue(self.app.pool._dirty)
+
+        with self.app.connection_or_acquire(pool=False):
+            self.assertFalse(self.app.pool._dirty)
+
+    def test_maybe_close_pool(self):
+        cpool = self.app._pool = Mock()
+        ppool = self.app.amqp._producer_pool = Mock()
+        self.app._maybe_close_pool()
+        cpool.force_close_all.assert_called_with()
+        ppool.force_close_all.assert_called_with()
+        self.assertIsNone(self.app._pool)
+        self.assertIsNone(self.app.amqp._producer_pool)
+
+        self.app._pool = Mock()
+        self.app._maybe_close_pool()
+        self.app._maybe_close_pool()
+
+    def test_using_v1_reduce(self):
+        self.app._using_v1_reduce = True
+        self.assertTrue(loads(dumps(self.app)))
+
+    def test_autodiscover_tasks(self):
+        self.app.conf.CELERY_FORCE_BILLIARD_LOGGING = True
+        with patch('celery.app.base.ensure_process_aware_logger') as ep:
+            self.app.loader.autodiscover_tasks = Mock()
+            self.app.autodiscover_tasks(['proj.A', 'proj.B'])
+            ep.assert_called_with()
+            self.app.loader.autodiscover_tasks.assert_called_with(
+                ['proj.A', 'proj.B'], 'tasks',
+            )
+        with patch('celery.app.base.ensure_process_aware_logger') as ep:
+            self.app.conf.CELERY_FORCE_BILLIARD_LOGGING = False
+            self.app.autodiscover_tasks(['proj.A', 'proj.B'])
+            self.assertFalse(ep.called)
+
     def test_with_broker(self):
         prev = os.environ.get('CELERY_BROKER_URL')
         os.environ.pop('CELERY_BROKER_URL', None)
@@ -117,13 +217,13 @@ class test_App(Case):
             _state._task_stack.pop()
 
     def test_task_not_shared(self):
-        with patch('celery.app.base.shared_task') as shared_task:
+        with patch('celery.app.base.shared_task') as sh:
             app = Celery(set_as_current=False)
 
             @app.task(shared=False)
             def foo():
                 pass
-            self.assertFalse(shared_task.called)
+            self.assertFalse(sh.called)
 
     def test_task_compat_with_filter(self):
         app = Celery(set_as_current=False, accept_magic_kwargs=True)
@@ -146,6 +246,8 @@ class test_App(Case):
             check(task)
             return task
 
+        assert not _appbase._EXECV
+
         @app.task(filter=filter)
         def foo():
             pass
@@ -367,6 +469,18 @@ class test_App(Case):
         # not set as current, so ends up as default app after reduce
         self.assertIs(r.app, _state.default_app)
 
+    def test_get_active_apps(self):
+        self.assertTrue(list(_state._get_active_apps()))
+
+        app1 = Celery(set_as_current=False)
+        appid = id(app1)
+        self.assertIn(app1, _state._get_active_apps())
+        del(app1)
+
+        # weakref removed from list when app goes out of scope.
+        with self.assertRaises(StopIteration):
+            next(app for app in _state._get_active_apps() if id(app) == appid)
+
     def test_config_from_envvar_more(self, key='CELERY_HARNESS_CFG1'):
         self.assertFalse(self.app.config_from_envvar('HDSAJIHWIQHEWQU',
                                                      silent=True))
@@ -543,3 +657,38 @@ class test_pyimplementation(Case):
             with sys_platform('darwin'):
                 with pypy_version():
                     self.assertEqual('CPython', pyimplementation())
+
+
+class test_shared_task(Case):
+
+    def setUp(self):
+        self._restore_app = current_app._get_current_object()
+
+    def tearDown(self):
+        self._restore_app.set_current()
+
+    def test_registers_to_all_apps(self):
+        xproj = Celery('xproj')
+        xproj.finalize()
+
+        @shared_task
+        def foo():
+            return 42
+
+        @shared_task()
+        def bar():
+            return 84
+
+        self.assertIs(foo.app, xproj)
+        self.assertIs(bar.app, xproj)
+        self.assertTrue(foo._get_current_object())
+
+        yproj = Celery('yproj')
+        self.assertIs(foo.app, yproj)
+        self.assertIs(bar.app, yproj)
+
+        @shared_task()
+        def baz():
+            return 168
+
+        self.assertIs(baz.app, yproj)

+ 60 - 0
celery/tests/app/test_builtins.py

@@ -4,6 +4,7 @@ from mock import Mock, patch
 
 from celery import current_app as app, group, task, chord
 from celery.app import builtins
+from celery.canvas import Signature
 from celery.five import range
 from celery._state import _task_stack
 from celery.tests.utils import Case
@@ -100,6 +101,13 @@ class test_group(Case):
         x = group([add.s(4, 4), add.s(8, 8)])
         x.apply_async()
 
+    def test_apply_empty(self):
+        x = group()
+        x.apply()
+        res = x.apply_async()
+        self.assertFalse(res)
+        self.assertFalse(res.results)
+
     def test_apply_async_with_parent(self):
         _task_stack.push(add)
         try:
@@ -133,6 +141,49 @@ class test_chain(Case):
         self.assertTrue(result.parent.parent)
         self.assertIsNone(result.parent.parent.parent)
 
+    def test_group_to_chord(self):
+        c = (
+            group(add.s(i, i) for i in range(5)) |
+            add.s(10) |
+            add.s(20) |
+            add.s(30)
+        )
+        tasks, _ = c.type.prepare_steps((), c.tasks)
+        self.assertIsInstance(tasks[0], chord)
+        self.assertTrue(tasks[0].body.options['link'])
+        self.assertTrue(tasks[0].body.options['link'][0].options['link'])
+
+        c2 = add.s(2, 2) | group(add.s(i, i) for i in range(10))
+        tasks2, _ = c2.type.prepare_steps((), c2.tasks)
+        self.assertIsInstance(tasks2[1], group)
+
+    def test_apply_options(self):
+
+        class static(Signature):
+
+            def clone(self, *args, **kwargs):
+                return self
+
+        def s(*args, **kwargs):
+            return static(add.name, args, kwargs)
+
+        c = s(2, 2) | s(4, 4) | s(8, 8)
+        r1 = c.apply_async(task_id='some_id')
+        self.assertEqual(r1.id, 'some_id')
+
+        c.apply_async(group_id='some_group_id')
+        self.assertEqual(c.tasks[-1].options['group_id'], 'some_group_id')
+
+        c.apply_async(chord='some_chord_id')
+        self.assertEqual(c.tasks[-1].options['chord'], 'some_chord_id')
+
+        c.apply_async(link=[s(32)])
+        self.assertListEqual(c.tasks[-1].options['link'], [s(32)])
+
+        c.apply_async(link_error=[s('error')])
+        for task in c.tasks:
+            self.assertListEqual(task.options['link_error'], [s('error')])
+
 
 class test_chord(Case):
 
@@ -152,6 +203,15 @@ class test_chord(Case):
     def test_run_header_not_group(self):
         self.task([add.s(i, i) for i in range(10)], xsum.s())
 
+    def test_forward_options(self):
+        body = xsum.s()
+        x = chord([add.s(i, i) for i in range(10)], body=body)
+        x.apply_async(group_id='some_group_id')
+        self.assertEqual(body.options['group_id'], 'some_group_id')
+        x2 = chord([add.s(i, i) for i in range(10)], body=body)
+        x2.apply_async(chord='some_chord_id')
+        self.assertEqual(body.options['chord'], 'some_chord_id')
+
     def test_apply_eager(self):
         app.conf.CELERY_ALWAYS_EAGER = True
         try:

+ 72 - 10
celery/tests/app/test_control.py

@@ -1,14 +1,15 @@
 from __future__ import absolute_import
 
+import warnings
+
 from functools import wraps
 
 from kombu.pidbox import Mailbox
 
-from celery.app import app_or_default
 from celery.app import control
 from celery.task import task
 from celery.utils import uuid
-from celery.tests.utils import Case
+from celery.tests.utils import AppCase, Case
 
 
 @task()
@@ -45,12 +46,29 @@ def with_mock_broadcast(fun):
     return _resets
 
 
-class test_inspect(Case):
+class test_flatten_reply(Case):
+
+    def test_flatten_reply(self):
+        reply = [
+            {'foo@example.com': {'hello': 10}},
+            {'foo@example.com': {'hello': 20}},
+            {'bar@example.com': {'hello': 30}}
+        ]
+        with warnings.catch_warnings(record=True) as w:
+            nodes = control.flatten_reply(reply)
+            self.assertIn(
+                'multiple replies',
+                str(w[-1].message),
+            )
+            self.assertIn('foo@example.com', nodes)
+            self.assertIn('bar@example.com', nodes)
+
 
-    def setUp(self):
-        app = self.app = app_or_default()
-        self.c = Control(app=app)
-        self.prev, app.control = app.control, self.c
+class test_inspect(AppCase):
+
+    def setup(self):
+        self.c = Control(app=self.app)
+        self.prev, self.app.control = self.app.control, self.c
         self.i = self.c.inspect()
 
     def tearDown(self):
@@ -70,6 +88,36 @@ class test_inspect(Case):
         self.i.active()
         self.assertIn('dump_active', MockMailbox.sent)
 
+    @with_mock_broadcast
+    def test_clock(self):
+        self.i.clock()
+        self.assertIn('clock', MockMailbox.sent)
+
+    @with_mock_broadcast
+    def test_conf(self):
+        self.i.conf()
+        self.assertIn('dump_conf', MockMailbox.sent)
+
+    @with_mock_broadcast
+    def test_hello(self):
+        self.i.hello()
+        self.assertIn('hello', MockMailbox.sent)
+
+    @with_mock_broadcast
+    def test_memsample(self):
+        self.i.memsample()
+        self.assertIn('memsample', MockMailbox.sent)
+
+    @with_mock_broadcast
+    def test_memdump(self):
+        self.i.memdump()
+        self.assertIn('memdump', MockMailbox.sent)
+
+    @with_mock_broadcast
+    def test_objgraph(self):
+        self.i.objgraph()
+        self.assertIn('objgraph', MockMailbox.sent)
+
     @with_mock_broadcast
     def test_scheduled(self):
         self.i.scheduled()
@@ -111,10 +159,9 @@ class test_inspect(Case):
         self.assertIn('report', MockMailbox.sent)
 
 
-class test_Broadcast(Case):
+class test_Broadcast(AppCase):
 
-    def setUp(self):
-        self.app = app_or_default()
+    def setup(self):
         self.control = Control(app=self.app)
         self.app.control = self.control
 
@@ -182,6 +229,21 @@ class test_Broadcast(Case):
         self.control.ping()
         self.assertIn('ping', MockMailbox.sent)
 
+    @with_mock_broadcast
+    def test_election(self):
+        self.control.election('some_id', 'topic', 'action')
+        self.assertIn('election', MockMailbox.sent)
+
+    @with_mock_broadcast
+    def test_pool_grow(self):
+        self.control.pool_grow(2)
+        self.assertIn('pool_grow', MockMailbox.sent)
+
+    @with_mock_broadcast
+    def test_pool_shrink(self):
+        self.control.pool_shrink(2)
+        self.assertIn('pool_shrink', MockMailbox.sent)
+
     @with_mock_broadcast
     def test_revoke_from_result(self):
         self.app.AsyncResult('foozbazzbar').revoke()

+ 5 - 0
celery/tests/app/test_defaults.py

@@ -5,6 +5,8 @@ import sys
 from importlib import import_module
 from mock import Mock, patch
 
+from celery.app.defaults import NAMESPACES
+
 from celery.tests.utils import Case, pypy_version, sys_platform
 
 
@@ -17,6 +19,9 @@ class test_defaults(Case):
         if self._prev:
             sys.modules['celery.app.defaults'] = self._prev
 
+    def test_option_repr(self):
+        self.assertTrue(repr(NAMESPACES['BROKER']['URL']))
+
     def test_any(self):
         val = object()
         self.assertIs(self.defaults.Option.typemap['any'](val), val)

+ 10 - 0
celery/tests/app/test_log.py

@@ -125,12 +125,22 @@ class test_default_logger(AppCase):
 
     def test_setup_logging_subsystem_misc(self):
         log.setup_logging_subsystem(loglevel=None)
+
+    def test_setup_logging_subsystem_misc2(self):
         self.app.conf.CELERYD_HIJACK_ROOT_LOGGER = True
         try:
             log.setup_logging_subsystem()
         finally:
             self.app.conf.CELERYD_HIJACK_ROOT_LOGGER = False
 
+    def test_get_default_logger(self):
+        self.assertTrue(self.app.log.get_default_logger())
+
+    def test_configure_logger(self):
+        logger = self.app.log.get_default_logger()
+        self.app.log._configure_logger(logger, sys.stderr, None, '', False)
+        logger.handlers[:] = []
+
     def test_setup_logging_subsystem_colorize(self):
         log.setup_logging_subsystem(colorize=None)
         log.setup_logging_subsystem(colorize=True)

+ 17 - 10
celery/tests/app/test_utils.py

@@ -1,25 +1,32 @@
-"""
-Tests of celery.app.utils
-"""
-
 from __future__ import absolute_import
 
+from collections import Mapping, MutableMapping
+
+from celery import Celery
+from celery.app.utils import Settings, bugreport
 
-import unittest
+from celery.tests.utils import AppCase, Case, Mock
 
 
-class TestSettings(unittest.TestCase):
+class TestSettings(Case):
     """
     Tests of celery.app.utils.Settings
     """
     def test_is_mapping(self):
         """Settings should be a collections.Mapping"""
-        from celery.app.utils import Settings
-        from collections import Mapping
         self.assertTrue(issubclass(Settings, Mapping))
 
     def test_is_mutable_mapping(self):
         """Settings should be a collections.MutableMapping"""
-        from celery.app.utils import Settings
-        from collections import MutableMapping
         self.assertTrue(issubclass(Settings, MutableMapping))
+
+
+class test_bugreport(AppCase):
+
+    def test_no_conn_driver_info(self):
+        app = Celery(set_as_current=False)
+        app.connection = Mock()
+        conn = app.connection.return_value = Mock()
+        conn.transport = None
+
+        bugreport(app)

+ 1 - 2
celery/tests/backends/test_base.py

@@ -246,8 +246,7 @@ class test_KeyValueStoreBackend(Case):
         tid = uuid()
         tsr = GroupResult(tid, [AsyncResult(uuid()) for _ in range(10)])
         self.b.save_group(tid, tsr)
-        stored = self.b.restore_group(tid)
-        print(stored)
+        self.b.restore_group(tid)
         self.assertEqual(self.b.restore_group(tid), tsr)
         self.b.delete_group(tid)
         self.assertIsNone(self.b.restore_group(tid))

+ 54 - 3
celery/tests/bin/test_celery.py

@@ -1,9 +1,12 @@
 from __future__ import absolute_import
 
+import sys
+
 from anyjson import dumps
 from datetime import datetime
 from mock import Mock, patch
 
+from celery import __main__
 from celery import task
 from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
 from celery.bin.base import Error
@@ -20,10 +23,10 @@ from celery.bin.celery import (
     report,
     CeleryCommand,
     determine_exit_status,
-    main,
+    main as mainfun,
 )
 
-from celery.tests.utils import AppCase, WhateverIO
+from celery.tests.utils import AppCase, Case, WhateverIO, override_stdouts
 
 
 @task()
@@ -31,6 +34,54 @@ def add(x, y):
     return x + y
 
 
+class test__main__(Case):
+
+    def test_warn_deprecated(self):
+        with override_stdouts() as (stdout, _):
+            __main__._warn_deprecated('YADDA YADDA')
+            self.assertIn('command is deprecated', stdout.getvalue())
+            self.assertIn('YADDA YADDA', stdout.getvalue())
+
+    def test_maybe_patch_concurrency(self):
+        with patch('celery.platforms.maybe_patch_concurrency') as _mpc:
+            __main__.maybe_patch_concurrency()
+            _mpc.assert_called_with(sys.argv, ['-P'], ['--pool'])
+
+    def test_main(self):
+        with patch('celery.__main__.maybe_patch_concurrency') as mpc:
+            with patch('celery.bin.celery.main') as main:
+                __main__.main()
+                mpc.assert_called_with()
+                main.assert_called_with()
+
+    def test_compat_worker(self):
+        with patch('celery.__main__.maybe_patch_concurrency') as mpc:
+            with patch('celery.__main__._warn_deprecated') as depr:
+                with patch('celery.bin.worker.main') as main:
+                    __main__._compat_worker()
+                    mpc.assert_called_with()
+                    depr.assert_called_with('celery worker')
+                    main.assert_called_with()
+
+    def test_compat_multi(self):
+        with patch('celery.__main__.maybe_patch_concurrency') as mpc:
+            with patch('celery.__main__._warn_deprecated') as depr:
+                with patch('celery.bin.multi.main') as main:
+                    __main__._compat_multi()
+                    mpc.assert_called_with()
+                    depr.assert_called_with('celery multi')
+                    main.assert_called_with()
+
+    def test_compat_beat(self):
+        with patch('celery.__main__.maybe_patch_concurrency') as mpc:
+            with patch('celery.__main__._warn_deprecated') as depr:
+                with patch('celery.bin.beat.main') as main:
+                    __main__._compat_beat()
+                    mpc.assert_called_with()
+                    depr.assert_called_with('celery beat')
+                    main.assert_called_with()
+
+
 class test_Command(AppCase):
 
     def test_Error_repr(self):
@@ -328,5 +379,5 @@ class test_main(AppCase):
     @patch('celery.bin.celery.CeleryCommand')
     def test_main(self, Command):
         command = Command.return_value = Mock()
-        main()
+        mainfun()
         command.execute_from_commandline.assert_called_with(None)

+ 5 - 4
celery/tests/contrib/test_migrate.py

@@ -77,7 +77,7 @@ class test_migrate_tasks(AppCase):
         self.assertTrue(x.default_channel.queues)
         self.assertFalse(y.default_channel.queues)
 
-        migrate_tasks(x, y)
+        migrate_tasks(x, y, accept=['text/plain'])
 
         yq = q(y.default_channel)
         self.assertEqual(yq.get().body, ensure_bytes('foo'))
@@ -86,11 +86,12 @@ class test_migrate_tasks(AppCase):
 
         Producer(x).publish('foo', exchange=name, routing_key=name)
         callback = Mock()
-        migrate_tasks(x, y, callback=callback)
+        migrate_tasks(x, y, callback=callback, accept=['text/plain'])
         self.assertTrue(callback.called)
         migrate = Mock()
         Producer(x).publish('baz', exchange=name, routing_key=name)
-        migrate_tasks(x, y, callback=callback, migrate=migrate)
+        migrate_tasks(x, y, callback=callback,
+                      migrate=migrate, accept=['text/plain'])
         self.assertTrue(migrate.called)
 
         with patch('kombu.transport.virtual.Channel.queue_declare') as qd:
@@ -106,5 +107,5 @@ class test_migrate_tasks(AppCase):
         x.default_channel.queues = {}
         y.default_channel.queues = {}
         callback = Mock()
-        migrate_tasks(x, y, callback=callback)
+        migrate_tasks(x, y, callback=callback, accept=['text/plain'])
         self.assertFalse(callback.called)

+ 110 - 37
celery/tests/tasks/test_chord.py

@@ -6,8 +6,9 @@ from contextlib import contextmanager
 from celery import canvas
 from celery import current_app
 from celery import result
+from celery.exceptions import ChordError
 from celery.five import range
-from celery.result import AsyncResult, GroupResult
+from celery.result import AsyncResult, GroupResult, EagerResult
 from celery.task import task, TaskSet
 from celery.tests.utils import AppCase, Mock
 
@@ -31,11 +32,24 @@ class TSR(GroupResult):
     def ready(self):
         return self.is_ready
 
-    def join(self, **kwargs):
+    def join(self, propagate=True, **kwargs):
+        if propagate:
+            for value in self.value:
+                if isinstance(value, Exception):
+                    raise value
         return self.value
+    join_native = join
 
-    def join_native(self, **kwargs):
-        return self.value
+    def _failed_join_report(self):
+        for value in self.value:
+            if isinstance(value, Exception):
+                yield EagerResult('some_id', value, 'FAILURE')
+
+
+class TSRNoReport(TSR):
+
+    def _failed_join_report(self):
+        return iter([])
 
 
 @contextmanager
@@ -58,47 +72,106 @@ class test_unlock_chord_task(AppCase):
             is_ready = True
             value = [2, 4, 8, 6]
 
-        @task()
-        def callback(*args, **kwargs):
-            pass
+        with self._chord_context(AlwaysReady) as (cb, retry, _):
+            cb.type.apply_async.assert_called_with(
+                ([2, 4, 8, 6], ), {}, task_id=cb.id,
+            )
+            # did not retry
+            self.assertFalse(retry.call_count)
 
-        pts, result.GroupResult = result.GroupResult, AlwaysReady
-        callback.apply_async = Mock()
-        callback_s = callback.s()
-        try:
-            with patch_unlock_retry() as (unlock, retry):
-                subtask, canvas.maybe_subtask = canvas.maybe_subtask, passthru
-                try:
-                    unlock('group_id', callback_s,
-                           result=[AsyncResult(r) for r in ['1', 2, 3]],
-                           GroupResult=AlwaysReady)
-                finally:
-                    canvas.maybe_subtask = subtask
-                callback.apply_async.assert_called_with(([2, 4, 8, 6], ), {})
-                # did not retry
-                self.assertFalse(retry.call_count)
-        finally:
-            result.GroupResult = pts
+    def test_callback_fails(self):
+        class AlwaysReady(TSR):
+            is_ready = True
+            value = [2, 4, 8, 6]
 
-    @patch('celery.result.GroupResult')
-    def test_when_not_ready(self, GroupResult):
-        with patch_unlock_retry() as (unlock, retry):
+        def setup(callback):
+            callback.apply_async.side_effect = IOError()
 
-            class NeverReady(TSR):
-                is_ready = False
+        with self._chord_context(AlwaysReady, setup) as (cb, retry, fail):
+            self.assertTrue(fail.called)
+            self.assertEqual(
+                fail.call_args[0][0], cb.id,
+            )
+            self.assertIsInstance(
+                fail.call_args[1]['exc'], ChordError,
+            )
 
-            pts, result.GroupResult = result.GroupResult, NeverReady
+    def test_unlock_ready_failed(self):
+
+        class Failed(TSR):
+            is_ready = True
+            value = [2, KeyError('foo'), 8, 6]
+
+        with self._chord_context(Failed) as (cb, retry, fail_current):
+            self.assertFalse(cb.type.apply_async.called)
+            # did not retry
+            self.assertFalse(retry.call_count)
+            self.assertTrue(fail_current.called)
+            self.assertEqual(
+                fail_current.call_args[0][0], cb.id,
+            )
+            self.assertIsInstance(
+                fail_current.call_args[1]['exc'], ChordError,
+            )
+            self.assertIn('some_id', str(fail_current.call_args[1]['exc']))
+
+    def test_unlock_ready_failed_no_culprit(self):
+        class Failed(TSRNoReport):
+            is_ready = True
+            value = [2, KeyError('foo'), 8, 6]
+
+        with self._chord_context(Failed) as (cb, retry, fail_current):
+            self.assertTrue(fail_current.called)
+            self.assertEqual(
+                fail_current.call_args[0][0], cb.id,
+            )
+            self.assertIsInstance(
+                fail_current.call_args[1]['exc'], ChordError,
+            )
+
+    @contextmanager
+    def _chord_context(self, ResultCls, setup=None, **kwargs):
+        with patch('celery.result.GroupResult'):
+
+            @task()
+            def callback(*args, **kwargs):
+                pass
+
+            pts, result.GroupResult = result.GroupResult, ResultCls
+            callback.apply_async = Mock()
+            callback_s = callback.s()
+            callback_s.id = 'callback_id'
+            fail_current = self.app.backend.fail_from_current_stack = Mock()
             try:
-                callback = Mock()
-                unlock('group_id', callback, interval=10, max_retries=30,
-                       result=[AsyncResult(x) for x in ['1', '2', '3']],
-                       GroupResult=NeverReady)
-                self.assertFalse(callback.delay.call_count)
-                # did retry
-                unlock.retry.assert_called_with(countdown=10, max_retries=30)
+                with patch_unlock_retry() as (unlock, retry):
+                    subtask, canvas.maybe_subtask = (
+                        canvas.maybe_subtask, passthru,
+                    )
+                    if setup:
+                        setup(callback)
+                    try:
+                        unlock(
+                            'group_id', callback_s,
+                            result=[AsyncResult(r) for r in ['1', 2, 3]],
+                            GroupResult=ResultCls, **kwargs
+                        )
+                    finally:
+                        canvas.maybe_subtask = subtask
+                    yield callback_s, retry, fail_current
             finally:
                 result.GroupResult = pts
 
+    @patch('celery.result.GroupResult')
+    def test_when_not_ready(self, GroupResult):
+        class NeverReady(TSR):
+            is_ready = False
+
+        with self._chord_context(NeverReady, interval=10, max_retries=30) \
+                as (cb, retry, _):
+            self.assertFalse(cb.type.apply_async.called)
+            # did retry
+            retry.assert_called_with(countdown=10, max_retries=30)
+
     def test_is_in_registry(self):
         self.assertIn('celery.chord_unlock', current_app.tasks)
 

+ 22 - 2
celery/tests/tasks/test_registry.py

@@ -1,8 +1,13 @@
 from __future__ import absolute_import
 
-from celery.app.registry import TaskRegistry
+from celery import Celery
+from celery.app.registry import (
+    TaskRegistry,
+    _unpickle_task,
+    _unpickle_task_v2,
+)
 from celery.task import Task, PeriodicTask
-from celery.tests.utils import Case
+from celery.tests.utils import AppCase, Case
 
 
 class MockTask(Task):
@@ -20,6 +25,21 @@ class MockPeriodicTask(PeriodicTask):
         return True
 
 
+class test_unpickle_task(AppCase):
+
+    def setup(self):
+        self.app = Celery(set_as_current=True)
+
+    def test_unpickle_v1(self):
+        self.app.tasks['txfoo'] = 'bar'
+        self.assertEqual(_unpickle_task('txfoo'), 'bar')
+
+    def test_unpickle_v2(self):
+        self.app.tasks['txfoo1'] = 'bar1'
+        self.assertEqual(_unpickle_task_v2('txfoo1'), 'bar1')
+        self.assertEqual(_unpickle_task_v2('txfoo1', module='celery'), 'bar1')
+
+
 class test_TaskRegistry(Case):
 
     def test_NotRegistered_str(self):

+ 15 - 1
celery/tests/tasks/test_tasks.py

@@ -232,7 +232,6 @@ class test_tasks(Case):
         @task
         def xxx():
             pass
-
         self.assertIs(pickle.loads(pickle.dumps(xxx)), xxx.app.tasks[xxx.name])
 
     def createTask(self, name):
@@ -339,6 +338,21 @@ class test_tasks(Case):
         publisher = T1.get_publisher()
         self.assertTrue(publisher.exchange)
 
+    def test_repr_v2_compat(self):
+        task = type(self.createTask('c.unittest.v2c'))
+        task.__v2_compat__ = True
+        self.assertIn('v2 compatible', repr(task))
+
+    def test_apply_with_self(self):
+
+        @task(__self__=42)
+        def tawself(self):
+            return self
+
+        self.assertEqual(tawself.apply().get(), 42)
+
+        self.assertEqual(tawself(), 42)
+
     def test_context_get(self):
         task = self.createTask('c.unittest.t.c.g')
         task.push_request()