Browse Source

Merge branch 'master' into 3.1

Ask Solem 11 years ago
parent
commit
2c5bac7ffe
78 changed files with 1396 additions and 440 deletions
  1. 29 9
      .travis.yml
  2. 59 32
      CONTRIBUTING.rst
  3. 1 0
      CONTRIBUTORS.txt
  4. 130 0
      Changelog
  5. 5 5
      README.rst
  6. 3 1
      celery/app/amqp.py
  7. 2 3
      celery/app/base.py
  8. 14 22
      celery/app/builtins.py
  9. 1 2
      celery/app/control.py
  10. 2 1
      celery/app/defaults.py
  11. 12 7
      celery/app/log.py
  12. 7 3
      celery/app/task.py
  13. 12 7
      celery/app/trace.py
  14. 2 1
      celery/apps/beat.py
  15. 3 3
      celery/apps/worker.py
  16. 16 8
      celery/backends/amqp.py
  17. 71 34
      celery/backends/base.py
  18. 76 4
      celery/backends/redis.py
  19. 31 13
      celery/beat.py
  20. 5 17
      celery/bin/base.py
  21. 2 1
      celery/bin/celery.py
  22. 12 0
      celery/bin/multi.py
  23. 2 2
      celery/bin/worker.py
  24. 17 4
      celery/canvas.py
  25. 7 2
      celery/concurrency/prefork.py
  26. 11 4
      celery/events/state.py
  27. 5 2
      celery/exceptions.py
  28. 29 0
      celery/local.py
  29. 2 1
      celery/platforms.py
  30. 88 22
      celery/result.py
  31. 3 2
      celery/schedules.py
  32. 1 1
      celery/tests/app/test_app.py
  33. 36 1
      celery/tests/app/test_beat.py
  34. 28 8
      celery/tests/app/test_log.py
  35. 11 10
      celery/tests/backends/test_amqp.py
  36. 13 10
      celery/tests/backends/test_base.py
  37. 2 2
      celery/tests/backends/test_cache.py
  38. 102 74
      celery/tests/backends/test_redis.py
  39. 5 5
      celery/tests/bin/test_base.py
  40. 57 11
      celery/tests/case.py
  41. 1 1
      celery/tests/security/test_certificate.py
  42. 1 1
      celery/tests/security/test_key.py
  43. 6 7
      celery/tests/tasks/test_result.py
  44. 1 1
      celery/tests/tasks/test_trace.py
  45. 6 6
      celery/tests/utils/test_functional.py
  46. 26 2
      celery/tests/utils/test_local.py
  47. 1 0
      celery/tests/worker/test_consumer.py
  48. 5 1
      celery/tests/worker/test_hub.py
  49. 27 1
      celery/tests/worker/test_worker.py
  50. 40 10
      celery/utils/__init__.py
  51. 5 3
      celery/utils/debug.py
  52. 16 0
      celery/utils/dispatch/signal.py
  53. 5 2
      celery/utils/functional.py
  54. 2 3
      celery/utils/iso8601.py
  55. 8 1
      celery/utils/log.py
  56. 2 1
      celery/utils/timeutils.py
  57. 5 0
      celery/worker/consumer.py
  58. 1 1
      celery/worker/heartbeat.py
  59. 2 2
      celery/worker/job.py
  60. 1 0
      celery/worker/pidbox.py
  61. 13 7
      docs/configuration.rst
  62. 34 11
      docs/contributing.rst
  63. 1 1
      docs/django/first-steps-with-django.rst
  64. 17 0
      docs/getting-started/brokers/redis.rst
  65. 1 1
      docs/getting-started/first-steps-with-celery.rst
  66. 4 0
      docs/reference/celery.rst
  67. 1 1
      docs/userguide/tasks.rst
  68. 58 0
      docs/userguide/workers.rst
  69. 6 2
      examples/app/myapp.py
  70. 20 17
      examples/celery_http_gateway/settings.py
  71. 104 14
      extra/release/sphinx-to-rst.py
  72. 13 0
      funtests/stress/stress/app.py
  73. 13 2
      funtests/stress/stress/templates.py
  74. 20 1
      pavement.py
  75. 1 1
      requirements/default.txt
  76. 1 1
      setup.cfg
  77. 1 0
      setup.py
  78. 13 4
      tox.ini

+ 29 - 9
.travis.yml

@@ -1,22 +1,42 @@
 language: python
-python:
-  - 2.6
-  - 2.7
-  - 3.3
-  - pypy
+python: 2.7
+env:
+  global:
+    PYTHONUNBUFFERED=yes
+  matrix:
+    - TOXENV=2.6 
+    - TOXENV=2.7 
+    - TOXENV=3.3 
+    - TOXENV=3.4 
+    - TOXENV=pypy
 before_install:
   - |
-    deactivate
-    if python --version |& grep PyPy; then
+    if [[ $TOXENV = pypy ]]; then
+      deactivate
       sudo apt-add-repository --yes ppa:pypy/ppa
       sudo apt-get update
       sudo apt-get install pypy
       source ~/virtualenv/pypy/bin/activate
     fi
+    if [[ $TOXENV = 3.4 ]]; then
+      sudo apt-get update
+      sudo apt-get install python3.4-dev
+      source ~/virtualenv/python3.4
+      virtualenv ~/virtualenv/python3.4 --python=$(which python3.4)
+      source ~/virtualenv/python3.4/bin/activate
+    fi
     python --version
     uname -a
     lsb_release -a
-    sudo pip install tox
-script: tox -v -e $TRAVIS_PYTHON_VERSION -- -v
+install:
+  - pip install tox
+script:
+  - tox -v -- -v
 after_success:
   - .tox/$TRAVIS_PYTHON_VERSION/bin/coveralls
+notifications:
+  irc:
+    channels:
+      - "chat.freenode.net#celery"
+    on_success: always
+    on_failure: always

+ 59 - 32
CONTRIBUTING.rst

@@ -161,7 +161,7 @@ If you'd like to submit the information encrypted our PGP key is::
 Other bugs
 ----------
 
-Bugs can always be described to the ``mailing-list``, but the best
+Bugs can always be described to the `mailing-list`_, but the best
 way to report an issue and to ensure a timely response is to use the
 issue tracker.
 
@@ -175,7 +175,7 @@ and participate in the discussion.
 2) **Determine if your bug is really a bug.**
 
 You should not file a bug if you are requesting support.  For that you can use
-the ``mailing-list``, or ``irc-channel``.
+the `mailing-list`_, or `irc-channel`_.
 
 3) **Make sure your bug hasn't already been reported.**
 
@@ -206,15 +206,14 @@ spelling or other errors on the website/docs/code.
        hard to get or might not be that useful. Try to inspect the process to
        get more diagnostic data. Some ideas:
 
-       * Enable celery's ``breakpoint signal <breakpoint_signal>`` and use it
-         to inspect the process's state. This will allow you to open a ``pdb``
-         session.
+       * Enable celery's ``breakpoint_signal`` and use it
+         to inspect the process's state.  This will allow you to open a
+         ``pdb`` session.
        * Collect tracing data using strace_(Linux), dtruss (OSX) and ktrace(BSD),
          ltrace_ and lsof_.
 
     D) Include the output from the `celery report` command:
-
-        .. code-block:: bash
+        ::
 
             $ celery -A proj report
 
@@ -251,7 +250,7 @@ issue tracker.
 * Django-Celery: http://github.com/celery/django-celery/issues
 
 If you are unsure of the origin of the bug you can ask the
-``mailing-list``, or just use the Celery issue tracker.
+`mailing-list`_, or just use the Celery issue tracker.
 
 Contributors guide to the codebase
 ==================================
@@ -259,7 +258,7 @@ Contributors guide to the codebase
 There's a separate section for internal details,
 including details about the codebase and a style guide.
 
-Read ``internals-guide`` for more!
+Read `internals-guide`_ for more!
 
 .. _versions:
 
@@ -272,7 +271,7 @@ semver: http://semver.org.
 
 Stable releases are published at PyPI
 while development releases are only available in the GitHub git repository as tags.
-All version tags starts with “v”, so version 0.8.0 is the tag v0.8.0.
+All version tags starts with "v", so version 0.8.0 is the tag v0.8.0.
 
 .. _git-branches:
 
@@ -410,11 +409,7 @@ to upstream changes:
 ::
 
     $ cd celery
-::
-
     $ git remote add upstream git://github.com/celery/celery.git
-::
-
     $ git fetch upstream
 
 If you need to pull in new changes from upstream you should
@@ -433,8 +428,6 @@ fetch and checkout a remote branch like this::
 
     git checkout --track -b 3.0-devel origin/3.0-devel
 
-For a list of branches see ``git-branches``.
-
 .. _`Fork a Repo`: http://help.github.com/fork-a-repo/
 .. _`Rebasing merge commits in git`:
     http://notes.envato.com/developers/rebasing-merge-commits-in-git/
@@ -506,6 +499,13 @@ the steps outlined here: http://bit.ly/koJoso
 Calculating test coverage
 ~~~~~~~~~~~~~~~~~~~~~~~~~
 
+To calculate test coverage you must first install the ``coverage`` module.
+
+Installing the ``coverage`` module:
+::
+
+    $ pip install -U coverage
+
 Code coverage in HTML:
 ::
 
@@ -606,21 +606,32 @@ it should be located in ``docs/reference/``.
 
 For example if reference is missing for the module ``celery.worker.awesome``
 and this module is considered part of the public API, use the following steps:
+
+
+Use an existing file as a template:
 ::
 
     $ cd docs/reference/
     $ cp celery.schedules.rst celery.worker.awesome.rst
+
+Edit the file using your favorite editor:
 ::
 
     $ vim celery.worker.awesome.rst
 
         # change every occurance of ``celery.schedules`` to
         # ``celery.worker.awesome``
+
+
+Edit the index using your favorite editor:
 ::
 
     $ vim index.rst
 
         # Add ``celery.worker.awesome`` to the index.
+
+
+Commit your changes:
 ::
 
     # Add the file to git
@@ -650,8 +661,7 @@ is following the conventions.
   style.
 
     Do this:
-
-    .. code-block:: python
+    ::
 
         def method(self, arg):
             """Short description.
@@ -661,16 +671,14 @@ is following the conventions.
             """
 
     or:
-
-    .. code-block:: python
+    ::
 
         def method(self, arg):
             """Short description."""
 
 
     but not this:
-
-    .. code-block:: python
+    ::
 
         def method(self, arg):
             """
@@ -682,8 +690,7 @@ is following the conventions.
 * Lines should not exceed 78 columns.
 
   You can enforce this in ``vim`` by setting the ``textwidth`` option:
-
-  .. code-block:: vim
+  ::
 
         set textwidth=78
 
@@ -709,8 +716,7 @@ is following the conventions.
     Within these sections the imports should be sorted by module name.
 
     Example:
-
-    .. code-block:: python
+    ::
 
         import threading
         import time
@@ -753,7 +759,9 @@ is following the conventions.
 
 * Note that we use "new-style` relative imports when the distribution
   does not support Python versions below 2.5
-::
+
+    This requires Python 2.5 or later:
+    ::
 
         from . import submodule
 
@@ -796,13 +804,12 @@ that require 3rd party libraries must be added.
 
 3) Document the new feature in ``docs/includes/installation.txt``
 
-    You must add your feature to the list in the ``bundles`` section
+    You must add your feature to the list in the `bundles`_ section
     of ``docs/includes/installation.txt``.
 
     After you've made changes to this file you need to render
     the distro ``README`` file:
-
-    .. code-block:: bash
+    ::
 
         $ pip install -U requirements/pkgutils.txt
         $ paver readme
@@ -826,7 +833,7 @@ regarding the official git repositories, PyPI packages
 Read the Docs pages.
 
 If the issue is not an emergency then it is better
-to ``report an issue <reporting-bugs>``.
+to `report an issue`_.
 
 
 Committers
@@ -893,6 +900,16 @@ Messaging library.
 :PyPI: http://pypi.python.org/pypi/kombu
 :docs: http://kombu.readthedocs.org
 
+amqp
+----
+
+Python AMQP 0.9.1 client.
+
+:git: https://github.com/celery/py-amqp
+:CI: http://travis-ci.org/#!/celery/py-amqp
+:PyPI: http://pypi.python.org/pypi/amqp
+:docs: http://amqp.readthedocs.org
+
 billiard
 --------
 
@@ -977,7 +994,7 @@ Deprecated
 
 Old name for ``librabbitmq``.
 
-``None``
+:git: ``None``
 :PyPI: http://pypi.python.org/pypi/pylibrabbitmq
 
 .. _release-procedure:
@@ -1036,3 +1053,13 @@ following:
 
 * Also add the previous version under the "versions" tab.
 
+.. _`mailing-list`: http://groups.google.com/group/celery-users
+
+.. _`irc-channel`: http://docs.celeryproject.org/en/latest/getting-started/resources.html#irc
+
+.. _`internals-guide`: http://docs.celeryproject.org/en/latest/internals/guide.html
+
+.. _`bundles`: http://docs.celeryproject.org/en/latest/getting-started/introduction.html#bundles
+
+.. _`report an issue`: http://docs.celeryproject.org/en/latest/contributing.html#reporting-bugs
+

+ 1 - 0
CONTRIBUTORS.txt

@@ -157,3 +157,4 @@ Pepijn de Vos, 2014/01/15
 Dan McGee, 2014/01/27
 Paul Kilgo, 2014/01/28
 Martin Davidsson, 2014/02/08
+Chris Clark, 2014/02/20

+ 130 - 0
Changelog

@@ -8,6 +8,136 @@ This document contains change notes for bugfix releases in the 3.1.x series
 (Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
 new in Celery 3.1.
 
+.. _version-3.1.10:
+
+3.1.10
+======
+:release-date: 2014-XX-XX XX:XX X.X UTC
+:release-by: XX
+
+- **Requirements**:
+
+    - Now depends on :ref:`Kombu 3.0.14 <kombu:version-3.0.14>`.
+
+- **Redis:** Important note about events (Issue #1882).
+
+    There is a new transport option for Redis that enables monitors
+    to filter out unwanted events.  Enabling this option in the workers
+    will increase performance considerably:
+
+    .. code-block:: python
+
+        BROKER_TRANSPORT_OPTIONS = {'fanout_patterns': True}
+
+    Enabling this option means that your workers will not be able to see
+    workers with the option disabled (or is running an older version of
+    Celery), so if you do enable it then make sure you do so on all
+    nodes.
+
+    See :ref:`redis-caveats-fanout-patterns`.
+
+    This will be the default in Celery 3.2.
+
+- **Results**: The :class:`@AsyncResult` object now keeps a local cache
+  of the final state of the task.
+
+    This means that the global result cache can finally be disabled,
+    and you can do so by setting :setting:`CELERY_MAX_CACHED_RESULTS` to
+    :const:`-1`.  The lifetime of the cache will then be bound to the
+    lifetime of the result object, and this will be the default behavior
+    in Celery 3.2.
+
+- **Events**: The "Substantial drift" warning message is now logged once
+  per node name only (Issue #1802).
+
+- **Worker**: Ability to use one log file per child process when using the
+  prefork pool.
+
+    This can be enabled by using the new ``%i`` and ``%I`` format specifiers
+    for the log file name.  See :ref:`worker-files-process-index`.
+
+- **Redis**: New experimental chord join implementation.
+
+    This is an optimization for chords when using the Redis result backend,
+    where the join operation is now considerably faster and using less
+    resources than the previous strategy.
+
+    The new option can be set in the result backend URL:
+
+        CELERY_RESULT_BACKEND = 'redis://localhost?new_join=1'
+
+    This must be enabled manually as it's incompatible
+    with workers and clients not using it, so be sure to enable
+    the option in all clients and workers if you decide to use it.
+
+- **Multi**: With ``-opt:index`` (e.g. ``-c:1``) the index now always refers
+  to the position of a node in the argument list.
+
+    This means that referring to a number will work when specifying a list
+    of node names and not just for a number range:
+
+    .. code-block:: bash
+
+        celery multi start A B C D -c:1 4 -c:2-4 8
+
+    In this example ``1`` refers to node A (as it's the first node in the
+    list).
+
+- **Signals**: The sender argument to ``Signal.connect`` can now be a proxy
+  object, which means that it can be used with the task decorator
+  (Issue #1873).
+
+- **Task**: A regression caused the ``queue`` argument to ``Task.retry`` to be
+  ignored (Issue #1892).
+
+- **App**: Fixed error message for :meth:`~@Celery.config_from_envvar`.
+
+    Fix contributed by Dmitry Malinovsky.
+
+- **Canvas**: Chords can now contain a group of other chords (Issue #1921).
+
+- **Canvas**: Chords can now be combined when using the amqp result backend
+  (a chord where the callback is also a chord).
+
+- **Canvas**: Calling ``result.get()`` for a chain task will now complete
+  even if one of the tasks in the chain is ``ignore_result=True``
+  (Issue #1905).
+
+- **Canvas**: Worker now also logs chord errors.
+
+- **Canvas**: A chord task raising an exception will now result in
+  any errbacks (``link_error``) to the chord callback to also be called.
+
+- **Task**: Task callbacks and errbacks are now called using the group
+  primitive.
+
+- **Task**: ``Task.apply`` now properly sets ``request.headers``
+  (Issue #1874).
+
+- **Worker**: Fixed ``UnicodeEncodeError`` occuring when worker is started
+  by `supervisord`.
+
+    Fix contributed by Codeb Fan.
+
+- **Beat**: No longer attempts to upgrade a newly created database file
+  (Issue #1923).
+
+- **Beat**: New setting :setting:``CELERYBEAT_SYNC_EVERY`` can be be used
+  to control file sync by specifying the number of tasks to send between
+  each sync.
+
+    Contributed by Chris Clark.
+
+- **Commands**: :program:`celery inspect memdump` no longer crashes
+  if the :mod:`psutil` module is not installed (Issue #1914).
+
+- **Worker**: Remote control commands now always accepts json serialized
+  messages (Issue #1870).
+
+- **Worker**: Gossip will now drop any task related events it receives
+  by mistake (Issue #1882).
+
+
 .. _version-3.1.9:
 
 3.1.9

+ 5 - 5
README.rst

@@ -81,8 +81,8 @@ getting started tutorials:
 .. _`Next steps`:
     http://docs.celeryproject.org/en/latest/getting-started/next-steps.html
 
-Celery is
-==========
+Celery is...
+============
 
 - **Simple**
 
@@ -119,8 +119,8 @@ Celery is…
     Custom pool implementations, serializers, compression schemes, logging,
     schedulers, consumers, producers, autoscalers, broker transports and much more.
 
-It supports
-============
+It supports...
+==============
 
     - **Message Transports**
 
@@ -128,7 +128,7 @@ It supports…
         - MongoDB_ (experimental), Amazon SQS (experimental),
         - CouchDB_ (experimental), SQLAlchemy_ (experimental),
         - Django ORM (experimental), `IronMQ`_
-        - and more
+        - and more...
 
     - **Concurrency**
 

+ 3 - 1
celery/app/amqp.py

@@ -8,6 +8,8 @@
 """
 from __future__ import absolute_import
 
+import numbers
+
 from datetime import timedelta
 from weakref import WeakValueDictionary
 
@@ -252,7 +254,7 @@ class TaskProducer(Producer):
             eta = now + timedelta(seconds=countdown)
             if self.utc:
                 eta = to_utc(eta).astimezone(self.app.timezone)
-        if isinstance(expires, (int, float)):
+        if isinstance(expires, numbers.Real):
             now = now or self.app.now()
             expires = now + timedelta(seconds=expires)
             if self.utc:

+ 2 - 3
celery/app/base.py

@@ -33,7 +33,6 @@ from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
 from celery.five import items, values
 from celery.loaders import get_loader_cls
 from celery.local import PromiseProxy, maybe_evaluate
-from celery.utils import shadowsig
 from celery.utils.functional import first, maybe_list
 from celery.utils.imports import instantiate, symbol_by_name
 from celery.utils.objects import mro_lookup
@@ -238,7 +237,6 @@ class Celery(object):
             '__doc__': fun.__doc__,
             '__module__': fun.__module__,
             '__wrapped__': fun}, **options))()
-        shadowsig(T, fun)  # for inspect.getargspec
         task = self._tasks[T.name]  # return global instance.
         return task
 
@@ -275,7 +273,8 @@ class Celery(object):
         if not module_name:
             if silent:
                 return False
-            raise ImproperlyConfigured(ERR_ENVVAR_NOT_SET.format(module_name))
+            raise ImproperlyConfigured(
+                ERR_ENVVAR_NOT_SET.format(variable_name))
         return self.config_from_object(module_name, silent=silent, force=force)
 
     def config_from_cmdline(self, argv, namespace='celery'):

+ 14 - 22
celery/app/builtins.py

@@ -13,9 +13,12 @@ from collections import deque
 
 from celery._state import get_current_worker_task
 from celery.utils import uuid
+from celery.utils.log import get_logger
 
 __all__ = ['shared_task', 'load_shared_tasks']
 
+logger = get_logger(__name__)
+
 #: global list of functions defining tasks that should be
 #: added to all apps.
 _shared_tasks = set()
@@ -105,16 +108,17 @@ def add_unlock_chord_task(app):
                     )
                 except StopIteration:
                     reason = repr(exc)
-
-                app._tasks[callback.task].backend.fail_from_current_stack(
-                    callback.id, exc=ChordError(reason),
-                )
+                logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
+                app.backend.chord_error_from_stack(callback,
+                                                   ChordError(reason))
             else:
                 try:
                     callback.delay(ret)
                 except Exception as exc:
-                    app._tasks[callback.task].backend.fail_from_current_stack(
-                        callback.id,
+                    logger.error('Chord %r raised: %r', group_id, exc,
+                                 exc_info=1)
+                    app.backend.chord_error_from_stack(
+                        callback,
                         exc=ChordError('Callback error: {0!r}'.format(exc)),
                     )
         else:
@@ -179,7 +183,7 @@ def add_group_task(app):
                     [stask.apply(group_id=group_id) for stask in taskit],
                 )
             with app.producer_or_acquire() as pub:
-                [stask.apply_async(group_id=group_id, publisher=pub,
+                [stask.apply_async(group_id=group_id, producer=pub,
                                    add_to_parent=False) for stask in taskit]
             parent = get_current_worker_task()
             if parent:
@@ -340,20 +344,18 @@ def add_chord_task(app):
             app = self.app
             propagate = default_propagate if propagate is None else propagate
             group_id = uuid()
-            AsyncResult = app.AsyncResult
-            prepare_member = self._prepare_member
 
             # - convert back to group if serialized
             tasks = header.tasks if isinstance(header, group) else header
             header = group([
                 maybe_signature(s, app=app).clone() for s in tasks
-            ])
+            ], app=self.app)
             # - eager applies the group inline
             if eager:
                 return header.apply(args=partial_args, task_id=group_id)
 
-            results = [AsyncResult(prepare_member(task, body, group_id))
-                       for task in header.tasks]
+            body.setdefault('chord_size', len(header.tasks))
+            results = header.freeze(group_id=group_id, chord=body).results
 
             return self.backend.apply_chord(
                 header, partial_args, group_id,
@@ -361,16 +363,6 @@ def add_chord_task(app):
                 max_retries=max_retries, propagate=propagate, result=results,
             )
 
-        def _prepare_member(self, task, body, group_id):
-            opts = task.options
-            # d.setdefault would work but generating uuid's are expensive
-            try:
-                task_id = opts['task_id']
-            except KeyError:
-                task_id = opts['task_id'] = uuid()
-            opts.update(chord=body, group_id=group_id)
-            return task_id
-
         def apply_async(self, args=(), kwargs={}, task_id=None,
                         group_id=None, chord=None, **options):
             app = self.app

+ 1 - 2
celery/app/control.py

@@ -125,8 +125,7 @@ class Control(object):
 
     def __init__(self, app=None):
         self.app = app
-        self.mailbox = self.Mailbox('celery', type='fanout',
-                                    accept=self.app.conf.CELERY_ACCEPT_CONTENT)
+        self.mailbox = self.Mailbox('celery', type='fanout', accept=['json'])
 
     @cached_property
     def inspect(self):

+ 2 - 1
celery/app/defaults.py

@@ -124,7 +124,7 @@ NAMESPACES = {
         'IMPORTS': Option((), type='tuple'),
         'INCLUDE': Option((), type='tuple'),
         'IGNORE_RESULT': Option(False, type='bool'),
-        'MAX_CACHED_RESULTS': Option(5000, type='int'),
+        'MAX_CACHED_RESULTS': Option(100, type='int'),
         'MESSAGE_COMPRESSION': Option(type='string'),
         'MONGODB_BACKEND_SETTINGS': Option(type='dict'),
         'REDIS_HOST': Option(type='string', **_REDIS_OLD),
@@ -196,6 +196,7 @@ NAMESPACES = {
         'SCHEDULE': Option({}, type='dict'),
         'SCHEDULER': Option('celery.beat:PersistentScheduler'),
         'SCHEDULE_FILENAME': Option('celerybeat-schedule'),
+        'SYNC_EVERY': Option(0, type='int'),
         'MAX_LOOP_INTERVAL': Option(0, type='float'),
         'LOG_LEVEL': Option('INFO', deprecate_by='2.4', remove_by='4.0',
                             alt='--loglevel argument'),

+ 12 - 7
celery/app/log.py

@@ -24,7 +24,7 @@ from kombu.utils.encoding import set_default_encoding_file
 from celery import signals
 from celery._state import get_current_task
 from celery.five import class_property, string_t
-from celery.utils import isatty
+from celery.utils import isatty, node_format
 from celery.utils.log import (
     get_logger, mlevel,
     ColorFormatter, ensure_process_aware_logger,
@@ -65,9 +65,9 @@ class Logging(object):
         self.colorize = self.app.conf.CELERYD_LOG_COLOR
 
     def setup(self, loglevel=None, logfile=None, redirect_stdouts=False,
-              redirect_level='WARNING', colorize=None):
+              redirect_level='WARNING', colorize=None, hostname=None):
         handled = self.setup_logging_subsystem(
-            loglevel, logfile, colorize=colorize,
+            loglevel, logfile, colorize=colorize, hostname=hostname,
         )
         if not handled:
             if redirect_stdouts:
@@ -87,10 +87,12 @@ class Logging(object):
             CELERY_LOG_REDIRECT_LEVEL=str(loglevel or ''),
         )
 
-    def setup_logging_subsystem(self, loglevel=None, logfile=None,
-                                format=None, colorize=None, **kwargs):
+    def setup_logging_subsystem(self, loglevel=None, logfile=None, format=None,
+                                colorize=None, hostname=None, **kwargs):
         if self.already_setup:
             return
+        if logfile and hostname:
+            logfile = node_format(logfile, hostname)
         self.already_setup = True
         loglevel = mlevel(loglevel or self.loglevel)
         format = format or self.format
@@ -107,6 +109,9 @@ class Logging(object):
 
             if self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
                 root.handlers = []
+                get_logger('celery').handlers = []
+                get_logger('celery.task').handlers = []
+                get_logger('celery.redirected').handlers = []
 
             # Configure root logger
             self._configure_logger(
@@ -228,8 +233,8 @@ class Logging(object):
         return WatchedFileHandler(logfile)
 
     def _has_handler(self, logger):
-        return (logger.handlers and
-                not isinstance(logger.handlers[0], NullHandler))
+        if logger.handlers:
+            return any(not isinstance(h, NullHandler) for h in logger.handlers)
 
     def _is_configured(self, logger):
         return self._has_handler(logger) and not getattr(

+ 7 - 3
celery/app/task.py

@@ -556,12 +556,12 @@ class Task(object):
         )
 
     def subtask_from_request(self, request=None, args=None, kwargs=None,
-                             **extra_options):
+                             queue=None, **extra_options):
         request = self.request if request is None else request
         args = request.args if args is None else args
         kwargs = request.kwargs if kwargs is None else kwargs
         limit_hard, limit_soft = request.timelimit or (None, None)
-        options = dict({
+        options = {
             'task_id': request.id,
             'link': request.callbacks,
             'link_error': request.errbacks,
@@ -569,7 +569,10 @@ class Task(object):
             'chord': request.chord,
             'soft_time_limit': limit_soft,
             'time_limit': limit_hard,
-        }, **request.delivery_info or {})
+        }
+        options.update(
+            {'queue': queue} if queue else (request.delivery_info or {})
+        )
         return self.subtask(args, kwargs, options, type=self, **extra_options)
 
     def retry(self, args=None, kwargs=None, exc=None, throw=True,
@@ -710,6 +713,7 @@ class Task(object):
                    'loglevel': options.get('loglevel', 0),
                    'callbacks': maybe_list(link),
                    'errbacks': maybe_list(link_error),
+                   'headers': options.get('headers'),
                    'delivery_info': {'is_eager': True}}
         if self.accept_magic_kwargs:
             default_kwargs = {'task_name': task.name,

+ 12 - 7
celery/app/trace.py

@@ -25,7 +25,7 @@ from billiard.einfo import ExceptionInfo
 from kombu.exceptions import EncodeError
 from kombu.utils import kwdict
 
-from celery import current_app
+from celery import current_app, group
 from celery import states, signals
 from celery._state import _task_stack
 from celery.app import set_default_app
@@ -200,8 +200,10 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
         I = Info(state, exc)
         R = I.handle_error_state(task, eager=eager)
         if call_errbacks:
-            [signature(errback, app=app).apply_async((uuid, ))
-             for errback in request.errbacks or []]
+            group(
+                [signature(errback, app=app)
+                 for errback in request.errbacks or []], app=app,
+            ).apply_async((uuid, ))
         return I, R, I.state, I.retval
 
     def trace_task(uuid, args, kwargs, request=None):
@@ -255,8 +257,11 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                     try:
                         # callback tasks must be applied before the result is
                         # stored, so that result.children is populated.
-                        [signature(callback, app=app).apply_async((retval, ))
-                            for callback in task_request.callbacks or []]
+                        group(
+                            [signature(callback, app=app)
+                             for callback in task.request.callbacks or []],
+                            app=app,
+                        ).apply_async((retval, ))
                         if publish_result:
                             store_result(
                                 uuid, retval, SUCCESS, request=task_request,
@@ -272,7 +277,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                 # -* POST *-
                 if state not in IGNORE_STATES:
                     if task_request.chord:
-                        on_chord_part_return(task)
+                        on_chord_part_return(task, state, R)
                     if task_after_return:
                         task_after_return(
                             state, retval, uuid, args, kwargs, None,
@@ -336,7 +341,7 @@ def eager_trace_task(task, uuid, args, kwargs, request=None, **opts):
 def report_internal_error(task, exc):
     _type, _value, _tb = sys.exc_info()
     try:
-        _value = task.backend.prepare_exception(exc)
+        _value = task.backend.prepare_exception(exc, 'pickle')
         exc_info = ExceptionInfo((_type, _value, _tb), internal=True)
         warn(RuntimeWarning(
             'Exception raised outside body: {0!r}:\n{1}'.format(

+ 2 - 1
celery/apps/beat.py

@@ -12,6 +12,7 @@
 """
 from __future__ import absolute_import, unicode_literals
 
+import numbers
 import socket
 import sys
 
@@ -66,7 +67,7 @@ class Beat(object):
         )
         self.pidfile = pidfile
 
-        if not isinstance(self.loglevel, int):
+        if not isinstance(self.loglevel, numbers.Integral):
             self.loglevel = LOG_LEVELS[self.loglevel.upper()]
 
     def _getopt(self, key, value):

+ 3 - 3
celery/apps/worker.py

@@ -165,10 +165,10 @@ class Worker(WorkController):
 
         # Dump configuration to screen so we have some basic information
         # for when users sends bug reports.
-        print(''.join([
+        print(safe_str(''.join([
             string(self.colored.cyan(' \n', self.startup_info())),
             string(self.colored.reset(self.extra_info() or '')),
-        ]), file=sys.__stdout__)
+        ])), file=sys.__stdout__)
         self.set_process_status('-active-')
         self.install_platform_tweaks(self)
 
@@ -181,7 +181,7 @@ class Worker(WorkController):
             colorize = not self.no_color
         return self.app.log.setup(
             self.loglevel, self.logfile,
-            redirect_stdouts=False, colorize=colorize,
+            redirect_stdouts=False, colorize=colorize, hostname=self.hostname,
         )
 
     def purge_messages(self):

+ 16 - 8
celery/backends/amqp.py

@@ -141,6 +141,7 @@ class AMQPBackend(BaseBackend):
         return [self._create_binding(task_id)]
 
     def wait_for(self, task_id, timeout=None, cache=True, propagate=True,
+                 no_ack=True, on_interval=None,
                  READY_STATES=states.READY_STATES,
                  PROPAGATE_STATES=states.PROPAGATE_STATES,
                  **kwargs):
@@ -150,7 +151,8 @@ class AMQPBackend(BaseBackend):
             meta = cached_meta
         else:
             try:
-                meta = self.consume(task_id, timeout=timeout)
+                meta = self.consume(task_id, timeout=timeout, no_ack=no_ack,
+                                    on_interval=on_interval)
             except socket.timeout:
                 raise TimeoutError('The operation timed out.')
 
@@ -167,15 +169,18 @@ class AMQPBackend(BaseBackend):
 
             prev = latest = acc = None
             for i in range(backlog_limit):  # spool ffwd
-                prev, latest, acc = latest, acc, binding.get(
+                acc = binding.get(
                     accept=self.accept, no_ack=False,
                 )
                 if not acc:  # no more messages
                     break
+                if acc.payload['task_id'] == task_id:
+                    prev, latest = latest, acc
                 if prev:
                     # backends are not expected to keep history,
                     # so we delete everything except the most recent state.
                     prev.ack()
+                    prev = None
             else:
                 raise self.BacklogLimitExceeded(task_id)
 
@@ -193,7 +198,7 @@ class AMQPBackend(BaseBackend):
     poll = get_task_meta  # XXX compat
 
     def drain_events(self, connection, consumer,
-                     timeout=None, now=monotonic, wait=None):
+                     timeout=None, on_interval=None, now=monotonic, wait=None):
         wait = wait or connection.drain_events
         results = {}
 
@@ -209,27 +214,30 @@ class AMQPBackend(BaseBackend):
             if timeout and now() - time_start >= timeout:
                 raise socket.timeout()
             wait(timeout=timeout)
+            if on_interval:
+                on_interval()
             if results:  # got event on the wanted channel.
                 break
         self._cache.update(results)
         return results
 
-    def consume(self, task_id, timeout=None):
+    def consume(self, task_id, timeout=None, no_ack=True, on_interval=None):
         wait = self.drain_events
         with self.app.pool.acquire_channel(block=True) as (conn, channel):
             binding = self._create_binding(task_id)
             with self.Consumer(channel, binding,
-                               no_ack=True, accept=self.accept) as consumer:
+                               no_ack=no_ack, accept=self.accept) as consumer:
                 while 1:
                     try:
-                        return wait(conn, consumer, timeout)[task_id]
+                        return wait(
+                            conn, consumer, timeout, on_interval)[task_id]
                     except KeyError:
                         continue
 
     def _many_bindings(self, ids):
         return [self._create_binding(task_id) for task_id in ids]
 
-    def get_many(self, task_ids, timeout=None,
+    def get_many(self, task_ids, timeout=None, no_ack=True,
                  now=monotonic, getfields=itemgetter('status', 'task_id'),
                  READY_STATES=states.READY_STATES,
                  PROPAGATE_STATES=states.PROPAGATE_STATES, **kwargs):
@@ -263,7 +271,7 @@ class AMQPBackend(BaseBackend):
 
             bindings = self._many_bindings(task_ids)
             with self.Consumer(channel, bindings, on_message=on_message,
-                               accept=self.accept, no_ack=True):
+                               accept=self.accept, no_ack=no_ack):
                 wait = conn.drain_events
                 popleft = results.popleft
                 while ids:

+ 71 - 34
celery/backends/base.py

@@ -35,6 +35,7 @@ from celery.result import (
 )
 from celery.utils import timeutils
 from celery.utils.functional import LRUCache
+from celery.utils.log import get_logger
 from celery.utils.serialization import (
     get_pickled_exception,
     get_pickleable_exception,
@@ -46,12 +47,20 @@ __all__ = ['BaseBackend', 'KeyValueStoreBackend', 'DisabledBackend']
 EXCEPTION_ABLE_CODECS = frozenset(['pickle', 'yaml'])
 PY3 = sys.version_info >= (3, 0)
 
+logger = get_logger(__name__)
+
 
 def unpickle_backend(cls, args, kwargs):
     """Return an unpickled backend."""
     return cls(*args, app=current_app._get_current_object(), **kwargs)
 
 
+class _nulldict(dict):
+
+    def __setitem__(self, k, v):
+        pass
+
+
 class BaseBackend(object):
     READY_STATES = states.READY_STATES
     UNREADY_STATES = states.UNREADY_STATES
@@ -90,9 +99,8 @@ class BaseBackend(object):
         (self.content_type,
          self.content_encoding,
          self.encoder) = serializer_registry._encoders[self.serializer]
-        self._cache = LRUCache(
-            limit=max_cached_results or conf.CELERY_MAX_CACHED_RESULTS,
-        )
+        cmax = max_cached_results or conf.CELERY_MAX_CACHED_RESULTS
+        self._cache = _nulldict() if cmax == -1 else LRUCache(limit=cmax)
         self.accept = prepare_accept_content(
             conf.CELERY_ACCEPT_CONTENT if accept is None else accept,
         )
@@ -111,6 +119,21 @@ class BaseBackend(object):
         return self.store_result(task_id, exc, status=states.FAILURE,
                                  traceback=traceback, request=request)
 
+    def chord_error_from_stack(self, callback, exc=None):
+        from celery import group
+        app = self.app
+        backend = app._tasks[callback.task].backend
+        try:
+            group(
+                [app.signature(errback)
+                 for errback in callback.options.get('link_error') or []],
+                app=app,
+            ).apply_async((callback.id, ))
+        except Exception as eb_exc:
+            return backend.fail_from_current_stack(callback.id, exc=eb_exc)
+        else:
+            return backend.fail_from_current_stack(callback.id, exc=exc)
+
     def fail_from_current_stack(self, task_id, exc=None):
         type_, real_exc, tb = sys.exc_info()
         try:
@@ -132,9 +155,10 @@ class BaseBackend(object):
                                  status=states.REVOKED, traceback=None,
                                  request=request)
 
-    def prepare_exception(self, exc):
+    def prepare_exception(self, exc, serializer=None):
         """Prepare exception for serialization."""
-        if self.serializer in EXCEPTION_ABLE_CODECS:
+        serializer = self.serializer if serializer is None else serializer
+        if serializer in EXCEPTION_ABLE_CODECS:
             return get_pickleable_exception(exc)
         return {'exc_type': type(exc).__name__, 'exc_message': str(exc)}
 
@@ -162,7 +186,9 @@ class BaseBackend(object):
                      content_encoding=self.content_encoding,
                      accept=self.accept)
 
-    def wait_for(self, task_id, timeout=None, propagate=True, interval=0.5):
+    def wait_for(self, task_id,
+                 timeout=None, propagate=True, interval=0.5, no_ack=True,
+                 on_interval=None):
         """Wait for task and return its result.
 
         If the task raises an exception, this exception
@@ -185,6 +211,8 @@ class BaseBackend(object):
                 if propagate:
                     raise result
                 return result
+            if on_interval:
+                on_interval()
             # avoid hammering the CPU checking status.
             time.sleep(interval)
             time_elapsed += interval
@@ -311,7 +339,7 @@ class BaseBackend(object):
     def on_task_call(self, producer, task_id):
         return {}
 
-    def on_chord_part_return(self, task, propagate=False):
+    def on_chord_part_return(self, task, state, result, propagate=False):
         pass
 
     def fallback_chord_unlock(self, group_id, body, result=None,
@@ -374,17 +402,26 @@ class KeyValueStoreBackend(BaseBackend):
     def expire(self, key, value):
         pass
 
-    def get_key_for_task(self, task_id):
+    def get_key_for_task(self, task_id, key=''):
         """Get the cache key for a task by id."""
-        return self.task_keyprefix + self.key_t(task_id)
+        key_t = self.key_t
+        return key_t('').join([
+            self.task_keyprefix, key_t(task_id), key_t(key),
+        ])
 
-    def get_key_for_group(self, group_id):
+    def get_key_for_group(self, group_id, key=''):
         """Get the cache key for a group by id."""
-        return self.group_keyprefix + self.key_t(group_id)
+        key_t = self.key_t
+        return key_t('').join([
+            self.group_keyprefix, key_t(group_id), key_t(key),
+        ])
 
-    def get_key_for_chord(self, group_id):
+    def get_key_for_chord(self, group_id, key=''):
         """Get the cache key for the chord waiting on group with given id."""
-        return self.chord_keyprefix + self.key_t(group_id)
+        key_t = self.key_t
+        return key_t('').join([
+            self.chord_keyprefix, key_t(group_id), key_t(key),
+        ])
 
     def _strip_prefix(self, key):
         """Takes bytes, emits string."""
@@ -406,7 +443,7 @@ class KeyValueStoreBackend(BaseBackend):
                         for i, value in enumerate(values)
                         if value is not None)
 
-    def get_many(self, task_ids, timeout=None, interval=0.5,
+    def get_many(self, task_ids, timeout=None, interval=0.5, no_ack=True,
                  READY_STATES=states.READY_STATES):
         interval = 0.5 if interval is None else interval
         ids = task_ids if isinstance(task_ids, set) else set(task_ids)
@@ -479,12 +516,12 @@ class KeyValueStoreBackend(BaseBackend):
         self.save_group(group_id, self.app.GroupResult(group_id, result))
         return header(*partial_args, task_id=group_id)
 
-    def on_chord_part_return(self, task, propagate=None):
+    def on_chord_part_return(self, task, state, result, propagate=None):
         if not self.implements_incr:
             return
         app = self.app
         if propagate is None:
-            propagate = self.app.conf.CELERY_CHORD_PROPAGATES
+            propagate = app.conf.CELERY_CHORD_PROPAGATES
         gid = task.request.group
         if not gid:
             return
@@ -492,26 +529,26 @@ class KeyValueStoreBackend(BaseBackend):
         try:
             deps = GroupResult.restore(gid, backend=task.backend)
         except Exception as exc:
-            callback = maybe_signature(task.request.chord, app=self.app)
-            return app._tasks[callback.task].backend.fail_from_current_stack(
-                callback.id,
-                exc=ChordError('Cannot restore group: {0!r}'.format(exc)),
+            callback = maybe_signature(task.request.chord, app=app)
+            logger.error('Chord %r raised: %r', gid, exc, exc_info=1)
+            return self.chord_error_from_stack(
+                callback,
+                ChordError('Cannot restore group: {0!r}'.format(exc)),
             )
         if deps is None:
             try:
                 raise ValueError(gid)
             except ValueError as exc:
-                callback = maybe_signature(task.request.chord, app=self.app)
-                task = app._tasks[callback.task]
-                return task.backend.fail_from_current_stack(
-                    callback.id,
-                    exc=ChordError('GroupResult {0} no longer exists'.format(
-                        gid,
-                    ))
+                callback = maybe_signature(task.request.chord, app=app)
+                logger.error('Chord callback %r raised: %r', gid, exc,
+                             exc_info=1)
+                return self.chord_error_from_stack(
+                    callback,
+                    ChordError('GroupResult {0} no longer exists'.format(gid)),
                 )
         val = self.incr(key)
         if val >= len(deps):
-            callback = maybe_signature(task.request.chord, app=self.app)
+            callback = maybe_signature(task.request.chord, app=app)
             j = deps.join_native if deps.supports_native_join else deps.join
             try:
                 with allow_join_result():
@@ -525,16 +562,16 @@ class KeyValueStoreBackend(BaseBackend):
                 except StopIteration:
                     reason = repr(exc)
 
-                app._tasks[callback.task].backend.fail_from_current_stack(
-                    callback.id, exc=ChordError(reason),
-                )
+                logger.error('Chord %r raised: %r', gid, reason, exc_info=1)
+                self.chord_error_from_stack(callback, ChordError(reason))
             else:
                 try:
                     callback.delay(ret)
                 except Exception as exc:
-                    app._tasks[callback.task].backend.fail_from_current_stack(
-                        callback.id,
-                        exc=ChordError('Callback error: {0!r}'.format(exc)),
+                    logger.error('Chord %r raised: %r', gid, exc, exc_info=1)
+                    self.chord_error_from_stack(
+                        callback,
+                        ChordError('Callback error: {0!r}'.format(exc)),
                     )
             finally:
                 deps.delete()

+ 76 - 4
celery/backends/redis.py

@@ -13,9 +13,11 @@ from functools import partial
 from kombu.utils import cached_property, retry_over_time
 from kombu.utils.url import _parse_url
 
-from celery.exceptions import ImproperlyConfigured
+from celery import states
+from celery.canvas import maybe_signature
+from celery.exceptions import ChordError, ImproperlyConfigured
 from celery.five import string_t
-from celery.utils import deprecated_property
+from celery.utils import deprecated_property, strtobool
 from celery.utils.functional import dictfilter
 from celery.utils.log import get_logger
 from celery.utils.timeutils import humanize_seconds
@@ -56,7 +58,7 @@ class RedisBackend(KeyValueStoreBackend):
 
     def __init__(self, host=None, port=None, db=None, password=None,
                  expires=None, max_connections=None, url=None,
-                 connection_pool=None, **kwargs):
+                 connection_pool=None, new_join=False, **kwargs):
         super(RedisBackend, self).__init__(**kwargs)
         conf = self.app.conf
         if self.redis is None:
@@ -90,7 +92,17 @@ class RedisBackend(KeyValueStoreBackend):
         self.url = url
         self.expires = self.prepare_expires(expires, type=int)
 
-        self.connection_errors, self.channel_errors = get_redis_error_classes()
+        try:
+            new_join = strtobool(self.connparams.pop('new_join'))
+        except KeyError:
+            pass
+        if new_join:
+            self.apply_chord = self._new_chord_apply
+            self.on_chord_part_return = self._new_chord_return
+
+        self.connection_errors, self.channel_errors = (
+            get_redis_error_classes() if get_redis_error_classes
+            else ((), ()))
 
     def _params_from_url(self, url, defaults):
         scheme, host, port, user, password, path, query = _parse_url(url)
@@ -165,6 +177,66 @@ class RedisBackend(KeyValueStoreBackend):
     def expire(self, key, value):
         return self.client.expire(key, value)
 
+    def _unpack_chord_result(self, tup, decode,
+                             PROPAGATE_STATES=states.PROPAGATE_STATES):
+        _, tid, state, retval = decode(tup)
+        if state in PROPAGATE_STATES:
+            raise ChordError('Dependency {0} raised {1!r}'.format(tid, retval))
+        return retval
+
+    def _new_chord_apply(self, header, partial_args, group_id, body,
+                         result=None, **options):
+        # avoids saving the group in the redis db.
+        return header(*partial_args, task_id=group_id)
+
+    def _new_chord_return(self, task, state, result, propagate=None,
+                          PROPAGATE_STATES=states.PROPAGATE_STATES):
+        app = self.app
+        if propagate is None:
+            propagate = self.app.conf.CELERY_CHORD_PROPAGATES
+        request = task.request
+        tid, gid = request.id, request.group
+        if not gid or not tid:
+            return
+
+        client = self.client
+        jkey = self.get_key_for_group(gid, '.j')
+        result = self.encode_result(result, state)
+        _, readycount, _ = client.pipeline()                            \
+            .rpush(jkey, self.encode([1, tid, state, result]))          \
+            .llen(jkey)                                                 \
+            .expire(jkey, 86400)                                        \
+            .execute()
+
+        try:
+            callback = maybe_signature(request.chord, app=app)
+            total = callback['chord_size']
+            if readycount >= total:
+                decode, unpack = self.decode, self._unpack_chord_result
+                resl, _ = client.pipeline()     \
+                    .lrange(jkey, 0, total)     \
+                    .delete(jkey)               \
+                    .execute()
+                try:
+                    callback.delay([unpack(tup, decode) for tup in resl])
+                except Exception as exc:
+                    error('Chord callback for %r raised: %r',
+                          request.group, exc, exc_info=1)
+                    app._tasks[callback.task].backend.fail_from_current_stack(
+                        callback.id,
+                        exc=ChordError('Callback error: {0!r}'.format(exc)),
+                    )
+        except ChordError as exc:
+            error('Chord %r raised: %r', request.group, exc, exc_info=1)
+            app._tasks[callback.task].backend.fail_from_current_stack(
+                callback.id, exc=exc,
+            )
+        except Exception as exc:
+            error('Chord %r raised: %r', request.group, exc, exc_info=1)
+            app._tasks[callback.task].backend.fail_from_current_stack(
+                callback.id, exc=ChordError('Join error: {0!r}'.format(exc)),
+            )
+
     @property
     def ConnectionPool(self):
         if self._ConnectionPool is None:

+ 31 - 13
celery/beat.py

@@ -161,17 +161,24 @@ class Scheduler(object):
     #: How often to sync the schedule (3 minutes by default)
     sync_every = 3 * 60
 
+    #: How many tasks can be called before a sync is forced.
+    sync_every_tasks = None
+
     _last_sync = None
+    _tasks_since_sync = 0
 
     logger = logger  # compat
 
     def __init__(self, app, schedule=None, max_interval=None,
-                 Publisher=None, lazy=False, **kwargs):
+                 Publisher=None, lazy=False, sync_every_tasks=None, **kwargs):
         self.app = app
         self.data = maybe_evaluate({} if schedule is None else schedule)
         self.max_interval = (max_interval
                              or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
                              or self.max_interval)
+        self.sync_every_tasks = (
+            app.conf.CELERYBEAT_SYNC_EVERY if sync_every_tasks is None
+            else sync_every_tasks)
         self.Publisher = Publisher or app.amqp.TaskProducer
         if not lazy:
             self.setup_schedule()
@@ -219,8 +226,12 @@ class Scheduler(object):
         return min(remaining_times + [self.max_interval])
 
     def should_sync(self):
-        return (not self._last_sync or
-                (monotonic() - self._last_sync) > self.sync_every)
+        return (
+            (not self._last_sync or
+               (monotonic() - self._last_sync) > self.sync_every) or
+            (self.sync_every_tasks and
+                self._tasks_since_sync >= self.sync_every_tasks)
+        )
 
     def reserve(self, entry):
         new_entry = self.schedule[entry.name] = next(entry)
@@ -247,6 +258,7 @@ class Scheduler(object):
                 "Couldn't apply scheduled task {0.name}: {exc}".format(
                     entry, exc=exc)), sys.exc_info()[2])
         finally:
+            self._tasks_since_sync += 1
             if self.should_sync():
                 self._do_sync()
         return result
@@ -263,6 +275,7 @@ class Scheduler(object):
             self.sync()
         finally:
             self._last_sync = monotonic()
+            self._tasks_since_sync = 0
 
     def sync(self):
         pass
@@ -352,7 +365,6 @@ class PersistentScheduler(Scheduler):
         try:
             self._store = self.persistence.open(self.schedule_filename,
                                                 writeback=True)
-            entries = self._store.setdefault('entries', {})
         except Exception as exc:
             error('Removing corrupted schedule file %r: %r',
                   self.schedule_filename, exc, exc_info=True)
@@ -360,15 +372,21 @@ class PersistentScheduler(Scheduler):
             self._store = self.persistence.open(self.schedule_filename,
                                                 writeback=True)
         else:
-            if '__version__' not in self._store:
-                warning('Reset: Account for new __version__ field')
-                self._store.clear()   # remove schedule at 2.2.2 upgrade.
-            if 'tz' not in self._store:
-                warning('Reset: Account for new tz field')
-                self._store.clear()   # remove schedule at 3.0.8 upgrade
-            if 'utc_enabled' not in self._store:
-                warning('Reset: Account for new utc_enabled field')
-                self._store.clear()   # remove schedule at 3.0.9 upgrade
+            try:
+                self._store['entries']
+            except KeyError:
+                # new schedule db
+                self._store['entries'] = {}
+            else:
+                if '__version__' not in self._store:
+                    warning('DB Reset: Account for new __version__ field')
+                    self._store.clear()   # remove schedule at 2.2.2 upgrade.
+                elif 'tz' not in self._store:
+                    warning('DB Reset: Account for new tz field')
+                    self._store.clear()   # remove schedule at 3.0.8 upgrade
+                elif 'utc_enabled' not in self._store:
+                    warning('DB Reset: Account for new utc_enabled field')
+                    self._store.clear()   # remove schedule at 3.0.9 upgrade
 
         tz = self.app.conf.CELERY_TIMEZONE
         stored_tz = self._store.get('tz')

+ 5 - 17
celery/bin/base.py

@@ -68,7 +68,6 @@ from __future__ import absolute_import, print_function, unicode_literals
 import os
 import random
 import re
-import socket
 import sys
 import warnings
 import json
@@ -86,7 +85,7 @@ from celery.five import items, string, string_t
 from celery.platforms import EX_FAILURE, EX_OK, EX_USAGE
 from celery.utils import term
 from celery.utils import text
-from celery.utils import NODENAME_DEFAULT, nodesplit
+from celery.utils import node_format, host_format
 from celery.utils.imports import symbol_by_name, import_from_cwd
 
 try:
@@ -106,7 +105,6 @@ Try --help?
 
 find_long_opt = re.compile(r'.+?(--.+?)(?:\s|,|$)')
 find_rst_ref = re.compile(r':\w+:`(.+?)`')
-find_sformat = re.compile(r'%(\w)')
 
 __all__ = ['Error', 'UsageError', 'Extensions', 'HelpFormatter',
            'Command', 'Option', 'daemon_options']
@@ -566,20 +564,10 @@ class Command(object):
         pass
 
     def node_format(self, s, nodename, **extra):
-        name, host = nodesplit(nodename)
-        return self._simple_format(
-            s, host, n=name or NODENAME_DEFAULT, **extra)
-
-    def simple_format(self, s, **extra):
-        return self._simple_format(s, socket.gethostname(), **extra)
-
-    def _simple_format(self, s, host,
-                       match=find_sformat, expand=r'\1', **keys):
-        if s:
-            name, _, domain = host.partition('.')
-            keys = dict({'%': '%', 'h': host, 'n': name, 'd': domain}, **keys)
-            return match.sub(lambda m: keys[m.expand(expand)], s)
-        return s
+        return node_format(s, nodename, **extra)
+
+    def host_format(self, s, **extra):
+        return host_format(s, **extra)
 
     def _get_default_app(self, *args, **kwargs):
         from celery._state import get_current_app

+ 2 - 1
celery/bin/celery.py

@@ -9,6 +9,7 @@ The :program:`celery` umbrella command.
 from __future__ import absolute_import, unicode_literals
 
 import anyjson
+import numbers
 import os
 import sys
 
@@ -61,7 +62,7 @@ if DEBUG:  # pragma: no cover
 
 
 def determine_exit_status(ret):
-    if isinstance(ret, int):
+    if isinstance(ret, numbers.Integral):
         return ret
     return EX_OK if ret else EX_FAILURE
 

+ 12 - 0
celery/bin/multi.py

@@ -478,6 +478,18 @@ def multi_args(p, cmd='celery worker', append='', prefix='', suffix=''):
                 p.namespaces[subns].update(ns_opts)
             p.namespaces.pop(ns_name)
 
+    # Numbers in args always refers to the index in the list of names.
+    # (e.g. `start foo bar baz -c:1` where 1 is foo, 2 is bar, and so on).
+    for ns_name, ns_opts in list(items(p.namespaces)):
+        if ns_name.isdigit():
+            ns_index = int(ns_name) - 1
+            if ns_index < 0:
+                raise KeyError('Indexes start at 1 got: %r' % (ns_name, ))
+            try:
+                p.namespaces[names[ns_index]].update(ns_opts)
+            except IndexError:
+                raise KeyError('No node at index %r' % (ns_name, ))
+
     for name in names:
         this_suffix = suffix
         if '@' in name:

+ 2 - 2
celery/bin/worker.py

@@ -192,7 +192,7 @@ class worker(Command):
         if self.app.IS_WINDOWS and kwargs.get('beat'):
             self.die('-B option does not work on Windows.  '
                      'Please run celery beat as a separate service.')
-        hostname = self.simple_format(default_nodename(hostname))
+        hostname = self.host_format(default_nodename(hostname))
         if loglevel:
             try:
                 loglevel = mlevel(loglevel)
@@ -203,7 +203,7 @@ class worker(Command):
 
         return self.app.Worker(
             hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
-            logfile=self.node_format(logfile, hostname),
+            logfile=logfile,  # node format handled by celery.app.log.setup
             pidfile=self.node_format(pidfile, hostname),
             state_db=self.node_format(state_db, hostname), **kwargs
         ).start()

+ 17 - 4
celery/canvas.py

@@ -194,7 +194,7 @@ class Signature(dict):
         return s
     partial = clone
 
-    def freeze(self, _id=None):
+    def freeze(self, _id=None, group_id=None, chord=None):
         opts = self.options
         try:
             tid = opts['task_id']
@@ -202,6 +202,10 @@ class Signature(dict):
             tid = opts['task_id'] = _id or uuid()
         if 'reply_to' not in opts:
             opts['reply_to'] = self.app.oid
+        if group_id:
+            opts['group_id'] = group_id
+        if chord:
+            opts['chord'] = chord
         return self.AsyncResult(tid)
     _freeze = freeze
 
@@ -502,16 +506,20 @@ class group(Signature):
     def __call__(self, *partial_args, **options):
         return self.apply_async(partial_args, **options)
 
-    def freeze(self, _id=None):
+    def freeze(self, _id=None, group_id=None, chord=None):
         opts = self.options
         try:
             gid = opts['task_id']
         except KeyError:
             gid = opts['task_id'] = uuid()
+        if group_id:
+            opts['group_id'] = group_id
+        if chord:
+            opts['chord'] = group_id
         new_tasks, results = [], []
         for task in self.tasks:
             task = maybe_signature(task, app=self._app).clone()
-            results.append(task._freeze())
+            results.append(task.freeze(group_id=group_id, chord=chord))
             new_tasks.append(task)
         self.tasks = self.kwargs['tasks'] = new_tasks
         return self.app.GroupResult(gid, results)
@@ -552,6 +560,9 @@ class chord(Signature):
         )
         self.subtask_type = 'chord'
 
+    def freeze(self, _id=None, group_id=None, chord=None):
+        return self.body.freeze(_id, group_id=group_id, chord=chord)
+
     @classmethod
     def from_dict(self, d, app=None):
         args, d['kwargs'] = self._unpack_args(**kwdict(d['kwargs']))
@@ -578,7 +589,9 @@ class chord(Signature):
                 app = self.body.type.app
         return app.tasks['celery.chord']
 
-    def apply_async(self, args=(), kwargs={}, task_id=None, **options):
+    def apply_async(self, args=(), kwargs={}, task_id=None,
+                    producer=None, publisher=None, connection=None,
+                    router=None, result_cls=None, **options):
         body = kwargs.get('body') or self.kwargs['body']
         kwargs = dict(self.kwargs, **kwargs)
         body = body.clone(**options)

+ 7 - 2
celery/concurrency/prefork.py

@@ -57,10 +57,15 @@ def process_initializer(app, hostname):
     # run once per process.
     app.loader.init_worker()
     app.loader.init_worker_process()
+    logfile = os.environ.get('CELERY_LOG_FILE') or None
+    if logfile and '%i' in logfile.lower():
+        # logfile path will differ so need to set up logging again.
+        app.log.already_setup = False
     app.log.setup(int(os.environ.get('CELERY_LOG_LEVEL', 0) or 0),
-                  os.environ.get('CELERY_LOG_FILE') or None,
+                  logfile,
                   bool(os.environ.get('CELERY_LOG_REDIRECT', False)),
-                  str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')))
+                  str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')),
+                  hostname=hostname)
     if os.environ.get('FORKED_BY_MULTIPROCESSING'):
         # pool did execv after fork
         trace.setup_worker_optimizations(app)

+ 11 - 4
celery/events/state.py

@@ -35,7 +35,7 @@ from kombu.utils import cached_property, kwdict
 from celery import states
 from celery.five import class_property, items, values
 from celery.utils import deprecated
-from celery.utils.functional import LRUCache
+from celery.utils.functional import LRUCache, memoize
 from celery.utils.log import get_logger
 
 PYPY = hasattr(sys, 'pypy_version_info')
@@ -66,6 +66,14 @@ R_TASK = '<Task: {0.name}({0.uuid}) {0.state} clock:{0.clock}>'
 __all__ = ['Worker', 'Task', 'State', 'heartbeat_expires']
 
 
+@memoize(maxsize=1000, keyfun=lambda a, _: a[0])
+def _warn_drift(hostname, drift, local_received, timestamp):
+    # we use memoize here so the warning is only logged once per hostname
+    warn(DRIFT_WARNING, hostname, drift,
+         datetime.fromtimestamp(local_received),
+         datetime.fromtimestamp(timestamp))
+
+
 def heartbeat_expires(timestamp, freq=60,
                       expire_window=HEARTBEAT_EXPIRE_WINDOW,
                       Decimal=Decimal, float=float, isinstance=isinstance):
@@ -158,9 +166,8 @@ class Worker(object):
                     return
                 drift = abs(int(local_received) - int(timestamp))
                 if drift > HEARTBEAT_DRIFT_MAX:
-                    warn(DRIFT_WARNING, self.hostname, drift,
-                         datetime.fromtimestamp(local_received),
-                         datetime.fromtimestamp(timestamp))
+                    _warn_drift(self.hostname, drift,
+                                local_received, timestamp)
                 if local_received:
                     hearts = len(heartbeats)
                     if hearts > hbmax - 1:

+ 5 - 2
celery/exceptions.py

@@ -8,6 +8,8 @@
 """
 from __future__ import absolute_import
 
+import numbers
+
 from .five import string_t
 
 from billiard.exceptions import (  # noqa
@@ -98,7 +100,8 @@ class Retry(Exception):
     #: Exception (if any) that caused the retry to happen.
     exc = None
 
-    #: Time of retry (ETA), either int or :class:`~datetime.datetime`.
+    #: Time of retry (ETA), either :class:`numbers.Real` or
+    #: :class:`~datetime.datetime`.
     when = None
 
     def __init__(self, message=None, exc=None, when=None, **kwargs):
@@ -112,7 +115,7 @@ class Retry(Exception):
         Exception.__init__(self, exc, when, **kwargs)
 
     def humanize(self):
-        if isinstance(self.when, int):
+        if isinstance(self.when, numbers.Real):
             return 'in {0.when}s'.format(self)
         return 'at {0.when}'.format(self)
 

+ 29 - 0
celery/local.py

@@ -212,12 +212,27 @@ class PromiseProxy(Proxy):
 
     """
 
+    __slots__ = ('__pending__', )
+
     def _get_current_object(self):
         try:
             return object.__getattribute__(self, '__thing')
         except AttributeError:
             return self.__evaluate__()
 
+    def __then__(self, fun, *args, **kwargs):
+        if self.__evaluated__():
+            return fun(*args, **kwargs)
+        from collections import deque
+        try:
+            pending = object.__getattribute__(self, '__pending__')
+        except AttributeError:
+            pending = None
+        if pending is None:
+            pending = deque()
+            object.__setattr__(self, '__pending__', pending)
+        pending.append((fun, args, kwargs))
+
     def __evaluated__(self):
         try:
             object.__getattribute__(self, '__thing')
@@ -243,6 +258,20 @@ class PromiseProxy(Proxy):
                 except AttributeError:  # pragma: no cover
                     # May mask errors so ignore
                     pass
+            try:
+                pending = object.__getattribute__(self, '__pending__')
+            except AttributeError:
+                pass
+            else:
+                try:
+                    while pending:
+                        fun, args, kwargs = pending.popleft()
+                        fun(*args, **kwargs)
+                finally:
+                    try:
+                        object.__delattr__(self, '__pending__')
+                    except AttributeError:
+                        pass
 
 
 def maybe_evaluate(obj):

+ 2 - 1
celery/platforms.py

@@ -12,6 +12,7 @@ from __future__ import absolute_import, print_function
 import atexit
 import errno
 import math
+import numbers
 import os
 import platform as _platform
 import signal as _signal
@@ -609,7 +610,7 @@ class Signals(object):
 
     def signum(self, signal_name):
         """Get signal number from signal name."""
-        if isinstance(signal_name, int):
+        if isinstance(signal_name, numbers.Integral):
             return signal_name
         if not isinstance(signal_name, string_t) \
                 or not signal_name.isupper():

+ 88 - 22
celery/result.py

@@ -87,6 +87,7 @@ class AsyncResult(ResultBase):
         self.backend = backend or self.app.backend
         self.task_name = task_name
         self.parent = parent
+        self._cache = None
 
     def as_tuple(self):
         parent = self.parent
@@ -118,7 +119,8 @@ class AsyncResult(ResultBase):
                                 terminate=terminate, signal=signal,
                                 reply=wait, timeout=timeout)
 
-    def get(self, timeout=None, propagate=True, interval=0.5):
+    def get(self, timeout=None, propagate=True, interval=0.5, no_ack=True,
+            follow_parents=True):
         """Wait until task is ready, and return its result.
 
         .. warning::
@@ -133,6 +135,10 @@ class AsyncResult(ResultBase):
            retrieve the result.  Note that this does not have any effect
            when using the amqp result store backend, as it does not
            use polling.
+        :keyword no_ack: Enable amqp no ack (automatically acknowledge
+            message).  If this is :const:`False` then the message will
+            **not be acked**.
+        :keyword follow_parents: Reraise any exception raised by parent task.
 
         :raises celery.exceptions.TimeoutError: if `timeout` is not
             :const:`None` and the result does not arrive within `timeout`
@@ -143,15 +149,32 @@ class AsyncResult(ResultBase):
 
         """
         assert_will_not_block()
-        if propagate and self.parent:
-            for node in reversed(list(self._parents())):
-                node.get(propagate=True, timeout=timeout, interval=interval)
+        on_interval = None
+        if follow_parents and propagate and self.parent:
+            on_interval = self._maybe_reraise_parent_error
+            on_interval()
 
-        return self.backend.wait_for(self.id, timeout=timeout,
-                                     propagate=propagate,
-                                     interval=interval)
+        if self._cache:
+            if propagate:
+                self.maybe_reraise()
+            return self.result
+
+        try:
+            return self.backend.wait_for(
+                self.id, timeout=timeout,
+                propagate=propagate,
+                interval=interval,
+                on_interval=on_interval,
+                no_ack=no_ack,
+            )
+        finally:
+            self._get_task_meta()  # update self._cache
     wait = get  # deprecated alias to :meth:`get`.
 
+    def _maybe_reraise_parent_error(self):
+        for node in reversed(list(self._parents())):
+            node.maybe_reraise()
+
     def _parents(self):
         node = self.parent
         while node:
@@ -238,6 +261,10 @@ class AsyncResult(ResultBase):
         """Returns :const:`True` if the task failed."""
         return self.state == states.FAILURE
 
+    def maybe_reraise(self):
+        if self.state in states.PROPAGATE_STATES:
+            raise self.result
+
     def build_graph(self, intermediate=False, formatter=None):
         graph = DependencyGraph(
             formatter=formatter or GraphFormatter(root=self.id, shape='oval'),
@@ -280,6 +307,9 @@ class AsyncResult(ResultBase):
     def __reduce_args__(self):
         return self.id, self.backend, self.task_name, None, self.parent
 
+    def __del__(self):
+        self._cache = None
+
     @cached_property
     def graph(self):
         return self.build_graph()
@@ -290,22 +320,42 @@ class AsyncResult(ResultBase):
 
     @property
     def children(self):
-        children = self.backend.get_children(self.id)
+        return self._get_task_meta().get('children')
+
+    def _get_task_meta(self):
+        if self._cache is None:
+            meta = self.backend.get_task_meta(self.id)
+            if meta:
+                state = meta['status']
+                if state == states.SUCCESS or state in states.PROPAGATE_STATES:
+                    self._set_cache(meta)
+                    return self._set_cache(meta)
+            return meta
+        return self._cache
+
+    def _set_cache(self, d):
+        state, children = d['status'], d.get('children')
+        if state in states.EXCEPTION_STATES:
+            d['result'] = self.backend.exception_to_python(d['result'])
         if children:
-            return [result_from_tuple(child, self.app) for child in children]
+            d['children'] = [
+                result_from_tuple(child, self.app) for child in children
+            ]
+        self._cache = d
+        return d
 
     @property
     def result(self):
         """When the task has been executed, this contains the return value.
         If the task raised an exception, this will be the exception
         instance."""
-        return self.backend.get_result(self.id)
+        return self._get_task_meta()['result']
     info = result
 
     @property
     def traceback(self):
         """Get the traceback of a failed task."""
-        return self.backend.get_traceback(self.id)
+        return self._get_task_meta().get('traceback')
 
     @property
     def state(self):
@@ -337,7 +387,7 @@ class AsyncResult(ResultBase):
                 then contains the tasks return value.
 
         """
-        return self.backend.get_status(self.id)
+        return self._get_task_meta()['status']
     status = state
 
     @property
@@ -426,6 +476,10 @@ class ResultSet(ResultBase):
         """
         return any(result.failed() for result in self.results)
 
+    def maybe_reraise(self):
+        for result in self.results:
+            result.maybe_reraise()
+
     def waiting(self):
         """Are any of the tasks incomplete?
 
@@ -506,7 +560,8 @@ class ResultSet(ResultBase):
             if timeout and elapsed >= timeout:
                 raise TimeoutError('The operation timed out')
 
-    def get(self, timeout=None, propagate=True, interval=0.5, callback=None):
+    def get(self, timeout=None, propagate=True, interval=0.5,
+            callback=None, no_ack=True):
         """See :meth:`join`
 
         This is here for API compatibility with :class:`AsyncResult`,
@@ -516,9 +571,10 @@ class ResultSet(ResultBase):
         """
         return (self.join_native if self.supports_native_join else self.join)(
             timeout=timeout, propagate=propagate,
-            interval=interval, callback=callback)
+            interval=interval, callback=callback, no_ack=no_ack)
 
-    def join(self, timeout=None, propagate=True, interval=0.5, callback=None):
+    def join(self, timeout=None, propagate=True, interval=0.5,
+             callback=None, no_ack=True):
         """Gathers the results of all tasks as a list in order.
 
         .. note::
@@ -557,6 +613,10 @@ class ResultSet(ResultBase):
                            ``result = app.AsyncResult(task_id)`` (both will
                            take advantage of the backend cache anyway).
 
+        :keyword no_ack: Automatic message acknowledgement (Note that if this
+            is set to :const:`False` then the messages *will not be
+            acknowledged*).
+
         :raises celery.exceptions.TimeoutError: if ``timeout`` is not
             :const:`None` and the operation takes longer than ``timeout``
             seconds.
@@ -573,16 +633,17 @@ class ResultSet(ResultBase):
                 remaining = timeout - (monotonic() - time_start)
                 if remaining <= 0.0:
                     raise TimeoutError('join operation timed out')
-            value = result.get(timeout=remaining,
-                               propagate=propagate,
-                               interval=interval)
+            value = result.get(
+                timeout=remaining, propagate=propagate,
+                interval=interval, no_ack=no_ack,
+            )
             if callback:
                 callback(result.id, value)
             else:
                 results.append(value)
         return results
 
-    def iter_native(self, timeout=None, interval=0.5):
+    def iter_native(self, timeout=None, interval=0.5, no_ack=True):
         """Backend optimized version of :meth:`iterate`.
 
         .. versionadded:: 2.2
@@ -598,11 +659,12 @@ class ResultSet(ResultBase):
         if not results:
             return iter([])
         return results[0].backend.get_many(
-            set(r.id for r in results), timeout=timeout, interval=interval,
+            set(r.id for r in results),
+            timeout=timeout, interval=interval, no_ack=no_ack,
         )
 
     def join_native(self, timeout=None, propagate=True,
-                    interval=0.5, callback=None):
+                    interval=0.5, callback=None, no_ack=True):
         """Backend optimized version of :meth:`join`.
 
         .. versionadded:: 2.2
@@ -619,7 +681,7 @@ class ResultSet(ResultBase):
             (result.id, i) for i, result in enumerate(self.results)
         )
         acc = None if callback else [None for _ in range(len(self))]
-        for task_id, meta in self.iter_native(timeout, interval):
+        for task_id, meta in self.iter_native(timeout, interval, no_ack):
             value = meta['result']
             if propagate and meta['status'] in states.PROPAGATE_STATES:
                 raise value
@@ -772,6 +834,10 @@ class EagerResult(AsyncResult):
         self._state = state
         self._traceback = traceback
 
+    def _get_task_meta(self):
+        return {'task_id': self.id, 'result': self._result, 'status':
+                self._state, 'traceback': self._traceback}
+
     def __reduce__(self):
         return self.__class__, self.__reduce_args__()
 

+ 3 - 2
celery/schedules.py

@@ -9,6 +9,7 @@
 """
 from __future__ import absolute_import
 
+import numbers
 import re
 
 from collections import namedtuple
@@ -401,7 +402,7 @@ class crontab(schedule):
         week.
 
         """
-        if isinstance(cronspec, int):
+        if isinstance(cronspec, numbers.Integral):
             result = set([cronspec])
         elif isinstance(cronspec, string_t):
             result = crontab_parser(max_, min_).parse(cronspec)
@@ -583,7 +584,7 @@ class crontab(schedule):
 
 def maybe_schedule(s, relative=False, app=None):
     if s is not None:
-        if isinstance(s, int):
+        if isinstance(s, numbers.Integral):
             s = timedelta(seconds=s)
         if isinstance(s, timedelta):
             return schedule(s, relative, app=app)

+ 1 - 1
celery/tests/app/test_app.py

@@ -644,7 +644,7 @@ class test_App(AppCase):
 
 class test_defaults(AppCase):
 
-    def test_str_to_bool(self):
+    def test_strtobool(self):
         for s in ('false', 'no', '0'):
             self.assertFalse(defaults.strtobool(s))
         for s in ('true', 'yes', '1'):

+ 36 - 1
celery/tests/app/test_beat.py

@@ -162,7 +162,7 @@ class test_Scheduler(AppCase):
         scheduler.apply_async(scheduler.Entry(task=foo.name, app=self.app))
         self.assertTrue(foo.apply_async.called)
 
-    def test_apply_async_should_not_sync(self):
+    def test_should_sync(self):
 
         @self.app.task(shared=False)
         def not_sync():
@@ -181,6 +181,41 @@ class test_Scheduler(AppCase):
         s.apply_async(s.Entry(task=not_sync.name, app=self.app))
         self.assertFalse(s._do_sync.called)
 
+    def test_should_sync_increments_sync_every_counter(self):
+        self.app.conf.CELERYBEAT_SYNC_EVERY = 2
+
+        @self.app.task(shared=False)
+        def not_sync():
+            pass
+        not_sync.apply_async = Mock()
+
+        s = mScheduler(app=self.app)
+        self.assertEqual(s.sync_every_tasks, 2)
+        s._do_sync = Mock()
+
+        s.apply_async(s.Entry(task=not_sync.name, app=self.app))
+        self.assertEqual(s._tasks_since_sync, 1)
+        s.apply_async(s.Entry(task=not_sync.name, app=self.app))
+        s._do_sync.assert_called_with()
+
+        self.app.conf.CELERYBEAT_SYNC_EVERY = 0
+
+    def test_sync_task_counter_resets_on_do_sync(self):
+        self.app.conf.CELERYBEAT_SYNC_EVERY = 1
+
+        @self.app.task(shared=False)
+        def not_sync():
+            pass
+        not_sync.apply_async = Mock()
+
+        s = mScheduler(app=self.app)
+        self.assertEqual(s.sync_every_tasks, 1)
+
+        s.apply_async(s.Entry(task=not_sync.name, app=self.app))
+        self.assertEqual(s._tasks_since_sync, 0)
+
+        self.app.conf.CELERYBEAT_SYNC_EVERY = 0
+
     @patch('celery.app.base.Celery.send_task')
     def test_send_task(self, send_task):
         b = beat.Scheduler(app=self.app)

+ 28 - 8
celery/tests/app/test_log.py

@@ -2,6 +2,9 @@ from __future__ import absolute_import
 
 import sys
 import logging
+
+from collections import defaultdict
+from io import StringIO
 from tempfile import mktemp
 
 from celery import signals
@@ -248,14 +251,31 @@ class test_default_logger(AppCase):
                 l.info('The quick brown fox...')
                 self.assertIn('The quick brown fox...', stderr.getvalue())
 
-    def test_setup_logger_no_handlers_file(self):
-        with restore_logging():
-            l = self.get_logger()
-            l.handlers = []
-            tempfile = mktemp(suffix='unittest', prefix='celery')
-            l = self.setup_logger(logfile=tempfile, loglevel=0, root=False)
-            self.assertIsInstance(get_handlers(l)[0],
-                                  logging.FileHandler)
+    @patch('os.fstat')
+    def test_setup_logger_no_handlers_file(self, *args):
+        tempfile = mktemp(suffix='unittest', prefix='celery')
+        _open = ('builtins.open' if sys.version_info[0] == 3
+                 else '__builtin__.open')
+        with patch(_open) as osopen:
+            with restore_logging():
+                files = defaultdict(StringIO)
+
+                def open_file(filename, *args, **kwargs):
+                    f = files[filename]
+                    f.fileno = Mock()
+                    f.fileno.return_value = 99
+                    return f
+
+                osopen.side_effect = open_file
+                l = self.get_logger()
+                l.handlers = []
+                l = self.setup_logger(
+                    logfile=tempfile, loglevel=logging.INFO, root=False,
+                )
+                self.assertIsInstance(
+                    get_handlers(l)[0], logging.FileHandler,
+                )
+                self.assertIn(tempfile, files)
 
     def test_redirect_stdouts(self):
         with restore_logging():

+ 11 - 10
celery/tests/backends/test_amqp.py

@@ -33,8 +33,8 @@ class test_AMQPBackend(AppCase):
         return AMQPBackend(self.app, **opts)
 
     def test_mark_as_done(self):
-        tb1 = self.create_backend()
-        tb2 = self.create_backend()
+        tb1 = self.create_backend(max_cached_results=1)
+        tb2 = self.create_backend(max_cached_results=1)
 
         tid = uuid()
 
@@ -175,7 +175,7 @@ class test_AMQPBackend(AppCase):
         class MockBackend(AMQPBackend):
             Queue = MockBinding
 
-        backend = MockBackend(self.app)
+        backend = MockBackend(self.app, max_cached_results=100)
         backend._republish = Mock()
 
         yield results, backend, Message
@@ -183,29 +183,30 @@ class test_AMQPBackend(AppCase):
     def test_backlog_limit_exceeded(self):
         with self._result_context() as (results, backend, Message):
             for i in range(1001):
-                results.put(Message(status=states.RECEIVED))
+                results.put(Message(task_id='id', status=states.RECEIVED))
             with self.assertRaises(backend.BacklogLimitExceeded):
                 backend.get_task_meta('id')
 
     def test_poll_result(self):
         with self._result_context() as (results, backend, Message):
+            tid = uuid()
             # FFWD's to the latest state.
             state_messages = [
-                Message(status=states.RECEIVED, seq=1),
-                Message(status=states.STARTED, seq=2),
-                Message(status=states.FAILURE, seq=3),
+                Message(task_id=tid, status=states.RECEIVED, seq=1),
+                Message(task_id=tid, status=states.STARTED, seq=2),
+                Message(task_id=tid, status=states.FAILURE, seq=3),
             ]
             for state_message in state_messages:
                 results.put(state_message)
-            r1 = backend.get_task_meta(uuid())
+            r1 = backend.get_task_meta(tid)
             self.assertDictContainsSubset(
                 {'status': states.FAILURE, 'seq': 3}, r1,
                 'FFWDs to the last state',
             )
 
             # Caches last known state.
-            results.put(Message())
             tid = uuid()
+            results.put(Message(task_id=tid))
             backend.get_task_meta(tid)
             self.assertIn(tid, backend._cache, 'Caches last known state')
 
@@ -261,7 +262,7 @@ class test_AMQPBackend(AppCase):
                 b.drain_events(Connection(), consumer, timeout=0.1)
 
     def test_get_many(self):
-        b = self.create_backend()
+        b = self.create_backend(max_cached_results=10)
 
         tids = []
         for i in range(10):

+ 13 - 10
celery/tests/backends/test_base.py

@@ -62,7 +62,7 @@ class test_BaseBackend_interface(AppCase):
             self.b.forget('SOMExx-N0nex1stant-IDxx-')
 
     def test_on_chord_part_return(self):
-        self.b.on_chord_part_return(None)
+        self.b.on_chord_part_return(None, None, None)
 
     def test_apply_chord(self, unlock='celery.chord_unlock'):
         self.app.tasks[unlock] = Mock()
@@ -234,9 +234,10 @@ class test_BaseBackend_dict(AppCase):
         self.assertIsInstance(self.b.prepare_value(g), self.app.GroupResult)
 
     def test_is_cached(self):
-        self.b._cache['foo'] = 1
-        self.assertTrue(self.b.is_cached('foo'))
-        self.assertFalse(self.b.is_cached('false'))
+        b = BaseBackend(app=self.app, max_cached_results=1)
+        b._cache['foo'] = 1
+        self.assertTrue(b.is_cached('foo'))
+        self.assertFalse(b.is_cached('false'))
 
 
 class test_KeyValueStoreBackend(AppCase):
@@ -246,7 +247,7 @@ class test_KeyValueStoreBackend(AppCase):
 
     def test_on_chord_part_return(self):
         assert not self.b.implements_incr
-        self.b.on_chord_part_return(None)
+        self.b.on_chord_part_return(None, None, None)
 
     def test_get_store_delete_result(self):
         tid = uuid()
@@ -282,12 +283,14 @@ class test_KeyValueStoreBackend(AppCase):
     def test_chord_part_return_no_gid(self):
         self.b.implements_incr = True
         task = Mock()
+        state = 'SUCCESS'
+        result = 10
         task.request.group = None
         self.b.get_key_for_chord = Mock()
         self.b.get_key_for_chord.side_effect = AssertionError(
             'should not get here',
         )
-        self.assertIsNone(self.b.on_chord_part_return(task))
+        self.assertIsNone(self.b.on_chord_part_return(task, state, result))
 
     @contextmanager
     def _chord_part_context(self, b):
@@ -315,14 +318,14 @@ class test_KeyValueStoreBackend(AppCase):
 
     def test_chord_part_return_propagate_set(self):
         with self._chord_part_context(self.b) as (task, deps, _):
-            self.b.on_chord_part_return(task, propagate=True)
+            self.b.on_chord_part_return(task, 'SUCCESS', 10, propagate=True)
             self.assertFalse(self.b.expire.called)
             deps.delete.assert_called_with()
             deps.join_native.assert_called_with(propagate=True, timeout=3.0)
 
     def test_chord_part_return_propagate_default(self):
         with self._chord_part_context(self.b) as (task, deps, _):
-            self.b.on_chord_part_return(task, propagate=None)
+            self.b.on_chord_part_return(task, 'SUCCESS', 10, propagate=None)
             self.assertFalse(self.b.expire.called)
             deps.delete.assert_called_with()
             deps.join_native.assert_called_with(
@@ -334,7 +337,7 @@ class test_KeyValueStoreBackend(AppCase):
         with self._chord_part_context(self.b) as (task, deps, callback):
             deps._failed_join_report = lambda: iter([])
             deps.join_native.side_effect = KeyError('foo')
-            self.b.on_chord_part_return(task)
+            self.b.on_chord_part_return(task, 'SUCCESS', 10)
             self.assertTrue(self.b.fail_from_current_stack.called)
             args = self.b.fail_from_current_stack.call_args
             exc = args[1]['exc']
@@ -348,7 +351,7 @@ class test_KeyValueStoreBackend(AppCase):
                 self.app.AsyncResult('culprit'),
             ])
             deps.join_native.side_effect = KeyError('foo')
-            b.on_chord_part_return(task)
+            b.on_chord_part_return(task, 'SUCCESS', 10)
             self.assertTrue(b.fail_from_current_stack.called)
             args = b.fail_from_current_stack.call_args
             exc = args[1]['exc']

+ 2 - 2
celery/tests/backends/test_cache.py

@@ -86,10 +86,10 @@ class test_CacheBackend(AppCase):
         tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
 
         self.assertFalse(deps.join_native.called)
-        tb.on_chord_part_return(task)
+        tb.on_chord_part_return(task, 'SUCCESS', 10)
         self.assertFalse(deps.join_native.called)
 
-        tb.on_chord_part_return(task)
+        tb.on_chord_part_return(task, 'SUCCESS', 10)
         deps.join_native.assert_called_with(propagate=True, timeout=3.0)
         deps.delete.assert_called_with()
 

+ 102 - 74
celery/tests/backends/test_redis.py

@@ -4,52 +4,55 @@ from datetime import timedelta
 
 from pickle import loads, dumps
 
-from kombu.utils import cached_property, uuid
-
 from celery import signature
 from celery import states
 from celery import group
+from celery import uuid
 from celery.datastructures import AttributeDict
 from celery.exceptions import ImproperlyConfigured
 from celery.utils.timeutils import timedelta_seconds
 
 from celery.tests.case import (
-    AppCase, Mock, SkipTest, depends_on_current_app, patch,
+    AppCase, Mock, MockCallbacks, SkipTest, depends_on_current_app, patch,
 )
 
 
-class Redis(object):
+class Connection(object):
+    connected = True
+
+    def disconnect(self):
+        self.connected = False
+
 
-    class Connection(object):
-        connected = True
+class Pipeline(object):
 
-        def disconnect(self):
-            self.connected = False
+    def __init__(self, client):
+        self.client = client
+        self.steps = []
 
-    class Pipeline(object):
+    def __getattr__(self, attr):
 
-        def __init__(self, client):
-            self.client = client
-            self.steps = []
+        def add_step(*args, **kwargs):
+            self.steps.append((getattr(self.client, attr), args, kwargs))
+            return self
+        return add_step
 
-        def __getattr__(self, attr):
+    def execute(self):
+        return [step(*a, **kw) for step, a, kw in self.steps]
 
-            def add_step(*args, **kwargs):
-                self.steps.append((getattr(self.client, attr), args, kwargs))
-                return self
-            return add_step
 
-        def execute(self):
-            return [step(*a, **kw) for step, a, kw in self.steps]
+class Redis(MockCallbacks):
+    Connection = Connection
+    Pipeline = Pipeline
 
     def __init__(self, host=None, port=None, db=None, password=None, **kw):
         self.host = host
         self.port = port
         self.db = db
         self.password = password
-        self.connection = self.Connection()
         self.keyspace = {}
         self.expiry = {}
+        self.connection = self.Connection()
 
     def get(self, key):
         return self.keyspace.get(key)
@@ -63,16 +66,30 @@ class Redis(object):
 
     def expire(self, key, expires):
         self.expiry[key] = expires
+        return expires
 
     def delete(self, key):
-        self.keyspace.pop(key)
-
-    def publish(self, key, value):
-        pass
+        return bool(self.keyspace.pop(key, None))
 
     def pipeline(self):
         return self.Pipeline(self)
 
+    def _get_list(self, key):
+        try:
+            return self.keyspace[key]
+        except KeyError:
+            l = self.keyspace[key] = []
+            return l
+
+    def rpush(self, key, value):
+        self._get_list(key).append(value)
+
+    def lrange(self, key, start, stop):
+        return self._get_list(key)[start:stop]
+
+    def llen(self, key):
+        return len(self.keyspace.get(key) or [])
+
 
 class redis(object):
     Redis = Redis
@@ -91,41 +108,34 @@ class redis(object):
 class test_RedisBackend(AppCase):
 
     def get_backend(self):
-        from celery.backends import redis
+        from celery.backends.redis import RedisBackend
 
-        class RedisBackend(redis.RedisBackend):
+        class _RedisBackend(RedisBackend):
             redis = redis
 
-        return RedisBackend
+        return _RedisBackend
 
     def setup(self):
         self.Backend = self.get_backend()
 
-        class MockBackend(self.Backend):
-
-            @cached_property
-            def client(self):
-                return Mock()
-
-        self.MockBackend = MockBackend
-
     @depends_on_current_app
     def test_reduce(self):
         try:
             from celery.backends.redis import RedisBackend
-            x = RedisBackend(app=self.app)
+            x = RedisBackend(app=self.app, new_join=True)
             self.assertTrue(loads(dumps(x)))
         except ImportError:
             raise SkipTest('redis not installed')
 
     def test_no_redis(self):
-        self.MockBackend.redis = None
+        self.Backend.redis = None
         with self.assertRaises(ImproperlyConfigured):
-            self.MockBackend(app=self.app)
+            self.Backend(app=self.app, new_join=True)
 
     def test_url(self):
-        x = self.MockBackend(
+        x = self.Backend(
             'redis://:bosco@vandelay.com:123//1', app=self.app,
+            new_join=True,
         )
         self.assertTrue(x.connparams)
         self.assertEqual(x.connparams['host'], 'vandelay.com')
@@ -134,8 +144,9 @@ class test_RedisBackend(AppCase):
         self.assertEqual(x.connparams['password'], 'bosco')
 
     def test_socket_url(self):
-        x = self.MockBackend(
+        x = self.Backend(
             'socket:///tmp/redis.sock?virtual_host=/3', app=self.app,
+            new_join=True,
         )
         self.assertTrue(x.connparams)
         self.assertEqual(x.connparams['path'], '/tmp/redis.sock')
@@ -148,8 +159,9 @@ class test_RedisBackend(AppCase):
         self.assertEqual(x.connparams['db'], 3)
 
     def test_compat_propertie(self):
-        x = self.MockBackend(
+        x = self.Backend(
             'redis://:bosco@vandelay.com:123//1', app=self.app,
+            new_join=True,
         )
         with self.assertPendingDeprecation():
             self.assertEqual(x.host, 'vandelay.com')
@@ -167,71 +179,85 @@ class test_RedisBackend(AppCase):
             'CELERY_ACCEPT_CONTENT': ['json'],
             'CELERY_TASK_RESULT_EXPIRES': None,
         })
-        self.MockBackend(app=self.app)
+        self.Backend(app=self.app, new_join=True)
 
     def test_expires_defaults_to_config(self):
         self.app.conf.CELERY_TASK_RESULT_EXPIRES = 10
-        b = self.Backend(expires=None, app=self.app)
+        b = self.Backend(expires=None, app=self.app, new_join=True)
         self.assertEqual(b.expires, 10)
 
     def test_expires_is_int(self):
-        b = self.Backend(expires=48, app=self.app)
+        b = self.Backend(expires=48, app=self.app, new_join=True)
         self.assertEqual(b.expires, 48)
 
+    def test_set_new_join_from_url_query(self):
+        b = self.Backend('redis://?new_join=True;foobar=1', app=self.app)
+        self.assertEqual(b.on_chord_part_return, b._new_chord_return)
+        self.assertEqual(b.apply_chord, b._new_chord_apply)
+
+    def test_default_is_old_join(self):
+        b = self.Backend(app=self.app)
+        self.assertNotEqual(b.on_chord_part_return, b._new_chord_return)
+        self.assertNotEqual(b.apply_chord, b._new_chord_apply)
+
     def test_expires_is_None(self):
-        b = self.Backend(expires=None, app=self.app)
+        b = self.Backend(expires=None, app=self.app, new_join=True)
         self.assertEqual(b.expires, timedelta_seconds(
             self.app.conf.CELERY_TASK_RESULT_EXPIRES))
 
     def test_expires_is_timedelta(self):
-        b = self.Backend(expires=timedelta(minutes=1), app=self.app)
+        b = self.Backend(
+            expires=timedelta(minutes=1), app=self.app, new_join=1,
+        )
         self.assertEqual(b.expires, 60)
 
     def test_apply_chord(self):
-        self.Backend(app=self.app).apply_chord(
+        self.Backend(app=self.app, new_join=True).apply_chord(
             group(app=self.app), (), 'group_id', {},
             result=[self.app.AsyncResult(x) for x in [1, 2, 3]],
         )
 
     def test_mget(self):
-        b = self.MockBackend(app=self.app)
+        b = self.Backend(app=self.app, new_join=True)
         self.assertTrue(b.mget(['a', 'b', 'c']))
         b.client.mget.assert_called_with(['a', 'b', 'c'])
 
     def test_set_no_expire(self):
-        b = self.MockBackend(app=self.app)
+        b = self.Backend(app=self.app, new_join=True)
         b.expires = None
         b.set('foo', 'bar')
 
     @patch('celery.result.GroupResult.restore')
     def test_on_chord_part_return(self, restore):
-        b = self.MockBackend(app=self.app)
-        deps = Mock()
-        deps.__len__ = Mock()
-        deps.__len__.return_value = 10
-        restore.return_value = deps
-        b.client.incr.return_value = 1
-        task = Mock()
-        task.name = 'foobarbaz'
-        self.app.tasks['foobarbaz'] = task
-        task.request.chord = signature(task)
-        task.request.group = 'group_id'
-
-        b.on_chord_part_return(task)
-        self.assertTrue(b.client.incr.call_count)
-
-        b.client.incr.return_value = len(deps)
-        b.on_chord_part_return(task)
-        deps.join_native.assert_called_with(propagate=True, timeout=3.0)
-        deps.delete.assert_called_with()
-
-        self.assertTrue(b.client.expire.call_count)
+        b = self.Backend(app=self.app, new_join=True)
+
+        def create_task():
+            tid = uuid()
+            task = Mock(name='task-{0}'.format(tid))
+            task.name = 'foobarbaz'
+            self.app.tasks['foobarbaz'] = task
+            task.request.chord = signature(task)
+            task.request.id = tid
+            task.request.chord['chord_size'] = 10
+            task.request.group = 'group_id'
+            return task
+
+        tasks = [create_task() for i in range(10)]
+
+        for i in range(10):
+            b.on_chord_part_return(tasks[i], states.SUCCESS, i)
+            self.assertTrue(b.client.rpush.call_count)
+            b.client.rpush.reset_mock()
+        self.assertTrue(b.client.lrange.call_count)
+        gkey = b.get_key_for_group('group_id', '.j')
+        b.client.delete.assert_called_with(gkey)
+        b.client.expire.assert_called_witeh(gkey, 86400)
 
     def test_process_cleanup(self):
-        self.Backend(app=self.app).process_cleanup()
+        self.Backend(app=self.app, new_join=True).process_cleanup()
 
     def test_get_set_forget(self):
-        b = self.Backend(app=self.app)
+        b = self.Backend(app=self.app, new_join=True)
         tid = uuid()
         b.store_result(tid, 42, states.SUCCESS)
         self.assertEqual(b.get_status(tid), states.SUCCESS)
@@ -240,8 +266,10 @@ class test_RedisBackend(AppCase):
         self.assertEqual(b.get_status(tid), states.PENDING)
 
     def test_set_expires(self):
-        b = self.Backend(expires=512, app=self.app)
+        b = self.Backend(expires=512, app=self.app, new_join=True)
         tid = uuid()
         key = b.get_key_for_task(tid)
         b.store_result(tid, 42, states.SUCCESS)
-        self.assertEqual(b.client.expiry[key], 512)
+        b.client.expire.assert_called_with(
+            key, 512,
+        )

+ 5 - 5
celery/tests/bin/test_base.py

@@ -241,21 +241,21 @@ class test_Command(AppCase):
         with self.assertRaises(AttributeError):
             cmd.find_app(__name__)
 
-    def test_simple_format(self):
+    def test_host_format(self):
         cmd = MockCommand(app=self.app)
         with patch('socket.gethostname') as hn:
             hn.return_value = 'blacktron.example.com'
-            self.assertEqual(cmd.simple_format(''), '')
+            self.assertEqual(cmd.host_format(''), '')
             self.assertEqual(
-                cmd.simple_format('celery@%h'),
+                cmd.host_format('celery@%h'),
                 'celery@blacktron.example.com',
             )
             self.assertEqual(
-                cmd.simple_format('celery@%d'),
+                cmd.host_format('celery@%d'),
                 'celery@example.com',
             )
             self.assertEqual(
-                cmd.simple_format('celery@%n'),
+                cmd.host_format('celery@%n'),
                 'celery@blacktron',
             )
 

+ 57 - 11
celery/tests/case.py

@@ -11,12 +11,14 @@ except AttributeError:
 import importlib
 import inspect
 import logging
+import numbers
 import os
 import platform
 import re
 import sys
 import threading
 import time
+import types
 import warnings
 
 from contextlib import contextmanager
@@ -45,12 +47,6 @@ from celery.five import (
 from celery.utils.functional import noop
 from celery.utils.imports import qualname
 
-try:  # pragma: no cover
-    from django.utils.six import MovedModule
-except ImportError:  # pragma: no cover
-    class MovedModule(object):  # noqa
-        pass
-
 __all__ = [
     'Case', 'AppCase', 'Mock', 'MagicMock', 'ANY',
     'patch', 'call', 'sentinel', 'skip_unless_module',
@@ -68,6 +64,8 @@ sentinel = mock.sentinel
 MagicMock = mock.MagicMock
 ANY = mock.ANY
 
+PY3 = sys.version_info[0] == 3
+
 CASE_REDEFINES_SETUP = """\
 {name} (subclass of AppCase) redefines private "setUp", should be: "setup"\
 """
@@ -170,6 +168,35 @@ def ContextMock(*args, **kwargs):
     return obj
 
 
+def _bind(f, o):
+    @wraps(f)
+    def bound_meth(*fargs, **fkwargs):
+        return f(o, *fargs, **fkwargs)
+    return bound_meth
+
+
+if PY3:  # pragma: no cover
+    def _get_class_fun(meth):
+        return meth
+else:
+    def _get_class_fun(meth):
+        return meth.__func__
+
+
+class MockCallbacks(object):
+
+    def __new__(cls, *args, **kwargs):
+        r = Mock(name=cls.__name__)
+        _get_class_fun(cls.__init__)(r, *args, **kwargs)
+        for key, value in items(vars(cls)):
+            if key not in ('__dict__', '__weakref__', '__new__', '__init__'):
+                if inspect.ismethod(value) or inspect.isfunction(value):
+                    r.__getattr__(key).side_effect = _bind(value, r)
+                else:
+                    r.__setattr__(key, value)
+        return r
+
+
 def skip_unless_module(module):
 
     def _inner(fun):
@@ -201,6 +228,18 @@ class _AssertRaisesBaseContext(object):
         self.expected_regex = expected_regex
 
 
+def _is_magic_module(m):
+    # some libraries create custom module types that are lazily
+    # lodaded, e.g. Django installs some modules in sys.modules that
+    # will load _tkinter and other shit when touched.
+
+    # pyflakes refuses to accept 'noqa' for this isinstance.
+    cls, modtype = m.__class__, types.ModuleType
+    return (not cls is modtype and (
+        '__getattr__' in vars(m.__class__) or
+        '__getattribute__' in vars(m.__class__)))
+
+
 class _AssertWarnsContext(_AssertRaisesBaseContext):
     """A context manager used to implement TestCase.assertWarns* methods."""
 
@@ -209,10 +248,17 @@ class _AssertWarnsContext(_AssertRaisesBaseContext):
         # to work properly.
         warnings.resetwarnings()
         for v in list(values(sys.modules)):
-            # do not evaluate Django moved modules:
-            if not isinstance(v, MovedModule):
-                if getattr(v, '__warningregistry__', None):
-                    v.__warningregistry__ = {}
+            # do not evaluate Django moved modules and other lazily
+            # initialized modules.
+            if v and not _is_magic_module(v):
+                # use raw __getattribute__ to protect even better from
+                # lazily loaded modules
+                try:
+                    object.__getattribute__(v, '__warningregistry__')
+                except AttributeError:
+                    pass
+                else:
+                    object.__setattr__(v, '__warningregistry__', {})
         self.warnings_manager = warnings.catch_warnings(record=True)
         self.warnings = self.warnings_manager.__enter__()
         warnings.simplefilter('always', self.expected)
@@ -782,7 +828,7 @@ def body_from_sig(app, sig, utc=True):
     if eta and isinstance(eta, datetime):
         eta = eta.isoformat()
     expires = sig.options.pop('expires', None)
-    if expires and isinstance(expires, int):
+    if expires and isinstance(expires, numbers.Real):
         expires = app.now() + timedelta(seconds=expires)
     if expires and isinstance(expires, datetime):
         expires = expires.isoformat()

+ 1 - 1
celery/tests/security/test_certificate.py

@@ -16,7 +16,7 @@ class test_Certificate(SecurityCase):
         Certificate(CERT2)
 
     def test_invalid_certificate(self):
-        self.assertRaises(TypeError, Certificate, None)
+        self.assertRaises((SecurityError, TypeError), Certificate, None)
         self.assertRaises(SecurityError, Certificate, '')
         self.assertRaises(SecurityError, Certificate, 'foo')
         self.assertRaises(SecurityError, Certificate, CERT1[:20] + CERT1[21:])

+ 1 - 1
celery/tests/security/test_key.py

@@ -14,7 +14,7 @@ class test_PrivateKey(SecurityCase):
         PrivateKey(KEY2)
 
     def test_invalid_private_key(self):
-        self.assertRaises(TypeError, PrivateKey, None)
+        self.assertRaises((SecurityError, TypeError), PrivateKey, None)
         self.assertRaises(SecurityError, PrivateKey, '')
         self.assertRaises(SecurityError, PrivateKey, 'foo')
         self.assertRaises(SecurityError, PrivateKey, KEY1[:20] + KEY1[21:])

+ 6 - 7
celery/tests/tasks/test_result.py

@@ -66,15 +66,15 @@ class test_AsyncResult(AppCase):
     def test_children(self):
         x = self.app.AsyncResult('1')
         children = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
+        x._cache = {'children': children, 'status': states.SUCCESS}
         x.backend = Mock()
-        x.backend.get_children.return_value = children
-        x.backend.READY_STATES = states.READY_STATES
         self.assertTrue(x.children)
         self.assertEqual(len(x.children), 3)
 
     def test_propagates_for_parent(self):
         x = self.app.AsyncResult(uuid())
         x.backend = Mock()
+        x.backend.get_task_meta.return_value = {}
         x.parent = EagerResult(uuid(), KeyError('foo'), states.FAILURE)
         with self.assertRaises(KeyError):
             x.get(propagate=True)
@@ -89,10 +89,11 @@ class test_AsyncResult(AppCase):
         x = self.app.AsyncResult(tid)
         child = [self.app.AsyncResult(uuid()).as_tuple()
                  for i in range(10)]
-        x.backend._cache[tid] = {'children': child}
+        x._cache = {'children': child}
         self.assertTrue(x.children)
         self.assertEqual(len(x.children), 10)
 
+        x._cache = {'status': states.SUCCESS}
         x.backend._cache[tid] = {'result': None}
         self.assertIsNone(x.children)
 
@@ -122,13 +123,11 @@ class test_AsyncResult(AppCase):
 
     def test_iterdeps(self):
         x = self.app.AsyncResult('1')
-        x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}
         c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
+        x._cache = {'status': states.SUCCESS, 'result': None, 'children': c}
         for child in c:
             child.backend = Mock()
             child.backend.get_children.return_value = []
-        x.backend.get_children = Mock()
-        x.backend.get_children.return_value = c
         it = x.iterdeps()
         self.assertListEqual(list(it), [
             (None, x),
@@ -136,7 +135,7 @@ class test_AsyncResult(AppCase):
             (x, c[1]),
             (x, c[2]),
         ])
-        x.backend._cache.pop('1')
+        x._cache = None
         x.ready = Mock()
         x.ready.return_value = False
         with self.assertRaises(IncompleteStream):

+ 1 - 1
celery/tests/tasks/test_trace.py

@@ -101,7 +101,7 @@ class test_trace(TraceCase):
         add.backend = Mock()
 
         self.trace(add, (2, 2), {}, request={'chord': uuid()})
-        add.backend.on_chord_part_return.assert_called_with(add)
+        add.backend.on_chord_part_return.assert_called_with(add, 'SUCCESS', 4)
 
     def test_when_backend_cleanup_raises(self):
 

+ 6 - 6
celery/tests/utils/test_functional.py

@@ -72,21 +72,21 @@ class test_LRUCache(Case):
 
             def __init__(self, cache):
                 self.cache = cache
-                self._is_shutdown = Event()
-                self._is_stopped = Event()
+                self.__is_shutdown = Event()
+                self.__is_stopped = Event()
                 Thread.__init__(self)
 
             def run(self):
-                while not self._is_shutdown.isSet():
+                while not self.__is_shutdown.isSet():
                     try:
                         self.cache.data.popitem(last=False)
                     except KeyError:
                         break
-                self._is_stopped.set()
+                self.__is_stopped.set()
 
             def stop(self):
-                self._is_shutdown.set()
-                self._is_stopped.wait()
+                self.__is_shutdown.set()
+                self.__is_stopped.wait()
                 self.join(THREAD_TIMEOUT_MAX)
 
         burglar = Burglar(x)

+ 26 - 2
celery/tests/utils/test_local.py

@@ -170,10 +170,10 @@ class test_Proxy(Case):
         class O(object):
 
             def __complex__(self):
-                return 10.333
+                return complex(10.333)
 
         o = Proxy(O)
-        self.assertEqual(o.__complex__(), 10.333)
+        self.assertEqual(o.__complex__(), complex(10.333))
 
     def test_index(self):
 
@@ -329,6 +329,30 @@ class test_PromiseProxy(Case):
         self.assertEqual(p.attr, 123)
         self.assertEqual(X.evals, 1)
 
+    def test_callbacks(self):
+        source = Mock(name='source')
+        p = PromiseProxy(source)
+        cbA = Mock(name='cbA')
+        cbB = Mock(name='cbB')
+        cbC = Mock(name='cbC')
+        p.__then__(cbA, p)
+        p.__then__(cbB, p)
+        self.assertFalse(p.__evaluated__())
+        self.assertTrue(object.__getattribute__(p, '__pending__'))
+
+        self.assertTrue(repr(p))
+        with self.assertRaises(AttributeError):
+            object.__getattribute__(p, '__pending__')
+        cbA.assert_called_with(p)
+        cbB.assert_called_with(p)
+
+        self.assertTrue(p.__evaluated__())
+        p.__then__(cbC, p)
+        cbC.assert_called_with(p)
+
+        with self.assertRaises(AttributeError):
+            object.__getattribute__(p, '__pending__')
+
     def test_maybe_evaluate(self):
         x = PromiseProxy(lambda: 30)
         self.assertFalse(x.__evaluated__())

+ 1 - 0
celery/tests/worker/test_consumer.py

@@ -439,6 +439,7 @@ class test_Gossip(AppCase):
         c.app.events.State.assert_called_with(
             on_node_join=g.on_node_join,
             on_node_leave=g.on_node_leave,
+            max_tasks_in_memory=1,
         )
         g.update_state = Mock()
         worker = Mock()

+ 5 - 1
celery/tests/worker/test_hub.py

@@ -53,8 +53,12 @@ class test_LaxBoundedSemaphore(Case):
         self.assertEqual(x.value, 0)
         self.assertFalse(c3.called)
 
+        x.release()
+        self.assertEqual(x.value, 0)
         x.release()
         self.assertEqual(x.value, 1)
+        x.release()
+        self.assertEqual(x.value, 2)
         c3.assert_called_with(3)
 
     def test_bounded(self):
@@ -83,7 +87,7 @@ class test_LaxBoundedSemaphore(Case):
         x.grow(2)
         cb2.assert_called_with(2)
         cb3.assert_called_with(3)
-        self.assertEqual(x.value, 3)
+        self.assertEqual(x.value, 2)
         self.assertEqual(x.initial_value, 3)
 
         self.assertFalse(x._waiting)

+ 27 - 1
celery/tests/worker/test_worker.py

@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 import os
 import socket
@@ -516,30 +516,56 @@ class test_Consumer(AppCase):
         self.assertTrue(logger.critical.call_count)
 
     def test_receive_message_eta(self):
+        import sys
+        from functools import partial
+        if os.environ.get('C_DEBUG_TEST'):
+            pp = partial(print, file=sys.__stderr__)
+        else:
+            def pp(*args, **kwargs):
+                pass
+        pp('TEST RECEIVE MESSAGE ETA')
+        pp('+CREATE MYKOMBUCONSUMER')
         l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
+        pp('-CREATE MYKOMBUCONSUMER')
         l.steps.pop()
         l.event_dispatcher = mock_event_dispatcher()
         backend = Mock()
+        pp('+ CREATE MESSAGE')
         m = create_message(
             backend, task=self.foo_task.name,
             args=[2, 4, 8], kwargs={},
             eta=(datetime.now() + timedelta(days=1)).isoformat(),
         )
+        pp('- CREATE MESSAGE')
 
         try:
+            pp('+ BLUEPRINT START 1')
             l.blueprint.start(l)
+            pp('- BLUEPRINT START 1')
             p = l.app.conf.BROKER_CONNECTION_RETRY
             l.app.conf.BROKER_CONNECTION_RETRY = False
+            pp('+ BLUEPRINT START 2')
             l.blueprint.start(l)
+            pp('- BLUEPRINT START 2')
             l.app.conf.BROKER_CONNECTION_RETRY = p
+            pp('+ BLUEPRINT RESTART')
             l.blueprint.restart(l)
+            pp('- BLUEPRINT RESTART')
             l.event_dispatcher = mock_event_dispatcher()
+            pp('+ GET ON MESSAGE')
             callback = self._get_on_message(l)
+            pp('- GET ON MESSAGE')
+            pp('+ CALLBACK')
             callback(m.decode(), m)
+            pp('- CALLBACK')
         finally:
+            pp('+ STOP TIMER')
             l.timer.stop()
+            pp('- STOP TIMER')
             try:
+                pp('+ JOIN TIMER')
                 l.timer.join()
+                pp('- JOIN TIMER')
             except RuntimeError:
                 pass
 

+ 40 - 10
celery/utils/__init__.py

@@ -8,15 +8,18 @@
 """
 from __future__ import absolute_import, print_function
 
+import numbers
 import os
+import re
 import socket
 import sys
 import traceback
 import warnings
 import datetime
 
+from collections import Callable
 from functools import partial, wraps
-from inspect import getargspec, ismethod
+from inspect import getargspec
 from pprint import pprint
 
 from kombu.entity import Exchange, Queue
@@ -59,6 +62,7 @@ WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq'
 NODENAME_SEP = '@'
 
 NODENAME_DEFAULT = 'celery'
+RE_FORMAT = re.compile(r'%(\w)')
 
 
 def worker_direct(hostname):
@@ -255,7 +259,7 @@ def strtobool(term, table={'false': False, 'no': False, '0': False,
 
 
 def jsonify(obj,
-            builtin_types=(int, float, string_t), key=None,
+            builtin_types=(numbers.Real, string_t), key=None,
             keyfilter=None,
             unknown_type_filter=None):
     """Transforms object making it suitable for json serialization"""
@@ -344,15 +348,41 @@ def default_nodename(hostname):
     return nodename(name or NODENAME_DEFAULT, host or socket.gethostname())
 
 
-def shadowsig(wrapper, wrapped):
-    if ismethod(wrapped):
-        wrapped = wrapped.__func__
-    wrapper.__code__ = wrapped.__code__
-    wrapper.__defaults__ = wrapper.func_defaults = wrapped.__defaults__
+def node_format(s, nodename, **extra):
+    name, host = nodesplit(nodename)
+    return host_format(
+        s, host, n=name or NODENAME_DEFAULT, **extra)
 
-    if not PY3:
-        wrapper.func_code = wrapper.__code__
-        wrapper.func_defaults = wrapper.__defaults__
+
+def _fmt_process_index(prefix='', default='0'):
+    from .log import current_process_index
+    index = current_process_index()
+    return '{0}{1}'.format(prefix, index) if index else default
+_fmt_process_index_with_prefix = partial(_fmt_process_index, '-', '')
+
+
+def host_format(s, host=None, **extra):
+    host = host or socket.gethostname()
+    name, _, domain = host.partition('.')
+    keys = dict({
+        'h': host, 'n': name, 'd': domain,
+        'i': _fmt_process_index, 'I': _fmt_process_index_with_prefix,
+    }, **extra)
+    return simple_format(s, keys)
+
+
+def simple_format(s, keys, pattern=RE_FORMAT, expand=r'\1'):
+    if s:
+        keys.setdefault('%', '%')
+
+        def resolve(match):
+            resolver = keys[match.expand(expand)]
+            if isinstance(resolver, Callable):
+                return resolver()
+            return resolver
+
+        return pattern.sub(resolve, s)
+    return s
 
 
 # ------------------------------------------------------------------------ #

+ 5 - 3
celery/utils/debug.py

@@ -6,7 +6,7 @@
     Utilities for debugging memory usage.
 
 """
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
 
 import os
 
@@ -41,8 +41,10 @@ _mem_sample = []
 def _on_blocking(signum, frame):
     import inspect
     raise RuntimeError(
-        'Blocking detection timed-out at: %s' % (
-            inspect.getframeinfo(frame), ))
+        'Blocking detection timed-out at: {0}'.format(
+            inspect.getframeinfo(frame)
+        )
+    )
 
 
 @contextmanager

+ 16 - 0
celery/utils/dispatch/signal.py

@@ -4,7 +4,9 @@ from __future__ import absolute_import
 
 import weakref
 from . import saferef
+
 from celery.five import range
+from celery.local import PromiseProxy, Proxy
 
 __all__ = ['Signal']
 
@@ -12,6 +14,8 @@ WEAKREF_TYPES = (weakref.ReferenceType, saferef.BoundMethodWeakref)
 
 
 def _make_id(target):  # pragma: no cover
+    if isinstance(target, Proxy):
+        target = target._get_current_object()
     if hasattr(target, '__func__'):
         return (id(target.__self__), id(target.__func__))
     return id(target)
@@ -39,6 +43,12 @@ class Signal(object):  # pragma: no cover
             providing_args = []
         self.providing_args = set(providing_args)
 
+    def _connect_proxy(self, fun, sender, weak, dispatch_uid):
+        return self.connect(
+            fun, sender=sender._get_current_object(),
+            weak=weak, dispatch_uid=dispatch_uid,
+        )
+
     def connect(self, *args, **kwargs):
         """Connect receiver to sender for signal.
 
@@ -74,6 +84,12 @@ class Signal(object):  # pragma: no cover
             def _connect_signal(fun):
                 receiver = fun
 
+                if isinstance(sender, PromiseProxy):
+                    sender.__then__(
+                        self._connect_proxy, fun, sender, weak, dispatch_uid,
+                    )
+                    return fun
+
                 if dispatch_uid:
                     lookup_key = (dispatch_uid, _make_id(sender))
                 else:

+ 5 - 2
celery/utils/functional.py

@@ -121,7 +121,7 @@ class LRUCache(UserDict):
             return list(self._iterate_items())
 
 
-def memoize(maxsize=None, Cache=LRUCache):
+def memoize(maxsize=None, keyfun=None, Cache=LRUCache):
 
     def _memoize(fun):
         mutex = threading.Lock()
@@ -129,7 +129,10 @@ def memoize(maxsize=None, Cache=LRUCache):
 
         @wraps(fun)
         def _M(*args, **kwargs):
-            key = args + (KEYWORD_MARK, ) + tuple(sorted(kwargs.items()))
+            if keyfun:
+                key = keyfun(args, kwargs)
+            else:
+                key = args + (KEYWORD_MARK, ) + tuple(sorted(kwargs.items()))
             try:
                 with mutex:
                     value = cache[key]

+ 2 - 3
celery/utils/iso8601.py

@@ -69,10 +69,9 @@ def parse_iso8601(datestring):
             hours = -hours
             minutes = -minutes
         tz = FixedOffset(minutes + hours * 60)
-    frac = groups['fraction']
-    groups['fraction'] = int(float('0.%s' % frac) * 1e6) if frac else 0
+    frac = groups['fraction'] or 0
     return datetime(
         int(groups['year']), int(groups['month']), int(groups['day']),
         int(groups['hour']), int(groups['minute']), int(groups['second']),
-        int(groups['fraction']), tz
+        int(frac), tz
     )

+ 8 - 1
celery/utils/log.py

@@ -9,6 +9,7 @@
 from __future__ import absolute_import, print_function
 
 import logging
+import numbers
 import os
 import sys
 import threading
@@ -110,7 +111,7 @@ def get_task_logger(name):
 
 
 def mlevel(level):
-    if level and not isinstance(level, int):
+    if level and not isinstance(level, numbers.Integral):
         return LOG_LEVELS[level.upper()]
     return level
 
@@ -281,4 +282,10 @@ def get_multiprocessing_logger():
 def reset_multiprocessing_logger():
     if mputil and hasattr(mputil, '_logger'):
         mputil._logger = None
+
+
+def current_process_index(base=1):
+    if current_process:
+        index = getattr(current_process(), 'index', None)
+        return index + base if index is not None else index
 ensure_process_aware_logger()

+ 2 - 1
celery/utils/timeutils.py

@@ -8,6 +8,7 @@
 """
 from __future__ import absolute_import
 
+import numbers
 import os
 import time as _time
 
@@ -134,7 +135,7 @@ timezone = _Zone()
 
 def maybe_timedelta(delta):
     """Coerces integer to timedelta if `delta` is an integer."""
-    if isinstance(delta, (int, float)):
+    if isinstance(delta, numbers.Real):
         return timedelta(seconds=delta)
     return delta
 

+ 5 - 0
celery/worker/consumer.py

@@ -658,6 +658,7 @@ class Gossip(bootsteps.ConsumerStep):
             self.state = c.app.events.State(
                 on_node_join=self.on_node_join,
                 on_node_leave=self.on_node_leave,
+                max_tasks_in_memory=1,
             )
             if c.hub:
                 c._mutex = DummyLock()
@@ -771,6 +772,10 @@ class Gossip(bootsteps.ConsumerStep):
 
     def on_message(self, prepare, message):
         _type = message.delivery_info['routing_key']
+
+        # For redis when `fanout_patterns=False` (See Issue #1882)
+        if _type.split('.', 1)[0] == 'task':
+            return
         try:
             handler = self.event_handlers[_type]
         except KeyError:

+ 1 - 1
celery/worker/heartbeat.py

@@ -22,7 +22,7 @@ class Heart(object):
     :param timer: Timer instance.
     :param eventer: Event dispatcher used to send the event.
     :keyword interval: Time in seconds between heartbeats.
-                       Default is 30 seconds.
+                       Default is 2 seconds.
 
     """
 

+ 2 - 2
celery/worker/job.py

@@ -288,8 +288,8 @@ class Request(object):
                         'hostname': self.hostname, 'is_eager': False,
                         'delivery_info': self.delivery_info})
         retval = trace_task(self.task, self.id, self.args, kwargs, request,
-                            **{'hostname': self.hostname,
-                               'loader': self.app.loader})
+                            hostname=self.hostname, loader=self.app.loader,
+                            app=self.app)
         self.acknowledge()
         return retval
 

+ 1 - 0
celery/worker/pidbox.py

@@ -44,6 +44,7 @@ class Pidbox(object):
     def start(self, c):
         self.node.channel = c.connection.channel()
         self.consumer = self.node.listen(callback=self.on_message)
+        self.consumer.on_decode_error = c.on_decode_error
 
     def on_stop(self):
         pass

+ 13 - 7
docs/configuration.rst

@@ -1142,12 +1142,8 @@ CELERY_MAX_CACHED_RESULTS
 Result backends caches ready results used by the client.
 
 This is the total number of results to cache before older results are evicted.
-The default is 5000.  0 or None means no limit.
-
-.. note::
-    
-    These results are kept in memory.  Reduce this setting if your Celery is utilizing
-    a large amount of memory.
+The default is 5000.  0 or None means no limit, and a value of :const:`-1`
+will disable the cache.
 
 .. setting:: CELERY_CHORD_PROPAGATES
 
@@ -1816,6 +1812,17 @@ suffix `.db` may be appended to the file name (depending on Python version).
 Can also be set via the :option:`--schedule` argument to
 :mod:`~celery.bin.beat`.
 
+.. setting:: CELERYBEAT_SYNC_EVERY
+
+CELERYBEAT_SYNC_EVERY
+~~~~~~~~~~~~~~~~~~~~~
+
+The number of periodic tasks that can be called before another database sync
+is issued.
+Defaults to 0 (sync based on timing - default of 3 minutes as determined by
+scheduler.sync_every). If set to 1, beat will call sync after every task
+message sent.
+
 .. setting:: CELERYBEAT_MAX_LOOP_INTERVAL
 
 CELERYBEAT_MAX_LOOP_INTERVAL
@@ -1824,7 +1831,6 @@ CELERYBEAT_MAX_LOOP_INTERVAL
 The maximum number of seconds :mod:`~celery.bin.beat` can sleep
 between checking the schedule.
 
-
 The default for this value is scheduler specific.
 For the default celery beat scheduler the value is 300 (5 minutes),
 but for e.g. the django-celery database scheduler it is 5 seconds

+ 34 - 11
docs/contributing.rst

@@ -207,8 +207,8 @@ spelling or other errors on the website/docs/code.
        get more diagnostic data. Some ideas:
 
        * Enable celery's :ref:`breakpoint signal <breakpoint_signal>` and use it
-         to inspect the process's state. This will allow you to open a :mod:`pdb`
-         session.
+         to inspect the process's state.  This will allow you to open a
+         :mod:`pdb` session.
        * Collect tracing data using strace_(Linux), dtruss (OSX) and ktrace(BSD),
          ltrace_ and lsof_.
 
@@ -412,13 +412,7 @@ to upstream changes:
 .. code-block:: bash
 
     $ cd celery
-
-.. code-block:: bash
-
     $ git remote add upstream git://github.com/celery/celery.git
-
-.. code-block:: bash
-
     $ git fetch upstream
 
 If you need to pull in new changes from upstream you should
@@ -438,8 +432,6 @@ fetch and checkout a remote branch like this::
 
     git checkout --track -b 3.0-devel origin/3.0-devel
 
-For a list of branches see :ref:`git-branches`.
-
 .. _`Fork a Repo`: http://help.github.com/fork-a-repo/
 .. _`Rebasing merge commits in git`:
     http://notes.envato.com/developers/rebasing-merge-commits-in-git/
@@ -514,6 +506,14 @@ the steps outlined here: http://bit.ly/koJoso
 Calculating test coverage
 ~~~~~~~~~~~~~~~~~~~~~~~~~
 
+To calculate test coverage you must first install the :mod:`coverage` module.
+
+Installing the :mod:`coverage` module:
+
+.. code-block:: bash
+
+    $ pip install -U coverage
+
 Code coverage in HTML:
 
 .. code-block:: bash
@@ -625,11 +625,16 @@ it should be located in :file:`docs/reference/`.
 For example if reference is missing for the module ``celery.worker.awesome``
 and this module is considered part of the public API, use the following steps:
 
+
+Use an existing file as a template:
+
 .. code-block:: bash
 
     $ cd docs/reference/
     $ cp celery.schedules.rst celery.worker.awesome.rst
 
+Edit the file using your favorite editor:
+
 .. code-block:: bash
 
     $ vim celery.worker.awesome.rst
@@ -637,12 +642,18 @@ and this module is considered part of the public API, use the following steps:
         # change every occurance of ``celery.schedules`` to
         # ``celery.worker.awesome``
 
+
+Edit the index using your favorite editor:
+
 .. code-block:: bash
 
     $ vim index.rst
 
         # Add ``celery.worker.awesome`` to the index.
 
+
+Commit your changes:
+
 .. code-block:: bash
 
     # Add the file to git
@@ -776,7 +787,9 @@ is following the conventions.
 * Note that we use "new-style` relative imports when the distribution
   does not support Python versions below 2.5
 
-.. code-block:: python
+    This requires Python 2.5 or later:
+
+    .. code-block:: python
 
         from . import submodule
 
@@ -916,6 +929,16 @@ Messaging library.
 :PyPI: http://pypi.python.org/pypi/kombu
 :docs: http://kombu.readthedocs.org
 
+amqp
+----
+
+Python AMQP 0.9.1 client.
+
+:git: https://github.com/celery/py-amqp
+:CI: http://travis-ci.org/#!/celery/py-amqp
+:PyPI: http://pypi.python.org/pypi/amqp
+:docs: http://amqp.readthedocs.org
+
 billiard
 --------
 

+ 1 - 1
docs/django/first-steps-with-django.rst

@@ -49,7 +49,7 @@ both the app and tasks, like in the :ref:`tut-celery` tutorial.
 
 Let's break down what happens in the first module,
 first we import absolute imports from the future, so that our
-``celery.py`` module will not crash with the library:
+``celery.py`` module will not clash with the library:
 
 .. code-block:: python
 

+ 17 - 0
docs/getting-started/brokers/redis.rst

@@ -67,6 +67,8 @@ For a complete list of options supported by the Redis result backend, see
 Caveats
 =======
 
+.. _redis-caveat-fanout-prefix:
+
 - Broadcast messages will be seen by all virtual hosts by default.
 
     You have to set a transport option to prefix the messages so that
@@ -80,6 +82,21 @@ Caveats
     This setting will be the default in the future, so better to migrate
     sooner rather than later.
 
+.. _redis-caveat-fanout-patterns:
+
+- Workers will receive all task related events by default.
+
+    To avoid this you must set the ``fanout_patterns`` fanout option so that
+    the workers may only subscribe to worker related events::
+
+        BROKER_TRANSPORT_OPTIONS = {'fanout_patterns': True}
+
+    Note that this change is backward incompatible so all workers in the
+    cluster must have this option enabled, or else they will not be able to
+    communicate.
+
+    This option will be enabled by default in the future.
+
 - If a task is not acknowledged within the :ref:`redis-visibility_timeout`
   the task will be redelivered to another worker and executed.
 

+ 1 - 1
docs/getting-started/first-steps-with-celery.rst

@@ -177,7 +177,7 @@ For a complete listing of the command-line options available, do:
 
     $  celery worker --help
 
-There also several other commands available, and help is also available:
+There are also several other commands available, and help is also available:
 
 .. code-block:: bash
 

+ 4 - 0
docs/reference/celery.rst

@@ -382,6 +382,10 @@ and creating Celery applications.
         Finalizes the app by loading built-in tasks,
         and evaluating pending task decorators
 
+    .. method:: Celery.on_configure()
+
+        Optional callback for when the first time the configured is required.
+
     .. attribute:: Celery.Pickler
 
         Helper class used to pickle this application.

+ 1 - 1
docs/userguide/tasks.rst

@@ -902,7 +902,7 @@ Example that stores results manually:
     @app.task(bind=True)
     def get_tweets(self, user):
         timeline = twitter.get_timeline(user)
-        self.update_state(sate=states.SUCCESS, meta=timeline)
+        self.update_state(state=states.SUCCESS, meta=timeline)
         raise Ignore()
 
 .. _task-semipred-reject:

+ 58 - 0
docs/userguide/workers.rst

@@ -134,6 +134,62 @@ The worker's main process overrides the following signals:
 | :sig:`USR2`  | Remote debug, see :mod:`celery.contrib.rdb`.    |
 +--------------+-------------------------------------------------+
 
+.. _worker-files:
+
+Variables in file paths
+=======================
+
+The file path arguments for :option:`--logfile`, :option:`--pidfile` and :option:`--statedb`
+can contain variables that the worker will expand:
+
+Node name replacements
+----------------------
+
+- ``%h``:  Hostname including domain name.
+- ``%n``:  Hostname only.
+- ``%d``:  Domain name only.
+- ``%i``:  Prefork pool process index or 0 if MainProcess.
+- ``%I``:  Prefork pool process index with separator.
+
+E.g. if the current hostname is ``george.example.com`` then
+these will expand to:
+
+- ``--logfile=%h.log`` -> :file:`george.example.com.log`
+- ``--logfile=%n.log`` -> :file:`george.log`
+- ``--logfile=%d`` -> :file:`example.com.log`
+
+.. _worker-files-process-index:
+
+Prefork pool process index
+--------------------------
+
+The prefork pool process index specifiers will expand into a different
+filename depending on the process that will eventually need to open the file.
+
+This can be used to specify one log file per child process.
+
+Note that the numbers will stay within the process limit even if processes
+exit or if autoscale/maxtasksperchild/time limits are used.  I.e. the number
+is the *process index* not the process count or pid.
+
+* ``%i`` - Pool process index or 0 if MainProcess.
+
+    Where ``-n worker1@example.com -c2 -f %n-%i.log`` will result in
+    three log files:
+
+        - :file:`worker1-0.log` (main process)
+        - :file:`worker1-1.log` (pool process 1)
+        - :file:`worker1-2.log` (pool process 2)
+
+* ``%I`` - Pool process index with separator.
+
+    Where ``-n worker1@example.com -c2 -f %n%I.log`` will result in
+    three log files:
+
+        - :file:`worker1.log` (main process)
+        - :file:`worker1-1.log`` (pool process 1)
+        - :file:`worker1-2.log`` (pool process 2)
+
 .. _worker-concurrency:
 
 Concurrency
@@ -334,6 +390,8 @@ name:
     celery multi start 2 -l info --statedb=/var/run/celery/%n.state
 
 
+See also :ref:`worker-files`
+
 Note that remote control commands must be working for revokes to work.
 Remote control commands are only supported by the RabbitMQ (amqp) and Redis
 at this point.

+ 6 - 2
examples/app/myapp.py

@@ -24,8 +24,12 @@ name using the fully qualified form::
 """
 from celery import Celery
 
-app = Celery('myapp', broker='amqp://guest@localhost//')
-
+app = Celery(
+    'myapp',
+    broker='amqp://guest@localhost//',
+    # add result backend here if needed.
+    #backend='rpc'
+)
 
 @app.task()
 def add(x, y):

+ 20 - 17
examples/celery_http_gateway/settings.py

@@ -1,5 +1,7 @@
 # Django settings for celery_http_gateway project.
 
+import django
+
 DEBUG = True
 TEMPLATE_DEBUG = DEBUG
 
@@ -13,23 +15,24 @@ ADMINS = (
 
 MANAGERS = ADMINS
 
-# 'postgresql_psycopg2', 'postgresql', 'mysql', 'sqlite3' or 'oracle'.
-DATABASE_ENGINE = 'sqlite3'
-
-# path to database file if using sqlite3.
-DATABASE_NAME = 'development.db'
-
-# Not used with sqlite3.
-DATABASE_USER = ''
-
-# Not used with sqlite3.
-DATABASE_PASSWORD = ''
-
-# Set to empty string for localhost. Not used with sqlite3.
-DATABASE_HOST = ''
-
-# Set to empty string for default. Not used with sqlite3.
-DATABASE_PORT = ''
+DATABASES = {
+    'default': {
+        'ENGINE': 'django.db.backends.sqlite3',
+        'NAME': 'development.db',
+        'USER': '',
+        'PASSWORD': '',
+        'HOST': '',
+        'PORT': '',
+    }
+}
+
+if django.VERSION[:3] < (1, 3):
+    DATABASE_ENGINE = DATABASES['default']['ENGINE']
+    DATABASE_NAME = DATABASES['default']['NAME']
+    DATABASE_USER = DATABASES['default']['USER']
+    DATABASE_PASSWORD = DATABASES['default']['PASSWORD']
+    DATABASE_HOST = DATABASES['default']['HOST']
+    DATABASE_PORT = DATABASES['default']['PORT']
 
 # Local time zone for this installation. Choices can be found here:
 # http://en.wikipedia.org/wiki/List_of_tz_zones_by_name

+ 104 - 14
extra/release/sphinx-to-rst.py

@@ -1,22 +1,55 @@
 #!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from __future__ import print_function, unicode_literals
+
+import codecs
 import os
 import re
 import sys
 
 from collections import Callable
+from functools import partial
+
+SAY = partial(print, file=sys.stderr)
+
+dirname = ''
+
+RE_CODE_BLOCK = re.compile(r'(\s*).. code-block:: (.+?)\s*$')
+RE_INCLUDE = re.compile(r'\s*.. include:: (.+?)\s*$')
+RE_REFERENCE = re.compile(r':(\w+):`(.+?)`')
+RE_NAMED_REF = re.compile('(.+?)\<(.+)\>')
+UNITABLE = {
+    '…': '...',
+    '“': '"',
+    '”': '"',
+}
+X = re.compile(re.escape('…'))
+HEADER = re.compile('^[\=\~\-]+$')
+UNIRE = re.compile('|'.join(re.escape(p) for p in UNITABLE),
+                   re.UNICODE)
+REFBASE = 'http://docs.celeryproject.org/en/latest'
+REFS = {
+    'mailing-list':
+        'http://groups.google.com/group/celery-users',
+    'irc-channel': 'getting-started/resources.html#irc',
+    'breakpoint-signal': 'tutorials/debugging.html',
+    'internals-guide': 'internals/guide.html',
+    'bundles': 'getting-started/introduction.html#bundles',
+    'reporting-bugs': 'contributing.html#reporting-bugs',
+}
 
-dirname = ""
+pending_refs = {}
 
-RE_CODE_BLOCK = re.compile(r'.. code-block:: (.+?)\s*$')
-RE_INCLUDE = re.compile(r'.. include:: (.+?)\s*$')
-RE_REFERENCE = re.compile(r':(.+?):`(.+?)`')
+
+def _replace_handler(match, key=UNITABLE.__getitem__):
+    return key(match.group(0))
 
 
 def include_file(lines, pos, match):
     global dirname
     orig_filename = match.groups()[0]
     filename = os.path.join(dirname, orig_filename)
-    fh = open(filename)
+    fh = codecs.open(filename, encoding='utf-8')
     try:
         old_dirname = dirname
         dirname = os.path.dirname(orig_filename)
@@ -28,8 +61,19 @@ def include_file(lines, pos, match):
         fh.close()
 
 
+def asciify(lines):
+    prev_diff = None
+    for line in lines:
+        new_line = UNIRE.sub(_replace_handler, line)
+        if prev_diff and HEADER.match(new_line):
+            new_line = ''.join([
+                new_line.rstrip(), new_line[0] * prev_diff, '\n'])
+        prev_diff = len(new_line) - len(line)
+        yield new_line.encode('ascii')
+
+
 def replace_code_block(lines, pos, match):
-    lines[pos] = ""
+    lines[pos] = ''
     curpos = pos - 1
     # Find the first previous line with text to append "::" to it.
     while True:
@@ -39,13 +83,58 @@ def replace_code_block(lines, pos, match):
             break
         curpos -= 1
 
-    if lines[prev_line_with_text].endswith(":"):
-        lines[prev_line_with_text] += ":"
+    if lines[prev_line_with_text].endswith(':'):
+        lines[prev_line_with_text] += ':'
     else:
-        lines[prev_line_with_text] += "::"
+        lines[prev_line_with_text] += match.group(1) + '::'
+
+
+def _deref_default(target):
+    return r'``{0}``'.format(target)
+
+
+def _deref_ref(target):
+    m = RE_NAMED_REF.match(target)
+    if m:
+        text, target = m.group(1).strip(), m.group(2).strip()
+    else:
+        text = target
+
+    try:
+        url = REFS[target]
+    except KeyError:
+        return _deref_default(target)
+
+    if '://' not in url:
+        url = '/'.join([REFBASE, url])
+    pending_refs[text] = url
+
+    return r'`{0}`_'.format(text)
+
+
+DEREF = {'ref': _deref_ref}
+
+
+def _deref(match):
+    return DEREF.get(match.group(1), _deref_default)(match.group(2))
+
+
+def deref_all(line):
+    return RE_REFERENCE.subn(_deref, line)[0]
+
+
+def resolve_ref(name, url):
+    return '\n.. _`{0}`: {1}\n'.format(name, url)
+
+
+def resolve_pending_refs(lines):
+    for line in lines:
+        yield line
+    for name, url in pending_refs.items():
+        yield resolve_ref(name, url)
+
 
 TO_RST_MAP = {RE_CODE_BLOCK: replace_code_block,
-              RE_REFERENCE: r'``\2``',
               RE_INCLUDE: include_file}
 
 
@@ -60,17 +149,18 @@ def _process(lines):
                     line = lines[i]
             else:
                 lines[i] = regex.sub(alt, line)
-    return lines
+        lines[i] = deref_all(lines[i])
+    return resolve_pending_refs(asciify(lines))
 
 
 def sphinx_to_rst(fh):
-    return "".join(_process(fh))
+    return ''.join(_process(fh))
 
 
-if __name__ == "__main__":
+if __name__ == '__main__':
     global dirname
     dirname = os.path.dirname(sys.argv[1])
-    fh = open(sys.argv[1])
+    fh = codecs.open(sys.argv[1], encoding='utf-8')
     try:
         print(sphinx_to_rst(fh))
     finally:

+ 13 - 0
funtests/stress/stress/app.py

@@ -11,9 +11,12 @@ from celery import Celery
 from celery import signals
 from celery.bin.base import Option
 from celery.exceptions import SoftTimeLimitExceeded
+from celery.utils.log import get_task_logger
 
 from .templates import use_template, template_names
 
+logger = get_task_logger(__name__)
+
 
 class App(Celery):
     template_selected = False
@@ -124,6 +127,16 @@ def segfault():
     assert False, 'should not get here'
 
 
+@app.task
+def raising(exc=KeyError()):
+    raise exc
+
+
+@app.task
+def logs(msg, p=False):
+    print(msg) if p else logger.info(msg)
+
+
 def marker(s, sep='-'):
     print('{0}{1}'.format(sep, s))
     while True:

+ 13 - 2
funtests/stress/stress/templates.py

@@ -56,6 +56,7 @@ class default(object):
               exchange=Exchange(CSTRESS_QUEUE),
               routing_key=CSTRESS_QUEUE),
     ]
+    CELERY_MAX_CACHED_RESULTS = -1
     BROKER_URL = os.environ.get('CSTRESS_BROKER', 'amqp://')
     CELERY_RESULT_BACKEND = os.environ.get('CSTRESS_BACKEND', 'rpc://')
     CELERYD_PREFETCH_MULTIPLIER = int(os.environ.get('CSTRESS_PREFETCH', 10))
@@ -69,8 +70,13 @@ class default(object):
 @template()
 class redis(default):
     BROKER_URL = os.environ.get('CSTRESS_BROKER', 'redis://')
-    CELERY_RESULT_BACKEND = os.environ.get('CSTRESS_bACKEND', 'redis://')
-    BROKER_TRANSPORT_OPTIONS = {'fanout_prefix': True}
+    CELERY_RESULT_BACKEND = os.environ.get(
+        'CSTRESS_BACKEND', 'redis://?new_join=1',
+    )
+    BROKER_TRANSPORT_OPTIONS = {
+        'fanout_prefix': True,
+        'fanout_patterns': True,
+    }
 
 
 @template()
@@ -100,3 +106,8 @@ class confirms(default):
 class events(default):
     CELERY_SEND_EVENTS = True
     CELERY_SEND_TASK_SENT_EVENT = True
+
+
+@template()
+class execv(default):
+    CELERYD_FORCE_EXECV = True

+ 20 - 1
pavement.py

@@ -1,4 +1,8 @@
+from __future__ import print_function
+
 import sys
+import traceback
+
 from paver.easy import task, sh, cmdopts, path, needs, options, Bunch
 from paver import doctools  # noqa
 from paver.setuputils import setup  # noqa
@@ -95,11 +99,25 @@ def clean_contributing(options):
     path('CONTRIBUTING.rst').unlink_p()
 
 
+@task
+def verify_readme(options):
+    with open('README.rst') as fp:
+        try:
+            fp.read().encode('ascii')
+        except Exception:
+            print('README contains non-ascii characters', file=sys.stderr)
+            print('Original exception below...', file=sys.stderr)
+            traceback.print_stack(file=sys.stderr)
+            sh('false')
+
+
 @task
 @needs('clean_readme')
 def readme(options):
     sh('{0} extra/release/sphinx-to-rst.py docs/templates/readme.txt \
             > README.rst'.format(sys.executable))
+    verify_readme()
+
 
 @task
 @needs('clean_contributing')
@@ -107,6 +125,7 @@ def contributing(options):
     sh('{0} extra/release/sphinx-to-rst.py docs/contributing.rst \
             > CONTRIBUTING.rst'.format(sys.executable))
 
+
 @task
 def bump(options):
     sh("extra/release/bump_version.py \
@@ -166,7 +185,7 @@ def gitcleanforce(options):
 
 @task
 @needs('flakes', 'autodoc', 'verifyindex',
-       'verifyconfigref', 'test', 'gitclean')
+       'verifyconfigref', 'verify_readme', 'test', 'gitclean')
 def releaseok(options):
     pass
 

+ 1 - 1
requirements/default.txt

@@ -1,3 +1,3 @@
 pytz>dev
 billiard>=3.3.0.14,<3.4
-kombu>=3.0.12,<4.0
+kombu>=3.0.14,<4.0

+ 1 - 1
setup.cfg

@@ -12,4 +12,4 @@ upload-dir = docs/.build/html
 [bdist_rpm]
 requires = pytz >= 2011b
            billiard >= 3.3.0.14
-           kombu >= 3.0.12
+           kombu >= 3.0.14

+ 1 - 0
setup.py

@@ -71,6 +71,7 @@ classes = """
     Programming Language :: Python :: 2.7
     Programming Language :: Python :: 3
     Programming Language :: Python :: 3.3
+    Programming Language :: Python :: 3.4
     Programming Language :: Python :: Implementation :: CPython
     Programming Language :: Python :: Implementation :: PyPy
     Programming Language :: Python :: Implementation :: Jython

+ 13 - 4
tox.ini

@@ -3,19 +3,28 @@ envlist =
     2.6,
     2.7,
     3.3,
+    3.4,
     pypy
 
 [testenv]
 sitepackages = False
 commands = nosetests
 
+[testenv:3.4]
+basepython = python3.4
+deps = -r{toxinidir}/requirements/default.txt
+       -r{toxinidir}/requirements/test3.txt
+       -r{toxinidir}/requirements/test-ci.txt
+commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
+           nosetests -xsv --with-coverage --cover-inclusive --cover-erase []
+
 [testenv:3.3]
 basepython = python3.3
 deps = -r{toxinidir}/requirements/default.txt
        -r{toxinidir}/requirements/test3.txt
        -r{toxinidir}/requirements/test-ci.txt
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
-           nosetests celery.tests --with-coverage --cover-inclusive --cover-erase []
+           nosetests -xsv --with-coverage --cover-inclusive --cover-erase []
 
 [testenv:2.7]
 basepython = python2.7
@@ -23,7 +32,7 @@ deps = -r{toxinidir}/requirements/default.txt
        -r{toxinidir}/requirements/test.txt
        -r{toxinidir}/requirements/test-ci.txt
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
-           nosetests --with-coverage --cover-inclusive --cover-erase []
+           nosetests -xsv --with-coverage --cover-inclusive --cover-erase []
 
 [testenv:2.6]
 basepython = python2.6
@@ -31,7 +40,7 @@ deps = -r{toxinidir}/requirements/default.txt
        -r{toxinidir}/requirements/test.txt
        -r{toxinidir}/requirements/test-ci.txt
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
-           nosetests --with-coverage --cover-inclusive --cover-erase []
+           nosetests -xsv --with-coverage --cover-inclusive --cover-erase []
 
 [testenv:pypy]
 basepython = pypy
@@ -39,4 +48,4 @@ deps = -r{toxinidir}/requirements/default.txt
        -r{toxinidir}/requirements/test.txt
        -r{toxinidir}/requirements/test-ci.txt
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
-           nosetests --with-coverage --cover-inclusive --cover-erase []
+           nosetests -xsv --with-coverage --cover-inclusive --cover-erase []