Explorar o código

Merge remote-tracking branch 'refs/remotes/celery/master'

Conflicts:
	celery/backends/new_cassandra.py
	celery/tests/backends/test_new_cassandra.py
Piotr Maślanka %!s(int64=9) %!d(string=hai) anos
pai
achega
2e875601ac
Modificáronse 50 ficheiros con 1539 adicións e 1036 borrados
  1. 3 3
      CONTRIBUTING.rst
  2. 1 4
      README.rst
  3. 1 1
      celery/__init__.py
  4. 2 2
      celery/app/amqp.py
  5. 13 1
      celery/app/base.py
  6. 5 14
      celery/app/builtins.py
  7. 9 9
      celery/app/defaults.py
  8. 2 3
      celery/app/task.py
  9. 19 8
      celery/app/trace.py
  10. 0 1
      celery/backends/__init__.py
  11. 3 5
      celery/backends/base.py
  12. 172 135
      celery/backends/cassandra.py
  13. 0 229
      celery/backends/new_cassandra.py
  14. 1 1
      celery/bin/worker.py
  15. 110 72
      celery/canvas.py
  16. 15 5
      celery/contrib/rdb.py
  17. 18 9
      celery/events/state.py
  18. 3 1
      celery/result.py
  19. 6 0
      celery/signals.py
  20. 59 5
      celery/tests/app/test_builtins.py
  21. 3 10
      celery/tests/backends/test_base.py
  22. 101 117
      celery/tests/backends/test_cassandra.py
  23. 0 174
      celery/tests/backends/test_new_cassandra.py
  24. 4 4
      celery/tests/case.py
  25. 39 34
      celery/tests/contrib/test_rdb.py
  26. 2 0
      celery/tests/events/test_state.py
  27. 36 2
      celery/tests/tasks/test_canvas.py
  28. 2 2
      celery/tests/tasks/test_tasks.py
  29. 1 1
      celery/worker/__init__.py
  30. 14 2
      celery/worker/consumer.py
  31. 5 3
      celery/worker/request.py
  32. 1 0
      celery/worker/strategy.py
  33. 160 17
      docs/configuration.rst
  34. BIN=BIN
      docs/images/worker_graph_full.png
  35. 0 3
      docs/includes/installation.txt
  36. 1 1
      docs/includes/introduction.txt
  37. 0 100
      docs/internals/deprecation.rst
  38. 0 11
      docs/internals/reference/celery.backends.new_cassandra.rst
  39. 0 1
      docs/internals/reference/index.rst
  40. 1 1
      docs/userguide/extending.rst
  41. 2 2
      docs/userguide/monitoring.rst
  42. 47 0
      docs/userguide/signals.rst
  43. 558 34
      docs/whatsnew-4.0.rst
  44. 1 1
      extra/generic-init.d/celeryd
  45. 10 0
      funtests/stress/stress/app.py
  46. 107 4
      funtests/stress/stress/suite.py
  47. 1 1
      funtests/stress/stress/templates.py
  48. 1 1
      requirements/extras/cassandra.txt
  49. 0 1
      requirements/extras/new_cassandra.txt
  50. 0 1
      setup.py

+ 3 - 3
CONTRIBUTING.rst

@@ -538,7 +538,7 @@ If you only want to test specific Python versions use the ``-e``
 option:
 ::
 
-    $ tox -e py26
+    $ tox -e 2.7
 
 Building the documentation
 --------------------------
@@ -586,7 +586,7 @@ To not return a negative exit code when this command fails use
 the ``flakes`` target instead:
 ::
 
-    $ make flakes
+    $ make flakes§
 
 API reference
 ~~~~~~~~~~~~~
@@ -619,7 +619,7 @@ Edit the file using your favorite editor:
 
     $ vim celery.worker.awesome.rst
 
-        # change every occurance of ``celery.schedules`` to
+        # change every occurrence of ``celery.schedules`` to
         # ``celery.worker.awesome``
 
 

+ 1 - 4
README.rst

@@ -4,7 +4,7 @@
 
 .. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
 
-:Version: 4.0.0b1 (0today8)
+:Version: 4.0.0rc1 (0today8)
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/celery/celery/
@@ -284,9 +284,6 @@ Transports and Backends
     for using memcached as a result backend.
 
 :celery[cassandra]:
-    for using Apache Cassandra as a result backend with pycassa driver.
-
-:celery[new_cassandra]:
     for using Apache Cassandra as a result backend with DataStax driver.
 
 :celery[couchdb]:

+ 1 - 1
celery/__init__.py

@@ -18,7 +18,7 @@ version_info_t = namedtuple(
 )
 
 SERIES = '0today8'
-VERSION = version_info_t(4, 0, 0, 'b1', '')
+VERSION = version_info_t(4, 0, 0, 'rc1', '')
 
 __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
 __author__ = 'Ask Solem'

+ 2 - 2
celery/app/amqp.py

@@ -360,8 +360,8 @@ class AMQP(object):
             ),
             sent_event={
                 'uuid': task_id,
-                'root': root_id,
-                'parent': parent_id,
+                'root_id': root_id,
+                'parent_id': parent_id,
                 'name': name,
                 'args': argsrepr,
                 'kwargs': kwargsrepr,

+ 13 - 1
celery/app/base.py

@@ -622,6 +622,7 @@ class Celery(object):
         Otherwise supports the same arguments as :meth:`@-Task.apply_async`.
 
         """
+        parent = have_parent = None
         amqp = self.amqp
         task_id = task_id or uuid()
         producer = producer or publisher  # XXX compat
@@ -633,6 +634,16 @@ class Celery(object):
             ), stacklevel=2)
         options = router.route(options, route_name or name, args, kwargs)
 
+        if root_id is None:
+            parent, have_parent = get_current_worker_task(), True
+            if parent:
+                root_id = parent.request.root_id or parent.request.id
+        if parent_id is None:
+            if not have_parent:
+                parent, have_parent = get_current_worker_task(), True
+            if parent:
+                parent_id = parent.request.id
+
         message = amqp.create_task_message(
             task_id, name, args, kwargs, countdown, eta, group_id,
             expires, retries, chord,
@@ -649,7 +660,8 @@ class Celery(object):
             amqp.send_task_message(P, name, message, **options)
         result = (result_cls or self.AsyncResult)(task_id)
         if add_to_parent:
-            parent = get_current_worker_task()
+            if not have_parent:
+                parent, have_parent = get_current_worker_task(), True
             if parent:
                 parent.add_trail(result)
         return result

+ 5 - 14
celery/app/builtins.py

@@ -54,20 +54,12 @@ def add_unlock_chord_task(app):
     from celery.exceptions import ChordError
     from celery.result import allow_join_result, result_from_tuple
 
-    default_propagate = app.conf.chord_propagates
-
     @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
               default_retry_delay=1, ignore_result=True, lazy=False, bind=True)
-    def unlock_chord(self, group_id, callback, interval=None, propagate=None,
+    def unlock_chord(self, group_id, callback, interval=None,
                      max_retries=None, result=None,
                      Result=app.AsyncResult, GroupResult=app.GroupResult,
-                     result_from_tuple=result_from_tuple):
-        # if propagate is disabled exceptions raised by chord tasks
-        # will be sent as part of the result list to the chord callback.
-        # Since 3.1 propagate will be enabled by default, and instead
-        # the chord callback changes state to FAILURE with the
-        # exception set to ChordError.
-        propagate = default_propagate if propagate is None else propagate
+                     result_from_tuple=result_from_tuple, **kwargs):
         if interval is None:
             interval = self.default_retry_delay
 
@@ -93,7 +85,7 @@ def add_unlock_chord_task(app):
         callback = maybe_signature(callback, app=app)
         try:
             with allow_join_result():
-                ret = j(timeout=3.0, propagate=propagate)
+                ret = j(timeout=3.0, propagate=True)
         except Exception as exc:
             try:
                 culprit = next(deps._failed_join_report())
@@ -191,8 +183,7 @@ def add_chord_task(app):
     @app.task(name='celery.chord', bind=True, ignore_result=False,
               shared=False, lazy=False)
     def chord(self, header, body, partial_args=(), interval=None,
-              countdown=1, max_retries=None, propagate=None,
-              eager=False, **kwargs):
+              countdown=1, max_retries=None, eager=False, **kwargs):
         app = self.app
         # - convert back to group if serialized
         tasks = header.tasks if isinstance(header, group) else header
@@ -202,5 +193,5 @@ def add_chord_task(app):
         body = maybe_signature(body, app=app)
         ch = _chord(header, body)
         return ch.run(header, body, partial_args, app, interval,
-                      countdown, max_retries, propagate, **kwargs)
+                      countdown, max_retries, **kwargs)
     return chord

+ 9 - 9
celery/app/defaults.py

@@ -123,19 +123,14 @@ NAMESPACES = Namespace(
         backend_options=Option({}, type='dict'),
     ),
     cassandra=Namespace(
-        column_family=Option(type='string'),
         entry_ttl=Option(type="float"),
         keyspace=Option(type='string'),
         port=Option(type="string"),
         read_consistency=Option(type='string'),
         servers=Option(type='list'),
+        table=Option(type='string'),
         write_consistency=Option(type='string'),
     ),
-    chord=Namespace(
-        __old__=old_ns('celery_chord'),
-
-        propagates=Option(True, type='bool'),
-    ),
     couchbase=Namespace(
         __old__=old_ns('celery_couchbase'),
 
@@ -171,6 +166,7 @@ NAMESPACES = Namespace(
         max_connections=Option(type='int'),
         password=Option(type='string'),
         port=Option(type='int'),
+        socket_timeout=Option(5.0, type='float'),
     ),
     result=Namespace(
         __old__=old_ns('celery_result'),
@@ -225,7 +221,9 @@ NAMESPACES = Namespace(
         default_queue=Option('celery'),
         default_rate_limit=Option(type='string'),
         default_routing_key=Option('celery'),
-        eager_propagates_exceptions=Option(False, type='bool'),
+        eager_propagates=Option(
+            False, type='bool', old={'celery_eager_propagates_exceptions'},
+        ),
         ignore_result=Option(False, type='bool'),
         protocol=Option(1, type='int', old={'celery_task_protocol'}),
         publish_retry=Option(
@@ -277,7 +275,7 @@ NAMESPACES = Namespace(
         hijack_root_logger=Option(True, type='bool'),
         log_color=Option(type='bool'),
         log_format=Option(DEFAULT_PROCESS_LOG_FMT),
-        lost_wait=Option(10.0, type='float'),
+        lost_wait=Option(10.0, type='float', old={'celeryd_worker_lost_wait'}),
         max_memory_per_child=Option(type='int'),
         max_tasks_per_child=Option(type='int'),
         pool=Option(DEFAULT_POOL),
@@ -290,7 +288,9 @@ NAMESPACES = Namespace(
         redirect_stdouts_level=Option(
             'WARNING', old={'celery_redirect_stdouts_level'},
         ),
-        send_events=Option(False, type='bool'),
+        send_task_events=Option(
+            False, type='bool', old={'celeryd_send_events'},
+        ),
         state_db=Option(),
         task_log_format=Option(DEFAULT_TASK_LOG_FMT),
         timer=Option(type='string'),

+ 2 - 3
celery/app/task.py

@@ -667,7 +667,7 @@ class Task(object):
         :param args: positional arguments passed on to the task.
         :param kwargs: keyword arguments passed on to the task.
         :keyword throw: Re-raise task exceptions.  Defaults to
-                        the :setting:`task_eager_propagates_exceptions`
+                        the :setting:`task_eager_propagates`
                         setting.
 
         :rtype :class:`celery.result.EagerResult`:
@@ -684,8 +684,7 @@ class Task(object):
         kwargs = kwargs or {}
         task_id = options.get('task_id') or uuid()
         retries = options.get('retries', 0)
-        throw = app.either('task_eager_propagates_exceptions',
-                           options.pop('throw', None))
+        throw = app.either('task_eager_propagates', options.pop('throw', None))
 
         # Make sure we get the task instance, not class.
         task = app._tasks[self.name]

+ 19 - 8
celery/app/trace.py

@@ -306,10 +306,11 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
         I = Info(state, exc)
         R = I.handle_error_state(task, request, eager=eager)
         if call_errbacks:
+            root_id = request.root_id or uuid
             group(
                 [signature(errback, app=app)
                  for errback in request.errbacks or []], app=app,
-            ).apply_async((uuid,))
+            ).apply_async((uuid,), parent_id=uuid, root_id=root_id)
         return I, R, I.state, I.retval
 
     def trace_task(uuid, args, kwargs, request=None):
@@ -336,6 +337,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
             push_task(task)
             task_request = Context(request or {}, args=args,
                                    called_directly=False, kwargs=kwargs)
+            root_id = task_request.root_id or uuid
             push_request(task_request)
             try:
                 # -*- PRE -*-
@@ -363,8 +365,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                     I.handle_ignore(task, task_request)
                 except Retry as exc:
                     I, R, state, retval = on_error(
-                        task_request, exc, uuid, RETRY, call_errbacks=False,
-                    )
+                        task_request, exc, uuid, RETRY, call_errbacks=False)
                 except Exception as exc:
                     I, R, state, retval = on_error(task_request, exc, uuid)
                 except BaseException as exc:
@@ -389,17 +390,27 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                                     else:
                                         sigs.append(sig)
                                 for group_ in groups:
-                                    group.apply_async((retval,))
+                                    group.apply_async(
+                                        (retval,),
+                                        parent_id=uuid, root_id=root_id,
+                                    )
                                 if sigs:
-                                    group(sigs).apply_async((retval,))
+                                    group(sigs).apply_async(
+                                        (retval,),
+                                        parent_id=uuid, root_id=root_id,
+                                    )
                             else:
-                                signature(callbacks[0], app=app).delay(retval)
+                                signature(callbacks[0], app=app).apply_async(
+                                    (retval,), parent_id=uuid, root_id=root_id,
+                                )
 
                         # execute first task in chain
-                        chain = task.request.chain
+                        chain = task_request.chain
                         if chain:
                             signature(chain.pop(), app=app).apply_async(
-                                    (retval,), chain=chain)
+                                (retval,), chain=chain,
+                                parent_id=uuid, root_id=root_id,
+                            )
                         mark_as_done(
                             uuid, retval, task_request, publish_result,
                         )

+ 0 - 1
celery/backends/__init__.py

@@ -30,7 +30,6 @@ BACKEND_ALIASES = {
     'db': 'celery.backends.database:DatabaseBackend',
     'database': 'celery.backends.database:DatabaseBackend',
     'cassandra': 'celery.backends.cassandra:CassandraBackend',
-    'new_cassandra': 'celery.backends.new_cassandra:CassandraBackend',
     'couchbase': 'celery.backends.couchbase:CouchBaseBackend',
     'couchdb': 'celery.backends.couchdb:CouchDBBackend',
     'riak': 'celery.backends.riak:RiakBackend',

+ 3 - 5
celery/backends/base.py

@@ -359,7 +359,7 @@ class BaseBackend(object):
     def add_to_chord(self, chord_id, result):
         raise NotImplementedError('Backend does not support add_to_chord')
 
-    def on_chord_part_return(self, request, state, result, propagate=False):
+    def on_chord_part_return(self, request, state, result, **kwargs):
         pass
 
     def fallback_chord_unlock(self, group_id, body, result=None,
@@ -553,12 +553,10 @@ class KeyValueStoreBackend(BaseBackend):
 
         return header(*partial_args, task_id=group_id, **fixed_options or {})
 
-    def on_chord_part_return(self, request, state, result, propagate=None):
+    def on_chord_part_return(self, request, state, result, **kwargs):
         if not self.implements_incr:
             return
         app = self.app
-        if propagate is None:
-            propagate = app.conf.chord_propagates
         gid = request.group
         if not gid:
             return
@@ -593,7 +591,7 @@ class KeyValueStoreBackend(BaseBackend):
             j = deps.join_native if deps.supports_native_join else deps.join
             try:
                 with allow_join_result():
-                    ret = j(timeout=3.0, propagate=propagate)
+                    ret = j(timeout=3.0, propagate=True)
             except Exception as exc:
                 try:
                     culprit = next(deps._failed_join_report())

+ 172 - 135
celery/backends/cassandra.py

@@ -1,61 +1,85 @@
 # -* coding: utf-8 -*-
 """
-    celery.backends.cassandra
-    ~~~~~~~~~~~~~~~~~~~~~~~~~
+    celery.backends.new_cassandra
+    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-    Apache Cassandra result store backend.
+    Apache Cassandra result store backend using DataStax driver
 
 """
 from __future__ import absolute_import
 
+import sys
 try:  # pragma: no cover
-    import pycassa
-    from thrift import Thrift
-    C = pycassa.cassandra.ttypes
+    import cassandra
+    import cassandra.cluster
 except ImportError:  # pragma: no cover
-    pycassa = None   # noqa
-
-import socket
-import time
+    cassandra = None   # noqa
 
 from celery import states
 from celery.exceptions import ImproperlyConfigured
-from celery.five import monotonic
-from celery.utils import deprecated
 from celery.utils.log import get_logger
-
 from .base import BaseBackend
 
 __all__ = ['CassandraBackend']
 
 logger = get_logger(__name__)
 
+E_NO_CASSANDRA = """
+You need to install the cassandra-driver library to
+use the Cassandra backend. See https://github.com/datastax/python-driver
+"""
 
-class CassandraBackend(BaseBackend):
-    """Highly fault tolerant Cassandra backend.
+Q_INSERT_RESULT = """
+INSERT INTO {table} (
+    task_id, status, result, date_done, traceback, children) VALUES (
+        %s, %s, %s, %s, %s, %s) {expires};
+"""
+
+Q_SELECT_RESULT = """
+SELECT status, result, date_done, traceback, children
+FROM {table}
+WHERE task_id=%s
+LIMIT 1
+"""
+
+Q_CREATE_RESULT_TABLE = """
+CREATE TABLE {table} (
+    task_id text,
+    status text,
+    result blob,
+    date_done timestamp,
+    traceback blob,
+    children blob,
+    PRIMARY KEY ((task_id), date_done)
+) WITH CLUSTERING ORDER BY (date_done DESC);
+"""
 
-    .. attribute:: servers
+Q_EXPIRES = """
+    USING TTL {0}
+"""
+
+if sys.version_info[0] == 3:
+    def buf_t(x):
+        return bytes(x, 'utf8')
+else:
+    buf_t = buffer  # noqa
 
-        List of Cassandra servers with format: ``hostname:port``.
+
+class CassandraBackend(BaseBackend):
+    """Cassandra backend utilizing DataStax driver
 
     :raises celery.exceptions.ImproperlyConfigured: if
-        module :mod:`pycassa` is not available.
+        module :mod:`cassandra` is not available.
 
     """
-    servers = []
-    keyspace = None
-    column_family = None
-    detailed_mode = False
-    _retry_timeout = 300
-    _retry_wait = 3
-    supports_autoexpire = True
-
-    @deprecated(description='The old cassandra backend',
-                deprecation='4.0',
-                removal='5.0',
-                alternative='Use the `new_cassandra` result backend instead')
-    def __init__(self, servers=None, keyspace=None, column_family=None,
-                 cassandra_options=None, detailed_mode=False, **kwargs):
+
+    #: List of Cassandra servers with format: ``hostname``.
+    servers = None
+
+    supports_autoexpire = True      # autoexpire supported via entry_ttl
+
+    def __init__(self, servers=None, keyspace=None, table=None, entry_ttl=None,
+                 port=9042, **kwargs):
         """Initialize Cassandra backend.
 
         Raises :class:`celery.exceptions.ImproperlyConfigured` if
@@ -64,129 +88,142 @@ class CassandraBackend(BaseBackend):
         """
         super(CassandraBackend, self).__init__(**kwargs)
 
-        if not pycassa:
-            raise ImproperlyConfigured(
-                'You need to install the pycassa library to use the '
-                'Cassandra backend. See https://github.com/pycassa/pycassa')
+        if not cassandra:
+            raise ImproperlyConfigured(E_NO_CASSANDRA)
 
         conf = self.app.conf
         self.servers = (servers or
-                        conf.get('cassandra_servers') or
-                        self.servers)
+                        conf.get('cassandra_servers', None))
+        self.port = (port or
+                     conf.get('cassandra_port', None))
         self.keyspace = (keyspace or
-                         conf.get('cassandra_keyspace') or
-                         self.keyspace)
-        self.column_family = (column_family or
-                              conf.get('cassandra_column_family') or
-                              self.column_family)
-        self.cassandra_options = dict(conf.get('cassandra_options') or {},
-                                      **cassandra_options or {})
-        self.detailed_mode = (detailed_mode or
-                              conf.get('cassandra_detailed_mode') or
-                              self.detailed_mode)
+                         conf.get('cassandra_keyspace', None))
+        self.table = (table or
+                      conf.get('cassandra_table', None))
+
+        if not self.servers or not self.keyspace or not self.table:
+            raise ImproperlyConfigured('Cassandra backend not configured.')
+
+        expires = (entry_ttl or conf.get('cassandra_entry_ttl', None))
+
+        self.cqlexpires = (Q_EXPIRES.format(expires)
+                           if expires is not None else '')
+
         read_cons = conf.get('cassandra_read_consistency') or 'LOCAL_QUORUM'
         write_cons = conf.get('cassandra_write_consistency') or 'LOCAL_QUORUM'
-        try:
-            self.read_consistency = getattr(pycassa.ConsistencyLevel,
-                                            read_cons)
-        except AttributeError:
-            self.read_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM
-        try:
-            self.write_consistency = getattr(pycassa.ConsistencyLevel,
-                                             write_cons)
-        except AttributeError:
-            self.write_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM
-
-        if not self.servers or not self.keyspace or not self.column_family:
-            raise ImproperlyConfigured(
-                'Cassandra backend not configured.')
-
-        self._column_family = None
-
-    def _retry_on_error(self, fun, *args, **kwargs):
-        ts = monotonic() + self._retry_timeout
-        while 1:
-            try:
-                return fun(*args, **kwargs)
-            except (pycassa.InvalidRequestException,
-                    pycassa.TimedOutException,
-                    pycassa.UnavailableException,
-                    pycassa.AllServersUnavailable,
-                    socket.error,
-                    socket.timeout,
-                    Thrift.TException) as exc:
-                if monotonic() > ts:
-                    raise
-                logger.warning('Cassandra error: %r. Retrying...', exc)
-                time.sleep(self._retry_wait)
-
-    def _get_column_family(self):
-        if self._column_family is None:
-            conn = pycassa.ConnectionPool(self.keyspace,
-                                          server_list=self.servers,
-                                          **self.cassandra_options)
-            self._column_family = pycassa.ColumnFamily(
-                conn, self.column_family,
-                read_consistency_level=self.read_consistency,
-                write_consistency_level=self.write_consistency,
-            )
-        return self._column_family
+
+        self.read_consistency = getattr(
+            cassandra.ConsistencyLevel, read_cons,
+            cassandra.ConsistencyLevel.LOCAL_QUORUM,
+        )
+        self.write_consistency = getattr(
+            cassandra.ConsistencyLevel, write_cons,
+            cassandra.ConsistencyLevel.LOCAL_QUORUM,
+        )
+
+        self._connection = None
+        self._session = None
+        self._write_stmt = None
+        self._read_stmt = None
+        self._make_stmt = None
 
     def process_cleanup(self):
-        if self._column_family is not None:
-            self._column_family = None
+        if self._connection is not None:
+            self._connection.shutdown() # also shuts down _session
+
+        self._connection = None
+        self._session = None
+
+    def _get_connection(self, write=False):
+        """Prepare the connection for action
+
+        :param write: bool - are we a writer?
+
+        """
+        if self._connection is None:
+            try:
+                self._connection = cassandra.cluster.Cluster(self.servers,
+                                                             port=self.port)
+                self._session = self._connection.connect(self.keyspace)
+
+                # We are forced to do concatenation below, as formatting would
+                # blow up on superficial %s that will be processed by Cassandra
+                self._write_stmt = cassandra.query.SimpleStatement(
+                    Q_INSERT_RESULT.format(
+                        table=self.table, expires=self.cqlexpires),
+                )
+                self._write_stmt.consistency_level = self.write_consistency
+
+                self._read_stmt = cassandra.query.SimpleStatement(
+                    Q_SELECT_RESULT.format(table=self.table),
+                )
+                self._read_stmt.consistency_level = self.read_consistency
+
+                if write:
+                    # Only possible writers "workers" are allowed to issue
+                    # CREATE TABLE. This is to prevent conflicting situations
+                    # where both task-creator and task-executor would issue it
+                    # at the same time.
+
+                    # Anyway; if you're doing anything critical, you should
+                    # have created this table in advance, in which case
+                    # this query will be a no-op (AlreadyExists)
+                    self._make_stmt = cassandra.query.SimpleStatement(
+                        Q_CREATE_RESULT_TABLE.format(table=self.table),
+                    )
+                    self._make_stmt.consistency_level = self.write_consistency
+
+                    try:
+                        self._session.execute(self._make_stmt)
+                    except cassandra.AlreadyExists:
+                        pass
+
+            except cassandra.OperationTimedOut:
+                # a heavily loaded or gone Cassandra cluster failed to respond.
+                # leave this class in a consistent state
+                if self._connection is not None:
+                    self._connection.shutdown()     # also shuts down _session
+
+                self._connection = None
+                self._session = None
+                raise   # we did fail after all - reraise
 
     def _store_result(self, task_id, result, status,
                       traceback=None, request=None, **kwargs):
         """Store return value and status of an executed task."""
+        self._get_connection(write=True)
 
-        def _do_store():
-            cf = self._get_column_family()
-            date_done = self.app.now()
-            meta = {'status': status,
-                    'date_done': date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
-                    'traceback': self.encode(traceback),
-                    'result': self.encode(result),
-                    'children': self.encode(
-                        self.current_task_children(request),
-                    )}
-            if self.detailed_mode:
-                cf.insert(
-                    task_id, {date_done: self.encode(meta)}, ttl=self.expires,
-                )
-            else:
-                cf.insert(task_id, meta, ttl=self.expires)
-
-        return self._retry_on_error(_do_store)
+        self._session.execute(self._write_stmt, (
+            task_id,
+            status,
+            buf_t(self.encode(result)),
+            self.app.now(),
+            buf_t(self.encode(traceback)),
+            buf_t(self.encode(self.current_task_children(request)))
+        ))
 
     def _get_task_meta_for(self, task_id):
         """Get task metadata for a task by id."""
+        self._get_connection()
 
-        def _do_get():
-            cf = self._get_column_family()
-            try:
-                if self.detailed_mode:
-                    row = cf.get(task_id, column_reversed=True, column_count=1)
-                    return self.decode(list(row.values())[0])
-                else:
-                    obj = cf.get(task_id)
-                    return self.meta_from_decoded({
-                        'task_id': task_id,
-                        'status': obj['status'],
-                        'result': self.decode(obj['result']),
-                        'date_done': obj['date_done'],
-                        'traceback': self.decode(obj['traceback']),
-                        'children': self.decode(obj['children']),
-                    })
-            except (KeyError, pycassa.NotFoundException):
-                return {'status': states.PENDING, 'result': None}
-
-        return self._retry_on_error(_do_get)
+        res = self._session.execute(self._read_stmt, (task_id, ))
+        if not res:
+            return {'status': states.PENDING, 'result': None}
+
+        status, result, date_done, traceback, children = res[0]
+
+        return self.meta_from_decoded({
+            'task_id': task_id,
+            'status': status,
+            'result': self.decode(result),
+            'date_done': date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
+            'traceback': self.decode(traceback),
+            'children': self.decode(children),
+        })
 
     def __reduce__(self, args=(), kwargs={}):
         kwargs.update(
             dict(servers=self.servers,
                  keyspace=self.keyspace,
-                 column_family=self.column_family,
-                 cassandra_options=self.cassandra_options))
+                 table=self.table))
         return super(CassandraBackend, self).__reduce__(args, kwargs)

+ 0 - 229
celery/backends/new_cassandra.py

@@ -1,229 +0,0 @@
-# -* coding: utf-8 -*-
-"""
-    celery.backends.new_cassandra
-    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-    Apache Cassandra result store backend using DataStax driver
-
-"""
-from __future__ import absolute_import
-
-import sys
-try:  # pragma: no cover
-    import cassandra
-    import cassandra.cluster
-except ImportError:  # pragma: no cover
-    cassandra = None   # noqa
-
-from celery import states
-from celery.exceptions import ImproperlyConfigured
-from celery.utils.log import get_logger
-from .base import BaseBackend
-
-__all__ = ['CassandraBackend']
-
-logger = get_logger(__name__)
-
-E_NO_CASSANDRA = """
-You need to install the cassandra-driver library to
-use the Cassandra backend. See https://github.com/datastax/python-driver
-"""
-
-Q_INSERT_RESULT = """
-INSERT INTO {table} (
-    task_id, status, result, date_done, traceback, children) VALUES (
-        %s, %s, %s, %s, %s, %s) {expires};
-"""
-
-Q_SELECT_RESULT = """
-SELECT status, result, date_done, traceback, children
-FROM {table}
-WHERE task_id=%s
-LIMIT 1
-"""
-
-Q_CREATE_RESULT_TABLE = """
-CREATE TABLE {table} (
-    task_id text,
-    status text,
-    result blob,
-    date_done timestamp,
-    traceback blob,
-    children blob,
-    PRIMARY KEY ((task_id), date_done)
-) WITH CLUSTERING ORDER BY (date_done DESC);
-"""
-
-Q_EXPIRES = """
-    USING TTL {0}
-"""
-
-if sys.version_info[0] == 3:
-    def buf_t(x):
-        return bytes(x, 'utf8')
-else:
-    buf_t = buffer  # noqa
-
-
-class CassandraBackend(BaseBackend):
-    """Cassandra backend utilizing DataStax driver
-
-    :raises celery.exceptions.ImproperlyConfigured: if
-        module :mod:`cassandra` is not available.
-
-    """
-
-    #: List of Cassandra servers with format: ``hostname``.
-    servers = None
-
-    supports_autoexpire = True      # autoexpire supported via entry_ttl
-
-    def __init__(self, servers=None, keyspace=None, table=None, entry_ttl=None,
-                 port=9042, **kwargs):
-        """Initialize Cassandra backend.
-
-        Raises :class:`celery.exceptions.ImproperlyConfigured` if
-        the :setting:`cassandra_servers` setting is not set.
-
-        """
-        super(CassandraBackend, self).__init__(**kwargs)
-
-        if not cassandra:
-            raise ImproperlyConfigured(E_NO_CASSANDRA)
-
-        conf = self.app.conf
-        self.servers = (servers or
-                        conf.get('cassandra_servers', None))
-        self.port = (port or
-                     conf.get('cassandra_port', None))
-        self.keyspace = (keyspace or
-                         conf.get('cassandra_keyspace', None))
-        self.table = (table or
-                      conf.get('cassandra_table', None))
-
-        if not self.servers or not self.keyspace or not self.table:
-            raise ImproperlyConfigured('Cassandra backend not configured.')
-
-        expires = (entry_ttl or conf.get('cassandra_entry_ttl', None))
-
-        self.cqlexpires = (Q_EXPIRES.format(expires)
-                           if expires is not None else '')
-
-        read_cons = conf.get('cassandra_read_consistency') or 'LOCAL_QUORUM'
-        write_cons = conf.get('cassandra_write_consistency') or 'LOCAL_QUORUM'
-
-        self.read_consistency = getattr(
-            cassandra.ConsistencyLevel, read_cons,
-            cassandra.ConsistencyLevel.LOCAL_QUORUM,
-        )
-        self.write_consistency = getattr(
-            cassandra.ConsistencyLevel, write_cons,
-            cassandra.ConsistencyLevel.LOCAL_QUORUM,
-        )
-
-        self._connection = None
-        self._session = None
-        self._write_stmt = None
-        self._read_stmt = None
-        self._make_stmt = None
-
-    def process_cleanup(self):
-        if self._connection is not None:
-            self._connection.shutdown() # also shuts down _session
-
-        self._connection = None
-        self._session = None
-
-    def _get_connection(self, write=False):
-        """Prepare the connection for action
-
-        :param write: bool - are we a writer?
-
-        """
-        if self._connection is None:
-            try:
-                self._connection = cassandra.cluster.Cluster(self.servers,
-                                                             port=self.port)
-                self._session = self._connection.connect(self.keyspace)
-
-                # We are forced to do concatenation below, as formatting would
-                # blow up on superficial %s that will be processed by Cassandra
-                self._write_stmt = cassandra.query.SimpleStatement(
-                    Q_INSERT_RESULT.format(
-                        table=self.table, expires=self.cqlexpires),
-                )
-                self._write_stmt.consistency_level = self.write_consistency
-
-                self._read_stmt = cassandra.query.SimpleStatement(
-                    Q_SELECT_RESULT.format(table=self.table),
-                )
-                self._read_stmt.consistency_level = self.read_consistency
-
-                if write:
-                    # Only possible writers "workers" are allowed to issue
-                    # CREATE TABLE. This is to prevent conflicting situations
-                    # where both task-creator and task-executor would issue it
-                    # at the same time.
-
-                    # Anyway; if you're doing anything critical, you should
-                    # have created this table in advance, in which case
-                    # this query will be a no-op (AlreadyExists)
-                    self._make_stmt = cassandra.query.SimpleStatement(
-                        Q_CREATE_RESULT_TABLE.format(table=self.table),
-                    )
-                    self._make_stmt.consistency_level = self.write_consistency
-
-                    try:
-                        self._session.execute(self._make_stmt)
-                    except cassandra.AlreadyExists:
-                        pass
-
-            except cassandra.OperationTimedOut:
-                # a heavily loaded or gone Cassandra cluster failed to respond.
-                # leave this class in a consistent state
-                if self._connection is not None:
-                    self._connection.shutdown()     # also shuts down _session
-
-                self._connection = None
-                self._session = None
-                raise   # we did fail after all - reraise
-
-    def _store_result(self, task_id, result, status,
-                      traceback=None, request=None, **kwargs):
-        """Store return value and status of an executed task."""
-        self._get_connection(write=True)
-
-        self._session.execute(self._write_stmt, (
-            task_id,
-            status,
-            buf_t(self.encode(result)),
-            self.app.now(),
-            buf_t(self.encode(traceback)),
-            buf_t(self.encode(self.current_task_children(request)))
-        ))
-
-    def _get_task_meta_for(self, task_id):
-        """Get task metadata for a task by id."""
-        self._get_connection()
-
-        res = self._session.execute(self._read_stmt, (task_id, ))
-        if not res:
-            return {'status': states.PENDING, 'result': None}
-
-        status, result, date_done, traceback, children = res[0]
-
-        return self.meta_from_decoded({
-            'task_id': task_id,
-            'status': status,
-            'result': self.decode(result),
-            'date_done': date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
-            'traceback': self.decode(traceback),
-            'children': self.decode(children),
-        })
-
-    def __reduce__(self, args=(), kwargs={}):
-        kwargs.update(
-            dict(servers=self.servers,
-                 keyspace=self.keyspace,
-                 table=self.table))
-        return super(CassandraBackend, self).__reduce__(args, kwargs)

+ 1 - 1
celery/bin/worker.py

@@ -242,7 +242,7 @@ class worker(Command):
             Option('--scheduler', dest='scheduler_cls'),
             Option('-S', '--statedb',
                    default=conf.worker_state_db, dest='state_db'),
-            Option('-E', '--events', default=conf.worker_send_events,
+            Option('-E', '--events', default=conf.worker_send_task_events,
                    action='store_true', dest='send_events'),
             Option('--time-limit', type='float', dest='task_time_limit',
                    default=conf.task_time_limit),

+ 110 - 72
celery/canvas.py

@@ -216,13 +216,17 @@ class Signature(dict):
         return s
     partial = clone
 
-    def freeze(self, _id=None, group_id=None, chord=None, root_id=None):
+    def freeze(self, _id=None, group_id=None, chord=None,
+               root_id=None, parent_id=None):
         opts = self.options
         try:
             tid = opts['task_id']
         except KeyError:
             tid = opts['task_id'] = _id or uuid()
-        root_id = opts.setdefault('root_id', root_id)
+        if root_id:
+            opts['root_id'] = root_id
+        if parent_id:
+            opts['parent_id'] = parent_id
         if 'reply_to' not in opts:
             opts['reply_to'] = self.app.oid
         if group_id:
@@ -251,6 +255,9 @@ class Signature(dict):
     def set_immutable(self, immutable):
         self.immutable = immutable
 
+    def set_parent_id(self, parent_id):
+        self.parent_id = parent_id
+
     def apply_async(self, args=(), kwargs={}, route_name=None, **options):
         try:
             _apply = self._apply_async
@@ -362,6 +369,8 @@ class Signature(dict):
         except KeyError:
             return _partial(self.app.send_task, self['task'])
     id = _getitem_property('options.task_id')
+    parent_id = _getitem_property('options.parent_id')
+    root_id = _getitem_property('options.root_id')
     task = _getitem_property('task')
     args = _getitem_property('args')
     kwargs = _getitem_property('kwargs')
@@ -399,8 +408,8 @@ class chain(Signature):
             dict(self.options, **options) if options else self.options))
 
     def run(self, args=(), kwargs={}, group_id=None, chord=None,
-            task_id=None, link=None, link_error=None,
-            publisher=None, producer=None, root_id=None, app=None, **options):
+            task_id=None, link=None, link_error=None, publisher=None,
+            producer=None, root_id=None, parent_id=None, app=None, **options):
         app = app or self.app
         use_link = self._use_link
         args = (tuple(args) + tuple(self.args)
@@ -410,35 +419,28 @@ class chain(Signature):
             tasks, results = self._frozen
         else:
             tasks, results = self.prepare_steps(
-                args, self.tasks, root_id, link_error, app,
+                args, self.tasks, root_id, parent_id, link_error, app,
                 task_id, group_id, chord,
             )
 
         if results:
-            # make sure we can do a link() and link_error() on a chain object.
-            if self._use_link:
-                # old task protocol used link for chains, last is last.
-                if link:
-                    tasks[-1].set(link=link)
-                tasks[0].apply_async(**options)
-                return results[-1]
-            else:
-                # -- using chain message field means last task is first.
-                if link:
-                    tasks[0].set(link=link)
-                first_task = tasks.pop()
-                first_task.apply_async(chain=tasks, **options)
-                return results[0]
-
-    def freeze(self, _id=None, group_id=None, chord=None, root_id=None):
+            if link:
+                tasks[0].set(link=link)
+            first_task = tasks.pop()
+            first_task.apply_async(
+                chain=tasks if not use_link else None, **options)
+            return results[0]
+
+    def freeze(self, _id=None, group_id=None, chord=None,
+               root_id=None, parent_id=None):
         _, results = self._frozen = self.prepare_steps(
-            self.args, self.tasks, root_id, None,
+            self.args, self.tasks, root_id, parent_id, None,
             self.app, _id, group_id, chord, clone=False,
         )
         return results[-1]
 
     def prepare_steps(self, args, tasks,
-                      root_id=None, link_error=None, app=None,
+                      root_id=None, parent_id=None, link_error=None, app=None,
                       last_task_id=None, group_id=None, chord_body=None,
                       clone=True, from_dict=Signature.from_dict):
         app = app or self.app
@@ -452,17 +454,16 @@ class chain(Signature):
             use_link = False
         steps = deque(tasks)
 
-        steps_pop = steps.popleft if use_link else steps.pop
-        steps_extend = steps.extendleft if use_link else steps.extend
-        extend_order = reversed if use_link else noop
+        steps_pop = steps.pop
+        steps_extend = steps.extend
 
-        next_step = prev_task = prev_res = None
+        next_step = prev_task = prev_prev_task = None
+        prev_res = prev_prev_res = None
         tasks, results = [], []
         i = 0
         while steps:
             task = steps_pop()
-            last_task = not steps if use_link else not i
-            first_task = not i if use_link else not steps
+            is_first_task, is_last_task = not steps, not i
 
             if not isinstance(task, abstract.CallableSignature):
                 task = from_dict(task, app=app)
@@ -471,30 +472,27 @@ class chain(Signature):
 
             # first task gets partial args from chain
             if clone:
-                task = task.clone(args) if not i else task.clone()
-            elif first_task:
+                task = task.clone(args) if is_first_task else task.clone()
+            elif is_first_task:
                 task.args = tuple(args) + tuple(task.args)
 
             if isinstance(task, chain):
                 # splice the chain
-                steps_extend(extend_order(task.tasks))
+                steps_extend(task.tasks)
                 continue
-            elif isinstance(task, group):
-                if (steps if use_link else prev_task):
-                    # automatically upgrade group(...) | s to chord(group, s)
-                    try:
-                        next_step = steps_pop() if use_link else prev_task
-                        # for chords we freeze by pretending it's a normal
-                        # signature instead of a group.
-                        res = Signature.freeze(next_step, root_id=root_id)
-                        task = chord(
-                            task, body=next_step,
-                            task_id=res.task_id, root_id=root_id,
-                        )
-                    except IndexError:
-                        pass  # no callback, so keep as group.
-
-            if last_task:
+
+            if isinstance(task, group) and prev_task:
+                # automatically upgrade group(...) | s to chord(group, s)
+                # for chords we freeze by pretending it's a normal
+                # signature instead of a group.
+                tasks.pop()
+                results.pop()
+                prev_res = prev_prev_res
+                task = chord(
+                    task, body=prev_task,
+                    task_id=res.task_id, root_id=root_id, app=app,
+                )
+            if is_last_task:
                 # chain(task_id=id) means task id is set for the last task
                 # in the chain.  If the chord is part of a chord/group
                 # then that chord/group must synchronize based on the
@@ -506,26 +504,36 @@ class chain(Signature):
                 )
             else:
                 res = task.freeze(root_id=root_id)
-            root_id = res.id if root_id is None else root_id
+
             i += 1
 
             if prev_task:
+                prev_task.set_parent_id(task.id)
                 if use_link:
                     # link previous task to this task.
-                    prev_task.link(task)
-                    if not res.parent:
-                        res.parent = prev_res
-                else:
+                    task.link(prev_task)
+                    if not res.parent and prev_res:
+                        prev_res.parent = res.parent
+                elif prev_res:
                     prev_res.parent = res
 
+            if is_first_task and parent_id is not None:
+                task.set_parent_id(parent_id)
+
             if link_error:
                 task.set(link_error=link_error)
 
             tasks.append(task)
             results.append(res)
 
-            prev_task, prev_res = task, res
+            prev_prev_task, prev_task, prev_prev_res, prev_res = (
+                prev_task, task, prev_res, res,
+            )
 
+        if root_id is None and tasks:
+            root_id = tasks[-1].id
+            for task in reversed(tasks):
+                task.options['root_id'] = root_id
         return tasks, results
 
     def apply(self, args=(), kwargs={}, **options):
@@ -644,13 +652,16 @@ class chunks(Signature):
         return cls(task, it, n, app=app)()
 
 
-def _maybe_group(tasks):
+def _maybe_group(tasks, app):
+    if isinstance(tasks, dict):
+        tasks = signature(tasks, app=app)
+
     if isinstance(tasks, group):
-        tasks = list(tasks.tasks)
+        tasks = tasks.tasks
     elif isinstance(tasks, abstract.CallableSignature):
         tasks = [tasks]
     else:
-        tasks = [signature(t) for t in regen(tasks)]
+        tasks = [signature(t, app=app) for t in regen(tasks)]
     return tasks
 
 
@@ -659,8 +670,9 @@ class group(Signature):
     tasks = _getitem_property('kwargs.tasks')
 
     def __init__(self, *tasks, **options):
+        app = options.get('app')
         if len(tasks) == 1:
-            tasks = _maybe_group(tasks[0])
+            tasks = _maybe_group(tasks[0], app)
         Signature.__init__(
             self, 'celery.group', (), {'tasks': tasks}, **options
         )
@@ -672,6 +684,9 @@ class group(Signature):
             d, group(d['kwargs']['tasks'], app=app, **d['options']),
         )
 
+    def __len__(self):
+        return len(self.tasks)
+
     def _prepared(self, tasks, partial_args, group_id, root_id, app, dict=dict,
                   CallableSignature=abstract.CallableSignature,
                   from_dict=Signature.from_dict):
@@ -686,7 +701,7 @@ class group(Signature):
                     task = from_dict(task, app=app)
                 if isinstance(task, group):
                     # needs yield_from :(
-                    unroll = task._prepared(
+                    unroll = task_prepared(
                         task.tasks, partial_args, group_id, root_id, app,
                     )
                     for taskN, resN in unroll:
@@ -713,6 +728,10 @@ class group(Signature):
             options.pop('task_id', uuid()))
         return options, group_id, options.get('root_id')
 
+    def set_parent_id(self, parent_id):
+        for task in self.tasks:
+            task.set_parent_id(parent_id)
+
     def apply_async(self, args=(), kwargs=None, add_to_parent=True,
                     producer=None, **options):
         app = self.app
@@ -767,7 +786,7 @@ class group(Signature):
     def __call__(self, *partial_args, **options):
         return self.apply_async(partial_args, **options)
 
-    def _freeze_unroll(self, new_tasks, group_id, chord, root_id):
+    def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id):
         stack = deque(self.tasks)
         while stack:
             task = maybe_signature(stack.popleft(), app=self._app).clone()
@@ -776,9 +795,11 @@ class group(Signature):
             else:
                 new_tasks.append(task)
                 yield task.freeze(group_id=group_id,
-                                  chord=chord, root_id=root_id)
+                                  chord=chord, root_id=root_id,
+                                  parent_id=parent_id)
 
-    def freeze(self, _id=None, group_id=None, chord=None, root_id=None):
+    def freeze(self, _id=None, group_id=None, chord=None,
+               root_id=None, parent_id=None):
         opts = self.options
         try:
             gid = opts['task_id']
@@ -789,11 +810,12 @@ class group(Signature):
         if chord:
             opts['chord'] = chord
         root_id = opts.setdefault('root_id', root_id)
+        parent_id = opts.setdefault('parent_id', parent_id)
         new_tasks = []
         # Need to unroll subgroups early so that chord gets the
         # right result instance for chord_unlock etc.
         results = list(self._freeze_unroll(
-            new_tasks, group_id, chord, root_id,
+            new_tasks, group_id, chord, root_id, parent_id,
         ))
         if isinstance(self.tasks, MutableSequence):
             self.tasks[:] = new_tasks
@@ -829,16 +851,29 @@ class group(Signature):
 class chord(Signature):
 
     def __init__(self, header, body=None, task='celery.chord',
-                 args=(), kwargs={}, **options):
+                 args=(), kwargs={}, app=None, **options):
         Signature.__init__(
             self, task, args,
-            dict(kwargs, header=_maybe_group(header),
+            dict(kwargs, header=_maybe_group(header, app),
                  body=maybe_signature(body, app=self._app)), **options
         )
         self.subtask_type = 'chord'
 
-    def freeze(self, *args, **kwargs):
-        return self.body.freeze(*args, **kwargs)
+    def freeze(self, _id=None, group_id=None, chord=None,
+               root_id=None, parent_id=None):
+        if not isinstance(self.tasks, group):
+            self.tasks = group(self.tasks)
+        self.tasks.freeze(parent_id=parent_id, root_id=root_id)
+        self.id = self.tasks.id
+        return self.body.freeze(_id, parent_id=self.id, root_id=root_id)
+
+    def set_parent_id(self, parent_id):
+        tasks = self.tasks
+        if isinstance(tasks, group):
+            tasks = tasks.tasks
+        for task in tasks:
+            task.set_parent_id(parent_id)
+        self.parent_id = parent_id
 
     @classmethod
     def from_dict(self, d, app=None):
@@ -858,7 +893,11 @@ class chord(Signature):
     def _get_app(self, body=None):
         app = self._app
         if app is None:
-            app = self.tasks[0]._app
+            try:
+                tasks = self.tasks.tasks  # is a group
+            except AttributeError:
+                tasks = self.tasks
+            app = tasks[0]._app
             if app is None and body is not None:
                 app = body._app
         return app if app is not None else current_app
@@ -900,16 +939,15 @@ class chord(Signature):
         return sum(self._traverse_tasks(self.tasks, 1))
 
     def run(self, header, body, partial_args, app=None, interval=None,
-            countdown=1, max_retries=None, propagate=None, eager=False,
+            countdown=1, max_retries=None, eager=False,
             task_id=None, **options):
         app = app or self._get_app(body)
-        propagate = (app.conf.chord_propagates
-                     if propagate is None else propagate)
         group_id = uuid()
         root_id = body.options.get('root_id')
         body.chord_size = self.__length_hint__()
         options = dict(self.options, **options) if options else self.options
         if options:
+            options.pop('task_id', None)
             body.options.update(options)
 
         results = header.freeze(
@@ -920,7 +958,7 @@ class chord(Signature):
             header, partial_args, group_id, body,
             interval=interval, countdown=countdown,
             options=options, max_retries=max_retries,
-            propagate=propagate, result=results)
+            result=results)
         bodyres.parent = parent
         return bodyres
 

+ 15 - 5
celery/contrib/rdb.py

@@ -132,13 +132,23 @@ class Rdb(Pdb):
     def say(self, m):
         print(m, file=self.out)
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc_info):
+        self._close_session()
+
     def _close_session(self):
         self.stdin, self.stdout = sys.stdin, sys.stdout = self._prev_handles
-        self._handle.close()
-        self._client.close()
-        self._sock.close()
-        self.active = False
-        self.say(SESSION_ENDED.format(self=self))
+        if self.active:
+            if self._handle is not None:
+                self._handle.close()
+            if self._client is not None:
+                self._client.close()
+            if self._sock is not None:
+                self._sock.close()
+            self.active = False
+            self.say(SESSION_ENDED.format(self=self))
 
     def do_continue(self, arg):
         self._close_session()

+ 18 - 9
celery/events/state.py

@@ -233,11 +233,13 @@ class Task(object):
     state = states.PENDING
     clock = 0
 
-    _fields = ('uuid', 'name', 'state', 'received', 'sent', 'started',
-               'succeeded', 'failed', 'retried', 'revoked', 'args', 'kwargs',
-               'eta', 'expires', 'retries', 'worker', 'result', 'exception',
-               'timestamp', 'runtime', 'traceback', 'exchange', 'routing_key',
-               'clock', 'client')
+    _fields = (
+        'uuid', 'name', 'state', 'received', 'sent', 'started',
+        'succeeded', 'failed', 'retried', 'revoked', 'args', 'kwargs',
+        'eta', 'expires', 'retries', 'worker', 'result', 'exception',
+        'timestamp', 'runtime', 'traceback', 'exchange', 'routing_key',
+        'clock', 'client', 'root_id', 'parent_id',
+    )
     if not PYPY:
         __slots__ = ('__dict__', '__weakref__')
 
@@ -249,12 +251,19 @@ class Task(object):
     #: that state. ``(RECEIVED, ('name', 'args')``, means the name and args
     #: fields are always taken from the RECEIVED state, and any values for
     #: these fields received before or after is simply ignored.
-    merge_rules = {states.RECEIVED: ('name', 'args', 'kwargs',
-                                     'retries', 'eta', 'expires')}
+    merge_rules = {
+        states.RECEIVED: (
+            'name', 'args', 'kwargs', 'parent_id',
+            'root_id' 'retries', 'eta', 'expires',
+        ),
+    }
 
     #: meth:`info` displays these fields by default.
-    _info_fields = ('args', 'kwargs', 'retries', 'result', 'eta', 'runtime',
-                    'expires', 'exception', 'exchange', 'routing_key')
+    _info_fields = (
+        'args', 'kwargs', 'retries', 'result', 'eta', 'runtime',
+        'expires', 'exception', 'exchange', 'routing_key',
+        'root_id', 'parent_id',
+    )
 
     def __init__(self, uuid=None, **kwargs):
         self.uuid = uuid

+ 3 - 1
celery/result.py

@@ -122,7 +122,7 @@ class AsyncResult(ResultBase):
                                 reply=wait, timeout=timeout)
 
     def get(self, timeout=None, propagate=True, interval=0.5,
-            no_ack=True, follow_parents=True,
+            no_ack=True, follow_parents=True, callback=None,
             EXCEPTION_STATES=states.EXCEPTION_STATES,
             PROPAGATE_STATES=states.PROPAGATE_STATES):
         """Wait until task is ready, and return its result.
@@ -174,6 +174,8 @@ class AsyncResult(ResultBase):
             status = meta['status']
             if status in PROPAGATE_STATES and propagate:
                 raise meta['result']
+            if callback is not None:
+                callback(self.id, meta['result'])
             return meta['result']
     wait = get  # deprecated alias to :meth:`get`.
 

+ 6 - 0
celery/signals.py

@@ -50,6 +50,12 @@ task_failure = Signal(providing_args=[
 task_revoked = Signal(providing_args=[
     'request', 'terminated', 'signum', 'expired',
 ])
+task_rejected = Signal(providing_args=[
+    'message', 'exc',
+])
+task_unknown = Signal(providing_args=[
+    'message', 'exc', 'name', 'id',
+])
 celeryd_init = Signal(providing_args=['instance', 'conf', 'options'])
 celeryd_after_setup = Signal(providing_args=['instance', 'conf'])
 import_modules = Signal(providing_args=[])

+ 59 - 5
celery/tests/app/test_builtins.py

@@ -133,23 +133,77 @@ class test_chain(BuiltinsCase):
         self.assertTrue(result.parent.parent)
         self.assertIsNone(result.parent.parent.parent)
 
+    def test_group_to_chord__freeze_parent_id(self):
+        def using_freeze(c):
+            c.freeze(parent_id='foo', root_id='root')
+            return c._frozen[0]
+        self.assert_group_to_chord_parent_ids(using_freeze)
+
+    def assert_group_to_chord_parent_ids(self, freezefun):
+        c = (
+            self.add.s(5, 5) |
+            group([self.add.s(i, i) for i in range(5)], app=self.app) |
+            self.add.si(10, 10) |
+            self.add.si(20, 20) |
+            self.add.si(30, 30)
+        )
+        tasks = freezefun(c)
+        self.assertEqual(tasks[-1].parent_id, 'foo')
+        self.assertEqual(tasks[-1].root_id, 'root')
+        self.assertEqual(tasks[-2].parent_id, tasks[-1].id)
+        self.assertEqual(tasks[-2].root_id, 'root')
+        self.assertEqual(tasks[-2].body.parent_id, tasks[-2].tasks.id)
+        self.assertEqual(tasks[-2].body.parent_id, tasks[-2].id)
+        self.assertEqual(tasks[-2].body.root_id, 'root')
+        self.assertEqual(tasks[-2].tasks.tasks[0].parent_id, tasks[-1].id)
+        self.assertEqual(tasks[-2].tasks.tasks[0].root_id, 'root')
+        self.assertEqual(tasks[-2].tasks.tasks[1].parent_id, tasks[-1].id)
+        self.assertEqual(tasks[-2].tasks.tasks[1].root_id, 'root')
+        self.assertEqual(tasks[-2].tasks.tasks[2].parent_id, tasks[-1].id)
+        self.assertEqual(tasks[-2].tasks.tasks[2].root_id, 'root')
+        self.assertEqual(tasks[-2].tasks.tasks[3].parent_id, tasks[-1].id)
+        self.assertEqual(tasks[-2].tasks.tasks[3].root_id, 'root')
+        self.assertEqual(tasks[-2].tasks.tasks[4].parent_id, tasks[-1].id)
+        self.assertEqual(tasks[-2].tasks.tasks[4].root_id, 'root')
+        self.assertEqual(tasks[-3].parent_id, tasks[-2].body.id)
+        self.assertEqual(tasks[-3].root_id, 'root')
+        self.assertEqual(tasks[-4].parent_id, tasks[-3].id)
+        self.assertEqual(tasks[-4].root_id, 'root')
+
     def test_group_to_chord(self):
         c = (
+            self.add.s(5) |
             group([self.add.s(i, i) for i in range(5)], app=self.app) |
             self.add.s(10) |
             self.add.s(20) |
             self.add.s(30)
         )
         c._use_link = True
-        tasks, _ = c.prepare_steps((), c.tasks)
-        self.assertIsInstance(tasks[0], chord)
-        self.assertTrue(tasks[0].body.options['link'])
-        self.assertTrue(tasks[0].body.options['link'][0].options['link'])
+        tasks, results = c.prepare_steps((), c.tasks)
+
+        self.assertEqual(tasks[-1].args[0], 5)
+        self.assertIsInstance(tasks[-2], chord)
+        self.assertEqual(len(tasks[-2].tasks), 5)
+        self.assertEqual(tasks[-2].parent_id, tasks[-1].id)
+        self.assertEqual(tasks[-2].root_id, tasks[-1].id)
+        self.assertEqual(tasks[-2].body.args[0], 10)
+        self.assertEqual(tasks[-2].body.parent_id, tasks[-2].id)
+
+        self.assertEqual(tasks[-3].args[0], 20)
+        self.assertEqual(tasks[-3].root_id, tasks[-1].id)
+        self.assertEqual(tasks[-3].parent_id, tasks[-2].body.id)
+
+        self.assertEqual(tasks[-4].args[0], 30)
+        self.assertEqual(tasks[-4].parent_id, tasks[-3].id)
+        self.assertEqual(tasks[-4].root_id, tasks[-1].id)
+
+        self.assertTrue(tasks[-2].body.options['link'])
+        self.assertTrue(tasks[-2].body.options['link'][0].options['link'])
 
         c2 = self.add.s(2, 2) | group(self.add.s(i, i) for i in range(10))
         c2._use_link = True
         tasks2, _ = c2.prepare_steps((), c2.tasks)
-        self.assertIsInstance(tasks2[1], group)
+        self.assertIsInstance(tasks2[0], group)
 
     def test_group_to_chord__protocol_2(self):
         c = (

+ 3 - 10
celery/tests/backends/test_base.py

@@ -328,24 +328,17 @@ class test_KeyValueStoreBackend(AppCase):
 
     def test_chord_part_return_propagate_set(self):
         with self._chord_part_context(self.b) as (task, deps, _):
-            self.b.on_chord_part_return(
-                task.request, 'SUCCESS', 10, propagate=True,
-            )
+            self.b.on_chord_part_return(task.request, 'SUCCESS', 10)
             self.assertFalse(self.b.expire.called)
             deps.delete.assert_called_with()
             deps.join_native.assert_called_with(propagate=True, timeout=3.0)
 
     def test_chord_part_return_propagate_default(self):
         with self._chord_part_context(self.b) as (task, deps, _):
-            self.b.on_chord_part_return(
-                task.request, 'SUCCESS', 10, propagate=None,
-            )
+            self.b.on_chord_part_return(task.request, 'SUCCESS', 10)
             self.assertFalse(self.b.expire.called)
             deps.delete.assert_called_with()
-            deps.join_native.assert_called_with(
-                propagate=self.b.app.conf.chord_propagates,
-                timeout=3.0,
-            )
+            deps.join_native.assert_called_with(propagate=True, timeout=3.0)
 
     def test_chord_part_return_join_raises_internal(self):
         with self._chord_part_context(self.b) as (task, deps, callback):

+ 101 - 117
celery/tests/backends/test_cassandra.py

@@ -1,74 +1,48 @@
 from __future__ import absolute_import
 
-import socket
-
 from pickle import loads, dumps
+from datetime import datetime
 
 from celery import states
 from celery.exceptions import ImproperlyConfigured
 from celery.tests.case import (
-    AppCase, Mock, mock_module, depends_on_current_app,
+    AppCase, Mock, mock_module, depends_on_current_app
 )
 
+CASSANDRA_MODULES = ['cassandra', 'cassandra.cluster']
+
 
 class Object(object):
     pass
 
 
-def install_exceptions(mod):
-    # py3k: cannot catch exceptions not ineheriting from BaseException.
-
-    class NotFoundException(Exception):
-        pass
-
-    class TException(Exception):
-        pass
-
-    class InvalidRequestException(Exception):
-        pass
-
-    class UnavailableException(Exception):
-        pass
-
-    class TimedOutException(Exception):
-        pass
-
-    class AllServersUnavailable(Exception):
-        pass
-
-    mod.NotFoundException = NotFoundException
-    mod.TException = TException
-    mod.InvalidRequestException = InvalidRequestException
-    mod.TimedOutException = TimedOutException
-    mod.UnavailableException = UnavailableException
-    mod.AllServersUnavailable = AllServersUnavailable
-
-
 class test_CassandraBackend(AppCase):
 
     def setup(self):
         self.app.conf.update(
             cassandra_servers=['example.com'],
-            cassandra_keyspace='keyspace',
-            cassandra_column_family='columns',
+            cassandra_keyspace='celery',
+            cassandra_table='task_results',
         )
 
-    def test_init_no_pycassa(self):
-        with mock_module('pycassa'):
+    def test_init_no_cassandra(self):
+        """should raise ImproperlyConfigured when no python-driver
+        installed."""
+        with mock_module(*CASSANDRA_MODULES):
             from celery.backends import cassandra as mod
-            prev, mod.pycassa = mod.pycassa, None
+            prev, mod.cassandra = mod.cassandra, None
             try:
                 with self.assertRaises(ImproperlyConfigured):
                     mod.CassandraBackend(app=self.app)
             finally:
-                mod.pycassa = prev
+                mod.cassandra = prev
 
     def test_init_with_and_without_LOCAL_QUROM(self):
-        with mock_module('pycassa'):
+        with mock_module(*CASSANDRA_MODULES):
             from celery.backends import cassandra as mod
-            mod.pycassa = Mock()
-            install_exceptions(mod.pycassa)
-            cons = mod.pycassa.ConsistencyLevel = Object()
+            mod.cassandra = Mock()
+
+            cons = mod.cassandra.ConsistencyLevel = Object()
             cons.LOCAL_QUORUM = 'foo'
 
             self.app.conf.cassandra_read_consistency = 'LOCAL_FOO'
@@ -87,104 +61,114 @@ class test_CassandraBackend(AppCase):
 
     @depends_on_current_app
     def test_reduce(self):
-        with mock_module('pycassa'):
+        with mock_module(*CASSANDRA_MODULES):
             from celery.backends.cassandra import CassandraBackend
             self.assertTrue(loads(dumps(CassandraBackend(app=self.app))))
 
     def test_get_task_meta_for(self):
-        with mock_module('pycassa'):
+        with mock_module(*CASSANDRA_MODULES):
             from celery.backends import cassandra as mod
-            mod.pycassa = Mock()
-            install_exceptions(mod.pycassa)
-            mod.Thrift = Mock()
-            install_exceptions(mod.Thrift)
+            mod.cassandra = Mock()
+
             x = mod.CassandraBackend(app=self.app)
-            Get_Column = x._get_column_family = Mock()
-            get_column = Get_Column.return_value = Mock()
-            get = get_column.get
-            META = get.return_value = {
-                'task_id': 'task_id',
-                'status': states.SUCCESS,
-                'result': '1',
-                'date_done': 'date',
-                'traceback': '',
-                'children': None,
-            }
+            x._connection = True
+            session = x._session = Mock()
+            execute = session.execute = Mock()
+            execute.return_value = [
+                [states.SUCCESS, '1', datetime.now(), b'', b'']
+            ]
             x.decode = Mock()
-            x.detailed_mode = False
-            meta = x._get_task_meta_for('task_id')
-            self.assertEqual(meta['status'], states.SUCCESS)
-
-            x.detailed_mode = True
-            row = get.return_value = Mock()
-            row.values.return_value = [Mock()]
-            x.decode.return_value = META
             meta = x._get_task_meta_for('task_id')
             self.assertEqual(meta['status'], states.SUCCESS)
-            x.decode.return_value = Mock()
 
-            x.detailed_mode = False
-            get.side_effect = KeyError()
+            x._session.execute.return_value = []
             meta = x._get_task_meta_for('task_id')
             self.assertEqual(meta['status'], states.PENDING)
 
-            calls = [0]
-            end = [10]
-
-            def work_eventually(*arg):
-                try:
-                    if calls[0] > end[0]:
-                        return META
-                    raise socket.error()
-                finally:
-                    calls[0] += 1
-            get.side_effect = work_eventually
-            x._retry_timeout = 10
-            x._retry_wait = 0.01
-            meta = x._get_task_meta_for('task')
-            self.assertEqual(meta['status'], states.SUCCESS)
-
-            x._retry_timeout = 0.1
-            calls[0], end[0] = 0, 100
-            with self.assertRaises(socket.error):
-                x._get_task_meta_for('task')
-
     def test_store_result(self):
-        with mock_module('pycassa'):
+        with mock_module(*CASSANDRA_MODULES):
             from celery.backends import cassandra as mod
-            mod.pycassa = Mock()
-            install_exceptions(mod.pycassa)
-            mod.Thrift = Mock()
-            install_exceptions(mod.Thrift)
-            x = mod.CassandraBackend(app=self.app)
-            Get_Column = x._get_column_family = Mock()
-            cf = Get_Column.return_value = Mock()
-            x.detailed_mode = False
-            x._store_result('task_id', 'result', states.SUCCESS)
-            self.assertTrue(cf.insert.called)
+            mod.cassandra = Mock()
 
-            cf.insert.reset()
-            x.detailed_mode = True
+            x = mod.CassandraBackend(app=self.app)
+            x._connection = True
+            session = x._session = Mock()
+            session.execute = Mock()
             x._store_result('task_id', 'result', states.SUCCESS)
-            self.assertTrue(cf.insert.called)
 
     def test_process_cleanup(self):
-        with mock_module('pycassa'):
+        with mock_module(*CASSANDRA_MODULES):
             from celery.backends import cassandra as mod
             x = mod.CassandraBackend(app=self.app)
-            x._column_family = None
             x.process_cleanup()
 
-            x._column_family = True
-            x.process_cleanup()
-            self.assertIsNone(x._column_family)
+            self.assertIsNone(x._connection)
+            self.assertIsNone(x._session)
 
-    def test_get_column_family(self):
-        with mock_module('pycassa'):
+    def test_timeouting_cluster(self):
+        """
+        Tests behaviour when Cluster.connect raises cassandra.OperationTimedOut
+        """
+        with mock_module(*CASSANDRA_MODULES):
             from celery.backends import cassandra as mod
-            mod.pycassa = Mock()
-            install_exceptions(mod.pycassa)
+
+            class OTOExc(Exception):
+                pass
+
+            class VeryFaultyCluster(object):
+                def __init__(self, *args, **kwargs):
+                    pass
+
+                def connect(self, *args, **kwargs):
+                    raise OTOExc()
+
+                def shutdown(self):
+                    pass
+
+            mod.cassandra = Mock()
+            mod.cassandra.OperationTimedOut = OTOExc
+            mod.cassandra.cluster = Mock()
+            mod.cassandra.cluster.Cluster = VeryFaultyCluster
+
             x = mod.CassandraBackend(app=self.app)
-            self.assertTrue(x._get_column_family())
-            self.assertIsNotNone(x._column_family)
-            self.assertIs(x._get_column_family(), x._column_family)
+
+            with self.assertRaises(OTOExc):
+                x._store_result('task_id', 'result', states.SUCCESS)
+            self.assertIsNone(x._connection)
+            self.assertIsNone(x._session)
+
+            x.process_cleanup()  # should not raise
+
+
+    def test_please_free_memory(self):
+        """
+        Ensure that Cluster object IS shut down.
+        """
+        with mock_module(*CASSANDRA_MODULES):
+            from celery.backends import cassandra as mod
+
+            class RAMHoggingCluster(object):
+
+                objects_alive = 0
+
+                def __init__(self, *args, **kwargs):
+                    pass
+
+                def connect(self, *args, **kwargs):
+                    RAMHoggingCluster.objects_alive += 1
+                    return Mock()
+
+                def shutdown(self):
+                    RAMHoggingCluster.objects_alive -= 1
+
+            mod.cassandra = Mock()
+
+            mod.cassandra.cluster = Mock()
+            mod.cassandra.cluster.Cluster = RAMHoggingCluster
+
+            for x in range(0, 10):
+                x = mod.CassandraBackend(app=self.app)
+                x._store_result('task_id', 'result', states.SUCCESS)
+                x.process_cleanup()
+
+            self.assertEquals(RAMHoggingCluster.objects_alive, 0)

+ 0 - 174
celery/tests/backends/test_new_cassandra.py

@@ -1,174 +0,0 @@
-from __future__ import absolute_import
-
-from pickle import loads, dumps
-from datetime import datetime
-
-from celery import states
-from celery.exceptions import ImproperlyConfigured
-from celery.tests.case import (
-    AppCase, Mock, mock_module, depends_on_current_app
-)
-
-CASSANDRA_MODULES = ['cassandra', 'cassandra.cluster']
-
-
-class Object(object):
-    pass
-
-
-class test_CassandraBackend(AppCase):
-
-    def setup(self):
-        self.app.conf.update(
-            cassandra_servers=['example.com'],
-            cassandra_keyspace='celery',
-            cassandra_table='task_results',
-        )
-
-    def test_init_no_cassandra(self):
-        """should raise ImproperlyConfigured when no python-driver
-        installed."""
-        with mock_module(*CASSANDRA_MODULES):
-            from celery.backends import new_cassandra as mod
-            prev, mod.cassandra = mod.cassandra, None
-            try:
-                with self.assertRaises(ImproperlyConfigured):
-                    mod.CassandraBackend(app=self.app)
-            finally:
-                mod.cassandra = prev
-
-    def test_init_with_and_without_LOCAL_QUROM(self):
-        with mock_module(*CASSANDRA_MODULES):
-            from celery.backends import new_cassandra as mod
-            mod.cassandra = Mock()
-
-            cons = mod.cassandra.ConsistencyLevel = Object()
-            cons.LOCAL_QUORUM = 'foo'
-
-            self.app.conf.cassandra_read_consistency = 'LOCAL_FOO'
-            self.app.conf.cassandra_write_consistency = 'LOCAL_FOO'
-
-            mod.CassandraBackend(app=self.app)
-            cons.LOCAL_FOO = 'bar'
-            mod.CassandraBackend(app=self.app)
-
-            # no servers raises ImproperlyConfigured
-            with self.assertRaises(ImproperlyConfigured):
-                self.app.conf.cassandra_servers = None
-                mod.CassandraBackend(
-                    app=self.app, keyspace='b', column_family='c',
-                )
-
-    @depends_on_current_app
-    def test_reduce(self):
-        with mock_module(*CASSANDRA_MODULES):
-            from celery.backends.new_cassandra import CassandraBackend
-            self.assertTrue(loads(dumps(CassandraBackend(app=self.app))))
-
-    def test_get_task_meta_for(self):
-        with mock_module(*CASSANDRA_MODULES):
-            from celery.backends import new_cassandra as mod
-            mod.cassandra = Mock()
-
-            x = mod.CassandraBackend(app=self.app)
-            x._connection = True
-            session = x._session = Mock()
-            execute = session.execute = Mock()
-            execute.return_value = [
-                [states.SUCCESS, '1', datetime.now(), b'', b'']
-            ]
-            x.decode = Mock()
-            meta = x._get_task_meta_for('task_id')
-            self.assertEqual(meta['status'], states.SUCCESS)
-
-            x._session.execute.return_value = []
-            meta = x._get_task_meta_for('task_id')
-            self.assertEqual(meta['status'], states.PENDING)
-
-    def test_store_result(self):
-        with mock_module(*CASSANDRA_MODULES):
-            from celery.backends import new_cassandra as mod
-            mod.cassandra = Mock()
-
-            x = mod.CassandraBackend(app=self.app)
-            x._connection = True
-            session = x._session = Mock()
-            session.execute = Mock()
-            x._store_result('task_id', 'result', states.SUCCESS)
-
-    def test_process_cleanup(self):
-        with mock_module(*CASSANDRA_MODULES):
-            from celery.backends import new_cassandra as mod
-            x = mod.CassandraBackend(app=self.app)
-            x.process_cleanup()
-
-            self.assertIsNone(x._connection)
-            self.assertIsNone(x._session)
-
-    def test_timeouting_cluster(self):
-        """
-        Tests behaviour when Cluster.connect raises cassandra.OperationTimedOut
-        """
-        with mock_module(*CASSANDRA_MODULES):
-            from celery.backends import new_cassandra as mod
-
-            class OTOExc(Exception):
-                pass
-
-            class VeryFaultyCluster(object):
-                def __init__(self, *args, **kwargs):
-                    pass
-
-                def connect(self, *args, **kwargs):
-                    raise OTOExc()
-
-                def shutdown(self):
-                    pass
-
-            mod.cassandra = Mock()
-            mod.cassandra.OperationTimedOut = OTOExc
-            mod.cassandra.cluster = Mock()
-            mod.cassandra.cluster.Cluster = VeryFaultyCluster
-
-            x = mod.CassandraBackend(app=self.app)
-
-            with self.assertRaises(OTOExc):
-                x._store_result('task_id', 'result', states.SUCCESS)
-            self.assertIsNone(x._connection)
-            self.assertIsNone(x._session)
-
-            x.process_cleanup()  # should not raise
-
-
-    def test_please_free_memory(self):
-        """
-        Ensure that Cluster object IS shut down.
-        """
-        with mock_module(*CASSANDRA_MODULES):
-            from celery.backends import new_cassandra as mod
-
-            class RAMHoggingCluster(object):
-
-                objects_alive = 0
-
-                def __init__(self, *args, **kwargs):
-                    pass
-
-                def connect(self, *args, **kwargs):
-                    RAMHoggingCluster.objects_alive += 1
-                    return Mock()
-
-                def shutdown(self):
-                    RAMHoggingCluster.objects_alive -= 1
-
-            mod.cassandra = Mock()
-
-            mod.cassandra.cluster = Mock()
-            mod.cassandra.cluster.Cluster = RAMHoggingCluster
-
-            for x in range(0, 10):
-                x = mod.CassandraBackend(app=self.app)
-                x._store_result('task_id', 'result', states.SUCCESS)
-                x.process_cleanup()
-
-            self.assertEquals(RAMHoggingCluster.objects_alive, 0)

+ 4 - 4
celery/tests/case.py

@@ -452,11 +452,11 @@ class AppCase(Case):
         assert sys.__stdout__
         assert sys.__stderr__
         this = self._get_test_name()
-        if isinstance(sys.stdout, LoggingProxy) or \
-                isinstance(sys.__stdout__, LoggingProxy):
+        if isinstance(sys.stdout, (LoggingProxy, Mock)) or \
+                isinstance(sys.__stdout__, (LoggingProxy, Mock)):
             raise RuntimeError(CASE_LOG_REDIRECT_EFFECT.format(this, 'stdout'))
-        if isinstance(sys.stderr, LoggingProxy) or \
-                isinstance(sys.__stderr__, LoggingProxy):
+        if isinstance(sys.stderr, (LoggingProxy, Mock)) or \
+                isinstance(sys.__stderr__, (LoggingProxy, Mock)):
             raise RuntimeError(CASE_LOG_REDIRECT_EFFECT.format(this, 'stderr'))
         backend = self.app.__dict__.get('backend')
         if backend is not None:

+ 39 - 34
celery/tests/contrib/test_rdb.py

@@ -8,14 +8,14 @@ from celery.contrib.rdb import (
     debugger,
     set_trace,
 )
-from celery.tests.case import Case, Mock, WhateverIO, patch, skip_if_pypy
+from celery.tests.case import AppCase, Mock, WhateverIO, patch, skip_if_pypy
 
 
 class SockErr(socket.error):
     errno = None
 
 
-class test_Rdb(Case):
+class test_Rdb(AppCase):
 
     @patch('celery.contrib.rdb.Rdb')
     def test_debugger(self, Rdb):
@@ -37,56 +37,60 @@ class test_Rdb(Case):
         get_avail_port.return_value = (sock, 8000)
         sock.accept.return_value = (Mock(), ['helu'])
         out = WhateverIO()
-        rdb = Rdb(out=out)
-        self.assertTrue(get_avail_port.called)
-        self.assertIn('helu', out.getvalue())
-
-        # set_quit
-        with patch('sys.settrace') as settrace:
-            rdb.set_quit()
-            settrace.assert_called_with(None)
-
-        # set_trace
-        with patch('celery.contrib.rdb.Pdb.set_trace') as pset:
-            with patch('celery.contrib.rdb._frame'):
-                rdb.set_trace()
-                rdb.set_trace(Mock())
-                pset.side_effect = SockErr
-                pset.side_effect.errno = errno.ENOENT
-                with self.assertRaises(SockErr):
+        with Rdb(out=out) as rdb:
+            self.assertTrue(get_avail_port.called)
+            self.assertIn('helu', out.getvalue())
+
+            # set_quit
+            with patch('sys.settrace') as settrace:
+                rdb.set_quit()
+                settrace.assert_called_with(None)
+
+            # set_trace
+            with patch('celery.contrib.rdb.Pdb.set_trace') as pset:
+                with patch('celery.contrib.rdb._frame'):
                     rdb.set_trace()
+                    rdb.set_trace(Mock())
+                    pset.side_effect = SockErr
+                    pset.side_effect.errno = errno.ENOENT
+                    with self.assertRaises(SockErr):
+                        rdb.set_trace()
 
-        # _close_session
-        rdb._close_session()
+            # _close_session
+            rdb._close_session()
 
-        # do_continue
-        rdb.set_continue = Mock()
-        rdb.do_continue(Mock())
-        rdb.set_continue.assert_called_with()
+            # do_continue
+            rdb.set_continue = Mock()
+            rdb.do_continue(Mock())
+            rdb.set_continue.assert_called_with()
 
-        # do_quit
-        rdb.set_quit = Mock()
-        rdb.do_quit(Mock())
-        rdb.set_quit.assert_called_with()
+            # do_quit
+            rdb.set_quit = Mock()
+            rdb.do_quit(Mock())
+            rdb.set_quit.assert_called_with()
 
     @patch('socket.socket')
     @skip_if_pypy
     def test_get_avail_port(self, sock):
         out = WhateverIO()
         sock.return_value.accept.return_value = (Mock(), ['helu'])
-        Rdb(out=out)
+        with Rdb(out=out) as rdb:
+            pass
 
         with patch('celery.contrib.rdb.current_process') as curproc:
             curproc.return_value.name = 'PoolWorker-10'
-            Rdb(out=out)
+            with Rdb(out=out) as rdb:
+                pass
 
         err = sock.return_value.bind.side_effect = SockErr()
         err.errno = errno.ENOENT
         with self.assertRaises(SockErr):
-            Rdb(out=out)
+            with Rdb(out=out) as rdb:
+                pass
         err.errno = errno.EADDRINUSE
         with self.assertRaises(Exception):
-            Rdb(out=out)
+            with Rdb(out=out) as rdb:
+                pass
         called = [0]
 
         def effect(*a, **kw):
@@ -97,4 +101,5 @@ class test_Rdb(Case):
             finally:
                 called[0] += 1
         sock.return_value.bind.side_effect = effect
-        Rdb(out=out)
+        with Rdb(out=out) as rdb:
+            pass

+ 2 - 0
celery/tests/events/test_state.py

@@ -244,6 +244,8 @@ class test_Task(AppCase):
                     eta=1,
                     runtime=0.0001,
                     expires=1,
+                    parent_id='bdefc',
+                    root_id='dedfef',
                     foo=None,
                     exception=1,
                     received=time() - 10,

+ 36 - 2
celery/tests/tasks/test_canvas.py

@@ -14,7 +14,7 @@ from celery.canvas import (
 )
 from celery.result import EagerResult
 
-from celery.tests.case import AppCase, Mock
+from celery.tests.case import AppCase, ContextMock, Mock
 
 SIG = Signature({'task': 'TASK',
                  'args': ('A1',),
@@ -233,6 +233,40 @@ class test_chain(CanvasCase):
         self.assertIsNone(chain(app=self.app)())
         self.assertIsNone(chain(app=self.app).apply_async())
 
+    def test_root_id_parent_id(self):
+        self.app.conf.task_protocol = 2
+        c = chain(self.add.si(i, i) for i in range(4))
+        c.freeze()
+        tasks, _ = c._frozen
+        for i, task in enumerate(tasks):
+            self.assertEqual(task.root_id, tasks[-1].id)
+            try:
+                self.assertEqual(task.parent_id, tasks[i + 1].id)
+            except IndexError:
+                assert i == len(tasks) - 1
+            else:
+                valid_parents = i
+        self.assertEqual(valid_parents, len(tasks) - 2)
+
+        self.assert_sent_with_ids(tasks[-1], tasks[-1].id, 'foo',
+                                  parent_id='foo')
+        self.assertTrue(tasks[-2].options['parent_id'])
+        self.assert_sent_with_ids(tasks[-2], tasks[-1].id, tasks[-1].id)
+        self.assert_sent_with_ids(tasks[-3], tasks[-1].id, tasks[-2].id)
+        self.assert_sent_with_ids(tasks[-4], tasks[-1].id, tasks[-3].id)
+
+
+    def assert_sent_with_ids(self, task, rid, pid, **options):
+        self.app.amqp.send_task_message = Mock(name='send_task_message')
+        self.app.backend = Mock()
+        self.app.producer_or_acquire = ContextMock()
+
+        res = task.apply_async(**options)
+        self.assertTrue(self.app.amqp.send_task_message.called)
+        message = self.app.amqp.send_task_message.call_args[0][2]
+        self.assertEqual(message.headers['parent_id'], pid)
+        self.assertEqual(message.headers['root_id'], rid)
+
     def test_call_no_tasks(self):
         x = chain()
         self.assertFalse(x())
@@ -269,7 +303,7 @@ class test_group(CanvasCase):
 
     def test_maybe_group_sig(self):
         self.assertListEqual(
-            _maybe_group(self.add.s(2, 2)), [self.add.s(2, 2)],
+            _maybe_group(self.add.s(2, 2), self.app), [self.add.s(2, 2)],
         )
 
     def test_from_dict(self):

+ 2 - 2
celery/tests/tasks/test_tasks.py

@@ -462,8 +462,8 @@ class test_apply_task(TasksCase):
         with self.assertRaises(KeyError):
             self.raising.apply(throw=True)
 
-    def test_apply_with_task_eager_propagates_exceptions(self):
-        self.app.conf.task_eager_propagates_exceptions = True
+    def test_apply_with_task_eager_propagates(self):
+        self.app.conf.task_eager_propagates = True
         with self.assertRaises(KeyError):
             self.raising.apply()
 

+ 1 - 1
celery/worker/__init__.py

@@ -358,7 +358,7 @@ class WorkController(object):
         self.logfile = logfile
 
         self.concurrency = either('worker_concurrency', concurrency)
-        self.send_events = either('worker_send_events', send_events)
+        self.send_events = either('worker_send_task_events', send_events)
         self.pool_cls = either('worker_pool', pool_cls)
         self.consumer_cls = either('worker_consumer', consumer_cls)
         self.timer_cls = either('worker_timer', timer_cls)

+ 14 - 2
celery/worker/consumer.py

@@ -32,9 +32,10 @@ from kombu.utils.encoding import safe_repr, bytes_t
 from kombu.utils.limits import TokenBucket
 
 from celery import bootsteps
+from celery import signals
 from celery.app.trace import build_tracer
 from celery.canvas import signature
-from celery.exceptions import InvalidTaskError
+from celery.exceptions import InvalidTaskError, NotRegistered
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
 from celery.utils.text import truncate
@@ -434,14 +435,26 @@ class Consumer(object):
     def on_unknown_message(self, body, message):
         warn(UNKNOWN_FORMAT, self._message_report(body, message))
         message.reject_log_error(logger, self.connection_errors)
+        signals.task_rejected.send(sender=self, message=message, exc=None)
 
     def on_unknown_task(self, body, message, exc):
         error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
+        id_, name = message.headers['id'], message.headers['task']
         message.reject_log_error(logger, self.connection_errors)
+        self.app.backend.mark_as_failure(id_, NotRegistered(name))
+        if self.event_dispatcher:
+            self.event_dispatcher.send(
+                'task-failed', uuid=id_,
+                exception='NotRegistered({0!r})'.format(name),
+            )
+        signals.task_unknown.send(
+            sender=self, message=message, exc=exc, name=name, id=id_,
+        )
 
     def on_invalid_task(self, body, message, exc):
         error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
         message.reject_log_error(logger, self.connection_errors)
+        signals.task_rejected.send(sender=self, message=message, exc=exc)
 
     def update_strategies(self):
         loader = self.app.loader
@@ -458,7 +471,6 @@ class Consumer(object):
         callbacks = self.on_task_message
 
         def on_task_received(message):
-
             # payload will only be set for v1 protocol, since v2
             # will defer deserializing the message body to the pool.
             payload = None

+ 5 - 3
celery/worker/request.py

@@ -77,9 +77,9 @@ class Request(object):
 
     if not IS_PYPY:  # pragma: no cover
         __slots__ = (
-            'app', 'type', 'name', 'id', 'on_ack', 'body',
-            'hostname', 'eventer', 'connection_errors', 'task', 'eta',
-            'expires', 'request_dict', 'on_reject', 'utc',
+            'app', 'type', 'name', 'id', 'root_id', 'parent_id',
+            'on_ack', 'body', 'hostname', 'eventer', 'connection_errors',
+            'task', 'eta', 'expires', 'request_dict', 'on_reject', 'utc',
             'content_type', 'content_encoding', 'argsrepr', 'kwargsrepr',
             '__weakref__', '__dict__',
         )
@@ -108,6 +108,8 @@ class Request(object):
 
         self.id = headers['id']
         type = self.type = self.name = headers['task']
+        self.root_id = headers.get('root_id')
+        self.parent_id = headers.get('parent_id')
         if 'shadow' in headers:
             self.name = headers['shadow']
         if 'timelimit' in headers:

+ 1 - 0
celery/worker/strategy.py

@@ -101,6 +101,7 @@ def default(task, app, consumer,
                 'task-received',
                 uuid=req.id, name=req.name,
                 args=req.argsrepr, kwargs=req.kwargsrepr,
+                root_id=req.root_id, parent_id=req.parent_id,
                 retries=req.request_dict.get('retries', 0),
                 eta=req.eta and req.eta.isoformat(),
                 expires=req.expires and req.expires.isoformat(),

+ 160 - 17
docs/configuration.rst

@@ -34,6 +34,139 @@ It should contain all you need to run a basic Celery set-up.
 
     task_annotations = {'tasks.add': {'rate_limit': '10/s'}}
 
+
+.. _conf-old-settings-map:
+
+New lowercase settings
+======================
+
+Version 4.0 introduced new lower case settings and setting organization.
+
+The major difference between previous versions, apart from the lower case
+names, are the renaming of some prefixes, like ``celerybeat_`` to ``beat_``,
+``celeryd_`` to ``worker_``, and most of the top level ``celery_`` settings
+have been moved into a new  ``task_`` prefix.
+
+Celery will still be able to read old configuration files, so there is no
+rush in moving to the new settings format.
+
+=====================================  ==============================================
+**Setting name**                       **Replace with**
+=====================================  ==============================================
+``CELERY_ACCEPT_CONTENT``              :setting:`accept_content`
+``ADMINS``                             :setting:`admins`
+``CELERY_ENABLE_UTC``                  :setting:`enable_utc`
+``CELERY_IMPORTS``                     :setting:`imports`
+``CELERY_INCLUDE``                     :setting:`include`
+``SERVER_EMAIL``                       :setting:`server_email`
+``CELERY_TIMEZONE``                    :setting:`timezone`
+``CELERYBEAT_MAX_LOOP_INTERVAL``       :setting:`beat_max_loop_interval`
+``CELERYBEAT_SCHEDULE``                :setting:`beat_schedule`
+``CELERYBEAT_SCHEDULER``               :setting:`beat_scheduler`
+``CELERYBEAT_SCHEDULE_FILENAME``       :setting:`beat_schedule_filename`
+``CELERYBEAT_SYNC_EVERY``              :setting:`beat_sync_every`
+``BROKER_URL``                         :setting:`broker_url`
+``BROKER_TRANSPORT``                   :setting:`broker_transport`
+``BROKER_TRANSPORT_OPTIONS``           :setting:`broker_transport_options`
+``BROKER_CONNECTION_TIMEOUT``          :setting:`broker_connection_timeout`
+``BROKER_CONNECTION_RETRY``            :setting:`broker_connection_retry`
+``BROKER_CONNECTION_MAX_RETRIES``      :setting:`broker_connection_max_retries`
+``BROKER_FAILOVER_STRATEGY``           :setting:`broker_failover_strategy`
+``BROKER_HEARTBEAT``                   :setting:`broker_heartbeat`
+``BROKER_LOGIN_METHOD``                :setting:`broker_login_method`
+``BROKER_POOL_LIMIT``                  :setting:`broker_pool_limit`
+``BROKER_USE_SSL``                     :setting:`broker_use_ssl`
+``CELERY_CACHE_BACKEND``               :setting:`cache_backend`
+``CELERY_CACHE_BACKEND_OPTIONS``       :setting:`cache_backend_options`
+``CASSANDRA_COLUMN_FAMILY``            :setting:`cassandra_table`
+``CASSANDRA_ENTRY_TTL``                :setting:`cassandra_entry_ttl`
+``CASSANDRA_KEYSPACE``                 :setting:`cassandra_keyspace`
+``CASSANDRA_PORT``                     :setting:`cassandra_port`
+``CASSANDRA_READ_CONSISTENCY``         :setting:`cassandra_read_consistency`
+``CASSANDRA_SERVERS``                  :setting:`cassandra_servers`
+``CASSANDRA_WRITE_CONSISTENCY``        :setting:`cassandra_write_consistency`
+``CELERY_COUCHBASE_BACKEND_SETTINGS``  :setting:`couchbase_backend_settings`
+``EMAIL_HOST``                         :setting:`email_host`
+``EMAIL_HOST_USER``                    :setting:`email_host_user`
+``EMAIL_HOST_PASSWORD``                :setting:`email_host_password`
+``EMAIL_PORT``                         :setting:`email_port`
+``EMAIL_TIMEOUT``                      :setting:`email_timeout`
+``EMAIL_USE_SSL``                      :setting:`email_use_ssl`
+``EMAIL_USE_TLS``                      :setting:`email_use_tls`
+``CELERY_MONGODB_BACKEND_SETTINGS``    :setting:`mongodb_backend_settings`
+``CELERY_EVENT_QUEUE_EXPIRES``         :setting:`event_queue_expires`
+``CELERY_EVENT_QUEUE_TTL``             :setting:`event_queue_ttl`
+``CELERY_EVENT_SERIALIZER``            :setting:`event_serializer`
+``CELERY_REDIS_DB``                    :setting:`redis_db`
+``CELERY_REDIS_HOST``                  :setting:`redis_host`
+``CELERY_REDIS_MAX_CONNECTIONS``       :setting:`redis_max_connections`
+``CELERY_REDIS_PASSWORD``              :setting:`redis_password`
+``CELERY_REDIS_PORT``                  :setting:`redis_port`
+``CELERY_RESULT_BACKEND``              :setting:`result_backend`
+``CELERY_MAX_CACHED_RESULTS``          :setting:`result_cache_max`
+``CELERY_MESSAGE_COMPRESSION``         :setting:`result_compression`
+``CELERY_RESULT_EXCHANGE``             :setting:`result_exchange`
+``CELERY_RESULT_EXCHANGE_TYPE``        :setting:`result_exchange_type`
+``CELERY_TASK_RESULT_EXPIRES``         :setting:`result_expires`
+``CELERY_RESULT_PERSISTENT``           :setting:`result_persistent`
+``CELERY_RESULT_SERIALIZER``           :setting:`result_serializer`
+``CELERY_RESULT_DBURI``                :setting:`sqlalchemy_dburi`
+``CELERY_RESULT_ENGINE_OPTIONS``       :setting:`sqlalchemy_engine_options`
+``-*-_DB_SHORT_LIVED_SESSIONS``        :setting:`sqlalchemy_short_lived_sessions`
+``CELERY_RESULT_DB_TABLE_NAMES``       :setting:`sqlalchemy_db_names`
+``CELERY_SECURITY_CERTIFICATE``        :setting:`security_certificate`
+``CELERY_SECURITY_CERT_STORE``         :setting:`security_cert_store`
+``CELERY_SECURITY_KEY``                :setting:`security_key`
+``CELERY_ACKS_LATE``                   :setting:`task_acks_late`
+``CELERY_ALWAYS_EAGER``                :setting:`task_always_eager`
+``CELERY_ANNOTATIONS``                 :setting:`task_annotations`
+``CELERY_MESSAGE_COMPRESSION``         :setting:`task_compression`
+``CELERY_CREATE_MISSING_QUEUES``       :setting:`task_create_missing_queues`
+``CELERY_DEFAULT_DELIVERY_MODE``       :setting:`task_default_delivery_mode`
+``CELERY_DEFAULT_EXCHANGE``            :setting:`task_default_exchange`
+``CELERY_DEFAULT_EXCHANGE_TYPE``       :setting:`task_default_exchange_type`
+``CELERY_DEFAULT_QUEUE``               :setting:`task_default_queue`
+``CELERY_DEFAULT_RATE_LIMIT``          :setting:`task_default_rate_limit`
+``CELERY_DEFAULT_ROUTING_KEY``         :setting:`task_default_routing_key`
+``-"-_EAGER_PROPAGATES_EXCEPTIONS``    :setting:`task_eager_propagates`
+``CELERY_IGNORE_RESULT``               :setting:`task_ignore_result`
+``CELERY_TASK_PUBLISH_RETRY``          :setting:`task_publish_retry`
+``CELERY_TASK_PUBLISH_RETRY_POLICY``   :setting:`task_publish_retry_policy`
+``CELERY_QUEUES``                      :setting:`task_queues`
+``CELERY_ROUTES``                      :setting:`task_routes`
+``CELERY_SEND_TASK_ERROR_EMAILS``      :setting:`task_send_error_emails`
+``CELERY_SEND_TASK_SENT_EVENT``        :setting:`task_send_sent_event`
+``CELERY_TASK_SERIALIZER``             :setting:`task_serializer`
+``CELERYD_TASK_SOFT_TIME_LIMIT``       :setting:`task_soft_time_limit`
+``CELERYD_TASK_TIME_LIMIT``            :setting:`task_time_limit`
+``CELERY_TRACK_STARTED``               :setting:`task_track_started`
+``CELERYD_AGENT``                      :setting:`worker_agent`
+``CELERYD_AUTOSCALER``                 :setting:`worker_autoscaler`
+``CELERYD_AUTORELAODER``               :setting:`worker_autoreloader`
+``CELERYD_CONCURRENCY``                :setting:`worker_concurrency`
+``CELERYD_CONSUMER``                   :setting:`worker_consumer`
+``CELERY_WORKER_DIRECT``               :setting:`worker_direct`
+``CELERY_DISABLE_RATE_LIMITS``         :setting:`worker_disable_rate_limits`
+``CELERY_ENABLE_REMOTE_CONTROL``       :setting:`worker_enable_remote_control`
+``CELERYD_FORCE_EXECV``                :setting:`worker_force_execv`
+``CELERYD_HIJACK_ROOT_LOGGER``         :setting:`worker_hijack_root_logger`
+``CELERYD_LOG_COLOR``                  :setting:`worker_log_color`
+``CELERYD_LOG_FORMAT``                 :setting:`worker_log_format`
+``CELERYD_WORKER_LOST_WAIT``           :setting:`worker_lost_wait`
+``CELERYD_MAX_TASKS_PER_CHILD``        :setting:`worker_max_tasks_per_child`
+``CELERYD_POOL``                       :setting:`worker_pool`
+``CELERYD_POOL_PUTLOCKS``              :setting:`worker_pool_putlocks`
+``CELERYD_POOL_RESTARTS``              :setting:`worker_pool_restarts`
+``CELERYD_PREFETCH_MULTIPLIER``        :setting:`worker_prefetch_multiplier`
+``CELERYD_REDIRECT_STDOUTS``           :setting:`worker_redirect_stdouts`
+``CELERYD_REDIRECT_STDOUTS_LEVEL``     :setting:`worker_redirect_stdouts_level`
+``CELERYD_SEND_EVENTS``                :setting:`worker_send_task_events`
+``CELERYD_STATE_DB``                   :setting:`worker_state_db`
+``CELERYD_TASK_LOG_FORMAT``            :setting:`worker_task_log_format`
+``CELERYD_TIMER``                      :setting:`worker_timer`
+``CELERYD_TIMER_PRECISION``            :setting:`worker_timer_precision`
+=====================================  ==============================================
+
 Configuration Directives
 ========================
 
@@ -223,10 +356,10 @@ is already evaluated.
 That is, tasks will be executed locally instead of being sent to
 the queue.
 
-.. setting:: task_eager_propagates_exceptions
+.. setting:: task_eager_propagates
 
-task_eager_propagates_exceptions
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+task_eager_propagates
+~~~~~~~~~~~~~~~~~~~~~
 
 If this is :const:`True`, eagerly executed tasks (applied by `task.apply()`,
 or when the :setting:`task_always_eager` setting is enabled), will
@@ -374,9 +507,9 @@ Can be one of the following:
     Use `MongoDB`_ to store the results.
     See :ref:`conf-mongodb-result-backend`.
 
-* new_cassandra
-    Use `Cassandra`_ to store the results, using newer database driver than _cassandra_.
-    See :ref:`conf-new_cassandra-result-backend`.
+* cassandra
+    Use `Cassandra`_ to store the results.
+    See :ref:`conf-cassandra-result-backend`.
 
 * ironcache
     Use `IronCache`_ to store the results.
@@ -692,6 +825,16 @@ redis_max_connections
 Maximum number of connections available in the Redis connection
 pool used for sending and retrieving results.
 
+.. setting:: redis_socket_timeout
+
+redis_socket_timeout
+~~~~~~~~~~~~~~~~~~~~
+
+Socket timeout for connections to Redis from the result backend
+in seconds (int/float)
+
+Default is 5 seconds.
+
 .. _conf-mongodb-result-backend:
 
 MongoDB backend settings
@@ -742,10 +885,10 @@ Example configuration
         'taskmeta_collection': 'my_taskmeta_collection',
     }
 
-.. _conf-new_cassandra-result-backend:
+.. _conf-cassandra-result-backend:
 
-new_cassandra backend settings
-------------------------------
+cassandra backend settings
+--------------------------
 
 .. note::
 
@@ -786,14 +929,14 @@ The keyspace in which to store the results. e.g.::
 
     cassandra_keyspace = 'tasks_keyspace'
 
-.. setting:: cassandra_column_family
+.. setting:: cassandra_table
 
-cassandra_column_family
-~~~~~~~~~~~~~~~~~~~~~~~
+cassandra_table
+~~~~~~~~~~~~~~~
 
 The table (column family) in which to store the results. e.g.::
 
-    cassandra_column_family = 'tasks'
+    cassandra_table = 'tasks'
 
 .. setting:: cassandra_read_consistency
 
@@ -826,7 +969,7 @@ Example configuration
 
     cassandra_servers = ['localhost']
     cassandra_keyspace = 'celery'
-    cassandra_column_family = 'task_results'
+    cassandra_table = 'tasks'
     cassandra_read_consistency = 'ONE'
     cassandra_write_consistency = 'ONE'
     cassandra_entry_ttl = 86400
@@ -1775,10 +1918,10 @@ george@vandelay.com and kramer@vandelay.com:
 Events
 ------
 
-.. setting:: worker_send_events
+.. setting:: worker_send_task_events
 
-worker_send_events
-~~~~~~~~~~~~~~~~~~
+worker_send_task_events
+~~~~~~~~~~~~~~~~~~~~~~~
 
 Send task-related events so that tasks can be monitored using tools like
 `flower`.  Sets the default value for the workers :option:`-E` argument.

BIN=BIN
docs/images/worker_graph_full.png


+ 0 - 3
docs/includes/installation.txt

@@ -78,9 +78,6 @@ Transports and Backends
     for using memcached as a result backend.
 
 :celery[cassandra]:
-    for using Apache Cassandra as a result backend with pycassa driver.
-
-:celery[new_cassandra]:
     for using Apache Cassandra as a result backend with DataStax driver.
 
 :celery[couchdb]:

+ 1 - 1
docs/includes/introduction.txt

@@ -1,4 +1,4 @@
-:Version: 4.0.0b1 (0today8)
+:Version: 4.0.0rc1 (0today8)
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/celery/celery/

+ 0 - 100
docs/internals/deprecation.rst

@@ -7,106 +7,6 @@
 .. contents::
     :local:
 
-.. _deprecations-v4.0:
-
-Removals for version 4.0
-========================
-
-- Module ``celery.task.trace`` has been renamed to ``celery.app.trace``
-  as the ``celery.task`` package is being phased out.  The compat module
-  will be removed in version 4.0 so please change any import from::
-
-    from celery.task.trace import …
-
-  to::
-
-    from celery.app.trace import …
-
-- ``AsyncResult.serializable()`` and ``celery.result.from_serializable``
-  will be removed.
-
-    Use instead::
-
-        >>> tup = result.as_tuple()
-        >>> from celery.result import result_from_tuple
-        >>> result = result_from_tuple(tup)
-
-TaskSet
-~~~~~~~
-
-TaskSet has been renamed to group and TaskSet will be removed in version 4.0.
-
-Old::
-
-    >>> from celery.task import TaskSet
-
-    >>> TaskSet(add.subtask((i, i)) for i in xrange(10)).apply_async()
-
-New::
-
-    >>> from celery import group
-    >>> group(add.s(i, i) for i in xrange(10))()
-
-
-Magic keyword arguments
-~~~~~~~~~~~~~~~~~~~~~~~
-
-The magic keyword arguments accepted by tasks will be removed
-in 4.0, so you should start rewriting any tasks
-using the ``celery.decorators`` module and depending
-on keyword arguments being passed to the task,
-for example::
-
-    from celery.decorators import task
-
-    @task()
-    def add(x, y, task_id=None):
-        print("My task id is %r" % (task_id,))
-
-should be rewritten into::
-
-    from celery import task
-
-    @task(bind=True)
-    def add(self, x, y):
-        print("My task id is {0.request.id}".format(self))
-
-:mod:`celery.result`
---------------------
-
-- ``BaseAsyncResult`` -> ``AsyncResult``.
-
-- ``TaskSetResult`` -> ``GroupResult``.
-
-- ``TaskSetResult.total`` -> ``len(GroupResult)``
-
-- ``TaskSetResult.taskset_id`` -> ``GroupResult.id``
-
-:mod:`celery.loader`
---------------------
-
-- ``current_loader()`` -> ``current_app.loader``
-
-- ``load_settings()`` -> ``current_app.conf``
-
-
-Settings
---------
-
-Logging Settings
-~~~~~~~~~~~~~~~~
-
-=====================================  =====================================
-**Setting name**                       **Replace with**
-=====================================  =====================================
-``CELERYD_LOG_LEVEL``                  :option:`--loglevel`
-``CELERYD_LOG_FILE``                   :option:`--logfile``
-``CELERYBEAT_LOG_LEVEL``               :option:`--loglevel`
-``CELERYBEAT_LOG_FILE``                :option:`--loglevel``
-``CELERYMON_LOG_LEVEL``                :option:`--loglevel`
-``CELERYMON_LOG_FILE``                 :option:`--loglevel``
-=====================================  =====================================
-
 .. _deprecations-v5.0:
 
 Removals for version 5.0

+ 0 - 11
docs/internals/reference/celery.backends.new_cassandra.rst

@@ -1,11 +0,0 @@
-================================================
- celery.backends.new_cassandra
-================================================
-
-.. contents::
-    :local:
-.. currentmodule:: celery.backends.new_cassandra
-
-.. automodule:: celery.backends.new_cassandra
-    :members:
-    :undoc-members:

+ 0 - 1
docs/internals/reference/index.rst

@@ -32,7 +32,6 @@
     celery.backends.mongodb
     celery.backends.redis
     celery.backends.riak
-    celery.backends.new_cassandra
     celery.backends.cassandra
     celery.backends.couchbase
     celery.app.trace

+ 1 - 1
docs/userguide/extending.rst

@@ -560,7 +560,7 @@ information about the boot process::
     [2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: Building graph...
     <celery.apps.worker.Worker object at 0x101ad8410> is in init
     [2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: New boot order:
-        {Hub, Queues (intra), Pool, Autoreloader, Timer, StateDB,
+        {Hub, Pool, Autoreloader, Timer, StateDB,
          Autoscaler, InfoStep, Beat, Consumer}
     [2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
     [2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Building graph...

+ 2 - 2
docs/userguide/monitoring.rst

@@ -650,7 +650,7 @@ task-sent
 ~~~~~~~~~
 
 :signature: ``task-sent(uuid, name, args, kwargs, retries, eta, expires,
-              queue, exchange, routing_key)``
+              queue, exchange, routing_key, root_id, parent_id)``
 
 Sent when a task message is published and
 the :setting:`task_send_sent_event` setting is enabled.
@@ -661,7 +661,7 @@ task-received
 ~~~~~~~~~~~~~
 
 :signature: ``task-received(uuid, name, args, kwargs, retries, eta, hostname,
-              timestamp)``
+              timestamp, root_id, parent_id)``
 
 Sent when the worker receives a task.
 

+ 47 - 0
docs/userguide/signals.rst

@@ -300,6 +300,53 @@ Provides arguments:
 * expired
   Set to :const:`True` if the task expired.
 
+.. signal:: task_unknown
+
+task_unknown
+~~~~~~~~~~~~
+
+Dispatched when a worker receives a message for a task that is not registered.
+
+Sender is the worker :class:`~celery.worker.consumer.Consumer`.
+
+Provides arguments:
+
+* name
+
+  Name of task not found in registry.
+
+* id
+
+  The task id found in the message.
+
+* message
+
+    Raw message object.
+
+* exc
+
+    The error that occurred.
+
+.. signal:: task_rejected
+
+task_rejected
+~~~~~~~~~~~~~
+
+Dispatched when a worker receives an unknown type of message to one of its
+task queues.
+
+Sender is the worker :class:`~celery.worker.consumer.Consumer`.
+
+Provides arguments:
+
+* message
+
+  Raw message object.
+
+* exc
+
+    The error that occurred (if any).
+
 App Signals
 -----------
 

+ 558 - 34
docs/whatsnew-4.0.rst

@@ -1,7 +1,7 @@
 .. _whatsnew-4.0:
 
 ===========================================
- What's new in Celery 4.0 (TBA)
+ What's new in Celery 4.0 (0Today8)
 ===========================================
 :Author: Ask Solem (ask at celeryproject.org)
 
@@ -55,21 +55,135 @@ Dropped support for Python 2.6
 
 Celery now requires Python 2.7 or later.
 
+Dropped support for Python 3.3
+------------------------------
+
+Celery now requires Python3 3.4 or later.
+
 JSON is now the default serializer
 ----------------------------------
 
-Using one logfile per process by default
-----------------------------------------
-
 The Task base class no longer automatically register tasks
 ----------------------------------------------------------
 
 The metaclass has been removed blah blah
 
-
 Arguments now verified when calling a task
 ------------------------------------------
 
+Redis Events not backward compatible
+------------------------------------
+
+The Redis ``fanout_patterns`` and ``fanout_prefix`` transport
+options are now enabled by default, which means that workers
+running 4.0 cannot see workers running 3.1 and vice versa.
+
+They should still execute tasks as normally, so this is only
+related to monitoring events.
+
+To avoid this situation you can reconfigure the 3.1 workers (and clients)
+to enable these settings before you mix them with workers and clients
+running 4.x:
+
+.. code-block:: python
+
+    BROKER_TRANSPORT_OPTIONS = {
+        'fanout_patterns': True,
+        'fanout_prefix': True,
+    }
+
+Lowercase setting names
+-----------------------
+
+In the pursuit of beauty all settings have been renamed to be in all
+lowercase, in a consistent naming scheme.
+
+This change is fully backwards compatible so you can still use the uppercase
+setting names.
+
+The loader will try to detect if your configuration is using the new format,
+and act accordingly, but this also means that you are not allowed to mix and
+match new and old setting names, that is unless you provide a value for both
+alternatives.
+
+The major difference between previous versions, apart from the lower case
+names, are the renaming of some prefixes, like ``celerybeat_`` to ``beat_``,
+``celeryd_`` to ``worker_``.
+
+The ``celery_`` prefix has also been removed, and task related settings
+from this namespace is now prefixed by ``task_``, worker related settings
+with ``worker_``.
+
+Apart from this most of the settings will be the same in lowercase, apart from
+a few special ones:
+
+=====================================  ==========================================================
+**Setting name**                       **Replace with**
+=====================================  ==========================================================
+``CELERY_MAX_CACHED_RESULTS``          :setting:`result_cache_max`
+``CELERY_MESSAGE_COMPRESSION``         :setting:`result_compression`/:setting:`task_compression`.
+``CELERY_TASK_RESULT_EXPIRES``         :setting:`result_expires`
+``CELERY_RESULT_DBURI``                :setting:`sqlalchemy_dburi`
+``CELERY_RESULT_ENGINE_OPTIONS``       :setting:`sqlalchemy_engine_options`
+``-*-_DB_SHORT_LIVED_SESSIONS``        :setting:`sqlalchemy_short_lived_sessions`
+``CELERY_RESULT_DB_TABLE_NAMES``       :setting:`sqlalchemy_db_names`
+``CELERY_ACKS_LATE``                   :setting:`task_acks_late`
+``CELERY_ALWAYS_EAGER``                :setting:`task_always_eager`
+``CELERY_ANNOTATIONS``                 :setting:`task_annotations`
+``CELERY_MESSAGE_COMPRESSION``         :setting:`task_compression`
+``CELERY_CREATE_MISSING_QUEUES``       :setting:`task_create_missing_queues`
+``CELERY_DEFAULT_DELIVERY_MODE``       :setting:`task_default_delivery_mode`
+``CELERY_DEFAULT_EXCHANGE``            :setting:`task_default_exchange`
+``CELERY_DEFAULT_EXCHANGE_TYPE``       :setting:`task_default_exchange_type`
+``CELERY_DEFAULT_QUEUE``               :setting:`task_default_queue`
+``CELERY_DEFAULT_RATE_LIMIT``          :setting:`task_default_rate_limit`
+``CELERY_DEFAULT_ROUTING_KEY``         :setting:`task_default_routing_key`
+``-"-_EAGER_PROPAGATES_EXCEPTIONS``    :setting:`task_eager_propagates`
+``CELERY_IGNORE_RESULT``               :setting:`task_ignore_result`
+``CELERY_TASK_PUBLISH_RETRY``          :setting:`task_publish_retry`
+``CELERY_TASK_PUBLISH_RETRY_POLICY``   :setting:`task_publish_retry_policy`
+``CELERY_QUEUES``                      :setting:`task_queues`
+``CELERY_ROUTES``                      :setting:`task_routes`
+``CELERY_SEND_TASK_ERROR_EMAILS``      :setting:`task_send_error_emails`
+``CELERY_SEND_TASK_SENT_EVENT``        :setting:`task_send_sent_event`
+``CELERY_TASK_SERIALIZER``             :setting:`task_serializer`
+``CELERYD_TASK_SOFT_TIME_LIMIT``       :setting:`task_soft_time_limit`
+``CELERYD_TASK_TIME_LIMIT``            :setting:`task_time_limit`
+``CELERY_TRACK_STARTED``               :setting:`task_track_started`
+``CELERY_DISABLE_RATE_LIMITS``         :setting:`worker_disable_rate_limits`
+``CELERY_ENABLE_REMOTE_CONTROL``       :setting:`worker_enable_remote_control`
+``CELERYD_SEND_EVENTS``                :setting:`worker_send_task_events`
+=====================================  ==========================================================
+
+You can see a full table of the changes in :ref:`conf-old-settings-map`.
+
+Django: Autodiscover no longer takes arguments.
+-----------------------------------------------
+
+Celery's Django support will instead automatically find your installed apps,
+which means app configurations will work.
+
+# e436454d02dcbba4f4410868ad109c54047c2c15
+
+Old command-line programs removed
+---------------------------------
+
+Installing Celery will no longer install the ``celeryd``,
+``celerybeat`` and ``celeryd-multi`` programs.
+
+This was announced with the release of Celery 3.1, but you may still
+have scripts pointing to the old names, so make sure you update them
+to use the new umbrella command.
+
++-------------------+--------------+-------------------------------------+
+| Program           | New Status   | Replacement                         |
++===================+==============+=====================================+
+| ``celeryd``       | **REMOVED**  | :program:`celery worker`            |
++-------------------+--------------+-------------------------------------+
+| ``celerybeat``    | **REMOVED**  | :program:`celery beat`              |
++-------------------+--------------+-------------------------------------+
+| ``celeryd-multi`` | **REMOVED**  | :program:`celery multi`             |
++-------------------+--------------+-------------------------------------+
 
 .. _v320-news:
 
@@ -79,6 +193,8 @@ News
 New Task Message Protocol
 =========================
 
+# e71652d384b1b5df2a4e6145df9f0efb456bc71c
+
 
 ``TaskProducer`` replaced by ``app.amqp.create_task_message`` and
 ``app.amqp.send_task_message``.
@@ -86,10 +202,109 @@ New Task Message Protocol
 - Worker stores results for internal errors like ``ContentDisallowed``, and
   exceptions occurring outside of the task function.
 
+- Worker stores results and sends monitoring events for unknown task names
+
+- shadow
+
+- argsrepr
+
+- Support for very long chains
+
+- parent_id / root_id
+
+
+Prefork: Tasks now log from the child process
+=============================================
+
+Logging of task success/failure now happens from the child process
+actually executing the task, which means that logging utilities
+like Sentry can get full information about tasks that fail, including
+variables in the traceback.
+
+Prefork: One logfile per child process
+======================================
+
+Init scrips and :program:`celery multi` now uses the `%I` logfile format
+option (e.g. :file:`/var/log/celery/%n%I.log`) to ensure each child
+process has a separate log file to avoid race conditions.
+
+You are encouraged to upgrade your init scripts and multi arguments
+to do so also.
 
 Canvas Refactor
 ===============
 
+# BLALBLABLA
+d79dcd8e82c5e41f39abd07ffed81ca58052bcd2
+1e9dd26592eb2b93f1cb16deb771cfc65ab79612
+e442df61b2ff1fe855881c1e2ff9acc970090f54
+0673da5c09ac22bdd49ba811c470b73a036ee776
+
+- Now unrolls groups within groups into a single group (Issue #1509).
+- chunks/map/starmap tasks now routes based on the target task
+- chords and chains can now be immutable.
+- Fixed bug where serialized signature were not converted back into
+  signatures (Issue #2078)
+
+    Fix contributed by Ross Deane.
+
+- Fixed problem where chains and groups did not work when using JSON
+  serialization (Issue #2076).
+
+    Fix contributed by Ross Deane.
+
+- Creating a chord no longer results in multiple values for keyword
+  argument 'task_id'" (Issue #2225).
+
+    Fix contributed by Aneil Mallavarapu
+
+- Fixed issue where the wrong result is returned when a chain
+  contains a chord as the penultimate task.
+
+    Fix contributed by Aneil Mallavarapu
+
+- Special case of ``group(A.s() | group(B.s() | C.s()))`` now works.
+
+- Chain: Fixed bug with incorrect id set when a subtask is also a chain.
+
+Schedule tasks based on sunrise, sunset, dawn and dusk.
+=======================================================
+
+See :ref:`beat-solar` for more information.
+
+Contributed by Mark Parncutt.
+
+App can now configure periodic tasks
+====================================
+
+# bc18d0859c1570f5eb59f5a969d1d32c63af764b
+# 132d8d94d38f4050db876f56a841d5a5e487b25b
+
+RabbitMQ Priority queue support
+===============================
+
+# 1d4cbbcc921aa34975bde4b503b8df9c2f1816e0
+
+Contributed by Gerald Manipon.
+
+Prefork: Limits for child process resident memory size.
+=======================================================
+
+This version introduces the new :setting:`worker_max_memory_per_child` setting,
+which BLA BLA BLA
+
+# 5cae0e754128750a893524dcba4ae030c414de33
+
+Contributed by Dave Smith.
+
+Redis: New optimized chord join implementation.
+===============================================
+
+This was an experimental feature introduced in Celery 3.1,
+but is now enabled by default.
+
+?new_join BLABLABLA
+
 Riak Result Backend
 ===================
 
@@ -99,44 +314,71 @@ Bla bla
 
 - blah blah
 
+CouchDB Result Backend
+======================
+
+Contributed by Nathan Van Gheem
+
 New Cassandra Backend
 =====================
-New Cassandra backend will be called new_cassandra and utilize python-driver.
-Old backend is now deprecated.
+
+The new Cassandra backend utilizes the python-driver library.
+Old backend is deprecated and everyone using cassandra is required to upgrade
+to be using the new driver.
+
+# XXX What changed?
 
 
 Event Batching
 ==============
 
-Events are now buffered in the worker and sent as a list
+Events are now buffered in the worker and sent as a list, and
+events are sent as transient messages by default so that they are not written
+to disk by RabbitMQ.
+
+03399b4d7c26fb593e61acf34f111b66b340ba4e
 
 
 Task.replace
 ============
- Task.replace changed, removes Task.replace_in_chord.
 
-    The two methods had almost the same functionality, but the old Task.replace
-    would force the new task to inherit the callbacks/errbacks of the existing
-    task.
+Task.replace changed, removes Task.replace_in_chord.
 
-    If you replace a node in a tree, then you would not expect the new node to
-    inherit the children of the old node, so this seems like unexpected
-    behavior.
+The two methods had almost the same functionality, but the old Task.replace
+would force the new task to inherit the callbacks/errbacks of the existing
+task.
 
-    So self.replace(sig) now works for any task, in addition sig can now
-    be a group.
+If you replace a node in a tree, then you would not expect the new node to
+inherit the children of the old node, so this seems like unexpected
+behavior.
 
-    Groups are automatically converted to a chord, where the callback
-    will "accumulate" the results of the group tasks.
+So self.replace(sig) now works for any task, in addition sig can now
+be a group.
 
-    A new builtin task (`celery.accumulate` was added for this purpose)
+Groups are automatically converted to a chord, where the callback
+will "accumulate" the results of the group tasks.
 
-    Closes #817
+A new builtin task (`celery.accumulate` was added for this purpose)
+
+Closes #817
 
 
 Optimized Beat implementation
 =============================
 
+heapq
+20340d79b55137643d5ac0df063614075385daaa
+
+Contributed by Ask Solem and Alexander Koshelev.
+
+
+Task Autoretry Decorator
+========================
+
+75246714dd11e6c463b9dc67f4311690643bff24
+
+Contributed by Dmitry Malinovsky.
+
 In Other News
 -------------
 
@@ -151,21 +393,182 @@ In Other News
 - **Programs**: ``%n`` format for :program:`celery multi` is now synonym with
   ``%N`` to be consistent with :program:`celery worker`.
 
-- **Programs**: celery inspect/control now supports --json argument
+- **Programs**: celery inspect/control now supports ``--json`` argument to
+  give output in json format.
+
+- **Programs**: :program:`celery inspect registered` now ignores built-in
+  tasks.
+
+- **Programs**: New :program:`celery logtool`: Utility for filtering and parsing
+  celery worker logfiles
+
+- **Redis Transport**: The Redis transport now supports the
+  :setting:`broker_use_ssl` option.
 
-- **Programs**: :program:`celery logtool`: Utility for filtering and parsing celery worker logfiles
+- **Worker**: Worker now only starts the remote control command consumer if the
+  broker transport used actually supports them.
 
 - **Worker**: Gossip now sets ``x-message-ttl`` for event queue to heartbeat_interval s.
-  (Iss ue #2005).
+  (Issue #2005).
+
+- **Worker**: Now preserves exit code (Issue #2024).
+
+- **Worker**: Loglevel for unrecoverable errors changed from ``error`` to
+  ``critical``.
+
+- **Worker**: Improved rate limiting accuracy.
+
+- **Worker**: Account for missing timezone information in task expires field.
+
+    Fix contributed by Albert Wang.
 
-- **App**: New signals
+- **Worker**: The worker no longer has a ``Queues`` bootsteps, as it is now
+    superfluous.
+
+- **Tasks**: New :setting:`task_reject_on_worker_lost` setting, and
+  :attr:`~@Task.reject_on_worker_lost` task attribute decides what happens
+  when the child worker process executing a late ack task is terminated.
+
+    Contributed by Michael Permana.
+
+- **App**: New signals for app configuration/finalization:
 
     - :data:`app.on_configure <@on_configure>`
     - :data:`app.on_after_configure <@on_after_configure>`
     - :data:`app.on_after_finalize <@on_after_finalize>`
 
+- **Task**: New task signals for rejected task messages:
+
+    - :data:`celery.signals.task_rejected`.
+    - :data:`celery.signals.task_unknown`.
+
+- **Events**: Event messages now uses the RabbitMQ ``x-message-ttl`` option
+    to ensure older event messages are discarded.
+
+    The default is 5 seconds, but can be changed using the
+    :setting:`event_queue_ttl` setting.
+
+- **Events**: Event monitors now sets the :setting:`event_queue_expires`
+  setting by default.
+
+    The queues will now expire after 60 seconds after the monitor stops
+    consuming from it.
+
 - **Canvas**: ``chunks``/``map``/``starmap`` are now routed based on the target task.
 
+- **Canvas**: ``Signature.link`` now works when argument is scalar (not a list)
+    (Issue #2019).
+
+- **App**: The application can now change how task names are generated using
+    the :meth:`~@gen_task_name` method.
+
+    Contributed by Dmitry Malinovsky.
+
+- **Tasks**: ``Task.subtask`` renamed to ``Task.signature`` with alias.
+
+- **Tasks**: ``Task.subtask_from_request`` renamed to
+  ``Task.signature_from_request`` with alias.
+
+- **Tasks**: The ``delivery_mode`` attribute for :class:`kombu.Queue` is now
+  respected (Issue #1953).
+
+- **Tasks**: Routes in :setting:`task-routes` can now specify a
+  :class:`~kombu.Queue` instance directly.
+
+    Example:
+
+    .. code-block:: python
+
+        task_routes = {'proj.tasks.add': {'queue': Queue('add')}}
+
+- **Tasks**: ``AsyncResult`` now raises :exc:`ValueError` if task_id is None.
+  (Issue #1996).
+
+- **Tasks**: ``result.get()`` now supports an ``on_message`` argument to set a
+  callback to be called for every message received.
+
+- **Tasks**: New abstract classes added:
+
+    - :class:`~celery.utils.abstract.CallableTask`
+
+        Looks like a task.
+
+    - :class:`~celery.utils.abstract.CallableSignature`
+
+        Looks like a task signature.
+
+- **Programs**: :program:`celery multi` now passes through `%i` and `%I` log
+  file formats.
+
+- **Programs**: A new command line option :option:``--executable`` is now
+  available for daemonizing programs.
+
+    Contributed by Bert Vanderbauwhede.
+
+- **Programs**: :program:`celery worker` supports new
+  :option:`--prefetch-multiplier` option.
+
+    Contributed by Mickaël Penhard.
+
+- **Prefork**: Prefork pool now uses ``poll`` instead of ``select`` where
+  available (Issue #2373).
+
+- **Tasks**: New :setting:`email_charset` setting allows for changing
+  the charset used for outgoing error emails.
+
+    Contributed by Vladimir Gorbunov.
+
+- **Worker**: Now respects :setting:`broker_connection_retry` setting.
+
+    Fix contributed by Nat Williams.
+
+- **Worker**: Autoscale did not always update keepalive when scaling down.
+
+    Fix contributed by Philip Garnero.
+
+- **General**: Dates are now always timezone aware even if
+  :setting:`enable_utc` is disabled (Issue #943).
+
+    Fix contributed by Omer Katz.
+
+- **Result Backends**: The redis result backend now has a default socket
+   timeout of 5 seconds.
+
+    The default can be changed using the new :setting:`redis_socket_timeout`
+    setting.
+
+    Contributed by Raghuram Srinivasan.
+
+- **Result Backends**: RPC Backend result queues are now auto delete by
+  default (Issue #2001).
+
+- **Result Backends**: MongoDB now supports setting the
+  :setting:`result_serialzier` setting to ``bson`` to use the MongoDB
+  libraries own serializer.
+
+    Contributed by Davide Quarta.
+
+- **Result Backends**: SQLAlchemy result backend now ignores all result
+   engine options when using NullPool (Issue #1930).
+
+- **Result Backends**: MongoDB URI handling has been improved to use
+    database name, user and password from the URI if provided.
+
+    Contributed by Samuel Jaillet.
+
+- **Result Backends**: Fix problem with rpc/amqp backends where exception
+    was not deserialized properly with the json serializer (Issue #2518).
+
+    Fix contributed by Allard Hoeve.
+
+- **General**: All Celery exceptions/warnings now inherit from common
+  :class:`~celery.exceptions.CeleryException`/:class:`~celery.exceptions.CeleryWarning`.
+  (Issue #2643).
+
+- **Tasks**: Task retry now also throws in eager mode.
+
+    Fix contributed by Feanil Patel.
+
 - Apps can now define how tasks are named (:meth:`@gen_task_name`).
 
     Contributed by Dmitry Malinovsky
@@ -175,16 +578,143 @@ In Other News
 - Beat: ``Scheduler.Publisher``/``.publisher`` renamed to
   ``.Producer``/``.producer``.
 
+Unscheduled Removals
+====================
+
+- The experimental :mod:`celery.contrib.methods` feature has been removed,
+  as there were far many bugs in the implementation to be useful.
+
+- The CentOS init scripts have been removed.
+
+    These did not really add any features over the generic init scripts,
+    so you are encouraged to use them instead, or something like
+    ``supervisord``.
+
 
 .. _v320-removals:
 
 Scheduled Removals
 ==================
 
-- The module ``celery.task.trace`` has been removed as scheduled for this
-  version.
+Modules
+-------
+
+- Module ``celery.worker.job`` has been renamed to :mod:`celery.worker.request`.
+
+    This was an internal module so should not have any effect.
+    It is now part of the public API so should not change again.
 
-- Magic keyword arguments no longer supported.
+- Module ``celery.task.trace`` has been renamed to ``celery.app.trace``
+  as the ``celery.task`` package is being phased out.  The compat module
+  will be removed in version 4.0 so please change any import from::
+
+    from celery.task.trace import …
+
+  to::
+
+    from celery.app.trace import …
+
+- Old compatibility aliases in the :mod:`celery.loaders` module
+  has been removed.
+
+    - Removed ``celery.loaders.current_loader()``, use: ``current_app.loader``
+
+    - Removed ``celery.loaders.load_settings()``, use: ``current_app.conf``
+
+Result
+------
+
+- ``AsyncResult.serializable()`` and ``celery.result.from_serializable``
+    has been removed:
+
+    Use instead:
+
+    .. code-block:: pycon
+
+        >>> tup = result.as_tuple()
+        >>> from celery.result import result_from_tuple
+        >>> result = result_from_tuple(tup)
+
+- Removed ``BaseAsyncResult``, use ``AsyncResult`` for instance checks
+  instead.
+
+- Removed ``TaskSetResult``, use ``GroupResult`` instead.
+
+    - ``TaskSetResult.total`` -> ``len(GroupResult)``
+
+    - ``TaskSetResult.taskset_id`` -> ``GroupResult.id``
+
+
+TaskSet
+-------
+
+TaskSet has been renamed to group and TaskSet will be removed in version 4.0.
+
+Old::
+
+    >>> from celery.task import TaskSet
+
+    >>> TaskSet(add.subtask((i, i)) for i in xrange(10)).apply_async()
+
+New::
+
+    >>> from celery import group
+    >>> group(add.s(i, i) for i in xrange(10))()
+
+
+Magic keyword arguments
+-----------------------
+
+Support for the very old magic keyword arguments accepted by tasks has finally
+been in 4.0.
+
+If you are still using these you have to rewrite any task still
+using the old ``celery.decorators`` module and depending
+on keyword arguments being passed to the task,
+for example::
+
+    from celery.decorators import task
+
+    @task()
+    def add(x, y, task_id=None):
+        print("My task id is %r" % (task_id,))
+
+should be rewritten into::
+
+    from celery import task
+
+    @task(bind=True)
+    def add(self, x, y):
+        print("My task id is {0.request.id}".format(self))
+
+Settings
+--------
+
+The following settings have been removed, and is no longer supported:
+
+Logging Settings
+~~~~~~~~~~~~~~~~
+
+=====================================  =====================================
+**Setting name**                       **Replace with**
+=====================================  =====================================
+``CELERYD_LOG_LEVEL``                  :option:`--loglevel`
+``CELERYD_LOG_FILE``                   :option:`--logfile``
+``CELERYBEAT_LOG_LEVEL``               :option:`--loglevel`
+``CELERYBEAT_LOG_FILE``                :option:`--loglevel``
+``CELERYMON_LOG_LEVEL``                celerymon is deprecated, use flower.
+``CELERYMON_LOG_FILE``                 celerymon is deprecated, use flower.
+``CELERYMON_LOG_FORMAT``               celerymon is deprecated, use flower.
+=====================================  =====================================
+
+Task Settings
+~~~~~~~~~~~~~~
+
+=====================================  =====================================
+**Setting name**                       **Replace with**
+=====================================  =====================================
+``CELERY_CHORD_PROPAGATES``            N/a
+=====================================  =====================================
 
 .. _v320-deprecations:
 
@@ -198,9 +728,3 @@ See the :ref:`deprecation-timeline`.
 Fixes
 =====
 
-.. _v320-internal:
-
-Internal changes
-================
-
-- Module ``celery.worker.job`` has been renamed to :mod:`celery.worker.request`.

+ 1 - 1
extra/generic-init.d/celeryd

@@ -39,7 +39,7 @@ fi
 
 
 # Can be a runlevel symlink (e.g. S02celeryd)
-if [ -L "$0" ]; then
+if [[ `dirname $0` == /etc/rc*.d ]]; then
     SCRIPT_FILE=$(readlink "$0")
 else
     SCRIPT_FILE="$0"

+ 10 - 0
funtests/stress/stress/app.py

@@ -63,6 +63,16 @@ def add(x, y):
     return x + y
 
 
+@app.task(bind=True)
+def ids(self, i):
+    return (self.request.root_id, self.request.parent_id, i)
+
+
+@app.task(bind=True)
+def collect_ids(self, ids, i):
+    return ids, (self.request.root_id, self.request.parent_id, i)
+
+
 @app.task
 def xsum(x):
     return sum(x)

+ 107 - 4
funtests/stress/stress/suite.py

@@ -10,7 +10,7 @@ from collections import OrderedDict, defaultdict, namedtuple
 from itertools import count
 from time import sleep
 
-from celery import group, VERSION_BANNER
+from celery import VERSION_BANNER, chain, group, uuid
 from celery.exceptions import TimeoutError
 from celery.five import items, monotonic, range, values
 from celery.utils.debug import blockdetection
@@ -18,12 +18,13 @@ from celery.utils.text import pluralize, truncate
 from celery.utils.timeutils import humanize_seconds
 
 from .app import (
-    marker, _marker, add, any_, exiting, kill, sleeping,
+    marker, _marker, add, any_, collect_ids, exiting, ids, kill, sleeping,
     sleeping_ignore_limits, any_returning, print_unicode,
 )
 from .data import BIG, SMALL
 from .fbi import FBI
 
+
 BANNER = """\
 Celery stress-suite v{version}
 
@@ -50,6 +51,10 @@ Progress = namedtuple('Progress', (
 Inf = float('Inf')
 
 
+def assert_equal(a, b):
+    assert a == b, '{0!r} != {1!r}'.format(a, b)
+
+
 class StopSuite(Exception):
     pass
 
@@ -163,6 +168,7 @@ class BaseSuite(object):
         )
 
     def runtest(self, fun, n=50, index=0, repeats=1):
+        n = getattr(fun, '__iterations__', None) or n
         print('{0}: [[[{1}({2})]]]'.format(repeats, fun.__name__, n))
         with blockdetection(self.block_timeout):
             with self.fbi.investigation():
@@ -185,6 +191,8 @@ class BaseSuite(object):
                             raise
                         except Exception as exc:
                             print('-> {0!r}'.format(exc))
+                            import traceback
+                            print(traceback.format_exc())
                             print(pstatus(self.progress))
                         else:
                             print(pstatus(self.progress))
@@ -238,13 +246,14 @@ class BaseSuite(object):
 _creation_counter = count(0)
 
 
-def testcase(*groups):
+def testcase(*groups, **kwargs):
     if not groups:
         raise ValueError('@testcase requires at least one group name')
 
     def _mark_as_case(fun):
         fun.__testgroup__ = groups
         fun.__testsort__ = next(_creation_counter)
+        fun.__iterations__ = kwargs.get('iterations')
         return fun
 
     return _mark_as_case
@@ -262,12 +271,106 @@ def _is_descriptor(obj, attr):
 
 class Suite(BaseSuite):
 
+    @testcase('all', 'green', iterations=1)
+    def chain(self):
+        c = add.s(4, 4) | add.s(8) | add.s(16)
+        assert_equal(self.join(c()), 32)
+
+    @testcase('all', 'green', iterations=1)
+    def chaincomplex(self):
+        c = (
+            add.s(2, 2) | (
+                add.s(4) | add.s(8) | add.s(16)
+            ) |
+            group(add.s(i) for i in range(4))
+        )
+        res = c()
+        assert_equal(res.get(), [32, 33, 34, 35])
+
+    @testcase('all', 'green', iterations=1)
+    def parentids_chain(self):
+        c = chain(ids.si(i) for i in range(248))
+        c.freeze()
+        res = c()
+        res.get(timeout=5)
+        self.assert_ids(res, len(c.tasks) - 1)
+
+    @testcase('all', 'green', iterations=1)
+    def parentids_group(self):
+        g = ids.si(1) | ids.si(2) | group(ids.si(i) for i in range(2, 50))
+        res = g()
+        expected_root_id = res.parent.parent.id
+        expected_parent_id = res.parent.id
+        values = res.get(timeout=5)
+
+        for i, r in enumerate(values):
+            root_id, parent_id, value = r
+            assert_equal(root_id, expected_root_id)
+            assert_equal(parent_id, expected_parent_id)
+            assert_equal(value, i + 2)
+
+    def assert_ids(self, res, len):
+        i, root = len, res
+        while root.parent:
+            root = root.parent
+        node = res
+        while node:
+            root_id, parent_id, value = node.get(timeout=5)
+            assert_equal(value, i)
+            assert_equal(root_id, root.id)
+            if node.parent:
+                assert_equal(parent_id, node.parent.id)
+            node = node.parent
+            i -= 1
+
+    @testcase('redis', iterations=1)
+    def parentids_chord(self):
+        self.assert_parentids_chord()
+        self.assert_parentids_chord(uuid(), uuid())
+
+    def assert_parentids_chord(self, base_root=None, base_parent=None):
+        g = (
+            ids.si(1) |
+            ids.si(2) |
+            group(ids.si(i) for i in range(3, 50)) |
+            collect_ids.s(i=50) |
+            ids.si(51)
+        )
+        g.freeze(root_id=base_root, parent_id=base_parent)
+        res = g.apply_async(root_id=base_root, parent_id=base_parent)
+        expected_root_id = base_root or res.parent.parent.parent.id
+
+        root_id, parent_id, value = res.get(timeout=5)
+        assert_equal(value, 51)
+        assert_equal(root_id, expected_root_id)
+        assert_equal(parent_id, res.parent.id)
+
+        prev, (root_id, parent_id, value) = res.parent.get(timeout=5)
+        assert_equal(value, 50)
+        assert_equal(root_id, expected_root_id)
+        assert_equal(parent_id, res.parent.parent.id)
+
+        for i, p in enumerate(prev):
+            root_id, parent_id, value = p
+            assert_equal(root_id, expected_root_id)
+            assert_equal(parent_id, res.parent.parent.id)
+
+        root_id, parent_id, value = res.parent.parent.get(timeout=5)
+        assert_equal(value, 2)
+        assert_equal(parent_id, res.parent.parent.parent.id)
+        assert_equal(root_id, expected_root_id)
+
+        root_id, parent_id, value = res.parent.parent.parent.get(timeout=5)
+        assert_equal(value, 1)
+        assert_equal(root_id, expected_root_id)
+        assert_equal(parent_id, base_parent)
+
     @testcase('all', 'green')
     def manyshort(self):
         self.join(group(add.s(i, i) for i in range(1000))(),
                   timeout=10, propagate=True)
 
-    @testcase('all', 'green')
+    @testcase('all', 'green', iterations=1)
     def unicodetask(self):
         self.join(group(print_unicode.s() for _ in range(5))(),
                   timeout=1, propagate=True)

+ 1 - 1
funtests/stress/stress/templates.py

@@ -57,7 +57,7 @@ class default(object):
     result_serializer = 'json'
     result_persistent = True
     result_expires = 300
-    result_cache_max = -1
+    result_cache_max = 100
     task_default_queue = CSTRESS_QUEUE
     task_queues = [
         Queue(CSTRESS_QUEUE,

+ 1 - 1
requirements/extras/cassandra.txt

@@ -1 +1 @@
-pycassa
+cassandra-driver

+ 0 - 1
requirements/extras/new_cassandra.txt

@@ -1 +0,0 @@
-cassandra-driver

+ 0 - 1
setup.py

@@ -200,7 +200,6 @@ features = set([
     'eventlet', 'gevent', 'msgpack', 'yaml', 'redis',
     'mongodb', 'sqs', 'couchdb', 'riak', 'beanstalk', 'zookeeper',
     'zeromq', 'sqlalchemy', 'librabbitmq', 'pyro', 'slmq',
-    'new_cassandra',
 ])
 extras_require = dict((x, extras(x + '.txt')) for x in features)
 extra['extras_require'] = extras_require