Merge branch 'master' into 3.1

Ask Solem 11 years ago
parent
commit
68787655a8
59 changed files with 1249 additions and 568 deletions
  1. +1 -1  CONTRIBUTORS.txt
  2. +171 -0  Changelog
  3. +11 -9  README.rst
  4. +1 -1  celery/__init__.py
  5. +2 -2  celery/__main__.py
  6. +1 -1  celery/_state.py
  7. +4 -3  celery/app/base.py
  8. +10 -17  celery/app/builtins.py
  9. +40 -22  celery/app/trace.py
  10. +3 -2  celery/apps/worker.py
  11. +1 -0  celery/backends/amqp.py
  12. +14 -8  celery/backends/base.py
  13. +4 -2  celery/backends/cache.py
  14. +70 -36  celery/backends/redis.py
  15. +2 -1  celery/beat.py
  16. +13 -2  celery/bin/base.py
  17. +7 -9  celery/bin/celery.py
  18. +9 -4  celery/bin/worker.py
  19. +21 -9  celery/canvas.py
  20. +8 -3  celery/concurrency/asynpool.py
  21. +27 -9  celery/events/__init__.py
  22. +359 -185  celery/events/state.py
  23. +2 -2  celery/loaders/__init__.py
  24. +17 -2  celery/result.py
  25. +4 -4  celery/states.py
  26. +3 -2  celery/tests/app/test_app.py
  27. +11 -6  celery/tests/backends/test_base.py
  28. +5 -4  celery/tests/backends/test_cache.py
  29. +50 -8  celery/tests/backends/test_redis.py
  30. +10 -6  celery/tests/bin/test_celery.py
  31. +101 -48  celery/tests/events/test_state.py
  32. +13 -7  celery/tests/tasks/test_chord.py
  33. +5 -4  celery/tests/worker/test_consumer.py
  34. +63 -6  celery/utils/__init__.py
  35. +1 -1  celery/utils/timeutils.py
  36. +1 -7  celery/worker/__init__.py
  37. +1 -1  celery/worker/components.py
  38. +8 -12  celery/worker/consumer.py
  39. +2 -3  celery/worker/heartbeat.py
  40. +4 -1  celery/worker/job.py
  41. +5 -1  celery/worker/state.py
  42. +20 -0  docs/configuration.rst
  43. +3 -3  docs/getting-started/brokers/rabbitmq.rst
  44. +10 -0  docs/glossary.rst
  45. +1 -1  docs/includes/introduction.txt
  46. +25 -1  docs/internals/protov2.rst
  47. +4 -2  docs/userguide/optimizing.rst
  48. +3 -2  docs/userguide/periodic-tasks.rst
  49. +8 -4  docs/userguide/signals.rst
  50. +2 -3  docs/userguide/tasks.rst
  51. +4 -3  examples/resultgraph/tasks.py
  52. +17 -12  extra/generic-init.d/celerybeat
  53. +49 -57  extra/generic-init.d/celeryd
  54. +4 -11  extra/supervisord/celerybeat.conf
  55. +4 -13  extra/supervisord/celeryd.conf
  56. +6 -0  funtests/stress/stress/templates.py
  57. +2 -2  requirements/default.txt
  58. +0 -1  requirements/pkgutils.txt
  59. +2 -2  setup.cfg

+ 1 - 1
CONTRIBUTORS.txt

@@ -150,4 +150,4 @@ Daniel M Taub, 2013/10/22
 Matt Wise, 2013/11/06
 Michael Robellard, 2013/11/07
 Vsevolod Kulaga, 2013/11/16
-
+Ionel Cristian Mărieș, 2013/12/09

+ 171 - 0
Changelog

@@ -8,6 +8,177 @@ This document contains change notes for bugfix releases in the 3.1.x series
 (Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
 new in Celery 3.1.
 
+.. _version-3.1.7:
+
+3.1.7
+=====
+:release-date: 2013-12-17 06:00 P.M. UTC
+:release-by: Ask Solem
+
+.. _v317-important:
+
+Important Notes
+---------------
+
+Init script security improvements
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Where the generic init scripts (for ``celeryd`` and ``celerybeat``)
+previously delegated the responsibility of dropping privileges to the
+target application, they will now use ``su`` instead, so that the Python
+program is not trusted with superuser privileges.
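+
+As a rough illustration (not the script verbatim; the ``CELERYD_*``
+variables come from your init-script configuration), the scripts now run
+the worker through something like:
+
+.. code-block:: bash
+
+    su "$CELERYD_USER" -c "$CELERYD_MULTI start $CELERYD_NODES $CELERYD_OPTS"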
+
+This is not in reaction to any known exploit, but it will
+limit the possibility of a privilege escalation bug being abused in the
+future.
+
+You have to upgrade the init scripts manually from this directory:
+https://github.com/celery/celery/tree/3.1/extra/generic-init.d
+
+AMQP result backend
+~~~~~~~~~~~~~~~~~~~
+
+The 3.1 release accidentally left the amqp backend configured to be
+non-persistent by default.
+
+Upgrading from 3.0 would give a "not equivalent" error when attempting to
+set or retrieve results for a task.  That is unless you manually set the
+persistence setting::
+
+    CELERY_RESULT_PERSISTENT = True
+
+This version restores the previous value, so if you already forced
+the upgrade by removing the existing exchange you must either
+keep the non-persistent configuration by setting
+``CELERY_RESULT_PERSISTENT = False``, or delete the ``celeryresults``
+exchange again.
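+
+For example, one way to delete the exchange is with the ``celery amqp``
+shell (``proj`` being a placeholder for your application):
+
+.. code-block:: bash
+
+    $ celery -A proj amqp exchange.delete celeryresults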
+
+Synchronous subtasks
+~~~~~~~~~~~~~~~~~~~~
+
+Tasks waiting for the result of a subtask will now emit
+a :exc:`RuntimeWarning` when using the prefork pool,
+and in 3.2 this will result in an exception being raised.
+
+It's not legal for tasks to block by waiting for subtasks
+as this is likely to lead to resource starvation and eventually
+deadlock when using the prefork pool (see also :ref:`task-synchronous-subtasks`).
+
+If you really know what you are doing you can avoid the warning (and
+the future exception being raised) by moving the operation into a
+whitelist block:
+
+.. code-block:: python
+
+    from celery.result import allow_join_result
+
+    @app.task
+    def misbehaving():
+        result = other_task.delay()
+        with allow_join_result():
+            result.get()
+
+Note also that if you wait for the result of a subtask in any form
+when using the prefork pool you must also disable the pool prefetching
+behavior with the worker :ref:`-Ofair option <prefork-pool-prefetch>`.
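+
+For example (``proj`` being a placeholder project name):
+
+.. code-block:: bash
+
+    $ celery -A proj worker -Ofair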
+
+.. _v317-fixes:
+
+Fixes
+-----
+
+- Now depends on :ref:`Kombu 3.0.8 <kombu:version-3.0.8>`.
+
+- Now depends on :mod:`billiard` 3.3.0.13.
+
+- Events: Fixed compatibility with non-standard json libraries
+  that send floats as :class:`decimal.Decimal` (Issue #1731).
+
+- Events: State worker objects now always define the attributes
+  ``active``, ``processed``, ``loadavg``, ``sw_ident``, ``sw_ver``
+  and ``sw_sys``.
+
+- Worker: Now keeps count of the total number of tasks processed,
+  not just by type (``all_active_count``).
+
+- Init scripts: Fixed problem with reading the configuration file
+  when the init script is symlinked to a runlevel (e.g. ``S02celeryd``)
+  (Issue #1740).
+
+    This also removed a rarely used feature where you could symlink the
+    script to provide alternative configurations.  Instead you copy the
+    script and give it a new name, but perhaps a better solution is to
+    provide per-node arguments in ``CELERYD_OPTS`` to keep the
+    configurations separate:
+
+    .. code-block:: bash
+
+        CELERYD_NODES="X1 X2 Y1 Y2"
+        CELERYD_OPTS="-A:X1 x -A:X2 x -A:Y1 y -A:Y2 y"
+
+- Fallback chord unlock task is now always called after the chord header
+  (Issue #1700).
+
+    This means that the unlock task will not be started if there's
+    an error sending the header.
+
+- Celery command: Fixed problem with arguments for some control commands.
+
+    Fix contributed by Konstantin Podshumok.
+
+- Fixed bug in ``utcoffset`` where the returned offset was completely
+  wrong when DST was in effect (Issue #1743).
+
+- Worker: Errors occurring while attempting to serialize the result of a
+  task will now cause the task to be marked with failure and a
+  :class:`kombu.exceptions.EncodeError` error.
+
+    Fix contributed by Ionel Cristian Mărieș.
+
+- Worker with the ``-B`` argument did not properly shut down the
+  beat instance.
+
+- Worker: The ``%n`` and ``%h`` formats are now also supported by the
+  :option:`--logfile`, :option:`--pidfile` and :option:`--statedb` arguments.
+
+    Example:
+
+    .. code-block:: bash
+
+        $ celery -A proj worker -n foo@%h --logfile=%n.log --statedb=%n.db
+
+- Redis/Cache result backends: Will now timeout if keys are evicted
+  while trying to join a chord.
+
+- The fallback unlock chord task now raises :exc:`Retry` so that the
+  retry event is properly logged by the worker.
+
+- Multi: Will no longer apply Eventlet/gevent monkey patches (Issue #1717).
+
+- Redis result backend: Now supports UNIX sockets.
+
+    Like the Redis broker transport, the result backend now also supports
+    using ``redis+socket:///tmp/redis.sock`` URLs.
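+
+    For example (a hypothetical socket path):
+
+    .. code-block:: python
+
+        CELERY_RESULT_BACKEND = 'redis+socket:///tmp/redis.sock'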
+
+    Contributed by Alcides Viamontes Esquivel.
+
+- Events: Events sent by clients were mistaken for worker-related events
+  (Issue #1714).
+
+    For ``events.State`` the tasks now have a ``Task.client`` attribute
+    that is set when a ``task-sent`` event is received.
+
+    Also, a client's logical clock is not in sync with the cluster, so
+    clients live in a "time bubble".  For this reason monitors will no
+    longer attempt to merge with the clock of an event sent by a client;
+    instead they will fake the value by using the current clock with
+    a skew of -1.
+
+- Prefork pool: The method used to find terminated processes was flawed
+  in that it did not take missing popen objects into account.
+
+- Canvas: ``group`` and ``chord`` now work with anonymous signatures as
+  long as the group/chord object is associated with an app instance
+  (Issue #1744).
+
+    You can pass the app by using ``group(..., app=app)``.
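+
+    A minimal sketch (``tasks.add`` is a placeholder task name, and
+    ``app`` an existing app instance):
+
+    .. code-block:: python
+
+        from celery import group, signature
+
+        # anonymous signatures carry no app reference, so pass it in:
+        g = group([signature('tasks.add', args=(i, i)) for i in range(4)],
+                  app=app)
+        result = g.apply_async()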
+
 .. _version-3.1.6:
 
 3.1.6

+ 11 - 9
README.rst

@@ -4,7 +4,7 @@
 
 .. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
 
-:Version: 3.1.6 (Cipater)
+:Version: 3.1.7 (Cipater)
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/celery/celery/
@@ -274,10 +274,11 @@ Transports and Backends
     for using Redis as a message transport or as a result backend.
 
 :celery[mongodb]:
-    for using MongoDB as a message transport, or as a result backend.
+    for using MongoDB as a message transport (*experimental*),
+    or as a result backend (*supported*).
 
 :celery[sqs]:
-    for using Amazon SQS as a message transport.
+    for using Amazon SQS as a message transport (*experimental*).
 
 :celery[memcache]:
     for using memcached as a result backend.
@@ -286,28 +287,29 @@ Transports and Backends
     for using Apache Cassandra as a result backend.
 
 :celery[couchdb]:
-    for using CouchDB as a message transport.
+    for using CouchDB as a message transport (*experimental*).
 
 :celery[couchbase]:
     for using CouchBase as a result backend.
 
 :celery[beanstalk]:
-    for using Beanstalk as a message transport.
+    for using Beanstalk as a message transport (*experimental*).
 
 :celery[zookeeper]:
     for using Zookeeper as a message transport.
 
 :celery[zeromq]:
-    for using ZeroMQ as a message transport.
+    for using ZeroMQ as a message transport (*experimental*).
 
 :celery[sqlalchemy]:
-    for using SQLAlchemy as a message transport, or as a result backend.
+    for using SQLAlchemy as a message transport (*experimental*),
+    or as a result backend (*supported*).
 
 :celery[pyro]:
-    for using the Pyro4 message transport.
+    for using the Pyro4 message transport (*experimental*).
 
 :celery[slmq]:
-    for using the SoftLayer Message Queue transport.
+    for using the SoftLayer Message Queue transport (*experimental*).
 
 .. _celery-installing-from-source:
 

+ 1 - 1
celery/__init__.py

@@ -14,7 +14,7 @@ version_info_t = namedtuple(
 )
 
 SERIES = 'Cipater'
-VERSION = version_info_t(3, 1, 6, '', '')
+VERSION = version_info_t(3, 1, 7, '', '')
 __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
 __author__ = 'Ask Solem'
 __contact__ = 'ask@celeryproject.org'

+ 2 - 2
celery/__main__.py

@@ -24,7 +24,8 @@ def _warn_deprecated(new):
 
 
 def main():
-    maybe_patch_concurrency()
+    if 'multi' not in sys.argv:
+        maybe_patch_concurrency()
     from celery.bin.celery import main
     main()
 
@@ -37,7 +38,6 @@ def _compat_worker():
 
 
 def _compat_multi():
-    maybe_patch_concurrency()
     _warn_deprecated('celery multi')
     from celery.bin.multi import main
     main()

+ 1 - 1
celery/_state.py

@@ -58,7 +58,7 @@ _task_join_will_block = False
 
 def _set_task_join_will_block(blocks):
     global _task_join_will_block
-    _task_join_will_block = True
+    _task_join_will_block = blocks
 
 
 def task_join_will_block():

+ 4 - 3
celery/app/base.py

@@ -307,7 +307,8 @@ class Celery(object):
         conf = self.conf
         if conf.CELERY_ALWAYS_EAGER:  # pragma: no cover
             warnings.warn(AlwaysEagerIgnored(
-                'CELERY_ALWAYS_EAGER has no effect on send_task'))
+                'CELERY_ALWAYS_EAGER has no effect on send_task',
+            ), stacklevel=2)
         options = router.route(options, name, args, kwargs)
         if connection:
             producer = self.amqp.TaskProducer(connection)
@@ -445,8 +446,8 @@ class Celery(object):
         if self._pool:
             self._pool.force_close_all()
             self._pool = None
-            amqp = self.amqp
-            if amqp._producer_pool:
+            amqp = self.__dict__.get('amqp')
+            if amqp is not None and amqp._producer_pool is not None:
                 amqp._producer_pool.force_close_all()
                 amqp._producer_pool = None
 

+ 10 - 17
celery/app/builtins.py

@@ -66,7 +66,7 @@ def add_unlock_chord_task(app):
     """
     from celery.canvas import signature
     from celery.exceptions import ChordError
-    from celery.result import result_from_tuple
+    from celery.result import allow_join_result, result_from_tuple
 
     default_propagate = app.conf.CELERY_CHORD_PROPAGATES
 
@@ -95,7 +95,8 @@ def add_unlock_chord_task(app):
         if deps.ready():
             callback = signature(callback, app=app)
             try:
-                ret = j(propagate=propagate)
+                with allow_join_result():
+                    ret = j(timeout=3.0, propagate=propagate)
             except Exception as exc:
                 try:
                     culprit = next(deps._failed_join_report())
@@ -117,8 +118,8 @@ def add_unlock_chord_task(app):
                         exc=ChordError('Callback error: {0!r}'.format(exc)),
                     )
         else:
-            return unlock_chord.retry(countdown=interval,
-                                      max_retries=max_retries)
+            raise unlock_chord.retry(countdown=interval,
+                                     max_retries=max_retries)
     return unlock_chord
 
 
@@ -277,8 +278,6 @@ def add_chain_task(app):
                     tasks.append(task)
                 prev_task, prev_res = task, res
 
-            print(tasks)
-
             return tasks, results
 
         def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
@@ -356,17 +355,11 @@ def add_chord_task(app):
             results = [AsyncResult(prepare_member(task, body, group_id))
                        for task in header.tasks]
 
-            # - fallback implementations schedules the chord_unlock task here
-            app.backend.on_chord_apply(group_id, body,
-                                       interval=interval,
-                                       countdown=countdown,
-                                       max_retries=max_retries,
-                                       propagate=propagate,
-                                       result=results)
-            # - call the header group, returning the GroupResult.
-            final_res = header(*partial_args, task_id=group_id)
-
-            return final_res
+            return self.backend.apply_chord(
+                header, partial_args, group_id,
+                body, interval=interval, countdown=countdown,
+                max_retries=max_retries, propagate=propagate, result=results,
+            )
 
         def _prepare_member(self, task, body, group_id):
             opts = task.options

+ 40 - 22
celery/app/trace.py

@@ -22,6 +22,7 @@ import sys
 from warnings import warn
 
 from billiard.einfo import ExceptionInfo
+from kombu.exceptions import EncodeError
 from kombu.utils import kwdict
 
 from celery import current_app
@@ -193,7 +194,26 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
     from celery import canvas
     signature = canvas.maybe_signature  # maybe_ does not clone if already
 
+    def on_error(request, exc, uuid, state=FAILURE, call_errbacks=True):
+        if propagate:
+            raise
+        I = Info(state, exc)
+        R = I.handle_error_state(task, eager=eager)
+        if call_errbacks:
+            [signature(errback, app=app).apply_async((uuid, ))
+             for errback in request.errbacks or []]
+        return I, R, I.state, I.retval
+
     def trace_task(uuid, args, kwargs, request=None):
+        # R      - is the possibly prepared return value.
+        # I      - is the Info object.
+        # retval - is the always unmodified return value.
+        # state  - is the resulting task state.
+
+        # This function is very long because we have unrolled all the calls
+        # for performance reasons, and because the function is so long
+        # we want the main variables (I and R) to stand out visually from
+        # the rest of the variables, so breaking PEP8 is worth it ;)
         R = I = retval = state = None
         kwargs = kwdict(kwargs)
         try:
@@ -224,32 +244,30 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                     I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
                     state, retval = I.state, I.retval
                 except Retry as exc:
-                    I = Info(RETRY, exc)
-                    state, retval = I.state, I.retval
-                    R = I.handle_error_state(task, eager=eager)
+                    I, R, state, retval = on_error(
+                        task_request, exc, uuid, RETRY, call_errbacks=False,
+                    )
                 except Exception as exc:
-                    if propagate:
-                        raise
-                    I = Info(FAILURE, exc)
-                    state, retval = I.state, I.retval
-                    R = I.handle_error_state(task, eager=eager)
-                    [signature(errback, app=app).apply_async((uuid, ))
-                        for errback in task_request.errbacks or []]
+                    I, R, state, retval = on_error(task_request, exc, uuid)
                 except BaseException as exc:
                     raise
                 else:
-                    # callback tasks must be applied before the result is
-                    # stored, so that result.children is populated.
-                    [signature(callback, app=app).apply_async((retval, ))
-                        for callback in task_request.callbacks or []]
-                    if publish_result:
-                        store_result(
-                            uuid, retval, SUCCESS, request=task_request,
-                        )
-                    if task_on_success:
-                        task_on_success(retval, uuid, args, kwargs)
-                    if success_receivers:
-                        send_success(sender=task, result=retval)
+                    try:
+                        # callback tasks must be applied before the result is
+                        # stored, so that result.children is populated.
+                        [signature(callback, app=app).apply_async((retval, ))
+                            for callback in task_request.callbacks or []]
+                        if publish_result:
+                            store_result(
+                                uuid, retval, SUCCESS, request=task_request,
+                            )
+                    except EncodeError as exc:
+                        I, R, state, retval = on_error(task_request, exc, uuid)
+                    else:
+                        if task_on_success:
+                            task_on_success(retval, uuid, args, kwargs)
+                        if success_receivers:
+                            send_success(sender=task, result=retval)
 
                 # -* POST *-
                 if state not in IGNORE_STATES:

+ 3 - 2
celery/apps/worker.py

@@ -91,10 +91,10 @@ BANNER = """\
 {platform}
 
 [config]
-.> broker:      {conninfo}
 .> app:         {app}
+.> transport:   {conninfo}
+.> results:     {results}
 .> concurrency: {concurrency}
-.> events:      {events}
 
 [queues]
 {queues}
@@ -225,6 +225,7 @@ class Worker(WorkController):
             hostname=safe_str(self.hostname),
             version=VERSION_BANNER,
             conninfo=self.app.connection().as_uri(),
+            results=self.app.conf.CELERY_RESULT_BACKEND or 'disabled',
             concurrency=concurrency,
             platform=safe_str(_platform.platform()),
             events=events,

+ 1 - 0
celery/backends/amqp.py

@@ -55,6 +55,7 @@ class AMQPBackend(BaseBackend):
 
     BacklogLimitExceeded = BacklogLimitExceeded
 
+    persistent = True
     supports_autoexpire = True
     supports_native_join = True
 

+ 14 - 8
celery/backends/base.py

@@ -311,7 +311,11 @@ class BaseBackend(object):
         self.app.tasks['celery.chord_unlock'].apply_async(
             (group_id, body, ), kwargs, countdown=countdown,
         )
-    on_chord_apply = fallback_chord_unlock
+
+    def apply_chord(self, header, partial_args, group_id, body, **options):
+        result = header(*partial_args, task_id=group_id)
+        self.fallback_chord_unlock(group_id, body, **options)
+        return result
 
     def current_task_children(self, request=None):
         request = request or getattr(current_task(), 'request', None)
@@ -335,6 +339,8 @@ class KeyValueStoreBackend(BaseBackend):
             self.key_t = self.key_t.__func__  # remove binding
         self._encode_prefixes()
         super(KeyValueStoreBackend, self).__init__(*args, **kwargs)
+        if self.implements_incr:
+            self.apply_chord = self._apply_chord_incr
 
     def _encode_prefixes(self):
         self.task_keyprefix = self.key_t(self.task_keyprefix)
@@ -459,17 +465,16 @@ class KeyValueStoreBackend(BaseBackend):
             meta['result'] = result_from_tuple(result, self.app)
             return meta
 
-    def on_chord_apply(self, group_id, body, result=None, **kwargs):
-        if self.implements_incr:
-            self.save_group(group_id, self.app.GroupResult(group_id, result))
-        else:
-            self.fallback_chord_unlock(group_id, body, result, **kwargs)
+    def _apply_chord_incr(self, header, partial_args, group_id, body,
+                          result=None, **options):
+        self.save_group(group_id, self.app.GroupResult(group_id, result))
+        return header(*partial_args, task_id=group_id)
 
     def on_chord_part_return(self, task, propagate=None):
         if not self.implements_incr:
             return
         from celery import maybe_signature
-        from celery.result import GroupResult
+        from celery.result import GroupResult, allow_join_result
         app = self.app
         if propagate is None:
             propagate = self.app.conf.CELERY_CHORD_PROPAGATES
@@ -502,7 +507,8 @@ class KeyValueStoreBackend(BaseBackend):
             callback = maybe_signature(task.request.chord, app=self.app)
             j = deps.join_native if deps.supports_native_join else deps.join
             try:
-                ret = j(propagate=propagate)
+                with allow_join_result():
+                    ret = j(timeout=3.0, propagate=propagate)
             except Exception as exc:
                 try:
                     culprit = next(deps._failed_join_report())

+ 4 - 2
celery/backends/cache.py

@@ -128,9 +128,11 @@ class CacheBackend(KeyValueStoreBackend):
     def delete(self, key):
         return self.client.delete(key)
 
-    def on_chord_apply(self, group_id, body, result=None, **kwargs):
+    def _apply_chord_incr(self, header, partial_args, group_id, body, **opts):
         self.client.set(self.get_key_for_chord(group_id), '0', time=86400)
-        self.save_group(group_id, self.app.GroupResult(group_id, result))
+        return super(CacheBackend, self)._apply_chord_incr(
+            header, partial_args, group_id, body, **opts
+        )
 
     def incr(self, key):
         return self.client.incr(key)

+ 70 - 36
celery/backends/redis.py

@@ -12,6 +12,9 @@ from kombu.utils import cached_property
 from kombu.utils.url import _parse_url
 
 from celery.exceptions import ImproperlyConfigured
+from celery.five import string_t
+from celery.utils import deprecated_property
+from celery.utils.functional import dictfilter
 
 from .base import KeyValueStoreBackend
 
@@ -35,18 +38,6 @@ class RedisBackend(KeyValueStoreBackend):
     #: redis-py client module.
     redis = redis
 
-    #: default Redis server hostname (`localhost`).
-    host = 'localhost'
-
-    #: default Redis server port (6379)
-    port = 6379
-
-    #: default Redis db number (0)
-    db = 0
-
-    #: default Redis password (:const:`None`)
-    password = None
-
     #: Maximium number of connections in the pool.
     max_connections = None
 
@@ -69,20 +60,54 @@ class RedisBackend(KeyValueStoreBackend):
                 except KeyError:
                     pass
         if host and '://' in host:
-            url, host = host, None
-        self.url = url
-        uhost = uport = upass = udb = None
+            url = host
+            host = None
+
+        self.max_connections = (
+            max_connections or _get('MAX_CONNECTIONS') or self.max_connections
+        )
+
+        self.connparams = {
+            'host': _get('HOST') or 'localhost',
+            'port': _get('PORT') or 6379,
+            'db': _get('DB') or 0,
+            'password': _get('PASSWORD'),
+            'max_connections': max_connections,
+        }
         if url:
-            _, uhost, uport, _, upass, udb, _ = _parse_url(url)
-            udb = udb.strip('/') if udb else 0
-        self.host = uhost or host or _get('HOST') or self.host
-        self.port = int(uport or port or _get('PORT') or self.port)
-        self.db = udb or db or _get('DB') or self.db
-        self.password = upass or password or _get('PASSWORD') or self.password
+            self.connparams = self._params_from_url(url, self.connparams)
+        self.url = url
         self.expires = self.prepare_expires(expires, type=int)
-        self.max_connections = (max_connections
-                                or _get('MAX_CONNECTIONS')
-                                or self.max_connections)
+
+    def _params_from_url(self, url, defaults):
+        scheme, host, port, user, password, path, query = _parse_url(url)
+        connparams = dict(
+            defaults, **dictfilter({
+                'host': host, 'port': port, 'password': password,
+                'db': query.pop('virtual_host', None)})
+        )
+
+        if scheme == 'socket':
+            # use 'path' as path to the socket… in this case
+            # the database number should be given in 'query'
+            connparams.update({
+                'connection_class': self.redis.UnixDomainSocketConnection,
+                'path': '/' + path,
+            })
+            # host+port are invalid options when using this connection type.
+            connparams.pop('host', None)
+            connparams.pop('port', None)
+        else:
+            connparams['db'] = path
+
+        # db may be string and start with / like in kombu.
+        db = connparams.get('db') or 0
+        db = db.strip('/') if isinstance(db, string_t) else db
+        connparams['db'] = int(db)
+
+        # Query parameters override other parameters
+        connparams.update(query)
+        return connparams
 
     def get(self, key):
         return self.client.get(key)
@@ -109,17 +134,26 @@ class RedisBackend(KeyValueStoreBackend):
 
     @cached_property
     def client(self):
-        pool = self.redis.ConnectionPool(host=self.host, port=self.port,
-                                         db=self.db, password=self.password,
-                                         max_connections=self.max_connections)
-        return self.redis.Redis(connection_pool=pool)
+        return self.redis.Redis(
+            connection_pool=self.redis.ConnectionPool(**self.connparams))
 
     def __reduce__(self, args=(), kwargs={}):
-        kwargs.update(
-            dict(host=self.host,
-                 port=self.port,
-                 db=self.db,
-                 password=self.password,
-                 expires=self.expires,
-                 max_connections=self.max_connections))
-        return super(RedisBackend, self).__reduce__(args, kwargs)
+        return super(RedisBackend, self).__reduce__(
+            (self.url, ), {'expires': self.expires},
+        )
+
+    @deprecated_property(3.2, 3.3)
+    def host(self):
+        return self.connparams['host']
+
+    @deprecated_property(3.2, 3.3)
+    def port(self):
+        return self.connparams['port']
+
+    @deprecated_property(3.2, 3.3)
+    def db(self):
+        return self.connparams['db']
+
+    @deprecated_property(3.2, 3.3)
+    def password(self):
+        return self.connparams['password']

+ 2 - 1
celery/beat.py

@@ -18,6 +18,7 @@ import traceback
 from threading import Event, Thread
 
 from billiard import Process, ensure_multiprocessing
+from billiard.common import reset_signals
 from kombu.utils import cached_property, reprcall
 from kombu.utils.functional import maybe_evaluate
 
@@ -501,7 +502,7 @@ else:
             self.name = 'Beat'
 
         def run(self):
-            platforms.signals.reset('SIGTERM')
+            reset_signals(full=False)
             platforms.close_open_fds([
                 sys.__stdin__, sys.__stdout__, sys.__stderr__,
             ] + list(iter_open_logger_fds()))

+ 13 - 2
celery/bin/base.py

@@ -66,6 +66,7 @@ in any command that also has a `--detach` option.
 from __future__ import absolute_import, print_function, unicode_literals
 
 import os
+import random
 import re
 import socket
 import sys
@@ -85,6 +86,7 @@ from celery.five import items, string, string_t
 from celery.platforms import EX_FAILURE, EX_OK, EX_USAGE
 from celery.utils import term
 from celery.utils import text
+from celery.utils import NODENAME_DEFAULT, nodesplit
 from celery.utils.imports import symbol_by_name, import_from_cwd
 
 # always enable DeprecationWarnings, so our users can see them.
@@ -258,6 +260,7 @@ class Command(object):
         pass
 
     def __call__(self, *args, **kwargs):
+        random.seed()  # maybe we were forked.
         self.verify_args(args)
         try:
             ret = self.run(*args, **kwargs)
@@ -522,9 +525,17 @@ class Command(object):
         """
         pass
 
-    def simple_format(self, s, match=find_sformat, expand=r'\1', **keys):
+    def node_format(self, s, nodename, **extra):
+        name, host = nodesplit(nodename)
+        return self._simple_format(
+            s, host, n=name or NODENAME_DEFAULT, **extra)
+
+    def simple_format(self, s, **extra):
+        return self._simple_format(s, socket.gethostname(), **extra)
+
+    def _simple_format(self, s, host,
+                       match=find_sformat, expand=r'\1', **keys):
         if s:
-            host = socket.gethostname()
             name, _, domain = host.partition('.')
             keys = dict({'%': '%', 'h': host, 'n': name, 'd': domain}, **keys)
             return match.sub(lambda m: keys[m.expand(expand)], s)

+ 7 - 9
celery/bin/celery.py

@@ -319,10 +319,7 @@ class _RemoteControl(Command):
         if destination and isinstance(destination, string_t):
             destination = [dest.strip() for dest in destination.split(',')]
 
-        try:
-            handler = getattr(self, method)
-        except AttributeError:
-            handler = self.call
+        handler = getattr(self, method, self.call)
 
         replies = handler(method, *args[1:], timeout=timeout,
                           destination=destination,
@@ -423,22 +420,22 @@ class control(_RemoteControl):
 
     def rate_limit(self, method, task_name, rate_limit, **kwargs):
         """<task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>"""
-        return self.call(method, task_name, rate_limit, reply=True, **kwargs)
+        return self.call(method, task_name, rate_limit, **kwargs)
 
     def time_limit(self, method, task_name, soft, hard=None, **kwargs):
         """<task_name> <soft_secs> [hard_secs]"""
         return self.call(method, task_name,
-                         float(soft), float(hard), reply=True, **kwargs)
+                         float(soft), float(hard), **kwargs)
 
     def add_consumer(self, method, queue, exchange=None,
                      exchange_type='direct', routing_key=None, **kwargs):
         """<queue> [exchange [type [routing_key]]]"""
         return self.call(method, queue, exchange,
-                         exchange_type, routing_key, reply=True, **kwargs)
+                         exchange_type, routing_key, **kwargs)
 
     def cancel_consumer(self, method, queue, **kwargs):
         """<queue>"""
-        return self.call(method, queue, reply=True, **kwargs)
+        return self.call(method, queue, **kwargs)
 
 
 class status(Command):
@@ -542,7 +539,8 @@ class shell(Command):  # pragma: no cover
         import celery
         import celery.task.base
         self.app.loader.import_default_modules()
-        self.locals = {'celery': self.app,
+        self.locals = {'app': self.app,
+                       'celery': self.app,
                        'Task': celery.Task,
                        'chord': celery.chord,
                        'group': celery.group,

+ 9 - 4
celery/bin/worker.py

@@ -139,6 +139,7 @@ from celery.bin.base import Command, Option, daemon_options
 from celery.bin.celeryd_detach import detached_celeryd
 from celery.five import string_t
 from celery.platforms import maybe_drop_privileges
+from celery.utils import default_nodename
 from celery.utils.log import LOG_LEVELS, mlevel
 
 __all__ = ['worker', 'main']
@@ -180,8 +181,9 @@ class worker(Command):
             detached_celeryd(self.app).execute_from_commandline(argv)
             raise SystemExit(0)
 
-    def run(self, hostname=None, pool_cls=None, loglevel=None,
-            app=None, uid=None, gid=None, **kwargs):
+    def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
+            loglevel=None, logfile=None, pidfile=None, state_db=None,
+            **kwargs):
         maybe_drop_privileges(uid=uid, gid=gid)
         # Pools like eventlet/gevent needs to patch libs as early
         # as possible.
@@ -190,7 +192,7 @@ class worker(Command):
         if self.app.IS_WINDOWS and kwargs.get('beat'):
             self.die('-B option does not work on Windows.  '
                      'Please run celery beat as a separate service.')
-        hostname = self.simple_format(hostname)
+        hostname = self.simple_format(default_nodename(hostname))
         if loglevel:
             try:
                 loglevel = mlevel(loglevel)
@@ -200,7 +202,10 @@ class worker(Command):
                         l for l in LOG_LEVELS if isinstance(l, string_t))))
 
         return self.app.Worker(
-            hostname=hostname, pool_cls=pool_cls, loglevel=loglevel, **kwargs
+            hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
+            logfile=self.node_format(logfile, hostname),
+            pidfile=self.node_format(pidfile, hostname),
+            state_db=self.node_format(state_db, hostname), **kwargs
         ).start()
 
     def with_pool_option(self, argv):

+ 21 - 9
celery/canvas.py

@@ -477,11 +477,7 @@ class group(Signature):
         tasks = _maybe_clone(self.tasks, app=self._app)
         if not tasks:
             return self.freeze()
-        # taking the app from the first task in the list,
-        # there may be a better solution to this, e.g.
-        # consolidate tasks with the same app and apply them in
-        # batches.
-        type = tasks[0].type.app.tasks[self['task']]
+        type = self.type
         return type(*type.prepare(dict(self.options, **options),
                                   tasks, args))
 
@@ -535,7 +531,13 @@ class group(Signature):
 
     @property
     def type(self):
-        return self._type or self.tasks[0].type.app.tasks[self['task']]
+        if self._type:
+            return self._type
+        # taking the app from the first task in the list, there may be a
+        # better solution for this, e.g. to consolidate tasks with the same
+        # app and apply them in batches.
+        app = self._app if self._app else self.tasks[0].type.app
+        return app.tasks[self['task']]
 
 
 @Signature.register_type
@@ -563,15 +565,25 @@ class chord(Signature):
 
     @property
     def type(self):
-        return self._type or self.tasks[0].type.app.tasks['celery.chord']
+        if self._type:
+            return self._type
+        # we will be able to fix this mess in 3.2 when we no longer
+        # require an actual task implementation for chord/group
+        if self._app:
+            app = self._app
+        else:
+            try:
+                app = self.tasks[0].type.app
+            except IndexError:
+                app = self.body.type.app
+        return app.tasks['celery.chord']
 
     def apply_async(self, args=(), kwargs={}, task_id=None, **options):
         body = kwargs.get('body') or self.kwargs['body']
         kwargs = dict(self.kwargs, **kwargs)
         body = body.clone(**options)
 
-        _chord = self._type or body.type.app.tasks['celery.chord']
-
+        _chord = self.type
         if _chord.app.conf.CELERY_ALWAYS_EAGER:
             return self.apply((), kwargs, task_id=task_id, **options)
         res = body.freeze(task_id)

+ 8 - 3
celery/concurrency/asynpool.py

@@ -85,6 +85,9 @@ UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR])
 #: Constant sent by child process when started (ready to accept work)
 WORKER_UP = 15
 
+#: A process must have started before this timeout (in secs.) expires.
+PROC_ALIVE_TIMEOUT = 4.0
+
 SCHED_STRATEGY_PREFETCH = 1
 SCHED_STRATEGY_FAIR = 4
 
@@ -93,6 +96,8 @@ SCHED_STRATEGIES = {
     'fair': SCHED_STRATEGY_FAIR,
 }
 
+RESULT_MAXLEN = 128
+
 Ack = namedtuple('Ack', ('id', 'fd', 'payload'))
 
 
@@ -165,9 +170,9 @@ class Worker(_pool.Worker):
         # is writable.
         self.outq.put((WORKER_UP, (pid, )))
 
-    def prepare_result(self, result):
+    def prepare_result(self, result, RESULT_MAXLEN=RESULT_MAXLEN):
         if not isinstance(result, ExceptionInfo):
-            return truncate(repr(result), 46)
+            return truncate(repr(result), RESULT_MAXLEN)
         return result
 
 
@@ -356,7 +361,7 @@ class AsynPool(_pool.Pool):
         # sent a WORKER_UP message.  If a process fails to send
         # this message within proc_up_timeout we terminate it
         # and hope the next process will recover.
-        self._proc_alive_timeout = 2.0
+        self._proc_alive_timeout = PROC_ALIVE_TIMEOUT
         self._waiting_to_start = set()
 
         # denormalized set of all inqueues.

+ 27 - 9
celery/events/__init__.py

@@ -44,6 +44,8 @@ so timestamps will not work.
 Please uninstall yajl or force anyjson to use a different library.
 """
 
+CLIENT_CLOCK_SKEW = -1
+
 
 def get_exchange(conn):
     ex = copy(event_exchange)
@@ -279,7 +281,8 @@ class EventReceiver(ConsumerMixin):
     app = None
 
     def __init__(self, channel, handlers=None, routing_key='#',
-                 node_id=None, app=None, queue_prefix='celeryev'):
+                 node_id=None, app=None, queue_prefix='celeryev',
+                 accept=None):
         self.app = app_or_default(app or self.app)
         self.channel = maybe_channel(channel)
         self.handlers = {} if handlers is None else handlers
@@ -293,7 +296,12 @@ class EventReceiver(ConsumerMixin):
                            auto_delete=True,
                            durable=False,
                            queue_arguments=self._get_queue_arguments())
-        self.adjust_clock = self.app.clock.adjust
+        self.clock = self.app.clock
+        self.adjust_clock = self.clock.adjust
+        self.forward_clock = self.clock.forward
+        if accept is None:
+            accept = set([self.app.conf.CELERY_EVENT_SERIALIZER, 'json'])
+        self.accept = accept
 
     def _get_queue_arguments(self):
         conf = self.app.conf
@@ -311,7 +319,7 @@ class EventReceiver(ConsumerMixin):
     def get_consumers(self, Consumer, channel):
         return [Consumer(queues=[self.queue],
                          callbacks=[self._receive], no_ack=True,
-                         accept=['application/json'])]
+                         accept=self.accept)]
 
     def on_consume_ready(self, connection, channel, consumers,
                          wakeup=True, **kwargs):
@@ -337,11 +345,20 @@ class EventReceiver(ConsumerMixin):
 
     def event_from_message(self, body, localize=True,
                            now=time.time, tzfields=_TZGETTER,
-                           adjust_timestamp=adjust_timestamp):
-        type = body.get('type', '').lower()
-        clock = body.get('clock')
-        if clock:
-            self.adjust_clock(clock)
+                           adjust_timestamp=adjust_timestamp,
+                           CLIENT_CLOCK_SKEW=CLIENT_CLOCK_SKEW):
+        type = body['type']
+        if type == 'task-sent':
+            # clients never sync so cannot use their clock value
+            _c = body['clock'] = (self.clock.value or 1) + CLIENT_CLOCK_SKEW
+            self.adjust_clock(_c)
+        else:
+            try:
+                clock = body['clock']
+            except KeyError:
+                body['clock'] = self.forward_clock()
+            else:
+                self.adjust_clock(clock)
 
         if localize:
             try:
@@ -350,7 +367,8 @@ class EventReceiver(ConsumerMixin):
                 pass
             else:
                 body['timestamp'] = adjust_timestamp(timestamp, offset)
-        return type, Event(type, body, local_received=now())
+        body['local_received'] = now()
+        return type, body
 
     def _receive(self, body, message):
         self.process(*self.event_from_message(body))

+ 359 - 185
celery/events/state.py

@@ -22,19 +22,24 @@ import sys
 import threading
 
 from datetime import datetime
-from heapq import heappush, heappop
+from decimal import Decimal
+from heapq import heapify, heappush, heappop
 from itertools import islice
+from operator import itemgetter
 from time import time
+from weakref import ref
 
 from kombu.clocks import timetuple
-from kombu.utils import kwdict
+from kombu.utils import cached_property, kwdict
 
 from celery import states
-from celery.datastructures import AttributeDict
-from celery.five import items, values
+from celery.five import class_property, items, values
+from celery.utils import deprecated
 from celery.utils.functional import LRUCache
 from celery.utils.log import get_logger
 
+PYPY = hasattr(sys, 'pypy_version_info')
+
 # The window (in percentage) is added to the workers heartbeat
 # frequency.  If the time between updates exceeds this window,
 # then the worker is considered to be offline.
@@ -55,15 +60,25 @@ logger = get_logger(__name__)
 warn = logger.warning
 
 R_STATE = '<State: events={0.event_count} tasks={0.task_count}>'
-R_WORKER = '<Worker: {0.hostname} ({0.status_string})'
-R_TASK = '<Task: {0.name}({0.uuid}) {0.state}>'
+R_WORKER = '<Worker: {0.hostname} ({0.status_string} clock:{0.clock})'
+R_TASK = '<Task: {0.name}({0.uuid}) {0.state} clock:{0.clock}>'
 
 __all__ = ['Worker', 'Task', 'State', 'heartbeat_expires']
 
 
 def heartbeat_expires(timestamp, freq=60,
-                      expire_window=HEARTBEAT_EXPIRE_WINDOW):
-    return timestamp + freq * (expire_window / 1e2)
+                      expire_window=HEARTBEAT_EXPIRE_WINDOW,
+                      Decimal=Decimal, float=float, isinstance=isinstance):
+    # some json implementations return decimal.Decimal objects,
+    # which are not compatible with float.
+    freq = float(freq) if isinstance(freq, Decimal) else freq
+    if isinstance(timestamp, Decimal):
+        timestamp = float(timestamp)
+    return timestamp + (freq * (expire_window / 1e2))
+
+
+def _depickle_task(cls, fields):
+    return cls(**(fields if CAN_KWDICT else kwdict(fields)))
 
 
 def with_unique_field(attr):
@@ -89,45 +104,71 @@ def with_unique_field(attr):
 
 
 @with_unique_field('hostname')
-class Worker(AttributeDict):
+class Worker(object):
     """Worker State."""
     heartbeat_max = 4
     expire_window = HEARTBEAT_EXPIRE_WINDOW
-    pid = None
-    _defaults = {'hostname': None, 'pid': None, 'freq': 60}
-
-    def __init__(self, **fields):
-        dict.__init__(self, self._defaults, **fields)
-        self.heartbeats = []
-
-    def on_online(self, timestamp=None, local_received=None, **kwargs):
-        """Callback for the :event:`worker-online` event."""
-        self.update(**kwargs)
-        self.update_heartbeat(local_received, timestamp)
 
-    def on_offline(self, **kwargs):
-        """Callback for the :event:`worker-offline` event."""
-        self.update(**kwargs)
-        self.heartbeats = []
+    _fields = ('hostname', 'pid', 'freq', 'heartbeats', 'clock',
+               'active', 'processed', 'loadavg', 'sw_ident',
+               'sw_ver', 'sw_sys')
+    if not PYPY:
+        __slots__ = _fields + ('event', '__dict__', '__weakref__')
+
+    def __init__(self, hostname=None, pid=None, freq=60,
+                 heartbeats=None, clock=0, active=None, processed=None,
+                 loadavg=None, sw_ident=None, sw_ver=None, sw_sys=None):
+        self.hostname = hostname
+        self.pid = pid
+        self.freq = freq
+        self.heartbeats = [] if heartbeats is None else heartbeats
+        self.clock = clock or 0
+        self.active = active
+        self.processed = processed
+        self.loadavg = loadavg
+        self.sw_ident = sw_ident
+        self.sw_ver = sw_ver
+        self.sw_sys = sw_sys
+        self.event = self._create_event_handler()
 
-    def on_heartbeat(self, timestamp=None, local_received=None, **kwargs):
-        """Callback for the :event:`worker-heartbeat` event."""
-        self.update(**kwargs)
-        self.update_heartbeat(local_received, timestamp)
-
-    def update_heartbeat(self, received, timestamp):
-        if not received or not timestamp:
-            return
-        drift = abs(int(received) - int(timestamp))
-        if drift > HEARTBEAT_DRIFT_MAX:
-            warn(DRIFT_WARNING, self.hostname, drift,
-                 datetime.fromtimestamp(received),
-                 datetime.fromtimestamp(timestamp))
-        heartbeats, hbmax = self.heartbeats, self.heartbeat_max
-        if not heartbeats or (received and received > heartbeats[-1]):
-            heappush(heartbeats, received)
-            if len(heartbeats) > hbmax:
-                heartbeats[:] = heartbeats[hbmax:]
+    def __reduce__(self):
+        return self.__class__, (self.hostname, self.pid, self.freq,
+                                self.heartbeats, self.clock, self.active,
+                                self.processed, self.loadavg, self.sw_ident,
+                                self.sw_ver, self.sw_sys)
+
+    def _create_event_handler(self):
+        _set = object.__setattr__
+        heartbeats = self.heartbeats
+        hbmax = self.heartbeat_max
+
+        def event(type_, timestamp=None,
+                  local_received=None, fields=None,
+                  max_drift=HEARTBEAT_DRIFT_MAX, items=items, abs=abs,
+                  heappush=heappush, heappop=heappop, int=int, len=len):
+            fields = fields or {}
+            for k, v in items(fields):
+                _set(self, k, v)
+            if type_ == 'offline':
+                heartbeats[:] = []
+            else:
+                if not local_received or not timestamp:
+                    return
+                drift = abs(int(local_received) - int(timestamp))
+                if drift > max_drift:
+                    warn(DRIFT_WARNING, self.hostname, drift,
+                         datetime.fromtimestamp(local_received),
+                         datetime.fromtimestamp(timestamp))
+                if not heartbeats or (
+                        local_received and local_received > heartbeats[-1]):
+                    heappush(heartbeats, local_received)
+                    if len(heartbeats) > hbmax:
+                        heappop(heartbeats)
+        return event
+
+    def update(self, f, **kw):
+        for k, v in items(dict(f, **kw) if kw else f):
+            setattr(self, k, v)
 
     def __repr__(self):
         return R_WORKER.format(self)
@@ -142,17 +183,53 @@ class Worker(AttributeDict):
                                  self.freq, self.expire_window)
 
     @property
-    def alive(self):
-        return bool(self.heartbeats and time() < self.heartbeat_expires)
+    def alive(self, nowfun=time):
+        return bool(self.heartbeats and nowfun() < self.heartbeat_expires)
 
     @property
     def id(self):
         return '{0.hostname}.{0.pid}'.format(self)
 
+    @deprecated(3.2, 3.3)
+    def update_heartbeat(self, received, timestamp):
+        self.event(None, timestamp, received)
+
+    @deprecated(3.2, 3.3)
+    def on_online(self, timestamp=None, local_received=None, **fields):
+        self.event('online', timestamp, local_received, fields)
+
+    @deprecated(3.2, 3.3)
+    def on_offline(self, timestamp=None, local_received=None, **fields):
+        self.event('offline', timestamp, local_received, fields)
+
+    @deprecated(3.2, 3.3)
+    def on_heartbeat(self, timestamp=None, local_received=None, **fields):
+        self.event('heartbeat', timestamp, local_received, fields)
+
+    @class_property
+    def _defaults(cls):
+        """Deprecated, to be removed in 3.3"""
+        source = cls()
+        return dict((k, getattr(source, k)) for k in cls._fields)
+
 
 @with_unique_field('uuid')
-class Task(AttributeDict):
+class Task(object):
     """Task State."""
+    name = received = sent = started = succeeded = failed = retried = \
+        revoked = args = kwargs = eta = expires = retries = worker = result = \
+        exception = timestamp = runtime = traceback = exchange = \
+        routing_key = client = None
+    state = states.PENDING
+    clock = 0
+
+    _fields = ('uuid', 'name', 'state', 'received', 'sent', 'started',
+               'succeeded', 'failed', 'retried', 'revoked', 'args', 'kwargs',
+               'eta', 'expires', 'retries', 'worker', 'result', 'exception',
+               'timestamp', 'runtime', 'traceback', 'exchange', 'routing_key',
+               'clock', 'client')
+    if not PYPY:
+        __slots__ = ('__dict__', '__weakref__')
 
     #: How to merge out of order events.
     #: Disorder is detected by logical ordering (e.g. :event:`task-received`
@@ -166,116 +243,153 @@ class Task(AttributeDict):
                                      'retries', 'eta', 'expires')}
 
     #: meth:`info` displays these fields by default.
-    _info_fields = ('args', 'kwargs', 'retries', 'result',
-                    'eta', 'runtime', 'expires', 'exception',
-                    'exchange', 'routing_key')
-
-    #: Default values.
-    _defaults = dict(uuid=None, name=None, state=states.PENDING,
-                     received=False, sent=False, started=False,
-                     succeeded=False, failed=False, retried=False,
-                     revoked=False, args=None, kwargs=None, eta=None,
-                     expires=None, retries=None, worker=None, result=None,
-                     exception=None, timestamp=None, runtime=None,
-                     traceback=None, exchange=None, routing_key=None,
-                     clock=0)
-
-    def __init__(self, **fields):
-        dict.__init__(self, self._defaults, **fields)
-
-    def update(self, state, timestamp, fields, _state=states.state):
-        """Update state from new event.
-
-        :param state: State from event.
-        :param timestamp: Timestamp from event.
-        :param fields: Event data.
+    _info_fields = ('args', 'kwargs', 'retries', 'result', 'eta', 'runtime',
+                    'expires', 'exception', 'exchange', 'routing_key')
+
+    def __init__(self, uuid=None, **kwargs):
+        self.uuid = uuid
+        if kwargs:
+            for k, v in items(kwargs):
+                setattr(self, k, v)
+
+    def event(self, type_, timestamp=None, local_received=None, fields=None,
+              precedence=states.precedence, items=items, dict=dict,
+              PENDING=states.PENDING, RECEIVED=states.RECEIVED,
+              STARTED=states.STARTED, FAILURE=states.FAILURE,
+              RETRY=states.RETRY, SUCCESS=states.SUCCESS,
+              REVOKED=states.REVOKED):
+        fields = fields or {}
+        if type_ == 'sent':
+            state, self.sent = PENDING, timestamp
+        elif type_ == 'received':
+            state, self.received = RECEIVED, timestamp
+        elif type_ == 'started':
+            state, self.started = STARTED, timestamp
+        elif type_ == 'failed':
+            state, self.failed = FAILURE, timestamp
+        elif type_ == 'retried':
+            state, self.retried = RETRY, timestamp
+        elif type_ == 'succeeded':
+            state, self.succeeded = SUCCESS, timestamp
+        elif type_ == 'revoked':
+            state, self.revoked = REVOKED, timestamp
+        else:
+            state = type_.upper()
 
-        """
-        time_received = fields.get('local_received') or 0
-        if self.worker and time_received:
-            self.worker.update_heartbeat(time_received, timestamp)
-        if state != states.RETRY and self.state != states.RETRY and \
-                _state(state) < _state(self.state):
+        # note that precedence here is reversed
+        # see implementation in celery.states.state.__lt__
+        if state != RETRY and self.state != RETRY and \
+                precedence(state) > precedence(self.state):
             # this state logically happens-before the current state, so merge.
-            self.merge(state, timestamp, fields)
+            keep = self.merge_rules.get(state)
+            if keep is not None:
+                fields = dict(
+                    (k, v) for k, v in items(fields) if k in keep
+                )
+            for key, value in items(fields):
+                setattr(self, key, value)
         else:
             self.state = state
             self.timestamp = timestamp
-            super(Task, self).update(fields)
+            for key, value in items(fields):
+                setattr(self, key, value)
 
-    def merge(self, state, timestamp, fields):
-        """Merge with out of order event."""
-        keep = self.merge_rules.get(state)
-        if keep is not None:
-            fields = dict((key, fields.get(key)) for key in keep)
-            super(Task, self).update(fields)
+    def info(self, fields=None, extra=[]):
+        """Information about this task suitable for on-screen display."""
+        fields = self._info_fields if fields is None else fields
+
+        def _keys():
+            for key in list(fields) + list(extra):
+                value = getattr(self, key, None)
+                if value is not None:
+                    yield key, value
 
+        return dict(_keys())
+
+    def __repr__(self):
+        return R_TASK.format(self)
+
+    def as_dict(self):
+        get = object.__getattribute__
+        return dict(
+            (k, get(self, k)) for k in self._fields
+        )
+
+    def __reduce__(self):
+        return _depickle_task, (self.__class__, self.as_dict())
+
+    @property
+    def origin(self):
+        return self.client if self.worker is None else self.worker.id
+
+    @property
+    def ready(self):
+        return self.state in states.READY_STATES
+
+    @deprecated(3.2, 3.3)
     def on_sent(self, timestamp=None, **fields):
-        """Callback for the :event:`task-sent` event."""
-        self.sent = timestamp
-        self.update(states.PENDING, timestamp, fields)
+        self.event('sent', timestamp, None, fields)
 
+    @deprecated(3.2, 3.3)
     def on_received(self, timestamp=None, **fields):
-        """Callback for the :event:`task-received` event."""
-        self.received = timestamp
-        self.update(states.RECEIVED, timestamp, fields)
+        self.event('received', timestamp, None, fields)
 
+    @deprecated(3.2, 3.3)
     def on_started(self, timestamp=None, **fields):
-        """Callback for the :event:`task-started` event."""
-        self.started = timestamp
-        self.update(states.STARTED, timestamp, fields)
+        self.event('started', timestamp, None, fields)
 
+    @deprecated(3.2, 3.3)
     def on_failed(self, timestamp=None, **fields):
-        """Callback for the :event:`task-failed` event."""
-        self.failed = timestamp
-        self.update(states.FAILURE, timestamp, fields)
+        self.event('failed', timestamp, None, fields)
 
+    @deprecated(3.2, 3.3)
     def on_retried(self, timestamp=None, **fields):
-        """Callback for the :event:`task-retried` event."""
-        self.retried = timestamp
-        self.update(states.RETRY, timestamp, fields)
+        self.event('retried', timestamp, fields)
 
+    @deprecated(3.2, 3.3)
     def on_succeeded(self, timestamp=None, **fields):
-        """Callback for the :event:`task-succeeded` event."""
-        self.succeeded = timestamp
-        self.update(states.SUCCESS, timestamp, fields)
+        self.event('succeeded', timestamp, fields)
 
+    @deprecated(3.2, 3.3)
     def on_revoked(self, timestamp=None, **fields):
-        """Callback for the :event:`task-revoked` event."""
-        self.revoked = timestamp
-        self.update(states.REVOKED, timestamp, fields)
+        self.event('revoked', timestamp, fields)
 
+    @deprecated(3.2, 3.3)
     def on_unknown_event(self, shortype, timestamp=None, **fields):
-        self.update(shortype.upper(), timestamp, fields)
+        self.event(shortype, timestamp, fields)
 
-    def info(self, fields=None, extra=[]):
-        """Information about this task suitable for on-screen display."""
-        fields = self._info_fields if fields is None else fields
+    @deprecated(3.2, 3.3)
+    def update(self, state, timestamp, fields,
+               _state=states.state, RETRY=states.RETRY):
+        return self.event(state, timestamp, None, fields)
 
-        def _keys():
-            for key in list(fields) + list(extra):
-                value = getattr(self, key, None)
-                if value is not None:
-                    yield key, value
-
-        return dict(_keys())
-
-    def __repr__(self):
-        return R_TASK.format(self)
+    @deprecated(3.2, 3.3)
+    def merge(self, state, timestamp, fields):
+        keep = self.merge_rules.get(state)
+        if keep is not None:
+            fields = dict((k, v) for k, v in items(fields) if k in keep)
+        for key, value in items(fields):
+            setattr(self, key, value)
 
-    @property
-    def ready(self):
-        return self.state in states.READY_STATES
+    @class_property
+    def _defaults(cls):
+        """Deprecated, to be removed in 3.3."""
+        source = cls()
+        return dict((k, getattr(source, k)) for k in source._fields)
 
 
 class State(object):
     """Records clusters state."""
+    Worker = Worker
+    Task = Task
     event_count = 0
     task_count = 0
+    heap_multiplier = 4
 
     def __init__(self, callback=None,
                  workers=None, tasks=None, taskheap=None,
-                 max_workers_in_memory=5000, max_tasks_in_memory=10000):
+                 max_workers_in_memory=5000, max_tasks_in_memory=10000,
+                 on_node_join=None, on_node_leave=None):
         self.event_callback = callback
         self.workers = (LRUCache(max_workers_in_memory)
                         if workers is None else workers)
@@ -284,10 +398,16 @@ class State(object):
         self._taskheap = [] if taskheap is None else taskheap
         self.max_workers_in_memory = max_workers_in_memory
         self.max_tasks_in_memory = max_tasks_in_memory
+        self.on_node_join = on_node_join
+        self.on_node_leave = on_node_leave
         self._mutex = threading.Lock()
-        self.handlers = {'task': self.task_event,
-                         'worker': self.worker_event}
-        self._get_handler = self.handlers.__getitem__
+        self.handlers = {}
+        self._seen_types = set()
+        self.rebuild_taskheap()
+
+    @cached_property
+    def _event(self):
+        return self._create_dispatcher()
 
     def freeze_while(self, fun, *args, **kwargs):
         clear_after = kwargs.pop('clear_after', False)
@@ -330,11 +450,12 @@ class State(object):
         """
         try:
             worker = self.workers[hostname]
-            worker.update(kwargs)
+            if kwargs:
+                worker.update(kwargs)
             return worker, False
         except KeyError:
-            worker = self.workers[hostname] = Worker(
-                hostname=hostname, **kwargs)
+            worker = self.workers[hostname] = self.Worker(
+                hostname, **kwargs)
             return worker, True
 
     def get_or_create_task(self, uuid):
@@ -342,61 +463,112 @@ class State(object):
         try:
             return self.tasks[uuid], False
         except KeyError:
-            task = self.tasks[uuid] = Task(uuid=uuid)
+            task = self.tasks[uuid] = self.Task(uuid)
             return task, True
 
-    def worker_event(self, type, fields):
-        """Process worker event."""
-        try:
-            hostname = fields['hostname']
-        except KeyError:
-            pass
-        else:
-            worker, created = self.get_or_create_worker(hostname)
-            handler = getattr(worker, 'on_' + type, None)
-            if handler:
-                handler(**(fields if CAN_KWDICT else kwdict(fields)))
-            return worker, created
-
-    def task_event(self, type, fields, timetuple=timetuple):
-        """Process task event."""
-        uuid = fields['uuid']
-        hostname = fields['hostname']
-        worker, _ = self.get_or_create_worker(hostname)
-        task, created = self.get_or_create_task(uuid)
-        task.worker = worker
-        maxtasks = self.max_tasks_in_memory * 2
-
-        taskheap = self._taskheap
-        timestamp = fields.get('timestamp') or 0
-        clock = 0 if type == 'sent' else fields.get('clock')
-        heappush(taskheap, timetuple(clock, timestamp, worker.id, task))
-        if len(taskheap) > maxtasks:
-            heappop(taskheap)
-
-        handler = getattr(task, 'on_' + type, None)
-        if type == 'received':
-            self.task_count += 1
-        if handler:
-            handler(**fields)
-        else:
-            task.on_unknown_event(type, **fields)
-        return created
-
     def event(self, event):
         with self._mutex:
-            return self._dispatch_event(event)
-
-    def _dispatch_event(self, event, kwdict=kwdict):
-        self.event_count += 1
-        event = kwdict(event)
-        group, _, subject = event['type'].partition('-')
-        try:
-            self._get_handler(group)(subject, event)
-        except KeyError:
-            pass
-        if self.event_callback:
-            self.event_callback(self, event)
+            return self._event(event)
+
+    def task_event(self, type_, fields):
+        """Deprecated, use :meth:`event`."""
+        return self._event(dict(fields, type='-'.join(['task', type_])))[0]
+
+    def worker_event(self, type_, fields):
+        """Deprecated, use :meth:`event`."""
+        return self._event(dict(fields, type='-'.join(['worker', type_])))[0]
+
+    def _create_dispatcher(self):
+        get_handler = self.handlers.__getitem__
+        event_callback = self.event_callback
+        wfields = itemgetter('hostname', 'timestamp', 'local_received')
+        tfields = itemgetter('uuid', 'hostname', 'timestamp',
+                             'local_received', 'clock')
+        taskheap = self._taskheap
+        # Removing events from task heap is an O(n) operation,
+        # so easier to just account for the common number of events
+        # for each task (PENDING->RECEIVED->STARTED->final)
+        max_events_in_heap = self.max_tasks_in_memory * self.heap_multiplier
+        add_type = self._seen_types.add
+        on_node_join, on_node_leave = self.on_node_join, self.on_node_leave
+        tasks, Task = self.tasks, self.Task
+        workers, Worker = self.workers, self.Worker
+        # avoid updating LRU entry at getitem
+        get_worker, get_task = workers.data.__getitem__, tasks.data.__getitem__
+
+        def _event(event,
+                   timetuple=timetuple, KeyError=KeyError, created=True):
+            self.event_count += 1
+            if event_callback:
+                event_callback(self, event)
+            group, _, subject = event['type'].partition('-')
+            try:
+                handler = get_handler(group)
+            except KeyError:
+                pass
+            else:
+                return handler(subject, event), subject
+
+            if group == 'worker':
+                try:
+                    hostname, timestamp, local_received = wfields(event)
+                except KeyError:
+                    pass
+                else:
+                    try:
+                        worker, created = get_worker(hostname), False
+                    except KeyError:
+                        if subject == 'offline':
+                            worker, created = None, False
+                        else:
+                            worker = workers[hostname] = Worker(hostname)
+                    if worker:
+                        worker.event(subject, timestamp, local_received, event)
+                    if on_node_join and (created or subject == 'online'):
+                        on_node_join(worker)
+                    if on_node_leave and subject == 'offline':
+                        on_node_leave(worker)
+                    return (worker, created), subject
+            elif group == 'task':
+                (uuid, hostname, timestamp,
+                 local_received, clock) = tfields(event)
+                # task-sent event is sent by client, not worker
+                is_client_event = subject == 'sent'
+                try:
+                    task, created = get_task(uuid), False
+                except KeyError:
+                    task = tasks[uuid] = Task(uuid)
+                if is_client_event:
+                    task.client = hostname
+                else:
+                    try:
+                        worker, created = get_worker(hostname), False
+                    except KeyError:
+                        worker = workers[hostname] = Worker(hostname)
+                    task.worker = worker
+                    if worker is not None and local_received:
+                        worker.event(None, local_received, timestamp)
+                origin = hostname if is_client_event else worker.id
+                heappush(taskheap,
+                         timetuple(clock, timestamp, origin, ref(task)))
+                if len(taskheap) > max_events_in_heap:
+                    heappop(taskheap)
+                if subject == 'received':
+                    self.task_count += 1
+                task.event(subject, timestamp, local_received, event)
+                task_name = task.name
+                if task_name is not None:
+                    add_type(task_name)
+                return (task, created), subject
+        return _event
+
+    def rebuild_taskheap(self, timetuple=timetuple, heapify=heapify):
+        heap = self._taskheap[:] = [
+            timetuple(t.clock, t.timestamp, t.origin, ref(t))
+            for t in values(self.tasks)
+        ]
+        heapify(heap)
 
     def itertasks(self, limit=None):
         for index, row in enumerate(items(self.tasks)):
@@ -409,10 +581,12 @@ class State(object):
         in ``(uuid, Task)`` tuples."""
         seen = set()
         for evtup in islice(reversed(self._taskheap), 0, limit):
-            uuid = evtup[3].uuid
-            if uuid not in seen:
-                yield uuid, evtup[3]
-                seen.add(uuid)
+            task = evtup[3]()
+            if task is not None:
+                uuid = task.uuid
+                if uuid not in seen:
+                    yield uuid, task
+                    seen.add(uuid)
     tasks_by_timestamp = tasks_by_time
 
     def tasks_by_type(self, name, limit=None):
@@ -439,8 +613,7 @@ class State(object):
 
     def task_types(self):
         """Return a list of all seen task types."""
-        return list(sorted(set(task.name for task in values(self.tasks)
-                               if task.name is not None)))
+        return sorted(self._seen_types)
 
     def alive_workers(self):
         """Return a list of (seemingly) alive workers."""
@@ -451,6 +624,7 @@ class State(object):
 
     def __reduce__(self):
         return self.__class__, (
-            self.event_callback, self.workers, self.tasks, self._taskheap,
+            self.event_callback, self.workers, self.tasks, None,
             self.max_workers_in_memory, self.max_tasks_in_memory,
+            self.on_node_join, self.on_node_leave,
         )
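
A hedged usage sketch for the reworked ``State`` above: the new
``on_node_join``/``on_node_leave`` hooks are passed at construction time and
are fired by the event dispatcher. The broker URL and handler bodies here are
illustrative assumptions, not part of this commit::

    from celery import Celery

    app = Celery(broker='amqp://guest@localhost//')  # assumed broker URL

    def on_join(worker):
        print('worker %s joined' % worker.hostname)

    def on_leave(worker):
        print('worker %s left' % worker.hostname)

    state = app.events.State(on_node_join=on_join, on_node_leave=on_leave)

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'*': state.event})
        recv.capture(limit=None, timeout=None)

Note that ``state.event`` acquires a mutex, so a single receiver thread is
the intended consumer.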

+ 2 - 2
celery/loaders/__init__.py

@@ -25,13 +25,13 @@ def get_loader_cls(loader):
     return symbol_by_name(loader, LOADER_ALIASES, imp=import_from_cwd)
 
 
-@deprecated(deprecation='2.5', removal='4.0',
+@deprecated(deprecation=2.5, removal=4.0,
             alternative='celery.current_app.loader')
 def current_loader():
     return current_app.loader
 
 
-@deprecated(deprecation='2.5', removal='4.0',
+@deprecated(deprecation=2.5, removal=4.0,
             alternative='celery.current_app.conf')
 def load_settings():
     return current_app.conf

+ 17 - 2
celery/result.py

@@ -9,8 +9,10 @@
 from __future__ import absolute_import
 
 import time
+import warnings
 
 from collections import deque
+from contextlib import contextmanager
 from copy import copy
 
 from kombu.utils import cached_property
@@ -18,7 +20,7 @@ from kombu.utils.compat import OrderedDict
 
 from . import current_app
 from . import states
-from ._state import task_join_will_block
+from ._state import _set_task_join_will_block, task_join_will_block
 from .app import app_or_default
 from .datastructures import DependencyGraph, GraphFormatter
 from .exceptions import IncompleteStream, TimeoutError
@@ -31,12 +33,25 @@ E_WOULDBLOCK = """\
 Never call result.get() within a task!
 See http://docs.celeryq.org/en/latest/userguide/tasks.html\
 #task-synchronous-subtasks
+
+In Celery 3.2 this will result in an exception being
+raised instead of just being a warning.
 """
 
 
 def assert_will_not_block():
     if task_join_will_block():
-        pass   # TODO future version: raise
+        warnings.warn(RuntimeWarning(E_WOULDBLOCK))
+
+
+@contextmanager
+def allow_join_result():
+    reset_value = task_join_will_block()
+    _set_task_join_will_block(False)
+    try:
+        yield
+    finally:
+        _set_task_join_will_block(reset_value)
 
 
 class ResultBase(object):
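
A minimal sketch of when the new ``allow_join_result`` context manager is
meant to be used, with hypothetical tasks ``fetch`` and ``aggregate`` (waiting
on a subtask can still deadlock a fixed-size pool, so this stays an explicit
opt-in)::

    from celery import Celery
    from celery.result import allow_join_result

    app = Celery()

    @app.task
    def fetch(url):
        return url  # placeholder body

    @app.task
    def aggregate(urls):
        # calling get() here would normally emit the RuntimeWarning above;
        # the context manager temporarily disables the guard.
        with allow_join_result():
            return [fetch.delay(url).get() for url in urls]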

+ 4 - 4
celery/states.py

@@ -116,16 +116,16 @@ class state(str):
         return fun(precedence(self), precedence(other))
 
     def __gt__(self, other):
-        return self.compare(other, lambda a, b: a < b)
+        return precedence(self) < precedence(other)
 
     def __ge__(self, other):
-        return self.compare(other, lambda a, b: a <= b)
+        return precedence(self) <= precedence(other)
 
     def __lt__(self, other):
-        return self.compare(other, lambda a, b: a > b)
+        return precedence(self) > precedence(other)
 
     def __le__(self, other):
-        return self.compare(other, lambda a, b: a >= b)
+        return precedence(self) >= precedence(other)
 
 #: Task state is unknown (assumed pending since you know the id).
 PENDING = 'PENDING'
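
The rewritten comparisons delegate directly to ``precedence()``, where a
*lower* index in the precedence list means a "later" state, hence the
reversed operators. A small sketch of the resulting semantics::

    from celery import states

    assert states.state(states.SUCCESS) > states.state(states.PENDING)
    assert states.state(states.RECEIVED) < states.state(states.STARTED)

This is what lets the event-state code above decide whether an incoming event
logically happens-before the currently recorded state.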

+ 3 - 2
celery/tests/app/test_app.py

@@ -174,12 +174,13 @@ class test_App(AppCase):
 
     def test_maybe_close_pool(self):
         cpool = self.app._pool = Mock()
-        ppool = self.app.amqp._producer_pool = Mock()
+        amqp = self.app.__dict__['amqp'] = Mock()
+        ppool = amqp._producer_pool
         self.app._maybe_close_pool()
         cpool.force_close_all.assert_called_with()
         ppool.force_close_all.assert_called_with()
         self.assertIsNone(self.app._pool)
-        self.assertIsNone(self.app.amqp._producer_pool)
+        self.assertIsNone(self.app.__dict__['amqp']._producer_pool)
 
         self.app._pool = Mock()
         self.app._maybe_close_pool()

+ 11 - 6
celery/tests/backends/test_base.py

@@ -14,6 +14,7 @@ from celery.utils.serialization import UnpickleableExceptionWrapper
 from celery.utils.serialization import get_pickleable_exception as gpe
 
 from celery import states
+from celery import group
 from celery.backends.base import (
     BaseBackend,
     KeyValueStoreBackend,
@@ -62,10 +63,10 @@ class test_BaseBackend_interface(AppCase):
     def test_on_chord_part_return(self):
         self.b.on_chord_part_return(None)
 
-    def test_on_chord_apply(self, unlock='celery.chord_unlock'):
+    def test_apply_chord(self, unlock='celery.chord_unlock'):
         self.app.tasks[unlock] = Mock()
-        self.b.on_chord_apply(
-            'dakj221', 'sdokqweok',
+        self.b.apply_chord(
+            group(app=self.app), (), 'dakj221', None,
             result=[self.app.AsyncResult(x) for x in [1, 2, 3]],
         )
         self.assertTrue(self.app.tasks[unlock].apply_async.call_count)
@@ -306,7 +307,7 @@ class test_KeyValueStoreBackend(AppCase):
             self.b.on_chord_part_return(task, propagate=True)
             self.assertFalse(self.b.expire.called)
             deps.delete.assert_called_with()
-            deps.join_native.assert_called_with(propagate=True)
+            deps.join_native.assert_called_with(propagate=True, timeout=3.0)
 
     def test_chord_part_return_propagate_default(self):
         with self._chord_part_context(self.b) as (task, deps, _):
@@ -315,6 +316,7 @@ class test_KeyValueStoreBackend(AppCase):
             deps.delete.assert_called_with()
             deps.join_native.assert_called_with(
                 propagate=self.b.app.conf.CELERY_CHORD_PROPAGATES,
+                timeout=3.0,
             )
 
     def test_chord_part_return_join_raises_internal(self):
@@ -364,9 +366,12 @@ class test_KeyValueStoreBackend(AppCase):
     def test_chord_apply_fallback(self):
         self.b.implements_incr = False
         self.b.fallback_chord_unlock = Mock()
-        self.b.on_chord_apply('group_id', 'body', 'result', foo=1)
+        self.b.apply_chord(
+            group(app=self.app), (), 'group_id', 'body',
+            result='result', foo=1,
+        )
         self.b.fallback_chord_unlock.assert_called_with(
-            'group_id', 'body', 'result', foo=1,
+            'group_id', 'body', result='result', foo=1,
         )
 
     def test_get_missing_meta(self):

+ 5 - 4
celery/tests/backends/test_cache.py

@@ -9,6 +9,7 @@ from kombu.utils.encoding import str_to_bytes
 
 from celery import signature
 from celery import states
+from celery import group
 from celery.backends.cache import CacheBackend, DummyClient
 from celery.exceptions import ImproperlyConfigured
 from celery.five import items, string, text_t
@@ -62,10 +63,10 @@ class test_CacheBackend(AppCase):
             self.assertEqual(self.tb.get_status(self.tid), states.FAILURE)
             self.assertIsInstance(self.tb.get_result(self.tid), KeyError)
 
-    def test_on_chord_apply(self):
+    def test_apply_chord(self):
         tb = CacheBackend(backend='memory://', app=self.app)
         gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
-        tb.on_chord_apply(gid, {}, result=res)
+        tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
 
     @patch('celery.result.GroupResult.restore')
     def test_on_chord_part_return(self, restore):
@@ -82,14 +83,14 @@ class test_CacheBackend(AppCase):
 
         gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
         task.request.group = gid
-        tb.on_chord_apply(gid, {}, result=res)
+        tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
 
         self.assertFalse(deps.join_native.called)
         tb.on_chord_part_return(task)
         self.assertFalse(deps.join_native.called)
 
         tb.on_chord_part_return(task)
-        deps.join_native.assert_called_with(propagate=True)
+        deps.join_native.assert_called_with(propagate=True, timeout=3.0)
         deps.delete.assert_called_with()
 
     def test_mget(self):

+ 50 - 8
celery/tests/backends/test_redis.py

@@ -8,8 +8,9 @@ from kombu.utils import cached_property, uuid
 
 from celery import signature
 from celery import states
+from celery import group
 from celery.datastructures import AttributeDict
-from celery.exceptions import ImproperlyConfigured
+from celery.exceptions import CPendingDeprecationWarning, ImproperlyConfigured
 from celery.utils.timeutils import timedelta_seconds
 
 from celery.tests.case import (
@@ -62,6 +63,11 @@ class redis(object):
         def __init__(self, **kwargs):
             pass
 
+    class UnixDomainSocketConnection(object):
+
+        def __init__(self, **kwargs):
+            pass
+
 
 class test_RedisBackend(AppCase):
 
@@ -99,9 +105,45 @@ class test_RedisBackend(AppCase):
             self.MockBackend(app=self.app)
 
     def test_url(self):
-        x = self.MockBackend('redis://foobar//1', app=self.app)
-        self.assertEqual(x.host, 'foobar')
-        self.assertEqual(x.db, '1')
+        x = self.MockBackend(
+            'redis://:bosco@vandelay.com:123//1', app=self.app,
+        )
+        self.assertTrue(x.connparams)
+        self.assertEqual(x.connparams['host'], 'vandelay.com')
+        self.assertEqual(x.connparams['db'], 1)
+        self.assertEqual(x.connparams['port'], 123)
+        self.assertEqual(x.connparams['password'], 'bosco')
+
+    def test_socket_url(self):
+        x = self.MockBackend(
+            'socket:///tmp/redis.sock?virtual_host=/3', app=self.app,
+        )
+        self.assertTrue(x.connparams)
+        self.assertEqual(x.connparams['path'], '/tmp/redis.sock')
+        self.assertIs(
+            x.connparams['connection_class'],
+            redis.UnixDomainSocketConnection,
+        )
+        self.assertNotIn('host', x.connparams)
+        self.assertNotIn('port', x.connparams)
+        self.assertEqual(x.connparams['db'], 3)
+
+    def test_compat_properties(self):
+        x = self.MockBackend(
+            'redis://:bosco@vandelay.com:123//1', app=self.app,
+        )
+        with self.assertWarnsRegex(CPendingDeprecationWarning,
+                                   r'scheduled for deprecation'):
+            self.assertEqual(x.host, 'vandelay.com')
+        with self.assertWarnsRegex(CPendingDeprecationWarning,
+                                   r'scheduled for deprecation'):
+            self.assertEqual(x.db, 1)
+        with self.assertWarnsRegex(CPendingDeprecationWarning,
+                                   r'scheduled for deprecation'):
+            self.assertEqual(x.port, 123)
+        with self.assertWarnsRegex(CPendingDeprecationWarning,
+                                   r'scheduled for deprecation'):
+            self.assertEqual(x.password, 'bosco')
 
     def test_conf_raises_KeyError(self):
         self.app.conf = AttributeDict({
@@ -130,9 +172,9 @@ class test_RedisBackend(AppCase):
         b = self.Backend(expires=timedelta(minutes=1), app=self.app)
         self.assertEqual(b.expires, 60)
 
-    def test_on_chord_apply(self):
-        self.Backend(app=self.app).on_chord_apply(
-            'group_id', {},
+    def test_apply_chord(self):
+        self.Backend(app=self.app).apply_chord(
+            group(app=self.app), (), 'group_id', {},
             result=[self.app.AsyncResult(x) for x in [1, 2, 3]],
         )
 
@@ -165,7 +207,7 @@ class test_RedisBackend(AppCase):
 
         b.client.incr.return_value = len(deps)
         b.on_chord_part_return(task)
-        deps.join_native.assert_called_with(propagate=True)
+        deps.join_native.assert_called_with(propagate=True, timeout=3.0)
         deps.delete.assert_called_with()
 
         self.assertTrue(b.client.expire.call_count)
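
For reference, the URL forms exercised by these tests translate to result
backend settings like the following (host, password and socket path are
made up)::

    # TCP: password 'secret', host 'redis.example.com', port 6379, db 1
    CELERY_RESULT_BACKEND = 'redis://:secret@redis.example.com:6379/1'

    # UNIX domain socket, selecting database 3
    CELERY_RESULT_BACKEND = 'socket:///var/run/redis.sock?virtual_host=/3'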

+ 10 - 6
celery/tests/bin/test_celery.py

@@ -62,7 +62,7 @@ class test__main__(AppCase):
             with patch('celery.__main__._warn_deprecated') as depr:
                 with patch('celery.bin.multi.main') as main:
                     __main__._compat_multi()
-                    mpc.assert_called_with()
+                    self.assertFalse(mpc.called)
                     depr.assert_called_with('celery multi')
                     main.assert_called_with()
 
@@ -371,9 +371,13 @@ class test_CeleryCommand(AppCase):
 
         Dummy = x.commands['dummy'] = Mock()
         dummy = Dummy.return_value = Mock()
-        dummy.run_from_argv.side_effect = Error('foo', status='EX_FAILURE')
+        exc = dummy.run_from_argv.side_effect = Error(
+            'foo', status='EX_FAILURE',
+        )
+        x.on_error = Mock(name='on_error')
         help.reset()
         x.execute('dummy', ['dummy'])
+        x.on_error.assert_called_with(exc)
         dummy.run_from_argv.assert_called_with(
             x.prog_name, [], command='dummy',
         )
@@ -520,12 +524,12 @@ class test_control(AppCase):
     def test_rate_limit(self):
         i = self.control(True)
         i.rate_limit('rate_limit', 'proj.add', '1/s')
-        i.call.assert_called_with('rate_limit', 'proj.add', '1/s', reply=True)
+        i.call.assert_called_with('rate_limit', 'proj.add', '1/s')
 
     def test_time_limit(self):
         i = self.control(True)
         i.time_limit('time_limit', 'proj.add', 10, 30)
-        i.call.assert_called_with('time_limit', 'proj.add', 10, 30, reply=True)
+        i.call.assert_called_with('time_limit', 'proj.add', 10, 30)
 
     def test_add_consumer(self):
         i = self.control(True)
@@ -535,13 +539,13 @@ class test_control(AppCase):
         )
         i.call.assert_called_with(
             'add_consumer', 'queue', 'exchange', 'topic', 'rkey',
-            durable=True, reply=True,
+            durable=True,
         )
 
     def test_cancel_consumer(self):
         i = self.control(True)
         i.cancel_consumer('cancel_consumer', 'queue')
-        i.call.assert_called_with('cancel_consumer', 'queue', reply=True)
+        i.call.assert_called_with('cancel_consumer', 'queue')
 
 
 class test_multi(AppCase):

+ 101 - 48
celery/tests/events/test_state.py

@@ -2,6 +2,7 @@ from __future__ import absolute_import
 
 import pickle
 
+from decimal import Decimal
 from random import shuffle
 from time import time
 from itertools import count
@@ -19,6 +20,14 @@ from celery.five import range
 from celery.utils import uuid
 from celery.tests.case import AppCase, patch
 
+try:
+    Decimal(2.6)
+except TypeError:  # pragma: no cover
+    # Py2.6: Must first convert float to str
+    _float_to_decimal = str
+else:
+    _float_to_decimal = lambda f: f  # noqa
+
 
 class replay(object):
 
@@ -34,7 +43,10 @@ class replay(object):
     def next_event(self):
         ev = self.events[next(self.position)]
         ev['local_received'] = ev['timestamp']
-        self.current_clock = ev.get('clock') or self.current_clock + 1
+        try:
+            self.current_clock = ev['clock']
+        except KeyError:
+            ev['clock'] = self.current_clock = self.current_clock + 1
         return ev
 
     def __iter__(self):
@@ -94,10 +106,10 @@ class ev_task_states(replay):
         ]
 
 
-def QTEV(type, uuid, hostname, clock, timestamp=None):
+def QTEV(type, uuid, hostname, clock, name=None, timestamp=None):
     """Quick task event."""
     return Event('task-{0}'.format(type), uuid=uuid, hostname=hostname,
-                 clock=clock, timestamp=timestamp or time())
+                 clock=clock, name=name, timestamp=timestamp or time())
 
 
 class ev_logical_clock_ordering(replay):
@@ -115,18 +127,18 @@ class ev_logical_clock_ordering(replay):
         offset = self.offset
         tA, tB, tC = self.uids
         self.events = [
-            QTEV('received', tA, 'w1', clock=offset + 1),
-            QTEV('received', tB, 'w2', clock=offset + 1),
-            QTEV('started', tA, 'w1', clock=offset + 3),
-            QTEV('received', tC, 'w2', clock=offset + 3),
-            QTEV('started', tB, 'w2', clock=offset + 5),
-            QTEV('retried', tA, 'w1', clock=offset + 7),
-            QTEV('succeeded', tB, 'w2', clock=offset + 9),
-            QTEV('started', tC, 'w2', clock=offset + 10),
-            QTEV('received', tA, 'w3', clock=offset + 13),
-            QTEV('succeded', tC, 'w2', clock=offset + 12),
-            QTEV('started', tA, 'w3', clock=offset + 14),
-            QTEV('succeeded', tA, 'w3', clock=offset + 16),
+            QTEV('received', tA, 'w1', name='tA', clock=offset + 1),
+            QTEV('received', tB, 'w2', name='tB', clock=offset + 1),
+            QTEV('started', tA, 'w1', name='tA', clock=offset + 3),
+            QTEV('received', tC, 'w2', name='tC', clock=offset + 3),
+            QTEV('started', tB, 'w2', name='tB', clock=offset + 5),
+            QTEV('retried', tA, 'w1', name='tA', clock=offset + 7),
+            QTEV('succeeded', tB, 'w2', name='tB', clock=offset + 9),
+            QTEV('started', tC, 'w2', name='tC', clock=offset + 10),
+            QTEV('received', tA, 'w3', name='tA', clock=offset + 13),
+            QTEV('succeeded', tC, 'w2', name='tC', clock=offset + 12),
+            QTEV('started', tA, 'w3', name='tA', clock=offset + 14),
+            QTEV('succeeded', tA, 'w3', name='tA', clock=offset + 16),
         ]
 
     def rewind_with_offset(self, offset, uids=None):
@@ -168,9 +180,20 @@ class test_Worker(AppCase):
             hash(Worker(hostname='foo')), hash(Worker(hostname='bar')),
         )
 
+    def test_compatible_with_Decimal(self):
+        w = Worker('george@vandelay.com')
+        timestamp, local_received = Decimal(_float_to_decimal(time())), time()
+        w.event('worker-online', timestamp, local_received, fields={
+            'hostname': 'george@vandelay.com',
+            'timestamp': timestamp,
+            'local_received': local_received,
+            'freq': Decimal(_float_to_decimal(5.6335431)),
+        })
+        self.assertTrue(w.alive)
+
     def test_survives_missing_timestamp(self):
         worker = Worker(hostname='foo')
-        worker.on_heartbeat(timestamp=None)
+        worker.event('heartbeat')
         self.assertEqual(worker.heartbeats, [])
 
     def test_repr(self):
@@ -179,15 +202,15 @@ class test_Worker(AppCase):
     def test_drift_warning(self):
         worker = Worker(hostname='foo')
         with patch('celery.events.state.warn') as warn:
-            worker.update_heartbeat(time(), time() + (HEARTBEAT_DRIFT_MAX * 2))
+            worker.event(None, time() + (HEARTBEAT_DRIFT_MAX * 2), time())
             self.assertTrue(warn.called)
             self.assertIn('Substantial drift', warn.call_args[0][0])
 
-    def test_update_heartbeat(self):
+    def test_updates_heartbeat(self):
         worker = Worker(hostname='foo')
-        worker.update_heartbeat(time(), time())
+        worker.event(None, time(), time())
         self.assertEqual(len(worker.heartbeats), 1)
-        worker.update_heartbeat(time() - 10, time())
+        worker.event(None, time(), time() - 10)
         self.assertEqual(len(worker.heartbeats), 1)
 
 
@@ -238,26 +261,28 @@ class test_Task(AppCase):
     def test_ready(self):
         task = Task(uuid='abcdefg',
                     name='tasks.add')
-        task.on_received(timestamp=time())
+        task.event('received', time(), time())
         self.assertFalse(task.ready)
-        task.on_succeeded(timestamp=time())
+        task.event('succeeded', time(), time())
         self.assertTrue(task.ready)
 
     def test_sent(self):
         task = Task(uuid='abcdefg',
                     name='tasks.add')
-        task.on_sent(timestamp=time())
+        task.event('sent', time(), time())
         self.assertEqual(task.state, states.PENDING)
 
     def test_merge(self):
         task = Task()
-        task.on_failed(timestamp=time())
-        task.on_started(timestamp=time())
-        task.on_received(timestamp=time(), name='tasks.add', args=(2, 2))
+        task.event('failed', time(), time())
+        task.event('started', time(), time())
+        task.event('received', time(), time(), {
+            'name': 'tasks.add', 'args': (2, 2),
+        })
         self.assertEqual(task.state, states.FAILURE)
         self.assertEqual(task.name, 'tasks.add')
         self.assertTupleEqual(task.args, (2, 2))
-        task.on_retried(timestamp=time())
+        task.event('retried', time(), time())
         self.assertEqual(task.state, states.RETRY)
 
     def test_repr(self):
@@ -452,32 +477,60 @@ class test_State(AppCase):
 
     def test_survives_unknown_worker_event(self):
         s = State()
-        s.worker_event('worker-unknown-event-xxx', {'foo': 'bar'})
-        s.worker_event('worker-unknown-event-xxx', {'hostname': 'xxx',
-                                                    'foo': 'bar'})
+        s.event({
+            'type': 'worker-unknown-event-xxx',
+            'foo': 'bar',
+        })
+        s.event({
+            'type': 'worker-unknown-event-xxx',
+            'hostname': 'xxx',
+            'foo': 'bar',
+        })
 
     def test_survives_unknown_task_event(self):
         s = State()
-        s.task_event('task-unknown-event-xxx', {'foo': 'bar',
-                                                'uuid': 'x',
-                                                'hostname': 'y'})
+        s.event(
+            {
+                'type': 'task-unknown-event-xxx',
+                'foo': 'bar',
+                'uuid': 'x',
+                'hostname': 'y',
+                'timestamp': time(),
+                'local_received': time(),
+                'clock': 0,
+            },
+        )
 
     def test_limits_maxtasks(self):
-        s = State()
-        s.max_tasks_in_memory = 1
-        s.task_event('task-unknown-event-xxx', {'foo': 'bar',
-                                                'uuid': 'x',
-                                                'hostname': 'y',
-                                                'clock': 3})
-        s.task_event('task-unknown-event-xxx', {'foo': 'bar',
-                                                'uuid': 'y',
-                                                'hostname': 'y',
-                                                'clock': 4})
-
-        s.task_event('task-unknown-event-xxx', {'foo': 'bar',
-                                                'uuid': 'z',
-                                                'hostname': 'y',
-                                                'clock': 5})
+        s = State(max_tasks_in_memory=1)
+        s.heap_multiplier = 2
+        s.event({
+            'type': 'task-unknown-event-xxx',
+            'foo': 'bar',
+            'uuid': 'x',
+            'hostname': 'y',
+            'clock': 3,
+            'timestamp': time(),
+            'local_received': time(),
+        })
+        s.event({
+            'type': 'task-unknown-event-xxx',
+            'foo': 'bar',
+            'uuid': 'y',
+            'hostname': 'y',
+            'clock': 4,
+            'timestamp': time(),
+            'local_received': time(),
+        })
+        s.event({
+            'type': 'task-unknown-event-xxx',
+            'foo': 'bar',
+            'uuid': 'z',
+            'hostname': 'y',
+            'clock': 5,
+            'timestamp': time(),
+            'local_received': time(),
+        })
         self.assertEqual(len(s._taskheap), 2)
         self.assertEqual(s._taskheap[0].clock, 4)
         self.assertEqual(s._taskheap[1].clock, 5)

+ 13 - 7
celery/tests/tasks/test_chord.py

@@ -5,7 +5,7 @@ from contextlib import contextmanager
 from celery import group
 from celery import canvas
 from celery import result
-from celery.exceptions import ChordError
+from celery.exceptions import ChordError, Retry
 from celery.five import range
 from celery.result import AsyncResult, GroupResult, EagerResult
 from celery.tests.case import AppCase, Mock
@@ -54,6 +54,7 @@ class TSRNoReport(TSR):
 def patch_unlock_retry(app):
     unlock = app.tasks['celery.chord_unlock']
     retry = Mock()
+    retry.return_value = Retry()
     prev, unlock.retry = unlock.retry, retry
     try:
         yield unlock, retry
@@ -148,11 +149,16 @@ class test_unlock_chord_task(ChordCase):
                     setup(callback)
                 try:
                     assert self.app.tasks['celery.chord_unlock'] is unlock
-                    unlock(
-                        'group_id', callback_s,
-                        result=[self.app.AsyncResult(r) for r in ['1', 2, 3]],
-                        GroupResult=ResultCls, **kwargs
-                    )
+                    try:
+                        unlock(
+                            'group_id', callback_s,
+                            result=[
+                                self.app.AsyncResult(r) for r in ['1', 2, 3]
+                            ],
+                            GroupResult=ResultCls, **kwargs
+                        )
+                    except Retry:
+                        pass
                 finally:
                     canvas.maybe_signature = subtask
                 yield callback_s, retry, fail_current
@@ -224,4 +230,4 @@ class test_Chord_task(ChordCase):
         body = dict()
         Chord(group(self.add.subtask((i, i)) for i in range(5)), body)
         Chord([self.add.subtask((j, j)) for j in range(5)], body)
-        self.assertEqual(self.app.backend.on_chord_apply.call_count, 2)
+        self.assertEqual(self.app.backend.apply_chord.call_count, 2)

+ 5 - 4
celery/tests/worker/test_consumer.py

@@ -433,8 +433,13 @@ class test_Gossip(AppCase):
     def test_on_message(self):
         c = self.Consumer()
         g = Gossip(c)
+        self.assertTrue(g.enabled)
         prepare = Mock()
         prepare.return_value = 'worker-online', {}
+        c.app.events.State.assert_called_with(
+            on_node_join=g.on_node_join,
+            on_node_leave=g.on_node_leave,
+        )
         g.update_state = Mock()
         worker = Mock()
         g.on_node_join = Mock()
@@ -450,20 +455,16 @@ class test_Gossip(AppCase):
         g.event_handlers = {}
 
         g.on_message(prepare, message)
-        g.on_node_join.assert_called_with(worker)
 
         message.delivery_info = {'routing_key': 'worker-offline'}
         prepare.return_value = 'worker-offline', {}
         g.on_message(prepare, message)
-        g.on_node_leave.assert_called_with(worker)
 
         message.delivery_info = {'routing_key': 'worker-baz'}
         prepare.return_value = 'worker-baz', {}
         g.update_state.return_value = worker, 0
         g.on_message(prepare, message)
 
-        g.on_node_leave.reset_mock()
         message.headers = {'hostname': g.hostname}
         g.on_message(prepare, message)
-        self.assertFalse(g.on_node_leave.called)
         g.clock.forward.assert_called_with()

+ 63 - 6
celery/utils/__init__.py

@@ -56,6 +56,8 @@ WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq'
 #: Separator for worker node name and hostname.
 NODENAME_SEP = '@'
 
+NODENAME_DEFAULT = 'celery'
+
 
 def worker_direct(hostname):
     """Return :class:`kombu.Queue` that is a direct route to
@@ -74,7 +76,7 @@ def worker_direct(hostname):
 
 
 def warn_deprecated(description=None, deprecation=None,
-                    removal=None, alternative=None):
+                    removal=None, alternative=None, stacklevel=2):
     ctx = {'description': description,
            'deprecation': deprecation, 'removal': removal,
            'alternative': alternative}
@@ -82,21 +84,21 @@ def warn_deprecated(description=None, deprecation=None,
         w = CPendingDeprecationWarning(PENDING_DEPRECATION_FMT.format(**ctx))
     else:
         w = CDeprecationWarning(DEPRECATION_FMT.format(**ctx))
-    warnings.warn(w)
+    warnings.warn(w, stacklevel=stacklevel)
 
 
-def deprecated(description=None, deprecation=None,
-               removal=None, alternative=None):
+def deprecated(deprecation=None, removal=None,
+               alternative=None, description=None):
     """Decorator for deprecated functions.
 
     A deprecation warning will be emitted when the function is called.
 
-    :keyword description: Description of what is being deprecated.
     :keyword deprecation: Version that marks first deprecation, if this
       argument is not set a ``PendingDeprecationWarning`` will be emitted
       instead.
     :keyword removal:  Future version when this feature will be removed.
     :keyword alternative:  Instructions for an alternative solution (if any).
+    :keyword description: Description of what is being deprecated.
 
     """
     def _inner(fun):
@@ -107,12 +109,63 @@ def deprecated(description=None, deprecation=None,
             warn_deprecated(description=description or qualname(fun),
                             deprecation=deprecation,
                             removal=removal,
-                            alternative=alternative)
+                            alternative=alternative,
+                            stacklevel=3)
             return fun(*args, **kwargs)
         return __inner
     return _inner
 
 
+def deprecated_property(deprecation=None, removal=None,
+                        alternative=None, description=None):
+    def _inner(fun):
+        return _deprecated_property(
+            fun, deprecation=deprecation, removal=removal,
+            alternative=alternative, description=description or fun.__name__)
+    return _inner
+
+
+class _deprecated_property(object):
+
+    def __init__(self, fget=None, fset=None, fdel=None, doc=None, **depreinfo):
+        self.__get = fget
+        self.__set = fset
+        self.__del = fdel
+        self.__name__, self.__module__, self.__doc__ = (
+            fget.__name__, fget.__module__, fget.__doc__,
+        )
+        self.depreinfo = depreinfo
+        self.depreinfo.setdefault('stacklevel', 3)
+
+    def __get__(self, obj, type=None):
+        if obj is None:
+            return self
+        warn_deprecated(**self.depreinfo)
+        return self.__get(obj)
+
+    def __set__(self, obj, value):
+        if obj is None:
+            return self
+        if self.__set is None:
+            raise AttributeError('cannot set attribute')
+        warn_deprecated(**self.depreinfo)
+        self.__set(obj, value)
+
+    def __delete__(self, obj):
+        if obj is None:
+            return self
+        if self.__del is None:
+            raise AttributeError('cannot delete attribute')
+        warn_deprecated(**self.depreinfo)
+        self.__del(obj)
+
+    def setter(self, fset):
+        return self.__class__(self.__get, fset, self.__del, **self.depreinfo)
+
+    def deleter(self, fdel):
+        return self.__class__(self.__get, self.__set, fdel, **self.depreinfo)
+
+
 def lpmerge(L, R):
     """In place left precedent dictionary merge.
 
@@ -284,6 +337,10 @@ def nodesplit(nodename):
     return parts
 
 
+def default_nodename(hostname):
+    name, host = nodesplit(hostname or '')
+    return nodename(name or NODENAME_DEFAULT, host or socket.gethostname())
+
 # ------------------------------------------------------------------------ #
 # > XXX Compat
 from .log import LOG_LEVELS     # noqa
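
A short usage sketch for the helpers added above; the ``Backend`` class and
its ``connparams`` attribute are assumptions for illustration::

    from celery.utils import default_nodename, deprecated_property

    class Backend(object):
        connparams = {'host': 'localhost'}

        @deprecated_property(deprecation=3.2, alternative="connparams['host']")
        def host(self):
            return self.connparams['host']

    Backend().host  # emits CDeprecationWarning, then returns 'localhost'

    default_nodename(None)      # -> 'celery@' + socket.gethostname()
    default_nodename('w1@box')  # -> 'w1@box'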

+ 1 - 1
celery/utils/timeutils.py

@@ -335,7 +335,7 @@ class ffwd(object):
 
 def utcoffset():
     if _time.daylight:
-        return (__timezone__ + __altzone__) // 3600
+        return __altzone__ // 3600
     return __timezone__ // 3600
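
A worked example of the fix, using US Eastern where ``time.timezone`` is
18000 and ``time.altzone`` is 14400 seconds::

    # old: (18000 + 14400) // 3600 == 9   (nonsense offset)
    # new: 14400 // 3600 == 4             (correct offset while DST is active)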
 
 

+ 1 - 7
celery/worker/__init__.py

@@ -12,7 +12,6 @@
 from __future__ import absolute_import
 
 import os
-import socket
 import sys
 import traceback
 try:
@@ -33,7 +32,7 @@ from celery.exceptions import (
     ImproperlyConfigured, SystemTerminate, TaskRevokedError,
 )
 from celery.five import string_t, values
-from celery.utils import nodename, nodesplit, worker_direct
+from celery.utils import default_nodename, worker_direct
 from celery.utils.imports import reload_from_cwd
 from celery.utils.log import mlevel, worker_logger as logger
 from celery.utils.threads import default_socket_timeout
@@ -65,11 +64,6 @@ def str_to_list(s):
     return s
 
 
-def default_nodename(hostname):
-    name, host = nodesplit(hostname or '')
-    return nodename(name or 'celery', host or socket.gethostname())
-
-
 class WorkController(object):
     """Unmanaged worker instance."""
     app = None

+ 1 - 1
celery/worker/components.py

@@ -86,7 +86,7 @@ class Hub(bootsteps.StartStopStep):
 
     def _patch_thread_primitives(self, w):
         # make clock use dummy lock
-        w.app.clock.lock = DummyLock()
+        w.app.clock.mutex = DummyLock()
         # multiprocessing's ApplyResult uses this lock.
         try:
             from billiard import pool

+ 8 - 12
celery/worker/consumer.py

@@ -389,9 +389,10 @@ class Consumer(object):
         if not cset.consuming_from(queue):
             cset.add_queue(q)
             cset.consume()
-            info('Started consuming from %r', queue)
+            info('Started consuming from %s', queue)
 
     def cancel_task_queue(self, queue):
+        info('Cancelling queue %s', queue)
         self.app.amqp.queues.deselect(queue)
         self.task_consumer.cancel_by_queue(queue)
 
@@ -644,10 +645,13 @@ class Gossip(bootsteps.ConsumerStep):
 
         self.timer = c.timer
         if self.enabled:
-            self.state = c.app.events.State()
+            self.state = c.app.events.State(
+                on_node_join=self.on_node_join,
+                on_node_leave=self.on_node_leave,
+            )
             if c.hub:
                 c._mutex = DummyLock()
-            self.update_state = self.state.worker_event
+            self.update_state = self.state.event
         self.interval = interval
         self._tref = None
         self.consensus_requests = defaultdict(list)
@@ -768,15 +772,7 @@ class Gossip(bootsteps.ConsumerStep):
                     message.payload['hostname'])
         if hostname != self.hostname:
             type, event = prepare(message.payload)
-            group, _, subject = type.partition('-')
-            worker, created = self.update_state(subject, event)
-            if subject == 'offline':
-                try:
-                    self.on_node_leave(worker)
-                finally:
-                    self.state.workers.pop(worker.hostname, None)
-            elif created or subject == 'online':
-                self.on_node_join(worker)
+            obj, subject = self.update_state(event)
         else:
             self.clock.forward()
 

+ 2 - 3
celery/worker/heartbeat.py

@@ -9,10 +9,9 @@
 """
 from __future__ import absolute_import
 
-from celery.five import values
 from celery.utils.sysinfo import load_average
 
-from .state import SOFTWARE_INFO, active_requests, total_count
+from .state import SOFTWARE_INFO, active_requests, all_total_count
 
 __all__ = ['Heart']
 
@@ -40,7 +39,7 @@ class Heart(object):
     def _send(self, event):
         return self.eventer.send(event, freq=self.interval,
                                  active=len(active_requests),
-                                 processed=sum(values(total_count)),
+                                 processed=all_total_count[0],
                                  loadavg=load_average(),
                                  **SOFTWARE_INFO)
 

+ 4 - 1
celery/worker/job.py

@@ -48,6 +48,9 @@ debug, info, warn, error = (logger.debug, logger.info,
 _does_info = False
 _does_debug = False
 
+#: Max length of result representation
+RESULT_MAXLEN = 128
+
 
 def __optimize__():
     # this is also called by celery.app.trace.setup_worker_optimizations
@@ -512,7 +515,5 @@ class Request(object):
             self.on_reject(logger, self.connection_errors, requeue)
             self.acknowledged = True
 
-    def repr_result(self, result, maxlen=46):
-        # 46 is the length needed to fit
-        #     'the quick brown fox jumps over the lazy dog' :)
+    def repr_result(self, result, maxlen=RESULT_MAXLEN):
         if not isinstance(result, string_t):

+ 5 - 1
celery/worker/state.py

@@ -51,6 +51,9 @@ active_requests = set()
 #: count of tasks accepted by the worker, sorted by type.
 total_count = Counter()
 
+#: count of all tasks accepted by the worker
+all_total_count = [0]
+
 #: the list of currently revoked tasks.  Persistent if statedb set.
 revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
 
@@ -68,10 +71,11 @@ def maybe_shutdown():
         raise SystemTerminate()
 
 
-def task_accepted(request):
+def task_accepted(request, _all_total_count=all_total_count):
     """Updates global state when a task has been accepted."""
     active_requests.add(request)
     total_count[request.name] += 1
+    _all_total_count[0] += 1
 
 
 def task_ready(request):
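
The one-element list acts as a cheap mutable counter cell: the heartbeat can
now report ``all_total_count[0]`` in O(1) instead of summing the per-type
``Counter`` on every beat. A small sketch of the difference (the numbers are
made up)::

    from collections import Counter

    total_count = Counter(add=3, mul=2)
    all_total_count = [5]

    processed = sum(total_count.values())  # before: O(number of task types)
    processed = all_total_count[0]         # after: O(1)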

+ 20 - 0
docs/configuration.rst

@@ -896,6 +896,30 @@ Example::
 
+.. setting:: BROKER_FAILOVER_STRATEGY
+
+BROKER_FAILOVER_STRATEGY
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+Default failover strategy for the broker Connection object. If supplied,
+may map to a key in ``kombu.connection.failover_strategies``, or be a
+reference to any generator function that yields candidate hosts from the
+supplied list of servers.
+
+Example::
+
+    from itertools import repeat
+    from random import shuffle
+
+    # Random failover strategy
+    def random_failover_strategy(servers):
+        it = list(servers)  # don't modify caller's list
+        for _ in repeat(None):
+            shuffle(it)
+            yield it[0]
+
+    BROKER_FAILOVER_STRATEGY = random_failover_strategy
+
 .. setting:: BROKER_TRANSPORT
 
 BROKER_TRANSPORT
 ~~~~~~~~~~~~~~~~
 :Aliases: ``BROKER_BACKEND``

+ 3 - 3
docs/getting-started/brokers/rabbitmq.rst

@@ -48,15 +48,15 @@ allow that user access to that virtual host:
 
 .. code-block:: bash
 
-    $ rabbitmqctl add_user myuser mypassword
+    $ sudo rabbitmqctl add_user myuser mypassword
 
 .. code-block:: bash
 
-    $ rabbitmqctl add_vhost myvhost
+    $ sudo rabbitmqctl add_vhost myvhost
 
 .. code-block:: bash
 
-    $ rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
+    $ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
 
 See the RabbitMQ `Admin Guide`_ for more information about `access control`_.
 

+ 10 - 0
docs/glossary.rst

@@ -71,3 +71,13 @@ Glossary
     cipater
         Celery release 3.1 named after song by Autechre
         (http://www.youtube.com/watch?v=OHsaqUr_33Y)
+
+    prefetch multiplier
+        The :term:`prefetch count` is configured by using the
+        :setting:`CELERYD_PREFETCH_MULTIPLIER` setting, which is multiplied
+        by the number of pool slots (threads/processes/greenthreads).
+
+    prefetch count
+        Maximum number of unacknowledged messages a consumer can hold;
+        if the limit is exceeded the transport will not deliver more
+        messages to that consumer.  See :ref:`optimizing-prefetch-limit`.
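
As a worked example of the multiplier arithmetic (the numbers are
hypothetical)::

    # worker started with: celery worker --concurrency=8
    CELERYD_PREFETCH_MULTIPLIER = 4
    # initial prefetch count: 8 * 4 = 32 unacknowledged messages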

+ 1 - 1
docs/includes/introduction.txt

@@ -1,4 +1,4 @@
-:Version: 3.1.6 (Cipater)
+:Version: 3.1.7 (Cipater)
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/celery/celery/

+ 25 - 1
docs/internals/protov2.rst

@@ -51,6 +51,29 @@ Notes
 - ``correlation_id`` replaces ``task_id`` field.
 
 
+- ``c_shadow`` lets you specify a different name for logs and monitors,
+  and can be used e.g. for meta tasks that call any function::
+
+    from celery.utils.imports import qualname
+
+    class PickleTask(Task):
+        abstract = True
+
+        def unpack_args(self, fun, args=()):
+            return fun, args
+
+        def apply_async(self, args, kwargs, **options):
+            fun, real_args = self.unpack_args(*args)
+            return super(PickleTask, self).apply_async(
+                (fun, real_args, kwargs), shadow=qualname(fun), **options
+            )
+
+    @app.task(base=PickleTask)
+    def call(fun, args, kwargs):
+        return fun(*args, **kwargs)
+
+
+
 Undecided
 ---------
 
@@ -81,7 +104,8 @@ Definition
         'c_type': (string)task,
 
         # optional
-        'c_meth': (string)'',
+        'c_meth': (string)unused,
+        'c_shadow': (string)replace_name,
         'eta': (iso8601)eta,
         'expires': (iso8601)expires,
         'callbacks': (list)Signature,
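
Returning to the ``c_shadow`` example above, a hedged usage sketch (the
``add`` function is an assumption, and must live at module level so that it
can be pickled)::

    def add(x, y):
        return x + y

    call.apply_async(args=(add, (2, 2)), kwargs={})
    # monitors and logs now show the task under add's qualified name
    # instead of the generic 'call' task name.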

+ 4 - 2
docs/userguide/optimizing.rst

@@ -137,7 +137,7 @@ or that the messages may not even fit in memory.
 
 The workers' default prefetch count is the
 :setting:`CELERYD_PREFETCH_MULTIPLIER` setting multiplied by the number
-of child worker processes [*]_.
+of concurrency slots [*]_ (processes/threads/greenthreads).
 
 If you have many tasks with a long duration you want
 the multiplier value to be 1, which means it will only reserve one
@@ -189,9 +189,11 @@ You can enable this behavior by using the following configuration options:
     CELERY_ACKS_LATE = True
     CELERYD_PREFETCH_MULTIPLIER = 1
 
+.. _prefork-pool-prefetch:
+
 Prefork pool prefetch settings
 ------------------------------
-    
+
 The prefork pool will asynchronously send as many tasks to the processes
 as it can and this means that the processes are, in effect, prefetching
 tasks.

+ 3 - 2
docs/userguide/periodic-tasks.rst

@@ -249,8 +249,9 @@ To start the :program:`celery beat` service:
     $ celery beat
 
-You can also start embed `beat` inside the worker by enabling
-workers `-B` option, this is convenient if you only intend to
-use one worker node:
+You can also embed `beat` inside the worker by enabling the worker's
+`-B` option; this is convenient if you will never run more than one
+worker node, but it's not commonly used and for that reason is not
+recommended for production use:
 
 .. code-block:: bash
 

+ 8 - 4
docs/userguide/signals.rst

@@ -298,7 +298,7 @@ used to route a task to any specific worker:
 
     @celeryd_after_setup.connect
     def setup_direct_queue(sender, instance, **kwargs):
-        queue_name = '{0}.dq'.format(sender)  # sender is the hostname of the worker
+        queue_name = '{0}.dq'.format(sender)  # sender is the nodename of the worker
         instance.app.amqp.queues.select_add(queue_name)
 
 Provides arguments:
@@ -308,7 +308,7 @@ Provides arguments:
 
 * instance
     This is the :class:`celery.apps.worker.Worker` instance to be initialized.
-    Note that only the :attr:`app` and :attr:`hostname` attributes have been
+    Note that only the :attr:`app` and :attr:`hostname` (nodename) attributes have been
     set so far, and the rest of ``__init__`` has not been executed.
 
 * conf
@@ -349,11 +349,11 @@ sender when you connect:
 Provides arguments:
 
 * sender
-  Hostname of the worker.
+  Nodename of the worker.
 
 * instance
     This is the :class:`celery.apps.worker.Worker` instance to be initialized.
-    Note that only the :attr:`app` and :attr:`hostname` attributes have been
+    Note that only the :attr:`app` and :attr:`hostname` (nodename) attributes have been
     set so far, and the rest of ``__init__`` has not been executed.
 
 * conf
@@ -385,6 +385,10 @@ worker_process_init
 
 Dispatched in all pool child processes when they start.
 
+Note that handlers attached to this signal must not block for
+more than 4 seconds, or the process will be killed on the
+assumption that it failed to start.
+
 .. signal:: worker_process_shutdown
 
 worker_process_shutdown
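
A hedged sketch of a handler conforming to the ``worker_process_init`` note
above (the environment flag is just a fast placeholder for real per-process
setup)::

    import os

    from celery.signals import worker_process_init

    @worker_process_init.connect
    def configure_child(**kwargs):
        # must finish quickly: the parent assumes the child failed to
        # start if initialization takes longer than ~4 seconds
        os.environ.setdefault('CHILD_READY', '1')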

+ 2 - 3
docs/userguide/tasks.rst

@@ -997,12 +997,11 @@ will do roughly this behind the scenes:
 
 .. code-block:: python
 
-    @app.task
-    class AddTask(Task):
+    class _AddTask(app.Task):
 
         def run(self, x, y):
             return x + y
-    add = registry.tasks[AddTask.name]
+    add = app.tasks[_AddTask.name]
 
 
 Instantiation

+ 4 - 3
examples/resultgraph/tasks.py

@@ -20,7 +20,7 @@
 
 
 from celery import chord, group, task, signature, uuid
-from celery.result import AsyncResult, ResultSet
+from celery.result import AsyncResult, ResultSet, allow_join_result
 from collections import deque
 
 
@@ -79,8 +79,9 @@ def unlock_graph(result, callback,
     if result.ready():
         second_level_res = result.get()
         if second_level_res.ready():
-            signature(callback).delay(list(joinall(
-                second_level_res, propagate=propagate)))
+            with allow_join_result():
+                signature(callback).delay(list(joinall(
+                    second_level_res, propagate=propagate)))
     else:
         unlock_graph.retry(countdown=interval, max_retries=max_retries)
 

+ 17 - 12
extra/generic-init.d/celerybeat

@@ -21,9 +21,13 @@
 # abnormally in the absence of a valid process ID.
 #set -e
 
-# Setting `SCRIPT_NAME` here allows you to symlink/source this init script,
-# making it easy to run multiple processes on the system.
-SCRIPT_NAME="$(basename $0)"
+# May be a runlevel symlink (e.g. S02celeryd)
+if [ -L "$0" ]; then
+    SCRIPT_FILE=$(readlink "$0")
+else
+    SCRIPT_FILE="$0"
+fi
+SCRIPT_NAME="$(basename "$SCRIPT_FILE")"
 
 # /etc/init.d/celerybeat: start and stop the celery periodic task scheduler daemon.
 
@@ -36,8 +40,9 @@ if test -f /etc/default/${SCRIPT_NAME}; then
 fi
 
 CELERY_BIN=${CELERY_BIN:-"celery"}
-DEFAULT_PID_FILE="/var/run/celery/${SCRIPT_NAME}.pid"
-DEFAULT_LOG_FILE="/var/log/celery/${SCRIPT_NAME}.log"
+DEFAULT_USER="celery"
+DEFAULT_PID_FILE="/var/run/celery/beat.pid"
+DEFAULT_LOG_FILE="/var/log/celery/beat.log"
 DEFAULT_LOG_LEVEL="INFO"
 DEFAULT_CELERYBEAT="$CELERY_BIN beat"
 
@@ -50,6 +55,8 @@ if [ ! -z "$CELERY_APP" ]; then
     CELERY_APP_ARG="--app=$CELERY_APP"
 fi
 
+CELERYBEAT_USER=${CELERYBEAT_USER:-${CELERYD_USER:-$DEFAULT_USER}}
+
 # Set CELERY_CREATE_DIRS to always create log/pid dirs.
 CELERY_CREATE_DIRS=${CELERY_CREATE_DIRS:-0}
 CELERY_CREATE_RUNDIR=$CELERY_CREATE_DIRS
@@ -75,12 +82,6 @@ CELERYBEAT_LOG_DIR=`dirname $CELERYBEAT_LOG_FILE`
 CELERYBEAT_PID_DIR=`dirname $CELERYBEAT_PID_FILE`
 
-# Extra start-stop-daemon options, like user/group.
-if [ -n "$CELERYBEAT_USER" ]; then
-    DAEMON_OPTS="$DAEMON_OPTS --uid $CELERYBEAT_USER"
-fi
-if [ -n "$CELERYBEAT_GROUP" ]; then
-    DAEMON_OPTS="$DAEMON_OPTS --gid $CELERYBEAT_GROUP"
-fi
 
 CELERYBEAT_CHDIR=${CELERYBEAT_CHDIR:-$CELERYD_CHDIR}
 if [ -n "$CELERYBEAT_CHDIR" ]; then
@@ -174,9 +175,13 @@ stop_beat () {
     fi
 }
 
+_chuid () {
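+    # Run the beat program as $CELERYBEAT_USER via su, so it is
+    # never started with superuser privileges.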
+    su "$CELERYBEAT_USER" -c "$CELERYBEAT $*"
+}
+
 start_beat () {
     echo "Starting ${SCRIPT_NAME}..."
-    $CELERYBEAT $CELERY_APP_ARG $CELERYBEAT_OPTS $DAEMON_OPTS --detach \
+    _chuid $CELERY_APP_ARG $CELERYBEAT_OPTS $DAEMON_OPTS --detach \
                 --pidfile="$CELERYBEAT_PID_FILE"
 }
 

+ 49 - 57
extra/generic-init.d/celeryd

@@ -19,26 +19,28 @@
 ### END INIT INFO
 #
 #
-# To implement separate init scripts, do NOT copy this script.  Instead,
-# symlink it.  I.e., if my new application, "little-worker" needs an init, I
+# To implement separate init scripts, copy this script and give it a
+# different name.
+# E.g., if my new application "little-worker" needs an init script, I
 # should just use:
 #
-#   ln -s /etc/init.d/celeryd /etc/init.d/little-worker
+#   cp /etc/init.d/celeryd /etc/init.d/little-worker
 #
 # You can then configure this by manipulating /etc/default/little-worker.
 #
-# If you want to have separate LSB headers in each script you can source this
-# script instead of symlinking:
-#   # ...
-#   ### END INIT INFO
-#   source /etc/init.d/celeryd
-#
-# Setting `SCRIPT_NAME` here allows you to symlink/source this init script,
-# making it easy to run multiple processes on the system.
-SCRIPT_NAME="$(basename $0)"
 
-DEFAULT_PID_FILE="/var/run/celery/${SCRIPT_NAME}/%n.pid"
-DEFAULT_LOG_FILE="/var/log/celery/${SCRIPT_NAME}/%n.log"
+
+# May be a runlevel symlink (e.g. S02celeryd)
+if [ -L "$0" ]; then
+    SCRIPT_FILE=$(readlink "$0")
+else
+    SCRIPT_FILE="$0"
+fi
+SCRIPT_NAME="$(basename "$SCRIPT_FILE")"
+
+DEFAULT_USER="celery"
+DEFAULT_PID_FILE="/var/run/celery/%n.pid"
+DEFAULT_LOG_FILE="/var/log/celery/%n.log"
 DEFAULT_LOG_LEVEL="INFO"
 DEFAULT_NODES="celery"
 DEFAULT_CELERYD="-m celery worker --detach"
@@ -53,6 +55,7 @@ if [ ! -z "$CELERY_APP" ]; then
     CELERY_APP_ARG="--app=$CELERY_APP"
 fi
 
+CELERYD_USER=${CELERYD_USER:-$DEFAULT_USER}
 
 # Set CELERY_CREATE_DIRS to always create log/pid dirs.
 CELERY_CREATE_DIRS=${CELERY_CREATE_DIRS:-0}
@@ -82,13 +85,6 @@ CELERYD_LOG_DIR=`dirname $CELERYD_LOG_FILE`
 CELERYD_PID_DIR=`dirname $CELERYD_PID_FILE`
 
-# Extra start-stop-daemon options, like user/group.
-if [ -n "$CELERYD_USER" ]; then
-    DAEMON_OPTS="$DAEMON_OPTS --uid=$CELERYD_USER"
-fi
-if [ -n "$CELERYD_GROUP" ]; then
-    DAEMON_OPTS="$DAEMON_OPTS --gid=$CELERYD_GROUP"
-fi
-
 if [ -n "$CELERYD_CHDIR" ]; then
     DAEMON_OPTS="$DAEMON_OPTS --workdir=$CELERYD_CHDIR"
 fi
@@ -148,36 +144,32 @@ create_paths() {
 export PATH="${PATH:+$PATH:}/usr/sbin:/sbin"
 
 
-_get_pid_files() {
-    [ ! -d "$CELERYD_PID_DIR" ] && return
-    echo `ls -1 "$CELERYD_PID_DIR"/*.pid 2> /dev/null`
-}
-
 _get_pids() {
-    local pid_files=
-    pid_files=`_get_pid_files`
-    [ -z "$pid_files" ] && echo "${SCRIPT_NAME} is stopped" && exit 1
+    found_pids=0
+    my_exitcode=0
 
-    for pid_file in $pid_files; do
+    for pid_file in "$CELERYD_PID_DIR"/*.pid; do
         local pid=`cat "$pid_file"`
         local cleaned_pid=`echo "$pid" | sed -e 's/[^0-9]//g'`
         if [ -z "$pid" ] || [ "$cleaned_pid" != "$pid" ]; then
             echo "bad pid file ($pid_file)"
             one_failed=true
+            my_exitcode=1
         else
+            found_pids=1
             echo "$pid"
         fi
     done
+
+    if [ $found_pids -eq 0 ]; then
+        echo "${SCRIPT_NAME}: All nodes down"
+        exit $my_exitcode
+    fi
 }
 
-_get_worker_pids() {
-    local pids=
-    pids=`_get_pids`
-    local worker_pids=
-    for pid in $pids; do
-        worker_pids=`ps h --ppid $pid -o pid`
-        [ "$worker_pids" ] && echo "$worker_pids" || one_failed=true
-    done
+
+_chuid () {
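+    # Run celery multi as $CELERYD_USER via su, so the worker
+    # processes are never started with superuser privileges.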
+    su "$CELERYD_USER" -c "$CELERYD_MULTI $*"
 }
 
 
@@ -185,12 +177,12 @@ start_workers () {
     if [ ! -z "$CELERYD_ULIMIT" ]; then
         ulimit $CELERYD_ULIMIT
     fi
-    $CELERYD_MULTI $* start $CELERYD_NODES $DAEMON_OPTS     \
-                         --pidfile="$CELERYD_PID_FILE"      \
-                         --logfile="$CELERYD_LOG_FILE"      \
-                         --loglevel="$CELERYD_LOG_LEVEL"    \
-                         $CELERY_APP_ARG                    \
-                         $CELERYD_OPTS
+    _chuid $* start $CELERYD_NODES $DAEMON_OPTS     \
+                 --pidfile="$CELERYD_PID_FILE"      \
+                 --logfile="$CELERYD_LOG_FILE"      \
+                 --loglevel="$CELERYD_LOG_LEVEL"    \
+                 $CELERY_APP_ARG                    \
+                 $CELERYD_OPTS
 }
 
 
@@ -200,28 +192,28 @@ dryrun () {
 
 
 stop_workers () {
-    $CELERYD_MULTI stopwait $CELERYD_NODES --pidfile="$CELERYD_PID_FILE"
+    _chuid stopwait $CELERYD_NODES --pidfile="$CELERYD_PID_FILE"
 }
 
 
 restart_workers () {
-    $CELERYD_MULTI restart $CELERYD_NODES $DAEMON_OPTS      \
-                           --pidfile="$CELERYD_PID_FILE"    \
-                           --logfile="$CELERYD_LOG_FILE"    \
-                           --loglevel="$CELERYD_LOG_LEVEL"  \
-                           $CELERY_APP_ARG                  \
-                           $CELERYD_OPTS
+    _chuid restart $CELERYD_NODES $DAEMON_OPTS      \
+                   --pidfile="$CELERYD_PID_FILE"    \
+                   --logfile="$CELERYD_LOG_FILE"    \
+                   --loglevel="$CELERYD_LOG_LEVEL"  \
+                   $CELERY_APP_ARG                  \
+                   $CELERYD_OPTS
 }
 
 
 kill_workers() {
-    $CELERYD_MULTI kill $CELERYD_NODES --pidfile="$CELERYD_PID_FILE"
+    _chuid kill $CELERYD_NODES --pidfile="$CELERYD_PID_FILE"
 }
 
 
 restart_workers_graceful () {
     local worker_pids=
-    worker_pids=`_get_worker_pids`
+    worker_pids=`_get_pids`
     [ "$one_failed" ] && exit 1
 
     for worker_pid in $worker_pids; do
@@ -240,17 +232,17 @@ restart_workers_graceful () {
 
 
 check_status () {
-    local pid_files=
-    pid_files=`_get_pid_files`
-    [ -z "$pid_files" ] && echo "${SCRIPT_NAME} not running (no pidfile)" && exit 1
+    my_exitcode=0
+    found_pids=0
 
     local one_failed=
-    for pid_file in $pid_files; do
+    for pid_file in "$CELERYD_PID_DIR"/*.pid; do
         local node=`basename "$pid_file" .pid`
         local pid=`cat "$pid_file"`
         local cleaned_pid=`echo "$pid" | sed -e 's/[^0-9]//g'`
         if [ -z "$pid" ] || [ "$cleaned_pid" != "$pid" ]; then
             echo "bad pid file ($pid_file)"
+            one_failed=true
         else
             local failed=
             kill -0 $pid 2> /dev/null || failed=true

+ 4 - 11
extra/supervisord/celerybeat.conf

@@ -2,24 +2,17 @@
 ;  celery beat supervisor example
 ; ================================
 
-; NOTE: If you're using Django, you shouldn't use this file.
-; Use
-; http://github.com/celery/django-celery/tree/master/extra/supervisord/celerybeat.conf
-; instead!
-
 [program:celerybeat]
-command=celery beat -A myapp --schedule /var/lib/celery/celerybeat-schedule --loglevel=INFO
+; Set the full path to the celery program when using a virtualenv.
+command=celery beat -A myapp --schedule /var/lib/celery/beat.db --loglevel=INFO
 
 ; remove the -A myapp argument if you are not using an app instance
 
-; Set PYTHONPATH to the directory containing app/celeryconfig.py
-environment=PYTHONPATH=/path/to/project
-
 directory=/path/to/project
 user=nobody
 numprocs=1
-stdout_logfile=/var/log/celerybeat.log
-stderr_logfile=/var/log/celerybeat.log
+stdout_logfile=/var/log/celery/beat.log
+stderr_logfile=/var/log/celery/beat.log
 autostart=true
 autorestart=true
 startsecs=10

+ 4 - 13
extra/supervisord/celeryd.conf

@@ -2,24 +2,15 @@
 ;  celery worker supervisor example
 ; ==================================
 
-; NOTE: If you're using Django, you shouldn't use this file.
-; Use
-; http://github.com/celery/django-celery/tree/master/extra/supervisord/celeryd.conf
-; instead!
-
 [program:celery]
-command=celery worker -A proj.tasks:app --loglevel=INFO
-
-; remove -A argument if you don't use a celery app instance.
-
-; Set PYTHONPATH to the directory containing app/celeryconfig.py
-environment=PYTHONPATH=/path/to/project
+; Set the full path to the celery program when using a virtualenv.
+command=celery worker -A proj --loglevel=INFO
 
 directory=/path/to/project
 user=nobody
 numprocs=1
-stdout_logfile=/var/log/celeryd.log
-stderr_logfile=/var/log/celeryd.log
+stdout_logfile=/var/log/celery/worker.log
+stderr_logfile=/var/log/celery/worker.log
 autostart=true
 autorestart=true
 startsecs=10

+ 6 - 0
funtests/stress/stress/templates.py

@@ -94,3 +94,9 @@ class pickle(default):
 class confirms(default):
     BROKER_URL = 'pyamqp://'
     BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
+
+
+@template()
+class events(default):
+    CELERY_SEND_EVENTS = True
+    CELERY_SEND_TASK_SENT_EVENT = True
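
A minimal sketch of what these two settings enable, assuming a broker
running at ``pyamqp://`` (the handler and capture limits are
illustrative only):

.. code-block:: python

    from celery import Celery

    app = Celery(broker='pyamqp://')

    def on_task_sent(event):
        print('task sent: {0}'.format(event['uuid']))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-sent': on_task_sent,
        })
        # Capture at most 10 events, waiting up to 5 seconds.
        recv.capture(limit=10, timeout=5)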

+ 2 - 2
requirements/default.txt

@@ -1,3 +1,3 @@
 pytz>dev
-billiard>=3.3.0.10,<3.4
-kombu>=3.0.7,<4.0
+billiard>=3.3.0.13,<3.4
+kombu>=3.0.8,<4.0

+ 0 - 1
requirements/pkgutils.txt

@@ -5,4 +5,3 @@ flake8
 flakeplus
 tox
 Sphinx-PyPI-upload
-bundle>=1.1.0

+ 2 - 2
setup.cfg

@@ -15,5 +15,5 @@ upload-dir = docs/.build/html
 
 [bdist_rpm]
 requires = pytz >= 2011b
-           billiard >= 3.3.0.10
-           kombu >= 3.0.7
+           billiard >= 3.3.0.13
+           kombu >= 3.0.8