
Merge branch '3.0'

Conflicts:
	Changelog
	celery/app/amqp.py
	celery/backends/amqp.py
	celery/bin/amqp.py
	celery/canvas.py
	celery/tests/app/test_app.py
	celery/tests/bin/test_celeryd_detach.py
	celery/tests/worker/test_worker.py
	celery/worker/consumer.py
Ask Solem committed 12 years ago (commit 3871cf4ee5)

+ 479 - 0
Changelog

@@ -4,9 +4,488 @@
  Change history
 ================
 
+<<<<<<< HEAD
 This document contains change notes for bugfix releases in the 3.1.x series
 (Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
 new in Celery 3.1.
+=======
+.. contents::
+    :local:
+
+If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
+
+.. _version-3.0.13:
+
+3.0.13
+======
+:release-date: 2012-11-30 XX:XX:XX X.X UTC
+
+- Fixed a deadlock issue that could occur when the producer pool
+  inherited the connection pool instance of the parent process.
+
+- The :option:`--loader` option now works again (Issue #1066).
+
+- :program:`celery` umbrella command: All subcommands now support
+  the :option:`--workdir` option (Issue #1063).
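+
+    For example (the directory is only illustrative):
+
+    .. code-block:: bash
+
+        $ celery worker --workdir=/opt/project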
+
+- Groups included in chains now give GroupResults (Issue #1057).
+
+    Previously it would incorrectly add a regular result instead of a group
+    result, but now this works:
+
+    .. code-block:: python
+
+        # [4 + 4, 4 + 8, 4 + 16]
+        >>> res = (add.s(2, 2) | group(add.s(4), add.s(8), add.s(16)))()
+        >>> res
+        <GroupResult: a0acf905-c704-499e-b03a-8d445e6398f7 [
+            4346501c-cb99-4ad8-8577-12256c7a22b1,
+            b12ead10-a622-4d44-86e9-3193a778f345,
+            26c7a420-11f3-4b33-8fac-66cd3b62abfd]>
+
+
+- Chains can now chain other chains and use partial arguments (Issue #1057).
+
+    Example:
+
+    .. code-block:: python
+
+        >>> c1 = (add.s(2) | add.s(4))
+        >>> c2 = (add.s(8) | add.s(16))
+
+        >>> c3 = (c1 | c2)
+
+        # 8 + 2 + 4 + 8 + 16
+        >>> assert c3(8).get() == 38
+
+- Subtasks can now be used with unregistered tasks.
+
+    You can specify subtasks even if you just have the name::
+
+        >>> s = subtask(task_name, args=(), kwargs=())
+        >>> s.delay()
+
+- The :program:`celery shell` command now always adds the current
+  directory to the module path.
+
+- The worker will now properly handle the :exc:`pytz.AmbiguousTimeError`
+  exception raised when an ETA/countdown is prepared during a DST
+  transition (Issue #1061).
+
+- force_execv: Now makes sure that task symbols in the original
+  task modules will always use the correct app instance (Issue #1072).
+
+- AMQP Backend: Now republishes result messages that have been polled
+  (using ``result.ready()`` and friends; ``result.get()`` will not do this
+  in this version).
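+
+    A minimal sketch of the polling pattern this affects (``task_id`` is
+    assumed to come from an earlier ``delay()`` call):
+
+    .. code-block:: python
+
+        import time
+
+        from celery.result import AsyncResult
+
+        res = AsyncResult(task_id)
+        while not res.ready():  # each poll re-publishes the state message
+            time.sleep(1)
+        print(res.get())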
+
+- Handling of ETA/countdown fixed when the :setting:`CELERY_ENABLE_UTC`
+  setting is disabled (Issue #1065).
+
+- A number of unneeded properties were included in messages,
+  caused by accidentally passing ``Queue.as_dict`` as message properties.
+
+- Fixed a typo in the broadcast routing documentation (Issue #1026).
+
+- Rewrote confusing section about idempotence in the task user guide.
+
+- Fixed typo in the daemonization tutorial (Issue #1055).
+
+- Fixed several typos in the documentation.
+
+    Contributed by Marius Gedminas.
+
+- Batches: Now works when using the eventlet pool.
+
+    Fix contributed by Thomas Grainger.
+
+- Batches: Added example sending results to :mod:`celery.contrib.batches`.
+
+    Contributed by Thomas Grainger.
+
+- Fixed problem when using earlier versions of :mod:`pytz`.
+
+    Fix contributed by Vlad.
+
+- Docs updated to include the default value for the
+  :setting:`CELERY_TASK_RESULT_EXPIRES` setting.
+
+- Improvements to the django-celery tutorial.
+
+    Contributed by Locker537.
+
+- The ``add_consumer`` control command did not properly persist
+  the addition of new queues, so they did not survive connection failures
+  (Issue #1079).
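+
+    For example (a sketch; the queue name is hypothetical):
+
+    .. code-block:: python
+
+        from celery import current_app
+
+        # instruct all workers to also consume from 'new_queue';
+        # the addition now survives connection failures
+        current_app.control.add_consumer('new_queue', reply=True)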
+
+
+.. _version-3.0.12:
+
+3.0.12
+======
+:release-date: 2012-11-06 02:00 P.M. UTC
+
+- Now depends on kombu 2.4.8
+
+    - [Redis] New and improved fair queue cycle algorithm (Kevin McCarthy).
+    - [Redis] Now uses a Redis-based mutex when restoring messages.
+    - [Redis] The number of messages that can be restored in one interval
+      is no longer limited (but it can be capped using the
+      ``unacked_restore_limit``
+      :setting:`transport option <BROKER_TRANSPORT_OPTIONS>`;
+      see the example after this list).
+    - Heartbeat value can be specified in broker URLs (Mher Movsisyan).
+    - Fixed problem with msgpack on Python 3 (Jasper Bryant-Greene).
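+
+    For example, the restore limit can be capped like this (a sketch; the
+    value is only illustrative):
+
+    .. code-block:: python
+
+        BROKER_TRANSPORT_OPTIONS = {'unacked_restore_limit': 1000}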
+
+- Now depends on billiard 2.7.3.18
+
+- Celery can now be used with static analysis tools such as
+  PyDev, PyCharm, and pylint.
+
+- Development documentation has moved to Read The Docs.
+
+    The new URL is: http://docs.celeryproject.org/en/master
+
+- New :setting:`CELERY_QUEUE_HA_POLICY` setting used to set the default
+  HA policy for queues when using RabbitMQ.
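+
+    For example (node names are hypothetical):
+
+    .. code-block:: python
+
+        # mirror queues across all nodes:
+        CELERY_QUEUE_HA_POLICY = 'all'
+
+        # or mirror to specific nodes only:
+        CELERY_QUEUE_HA_POLICY = ['rabbit@host1', 'rabbit@host2']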
+
+- New method ``Task.subtask_from_request`` returns a subtask using the current
+  request.
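+
+    A minimal sketch (assumes the method defaults to mirroring the current
+    request's arguments and options):
+
+    .. code-block:: python
+
+        from celery import task
+
+        @task
+        def requeue():
+            # re-send this task with the same arguments it was called with
+            requeue.subtask_from_request().delay()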
+
+- The result ``get_many`` method did not respect the ``timeout`` argument.
+
+    Fix contributed by Remigiusz Modrzejewski.
+
+- generic_init.d scripts now support setting :envvar:`CELERY_CREATE_DIRS` to
+  always create log and pid directories (Issue #1045).
+
+    This can be set in your :file:`/etc/default/celeryd`.
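+
+    For example, add this line to :file:`/etc/default/celeryd` (a sketch):
+
+    .. code-block:: bash
+
+        CELERY_CREATE_DIRS=1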
+
+- Fixed strange kombu import problem on Python 3.2 (Issue #1034).
+
+- Worker: ETA scheduler now uses millisecond precision (Issue #1040).
+
+- The ``--config`` argument to programs is now supported by all loaders.
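+
+    For example (``celeryconfig`` is a hypothetical module name):
+
+    .. code-block:: bash
+
+        $ celery worker --config=celeryconfig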
+
+- The :setting:`CASSANDRA_OPTIONS` setting has now been documented.
+
+    Contributed by Jared Biel.
+
+- Task methods (:mod:`celery.contrib.methods`) cannot be used with the old
+  task base class; the task decorator in that module now inherits from the
+  new one.
+
+- An optimization was too eager and caused some logging messages to never
+  be emitted.
+
+- :mod:`celery.contrib.batches` now works again.
+
+- Fixed missing whitespace in ``bdist_rpm`` requirements (Issue #1046).
+
+- Event state's ``tasks_by_name`` applied the limit before filtering by name.
+
+    Fix contributed by Alexander A. Sosnovskiy.
+
+.. _version-3.0.11:
+
+3.0.11
+======
+:release-date: 2012-09-26 04:00 P.M. UTC
+
+- [security:low] generic-init.d scripts changed permissions of /var/log & /var/run
+
+    In the daemonization tutorial the recommended directories were as follows:
+
+    .. code-block:: bash
+
+        CELERYD_LOG_FILE="/var/log/celery/%n.log"
+        CELERYD_PID_FILE="/var/run/celery/%n.pid"
+
+    But in the scripts themselves the default files were ``/var/log/celery%n.log``
+    and ``/var/run/celery%n.pid``, so if the user did not change the location
+    by configuration, the directories ``/var/log`` and ``/var/run`` would be
+    created, and worse, have their permissions and owners changed.
+
+    This change means that:
+
+        - Default pid file is ``/var/run/celery/%n.pid``
+        - Default log file is ``/var/log/celery/%n.log``
+
+        - The directories are only created and have their permissions
+          changed if *no custom locations are set*.
+
+    Users can force paths to be created by calling the ``create-paths``
+    subcommand:
+
+    .. code-block:: bash
+
+        $ sudo /etc/init.d/celeryd create-paths
+
+    .. admonition:: Upgrading Celery will not update init scripts
+
+        To update the init scripts you have to re-download
+        the files from source control and update them manually.
+        You can find the init scripts for version 3.0.x at:
+
+            http://github.com/celery/celery/tree/3.0/extra/generic-init.d
+
+- Now depends on billiard 2.7.3.17
+
+- Fixes request stack protection when the app is initialized more than
+  once (Issue #1003).
+
+- ETA tasks now work properly when the system timezone is not the same
+  as the configured timezone (Issue #1004).
+
+- Terminating a task now works if the task has been sent to the
+  pool but not yet acknowledged by a pool process (Issue #1007).
+
+    Fix contributed by Alexey Zatelepin.
+
+- Terminating a task now properly updates the state of the task to revoked,
+  and sends a ``task-revoked`` event.
+
+- Generic worker init script now waits for workers to shut down by default.
+
+- Multi: No longer parses the ``--app`` option (Issue #1008).
+
+- Multi: The ``stop_verify`` command has been renamed to ``stopwait``.
+
+- Daemonization: Now delays trying to create the pidfile/logfile until
+  after changing into the working directory.
+
+- :program:`celery worker` and :program:`celery beat` commands now respect
+  the :option:`--no-color` option (Issue #999).
+
+- Fixed typos in eventlet examples (Issue #1000).
+
+    Fix contributed by Bryan Bishop.
+    Congratulations on opening bug #1000!
+
+- Tasks that raise :exc:`~celery.exceptions.Ignore` are now acknowledged.
+
+- Beat: Now shows the name of the entry in ``sending due task`` logs.
+
+.. _version-3.0.10:
+
+3.0.10
+======
+:release-date: 2012-09-20 05:30 P.M. BST
+
+- Now depends on kombu 2.4.7
+
+- Now depends on billiard 2.7.3.14
+
+    - Fixes crash at startup when using Django and pre-1.4 projects
+      (setup_environ).
+
+    - Hard time limits now send the KILL signal shortly after TERM,
+      to terminate processes that have signal handlers blocked by C extensions.
+
+    - Billiard now installs even if the C extension cannot be built.
+
+        It's still recommended to build the C extension if you are using
+        a transport other than rabbitmq/redis (or use forced execv for some
+        other reason).
+
+    - Pool now sets a ``current_process().index`` attribute that can be used to create
+      as many log files as there are processes in the pool.
+
+- Canvas: chord/group/chain no longer modifies the state when called.
+
+    Previously calling a chord/group/chain would modify the ids of subtasks
+    so that:
+
+    .. code-block:: python
+
+        >>> c = chord([add.s(2, 2), add.s(4, 4)], xsum.s())
+        >>> c()
+        >>> c()  # called a second time
+
+    the second time around, the ids for the tasks would be the same as in
+    the previous invocation.  This is now fixed, so that calling a subtask
+    won't mutate any options.
+
+- Canvas: Chaining a chord to another task now works (Issue #965).
+
+- Worker: Fixed a bug where the request stack could be corrupted if
+  relative imports are used.
+
+    Problem usually manifested itself as an exception while trying to
+    send a failed task result (``NoneType does not have id attribute``).
+
+    Fix contributed by Sam Cooke.
+
+- Tasks can now raise :exc:`~celery.exceptions.Ignore` to skip updating states
+  or events after return.
+
+    Example:
+
+    .. code-block:: python
+
+        from celery import task
+        from celery.exceptions import Ignore
+
+        @task
+        def custom_revokes():
+            # ``redis`` is assumed to be a configured redis client
+            if redis.sismember('tasks.revoked', custom_revokes.request.id):
+                raise Ignore()
+
+- The worker now makes sure the request/task stacks are not modified
+  by the initial ``Task.__call__``.
+
+    This would previously be a problem if a custom task class defined
+    ``__call__`` and also called ``super()``.
+
+- Because of problems the fast local optimization has been disabled,
+  and can only be enabled by setting the :envvar:`USE_FAST_LOCALS`
+  environment variable.
+
+- Worker: Now sets a default socket timeout of 5 seconds at shutdown
+  so that broken socket reads do not hinder proper shutdown (Issue #975).
+
+- More fixes related to late eventlet/gevent patching.
+
+- Documentation for settings out of sync with reality:
+
+    - :setting:`CELERY_TASK_PUBLISH_RETRY`
+
+        Documented as disabled by default, but it was enabled by default
+        since 2.5 as stated by the 2.5 changelog.
+
+    - :setting:`CELERY_TASK_PUBLISH_RETRY_POLICY`
+
+        The default ``max_retries`` had been set to 100, but documented as
+        being 3, and ``interval_max`` was set to 1 but documented as 0.2.
+        The defaults are now set to 3 and 0.2, as originally documented.
+
+    Fix contributed by Matt Long.
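+
+    A sketch of the now-documented defaults expressed in configuration form
+    (key names follow the standard kombu retry policy):
+
+    .. code-block:: python
+
+        CELERY_TASK_PUBLISH_RETRY = True
+        CELERY_TASK_PUBLISH_RETRY_POLICY = {
+            'max_retries': 3,       # was 100, documented as 3
+            'interval_start': 0,
+            'interval_step': 0.2,
+            'interval_max': 0.2,    # was 1, documented as 0.2
+        }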
+
+- Worker: Log messages when connection established and lost have been improved.
+
+- The repr of a crontab schedule value of '0' should be '*' (Issue #972).
+
+- Revoked tasks are now removed from reserved/active state in the worker
+  (Issue #969).
+
+    Fix contributed by Alexey Zatelepin.
+
+- gevent: Now supports hard time limits using ``gevent.Timeout``.
+
+- Documentation: Links to init scripts now point to the 3.0 branch instead
+  of the development branch (master).
+
+- Documentation: Fixed typo in signals user guide (Issue #986).
+
+    ``instance.app.queues`` -> ``instance.app.amqp.queues``.
+
+- Eventlet/gevent: The worker did not properly set the custom app
+  for new greenlets.
+
+- Eventlet/gevent: Fixed a bug where the worker could not recover
+  from connection loss (Issue #959).
+
+    Also, because of a suspected bug in gevent, the
+    :setting:`BROKER_CONNECTION_TIMEOUT` setting has been disabled
+    when using gevent.
+
+.. _version-3.0.9:
+
+3.0.9
+=====
+:release-date: 2012-08-31 06:00 P.M. BST
+
+- Important note for users of Django and the database scheduler!
+
+    Recently a timezone issue has been fixed for periodic tasks,
+    but erroneous timezones could have already been stored in the
+    database, so for the fix to work you need to reset
+    the ``last_run_at`` fields.
+
+    You can do this by executing the following command:
+
+    .. code-block:: bash
+
+        $ python manage.py shell
+        >>> from djcelery.models import PeriodicTask
+        >>> PeriodicTask.objects.update(last_run_at=None)
+
+    You also have to do this if you change the timezone or
+    :setting:`CELERY_ENABLE_UTC` setting.
+
+- Note about the :setting:`CELERY_ENABLE_UTC` setting.
+
+    If you previously disabled this just to force periodic tasks to work with
+    your timezone, then you are now *encouraged to re-enable it*.
+
+- Now depends on Kombu 2.4.5 which fixes PyPy + Jython installation.
+
+- Fixed bug with timezones when :setting:`CELERY_ENABLE_UTC` is disabled
+  (Issue #952).
+
+- Fixed a typo in the celerybeat upgrade mechanism (Issue #951).
+
+- Make sure the `exc_info` argument to logging is resolved (Issue #899).
+
+- Fixed problem with Python 3.2 and thread join timeout overflow (Issue #796).
+
+- A test case was occasionally broken for Python 2.5.
+
+- Unit test suite now passes for PyPy 1.9.
+
+- App instances now support the ``with`` statement.
+
+    This calls the new :meth:`~celery.Celery.close` method at exit, which
+    cleans up after the app like closing pool connections.
+
+    Note that this is only necessary when dynamically creating apps,
+    e.g. for "temporary" apps.
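+
+    A minimal sketch:
+
+    .. code-block:: python
+
+        from celery import Celery
+
+        with Celery(set_as_current=False) as app:
+            @app.task
+            def add(x, y):
+                return x + y
+        # app.close() has been called here, releasing pool connections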
+
+- Support for piping a subtask to a chain.
+
+    For example:
+
+    .. code-block:: python
+
+        pipe = sometask.s() | othertask.s()
+        new_pipe = mytask.s() | pipe
+
+    Contributed by Steeve Morin.
+
+- Fixed problem with group results on non-pickle serializers.
+
+    Fix contributed by Steeve Morin.
+
+.. _version-3.0.8:
+
+3.0.8
+=====
+:release-date: 2012-08-29 05:00 P.M. BST
+
+- Now depends on Kombu 2.4.4
+
+- Fixed problem with amqplib and receiving larger message payloads
+  (Issue #922).
+
+    The problem would manifest itself as either the worker hanging,
+    or occasionally a ``Framing error`` exception appearing.
+
+    Users of the new ``pyamqp://`` transport must upgrade to
+    :mod:`amqp` 0.9.3.
+
+- Beat: Fixed another timezone bug with interval and crontab schedules
+  (Issue #943).
+
+- Beat: The schedule file is now automatically cleared if the timezone
+  is changed.
+
+    The schedule is also cleared when you upgrade to 3.0.8 from an earlier
+    version; this is to register the initial timezone info.
+
+- Events: The :event:`worker-heartbeat` event now includes the processed
+  and active count fields.
+
+    Contributed by Mher Movsisyan.
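+
+    A sketch of reading the new fields from the event stream (assumes a
+    reachable broker at the default URL):
+
+    .. code-block:: python
+
+        from celery import Celery
+
+        app = Celery(broker='amqp://')
+
+        def on_heartbeat(event):
+            print(event['hostname'], event['processed'], event['active'])
+
+        with app.connection() as connection:
+            recv = app.events.Receiver(
+                connection, handlers={'worker-heartbeat': on_heartbeat})
+            recv.capture(limit=10)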
+
+- Fixed error with error email and new task classes (Issue #931).
+
+- ``BaseTask.__call__`` is no longer optimized away if it has been monkey
+  patched.
+
+- Fixed shutdown issue when using gevent (Issue #911 & Issue #936).
+>>>>>>> 3.0
 
 If you're looking for versions prior to 3.1 you should visit our
 :ref:`history` of releases.

+ 30 - 28
celery/app/amqp.py

@@ -178,17 +178,20 @@ class TaskProducer(Producer):
             expires=None, exchange=None, exchange_type=None,
             event_dispatcher=None, retry=None, retry_policy=None,
             queue=None, now=None, retries=0, chord=None, callbacks=None,
-            errbacks=None, mandatory=None, priority=None, immediate=None,
-            routing_key=None, serializer=None, delivery_mode=None,
-            compression=None, reply_to=None, timeout=None, soft_timeout=None,
-            timeouts=None, declare=None, **kwargs):
+            errbacks=None, routing_key=None, serializer=None,
+            delivery_mode=None, compression=None, reply_to=None,
+            timeout=None, soft_timeout=None, timeouts=None,
+            declare=None, **kwargs):
         """Send task message."""
         retry = self.retry if retry is None else retry
 
         declare = declare or []
+        qname = queue
         if queue is not None:
             if isinstance(queue, basestring):
-                queue = self.queues[queue]
+                qname, queue = queue, self.queues[queue]
+            else:
+                qname = queue.name
             exchange = exchange or queue.exchange.name
             routing_key = routing_key or queue.routing_key
 
@@ -212,30 +215,29 @@ class TaskProducer(Producer):
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
 
-        body = {'task': task_name,
-                'id': task_id,
-                'args': task_args,
-                'kwargs': task_kwargs,
-                'retries': retries or 0,
-                'eta': eta,
-                'expires': expires,
-                'utc': self.utc,
-                'callbacks': callbacks,
-                'errbacks': errbacks,
-                'reply_to': reply_to,
-                'timeouts': timeouts or (timeout, soft_timeout)}
-        group_id = group_id or taskset_id
-        if group_id:
-            body['taskset'] = group_id
-        if chord:
-            body['chord'] = chord
-
-        self.publish(body, exchange=exchange, mandatory=mandatory,
-             immediate=immediate, routing_key=routing_key,
+        body = {
+            'task': task_name,
+            'id': task_id,
+            'args': task_args,
+            'kwargs': task_kwargs,
+            'retries': retries or 0,
+            'eta': eta,
+            'expires': expires,
+            'utc': self.utc,
+            'callbacks': callbacks,
+            'errbacks': errbacks,
+            'reply_to': reply_to,
+            'timeouts': timeouts or (timeout, soft_timeout),
+            'taskset': group_id or taskset_id,
+            'chord': chord,
+        }
+
+        self.publish(body,
+             exchange=exchange, routing_key=routing_key,
              serializer=serializer or self.serializer,
              compression=compression or self.compression,
-             retry=retry, retry_policy=_rp, delivery_mode=delivery_mode,
-             priority=priority, declare=declare,
+             retry=retry, retry_policy=_rp,
+             delivery_mode=delivery_mode, declare=declare,
              **kwargs)
 
         signals.task_sent.send(sender=task_name, **body)
@@ -250,7 +252,7 @@ class TaskProducer(Producer):
                                                retries=retries,
                                                eta=eta,
                                                expires=expires,
-                                               queue=queue,
+                                               queue=qname,
                                                exchange=exname,
                                                routing_key=routing_key)
         return task_id

+ 1 - 1
celery/app/builtins.py

@@ -79,7 +79,7 @@ def add_unlock_chord_task(app):
         if result.ready():
             subtask(callback).delay(j(propagate=propagate))
         else:
-            unlock_chord.retry(countdown=interval, max_retries=max_retries)
+            return unlock_chord.retry(countdown=interval, max_retries=max_retries)
     return unlock_chord
 
 

+ 4 - 4
celery/app/routes.py

@@ -59,9 +59,9 @@ class Router(object):
             # things (like the routing_key): great for topic exchanges.
             queue = route.pop('queue', None)
 
-        if queue:  # expand config from configured queue.
+        if queue:
             try:
-                _Q = self.queues[queue]  # noqa
+                Q = self.queues[queue]
             except KeyError:
                 if not self.create_missing:
                     raise QueueNotFound(
@@ -69,9 +69,9 @@ class Router(object):
                 for key in 'exchange', 'routing_key':
                     if route.get(key) is None:
                         route[key] = queue
-                self.app.amqp.queues.add(queue, **route)
+                Q = self.app.amqp.queues.add(queue, **route)
             # needs to be declared by publisher
-            route['queue'] = queue
+            route['queue'] = Q
         return route
 
     def lookup_route(self, task, args=None, kwargs=None):

+ 15 - 2
celery/backends/amqp.py

@@ -16,8 +16,7 @@ import time
 
 from collections import deque
 
-from kombu.entity import Exchange, Queue
-from kombu.messaging import Consumer, Producer
+from kombu import Exchange, Queue, Producer, Consumer
 
 from celery import states
 from celery.exceptions import TimeoutError
@@ -104,6 +103,18 @@ class AMQPBackend(BaseBackend):
     def _routing_key(self, task_id):
         return task_id.replace('-', '')
 
+    def _republish(self, channel, task_id, body, content_type,
+            content_encoding):
+        return Producer(channel).publish(body,
+            exchange=self.exchange,
+            routing_key=self._routing_key(task_id),
+            serializer=self.serializer,
+            content_type=content_type,
+            content_encoding=content_encoding,
+            retry=True, retry_policy=self.retry_policy,
+            declare=[self.on_reply_declare(task_id)],
+        )
+
     def _store_result(self, task_id, result, status, traceback=None):
         """Send task return value and status."""
         with self.mutex:
@@ -159,6 +170,8 @@ class AMQPBackend(BaseBackend):
 
             if latest:
                 # new state to report
+                self._republish(channel, task_id, latest.body,
+                                latest.content_type, latest.content_encoding)
                 payload = self._cache[task_id] = latest.payload
                 return payload
             else:

+ 18 - 3
celery/canvas.py

@@ -12,6 +12,7 @@
 from __future__ import absolute_import
 
 from copy import deepcopy
+from functools import partial as _partial
 from operator import itemgetter
 from itertools import chain as _chain
 
@@ -19,7 +20,7 @@ from kombu.utils import cached_property, fxrange, kwdict, reprcall, uuid
 
 from celery import current_app
 from celery.local import Proxy
-from celery.result import GroupResult
+from celery.result import AsyncResult, GroupResult
 from celery.utils.functional import (
     maybe_list, is_list, regen,
     chunks as _chunks,
@@ -134,7 +135,7 @@ class Signature(dict):
             tid = opts['task_id']
         except KeyError:
             tid = opts['task_id'] = _id or uuid()
-        return self.type.AsyncResult(tid)
+        return self.AsyncResult(tid)
 
     def replace(self, args=None, kwargs=None, options=None):
         s = self.clone()
@@ -155,7 +156,7 @@ class Signature(dict):
     def apply_async(self, args=(), kwargs={}, **options):
         # For callbacks: extra args are prepended to the stored args.
         args, kwargs, options = self._merge(args, kwargs, options)
-        return self.type.apply_async(args, kwargs, **options)
+        return self._apply_async(args, kwargs, **options)
 
     def append_to_list_option(self, key, value):
         items = self.options.setdefault(key, [])
@@ -218,6 +219,20 @@ class Signature(dict):
     @cached_property
     def type(self):
         return self._type or current_app.tasks[self['task']]
+
+    @cached_property
+    def AsyncResult(self):
+        try:
+            return self.type.AsyncResult
+        except KeyError:  # task not registered
+            return AsyncResult
+
+    @cached_property
+    def _apply_async(self):
+        try:
+            return self.type.apply_async
+        except KeyError:
+            return _partial(current_app.send_task, self['task'])
     task = _getitem_property('task')
     args = _getitem_property('args')
     kwargs = _getitem_property('kwargs')

+ 2 - 2
celery/task/base.py

@@ -43,8 +43,8 @@ class Task(BaseTask):
     exchange = None
     exchange_type = None
     delivery_mode = None
-    mandatory = False
-    immediate = False
+    mandatory = False  # XXX deprecated
+    immediate = False  # XXX deprecated
     priority = None
     type = 'regular'
     disable_error_emails = False

+ 17 - 19
celery/tests/app/test_routes.py

@@ -2,6 +2,7 @@ from __future__ import absolute_import
 
 from functools import wraps
 
+from kombu import Exchange
 from kombu.utils.functional import maybe_promise
 
 from celery import current_app
@@ -58,12 +59,7 @@ d_queue = {'exchange': current_app.conf.CELERY_DEFAULT_EXCHANGE,
 
 
 class RouteCase(Case):
-
-    def assertAnswer(self, answer, expected):
-        self.assertEqual(answer['exchange'].name, expected['exchange'])
-        self.assertEqual(answer['routing_key'], expected['routing_key'])
-        if 'queue' in expected:
-            self.assertEqual(answer['queue'], expected['queue'])
+    pass
 
 
 class test_MapRoute(RouteCase):
@@ -72,7 +68,10 @@ class test_MapRoute(RouteCase):
     def test_route_for_task_expanded_route(self):
         expand = E(current_app.amqp.queues)
         route = routes.MapRoute({mytask.name: {'queue': 'foo'}})
-        self.assertAnswer(expand(route.route_for_task(mytask.name)), a_queue)
+        self.assertEqual(
+            expand(route.route_for_task(mytask.name))['queue'].name,
+            'foo',
+        )
         self.assertIsNone(route.route_for_task('celery.awesome'))
 
     @with_queues(foo=a_queue, bar=b_queue)
@@ -102,8 +101,8 @@ class test_lookup_route(RouteCase):
         R = routes.prepare(({mytask.name: {'queue': 'bar'}},
                             {mytask.name: {'queue': 'foo'}}))
         router = Router(R, current_app.amqp.queues)
-        self.assertAnswer(router.route({}, mytask.name,
-                          args=[1, 2], kwargs={}), b_queue)
+        self.assertEqual(router.route({}, mytask.name,
+                         args=[1, 2], kwargs={})['queue'].name, 'bar')
 
     @with_queues()
     def test_expands_queue_in_options(self):
@@ -117,17 +116,16 @@ class test_lookup_route(RouteCase):
                               'immediate': False},
                              mytask.name,
                              args=[1, 2], kwargs={})
-        self.assertDictContainsSubset({'routing_key': 'testq',
-                                       'immediate': False},
-                                       route)
-        self.assertEqual(route['exchange'].name, 'testq')
-        self.assertIn('queue', route)
+        self.assertEqual(route['queue'].name, 'testq')
+        self.assertEqual(route['queue'].exchange, Exchange('testq'))
+        self.assertEqual(route['queue'].routing_key, 'testq')
+        self.assertEqual(route['immediate'], False)
 
     @with_queues(foo=a_queue, bar=b_queue)
     def test_expand_destination_string(self):
         x = Router({}, current_app.amqp.queues)
         dest = x.expand_destination('foo')
-        self.assertEqual(dest['exchange'].name, 'fooexchange')
+        self.assertEqual(dest['queue'].name, 'foo')
 
     @with_queues(foo=a_queue, bar=b_queue, **{
         current_app.conf.CELERY_DEFAULT_QUEUE: d_queue})
@@ -135,10 +133,10 @@ class test_lookup_route(RouteCase):
         R = routes.prepare(({'celery.xaza': {'queue': 'bar'}},
                             {mytask.name: {'queue': 'foo'}}))
         router = Router(R, current_app.amqp.queues)
-        self.assertAnswer(router.route({}, mytask.name,
-                          args=[1, 2], kwargs={}), a_queue)
-        self.assertAnswer(router.route({}, 'celery.poza'),
-                dict(d_queue, queue=current_app.conf.CELERY_DEFAULT_QUEUE))
+        self.assertEqual(router.route({}, mytask.name,
+                          args=[1, 2], kwargs={})['queue'].name, 'foo')
+        self.assertEqual(router.route({}, 'celery.poza')['queue'].name,
+                current_app.conf.CELERY_DEFAULT_QUEUE)
 
 
 class test_prepare(Case):

+ 12 - 2
celery/tests/backends/test_amqp.py

@@ -1,5 +1,6 @@
 from __future__ import absolute_import
 
+import pickle
 import socket
 
 from datetime import timedelta
@@ -15,7 +16,7 @@ from celery.exceptions import TimeoutError
 from celery.five import Empty, Queue, range
 from celery.utils import uuid
 
-from celery.tests.utils import AppCase, sleepdeprived
+from celery.tests.utils import AppCase, sleepdeprived, Mock
 
 
 class SomeClass(object):
@@ -131,11 +132,14 @@ class test_AMQPBackend(AppCase):
             def __init__(self, **merge):
                 self.payload = dict({'status': states.STARTED,
                                      'result': None}, **merge)
+                self.body = pickle.dumps(self.payload)
+                self.content_type = 'application/x-python-serialize'
+                self.content_encoding = 'binary'
 
         class MockBinding(object):
 
             def __init__(self, *args, **kwargs):
-                pass
+                self.channel = Mock()
 
             def __call__(self, *args, **kwargs):
                 return self
@@ -149,10 +153,14 @@ class test_AMQPBackend(AppCase):
                 except Empty:
                     pass
 
+            def is_bound(self):
+                return True
+
         class MockBackend(AMQPBackend):
             Queue = MockBinding
 
         backend = MockBackend()
+        backend._republish = Mock()
 
         # FFWD's to the latest state.
         results.put(Message(status=states.RECEIVED, seq=1))
@@ -169,6 +177,8 @@ class test_AMQPBackend(AppCase):
         backend.get_task_meta(tid)
         self.assertIn(tid, backend._cache, 'Caches last known state')
 
+        self.assertTrue(backend._republish.called)
+
         # Returns cache if no new states.
         results.queue.clear()
         assert not results.qsize()

+ 1 - 2
celery/tests/bin/test_celeryd_detach.py

@@ -85,8 +85,7 @@ class test_Command(Case):
         x.execute_from_commandline(self.argv)
         self.assertTrue(exit.called)
         detach.assert_called_with(path=x.execv_path, uid=None, gid=None,
-            umask=0, fake=False,
-            logfile='/var/log', pidfile='celeryd.pid',
+            umask=0, fake=False, logfile='/var/log', pidfile='celeryd.pid',
             argv=['-m', 'celery', 'worker', '-c', '1', '-lDEBUG',
                   '--logfile=/var/log', '--pidfile=celeryd.pid',
                   '--', '.disable_rate_limits=1'],

+ 1 - 1
celery/tests/worker/test_worker.py

@@ -332,11 +332,11 @@ class test_Consumer(Case):
         l.event_dispatcher = Mock()
         l.node = MockNode()
         l.update_strategies()
+        l.qos = Mock()
 
         callback = self._get_on_message(l)
         callback(m.decode(), m)
         self.assertTrue(m.acknowledged)
-        self.assertTrue(to_timestamp.call_count)
 
     @patch('celery.worker.consumer.error')
     def test_receive_message_InvalidTaskError(self, error):

+ 4 - 3
docs/configuration.rst

@@ -751,9 +751,10 @@ Only the scheme part (``transport://``) is required, the rest
 is optional, and defaults to the specific transports default values.
 
 The transport part is the broker implementation to use, and the
-default is ``amqp``, but there are many other choices including
-``librabbitmq``, ``amqplib``, ``redis``, ``beanstalk``,
-``sqlalchemy``, ``django``, ``mongodb``, ``couchdb`` and ``pika``.
+default is ``amqp``, which uses ``librabbitmq`` by default, falling back to
+``pyamqp`` if that is not installed.  There are also many other choices,
+including ``redis``, ``beanstalk``, ``sqlalchemy``, ``django``, ``mongodb``,
+and ``couchdb``.
 It can also be a fully qualified path to your own transport implementation.
 
 See the Kombu documentation for more information about broker URLs.

+ 3 - 3
docs/faq.rst

@@ -122,12 +122,12 @@ kombu
 
 Kombu depends on the following packages:
 
-- `amqplib`_
+- `amqp`_
 
 The underlying pure-Python amqp client implementation.  AMQP being the default
-broker it is a natural dependency.
+broker, this is a natural dependency.
 
-.. _`amqplib`: http://pypi.python.org/pypi/amqplib
+.. _`amqp`: http://pypi.python.org/pypi/amqp
 
 - `anyjson`_
 

+ 43 - 3
docs/history/changelog-3.0.rst

@@ -8,8 +8,7 @@ This document contains change notes for bugfix releases in the 3.0.x series
 (Chiastic Slide), please see :ref:`whatsnew-3.0` for an overview of what's
 new in Celery 3.0.
 
-If you're looking for versions prior to 3.0 you should visit our
-:ref:`history` of releases.
+If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 
 .. contents::
     :local:
@@ -18,7 +17,7 @@ If you're looking for versions prior to 3.0 you should visit our
 
 3.0.13
 ======
-:release-date: TBA
+:release-date: 2012-11-30 XX:XX:XX X.X UTC
 
 - Fixed a deadlock issue that could occur when the producer pool
   inherited the connection pool instance of the parent process.
@@ -58,6 +57,13 @@ If you're looking for versions prior to 3.0 you should visit our
         # 8 + 2 + 4 + 8 + 16
         >>> assert c3(8).get() == 38
 
+- Subtasks can now be used with unregistered tasks.
+
+    You can specify subtasks even if you just have the name::
+
+        >>> s = subtask(task_name, args=(), kwargs=())
+        >>> s.delay()
+
 - The :program:`celery shell` command now always adds the current
   directory to the module path.
 
@@ -68,15 +74,49 @@ If you're looking for versions prior to 3.0 you should visit our
 - force_execv: Now makes sure that task symbols in the original
   task modules will always use the correct app instance (Issue #1072).
 
+- AMQP Backend: Now republishes result messages that have been polled
+  (using ``result.ready()`` and friends; ``result.get()`` will not do this
+  in this version).
+
 - Handling of ETA/countdown fixed when the :setting:`CELERY_ENABLE_UTC`
   setting is disabled (Issue #1065).
 
+- A number of unneeded properties were included in messages,
+  caused by accidentally passing ``Queue.as_dict`` as message properties.
+
 - Fixed a typo in the broadcast routing documentation (Issue #1026).
 
 - Rewrote confusing section about idempotence in the task user guide.
 
 - Fixed typo in the daemonization tutorial (Issue #1055).
 
+- Fixed several typos in the documentation.
+
+    Contributed by Marius Gedminas.
+
+- Batches: Now works when using the eventlet pool.
+
+    Fix contributed by Thomas Grainger.
+
+- Batches: Added example sending results to :mod:`celery.contrib.batches`.
+
+    Contributed by Thomas Grainger.
+
+- Fixed problem when using earlier versions of :mod:`pytz`.
+
+    Fix contributed by Vlad.
+
+- Docs updated to include the default value for the
+  :setting:`CELERY_TASK_RESULT_EXPIRES` setting.
+
+- Improvements to the django-celery tutorial.
+
+    Contributed by Locker537.
+
+- The ``add_consumer`` control command did not properly persist
+  the addition of new queues so that they survived connection failure
+  (Issue #1079).
+
 .. _version-3.0.12:
 
 3.0.12

+ 0 - 14
docs/userguide/calling.rst

@@ -491,20 +491,6 @@ AMQP's full routing capabilities. Interested parties may read the
 
     Routing key used to determine.
 
-- mandatory
-
-    This sets the delivery to be mandatory.  An exception will be raised
-    if there are no running workers able to take on the task.
-
-    Not supported by :mod:`amqplib`.
-
-- immediate
-
-    Request immediate delivery. Will raise an exception
-    if the task cannot be routed to a worker immediately.
-
-    Not supported by :mod:`amqplib`.
-
 - priority
 
     A number between `0` and `9`, where `0` is the highest priority.

+ 1 - 1
docs/userguide/optimizing.rst

@@ -66,7 +66,7 @@ If you're using RabbitMQ (AMQP) as the broker then you can install the
 
 The 'amqp' transport will automatically use the librabbitmq module if it's
 installed, or you can also specify the transport you want directly by using
-the ``amqplib://`` or ``librabbitmq://`` prefixes.
+the ``pyamqp://`` or ``librabbitmq://`` prefixes.
 
 .. _optimizing-connection-pools:
 

+ 3 - 2
docs/whatsnew-3.0.rst

@@ -61,7 +61,7 @@ Highlights
 
         Celery will automatically use the :mod:`librabbitmq` module
         if installed, which is a very fast and memory-optimized
-        replacement for the amqplib module.
+        replacement for the py-amqp module.
 
     - Redis support is more reliable with improved ack emulation.
 
@@ -112,7 +112,8 @@ or Redis as a broker, resulting in:
 - Sub-millisecond timer precision.
 - Faster shutdown times.
 
-The transports supported are:  ``amqplib``, ``librabbitmq``, and ``redis``
+The transports supported are: ``py-amqp``, ``librabbitmq``, ``redis``,
+and ``amqplib``.
 Hopefully this can be extended to include additional broker transports
 in the future.