Browse Source

Merge branch '4.0' into 5.0-devel

Ask Solem 8 years ago
parent
commit
8c3b680cad
100 changed files with 1468 additions and 1318 deletions
  1. 2 0
      .coveragerc
  2. 12 0
      .github/ISSUE_TEMPLATE
  3. 13 0
      .github/PULL_REQUEST_TEMPLATE
  4. 13 6
      .travis.yml
  5. 2 0
      CONTRIBUTING.rst
  6. 1 0
      CONTRIBUTORS.txt
  7. 28 6
      Changelog
  8. 16 14
      README.rst
  9. 1 1
      appveyor.yml
  10. 2 3
      celery/__init__.py
  11. 0 1
      celery/__main__.py
  12. 60 2
      celery/_state.py
  13. 10 72
      celery/app/__init__.py
  14. 3 1
      celery/app/base.py
  15. 0 1
      celery/app/builtins.py
  16. 0 3
      celery/app/control.py
  17. 3 3
      celery/app/defaults.py
  18. 0 2
      celery/app/registry.py
  19. 0 3
      celery/app/routes.py
  20. 26 9
      celery/app/task.py
  21. 3 3
      celery/app/trace.py
  22. 2 1
      celery/app/utils.py
  23. 0 2
      celery/apps/beat.py
  24. 8 3
      celery/backends/base.py
  25. 0 2
      celery/backends/cache.py
  26. 0 2
      celery/backends/cassandra.py
  27. 0 2
      celery/backends/consul.py
  28. 4 8
      celery/backends/couchbase.py
  29. 0 3
      celery/backends/couchdb.py
  30. 5 5
      celery/backends/database/__init__.py
  31. 1 4
      celery/backends/database/models.py
  32. 0 1
      celery/backends/database/session.py
  33. 0 4
      celery/backends/elasticsearch.py
  34. 4 5
      celery/backends/filesystem.py
  35. 0 3
      celery/backends/mongodb.py
  36. 3 6
      celery/backends/riak.py
  37. 13 2
      celery/backends/rpc.py
  38. 13 13
      celery/beat.py
  39. 0 2
      celery/bin/beat.py
  40. 78 0
      celery/bin/call.py
  41. 10 741
      celery/bin/celery.py
  42. 0 4
      celery/bin/celeryd_detach.py
  43. 237 0
      celery/bin/control.py
  44. 0 2
      celery/bin/events.py
  45. 0 2
      celery/bin/graph.py
  46. 45 0
      celery/bin/list.py
  47. 0 2
      celery/bin/logtool.py
  48. 65 0
      celery/bin/migrate.py
  49. 0 3
      celery/bin/multi.py
  50. 68 0
      celery/bin/purge.py
  51. 41 0
      celery/bin/result.py
  52. 157 0
      celery/bin/shell.py
  53. 92 0
      celery/bin/upgrade.py
  54. 0 1
      celery/bin/worker.py
  55. 145 207
      celery/canvas.py
  56. 5 4
      celery/concurrency/asynpool.py
  57. 0 1
      celery/concurrency/gevent.py
  58. 0 1
      celery/concurrency/solo.py
  59. 0 16
      celery/contrib/pytest.py
  60. 0 2
      celery/contrib/rdb.py
  61. 0 2
      celery/contrib/sphinx.py
  62. 0 3
      celery/contrib/testing/app.py
  63. 2 10
      celery/contrib/testing/manager.py
  64. 0 1
      celery/contrib/testing/mocks.py
  65. 0 2
      celery/contrib/testing/worker.py
  66. 0 2
      celery/events/dumper.py
  67. 0 1
      celery/events/snapshot.py
  68. 121 48
      celery/exceptions.py
  69. 7 1
      celery/fixups/django.py
  70. 0 2
      celery/loaders/default.py
  71. 0 2
      celery/security/__init__.py
  72. 0 3
      celery/security/certificate.py
  73. 0 1
      celery/security/key.py
  74. 0 3
      celery/security/utils.py
  75. 1 1
      celery/utils/__init__.py
  76. 0 1
      celery/utils/abstract.py
  77. 0 3
      celery/utils/deprecated.py
  78. 0 2
      celery/utils/dispatch/signal.py
  79. 35 0
      celery/utils/functional.py
  80. 0 2
      celery/utils/imports.py
  81. 0 1
      celery/utils/iso8601.py
  82. 0 3
      celery/utils/nodenames.py
  83. 53 1
      celery/utils/objects.py
  84. BIN
      celery/utils/static/celery_128.png
  85. 0 3
      celery/utils/sysinfo.py
  86. 0 2
      celery/utils/term.py
  87. 45 2
      celery/utils/text.py
  88. 2 1
      celery/utils/time.py
  89. 0 1
      celery/worker/consumer/agent.py
  90. 0 1
      celery/worker/consumer/connection.py
  91. 2 7
      celery/worker/consumer/consumer.py
  92. 0 2
      celery/worker/consumer/control.py
  93. 0 2
      celery/worker/consumer/events.py
  94. 0 2
      celery/worker/consumer/heart.py
  95. 0 1
      celery/worker/consumer/mingle.py
  96. 0 3
      celery/worker/consumer/tasks.py
  97. 0 1
      celery/worker/heartbeat.py
  98. 8 7
      celery/worker/loops.py
  99. 0 3
      celery/worker/pidbox.py
  100. 1 1
      celery/worker/request.py

+ 2 - 0
.coveragerc

@@ -19,3 +19,5 @@ omit =
     *celery/backends/riak.py
     *celery/concurrency/asynpool.py
     *celery/utils/debug.py
+    *celery/contrib/testing/*
+    *celery/contrib/pytest.py

+ 12 - 0
.github/ISSUE_TEMPLATE

@@ -0,0 +1,12 @@
+## Checklist
+
+- [ ] I have included the output of ``celery -A proj report`` in the issue.
+      (if you are not able to do this, then at least specify the Celery
+       version affected).
+- [ ] I have verified that the issue exists against the `master` branch of Celery.
+
+## Steps to reproduce
+
+## Expected behavior
+
+## Actual behavior

+ 13 - 0
.github/PULL_REQUEST_TEMPLATE

@@ -0,0 +1,13 @@
+*Note*: Before submitting this pull request, please review our [contributing
+guidelines](https://docs.celeryproject.org/en/master/contributing.html).
+
+## Description
+
+Please describe your pull request.
+
+NOTE: All patches should be made against master, not a maintenance branch like
+3.1, 2.5, etc.  That is unless the bug is already fixed in master, but not in
+that version series.
+
+If it fixes a bug or resolves a feature request,
+be sure to link to that issue via (Fixes #4412) for example.

+ 13 - 6
.travis.yml

@@ -10,21 +10,25 @@ env:
     PYTHONUNBUFFERED=yes
   matrix:
     - TOXENV=2.7-unit
-    - TOXENV=2.7-integration
+    - TOXENV=2.7-integration-rabbitmq
+    - TOXENV=2.7-integration-redis
     - TOXENV=3.4-unit
-    - TOXENV=3.4-integration
+    - TOXENV=3.4-integration-rabbitmq
+    - TOXENV=3.4-integration-redis
     - TOXENV=3.5-unit
-    - TOXENV=3.5-integration
+    - TOXENV=3.5-integration-rabbitmq
+    - TOXENV=3.5-integration-redis
     - TOXENV=pypy-unit PYPY_VERSION="5.3"
-    - TOXENV=pypy-integration PYPY_VERSION="5.3"
+    - TOXENV=pypy-integration-rabbitmq PYPY_VERSION="5.3"
+    - TOXENV=pypy-integration-redis PYPY_VERSION="5.3"
     - TOXENV=pypy3-unit
-    - TOXENV=pypy3-integration
+    - TOXENV=pypy3-integration-rabbitmq
+    - TOXENV=pypy3-integration-redis
     - TOXENV=flake8
     - TOXENV=flakeplus
     - TOXENV=apicheck
     - TOXENV=configcheck
     - TOXENV=pydocstyle
-    - TOXENV=cov
 before_install:
     - |
           if [ "$TOXENV" = "pypy" ]; then
@@ -38,6 +42,9 @@ before_install:
             virtualenv --python="$PYENV_ROOT/versions/pypy-$PYPY_VERSION/bin/python" "$HOME/virtualenvs/pypy-$PYPY_VERSION"
             source "$HOME/virtualenvs/pypy-$PYPY_VERSION/bin/activate"
           fi
+after_success:
+  - .tox/$TRAVIS_PYTHON_VERSION/bin/coverage xml
+  - .tox/$TRAVIS_PYTHON_VERSION/bin/codecov -e TOXENV
 install: travis_retry pip install -U tox
 script: tox -v -- -v
 notifications:

+ 2 - 0
CONTRIBUTING.rst

@@ -463,6 +463,8 @@ install the development requirements first:
 
     $ pip install -U -r requirements/dev.txt
 
+THIS REQUIREMENT FILE MAY NOT BE PRESENT, SKIP IF NOT FOUND.
+
 Both the stable and the development version have testing related
 dependencies, so install these next:
 

+ 1 - 0
CONTRIBUTORS.txt

@@ -219,3 +219,4 @@ Kevin Richardson, 2016/06/29
 Andrew Stewart, 2016/07/04
 Xin Li, 2016/08/03
 Alli Witheford, 2016/09/29
+Marat Sharafutdinov, 2016/11/04

+ 28 - 6
Changelog

@@ -5,16 +5,38 @@
 ================
 
 This document contains change notes for bugfix releases in
-the 4.0.x series (0today8), please see :ref:`whatsnew-4.0` for
+the 4.0.x series (latentcall), please see :ref:`whatsnew-4.0` for
 an overview of what's new in Celery 4.0.
 
+
 .. _version-4.0.0:
 
 4.0.0
 =====
-:release-date: TBA
-:status: *FROZEN*
-:branch: master
-:release-by:
+:release-date: 2016-11-04 02:00 P.M PDT
+:release-by: Ask Solem
+
+See :ref:`whatsnew-4.0` (in :file:`docs/whatsnew-4.0.rst`).
+
+.. _version-4.0.0rc7:
+
+4.0.0rc7
+========
+:release-date: 2016-11-02 01:30 P.M PDT
+
+Important notes
+---------------
+
+- Database result backend related setting names changed from
+  ``sqlalchemy_*`` -> ``database_*``.
+
+    The ``sqlalchemy_`` named settings won't work at all in this
+    version so you need to rename them.  This is a last minute change,
+    and as they were not supported in 3.1 we will not be providing
+    aliases.
+
+- ``chain(A, B, C)`` now works the same way as ``A | B | C``.
 
-See :ref:`whatsnew-4.0`.
+    This means calling ``chain()`` might not actually return a chain,
+    it can return a group or any other type depending on how the
+    workflow can be optimized.

+ 16 - 14
README.rst

@@ -1,17 +1,13 @@
-=================================
- Celery - Distributed Task Queue
-=================================
+.. image:: http://docs.celeryproject.org/en/latest/_images/celery-banner-small.png
 
-.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
+|build-status| |license| |wheel| |pyversion| |pyimp|
 
-|build-status| |coverage| |license| |wheel| |pyversion| |pyimp|
-
-:Version: 4.0.0rc5 (0today8)
+:Version: 4.0.0 (latentcall)
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: https://github.com/celery/celery/
-:Keywords: task queue, job queue, asynchronous, async, rabbitmq, amqp, redis,
-  python, webhooks, queue, distributed
+:Keywords: task, queue, job, async, rabbitmq, amqp, redis,
+  python, distributed, actors
 
 --
 
@@ -35,20 +31,19 @@ Celery is written in Python, but the protocol can be implemented in any
 language. In addition to Python there's node-celery_ for Node.js,
 and a `PHP client`_.
 
-Language interoperability can also be achieved
-by `using webhooks`_.
+Language interoperability can also be achieved by using webhooks
+in such a way that the client enqueues an URL to be requested by a worker.
 
 .. _node-celery: https://github.com/mher/node-celery
 .. _`PHP client`: https://github.com/gjedeer/celery-php
-.. _`using webhooks`:
-    http://docs.celeryproject.org/en/latest/userguide/remote-tasks.html
 
 What do I need?
 ===============
 
 Celery version 5.0 runs on,
 
-- Python (3.6)
+- Python (3.5, 3.6)
+- PyPy (5.5)
 
 
 This is the last version to support Python 2.7,
@@ -314,6 +309,13 @@ Transports and Backends
 :``celery[consul]``:
     for using the Consul.io Key/Value store as a message transport or result backend (*experimental*).
 
+:``celery[django]``
+    specifies the lowest version possible for Django support.
+
+    You should probably not use this in your requirements, it's here
+    for informational purposes only.
+
+
 .. _celery-installing-from-source:
 
 Downloading and installing from source

+ 1 - 1
appveyor.yml

@@ -37,7 +37,7 @@ init:
 install:
   - "powershell extra\\appveyor\\install.ps1"
   - "%PYTHON%/Scripts/pip.exe install -U setuptools"
-  - "%PYTHON%/Scripts/pip.exe install -r requirements/dev.txt"
+  - "%PYTHON%/Scripts/pip.exe install -U eventlet"
 
 build: off
 

+ 2 - 3
celery/__init__.py

@@ -8,12 +8,11 @@
 import os
 import re
 import sys
-
 from collections import namedtuple
 
-SERIES = '0today8'
+SERIES = 'latentcall'
 
-__version__ = '4.0.0rc5'
+__version__ = '4.0.0'
 __author__ = 'Ask Solem'
 __contact__ = 'ask@celeryproject.org'
 __homepage__ = 'http://celeryproject.org'

+ 0 - 1
celery/__main__.py

@@ -1,6 +1,5 @@
 """Entry-point for the :program:`celery` umbrella command."""
 import sys
-
 from . import maybe_patch_concurrency
 
 __all__ = ['main']

+ 60 - 2
celery/_state.py

@@ -10,7 +10,6 @@ import os
 import sys
 import threading
 import weakref
-
 from celery.local import Proxy
 from celery.utils.threads import LocalStack
 
@@ -23,6 +22,13 @@ __all__ = [
 #: Global default app used when no current app.
 default_app = None
 
+#: Function returning the app provided or the default app if none.
+#:
+#: The environment variable :envvar:`CELERY_TRACE_APP` is used to
+#: trace app leaks.  When enabled an exception is raised if there
+#: is no active app.
+app_or_default = None
+
 #: List of all app instances (weakrefs), mustn't be used directly.
 _apps = weakref.WeakSet()
 
@@ -64,6 +70,16 @@ _tls = _TLS()
 _task_stack = LocalStack()
 
 
+#: Function used to push a task to the thread local stack
+#: keeping track of the currently executing task.
+#: You must remember to pop the task after.
+push_current_task = _task_stack.push
+
+#: Function used to pop a task from the thread local stack
+#: keeping track of the currently executing task.
+pop_current_task = _task_stack.pop
+
+
 def set_default_app(app):
     """Set default app."""
     global default_app
@@ -73,7 +89,7 @@ def set_default_app(app):
 def _get_current_app():
     if default_app is None:
         #: creates the global fallback app instance.
-        from celery.app import Celery
+        from celery.app.base import Celery
         set_default_app(Celery(
             'default', fixups=[], set_as_current=False,
             loader=os.environ.get('CELERY_LOADER') or 'default',
@@ -133,3 +149,45 @@ def _deregister_app(app):
 
 def _get_active_apps():
     return _apps
+
+
+def _app_or_default(app=None):
+    if app is None:
+        return get_current_app()
+    return app
+
+
+def _app_or_default_trace(app=None):  # pragma: no cover
+    from traceback import print_stack
+    try:
+        from billiard.process import current_process
+    except ImportError:
+        current_process = None
+    if app is None:
+        if getattr(_tls, 'current_app', None):
+            print('-- RETURNING TO CURRENT APP --')  # noqa+
+            print_stack()
+            return _tls.current_app
+        if not current_process or current_process()._name == 'MainProcess':
+            raise Exception('DEFAULT APP')
+        print('-- RETURNING TO DEFAULT APP --')      # noqa+
+        print_stack()
+        return default_app
+    return app
+
+
+def enable_trace():
+    """Enable tracing of app instances."""
+    global app_or_default
+    app_or_default = _app_or_default_trace
+
+
+def disable_trace():
+    """Disable tracing of app instances."""
+    global app_or_default
+    app_or_default = _app_or_default
+
+if os.environ.get('CELERY_TRACE_APP'):  # pragma: no cover
+    enable_trace()
+else:
+    disable_trace()

+ 10 - 72
celery/app/__init__.py

@@ -1,89 +1,27 @@
 # -*- coding: utf-8 -*-
 """Celery Application."""
-import os
-
 from celery.local import Proxy
 from celery import _state
 from celery._state import (
-    get_current_app as current_app,
-    get_current_task as current_task,
-    connect_on_app_finalize, set_default_app, _get_active_apps, _task_stack,
+    app_or_default, enable_trace, disable_trace,
+    push_current_task, pop_current_task,
 )
-
-from .base import Celery, AppPickler
+from .base import Celery
+from .utils import AppPickler
 
 __all__ = [
-    'Celery', 'AppPickler', 'default_app', 'app_or_default',
+    'Celery', 'AppPickler', 'app_or_default', 'default_app',
     'bugreport', 'enable_trace', 'disable_trace', 'shared_task',
-    'set_default_app', 'current_app', 'current_task',
     'push_current_task', 'pop_current_task',
 ]
 
 #: Proxy always returning the app set as default.
 default_app = Proxy(lambda: _state.default_app)
 
-#: Function returning the app provided or the default app if none.
-#:
-#: The environment variable :envvar:`CELERY_TRACE_APP` is used to
-#: trace app leaks.  When enabled an exception is raised if there
-#: is no active app.
-app_or_default = None
-
-#: Function used to push a task to the thread local stack
-#: keeping track of the currently executing task.
-#: You must remember to pop the task after.
-push_current_task = _task_stack.push
-
-#: Function used to pop a task from the thread local stack
-#: keeping track of the currently executing task.
-pop_current_task = _task_stack.pop
-
 
 def bugreport(app=None):
     """Return information useful in bug reports."""
-    return (app or current_app()).bugreport()
-
-
-def _app_or_default(app=None):
-    if app is None:
-        return _state.get_current_app()
-    return app
-
-
-def _app_or_default_trace(app=None):  # pragma: no cover
-    from traceback import print_stack
-    try:
-        from billiard.process import current_process
-    except ImportError:
-        current_process = None
-    if app is None:
-        if getattr(_state._tls, 'current_app', None):
-            print('-- RETURNING TO CURRENT APP --')  # noqa+
-            print_stack()
-            return _state._tls.current_app
-        if not current_process or current_process()._name == 'MainProcess':
-            raise Exception('DEFAULT APP')
-        print('-- RETURNING TO DEFAULT APP --')      # noqa+
-        print_stack()
-        return _state.default_app
-    return app
-
-
-def enable_trace():
-    """Enable tracing of app instances."""
-    global app_or_default
-    app_or_default = _app_or_default_trace
-
-
-def disable_trace():
-    """Disable tracing of app instances."""
-    global app_or_default
-    app_or_default = _app_or_default
-
-if os.environ.get('CELERY_TRACE_APP'):  # pragma: no cover
-    enable_trace()
-else:
-    disable_trace()
+    return (app or _state.get_current_app()).bugreport()
 
 
 def shared_task(*args, **kwargs):
@@ -115,13 +53,13 @@ def shared_task(*args, **kwargs):
         def __inner(fun):
             name = options.get('name')
             # Set as shared task so that unfinalized apps,
-            # and future apps will load the task.
-            connect_on_app_finalize(
+            # and future apps will register a copy of this task.
+            _state.connect_on_app_finalize(
                 lambda app: app._task_from_fun(fun, **options)
             )
 
             # Force all finalized apps to take this task as well.
-            for app in _get_active_apps():
+            for app in _state._get_active_apps():
                 if app.finalized:
                     with app._finalize_mutex:
                         app._task_from_fun(fun, **options)
@@ -129,7 +67,7 @@ def shared_task(*args, **kwargs):
             # Return a proxy that always gets the task from the current
             # apps task registry.
             def task_by_cons():
-                app = current_app()
+                app = _state.get_current_app()
                 return app.tasks[
                     name or app.gen_task_name(fun.__name__, fun.__module__)
                 ]

+ 3 - 1
celery/app/base.py

@@ -228,7 +228,8 @@ class Celery:
                  amqp=None, events=None, log=None, control=None,
                  set_as_current=True, tasks=None, broker=None, include=None,
                  changes=None, config_source=None, fixups=None, task_cls=None,
-                 autofinalize=True, namespace=None, **kwargs):
+                 autofinalize=True, namespace=None, strict_typing=True,
+                 **kwargs):
         self.clock = LamportClock()
         self.main = main
         self.amqp_cls = amqp or self.amqp_cls
@@ -243,6 +244,7 @@ class Celery:
         self.steps = defaultdict(set)
         self.autofinalize = autofinalize
         self.namespace = namespace
+        self.strict_typing = strict_typing
 
         self.configured = False
         self._config_source = config_source

+ 0 - 1
celery/app/builtins.py

@@ -7,7 +7,6 @@ from celery._state import connect_on_app_finalize
 from celery.utils.log import get_logger
 
 __all__ = []
-
 logger = get_logger(__name__)
 
 

+ 0 - 3
celery/app/control.py

@@ -5,13 +5,10 @@ Client for worker remote control commands.
 Server implementation is in :mod:`celery.worker.control`.
 """
 import warnings
-
 from billiard.common import TERM_SIGNAME
-
 from kombu.pidbox import Mailbox
 from kombu.utils.functional import lazy
 from kombu.utils.objects import cached_property
-
 from celery.exceptions import DuplicateNodenameWarning
 from celery.utils.text import pluralize
 

+ 3 - 3
celery/app/defaults.py

@@ -94,7 +94,7 @@ NAMESPACES = Namespace(
         connection_retry=Option(True, type='bool'),
         connection_max_retries=Option(100, type='int'),
         failover_strategy=Option(None, type='string'),
-        heartbeat=Option(None, type='int'),
+        heartbeat=Option(120, type='int'),
         heartbeat_checkrate=Option(3.0, type='int'),
         login_method=Option(None, type='string'),
         pool_limit=Option(10, type='int'),
@@ -185,8 +185,8 @@ NAMESPACES = Namespace(
         cert_store=Option(type='string'),
         key=Option(type='string'),
     ),
-    sqlalchemy=Namespace(
-        dburi=Option(old={'celery_result_dburi'}),
+    database=Namespace(
+        url=Option(old={'celery_result_dburi'}),
         engine_options=Option(
             type='dict', old={'celery_result_engine_options'},
         ),

+ 0 - 2
celery/app/registry.py

@@ -1,9 +1,7 @@
 # -*- coding: utf-8 -*-
 """Registry of available tasks."""
 import inspect
-
 from importlib import import_module
-
 from celery._state import get_current_app
 from celery.exceptions import NotRegistered
 

+ 0 - 3
celery/app/routes.py

@@ -5,11 +5,8 @@ Contains utilities for working with task routers, (:setting:`task_routes`).
 """
 import re
 import string
-
 from collections import Mapping, OrderedDict
-
 from kombu import Queue
-
 from celery.exceptions import QueueNotFound
 from celery.utils.collections import lpmerge
 from celery.utils.functional import maybe_evaluate, mlazy

+ 26 - 9
celery/app/task.py

@@ -161,6 +161,12 @@ class Task:
     #: Name of the task.
     name = None
 
+    #: Enable argument checking.
+    #: You can set this to false if you don't want the signature to be
+    #: checked when calling the task.
+    #: Defaults to :attr:`app.strict_typing <@Celery.strict_typing>`.
+    typing = None
+
     #: Maximum number of retries before giving up.  If set to :const:`None`,
     #: it will **never** stop retrying.
     max_retries = 3
@@ -300,6 +306,9 @@ class Task:
         conf = app.conf
         cls._exec_options = None  # clear option cache
 
+        if cls.typing is None:
+            cls.typing = app.strict_typing
+
         for attr_name, config_name in cls.from_config:
             if getattr(cls, attr_name, None) is None:
                 setattr(cls, attr_name, conf[config_name])
@@ -479,18 +488,26 @@ class Task:
             headers (Dict): Message headers to be included in the message.
 
         Returns:
-            ~@AsyncResult: Future promise.
+            ~@AsyncResult: Promise of future evaluation.
+
+        Raises:
+            TypeError: If not enough arguments are passed, or too many
+                arguments are passed.  Note that signature checks may
+                be disabled by specifying ``@task(typing=False)``.
+            kombu.exceptions.OperationalError: If a connection to the
+               transport cannot be made, or if the connection is lost.
 
         Note:
             Also supports all keyword arguments supported by
             :meth:`kombu.Producer.publish`.
         """
-        try:
-            check_arguments = self.__header__
-        except AttributeError:  # pragma: no cover
-            pass
-        else:
-            check_arguments(*(args or ()), **(kwargs or {}))
+        if self.typing:
+            try:
+                check_arguments = self.__header__
+            except AttributeError:  # pragma: no cover
+                pass
+            else:
+                check_arguments(*(args or ()), **(kwargs or {}))
 
         app = self._get_app()
         if app.conf.task_always_eager:
@@ -776,7 +793,7 @@ class Task:
                 :setting:`task_publish_retry` setting.
             retry_policy (Mapping): Retry settings.  Default is taken
                 from the :setting:`task_publish_retry_policy` setting.
-            **fields (**Any): Map containing information about the event.
+            **fields (Any): Map containing information about the event.
                 Must be JSON serializable.
         """
         req = self.request
@@ -817,7 +834,7 @@ class Task:
 
         if self.request.chain:
             for t in self.request.chain:
-                sig |= signature(t)
+                sig |= signature(t, app=self.app)
 
         sig.freeze(self.request.id,
                    group_id=self.request.group,

+ 3 - 3
celery/app/trace.py

@@ -31,7 +31,6 @@ from kombu.utils.encoding import safe_repr, safe_str
 from celery import current_app, group
 from celery import states, signals
 from celery._state import _task_stack
-from celery.app import set_default_app
 from celery.app.task import Task as BaseTask, Context
 from celery.exceptions import Ignore, Reject, Retry, InvalidTaskError
 from celery.utils.log import get_logger
@@ -418,7 +417,8 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                         # execute first task in chain
                         chain = task_request.chain
                         if chain:
-                            signature(chain.pop(), app=app).apply_async(
+                            _chsig = signature(chain.pop(), app=app)
+                            _chsig.apply_async(
                                 (retval,), chain=chain,
                                 parent_id=uuid, root_id=root_id,
                             )
@@ -561,7 +561,7 @@ def setup_worker_optimizations(app, hostname=None):
     # and means that only a single app can be used for workers
     # running in the same process.
     app.set_current()
-    set_default_app(app)
+    app.set_default()
 
     # evaluate all task classes by finalizing the app.
     app.finalize()

+ 2 - 1
celery/app/utils.py

@@ -162,10 +162,11 @@ class Settings(ConfigurationView):
 
     def table(self, with_defaults=False, censored=True):
         filt = filter_hidden_settings if censored else lambda v: v
+        dict_members = dir(dict)
         return filt({
             k: v for k, v in (
                 self if with_defaults else self.without_defaults()).items()
-            if not k.startswith('_')
+            if not k.startswith('_') and k not in dict_members
         })
 
     def humanize(self, with_defaults=False, censored=True):

+ 0 - 2
celery/apps/beat.py

@@ -10,9 +10,7 @@ and so on.
 import numbers
 import socket
 import sys
-
 from datetime import datetime
-
 from celery import VERSION_BANNER, platforms, beat
 from celery.utils.imports import qualname
 from celery.utils.log import LOG_LEVELS, get_logger

+ 8 - 3
celery/backends/base.py

@@ -23,7 +23,7 @@ from kombu.utils.url import maybe_sanitize_url
 
 from celery import states
 from celery import current_app, group, maybe_signature
-from celery.app import current_task
+from celery._state import get_current_task
 from celery.exceptions import (
     ChordError, TimeoutError, TaskRevokedError, ImproperlyConfigured,
 )
@@ -411,15 +411,19 @@ class Backend:
             (group_id, body,), kwargs, countdown=countdown,
         )
 
+    def ensure_chords_allowed(self):
+        pass
+
     def apply_chord(self, header, partial_args, group_id, body,
                     options={}, **kwargs):
+        self.ensure_chords_allowed()
         fixed_options = {k: v for k, v in options.items() if k != 'task_id'}
         result = header(*partial_args, task_id=group_id, **fixed_options or {})
         self.fallback_chord_unlock(group_id, body, **kwargs)
         return result
 
     def current_task_children(self, request=None):
-        request = request or getattr(current_task(), 'request', None)
+        request = request or getattr(get_current_task(), 'request', None)
         if request:
             return [r.as_tuple() for r in getattr(request, 'children', [])]
 
@@ -671,6 +675,7 @@ class BaseKeyValueStoreBackend(Backend):
 
     def _apply_chord_incr(self, header, partial_args, group_id, body,
                           result=None, options={}, **kwargs):
+        self.ensure_chords_allowed()
         self.save_group(group_id, self.app.GroupResult(group_id, result))
 
         fixed_options = {k: v for k, v in options.items() if k != 'task_id'}
@@ -754,7 +759,7 @@ class DisabledBackend(BaseBackend):
     def store_result(self, *args, **kwargs):
         pass
 
-    def apply_chord(self, *args, **kwargs):
+    def ensure_chords_allowed(self):
         raise NotImplementedError(E_CHORD_NO_BACKEND.strip())
 
     def _is_disabled(self, *args, **kwargs):

+ 0 - 2
celery/backends/cache.py

@@ -2,10 +2,8 @@
 """Memcached and in-memory cache result backend."""
 from kombu.utils.encoding import bytes_to_str, ensure_bytes
 from kombu.utils.objects import cached_property
-
 from celery.exceptions import ImproperlyConfigured
 from celery.utils.functional import LRUCache
-
 from .base import KeyValueStoreBackend
 
 __all__ = ['CacheBackend']

+ 0 - 2
celery/backends/cassandra.py

@@ -3,9 +3,7 @@
 from celery import states
 from celery.exceptions import ImproperlyConfigured
 from celery.utils.log import get_logger
-
 from .base import BaseBackend
-
 try:  # pragma: no cover
     import cassandra
     import cassandra.auth

+ 0 - 2
celery/backends/consul.py

@@ -5,11 +5,9 @@
     in the key-value store of Consul.
 """
 from kombu.utils.url import parse_url
-
 from celery.exceptions import ImproperlyConfigured
 from celery.backends.base import KeyValueStoreBackend
 from celery.utils.log import get_logger
-
 try:
     import consul
 except ImportError:

+ 4 - 8
celery/backends/couchbase.py

@@ -1,7 +1,10 @@
 # -*- coding: utf-8 -*-
 """Couchbase result store backend."""
 import logging
-
+from kombu.utils.encoding import str_t
+from kombu.utils.url import _parse_url
+from celery.exceptions import ImproperlyConfigured
+from .base import KeyValueStoreBackend
 try:
     from couchbase import Couchbase
     from couchbase.connection import Connection
@@ -9,13 +12,6 @@ try:
 except ImportError:
     Couchbase = Connection = NotFoundError = None   # noqa
 
-from kombu.utils.encoding import str_t
-from kombu.utils.url import _parse_url
-
-from celery.exceptions import ImproperlyConfigured
-
-from .base import KeyValueStoreBackend
-
 __all__ = ['CouchbaseBackend']
 
 

+ 0 - 3
celery/backends/couchdb.py

@@ -1,11 +1,8 @@
 # -*- coding: utf-8 -*-
 """CouchDB result store backend."""
 from kombu.utils.url import _parse_url
-
 from celery.exceptions import ImproperlyConfigured
-
 from .base import KeyValueStoreBackend
-
 try:
     import pycouchdb
 except ImportError:

+ 5 - 5
celery/backends/database/__init__.py

@@ -72,22 +72,22 @@ class DatabaseBackend(BaseBackend):
         super().__init__(
             expires_type=maybe_timedelta, url=url, **kwargs)
         conf = self.app.conf
-        self.url = url or dburi or conf.sqlalchemy_dburi
+        self.url = url or dburi or conf.database_url
         self.engine_options = dict(
             engine_options or {},
-            **conf.sqlalchemy_engine_options or {})
+            **conf.database_engine_options or {})
         self.short_lived_sessions = kwargs.get(
             'short_lived_sessions',
-            conf.sqlalchemy_short_lived_sessions)
+            conf.database_short_lived_sessions)
 
-        tablenames = conf.sqlalchemy_table_names or {}
+        tablenames = conf.database_table_names or {}
         Task.__table__.name = tablenames.get('task', 'celery_taskmeta')
         TaskSet.__table__.name = tablenames.get('group', 'celery_tasksetmeta')
 
         if not self.url:
             raise ImproperlyConfigured(
                 'Missing connection string! Do you have the'
-                ' sqlalchemy_dburi setting set to a real value?')
+                ' database_url setting set to a real value?')
 
     def ResultSession(self, session_manager=SessionManager()):
         return session_manager.session_factory(

+ 1 - 4
celery/backends/database/models.py

@@ -1,12 +1,9 @@
 # -*- coding: utf-8 -*-
 """Database models used by the SQLAlchemy result store backend."""
-from datetime import datetime
-
 import sqlalchemy as sa
+from datetime import datetime
 from sqlalchemy.types import PickleType
-
 from celery import states
-
 from .session import ResultModelBase
 
 __all__ = ['Task', 'TaskSet']

+ 0 - 1
celery/backends/database/session.py

@@ -4,7 +4,6 @@ from sqlalchemy import create_engine
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.pool import NullPool
-
 from kombu.utils.compat import register_after_fork
 
 ResultModelBase = declarative_base()

+ 0 - 4
celery/backends/elasticsearch.py

@@ -1,13 +1,9 @@
 # -* coding: utf-8 -*-
 """Elasticsearch result store backend."""
 from datetime import datetime
-
 from kombu.utils.url import _parse_url
-
 from celery.exceptions import ImproperlyConfigured
-
 from .base import KeyValueStoreBackend
-
 try:
     import elasticsearch
 except ImportError:

+ 4 - 5
celery/backends/filesystem.py

@@ -1,15 +1,12 @@
 # -*- coding: utf-8 -*-
 """File-system result store backend."""
+import os
+import locale
 from kombu.utils.encoding import ensure_bytes
-
 from celery import uuid
 from celery.exceptions import ImproperlyConfigured
 from celery.backends.base import KeyValueStoreBackend
 
-import os
-import locale
-default_encoding = locale.getpreferredencoding(False)
-
 # Python 2 does not have FileNotFoundError and IsADirectoryError
 try:
     FileNotFoundError
@@ -17,6 +14,8 @@ except NameError:
     FileNotFoundError = IOError
     IsADirectoryError = IOError
 
+default_encoding = locale.getpreferredencoding(False)
+
 E_PATH_INVALID = """\
 The configured path for the file-system backend does not
 work correctly, please make sure that it exists and has

+ 0 - 3
celery/backends/mongodb.py

@@ -1,14 +1,11 @@
 # -*- coding: utf-8 -*-
 """MongoDB result store backend."""
 from datetime import datetime, timedelta
-
 from kombu.utils.objects import cached_property
 from kombu.utils.url import maybe_sanitize_url
 from kombu.exceptions import EncodeError
-
 from celery import states
 from celery.exceptions import ImproperlyConfigured
-
 from .base import BaseBackend
 
 try:

+ 3 - 6
celery/backends/riak.py

@@ -1,5 +1,8 @@
 # -*- coding: utf-8 -*-
 """Riak result store backend."""
+from kombu.utils.url import _parse_url
+from celery.exceptions import ImproperlyConfigured
+from .base import KeyValueStoreBackend
 try:
     import riak
     from riak import RiakClient
@@ -7,12 +10,6 @@ try:
 except ImportError:  # pragma: no cover
     riak = RiakClient = last_written_resolver = None  # noqa
 
-from kombu.utils.url import _parse_url
-
-from celery.exceptions import ImproperlyConfigured
-
-from .base import KeyValueStoreBackend
-
 __all__ = ['RiakBackend']
 
 E_BUCKET_NAME = """\

+ 13 - 2
celery/backends/rpc.py

@@ -10,15 +10,23 @@ from kombu.common import maybe_declare
 from kombu.utils.compat import register_after_fork
 from kombu.utils.objects import cached_property
 
-from celery import current_task
 from celery import states
-from celery._state import task_join_will_block
+from celery._state import current_task, task_join_will_block
 
 from . import base
 from .async import AsyncBackendMixin, BaseResultConsumer
 
 __all__ = ['BacklogLimitExceeded', 'RPCBackend']
 
+E_NO_CHORD_SUPPORT = """
+The "rpc" result backend does not support chords!
+
+Note that a group chained with a task is also upgraded to be a chord,
+as this pattern requires synchronization.
+
+Result backends that support chords: Redis, Database, Memcached, and more.
+"""
+
 
 class BacklogLimitExceeded(Exception):
     """Too much state history to fast-forward."""
@@ -145,6 +153,9 @@ class RPCBackend(base.Backend, AsyncBackendMixin):
         # RPC backend caches the binding, as one queue is used for all tasks.
         return self.binding
 
+    def ensure_chords_allowed(self):
+        raise NotImplementedError(E_NO_CHORD_SUPPORT.strip())
+
     def on_task_call(self, producer, task_id):
         # Called every time a task is sent when using this backend.
         # We declare the queue we receive replies on in advance of sending

+ 13 - 13
celery/beat.py

@@ -430,24 +430,24 @@ class PersistentScheduler(Scheduler):
         self._create_schedule()
 
         tz = self.app.conf.timezone
-        stored_tz = self._store.get(str(b'tz'))
+        stored_tz = self._store.get(str('tz'))
         if stored_tz is not None and stored_tz != tz:
             warning('Reset: Timezone changed from %r to %r', stored_tz, tz)
             self._store.clear()   # Timezone changed, reset db!
         utc = self.app.conf.enable_utc
-        stored_utc = self._store.get(str(b'utc_enabled'))
+        stored_utc = self._store.get(str('utc_enabled'))
         if stored_utc is not None and stored_utc != utc:
             choices = {True: 'enabled', False: 'disabled'}
             warning('Reset: UTC changed from %s to %s',
                     choices[stored_utc], choices[utc])
             self._store.clear()   # UTC setting changed, reset db!
-        entries = self._store.setdefault(str(b'entries'), {})
+        entries = self._store.setdefault(str('entries'), {})
         self.merge_inplace(self.app.conf.beat_schedule)
         self.install_default_entries(self.schedule)
         self._store.update({
-            str(b'__version__'): __version__,
-            str(b'tz'): tz,
-            str(b'utc_enabled'): utc,
+            str('__version__'): __version__,
+            str('tz'): tz,
+            str('utc_enabled'): utc,
         })
         self.sync()
         debug('Current schedule:\n' + '\n'.join(
@@ -456,31 +456,31 @@ class PersistentScheduler(Scheduler):
     def _create_schedule(self):
         for _ in (1, 2):
             try:
-                self._store[str(b'entries')]
+                self._store[str('entries')]
             except KeyError:
                 # new schedule db
                 try:
-                    self._store[str(b'entries')] = {}
+                    self._store[str('entries')] = {}
                 except KeyError as exc:
                     self._store = self._destroy_open_corrupted_schedule(exc)
                     continue
             else:
-                if str(b'__version__') not in self._store:
+                if str('__version__') not in self._store:
                     warning('DB Reset: Account for new __version__ field')
                     self._store.clear()   # remove schedule at 2.2.2 upgrade.
-                elif str(b'tz') not in self._store:
+                elif str('tz') not in self._store:
                     warning('DB Reset: Account for new tz field')
                     self._store.clear()   # remove schedule at 3.0.8 upgrade
-                elif str(b'utc_enabled') not in self._store:
+                elif str('utc_enabled') not in self._store:
                     warning('DB Reset: Account for new utc_enabled field')
                     self._store.clear()   # remove schedule at 3.0.9 upgrade
             break
 
     def get_schedule(self):
-        return self._store[str(b'entries')]
+        return self._store[str('entries')]
 
     def set_schedule(self, schedule):
-        self._store[str(b'entries')] = schedule
+        self._store[str('entries')] = schedule
     schedule = property(get_schedule, set_schedule)
 
     def sync(self):

+ 0 - 2
celery/bin/beat.py

@@ -65,9 +65,7 @@
     Executable to use for the detached process.
 """
 from functools import partial
-
 from celery.platforms import detached, maybe_drop_privileges
-
 from celery.bin.base import Command, daemon_options
 
 __all__ = ['beat']

+ 78 - 0
celery/bin/call.py

@@ -0,0 +1,78 @@
+"""The ``celery call`` program used to send tasks from the command-line."""
+from __future__ import absolute_import, unicode_literals
+from kombu.utils.json import loads
+from celery.bin.base import Command
+from celery.five import string_t
+from celery.utils.time import maybe_iso8601
+
+
+class call(Command):
+    """Call a task by name.
+
+    Examples:
+        .. code-block:: console
+
+            $ celery call tasks.add --args='[2, 2]'
+            $ celery call tasks.add --args='[2, 2]' --countdown=10
+    """
+
+    args = '<task_name>'
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Calling Options')
+        group.add_argument('--args', '-a',
+                           help='positional arguments (json).')
+        group.add_argument('--kwargs', '-k',
+                           help='keyword arguments (json).')
+        group.add_argument('--eta',
+                           help='scheduled time (ISO-8601).')
+        group.add_argument(
+            '--countdown', type=float,
+            help='eta in seconds from now (float/int).',
+        )
+        group.add_argument(
+            '--expires',
+            help='expiry time (ISO-8601/float/int).',
+        )
+        group.add_argument(
+            '--serializer', default='json',
+            help='defaults to json.')
+
+        ropts = parser.add_argument_group('Routing Options')
+        ropts.add_argument('--queue', help='custom queue name.')
+        ropts.add_argument('--exchange', help='custom exchange name.')
+        ropts.add_argument('--routing-key', help='custom routing key.')
+
+    def run(self, name, *_, **kwargs):
+        self._send_task(name, **kwargs)
+
+    def _send_task(self, name, args=None, kwargs=None,
+                   countdown=None, serializer=None,
+                   queue=None, exchange=None, routing_key=None,
+                   eta=None, expires=None, **_):
+        # arguments
+        args = loads(args) if isinstance(args, str) else args
+        kwargs = loads(kwargs) if isinstance(kwargs, str) else kwargs
+
+        # Expires can be int/float.
+        try:
+            expires = float(expires)
+        except (TypeError, ValueError):
+            # or a string describing an ISO 8601 datetime.
+            try:
+                expires = maybe_iso8601(expires)
+            except (TypeError, ValueError):
+                raise
+
+        # send the task and print the id.
+        self.out(self.app.send_task(
+            name,
+            args=args or (), kwargs=kwargs or {},
+            countdown=countdown,
+            serializer=serializer,
+            queue=queue,
+            exchange=exchange,
+            routing_key=routing_key,
+            eta=maybe_iso8601(eta),
+            expires=expires,
+        ).id)

+ 10 - 741
celery/bin/celery.py

@@ -190,7 +190,6 @@ in any command that also has a `--detach` option.
 
     Use :pypi:`gevent` monkey patches.
 
-
 ``celery result``
 -----------------
 
@@ -254,24 +253,14 @@ in any command that also has a `--detach` option.
 
     Destination routing key (defaults to the queue routing key).
 """
-import codecs
 import numbers
-import os
 import sys
 
 from functools import partial
-from importlib import import_module
-
-from kombu.utils.json import dumps, loads
-from kombu.utils.objects import cached_property
 
-from celery.app import defaults
-from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
+from celery.platforms import EX_OK, EX_FAILURE, EX_USAGE
 from celery.utils import term
 from celery.utils import text
-from celery.utils.functional import pass1
-from celery.utils.text import str_to_list
-from celery.utils.time import maybe_iso8601
 
 # Cannot use relative imports here due to a Windows issue (#1111).
 from celery.bin.base import Command, Extensions
@@ -279,10 +268,19 @@ from celery.bin.base import Command, Extensions
 # Import commands from other modules
 from celery.bin.amqp import amqp
 from celery.bin.beat import beat
+from celery.bin.call import call
+from celery.bin.control import _RemoteControl  # noqa
+from celery.bin.control import control, inspect, status
 from celery.bin.events import events
 from celery.bin.graph import graph
+from celery.bin.list import list_
 from celery.bin.logtool import logtool
+from celery.bin.migrate import migrate
+from celery.bin.purge import purge
+from celery.bin.result import result
+from celery.bin.shell import shell
 from celery.bin.worker import worker
+from celery.bin.upgrade import upgrade
 
 __all__ = ['CeleryCommand', 'main']
 
@@ -295,11 +293,6 @@ HELP = """
 Type '{prog_name} <command> --help' for help using a specific command.
 """
 
-MIGRATE_PROGRESS_FMT = """\
-Migrating task {state.count}/{state.strtotal}: \
-{body[task]}[{body[id]}]\
-"""
-
 command_classes = [
     ('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
     ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
@@ -344,730 +337,6 @@ class multi(Command):
         return cmd.execute_from_commandline([command] + argv)
 
 
-class list_(Command):
-    """Get info from broker.
-
-    Note:
-       For RabbitMQ the management plugin is required.
-
-    Example:
-        .. code-block:: console
-
-            $ celery list bindings
-    """
-
-    args = '[bindings]'
-
-    def list_bindings(self, management):
-        try:
-            bindings = management.get_bindings()
-        except NotImplementedError:
-            raise self.Error('Your transport cannot list bindings.')
-
-        def fmt(q, e, r):
-            return self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
-        fmt('Queue', 'Exchange', 'Routing Key')
-        fmt('-' * 16, '-' * 16, '-' * 16)
-        for b in bindings:
-            fmt(b['destination'], b['source'], b['routing_key'])
-
-    def run(self, what=None, *_, **kw):
-        topics = {'bindings': self.list_bindings}
-        available = ', '.join(topics)
-        if not what:
-            raise self.UsageError(
-                'Missing argument, specify one of: {0}'.format(available))
-        if what not in topics:
-            raise self.UsageError(
-                'unknown topic {0!r} (choose one of: {1})'.format(
-                    what, available))
-        with self.app.connection() as conn:
-            self.app.amqp.TaskConsumer(conn).declare()
-            topics[what](conn.manager)
-
-
-class call(Command):
-    """Call a task by name.
-
-    Examples:
-        .. code-block:: console
-
-            $ celery call tasks.add --args='[2, 2]'
-            $ celery call tasks.add --args='[2, 2]' --countdown=10
-    """
-
-    args = '<task_name>'
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Calling Options')
-        group.add_argument('--args', '-a',
-                           help='positional arguments (json).')
-        group.add_argument('--kwargs', '-k',
-                           help='keyword arguments (json).')
-        group.add_argument('--eta',
-                           help='scheduled time (ISO-8601).')
-        group.add_argument(
-            '--countdown', type=float,
-            help='eta in seconds from now (float/int).',
-        )
-        group.add_argument(
-            '--expires',
-            help='expiry time (ISO-8601/float/int).',
-        ),
-        group.add_argument(
-            '--serializer', default='json',
-            help='defaults to json.'),
-
-        ropts = parser.add_argument_group('Routing Options')
-        ropts.add_argument('--queue', help='custom queue name.')
-        ropts.add_argument('--exchange', help='custom exchange name.')
-        ropts.add_argument('--routing-key', help='custom routing key.')
-
-    def run(self, name, *_, **kwargs):
-        self._send_task(name, **kwargs)
-
-    def _send_task(self, name, args=None, kwargs=None,
-                   countdown=None, serializer=None,
-                   queue=None, exchange=None, routing_key=None,
-                   eta=None, expires=None, **_):
-        # arguments
-        args = loads(args) if isinstance(args, str) else args
-        kwargs = loads(kwargs) if isinstance(kwargs, str) else kwargs
-
-        # expires can be int/float.
-        try:
-            expires = float(expires)
-        except (TypeError, ValueError):
-            # or a string describing an ISO 8601 datetime.
-            try:
-                expires = maybe_iso8601(expires)
-            except (TypeError, ValueError):
-                raise
-
-        # send the task and print the id.
-        self.out(self.app.send_task(
-            name,
-            args=args or (), kwargs=kwargs or {},
-            countdown=countdown,
-            serializer=serializer,
-            queue=queue,
-            exchange=exchange,
-            routing_key=routing_key,
-            eta=maybe_iso8601(eta),
-            expires=expires,
-        ).id)
-
-
-class purge(Command):
-    """Erase all messages from all known task queues.
-
-    Warning:
-        There's no undo operation for this command.
-    """
-
-    warn_prelude = (
-        '{warning}: This will remove all tasks from {queues}: {names}.\n'
-        '         There is no undo for this operation!\n\n'
-        '(to skip this prompt use the -f option)\n'
-    )
-    warn_prompt = 'Are you sure you want to delete all tasks'
-
-    fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
-    fmt_empty = 'No messages purged from {qnum} {queues}'
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Purging Options')
-        group.add_argument(
-            '--force', '-f', action='store_true', default=False,
-            help="Don't prompt for verification",
-        )
-        group.add_argument(
-            '--queues', '-Q', default=[],
-            help='Comma separated list of queue names to purge.',
-        )
-        group.add_argument(
-            '--exclude-queues', '-X', default=[],
-            help='Comma separated list of queues names not to purge.',
-        )
-
-    def run(self, force=False, queues=None, exclude_queues=None, **kwargs):
-        queues = set(str_to_list(queues or []))
-        exclude = set(str_to_list(exclude_queues or []))
-        names = (queues or set(self.app.amqp.queues.keys())) - exclude
-        qnum = len(names)
-
-        messages = None
-        if names:
-            if not force:
-                self.out(self.warn_prelude.format(
-                    warning=self.colored.red('WARNING'),
-                    queues=text.pluralize(qnum, 'queue'),
-                    names=', '.join(sorted(names)),
-                ))
-                if self.ask(self.warn_prompt, ('yes', 'no'), 'no') != 'yes':
-                    return
-            with self.app.connection_for_write() as conn:
-                messages = sum(self._purge(conn, queue) for queue in names)
-        fmt = self.fmt_purged if messages else self.fmt_empty
-        self.out(fmt.format(
-            mnum=messages, qnum=qnum,
-            messages=text.pluralize(messages, 'message'),
-            queues=text.pluralize(qnum, 'queue')))
-
-    def _purge(self, conn, queue):
-        try:
-            return conn.default_channel.queue_purge(queue) or 0
-        except conn.channel_errors:
-            return 0
-
-
-class result(Command):
-    """Gives the return value for a given task id.
-
-    Examples:
-        .. code-block:: console
-
-            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
-            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
-            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
-    """
-
-    args = '<task_id>'
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Result Options')
-        group.add_argument(
-            '--task', '-t', help='name of task (if custom backend)',
-        )
-        group.add_argument(
-            '--traceback', action='store_true', default=False,
-            help='show traceback instead',
-        )
-
-    def run(self, task_id, *args, **kwargs):
-        result_cls = self.app.AsyncResult
-        task = kwargs.get('task')
-        traceback = kwargs.get('traceback', False)
-
-        if task:
-            result_cls = self.app.tasks[task].AsyncResult
-        task_result = result_cls(task_id)
-        if traceback:
-            value = task_result.traceback
-        else:
-            value = task_result.get()
-        self.out(self.pretty(value)[1])
-
-
-class _RemoteControl(Command):
-
-    name = None
-    leaf = False
-    control_group = None
-
-    def __init__(self, *args, **kwargs):
-        self.show_body = kwargs.pop('show_body', True)
-        self.show_reply = kwargs.pop('show_reply', True)
-        super().__init__(*args, **kwargs)
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Remote Control Options')
-        group.add_argument(
-            '--timeout', '-t', type=float,
-            help='Timeout in seconds (float) waiting for reply',
-        )
-        group.add_argument(
-            '--destination', '-d',
-            help='Comma separated list of destination node names.')
-        group.add_argument(
-            '--json', '-j', action='store_true', default=False,
-            help='Use json as output format.',
-        )
-
-    @classmethod
-    def get_command_info(cls, command,
-                         indent=0, prefix='', color=None,
-                         help=False, app=None, choices=None):
-        if choices is None:
-            choices = cls._choices_by_group(app)
-        meta = choices[command]
-        if help:
-            help = '|' + text.indent(meta.help, indent + 4)
-        else:
-            help = None
-        return text.join([
-            '|' + text.indent('{0}{1} {2}'.format(
-                prefix, color(command), meta.signature or ''), indent),
-            help,
-        ])
-
-    @classmethod
-    def list_commands(cls, indent=0, prefix='',
-                      color=None, help=False, app=None):
-        choices = cls._choices_by_group(app)
-        color = color if color else lambda x: x
-        prefix = prefix + ' ' if prefix else ''
-        return '\n'.join(
-            cls.get_command_info(c, indent, prefix, color, help,
-                                 app=app, choices=choices)
-            for c in sorted(choices))
-
-    def usage(self, command):
-        return '%(prog)s {0} [options] {1} <command> [arg1 .. argN]'.format(
-            command, self.args)
-
-    def call(self, *args, **kwargs):
-        raise NotImplementedError('call')
-
-    def run(self, *args, **kwargs):
-        if not args:
-            raise self.UsageError(
-                'Missing {0.name} method.  See --help'.format(self))
-        return self.do_call_method(args, **kwargs)
-
-    def _ensure_fanout_supported(self):
-        with self.app.connection_for_write() as conn:
-            if not conn.supports_exchange_type('fanout'):
-                raise self.Error(
-                    'Broadcast not supported by transport {0!r}'.format(
-                        conn.info()['transport']))
-
-    def do_call_method(self, args,
-                       timeout=None, destination=None, json=False, **kwargs):
-        method = args[0]
-        if method == 'help':
-            raise self.Error("Did you mean '{0.name} --help'?".format(self))
-        try:
-            meta = self.choices[method]
-        except KeyError:
-            raise self.UsageError(
-                'Unknown {0.name} method {1}'.format(self, method))
-
-        self._ensure_fanout_supported()
-
-        timeout = timeout or meta.default_timeout
-        if destination and isinstance(destination, str):
-            destination = [dest.strip() for dest in destination.split(',')]
-
-        replies = self.call(
-            method,
-            arguments=self.compile_arguments(meta, method, args[1:]),
-            timeout=timeout,
-            destination=destination,
-            callback=None if json else self.say_remote_command_reply,
-        )
-        if not replies:
-            raise self.Error('No nodes replied within time constraint.',
-                             status=EX_UNAVAILABLE)
-        if json:
-            self.out(dumps(replies))
-        return replies
-
-    def compile_arguments(self, meta, method, args):
-        args = list(args)
-        kw = {}
-        if meta.args:
-            kw.update({
-                k: v for k, v in self._consume_args(meta, method, args)
-            })
-        if meta.variadic:
-            kw.update({meta.variadic: args})
-        if not kw and args:
-            raise self.Error(
-                'Command {0!r} takes no arguments.'.format(method),
-                status=EX_USAGE)
-        return kw or {}
-
-    def _consume_args(self, meta, method, args):
-        i = 0
-        try:
-            for i, arg in enumerate(args):
-                try:
-                    name, typ = meta.args[i]
-                except IndexError:
-                    if meta.variadic:
-                        break
-                    raise self.Error(
-                        'Command {0!r} takes arguments: {1}'.format(
-                            method, meta.signature),
-                        status=EX_USAGE)
-                else:
-                    yield name, typ(arg) if typ is not None else arg
-        finally:
-            args[:] = args[i:]
-
-    @classmethod
-    def _choices_by_group(cls, app):
-        from celery.worker.control import Panel
-        # need to import task modules for custom user-remote control commands.
-        app.loader.import_default_modules()
-
-        return {
-            name: info for name, info in Panel.meta.items()
-            if info.type == cls.control_group and info.visible
-        }
-
-    @cached_property
-    def choices(self):
-        return self._choices_by_group(self.app)
-
-    @property
-    def epilog(self):
-        return '\n'.join([
-            '[Commands]',
-            self.list_commands(indent=4, help=True, app=self.app)
-        ])
-
-
-class inspect(_RemoteControl):
-    """Inspect the worker at runtime.
-
-    Availability: RabbitMQ (AMQP) and Redis transports.
-
-    Examples:
-        .. code-block:: console
-
-            $ celery inspect active --timeout=5
-            $ celery inspect scheduled -d worker1@example.com
-            $ celery inspect revoked -d w1@e.com,w2@e.com
-    """
-
-    name = 'inspect'
-    control_group = 'inspect'
-
-    def call(self, method, arguments, **options):
-        return self.app.control.inspect(**options)._request(
-            method, **arguments)
-
-
-class control(_RemoteControl):
-    """Workers remote control.
-
-    Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.
-
-    Examples:
-        .. code-block:: console
-
-            $ celery control enable_events --timeout=5
-            $ celery control -d worker1@example.com enable_events
-            $ celery control -d w1.e.com,w2.e.com enable_events
-
-            $ celery control -d w1.e.com add_consumer queue_name
-            $ celery control -d w1.e.com cancel_consumer queue_name
-
-            $ celery control add_consumer queue exchange direct rkey
-    """
-
-    name = 'control'
-    control_group = 'control'
-
-    def call(self, method, arguments, **options):
-        return self.app.control.broadcast(
-            method, arguments=arguments, reply=True, **options)
-
-
-class status(Command):
-    """Show list of workers that are online."""
-
-    option_list = inspect.option_list
-
-    def run(self, *args, **kwargs):
-        I = inspect(
-            app=self.app,
-            no_color=kwargs.get('no_color', False),
-            stdout=self.stdout, stderr=self.stderr,
-            show_reply=False, show_body=False, quiet=True,
-        )
-        replies = I.run('ping', **kwargs)
-        if not replies:
-            raise self.Error('No nodes replied within time constraint',
-                             status=EX_UNAVAILABLE)
-        nodecount = len(replies)
-        if not kwargs.get('quiet', False):
-            self.out('\n{0} {1} online.'.format(
-                nodecount, text.pluralize(nodecount, 'node')))
-
-
-class migrate(Command):
-    """Migrate tasks from one broker to another.
-
-    Warning:
-        This command is experimental, make sure you have a backup of
-        the tasks before you continue.
-
-    Example:
-        .. code-block:: console
-
-            $ celery migrate amqp://A.example.com amqp://guest@B.example.com//
-            $ celery migrate redis://localhost amqp://guest@localhost//
-    """
-
-    args = '<source_url> <dest_url>'
-    progress_fmt = MIGRATE_PROGRESS_FMT
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Migration Options')
-        group.add_argument(
-            '--limit', '-n', type=int,
-            help='Number of tasks to consume (int)',
-        )
-        group.add_argument(
-            '--timeout', '-t', type=float, default=1.0,
-            help='Timeout in seconds (float) waiting for tasks',
-        )
-        group.add_argument(
-            '--ack-messages', '-a', action='store_true', default=False,
-            help='Ack messages from source broker.',
-        )
-        group.add_argument(
-            '--tasks', '-T',
-            help='List of task names to filter on.',
-        )
-        group.add_argument(
-            '--queues', '-Q',
-            help='List of queues to migrate.',
-        )
-        group.add_argument(
-            '--forever', '-F', action='store_true', default=False,
-            help='Continually migrate tasks until killed.',
-        )
-
-    def on_migrate_task(self, state, body, message):
-        self.out(self.progress_fmt.format(state=state, body=body))
-
-    def run(self, source, destination, **kwargs):
-        from kombu import Connection
-        from celery.contrib.migrate import migrate_tasks
-
-        migrate_tasks(Connection(source),
-                      Connection(destination),
-                      callback=self.on_migrate_task,
-                      **kwargs)
-
-
-class shell(Command):  # pragma: no cover
-    """Start shell session with convenient access to celery symbols.
-
-    The following symbols will be added to the main globals:
-
-        - ``celery``:  the current application.
-        - ``chord``, ``group``, ``chain``, ``chunks``,
-          ``xmap``, ``xstarmap`` ``subtask``, ``Task``
-        - all registered tasks.
-    """
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Shell Options')
-        group.add_argument(
-            '--ipython', '-I',
-            action='store_true', help='force iPython.', default=False,
-        )
-        group.add_argument(
-            '--bpython', '-B',
-            action='store_true', help='force bpython.', default=False,
-        )
-        group.add_argument(
-            '--python',
-            action='store_true', default=False,
-            help='force default Python shell.',
-        )
-        group.add_argument(
-            '--without-tasks', '-T',
-            action='store_true', default=False,
-            help="don't add tasks to locals.",
-        )
-        group.add_argument(
-            '--eventlet',
-            action='store_true', default=False,
-            help='use eventlet.',
-        )
-        group.add_argument(
-            '--gevent', action='store_true', default=False,
-            help='use gevent.',
-        )
-
-    def run(self, *args, **kwargs):
-        if args:
-            raise self.UsageError(
-                'shell command does not take arguments: {0}'.format(args))
-        return self._run(**kwargs)
-
-    def _run(self, ipython=False, bpython=False,
-             python=False, without_tasks=False, eventlet=False,
-             gevent=False, **kwargs):
-        sys.path.insert(0, os.getcwd())
-        if eventlet:
-            import_module('celery.concurrency.eventlet')
-        if gevent:
-            import_module('celery.concurrency.gevent')
-        import celery
-        import celery.task.base
-        self.app.loader.import_default_modules()
-
-        # pylint: disable=attribute-defined-outside-init
-        self.locals = {
-            'app': self.app,
-            'celery': self.app,
-            'Task': celery.Task,
-            'chord': celery.chord,
-            'group': celery.group,
-            'chain': celery.chain,
-            'chunks': celery.chunks,
-            'xmap': celery.xmap,
-            'xstarmap': celery.xstarmap,
-            'subtask': celery.subtask,
-            'signature': celery.signature,
-        }
-
-        if not without_tasks:
-            self.locals.update({
-                task.__name__: task for task in self.app.tasks.values()
-                if not task.name.startswith('celery.')
-            })
-
-        if python:
-            return self.invoke_fallback_shell()
-        elif bpython:
-            return self.invoke_bpython_shell()
-        elif ipython:
-            return self.invoke_ipython_shell()
-        return self.invoke_default_shell()
-
-    def invoke_default_shell(self):
-        try:
-            import IPython  # noqa
-        except ImportError:
-            try:
-                import bpython  # noqa
-            except ImportError:
-                return self.invoke_fallback_shell()
-            else:
-                return self.invoke_bpython_shell()
-        else:
-            return self.invoke_ipython_shell()
-
-    def invoke_fallback_shell(self):
-        import code
-        try:
-            import readline
-        except ImportError:
-            pass
-        else:
-            import rlcompleter
-            readline.set_completer(
-                rlcompleter.Completer(self.locals).complete)
-            readline.parse_and_bind('tab:complete')
-        code.interact(local=self.locals)
-
-    def invoke_ipython_shell(self):
-        for ip in (self._ipython, self._ipython_pre_10,
-                   self._ipython_terminal, self._ipython_010,
-                   self._no_ipython):
-            try:
-                return ip()
-            except ImportError:
-                pass
-
-    def _ipython(self):
-        from IPython import start_ipython
-        start_ipython(argv=[], user_ns=self.locals)
-
-    def _ipython_pre_10(self):  # pragma: no cover
-        from IPython.frontend.terminal.ipapp import TerminalIPythonApp
-        app = TerminalIPythonApp.instance()
-        app.initialize(argv=[])
-        app.shell.user_ns.update(self.locals)
-        app.start()
-
-    def _ipython_terminal(self):  # pragma: no cover
-        from IPython.terminal import embed
-        embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
-
-    def _ipython_010(self):  # pragma: no cover
-        from IPython.Shell import IPShell
-        IPShell(argv=[], user_ns=self.locals).mainloop()
-
-    def _no_ipython(self):  # pragma: no cover
-        raise ImportError('no suitable ipython found')
-
-    def invoke_bpython_shell(self):
-        import bpython
-        bpython.embed(self.locals)
-
-
-class upgrade(Command):
-    """Perform upgrade between versions."""
-
-    choices = {'settings'}
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Upgrading Options')
-        group.add_argument(
-            '--django', action='store_true', default=False,
-            help='Upgrade Django project',
-        )
-        group.add_argument(
-            '--compat', action='store_true', default=False,
-            help='Maintain backwards compatibility',
-        )
-        group.add_argument(
-            '--no-backup', action='store_true', default=False,
-            help='Dont backup original files',
-        )
-
-    def usage(self, command):
-        return '%(prog)s <command> settings [filename] [options]'
-
-    def run(self, *args, **kwargs):
-        try:
-            command = args[0]
-        except IndexError:
-            raise self.UsageError('missing upgrade type')
-        if command not in self.choices:
-            raise self.UsageError('unknown upgrade type: {0}'.format(command))
-        return getattr(self, command)(*args, **kwargs)
-
-    def settings(self, command, filename,
-                 no_backup=False, django=False, compat=False, **kwargs):
-        lines = self._slurp(filename) if no_backup else self._backup(filename)
-        keyfilter = self._compat_key if django or compat else pass1
-        print('processing {0}...'.format(filename), file=self.stderr)
-        with codecs.open(filename, 'w', 'utf-8') as write_fh:
-            for line in lines:
-                write_fh.write(self._to_new_key(line, keyfilter))
-
-    def _slurp(self, filename):
-        with codecs.open(filename, 'r', 'utf-8') as read_fh:
-            return [line for line in read_fh]
-
-    def _backup(self, filename, suffix='.orig'):
-        lines = []
-        backup_filename = ''.join([filename, suffix])
-        print('writing backup to {0}...'.format(backup_filename),
-              file=self.stderr)
-        with codecs.open(filename, 'r', 'utf-8') as read_fh:
-            with codecs.open(backup_filename, 'w', 'utf-8') as backup_fh:
-                for line in read_fh:
-                    backup_fh.write(line)
-                    lines.append(line)
-        return lines
-
-    def _to_new_key(self, line, keyfilter=pass1, source=defaults._TO_NEW_KEY):
-        # sort by length to avoid, for example, broker_transport overriding
-        # broker_transport_options.
-        for old_key in reversed(sorted(source, key=lambda x: len(x))):
-            new_line = line.replace(old_key, keyfilter(source[old_key]))
-            if line != new_line:
-                return new_line  # only one match per line.
-        return line
-
-    def _compat_key(self, key, namespace='CELERY'):
-        key = key.upper()
-        if not key.startswith(namespace):
-            key = '_'.join([namespace, key])
-        return key
-
-
 class help(Command):
     """Show help screen and exit."""
 

+ 0 - 4
celery/bin/celeryd_detach.py

@@ -9,17 +9,13 @@ import argparse
 import celery
 import os
 import sys
-
 from celery.platforms import EX_FAILURE, detached
 from celery.utils.log import get_logger
 from celery.utils.nodenames import default_nodename, node_format
-
 from celery.bin.base import daemon_options
 
 __all__ = ['detached_celeryd', 'detach']
-
 logger = get_logger(__name__)
-
 C_FAKEFORK = os.environ.get('C_FAKEFORK')
 
 

+ 237 - 0
celery/bin/control.py

@@ -0,0 +1,237 @@
+"""The ``celery control``, ``celery inspect`` and ``celery status`` programs."""
+from __future__ import absolute_import, unicode_literals
+from kombu.utils.json import dumps
+from kombu.utils.objects import cached_property
+from celery.five import items, string_t
+from celery.bin.base import Command
+from celery.platforms import EX_UNAVAILABLE, EX_USAGE
+from celery.utils import text
+
+
+class _RemoteControl(Command):
+
+    name = None
+    leaf = False
+    control_group = None
+
+    def __init__(self, *args, **kwargs):
+        self.show_body = kwargs.pop('show_body', True)
+        self.show_reply = kwargs.pop('show_reply', True)
+        super(_RemoteControl, self).__init__(*args, **kwargs)
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Remote Control Options')
+        group.add_argument(
+            '--timeout', '-t', type=float,
+            help='Timeout in seconds (float) waiting for reply',
+        )
+        group.add_argument(
+            '--destination', '-d',
+            help='Comma separated list of destination node names.')
+        group.add_argument(
+            '--json', '-j', action='store_true', default=False,
+            help='Use json as output format.',
+        )
+
+    @classmethod
+    def get_command_info(cls, command,
+                         indent=0, prefix='', color=None,
+                         help=False, app=None, choices=None):
+        if choices is None:
+            choices = cls._choices_by_group(app)
+        meta = choices[command]
+        if help:
+            help = '|' + text.indent(meta.help, indent + 4)
+        else:
+            help = None
+        return text.join([
+            '|' + text.indent('{0}{1} {2}'.format(
+                prefix, color(command), meta.signature or ''), indent),
+            help,
+        ])
+
+    @classmethod
+    def list_commands(cls, indent=0, prefix='',
+                      color=None, help=False, app=None):
+        choices = cls._choices_by_group(app)
+        color = color if color else lambda x: x
+        prefix = prefix + ' ' if prefix else ''
+        return '\n'.join(
+            cls.get_command_info(c, indent, prefix, color, help,
+                                 app=app, choices=choices)
+            for c in sorted(choices))
+
+    def usage(self, command):
+        return '%(prog)s {0} [options] {1} <command> [arg1 .. argN]'.format(
+            command, self.args)
+
+    def call(self, *args, **kwargs):
+        raise NotImplementedError('call')
+
+    def run(self, *args, **kwargs):
+        if not args:
+            raise self.UsageError(
+                'Missing {0.name} method.  See --help'.format(self))
+        return self.do_call_method(args, **kwargs)
+
+    def _ensure_fanout_supported(self):
+        with self.app.connection_for_write() as conn:
+            if not conn.supports_exchange_type('fanout'):
+                raise self.Error(
+                    'Broadcast not supported by transport {0!r}'.format(
+                        conn.info()['transport']))
+
+    def do_call_method(self, args,
+                       timeout=None, destination=None, json=False, **kwargs):
+        method = args[0]
+        if method == 'help':
+            raise self.Error("Did you mean '{0.name} --help'?".format(self))
+        try:
+            meta = self.choices[method]
+        except KeyError:
+            raise self.UsageError(
+                'Unknown {0.name} method {1}'.format(self, method))
+
+        self._ensure_fanout_supported()
+
+        timeout = timeout or meta.default_timeout
+        if destination and isinstance(destination, string_t):
+            destination = [dest.strip() for dest in destination.split(',')]
+
+        replies = self.call(
+            method,
+            arguments=self.compile_arguments(meta, method, args[1:]),
+            timeout=timeout,
+            destination=destination,
+            callback=None if json else self.say_remote_command_reply,
+        )
+        if not replies:
+            raise self.Error('No nodes replied within time constraint.',
+                             status=EX_UNAVAILABLE)
+        if json:
+            self.out(dumps(replies))
+        return replies
+
+    def compile_arguments(self, meta, method, args):
+        args = list(args)
+        kw = {}
+        if meta.args:
+            kw.update({
+                k: v for k, v in self._consume_args(meta, method, args)
+            })
+        if meta.variadic:
+            kw.update({meta.variadic: args})
+        if not kw and args:
+            raise self.Error(
+                'Command {0!r} takes no arguments.'.format(method),
+                status=EX_USAGE)
+        return kw or {}
+
+    def _consume_args(self, meta, method, args):
+        i = 0
+        try:
+            for i, arg in enumerate(args):
+                try:
+                    name, typ = meta.args[i]
+                except IndexError:
+                    if meta.variadic:
+                        break
+                    raise self.Error(
+                        'Command {0!r} takes arguments: {1}'.format(
+                            method, meta.signature),
+                        status=EX_USAGE)
+                else:
+                    yield name, typ(arg) if typ is not None else arg
+        finally:
+            args[:] = args[i:]
+
+    @classmethod
+    def _choices_by_group(cls, app):
+        from celery.worker.control import Panel
+        # need to import task modules for custom user-remote control commands.
+        app.loader.import_default_modules()
+
+        return {
+            name: info for name, info in items(Panel.meta)
+            if info.type == cls.control_group and info.visible
+        }
+
+    @cached_property
+    def choices(self):
+        return self._choices_by_group(self.app)
+
+    @property
+    def epilog(self):
+        return '\n'.join([
+            '[Commands]',
+            self.list_commands(indent=4, help=True, app=self.app)
+        ])
+
+
+class inspect(_RemoteControl):
+    """Inspect the worker at runtime.
+
+    Availability: RabbitMQ (AMQP) and Redis transports.
+
+    Examples:
+        .. code-block:: console
+
+            $ celery inspect active --timeout=5
+            $ celery inspect scheduled -d worker1@example.com
+            $ celery inspect revoked -d w1@e.com,w2@e.com
+    """
+
+    name = 'inspect'
+    control_group = 'inspect'
+
+    def call(self, method, arguments, **options):
+        return self.app.control.inspect(**options)._request(
+            method, **arguments)
+
+
+class control(_RemoteControl):
+    """Workers remote control.
+
+    Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.
+
+    Examples:
+        .. code-block:: console
+
+            $ celery control enable_events --timeout=5
+            $ celery control -d worker1@example.com enable_events
+            $ celery control -d w1.e.com,w2.e.com enable_events
+
+            $ celery control -d w1.e.com add_consumer queue_name
+            $ celery control -d w1.e.com cancel_consumer queue_name
+
+            $ celery control add_consumer queue exchange direct rkey
+    """
+
+    name = 'control'
+    control_group = 'control'
+
+    def call(self, method, arguments, **options):
+        return self.app.control.broadcast(
+            method, arguments=arguments, reply=True, **options)
+
+
+class status(Command):
+    """Show list of workers that are online."""
+
+    option_list = inspect.option_list
+
+    def run(self, *args, **kwargs):
+        I = inspect(
+            app=self.app,
+            no_color=kwargs.get('no_color', False),
+            stdout=self.stdout, stderr=self.stderr,
+            show_reply=False, show_body=False, quiet=True,
+        )
+        replies = I.run('ping', **kwargs)
+        if not replies:
+            raise self.Error('No nodes replied within time constraint',
+                             status=EX_UNAVAILABLE)
+        nodecount = len(replies)
+        if not kwargs.get('quiet', False):
+            self.out('\n{0} {1} online.'.format(
+                nodecount, text.pluralize(nodecount, 'node')))

+ 0 - 2
celery/bin/events.py

@@ -66,9 +66,7 @@
     Executable to use for the detached process.
 """
 import sys
-
 from functools import partial
-
 from celery.platforms import detached, set_process_title, strargv
 from celery.bin.base import Command, daemon_options
 

+ 0 - 2
celery/bin/graph.py

@@ -4,9 +4,7 @@
 .. program:: celery graph
 """
 from operator import itemgetter
-
 from celery.utils.graph import DependencyGraph, GraphFormatter
-
 from .base import Command
 
 __all__ = ['graph']

+ 45 - 0
celery/bin/list.py

@@ -0,0 +1,45 @@
+"""The ``celery list bindings`` command, used to inspect queue bindings."""
+from __future__ import absolute_import, unicode_literals
+from celery.bin.base import Command
+
+
+class list_(Command):
+    """Get info from broker.
+
+    Note:
+       For RabbitMQ the management plugin is required.
+
+    Example:
+        .. code-block:: console
+
+            $ celery list bindings
+    """
+
+    args = '[bindings]'
+
+    def list_bindings(self, management):
+        try:
+            bindings = management.get_bindings()
+        except NotImplementedError:
+            raise self.Error('Your transport cannot list bindings.')
+
+        def fmt(q, e, r):
+            return self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
+        fmt('Queue', 'Exchange', 'Routing Key')
+        fmt('-' * 16, '-' * 16, '-' * 16)
+        for b in bindings:
+            fmt(b['destination'], b['source'], b['routing_key'])
+
+    def run(self, what=None, *_, **kw):
+        topics = {'bindings': self.list_bindings}
+        available = ', '.join(topics)
+        if not what:
+            raise self.UsageError(
+                'Missing argument, specify one of: {0}'.format(available))
+        if what not in topics:
+            raise self.UsageError(
+                'unknown topic {0!r} (choose one of: {1})'.format(
+                    what, available))
+        with self.app.connection() as conn:
+            self.app.amqp.TaskConsumer(conn).declare()
+            topics[what](conn.manager)

+ 0 - 2
celery/bin/logtool.py

@@ -4,10 +4,8 @@
 .. program:: celery logtool
 """
 import re
-
 from collections import Counter
 from fileinput import FileInput
-
 from .base import Command
 
 __all__ = ['logtool']

+ 65 - 0
celery/bin/migrate.py

@@ -0,0 +1,65 @@
+"""The ``celery migrate`` command, used to filter and move messages."""
+from __future__ import absolute_import, unicode_literals
+from celery.bin.base import Command
+
+MIGRATE_PROGRESS_FMT = """\
+Migrating task {state.count}/{state.strtotal}: \
+{body[task]}[{body[id]}]\
+"""
+
+
+class migrate(Command):
+    """Migrate tasks from one broker to another.
+
+    Warning:
+        This command is experimental, make sure you have a backup of
+        the tasks before you continue.
+
+    Example:
+        .. code-block:: console
+
+            $ celery migrate amqp://A.example.com amqp://guest@B.example.com//
+            $ celery migrate redis://localhost amqp://guest@localhost//
+    """
+
+    args = '<source_url> <dest_url>'
+    progress_fmt = MIGRATE_PROGRESS_FMT
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Migration Options')
+        group.add_argument(
+            '--limit', '-n', type=int,
+            help='Number of tasks to consume (int)',
+        )
+        group.add_argument(
+            '--timeout', '-t', type=float, default=1.0,
+            help='Timeout in seconds (float) waiting for tasks',
+        )
+        group.add_argument(
+            '--ack-messages', '-a', action='store_true', default=False,
+            help='Ack messages from source broker.',
+        )
+        group.add_argument(
+            '--tasks', '-T',
+            help='List of task names to filter on.',
+        )
+        group.add_argument(
+            '--queues', '-Q',
+            help='List of queues to migrate.',
+        )
+        group.add_argument(
+            '--forever', '-F', action='store_true', default=False,
+            help='Continually migrate tasks until killed.',
+        )
+
+    def on_migrate_task(self, state, body, message):
+        self.out(self.progress_fmt.format(state=state, body=body))
+
+    def run(self, source, destination, **kwargs):
+        from kombu import Connection
+        from celery.contrib.migrate import migrate_tasks
+
+        migrate_tasks(Connection(source),
+                      Connection(destination),
+                      callback=self.on_migrate_task,
+                      **kwargs)

+ 0 - 3
celery/bin/multi.py

@@ -96,11 +96,8 @@ Examples
 import os
 import signal
 import sys
-
 from functools import wraps
-
 from kombu.utils.objects import cached_property
-
 from celery import VERSION_BANNER
 from celery.apps.multi import Cluster, MultiParser, NamespacedOptionParser
 from celery.platforms import EX_FAILURE, EX_OK, signals

+ 68 - 0
celery/bin/purge.py

@@ -0,0 +1,68 @@
+"""The ``celery purge`` program, used to delete messages from queues."""
+from __future__ import absolute_import, unicode_literals
+from celery.five import keys
+from celery.bin.base import Command
+from celery.utils import text
+
+
+class purge(Command):
+    """Erase all messages from all known task queues.
+
+    Warning:
+        There's no undo operation for this command.
+    """
+
+    warn_prelude = (
+        '{warning}: This will remove all tasks from {queues}: {names}.\n'
+        '         There is no undo for this operation!\n\n'
+        '(to skip this prompt use the -f option)\n'
+    )
+    warn_prompt = 'Are you sure you want to delete all tasks'
+
+    fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
+    fmt_empty = 'No messages purged from {qnum} {queues}'
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Purging Options')
+        group.add_argument(
+            '--force', '-f', action='store_true', default=False,
+            help="Don't prompt for verification",
+        )
+        group.add_argument(
+            '--queues', '-Q', default=[],
+            help='Comma separated list of queue names to purge.',
+        )
+        group.add_argument(
+            '--exclude-queues', '-X', default=[],
+            help='Comma separated list of queues names not to purge.',
+        )
+
+    def run(self, force=False, queues=None, exclude_queues=None, **kwargs):
+        queues = set(text.str_to_list(queues or []))
+        exclude = set(text.str_to_list(exclude_queues or []))
+        names = (queues or set(keys(self.app.amqp.queues))) - exclude
+        qnum = len(names)
+
+        messages = None
+        if names:
+            if not force:
+                self.out(self.warn_prelude.format(
+                    warning=self.colored.red('WARNING'),
+                    queues=text.pluralize(qnum, 'queue'),
+                    names=', '.join(sorted(names)),
+                ))
+                if self.ask(self.warn_prompt, ('yes', 'no'), 'no') != 'yes':
+                    return
+            with self.app.connection_for_write() as conn:
+                messages = sum(self._purge(conn, queue) for queue in names)
+        fmt = self.fmt_purged if messages else self.fmt_empty
+        self.out(fmt.format(
+            mnum=messages, qnum=qnum,
+            messages=text.pluralize(messages, 'message'),
+            queues=text.pluralize(qnum, 'queue')))
+
+    def _purge(self, conn, queue):
+        try:
+            return conn.default_channel.queue_purge(queue) or 0
+        except conn.channel_errors:
+            return 0

+ 41 - 0
celery/bin/result.py

@@ -0,0 +1,41 @@
+"""The ``celery result`` program, used to inspect task results."""
+from __future__ import absolute_import, unicode_literals
+from celery.bin.base import Command
+
+
+class result(Command):
+    """Gives the return value for a given task id.
+
+    Examples:
+        .. code-block:: console
+
+            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
+            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
+            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
+    """
+
+    args = '<task_id>'
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Result Options')
+        group.add_argument(
+            '--task', '-t', help='name of task (if custom backend)',
+        )
+        group.add_argument(
+            '--traceback', action='store_true', default=False,
+            help='show traceback instead',
+        )
+
+    def run(self, task_id, *args, **kwargs):
+        result_cls = self.app.AsyncResult
+        task = kwargs.get('task')
+        traceback = kwargs.get('traceback', False)
+
+        if task:
+            result_cls = self.app.tasks[task].AsyncResult
+        task_result = result_cls(task_id)
+        if traceback:
+            value = task_result.traceback
+        else:
+            value = task_result.get()
+        self.out(self.pretty(value)[1])

+ 157 - 0
celery/bin/shell.py

@@ -0,0 +1,157 @@
+"""The ``celery shell`` program, used to start a REPL."""
+from __future__ import absolute_import, unicode_literals
+import os
+import sys
+from importlib import import_module
+from celery.five import values
+from celery.bin.base import Command
+
+
+class shell(Command):  # pragma: no cover
+    """Start shell session with convenient access to celery symbols.
+
+    The following symbols will be added to the main globals:
+
+        - ``celery``:  the current application.
+        - ``chord``, ``group``, ``chain``, ``chunks``,
+          ``xmap``, ``xstarmap`` ``subtask``, ``Task``
+        - all registered tasks.
+    """
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Shell Options')
+        group.add_argument(
+            '--ipython', '-I',
+            action='store_true', help='force iPython.', default=False,
+        )
+        group.add_argument(
+            '--bpython', '-B',
+            action='store_true', help='force bpython.', default=False,
+        )
+        group.add_argument(
+            '--python',
+            action='store_true', default=False,
+            help='force default Python shell.',
+        )
+        group.add_argument(
+            '--without-tasks', '-T',
+            action='store_true', default=False,
+            help="don't add tasks to locals.",
+        )
+        group.add_argument(
+            '--eventlet',
+            action='store_true', default=False,
+            help='use eventlet.',
+        )
+        group.add_argument(
+            '--gevent', action='store_true', default=False,
+            help='use gevent.',
+        )
+
+    def run(self, *args, **kwargs):
+        if args:
+            raise self.UsageError(
+                'shell command does not take arguments: {0}'.format(args))
+        return self._run(**kwargs)
+
+    def _run(self, ipython=False, bpython=False,
+             python=False, without_tasks=False, eventlet=False,
+             gevent=False, **kwargs):
+        sys.path.insert(0, os.getcwd())
+        if eventlet:
+            import_module('celery.concurrency.eventlet')
+        if gevent:
+            import_module('celery.concurrency.gevent')
+        import celery
+        import celery.task.base
+        self.app.loader.import_default_modules()
+
+        # pylint: disable=attribute-defined-outside-init
+        self.locals = {
+            'app': self.app,
+            'celery': self.app,
+            'Task': celery.Task,
+            'chord': celery.chord,
+            'group': celery.group,
+            'chain': celery.chain,
+            'chunks': celery.chunks,
+            'xmap': celery.xmap,
+            'xstarmap': celery.xstarmap,
+            'subtask': celery.subtask,
+            'signature': celery.signature,
+        }
+
+        if not without_tasks:
+            self.locals.update({
+                task.__name__: task for task in values(self.app.tasks)
+                if not task.name.startswith('celery.')
+            })
+
+        if python:
+            return self.invoke_fallback_shell()
+        elif bpython:
+            return self.invoke_bpython_shell()
+        elif ipython:
+            return self.invoke_ipython_shell()
+        return self.invoke_default_shell()
+
+    def invoke_default_shell(self):
+        try:
+            import IPython  # noqa
+        except ImportError:
+            try:
+                import bpython  # noqa
+            except ImportError:
+                return self.invoke_fallback_shell()
+            else:
+                return self.invoke_bpython_shell()
+        else:
+            return self.invoke_ipython_shell()
+
+    def invoke_fallback_shell(self):
+        import code
+        try:
+            import readline
+        except ImportError:
+            pass
+        else:
+            import rlcompleter
+            readline.set_completer(
+                rlcompleter.Completer(self.locals).complete)
+            readline.parse_and_bind('tab:complete')
+        code.interact(local=self.locals)
+
+    def invoke_ipython_shell(self):
+        for ip in (self._ipython, self._ipython_pre_10,
+                   self._ipython_terminal, self._ipython_010,
+                   self._no_ipython):
+            try:
+                return ip()
+            except ImportError:
+                pass
+
+    def _ipython(self):
+        from IPython import start_ipython
+        start_ipython(argv=[], user_ns=self.locals)
+
+    def _ipython_pre_10(self):  # pragma: no cover
+        from IPython.frontend.terminal.ipapp import TerminalIPythonApp
+        app = TerminalIPythonApp.instance()
+        app.initialize(argv=[])
+        app.shell.user_ns.update(self.locals)
+        app.start()
+
+    def _ipython_terminal(self):  # pragma: no cover
+        from IPython.terminal import embed
+        embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
+
+    def _ipython_010(self):  # pragma: no cover
+        from IPython.Shell import IPShell
+        IPShell(argv=[], user_ns=self.locals).mainloop()
+
+    def _no_ipython(self):  # pragma: no cover
+        raise ImportError('no suitable ipython found')
+
+    def invoke_bpython_shell(self):
+        import bpython
+        bpython.embed(self.locals)

+ 92 - 0
celery/bin/upgrade.py

@@ -0,0 +1,92 @@
+"""The ``celery upgrade`` command, used to upgrade from previous versions."""
+from __future__ import absolute_import, print_function, unicode_literals
+import codecs
+from celery.app import defaults
+from celery.bin.base import Command
+from celery.utils.functional import pass1
+
+
+class upgrade(Command):
+    """Perform upgrade between versions."""
+
+    choices = {'settings'}
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Upgrading Options')
+        group.add_argument(
+            '--django', action='store_true', default=False,
+            help='Upgrade Django project',
+        )
+        group.add_argument(
+            '--compat', action='store_true', default=False,
+            help='Maintain backwards compatibility',
+        )
+        group.add_argument(
+            '--no-backup', action='store_true', default=False,
+            help="Don't backup original files",
+        )
+
+    def usage(self, command):
+        return '%(prog)s <command> settings [filename] [options]'
+
+    def run(self, *args, **kwargs):
+        try:
+            command = args[0]
+        except IndexError:
+            raise self.UsageError(
+                'missing upgrade type: try `celery upgrade settings` ?')
+        if command not in self.choices:
+            raise self.UsageError('unknown upgrade type: {0}'.format(command))
+        return getattr(self, command)(*args, **kwargs)
+
+    def settings(self, command, filename,
+                 no_backup=False, django=False, compat=False, **kwargs):
+        lines = self._slurp(filename)
+        keyfilter = self._compat_key if django or compat else pass1
+        print('processing {0}...'.format(filename), file=self.stderr)
+        # gives list of tuples: ``(did_change, line_contents)``
+        new_lines = [
+            self._to_new_key(line, keyfilter) for line in lines
+        ]
+        if any(n[0] for n in new_lines):  # did have changes
+            if not no_backup:
+                self._backup(filename)
+            with codecs.open(filename, 'w', 'utf-8') as write_fh:
+                for _, line in new_lines:
+                    write_fh.write(line)
+            print('Changes to your settings have been made!',
+                  file=self.stdout)
+        else:
+            print('Does not seem to require any changes :-)',
+                  file=self.stdout)
+
+    def _slurp(self, filename):
+        with codecs.open(filename, 'r', 'utf-8') as read_fh:
+            return [line for line in read_fh]
+
+    def _backup(self, filename, suffix='.orig'):
+        lines = []
+        backup_filename = ''.join([filename, suffix])
+        print('writing backup to {0}...'.format(backup_filename),
+              file=self.stderr)
+        with codecs.open(filename, 'r', 'utf-8') as read_fh:
+            with codecs.open(backup_filename, 'w', 'utf-8') as backup_fh:
+                for line in read_fh:
+                    backup_fh.write(line)
+                    lines.append(line)
+        return lines
+
+    def _to_new_key(self, line, keyfilter=pass1, source=defaults._TO_NEW_KEY):
+        # sort by length to avoid, for example, broker_transport overriding
+        # broker_transport_options.
+        for old_key in reversed(sorted(source, key=lambda x: len(x))):
+            new_line = line.replace(old_key, keyfilter(source[old_key]))
+            if line != new_line and 'CELERY_CELERY' not in new_line:
+                return 1, new_line  # only one match per line.
+        return 0, line
+
+    def _compat_key(self, key, namespace='CELERY'):
+        key = key.upper()
+        if not key.startswith(namespace):
+            key = '_'.join([namespace, key])
+        return key

+ 0 - 1
celery/bin/worker.py

@@ -176,7 +176,6 @@ The :program:`celery worker` command (previously known as ``celeryd``)
     Executable to use for the detached process.
 """
 import sys
-
 from celery import concurrency
 from celery.bin.base import Command, daemon_options
 from celery.bin.celeryd_detach import detached_celeryd

+ 145 - 207
celery/canvas.py

@@ -5,11 +5,13 @@
 
     You should import these from :mod:`celery` and not this module.
 """
+import itertools
+import operator
+
 from collections import MutableSequence, deque
 from copy import deepcopy
 from functools import partial as _partial, reduce
 from operator import itemgetter
-from itertools import chain as _chain
 
 from kombu.utils.functional import fxrange, reprcall
 from kombu.utils.objects import cached_property
@@ -21,8 +23,10 @@ from celery.result import GroupResult
 from celery.utils import abstract
 from celery.utils.functional import (
     maybe_list, is_list, _regen, regen, chunks as _chunks,
+    seq_concat_seq, seq_concat_item,
 )
-from celery.utils.text import truncate
+from celery.utils.objects import getitem_property
+from celery.utils.text import truncate, remove_repeating_from_task
 
 __all__ = [
     'Signature', 'chain', 'xmap', 'xstarmap', 'chunks',
@@ -30,90 +34,6 @@ __all__ = [
 ]
 
 
-def _shorten_names(task_name, s):
-    # type: (str, str) -> str
-    """Remove repeating module names from string.
-
-    Arguments:
-        task_name (str): Task name (full path including module),
-            to use as the basis for removing module names.
-        s (str): The string we want to work on.
-
-    Example:
-
-        >>> _shorten_names(
-        ...    'x.tasks.add',
-        ...    'x.tasks.add(2, 2) | x.tasks.add(4) | x.tasks.mul(8)',
-        ... )
-        'x.tasks.add(2, 2) | add(4) | mul(8)'
-    """
-    # This is used by repr(), to remove repeating module names.
-
-    # extract the module part of the task name
-    module = task_name.rpartition('.')[0] + '.'
-    # find the first occurance of the module name in the string.
-    index = s.find(module)
-    if index >= 0:
-        s = ''.join([
-            # leave the first occurance of the module name untouched.
-            s[:index + len(module)],
-            # strip seen module name from the rest of the string.
-            s[index + len(module):].replace(module, ''),
-        ])
-    return s
-
-
-class _getitem_property:
-    """Attribute -> dict key descriptor.
-
-    The target object must support ``__getitem__``,
-    and optionally ``__setitem__``.
-
-    Example:
-        >>> from collections import defaultdict
-
-        >>> class Me(dict):
-        ...     deep = defaultdict(dict)
-        ...
-        ...     foo = _getitem_property('foo')
-        ...     deep_thing = _getitem_property('deep.thing')
-
-
-        >>> me = Me()
-        >>> me.foo
-        None
-
-        >>> me.foo = 10
-        >>> me.foo
-        10
-        >>> me['foo']
-        10
-
-        >>> me.deep_thing = 42
-        >>> me.deep_thing
-        42
-        >>> me.deep
-        defaultdict(<type 'dict'>, {'thing': 42})
-    """
-
-    def __init__(self, keypath, doc=None):
-        path, _, self.key = keypath.rpartition('.')
-        self.path = path.split('.') if path else None
-        self.__doc__ = doc
-
-    def _path(self, obj):
-        return (reduce(lambda d, k: d[k], [obj] + self.path) if self.path
-                else obj)
-
-    def __get__(self, obj, type=None):
-        if obj is None:
-            return type
-        return self._path(obj).get(self.key)
-
-    def __set__(self, obj, value):
-        self._path(obj)[self.key] = value
-
-
 def maybe_unroll_group(g):
     """Unroll group with only one member."""
     # Issue #1656
@@ -140,34 +60,6 @@ def _upgrade(fields, sig):
     return sig
 
 
-def _seq_concat_item(seq, item):
-    """Return copy of sequence seq with item added.
-
-    Returns:
-        Sequence: if seq is a tuple, the result will be a tuple,
-           otherwise it depends on the implementation of ``__add__``.
-    """
-    return seq + (item,) if isinstance(seq, tuple) else seq + [item]
-
-
-def _seq_concat_seq(a, b):
-    """Concatenate two sequences: ``a + b``.
-
-    Returns:
-        Sequence: The return value will depend on the largest sequence
-            - if b is larger and is a tuple, the return value will be a tuple.
-            - if a is larger and is a list, the return value will be a list,
-    """
-    # find the type of the largest sequence
-    prefer = type(max([a, b], key=len))
-    # convert the smallest list to the type of the largest sequence.
-    if not isinstance(a, prefer):
-        a = prefer(a)
-    if not isinstance(b, prefer):
-        b = prefer(b)
-    return a + b
-
-
 @abstract.CallableSignature.register
 class Signature(dict):
     """Task Signature.
@@ -229,9 +121,11 @@ class Signature(dict):
     _app = _type = None
 
     @classmethod
-    def register_type(cls, subclass, name=None):
-        cls.TYPES[name or subclass.__name__] = subclass
-        return subclass
+    def register_type(cls, name=None):
+        def _inner(subclass):
+            cls.TYPES[name or subclass.__name__] = subclass
+            return subclass
+        return _inner
 
     @classmethod
     def from_dict(cls, d, app=None):
@@ -461,34 +355,46 @@ class Signature(dict):
 
         "unchain" if you will, but with links intact.
         """
-        return list(_chain.from_iterable(_chain(
+        return list(itertools.chain.from_iterable(itertools.chain(
             [[self]],
             (link.flatten_links()
                 for link in maybe_list(self.options.get('link')) or [])
         )))
 
     def __or__(self, other):
+        # These could be implemented in each individual class,
+        # I'm sure, but for now we have this.
+        if isinstance(other, chord) and len(other.tasks) == 1:
+            # chord with one header -> header[0] | body
+            other = other.tasks[0] | other.body
+
         if isinstance(self, group):
             if isinstance(other, group):
                 # group() | group() -> single group
-                return group(_chain(self.tasks, other.tasks), app=self.app)
+                return group(
+                    itertools.chain(self.tasks, other.tasks), app=self.app)
             # group() | task -> chord
+            if len(self.tasks) == 1:
+                # group(ONE.s()) | other -> ONE.s() | other
+                # Issue #3323
+                return self.tasks[0] | other
             return chord(self, body=other, app=self._app)
         elif isinstance(other, group):
             # unroll group with one member
             other = maybe_unroll_group(other)
-            if isinstance(self, chain):
+            if isinstance(self, _chain):
                 # chain | group() -> chain
                 sig = self.clone()
                 sig.tasks.append(other)
                 return sig
             # task | group() -> chain
-            return chain(self, other, app=self.app)
-        if not isinstance(self, chain) and isinstance(other, chain):
+            return _chain(self, other, app=self.app)
+
+        if not isinstance(self, _chain) and isinstance(other, _chain):
             # task | chain -> chain
-            return chain(
-                _seq_concat_seq((self,), other.tasks), app=self._app)
-        elif isinstance(other, chain):
+            return _chain(
+                seq_concat_seq((self,), other.tasks), app=self._app)
+        elif isinstance(other, _chain):
             # chain | chain -> chain
             sig = self.clone()
             if isinstance(sig.tasks, tuple):
@@ -496,12 +402,16 @@ class Signature(dict):
             sig.tasks.extend(other.tasks)
             return sig
         elif isinstance(self, chord):
+            # chord(ONE, body) | other -> ONE | body | other
+            # chord with one header task is unecessary.
+            if len(self.tasks) == 1:
+                return self.tasks[0] | self.body | other
             # chord | task ->  attach to body
             sig = self.clone()
             sig.body = sig.body | other
             return sig
         elif isinstance(other, Signature):
-            if isinstance(self, chain):
+            if isinstance(self, _chain):
                 if isinstance(self.tasks[-1], group):
                     # CHAIN [last item is group] | TASK -> chord
                     sig = self.clone()
@@ -515,10 +425,10 @@ class Signature(dict):
                     return sig
                 else:
                     # chain | task -> chain
-                    return chain(
-                        _seq_concat_item(self.tasks, other), app=self._app)
+                    return _chain(
+                        seq_concat_item(self.tasks, other), app=self._app)
             # task | task -> chain
-            return chain(self, other, app=self._app)
+            return _chain(self, other, app=self._app)
         return NotImplemented
 
     def election(self):
@@ -580,70 +490,23 @@ class Signature(dict):
             return self.type.apply_async
         except KeyError:
             return _partial(self.app.send_task, self['task'])
-    id = _getitem_property('options.task_id', 'Task UUID')
-    parent_id = _getitem_property('options.parent_id', 'Task parent UUID.')
-    root_id = _getitem_property('options.root_id', 'Task root UUID.')
-    task = _getitem_property('task', 'Name of task.')
-    args = _getitem_property('args', 'Positional arguments to task.')
-    kwargs = _getitem_property('kwargs', 'Keyword arguments to task.')
-    options = _getitem_property('options', 'Task execution options.')
-    subtask_type = _getitem_property('subtask_type', 'Type of signature')
-    chord_size = _getitem_property(
+    id = getitem_property('options.task_id', 'Task UUID')
+    parent_id = getitem_property('options.parent_id', 'Task parent UUID.')
+    root_id = getitem_property('options.root_id', 'Task root UUID.')
+    task = getitem_property('task', 'Name of task.')
+    args = getitem_property('args', 'Positional arguments to task.')
+    kwargs = getitem_property('kwargs', 'Keyword arguments to task.')
+    options = getitem_property('options', 'Task execution options.')
+    subtask_type = getitem_property('subtask_type', 'Type of signature')
+    chord_size = getitem_property(
         'chord_size', 'Size of chord (if applicable)')
-    immutable = _getitem_property(
+    immutable = getitem_property(
         'immutable', 'Flag set if no longer accepts new arguments')
 
 
-@Signature.register_type
-class chain(Signature):
-    """Chain tasks together.
-
-    Each tasks follows one another,
-    by being applied as a callback of the previous task.
-
-    Note:
-        If called with only one argument, then that argument must
-        be an iterable of tasks to chain: this allows us
-        to use generator expressions.
-
-    Example:
-        This is effectively :math:`((2 + 2) + 4)`:
-
-        .. code-block:: pycon
-
-            >>> res = chain(add.s(2, 2), add.s(4))()
-            >>> res.get()
-            8
-
-        Calling a chain will return the result of the last task in the chain.
-        You can get to the other tasks by following the ``result.parent``'s:
-
-        .. code-block:: pycon
-
-            >>> res.parent.get()
-            4
-
-        Using a generator expression:
-
-        .. code-block:: pycon
-
-            >>> lazy_chain = chain(add.s(i) for i in range(10))
-            >>> res = lazy_chain(3)
-
-    Arguments:
-        *tasks (Signature): List of task signatures to chain.
-            If only one argument is passed and that argument is
-            an iterable, then that'll be used as the list of signatures
-            to chain instead.  This means that you can use a generator
-            expression.
-
-    Returns:
-        ~celery.chain: A lazy signature that can be called to apply the first
-            task in the chain.  When that task succeeed the next task in the
-            chain is applied, and so on.
-    """
-
-    tasks = _getitem_property('kwargs.tasks', 'Tasks in chain.')
+@Signature.register_type(name='chain')
+class _chain(Signature):
+    tasks = getitem_property('kwargs.tasks', 'Tasks in chain.')
 
     @classmethod
     def from_dict(cls, d, app=None):
@@ -653,7 +516,7 @@ class chain(Signature):
                 tasks = d['kwargs']['tasks'] = list(tasks)
             # First task must be signature object to get app
             tasks[0] = maybe_signature(tasks[0], app=app)
-        return _upgrade(d, chain(tasks, app=app, **d['options']))
+        return _upgrade(d, _chain(tasks, app=app, **d['options']))
 
     def __init__(self, *tasks, **options):
         tasks = (regen(tasks[0]) if len(tasks) == 1 and is_list(tasks[0])
@@ -749,6 +612,11 @@ class chain(Signature):
         prev_res = None
         tasks, results = [], []
         i = 0
+        # NOTE: We are doing this in reverse order.
+        # The result is a list of tasks in reverse order, that is
+        # passed as the ``chain`` message field.
+        # As it's reversed the worker can just do ``chain.pop()`` to
+        # get the next task in the chain.
         while steps:
             task = steps_pop()
             is_first_task, is_last_task = not steps, not i
@@ -764,7 +632,7 @@ class chain(Signature):
             elif is_first_task:
                 task.args = tuple(args) + tuple(task.args)
 
-            if isinstance(task, chain):
+            if isinstance(task, _chain):
                 # splice the chain
                 steps_extend(task.tasks)
                 continue
@@ -812,6 +680,7 @@ class chain(Signature):
 
             prev_task, prev_res = task, res
             if isinstance(task, chord):
+                app.backend.ensure_chords_allowed()
                 # If the task is a chord, and the body is a chain
                 # the chain has already been prepared, and res is
                 # set to the last task in the callback chain.
@@ -846,11 +715,70 @@ class chain(Signature):
         if not self.tasks:
             return '<{0}@{1:#x}: empty>'.format(
                 type(self).__name__, id(self))
-        return _shorten_names(
+        return remove_repeating_from_task(
             self.tasks[0]['task'],
             ' | '.join(repr(t) for t in self.tasks))
 
 
+class chain(_chain):
+    """Chain tasks together.
+
+    Each tasks follows one another,
+    by being applied as a callback of the previous task.
+
+    Note:
+        If called with only one argument, then that argument must
+        be an iterable of tasks to chain: this allows us
+        to use generator expressions.
+
+    Example:
+        This is effectively :math:`((2 + 2) + 4)`:
+
+        .. code-block:: pycon
+
+            >>> res = chain(add.s(2, 2), add.s(4))()
+            >>> res.get()
+            8
+
+        Calling a chain will return the result of the last task in the chain.
+        You can get to the other tasks by following the ``result.parent``'s:
+
+        .. code-block:: pycon
+
+            >>> res.parent.get()
+            4
+
+        Using a generator expression:
+
+        .. code-block:: pycon
+
+            >>> lazy_chain = chain(add.s(i) for i in range(10))
+            >>> res = lazy_chain(3)
+
+    Arguments:
+        *tasks (Signature): List of task signatures to chain.
+            If only one argument is passed and that argument is
+            an iterable, then that'll be used as the list of signatures
+            to chain instead.  This means that you can use a generator
+            expression.
+
+    Returns:
+        ~celery.chain: A lazy signature that can be called to apply the first
+            task in the chain.  When that task succeeed the next task in the
+            chain is applied, and so on.
+    """
+
+    # could be function, but must be able to reference as :class:`chain`.
+    def __new__(cls, *tasks, **kwargs):
+        # This forces `chain(X, Y, Z)` to work the same way as `X | Y | Z`
+        if not kwargs and tasks:
+            if len(tasks) == 1 and is_list(tasks[0]):
+                # ensure chain(generator_expression) works.
+                tasks = tasks[0]
+            return reduce(operator.or_, tasks)
+        return super(chain, cls).__new__(cls, *tasks, **kwargs)
+
+
 class _basemap(Signature):
     _task_name = None
     _unpack_args = itemgetter('task', 'it')
@@ -876,7 +804,7 @@ class _basemap(Signature):
         )
 
 
-@Signature.register_type
+@Signature.register_type()
 class xmap(_basemap):
     """Map operation for tasks.
 
@@ -893,7 +821,7 @@ class xmap(_basemap):
             task.task, truncate(repr(it), 100))
 
 
-@Signature.register_type
+@Signature.register_type()
 class xstarmap(_basemap):
     """Map operation for tasks, using star arguments."""
 
@@ -905,7 +833,7 @@ class xstarmap(_basemap):
             task.task, truncate(repr(it), 100))
 
 
-@Signature.register_type
+@Signature.register_type()
 class chunks(Signature):
     """Partition of tasks in n chunks."""
 
@@ -950,7 +878,7 @@ def _maybe_group(tasks, app):
     if isinstance(tasks, dict):
         tasks = signature(tasks, app=app)
 
-    if isinstance(tasks, (group, chain)):
+    if isinstance(tasks, (group, _chain)):
         tasks = tasks.tasks
     elif isinstance(tasks, abstract.CallableSignature):
         tasks = [tasks]
@@ -959,7 +887,7 @@ def _maybe_group(tasks, app):
     return tasks
 
 
-@Signature.register_type
+@Signature.register_type()
 class group(Signature):
     """Creates a group of tasks to be executed in parallel.
 
@@ -990,7 +918,7 @@ class group(Signature):
             that can be used to inspect the state of the group).
     """
 
-    tasks = _getitem_property('kwargs.tasks', 'Tasks in group.')
+    tasks = getitem_property('kwargs.tasks', 'Tasks in group.')
 
     @classmethod
     def from_dict(cls, d, app=None):
@@ -1180,7 +1108,7 @@ class group(Signature):
 
     def __repr__(self):
         if self.tasks:
-            return _shorten_names(
+            return remove_repeating_from_task(
                 self.tasks[0]['task'],
                 'group({0.tasks!r})'.format(self))
         return 'group(<empty>)'
@@ -1199,7 +1127,7 @@ class group(Signature):
         return app if app is not None else current_app
 
 
-@Signature.register_type
+@Signature.register_type()
 class chord(Signature):
     r"""Barrier synchronization primitive.
 
@@ -1263,8 +1191,12 @@ class chord(Signature):
         # but the body may actually be a chain,
         # so find the first result without a parent
         node = bodyres
+        seen = set()
         while node:
-            if not node.parent:
+            if node.id in seen:
+                raise RuntimeError('Recursive result parents')
+            seen.add(node.id)
+            if node.parent is None:
                 node.parent = header_result
                 break
             node = node.parent
@@ -1286,6 +1218,12 @@ class chord(Signature):
         if app.conf.task_always_eager:
             return self.apply(args, kwargs,
                               body=body, task_id=task_id, **options)
+        if len(self.tasks) == 1:
+            # chord([A], B) can be optimized as A | B
+            # - Issue #3323
+            return (self.tasks[0].set(task_id=task_id) | body).apply_async(
+                args, kwargs, **options)
+        # chord([A, B, ...], C)
         return self.run(tasks, body, args, task_id=task_id, **options)
 
     def apply(self, args=(), kwargs={}, propagate=True, body=None, **options):
@@ -1356,15 +1294,15 @@ class chord(Signature):
 
     def __repr__(self):
         if self.body:
-            if isinstance(self.body, chain):
-                return _shorten_names(
+            if isinstance(self.body, _chain):
+                return remove_repeating_from_task(
                     self.body.tasks[0]['task'],
-                    '({0} | {1!r})'.format(
+                    '%({0} | {1!r})'.format(
                         self.body.tasks[0].reprcall(self.tasks),
                         chain(self.body.tasks[1:], app=self._app),
                     ),
                 )
-            return _shorten_names(
+            return '%' + remove_repeating_from_task(
                 self.body['task'], self.body.reprcall(self.tasks))
         return '<chord without body: {0.tasks!r}>'.format(self)
 
@@ -1384,8 +1322,8 @@ class chord(Signature):
                 app = body._app
         return app if app is not None else current_app
 
-    tasks = _getitem_property('kwargs.header', 'Tasks in chord header.')
-    body = _getitem_property('kwargs.body', 'Body task of chord.')
+    tasks = getitem_property('kwargs.header', 'Tasks in chord header.')
+    body = getitem_property('kwargs.body', 'Body task of chord.')
 
 
 def signature(varies, *args, **kwargs):

+ 5 - 4
celery/concurrency/asynpool.py

@@ -233,7 +233,7 @@ class ResultHandler(_pool.ResultHandler):
                            else EOFError())
                 Hr += n
 
-        body_size, = unpack_from('>i', bufv)
+        body_size, = unpack_from(b'>i', bufv)
         if readcanbuf:
             buf = bytearray(body_size)
             bufv = memoryview(buf)
@@ -1131,11 +1131,12 @@ class AsynPool(_pool.Pool):
     def _setup_queues(self):
         # this is only used by the original pool that used a shared
         # queue for all processes.
+        self._quick_put = None
 
-        # these attributes makes no sense for us, but we'll still
-        # have to initialize them.
+        # these attributes are unused by this class, but we'll still
+        # have to initialize them for compatibility.
         self._inqueue = self._outqueue = \
-            self._quick_put = self._quick_get = self._poll_result = None
+            self._quick_get = self._poll_result = None
 
     def process_flush_queues(self, proc):
         """Flush all queues.

+ 0 - 1
celery/concurrency/gevent.py

@@ -3,7 +3,6 @@
 from kombu.async import timer as _timer
 from time import monotonic
 from . import base
-
 try:
     from gevent import Timeout
 except ImportError:  # pragma: no cover

+ 0 - 1
celery/concurrency/solo.py

@@ -1,7 +1,6 @@
 # -*- coding: utf-8 -*-
 """Single-threaded execution pool."""
 import os
-
 from .base import BasePool, apply_target
 
 __all__ = ['TaskPool']

+ 0 - 16
celery/contrib/pytest.py

@@ -1,11 +1,7 @@
 """Fixtures and testing utilities for :pypi:`py.test <pytest>`."""
 import os
 import pytest
-
 from contextlib import contextmanager
-
-from celery.backends.cache import CacheBackend, DummyClient
-
 from .testing import worker
 from .testing.app import TestApp, setup_default_app
 
@@ -152,15 +148,3 @@ def celery_worker(request, celery_app, celery_includes, celery_worker_pool):
 def depends_on_current_app(celery_app):
     """Fixture that sets app as current."""
     celery_app.set_current()
-
-
-@pytest.fixture(autouse=True)
-def reset_cache_backend_state(celery_app):
-    """Fixture that resets the internal state of the cache result backend."""
-    yield
-    backend = celery_app.__dict__.get('backend')
-    if backend is not None:
-        if isinstance(backend, CacheBackend):
-            if isinstance(backend.client, DummyClient):
-                backend.client.cache.clear()
-            backend._cache.clear()

+ 0 - 2
celery/contrib/rdb.py

@@ -45,9 +45,7 @@ import errno
 import os
 import socket
 import sys
-
 from pdb import Pdb
-
 from billiard.process import current_process
 
 __all__ = [

+ 0 - 2
celery/contrib/sphinx.py

@@ -29,10 +29,8 @@ using `:task:proj.tasks.add` syntax.
 Use ``.. autotask::`` to manually document a task.
 """
 from inspect import formatargspec, getfullargspec
-
 from sphinx.domains.python import PyModulelevel
 from sphinx.ext.autodoc import FunctionDocumenter
-
 from celery.app.task import BaseTask
 
 

+ 0 - 3
celery/contrib/testing/app.py

@@ -1,11 +1,8 @@
 """Create Celery app instances used for testing."""
 import weakref
-
 from contextlib import contextmanager
 from copy import deepcopy
-
 from kombu.utils.imports import symbol_by_name
-
 from celery import Celery
 from celery import _state
 

+ 2 - 10
celery/contrib/testing/manager.py

@@ -15,21 +15,13 @@ from celery.utils.time import humanize_seconds as _humanize_seconds
 
 E_STILL_WAITING = 'Still waiting for {0}.  Trying again {when}: {exc!r}'
 
+humanize_seconds = partial(_humanize_seconds, microseconds=True)
+
 
 class Sentinel(Exception):
     """Signifies the end of something."""
 
 
-def humanize_seconds(secs, prefix='', sep='', now='now', **kwargs):
-    # type: (float, str, str, str, **Any) -> str
-    """Represent seconds in a human readable way."""
-    s = _humanize_seconds(secs, prefix, sep, now, **kwargs)
-    if s == now and secs > 0:
-        return '{prefix}{sep}{0:.2f} seconds'.format(
-            float(secs), prefix=prefix, sep=sep)
-    return s
-
-
 class ManagerMixin:
     """Mixin that adds :class:`Manager` capabilities."""
 

+ 0 - 1
celery/contrib/testing/mocks.py

@@ -1,7 +1,6 @@
 """Useful mocks for unit testing."""
 import numbers
 from datetime import datetime, timedelta
-
 try:
     from case import Mock
 except ImportError:

+ 0 - 2
celery/contrib/testing/worker.py

@@ -1,9 +1,7 @@
 """Embedded workers for integration tests."""
 import os
 import threading
-
 from contextlib import contextmanager
-
 from celery import worker
 from celery.result import allow_join_result, _set_task_join_will_block
 from celery.utils.dispatch import Signal

+ 0 - 2
celery/events/dumper.py

@@ -5,9 +5,7 @@ This is a simple program that dumps events to the console
 as they happen.  Think of it like a `tcpdump` for Celery events.
 """
 import sys
-
 from datetime import datetime
-
 from celery.app import app_or_default
 from celery.utils.functional import LRUCache
 from celery.utils.time import humanize_seconds

+ 0 - 1
celery/events/snapshot.py

@@ -8,7 +8,6 @@ implementation of this writing the snapshots to a database
 in :mod:`djcelery.snapshots` in the `django-celery` distribution.
 """
 from kombu.utils.limits import TokenBucket
-
 from celery import platforms
 from celery.app import app_or_default
 from celery.utils.timer2 import Timer

+ 121 - 48
celery/exceptions.py

@@ -1,22 +1,89 @@
 # -*- coding: utf-8 -*-
-"""Celery Exceptions."""
+"""Celery error types.
+
+Error Hierarchy
+===============
+
+- :exc:`Exception`
+    - :exc:`celery.exceptions.CeleryError`
+        - :exc:`~celery.exceptions.ImproperlyConfigured`
+        - :exc:`~celery.exceptions.SecurityError`
+        - :exc:`~celery.exceptions.TaskPredicate`
+            - :exc:`~celery.exceptions.Ignore`
+            - :exc:`~celery.exceptions.Reject`
+            - :exc:`~celery.exceptions.Retry`
+        - :exc:`~celery.exceptions.TaskError`
+            - :exc:`~celery.exceptions.QueueNotFound`
+            - :exc:`~celery.exceptions.IncompleteStream`
+            - :exc:`~celery.exceptions.NotRegistered`
+            - :exc:`~celery.exceptions.AlreadyRegistered`
+            - :exc:`~celery.exceptions.TimeoutError`
+            - :exc:`~celery.exceptions.MaxRetriesExceededError`
+            - :exc:`~celery.exceptions.TaskRevokedError`
+            - :exc:`~celery.exceptions.InvalidTaskError`
+            - :exc:`~celery.exceptions.ChordError`
+    - :class:`kombu.exceptions.KombuError`
+        - :exc:`~celery.exceptions.OperationalError`
+
+            Raised when a transport connection error occurs while
+            sending a message (be it a task, remote control command error).
+
+            .. note::
+                This exception does not inherit from
+                :exc:`~celery.exceptions.CeleryError`.
+    - **billiard errors** (prefork pool)
+        - :exc:`~celery.exceptions.SoftTimeLimitExceeded`
+        - :exc:`~celery.exceptions.TimeLimitExceeded`
+        - :exc:`~celery.exceptions.WorkerLostError`
+        - :exc:`~celery.exceptions.Terminated`
+- :class:`UserWarning`
+    - :class:`~celery.exceptions.CeleryWarning`
+        - :class:`~celery.exceptions.AlwaysEagerIgnored`
+        - :class:`~celery.exceptions.DuplicateNodenameWarning`
+        - :class:`~celery.exceptions.FixupWarning`
+        - :class:`~celery.exceptions.NotConfigured`
+- :exc:`BaseException`
+    - :exc:`SystemExit`
+        - :exc:`~celery.exceptions.WorkerTerminate`
+        - :exc:`~celery.exceptions.WorkerShutdown`
+"""
 import numbers
-
-from billiard.exceptions import (  # noqa
+from billiard.exceptions import (
     SoftTimeLimitExceeded, TimeLimitExceeded, WorkerLostError, Terminated,
 )
+from kombu.exceptions import OperationalError
 
 __all__ = [
-    'CeleryError', 'CeleryWarning', 'TaskPredicate',
-    'SecurityError', 'Ignore', 'QueueNotFound',
+    # Warnings
+    'CeleryWarning',
+    'AlwaysEagerIgnored', 'DuplicateNodenameWarning',
+    'FixupWarning', 'NotConfigured',
+
+    # Core errors
+    'CeleryError',
+    'ImproperlyConfigured', 'SecurityError',
+
+    # Kombu (messaging) errors.
+    'OperationalError',
+
+    # Task semi-predicates
+    'TaskPredicate', 'Ignore', 'Reject', 'Retry',
+
+    # Task related errors.
+    'TaskError', 'QueueNotFound', 'IncompleteStream',
+    'NotRegistered', 'AlreadyRegistered', 'TimeoutError',
+    'MaxRetriesExceededError', 'TaskRevokedError',
+    'InvalidTaskError', 'ChordError',
+
+    # Billiard task errors.
+    'SoftTimeLimitExceeded', 'TimeLimitExceeded',
+    'WorkerLostError', 'Terminated',
+
+    # Deprecation warnings (forcing Python to emit them).
+    'CPendingDeprecationWarning', 'CDeprecationWarning',
+
+    # Worker shutdown semi-predicates (inherits from SystemExit).
     'WorkerShutdown', 'WorkerTerminate',
-    'ImproperlyConfigured', 'NotRegistered', 'AlreadyRegistered',
-    'TimeoutError', 'MaxRetriesExceededError', 'Retry', 'Reject',
-    'TaskRevokedError', 'NotConfigured', 'AlwaysEagerIgnored',
-    'InvalidTaskError', 'ChordError', 'CPendingDeprecationWarning',
-    'CDeprecationWarning', 'FixupWarning', 'DuplicateNodenameWarning',
-    'SoftTimeLimitExceeded', 'TimeLimitExceeded', 'WorkerLostError',
-    'Terminated', 'IncompleteStream'
 ]
 
 UNREGISTERED_FMT = """\
@@ -24,16 +91,28 @@ Task of kind {0} never registered, please make sure it's imported.\
 """
 
 
-class CeleryError(Exception):
-    """Base class for all Celery errors."""
-
-
 class CeleryWarning(UserWarning):
     """Base class for all Celery warnings."""
 
 
-class SecurityError(CeleryError):
-    """Security related exception."""
+class AlwaysEagerIgnored(CeleryWarning):
+    """send_task ignores :setting:`task_always_eager` option."""
+
+
+class DuplicateNodenameWarning(CeleryWarning):
+    """Multiple workers are using the same nodename."""
+
+
+class FixupWarning(CeleryWarning):
+    """Fixup related warning."""
+
+
+class NotConfigured(CeleryWarning):
+    """Celery hasn't been configured, as no config module has been found."""
+
+
+class CeleryError(Exception):
+    """Base class for all Celery errors."""
 
 
 class TaskPredicate(CeleryError):
@@ -95,62 +174,55 @@ class Reject(TaskPredicate):
         return 'reject requeue=%s: %s' % (self.requeue, self.reason)
 
 
-class WorkerTerminate(SystemExit):
-    """Signals that the worker should terminate immediately."""
+class ImproperlyConfigured(CeleryError):
+    """Celery is somehow improperly configured."""
 
 
-class WorkerShutdown(SystemExit):
-    """Signals that the worker should perform a warm shutdown."""
+class SecurityError(CeleryError):
+    """Security related exception."""
 
 
-class QueueNotFound(KeyError):
+class TaskError(CeleryError):
+    """Task related errors."""
+
+
+class QueueNotFound(KeyError, TaskError):
     """Task routed to a queue not in ``conf.queues``."""
 
 
-class ImproperlyConfigured(ImportError):
-    """Celery is somehow improperly configured."""
+class IncompleteStream(TaskError):
+    """Found the end of a stream of data, but the data isn't complete."""
 
 
-class NotRegistered(KeyError, CeleryError):
+class NotRegistered(KeyError, TaskError):
     """The task ain't registered."""
 
     def __repr__(self):
         return UNREGISTERED_FMT.format(self)
 
 
-class AlreadyRegistered(CeleryError):
+class AlreadyRegistered(TaskError):
     """The task is already registered."""
+    # XXX Unused
 
 
-class TimeoutError(CeleryError):
+class TimeoutError(TaskError):
     """The operation timed out."""
 
 
-class MaxRetriesExceededError(CeleryError):
+class MaxRetriesExceededError(TaskError):
     """The tasks max restart limit has been exceeded."""
 
 
-class TaskRevokedError(CeleryError):
+class TaskRevokedError(TaskError):
     """The task has been revoked, so no result available."""
 
 
-class NotConfigured(CeleryWarning):
-    """Celery hasn't been configured, as no config module has been found."""
-
-
-class AlwaysEagerIgnored(CeleryWarning):
-    """send_task ignores :setting:`task_always_eager` option."""
-
-
-class InvalidTaskError(CeleryError):
+class InvalidTaskError(TaskError):
     """The task has invalid data or ain't properly constructed."""
 
 
-class IncompleteStream(CeleryError):
-    """Found the end of a stream of data, but the data isn't complete."""
-
-
-class ChordError(CeleryError):
+class ChordError(TaskError):
     """A task part of the chord raised an exception."""
 
 
@@ -162,9 +234,10 @@ class CDeprecationWarning(DeprecationWarning):
     """Warning of deprecation."""
 
 
-class FixupWarning(CeleryWarning):
-    """Fixup related warning."""
+class WorkerTerminate(SystemExit):
+    """Signals that the worker should terminate immediately."""
+SystemTerminate = WorkerTerminate  # XXX compat
 
 
-class DuplicateNodenameWarning(CeleryWarning):
-    """Multiple workers are using the same nodename."""
+class WorkerShutdown(SystemExit):
+    """Signals that the worker should perform a warm shutdown."""

+ 7 - 1
celery/fixups/django.py

@@ -11,7 +11,7 @@ from importlib import import_module
 
 from celery import _state
 from celery import signals
-from celery.exceptions import FixupWarning
+from celery.exceptions import FixupWarning, ImproperlyConfigured
 
 __all__ = ['DjangoFixup', 'fixup']
 
@@ -29,6 +29,11 @@ def _maybe_close_fd(fh):
         pass
 
 
+def _verify_django_version(django):
+    if django.VERSION < (1, 8):
+        raise ImproperlyConfigured('Celery 4.x requires Django 1.8 or later.')
+
+
 def fixup(app, env='DJANGO_SETTINGS_MODULE'):
     """Install Django fixup if settings module environment is set."""
     SETTINGS_MODULE = os.environ.get(env)
@@ -38,6 +43,7 @@ def fixup(app, env='DJANGO_SETTINGS_MODULE'):
         except ImportError:
             warnings.warn(FixupWarning(ERR_NOT_INSTALLED))
         else:
+            _verify_django_version(django)
             return DjangoFixup(app).install()
 
 

+ 0 - 2
celery/loaders/default.py

@@ -2,11 +2,9 @@
 """The default loader used when no custom app has been initialized."""
 import os
 import warnings
-
 from celery.exceptions import NotConfigured
 from celery.utils.collections import DictAttribute
 from celery.utils.serialization import strtobool
-
 from .base import BaseLoader
 
 __all__ = ['Loader', 'DEFAULT_CONFIG_MODULE']

+ 0 - 2
celery/security/__init__.py

@@ -3,9 +3,7 @@
 from kombu.serialization import (
     registry, disable_insecure_serializers as _disable_insecure_serializers,
 )
-
 from celery.exceptions import ImproperlyConfigured
-
 from .serialization import register_auth
 
 SSL_NOT_INSTALLED = """\

+ 0 - 3
celery/security/certificate.py

@@ -2,11 +2,8 @@
 """X.509 certificates."""
 import glob
 import os
-
 from kombu.utils.encoding import bytes_to_str
-
 from celery.exceptions import SecurityError
-
 from .utils import crypto, reraise_errors
 
 __all__ = ['Certificate', 'CertStore', 'FSCertStore']

+ 0 - 1
celery/security/key.py

@@ -1,7 +1,6 @@
 # -*- coding: utf-8 -*-
 """Private keys for the security serializer."""
 from kombu.utils.encoding import ensure_bytes
-
 from .utils import crypto, reraise_errors
 
 __all__ = ['PrivateKey']

+ 0 - 3
celery/security/utils.py

@@ -1,11 +1,8 @@
 # -*- coding: utf-8 -*-
 """Utilities used by the message signing serializer."""
 import sys
-
 from contextlib import contextmanager
-
 from celery.exceptions import SecurityError
-
 try:
     from OpenSSL import crypto
 except ImportError:  # pragma: no cover

+ 1 - 1
celery/utils/__init__.py

@@ -4,7 +4,7 @@
 Don't import from here directly anymore, as these are only
 here for backwards compatibility.
 """
-from .functional import memoize
+from .functional import memoize  # noqa
 from .nodenames import worker_direct, nodename, nodesplit
 
 __all__ = ['worker_direct', 'gen_task_name', 'nodename', 'nodesplit',

+ 0 - 1
celery/utils/abstract.py

@@ -2,7 +2,6 @@
 """Abstract classes."""
 from abc import ABCMeta, abstractmethod, abstractproperty
 from collections import Callable
-
 from typing import Any, Sequence, Tuple
 
 __all__ = ['CallableTask', 'CallableSignature']

+ 0 - 3
celery/utils/deprecated.py

@@ -1,11 +1,8 @@
 # -*- coding: utf-8 -*-
 """Deprecation utilities."""
 import warnings
-
 from typing import Any, Callable, Optional
-
 from vine.utils import wraps
-
 from celery.exceptions import CPendingDeprecationWarning, CDeprecationWarning
 
 __all__ = ['Callable', 'Property', 'warn']

+ 0 - 2
celery/utils/dispatch/signal.py

@@ -1,10 +1,8 @@
 # -*- coding: utf-8 -*-
 """Implementation of the Observer pattern."""
 import weakref
-
 from celery.local import PromiseProxy, Proxy
 from celery.utils.log import get_logger
-
 from . import saferef
 
 __all__ = ['Signal']

+ 35 - 0
celery/utils/functional.py

@@ -118,6 +118,13 @@ def firstmethod(method: str, on_call: Optional[Callable]=None) -> Any:
 def chunks(it: Iterable, n: int) -> Iterable:
     """Split an iterator into chunks with `n` elements each.
 
+    Warning:
+        ``it`` must be an actual iterator: if you pass it a
+        concrete sequence instead, you will get repeating elements.
+
+        So ``chunks(iter(range(1000)), 10)`` is fine, but
+        ``chunks(range(1000), 10)`` is not.
+
     Example:
         # n == 2
         >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
@@ -284,3 +291,31 @@ def fun_takes_argument(name: str, fun: Callable,
 def maybe(typ, val):
     """Call typ on value if val is defined."""
     return typ(val) if val is not None else val
+
+
+def seq_concat_item(seq, item):
+    """Return copy of sequence seq with item added.
+
+    Returns:
+        Sequence: if seq is a tuple, the result will be a tuple,
+           otherwise it depends on the implementation of ``__add__``.
+    """
+    return seq + (item,) if isinstance(seq, tuple) else seq + [item]
+
+
+def seq_concat_seq(a, b):
+    """Concatenate two sequences: ``a + b``.
+
+    Returns:
+        Sequence: The return value will depend on the largest sequence:
+            - if b is larger and is a tuple, the return value will be a tuple.
+            - if a is larger and is a list, the return value will be a list.
+    """
+    # find the type of the largest sequence
+    prefer = type(max([a, b], key=len))
+    # convert the smallest list to the type of the largest sequence.
+    if not isinstance(a, prefer):
+        a = prefer(a)
+    if not isinstance(b, prefer):
+        b = prefer(b)
+    return a + b

+ 0 - 2
celery/utils/imports.py

@@ -5,12 +5,10 @@ import importlib
 import os
 import sys
 import warnings
-
 from contextlib import contextmanager
 from imp import reload
 from types import ModuleType
 from typing import Any, Callable, Iterator, Optional
-
 from kombu.utils.imports import symbol_by_name
 
 #: Billiard sets this when execv is enabled.

+ 0 - 1
celery/utils/iso8601.py

@@ -33,7 +33,6 @@ TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 """
 import re
-
 from datetime import datetime
 from pytz import FixedOffset
 

+ 0 - 3
celery/utils/nodenames.py

@@ -2,12 +2,9 @@
 """Worker name utilities."""
 import os
 import socket
-
 from functools import partial
 from typing import Dict, Optional, Tuple
-
 from kombu.entity import Exchange, Queue
-
 from .functional import memoize
 from .text import simple_format
 

+ 53 - 1
celery/utils/objects.py

@@ -1,8 +1,9 @@
 # -*- coding: utf-8 -*-
 """Object related utilities, including introspection, etc."""
+from functools import reduce
 from typing import Any, Callable, Set, Sequence
 
-__all__ = ['Bunch', 'FallbackContext', 'mro_lookup']
+__all__ = ['Bunch', 'FallbackContext', 'getitem_property', 'mro_lookup']
 
 
 class Bunch:
@@ -90,3 +91,54 @@ class FallbackContext:
     def __exit__(self, *exc_info) -> Any:
         if self._context is not None:
             return self._context.__exit__(*exc_info)
+
+
+class getitem_property(object):
+    """Attribute -> dict key descriptor.
+
+    The target object must support ``__getitem__``,
+    and optionally ``__setitem__``.
+
+    Example:
+        >>> from collections import defaultdict
+
+        >>> class Me(dict):
+        ...     deep = defaultdict(dict)
+        ...
+        ...     foo = getitem_property('foo')
+        ...     deep_thing = getitem_property('deep.thing')
+
+
+        >>> me = Me()
+        >>> me.foo
+        None
+
+        >>> me.foo = 10
+        >>> me.foo
+        10
+        >>> me['foo']
+        10
+
+        >>> me.deep_thing = 42
+        >>> me.deep_thing
+        42
+        >>> me.deep
+        defaultdict(<type 'dict'>, {'thing': 42})
+    """
+
+    def __init__(self, keypath, doc=None):
+        path, _, self.key = keypath.rpartition('.')
+        self.path = path.split('.') if path else None
+        self.__doc__ = doc
+
+    def _path(self, obj):
+        return (reduce(lambda d, k: d[k], [obj] + self.path) if self.path
+                else obj)
+
+    def __get__(self, obj, type=None):
+        if obj is None:
+            return type
+        return self._path(obj).get(self.key)
+
+    def __set__(self, obj, value):
+        self._path(obj)[self.key] = value

BIN
celery/utils/static/celery_128.png


+ 0 - 3
celery/utils/sysinfo.py

@@ -1,11 +1,8 @@
 # -*- coding: utf-8 -*-
 """System information utilities."""
 import os
-
 from collections import namedtuple
-
 from math import ceil
-
 from kombu.utils.objects import cached_property
 
 __all__ = ['load_average', 'load_average_t', 'df']

+ 0 - 2
celery/utils/term.py

@@ -5,11 +5,9 @@ import base64
 import os
 import platform
 import sys
-
 from functools import reduce
 from typing import Any, Tuple
 from typing import Callable, Mapping  # noqa
-
 from celery.platforms import isatty
 
 __all__ = ['colored']

+ 45 - 2
celery/utils/text.py

@@ -1,14 +1,12 @@
 # -*- coding: utf-8 -*-
 """Text formatting utilities."""
 import re
-
 from collections import Callable
 from functools import partial
 from pprint import pformat
 from textwrap import fill
 from typing import Any, ByteString, Mapping, Pattern, Sequence, Union
 
-
 __all__ = [
     'abbr', 'abbrtask', 'dedent', 'dedent_initial',
     'ensure_newlines', 'ensure_sep',
@@ -139,3 +137,48 @@ def simple_format(s: str, keys: Mapping[str, Any],
 
         return pattern.sub(resolve, s)
     return s
+
+
+def remove_repeating_from_task(task_name, s):
+    # type: (str, str) -> str
+    """Given task name, remove repeating module names.
+
+    Example:
+        >>> remove_repeating_from_task(
+        ...     'tasks.add',
+        ...     'tasks.add(2, 2), tasks.mul(3), tasks.div(4)')
+        'tasks.add(2, 2), mul(3), div(4)'
+    """
+    # This is used by e.g. repr(chain), to remove repeating module names.
+    #  - extract the module part of the task name
+    module = str(task_name).rpartition('.')[0] + '.'
+    return remove_repeating(module, s)
+
+
+def remove_repeating(substr, s):
+    # type: (str, str) -> str
+    """Remove repeating module names from string.
+
+    Arguments:
+        substr (str): The substring to remove repetitions of
+            (e.g. a module prefix such as ``'x.tasks.'``).
+        s (str): The string we want to work on.
+
+    Example:
+
+        >>> remove_repeating(
+        ...    'x.tasks.',
+        ...    'x.tasks.add(2, 2) | x.tasks.add(4) | x.tasks.mul(8)',
+        ... )
+        'x.tasks.add(2, 2) | add(4) | mul(8)'
+    """
+    # find the first occurrence of substr in the string.
+    index = s.find(substr)
+    if index >= 0:
+        return ''.join([
+            # leave the first occurrence of substr untouched.
+            s[:index + len(substr)],
+            # strip seen substr from the rest of the string.
+            s[index + len(substr):].replace(substr, ''),
+        ])
+    return s

+ 2 - 1
celery/utils/time.py

@@ -250,7 +250,8 @@ def humanize_seconds(secs: numbers.Number,
             return '{0}{1}{2} {3}'.format(prefix, sep, formatter(w),
                                           pluralize(w, unit))
     if microseconds and secs > 0.0:
-        return '{prefix}{0:.2f} seconds'.format(secs, prefix=prefix)
+        return '{prefix}{sep}{0:.2f} seconds'.format(
+            secs, sep=sep, prefix=prefix)
     return now
 
 

+ 0 - 1
celery/worker/consumer/agent.py

@@ -1,6 +1,5 @@
 """Celery + :pypi:`cell` integration."""
 from celery import bootsteps
-
 from .connection import Connection
 
 __all__ = ['Agent']

+ 0 - 1
celery/worker/consumer/connection.py

@@ -1,6 +1,5 @@
 """Consumer Broker Connection Bootstep."""
 from kombu.common import ignore_errors
-
 from celery import bootsteps
 from celery.utils.log import get_logger
 

+ 2 - 7
celery/worker/consumer/consumer.py

@@ -188,7 +188,7 @@ class Consumer:
         self.reset_rate_limits()
 
         self.hub = hub
-        if self.hub:
+        if self.hub or getattr(self.pool, 'is_green', False):
             self.amqheartbeat = amqheartbeat
             if self.amqheartbeat is None:
                 self.amqheartbeat = self.app.conf.broker_heartbeat
@@ -366,7 +366,7 @@ class Consumer:
         doesn't enter a loop.
 
         Arguments:
-            message (Message): The message received.
+            message (kombu.Message): The message received.
             exc (Exception): The exception being handled.
         """
         crit(MESSAGE_DECODE_ERROR,
@@ -553,11 +553,6 @@ class Consumer:
                     )
                 except InvalidTaskError as exc:
                     return on_invalid_task(payload, message, exc)
-                except MemoryError:
-                    raise
-                except Exception as exc:  # pylint: disable=broad-except
-                    # XXX handle as internal error?
-                    return on_invalid_task(payload, message, exc)
 
         return on_task_received
 

+ 0 - 2
celery/worker/consumer/control.py

@@ -6,9 +6,7 @@ The actual commands are implemented in :mod:`celery.worker.control`.
 """
 from celery import bootsteps
 from celery.utils.log import get_logger
-
 from celery.worker import pidbox
-
 from .tasks import Tasks
 
 __all__ = ['Control']

+ 0 - 2
celery/worker/consumer/events.py

@@ -3,9 +3,7 @@
 ``Events`` -> :class:`celery.events.EventDispatcher`.
 """
 from kombu.common import ignore_errors
-
 from celery import bootsteps
-
 from .connection import Connection
 
 __all__ = ['Events']

+ 0 - 2
celery/worker/consumer/heart.py

@@ -1,8 +1,6 @@
 """Worker Event Heartbeat Bootstep."""
 from celery import bootsteps
-
 from celery.worker import heartbeat
-
 from .events import Events
 
 __all__ = ['Heart']

+ 0 - 1
celery/worker/consumer/mingle.py

@@ -1,7 +1,6 @@
 """Worker <-> Worker Sync at startup (Bootstep)."""
 from celery import bootsteps
 from celery.utils.log import get_logger
-
 from .events import Events
 
 __all__ = ['Mingle']

+ 0 - 3
celery/worker/consumer/tasks.py

@@ -1,13 +1,10 @@
 """Worker Task Consumer Bootstep."""
 from kombu.common import QoS, ignore_errors
-
 from celery import bootsteps
 from celery.utils.log import get_logger
-
 from .mingle import Mingle
 
 __all__ = ['Tasks']
-
 logger = get_logger(__name__)
 debug = logger.debug
 

+ 0 - 1
celery/worker/heartbeat.py

@@ -6,7 +6,6 @@ at regular intervals (may not be an actual thread).
 """
 from celery.signals import heartbeat_sent
 from celery.utils.sysinfo import load_average
-
 from .state import SOFTWARE_INFO, active_requests, all_total_count
 
 __all__ = ['Heart']

+ 8 - 7
celery/worker/loops.py

@@ -1,11 +1,9 @@
 """The consumers highly-optimized inner loop."""
 import errno
 import socket
-
 from celery import bootsteps
 from celery.exceptions import WorkerShutdown, WorkerTerminate, WorkerLostError
 from celery.utils.log import get_logger
-
 from . import state
 
 __all__ = ['asynloop', 'synloop']
@@ -26,10 +24,11 @@ def _quick_drain(connection, timeout=0.1):
 
 
 def _enable_amqheartbeats(timer, connection, rate=2.0):
-    tick = connection.heartbeat_check
-    heartbeat = connection.get_heartbeat_interval()  # negotiated
-    if heartbeat and connection.supports_heartbeats:
-        timer.call_repeatedly(heartbeat / rate, tick, rate)
+    if connection:
+        tick = connection.heartbeat_check
+        heartbeat = connection.get_heartbeat_interval()  # negotiated
+        if heartbeat and connection.supports_heartbeats:
+            timer.call_repeatedly(heartbeat / rate, tick, (rate,))
 
 
 def asynloop(obj, connection, consumer, blueprint, hub, qos,
@@ -41,7 +40,7 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
 
     on_task_received = obj.create_task_handler()
 
-    _enable_amqheartbeats(hub, connection, rate=hbrate)
+    _enable_amqheartbeats(hub.timer, connection, rate=hbrate)
 
     consumer.on_message = on_task_received
     consumer.consume()
@@ -102,6 +101,8 @@ def synloop(obj, connection, consumer, blueprint, hub, qos,
     RUN = bootsteps.RUN
     on_task_received = obj.create_task_handler()
     perform_pending_operations = obj.perform_pending_operations
+    if getattr(obj.pool, 'is_green', False):
+        _enable_amqheartbeats(obj.timer, connection, rate=hbrate)
     consumer.on_message = on_task_received
     consumer.consume()
 

+ 0 - 3
celery/worker/pidbox.py

@@ -1,14 +1,11 @@
 """Worker Pidbox (remote control)."""
 import socket
 import threading
-
 from kombu.common import ignore_errors
 from kombu.utils.encoding import safe_str
-
 from celery.utils.collections import AttributeDict
 from celery.utils.functional import pass1
 from celery.utils.log import get_logger
-
 from . import control
 
 __all__ = ['Pidbox', 'gPidbox']

+ 1 - 1
celery/worker/request.py

@@ -363,7 +363,7 @@ class Request:
             )
         # (acks_late) acknowledge after result stored.
         if self.task.acks_late:
-            requeue = self.delivery_info.get('redelivered', None) is False
+            requeue = not self.delivery_info.get('redelivered')
             reject = (
                 self.task.reject_on_worker_lost and
                 isinstance(exc, WorkerLostError)

Some files were not shown because too many files changed in this diff