Sfoglia il codice sorgente

Merge branch 'master' into 3.1

Ask Solem 11 anni fa
parent
commit
42e1492cae
46 ha cambiato i file con 449 aggiunte e 186 eliminazioni
  1. 74 0
      Changelog
  2. 1 1
      README.rst
  3. 1 1
      celery/__init__.py
  4. 1 3
      celery/app/__init__.py
  5. 3 2
      celery/app/amqp.py
  6. 9 6
      celery/app/base.py
  7. 9 1
      celery/app/builtins.py
  8. 13 0
      celery/app/task.py
  9. 1 1
      celery/app/trace.py
  10. 19 7
      celery/backends/base.py
  11. 24 13
      celery/backends/cache.py
  12. 1 1
      celery/backends/database/models.py
  13. 2 10
      celery/bin/amqp.py
  14. 2 1
      celery/bin/beat.py
  15. 18 0
      celery/canvas.py
  16. 16 5
      celery/concurrency/asynpool.py
  17. 4 4
      celery/events/__init__.py
  18. 6 2
      celery/events/state.py
  19. 21 15
      celery/five.py
  20. 11 0
      celery/fixups/django.py
  21. 8 3
      celery/local.py
  22. 1 1
      celery/task/http.py
  23. 30 0
      celery/tests/app/test_amqp.py
  24. 17 9
      celery/tests/backends/test_cache.py
  25. 1 1
      celery/tests/case.py
  26. 2 2
      celery/tests/compat_modules/test_http.py
  27. 4 3
      celery/tests/fixups/test_django.py
  28. 1 3
      celery/tests/tasks/test_context.py
  29. 3 3
      celery/tests/tasks/test_tasks.py
  30. 4 1
      celery/tests/utils/test_local.py
  31. 1 0
      celery/tests/worker/test_request.py
  32. 2 1
      celery/tests/worker/test_worker.py
  33. 8 2
      celery/utils/__init__.py
  34. 12 10
      celery/utils/dispatch/saferef.py
  35. 1 2
      celery/utils/dispatch/signal.py
  36. 3 0
      celery/utils/functional.py
  37. 50 50
      celery/worker/consumer.py
  38. 3 2
      celery/worker/control.py
  39. 13 7
      celery/worker/job.py
  40. 1 1
      docs/includes/introduction.txt
  41. 27 0
      docs/userguide/tasks.rst
  42. 4 4
      docs/userguide/workers.rst
  43. 10 1
      extra/centos/celeryd
  44. 2 2
      requirements/default.txt
  45. 2 2
      setup.cfg
  46. 3 3
      tox.ini

+ 74 - 0
Changelog

@@ -8,6 +8,80 @@ This document contains change notes for bugfix releases in the 3.1.x series
 (Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
 new in Celery 3.1.
 
+.. _version-3.1.6:
+
+3.1.6
+=====
+:release-date: 2013-12-02 6:00 P.M UTC
+:release-by: Ask Solem
+
+- Now depends on :mod:`billiard` 3.3.0.10.
+
+- Now depends on :ref:`Kombu 3.0.7 <kombu:version-3.0.7>`.
+
+- Fixed problem where Mingle caused the worker to hang at startup
+  (Issue #1686).
+
+- Beat: Would attempt to drop privileges twice (Issue #1708).
+
+- Windows: Fixed error with ``geteuid`` not being available (Issue #1676).
+
+- Tasks can now provide a list of expected error classes (Issue #1682).
+
+    The list should only include errors that the task is expected to raise
+    during normal operation::
+
+        @task(throws=(KeyError, HttpNotFound))
+
+    What happens when an exceptions is raised depends on the type of error:
+
+    - Expected errors (included in ``Task.throws``)
+
+        Will be logged using severity ``INFO``, and traceback is excluded.
+
+    - Unexpected errors
+
+        Will be logged using severity ``ERROR``, with traceback included.
+
+- Cache result backend now compatible with Python 3 (Issue #1697).
+
+- CentOS init script: Now compatible with sys-v style init symlinks.
+
+    Fix contributed by Jonathan Jordan.
+
+- Events: Fixed problem when task name is not defined (Issue #1710).
+
+    Fix contributed by Mher Movsisyan.
+
+- Task: Fixed unbound local errors (Issue #1684).
+
+    Fix contributed by Markus Ullmann.
+
+- Canvas: Now unrolls groups with only one task (optimization) (Issue #1656).
+
+- Task: Fixed problem with eta and timezones.
+
+    Fix contributed by Alexander Koval.
+
+- Django: Worker now performs model validation (Issue #1681).
+
+- Task decorator now emits less confusing errors when used with
+  incorrect arguments (Issue #1692).
+
+- Task: New method ``Task.send_event`` can be used to send custom events
+  to Flower and other monitors.
+
+- Fixed a compatibility issue with non-abstract task classes
+
+- Events from clients now uses new node name format (``gen<pid>@<hostname>``).
+
+- Fixed rare bug with Callable not being defined at interpreter shutdown
+  (Issue #1678).
+
+    Fix contributed by Nick Johnson.
+
+- Fixed Python 2.6 compatibility (Issue #1679).
+
 .. _version-3.1.5:
 
 3.1.5

+ 1 - 1
README.rst

@@ -4,7 +4,7 @@
 
 .. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
 
-:Version: 3.1.5 (Cipater)
+:Version: 3.1.6 (Cipater)
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/celery/celery/

+ 1 - 1
celery/__init__.py

@@ -14,7 +14,7 @@ version_info_t = namedtuple(
 )
 
 SERIES = 'Cipater'
-VERSION = version_info_t(3, 1, 5, '', '')
+VERSION = version_info_t(3, 1, 6, '', '')
 __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
 __author__ = 'Ask Solem'
 __contact__ = 'ask@celeryproject.org'

+ 1 - 3
celery/app/__init__.py

@@ -10,8 +10,6 @@ from __future__ import absolute_import
 
 import os
 
-from collections import Callable
-
 from celery.local import Proxy
 from celery import _state
 from celery._state import (
@@ -148,6 +146,6 @@ def shared_task(*args, **kwargs):
             return Proxy(task_by_cons)
         return __inner
 
-    if len(args) == 1 and isinstance(args[0], Callable):
+    if len(args) == 1 and callable(args[0]):
         return create_shared_task(**kwargs)(args[0])
     return create_shared_task(*args, **kwargs)

+ 3 - 2
celery/app/amqp.py

@@ -21,6 +21,7 @@ from kombu.utils.functional import maybe_list
 from celery import signals
 from celery.five import items, string_t
 from celery.utils.text import indent as textindent
+from celery.utils.timeutils import to_utc
 
 from . import app_or_default
 from . import routes as _routes
@@ -249,12 +250,12 @@ class TaskProducer(Producer):
             now = now or self.app.now()
             eta = now + timedelta(seconds=countdown)
             if self.utc:
-                eta = eta.replace(tzinfo=self.app.timezone)
+                eta = to_utc(eta).astimezone(self.app.timezone)
         if isinstance(expires, (int, float)):
             now = now or self.app.now()
             expires = now + timedelta(seconds=expires)
             if self.utc:
-                expires = expires.replace(tzinfo=self.app.timezone)
+                expires = to_utc(expires).astimezone(self.app.timezone)
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
 

+ 9 - 6
celery/app/base.py

@@ -12,7 +12,7 @@ import os
 import threading
 import warnings
 
-from collections import Callable, defaultdict, deque
+from collections import defaultdict, deque
 from contextlib import contextmanager
 from copy import deepcopy
 from operator import attrgetter
@@ -213,11 +213,14 @@ class Celery(object):
 
             return _create_task_cls
 
-        if len(args) == 1 and isinstance(args[0], Callable):
-            return inner_create_task_cls(**opts)(*args)
+        if len(args) == 1:
+            if callable(args[0]):
+                return inner_create_task_cls(**opts)(*args)
+            raise TypeError('argument 1 to @task() must be a callable')
         if args:
             raise TypeError(
-                'task() takes no arguments (%s given)' % (len(args, )))
+                '@task() takes exactly 1 argument ({0} given)'.format(
+                    sum([len(args), len(opts)])))
         return inner_create_task_cls(**opts)
 
     def _task_from_fun(self, fun, **options):
@@ -252,7 +255,7 @@ class Celery(object):
                     task.bind(self)
 
     def add_defaults(self, fun):
-        if not isinstance(fun, Callable):
+        if not callable(fun):
             d, fun = fun, lambda: d
         if self.configured:
             return self.conf.add_defaults(fun())
@@ -290,7 +293,7 @@ class Celery(object):
 
     def _autodiscover_tasks(self, packages, related_name='tasks', **kwargs):
         # argument may be lazy
-        packages = packages() if isinstance(packages, Callable) else packages
+        packages = packages() if callable(packages) else packages
         self.loader.autodiscover_tasks(packages, related_name)
 
     def send_task(self, name, args=None, kwargs=None, countdown=None,

+ 9 - 1
celery/app/builtins.py

@@ -222,7 +222,10 @@ def add_group_task(app):
 
 @shared_task
 def add_chain_task(app):
-    from celery.canvas import Signature, chain, chord, group, maybe_signature
+    from celery.canvas import (
+        Signature, chain, chord, group, maybe_signature, maybe_unroll_group,
+    )
+
     _app = app
 
     class Chain(app.Task):
@@ -244,10 +247,13 @@ def add_chain_task(app):
                 res = task.freeze()
                 i += 1
 
+                if isinstance(task, group):
+                    task = maybe_unroll_group(task)
                 if isinstance(task, chain):
                     # splice the chain
                     steps.extendleft(reversed(task.tasks))
                     continue
+
                 elif isinstance(task, group) and steps and \
                         not isinstance(steps[0], group):
                     # automatically upgrade group(..) | s to chord(group, s)
@@ -271,6 +277,8 @@ def add_chain_task(app):
                     tasks.append(task)
                 prev_task, prev_res = task, res
 
+            print(tasks)
+
             return tasks, results
 
         def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,

+ 13 - 0
celery/app/task.py

@@ -313,6 +313,14 @@ class Task(object):
     #: :setting:`CELERY_ACKS_LATE` setting.
     acks_late = None
 
+    #: List/tuple of expected exceptions.
+    #:
+    #: These are errors that are expected in normal operation
+    #: and that should not be regarded as a real error by the worker.
+    #: Currently this means that the state will be updated to an error
+    #: state, but the worker will not log the event as an error.
+    throws = ()
+
     #: Default task expiry time.
     expires = None
 
@@ -761,6 +769,11 @@ class Task(object):
         from celery import xstarmap
         return xstarmap(self.s(), it, app=self.app)
 
+    def send_event(self, type_, **fields):
+        req = self.request
+        with self.app.events.default_dispatcher(hostname=req.hostname) as d:
+            return d.send(type_, uuid=req.id, **fields)
+
     def update_state(self, task_id=None, state=None, meta=None):
         """Update task state.
 

+ 1 - 1
celery/app/trace.py

@@ -194,7 +194,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
     signature = canvas.maybe_signature  # maybe_ does not clone if already
 
     def trace_task(uuid, args, kwargs, request=None):
-        R = I = None
+        R = I = retval = state = None
         kwargs = kwdict(kwargs)
         try:
             push_task(task)

+ 19 - 7
celery/backends/base.py

@@ -324,11 +324,23 @@ BaseDictBackend = BaseBackend  # XXX compat
 
 
 class KeyValueStoreBackend(BaseBackend):
-    task_keyprefix = ensure_bytes('celery-task-meta-')
-    group_keyprefix = ensure_bytes('celery-taskset-meta-')
-    chord_keyprefix = ensure_bytes('chord-unlock-')
+    key_t = ensure_bytes
+    task_keyprefix = 'celery-task-meta-'
+    group_keyprefix = 'celery-taskset-meta-'
+    chord_keyprefix = 'chord-unlock-'
     implements_incr = False
 
+    def __init__(self, *args, **kwargs):
+        if hasattr(self.key_t, '__func__'):
+            self.key_t = self.key_t.__func__  # remove binding
+        self._encode_prefixes()
+        super(KeyValueStoreBackend, self).__init__(*args, **kwargs)
+
+    def _encode_prefixes(self):
+        self.task_keyprefix = self.key_t(self.task_keyprefix)
+        self.group_keyprefix = self.key_t(self.group_keyprefix)
+        self.chord_keyprefix = self.key_t(self.chord_keyprefix)
+
     def get(self, key):
         raise NotImplementedError('Must implement the get method.')
 
@@ -349,19 +361,19 @@ class KeyValueStoreBackend(BaseBackend):
 
     def get_key_for_task(self, task_id):
         """Get the cache key for a task by id."""
-        return self.task_keyprefix + ensure_bytes(task_id)
+        return self.task_keyprefix + self.key_t(task_id)
 
     def get_key_for_group(self, group_id):
         """Get the cache key for a group by id."""
-        return self.group_keyprefix + ensure_bytes(group_id)
+        return self.group_keyprefix + self.key_t(group_id)
 
     def get_key_for_chord(self, group_id):
         """Get the cache key for the chord waiting on group with given id."""
-        return self.chord_keyprefix + ensure_bytes(group_id)
+        return self.chord_keyprefix + self.key_t(group_id)
 
     def _strip_prefix(self, key):
         """Takes bytes, emits string."""
-        key = ensure_bytes(key)
+        key = self.key_t(key)
         for prefix in self.task_keyprefix, self.group_keyprefix:
             if key.startswith(prefix):
                 return bytes_to_str(key[len(prefix):])

+ 24 - 13
celery/backends/cache.py

@@ -8,7 +8,10 @@
 """
 from __future__ import absolute_import
 
+import sys
+
 from kombu.utils import cached_property
+from kombu.utils.encoding import bytes_to_str, ensure_bytes
 
 from celery.exceptions import ImproperlyConfigured
 from celery.utils.functional import LRUCache
@@ -19,6 +22,8 @@ __all__ = ['CacheBackend']
 
 _imp = [None]
 
+PY3 = sys.version_info[0] == 3
+
 REQUIRES_BACKEND = """\
 The memcached backend requires either pylibmc or python-memcached.\
 """
@@ -31,7 +36,7 @@ Please use one of the following backends instead: {1}\
 
 def import_best_memcache():
     if _imp[0] is None:
-        is_pylibmc = False
+        is_pylibmc, memcache_key_t = False, ensure_bytes
         try:
             import pylibmc as memcache
             is_pylibmc = True
@@ -40,17 +45,22 @@ def import_best_memcache():
                 import memcache  # noqa
             except ImportError:
                 raise ImproperlyConfigured(REQUIRES_BACKEND)
-        _imp[0] = (is_pylibmc, memcache)
+        if PY3:
+            memcache_key_t = bytes_to_str
+        _imp[0] = (is_pylibmc, memcache, memcache_key_t)
     return _imp[0]
 
 
 def get_best_memcache(*args, **kwargs):
-    behaviors = kwargs.pop('behaviors', None)
-    is_pylibmc, memcache = import_best_memcache()
-    client = memcache.Client(*args, **kwargs)
-    if is_pylibmc and behaviors is not None:
-        client.behaviors = behaviors
-    return client
+    is_pylibmc, memcache, key_t = import_best_memcache()
+    Client = _Client = memcache.Client
+
+    if not is_pylibmc:
+        def Client(*args, **kwargs):  # noqa
+            kwargs.pop('behaviors', None)
+            return _Client(*args, **kwargs)
+
+    return Client, key_t
 
 
 class DummyClient(object):
@@ -75,10 +85,10 @@ class DummyClient(object):
         return self.cache.incr(key, delta)
 
 
-backends = {'memcache': lambda: get_best_memcache,
-            'memcached': lambda: get_best_memcache,
-            'pylibmc': lambda: get_best_memcache,
-            'memory': lambda: DummyClient}
+backends = {'memcache': get_best_memcache,
+            'memcached': get_best_memcache,
+            'pylibmc': get_best_memcache,
+            'memory': lambda: (DummyClient, ensure_bytes)}
 
 
 class CacheBackend(KeyValueStoreBackend):
@@ -100,10 +110,11 @@ class CacheBackend(KeyValueStoreBackend):
             self.servers = servers.rstrip('/').split(';')
         self.expires = self.prepare_expires(expires, type=int)
         try:
-            self.Client = backends[self.backend]()
+            self.Client, self.key_t = backends[self.backend]()
         except KeyError:
             raise ImproperlyConfigured(UNKNOWN_BACKEND.format(
                 self.backend, ', '.join(backends)))
+        self._encode_prefixes()  # rencode the keyprefixes
 
     def get(self, key):
         return self.client.get(key)

+ 1 - 1
celery/backends/database/models.py

@@ -57,7 +57,7 @@ class TaskSet(ResultModelBase):
     id = sa.Column(sa.Integer, sa.Sequence('taskset_id_sequence'),
                    autoincrement=True, primary_key=True)
     taskset_id = sa.Column(sa.String(255), unique=True)
-    result = sa.Column(sa.PickleType, nullable=True)
+    result = sa.Column(PickleType, nullable=True)
     date_done = sa.Column(sa.DateTime, default=datetime.utcnow,
                           nullable=True)
 

+ 2 - 10
celery/bin/amqp.py

@@ -12,7 +12,6 @@ import sys
 import shlex
 import pprint
 
-from collections import Callable
 from functools import partial
 from itertools import count
 
@@ -68,14 +67,7 @@ class Spec(object):
         self.returns = kwargs.get('returns')
 
     def coerce(self, index, value):
-        """Coerce value for argument at index.
-
-        E.g. if :attr:`args` is `[('is_active', bool)]`:
-
-            >>> coerce(0, 'False')
-            False
-
-        """
+        """Coerce value for argument at index."""
         arg_info = self.args[index]
         arg_type = arg_info[1]
         # Might be a custom way to coerce the string value,
@@ -99,7 +91,7 @@ class Spec(object):
         """Format the return value of this command in a human-friendly way."""
         if not self.returns:
             return 'ok.' if response is None else response
-        if isinstance(self.returns, Callable):
+        if callable(self.returns):
             return self.returns(response)
         return self.returns.format(response)
 

+ 2 - 1
celery/bin/beat.py

@@ -65,7 +65,8 @@ class beat(Command):
 
     def run(self, detach=False, logfile=None, pidfile=None, uid=None,
             gid=None, umask=None, working_directory=None, **kwargs):
-        maybe_drop_privileges(uid=uid, gid=gid)
+        if not detach:
+            maybe_drop_privileges(uid=uid, gid=gid)
         workdir = working_directory
         kwargs.pop('app', None)
         beat = partial(self.app.Beat,

+ 18 - 0
celery/canvas.py

@@ -82,6 +82,22 @@ class _getitem_property(object):
         self._path(obj)[self.key] = value
 
 
+def maybe_unroll_group(g):
+    """Unroll group with only one member."""
+    # Issue #1656
+    try:
+        size = len(g.tasks)
+    except TypeError:
+        try:
+            size = g.tasks.__length_hint__()
+        except (AttributeError, TypeError):
+            pass
+        else:
+            return list(g.tasks)[0] if size == 1 else g
+    else:
+        return g.tasks[0] if size == 1 else g
+
+
 class Signature(dict):
     """Class that wraps the arguments and execution options
     for a single task invocation.
@@ -240,6 +256,8 @@ class Signature(dict):
         )))
 
     def __or__(self, other):
+        if isinstance(other, group):
+            other = maybe_unroll_group(other)
         if not isinstance(self, chain) and isinstance(other, chain):
             return chain((self, ) + other.tasks, app=self._app)
         elif isinstance(other, chain):

+ 16 - 5
celery/concurrency/asynpool.py

@@ -1020,12 +1020,16 @@ class AsynPool(_pool.Pool):
     def _stop_task_handler(task_handler):
         """Called at shutdown to tell processes that we are shutting down."""
         for proc in task_handler.pool:
-            setblocking(proc.inq._writer, 1)
             try:
-                proc.inq.put(None)
-            except OSError as exc:
-                if get_errno(exc) != errno.EBADF:
-                    raise
+                setblocking(proc.inq._writer, 1)
+            except (OSError, IOError):
+                pass
+            else:
+                try:
+                    proc.inq.put(None)
+                except OSError as exc:
+                    if get_errno(exc) != errno.EBADF:
+                        raise
 
     def create_result_handler(self):
         return super(AsynPool, self).create_result_handler(
@@ -1075,6 +1079,13 @@ class AsynPool(_pool.Pool):
                 try:
                     task = resq.recv()
                 except (OSError, IOError, EOFError) as exc:
+                    if get_errno(exc) == errno.EINTR:
+                        continue
+                    elif get_errno(exc) == errno.EAGAIN:
+                        break
+                    else:
+                        debug('got %r while flushing process %r',
+                              exc, proc, exc_info=1)
                     if get_errno(exc) not in UNAVAIL:
                         debug('got %r while flushing process %r',
                               exc, proc, exc_info=1)

+ 4 - 4
celery/events/__init__.py

@@ -12,7 +12,6 @@ from __future__ import absolute_import
 
 import os
 import time
-import socket
 import threading
 import warnings
 
@@ -27,7 +26,7 @@ from kombu.mixins import ConsumerMixin
 from kombu.utils import cached_property
 
 from celery.app import app_or_default
-from celery.utils import uuid
+from celery.utils import anon_nodename, uuid
 from celery.utils.functional import dictfilter
 from celery.utils.timeutils import adjust_timestamp, utcoffset, maybe_s_to_ms
 
@@ -90,7 +89,8 @@ class EventDispatcher(object):
     :param connection: Connection to the broker.
 
     :keyword hostname: Hostname to identify ourselves as,
-        by default uses the hostname returned by :func:`socket.gethostname`.
+        by default uses the hostname returned by
+        :func:`~celery.utils.anon_nodename`.
 
     :keyword groups: List of groups to send events for.  :meth:`send` will
         ignore send requests to groups not in this list.
@@ -126,7 +126,7 @@ class EventDispatcher(object):
         self.app = app_or_default(app or self.app)
         self.connection = connection
         self.channel = channel
-        self.hostname = hostname or socket.gethostname()
+        self.hostname = hostname or anon_nodename()
         self.buffer_while_offline = buffer_while_offline
         self.mutex = threading.Lock()
         self.producer = None

+ 6 - 2
celery/events/state.py

@@ -18,6 +18,7 @@
 """
 from __future__ import absolute_import
 
+import sys
 import threading
 
 from datetime import datetime
@@ -48,6 +49,8 @@ Substantial drift from %s may mean clocks are out of sync.  Current drift is
 %s seconds.  [orig: %s recv: %s]
 """
 
+CAN_KWDICT = sys.version_info >= (2, 6, 5)
+
 logger = get_logger(__name__)
 warn = logger.warning
 
@@ -352,7 +355,7 @@ class State(object):
             worker, created = self.get_or_create_worker(hostname)
             handler = getattr(worker, 'on_' + type, None)
             if handler:
-                handler(**fields)
+                handler(**(fields if CAN_KWDICT else kwdict(fields)))
             return worker, created
 
     def task_event(self, type, fields, timetuple=timetuple):
@@ -436,7 +439,8 @@ class State(object):
 
     def task_types(self):
         """Return a list of all seen task types."""
-        return list(sorted(set(task.name for task in values(self.tasks))))
+        return list(sorted(set(task.name for task in values(self.tasks)
+                               if task.name is not None)))
 
     def alive_workers(self):
         """Return a list of (seemingly) alive workers."""

+ 21 - 15
celery/five.py

@@ -11,13 +11,15 @@
 from __future__ import absolute_import
 
 __all__ = ['Counter', 'reload', 'UserList', 'UserDict', 'Queue', 'Empty',
-           'zip_longest', 'StringIO', 'BytesIO', 'map', 'string', 'string_t',
+           'zip_longest', 'map', 'string', 'string_t',
            'long_t', 'text_t', 'range', 'int_types', 'items', 'keys', 'values',
            'nextfun', 'reraise', 'WhateverIO', 'with_metaclass',
            'OrderedDict', 'THREAD_TIMEOUT_MAX', 'format_d',
            'class_property', 'reclassmethod', 'create_module',
            'recreate_module', 'monotonic']
 
+import io
+
 try:
     from collections import Counter
 except ImportError:  # pragma: no cover
@@ -53,7 +55,6 @@ if PY3:  # pragma: no cover
 
     from queue import Queue, Empty
     from itertools import zip_longest
-    from io import StringIO, BytesIO
 
     map = map
     string = str
@@ -62,6 +63,7 @@ if PY3:  # pragma: no cover
     text_t = str
     range = range
     int_types = (int, )
+    _byte_t = bytes
 
     open_fqdn = 'builtins.open'
 
@@ -84,24 +86,17 @@ if PY3:  # pragma: no cover
             raise value.with_traceback(tb)
         raise value
 
-    class WhateverIO(StringIO):
-
-        def write(self, data):
-            if isinstance(data, bytes):
-                data = data.encode()
-            StringIO.write(self, data)
-
 else:
     import __builtin__ as builtins  # noqa
     from Queue import Queue, Empty  # noqa
     from itertools import imap as map, izip_longest as zip_longest  # noqa
-    from StringIO import StringIO   # noqa
     string = unicode                # noqa
     string_t = basestring           # noqa
-    text_t = unicode
+    text_t = unicode                # noqa
     long_t = long                   # noqa
-    range = xrange
-    int_types = (int, long)
+    range = xrange                  # noqa
+    int_types = (int, long)         # noqa
+    _byte_t = (str, bytes)          # noqa
 
     open_fqdn = '__builtin__.open'
 
@@ -131,8 +126,6 @@ else:
 
     exec_("""def reraise(tp, value, tb=None): raise tp, value, tb""")
 
-    BytesIO = WhateverIO = StringIO         # noqa
-
 
 def with_metaclass(Type, skip_attrs=set(['__dict__', '__weakref__'])):
     """Class decorator to set metaclass.
@@ -385,3 +378,16 @@ def get_origins(defs):
     for module, attrs in items(defs):
         origins.update(dict((attr, module) for attr in attrs))
     return origins
+
+
+_SIO_write = io.StringIO.write
+_SIO_init = io.StringIO.__init__
+
+
+class WhateverIO(io.StringIO):
+
+    def __init__(self, v=None, *a, **kw):
+        _SIO_init(self, v.decode() if isinstance(v, _byte_t) else v, *a, **kw)
+
+    def write(self, data):
+        _SIO_write(self, data.decode() if isinstance(data, _byte_t) else data)

+ 11 - 0
celery/fixups/django.py

@@ -1,5 +1,6 @@
 from __future__ import absolute_import
 
+import io
 import os
 import sys
 import warnings
@@ -132,12 +133,22 @@ class DjangoWorkerFixup(object):
             _oracle_database_errors
         )
 
+    def validate_models(self):
+        from django.core.management.validation import get_validation_errors
+        s = io.StringIO()
+        num_errors = get_validation_errors(s, None)
+        if num_errors:
+            raise RuntimeError(
+                'One or more Django models did not validate:\n{0}'.format(
+                    s.getvalue()))
+
     def install(self):
         signals.beat_embedded_init.connect(self.close_database)
         signals.worker_ready.connect(self.on_worker_ready)
         signals.task_prerun.connect(self.on_task_prerun)
         signals.task_postrun.connect(self.on_task_postrun)
         signals.worker_process_init.connect(self.on_worker_process_init)
+        self.validate_models()
         self.close_database()
         self.close_cache()
         return self

+ 8 - 3
celery/local.py

@@ -13,13 +13,16 @@
 from __future__ import absolute_import
 
 import importlib
+import sys
 
-from .five import long_t, string
+from .five import string
 
 __all__ = ['Proxy', 'PromiseProxy', 'try_import', 'maybe_evaluate']
 
 __module__ = __name__  # used by Proxy class body
 
+PY3 = sys.version_info[0] == 3
+
 
 def _default_cls_attr(name, type_, cls_value):
     # Proxy uses properties to forward the standard
@@ -160,7 +163,6 @@ class Proxy(object):
     __ne__ = lambda x, o: x._get_current_object() != o
     __gt__ = lambda x, o: x._get_current_object() > o
     __ge__ = lambda x, o: x._get_current_object() >= o
-    __cmp__ = lambda x, o: cmp(x._get_current_object(), o)
     __hash__ = lambda x: hash(x._get_current_object())
     __call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw)
     __len__ = lambda x: len(x._get_current_object())
@@ -188,7 +190,6 @@ class Proxy(object):
     __invert__ = lambda x: ~(x._get_current_object())
     __complex__ = lambda x: complex(x._get_current_object())
     __int__ = lambda x: int(x._get_current_object())
-    __long__ = lambda x: long_t(x._get_current_object())
     __float__ = lambda x: float(x._get_current_object())
     __oct__ = lambda x: oct(x._get_current_object())
     __hex__ = lambda x: hex(x._get_current_object())
@@ -198,6 +199,10 @@ class Proxy(object):
     __exit__ = lambda x, *a, **kw: x._get_current_object().__exit__(*a, **kw)
     __reduce__ = lambda x: x._get_current_object().__reduce__()
 
+    if not PY3:
+        __cmp__ = lambda x, o: cmp(x._get_current_object(), o)  # noqa
+        __long__ = lambda x: long(x._get_current_object())      # noqa
+
 
 class PromiseProxy(Proxy):
     """This is a proxy to an object that has not yet been evaulated.

+ 1 - 1
celery/task/http.py

@@ -46,7 +46,7 @@ else:
         keys/values encoded."""
         return dict(
             (k.encode('utf-8'),
-             v.encode('utf-8') if isinstance(v, unicode) else v)
+             v.encode('utf-8') if isinstance(v, unicode) else v)  # noqa
             for k, v in tup)
 
 

+ 30 - 0
celery/tests/app/test_amqp.py

@@ -1,5 +1,9 @@
 from __future__ import absolute_import
 
+import datetime
+
+import pytz
+
 from kombu import Exchange, Queue
 
 from celery.app.amqp import Queues, TaskPublisher
@@ -47,6 +51,32 @@ class test_TaskProducer(AppCase):
         self.assertEqual(prod.publish.call_args[1]['exchange'], 'yyy')
         self.assertEqual(prod.publish.call_args[1]['routing_key'], 'zzz')
 
+    def test_publish_with_countdown(self):
+        prod = self.app.amqp.TaskProducer(Mock())
+        prod.channel.connection.client.declared_entities = set()
+        prod.publish = Mock()
+        now = datetime.datetime(2013, 11, 26, 16, 48, 46)
+        prod.publish_task('tasks.add', (1, 1), {}, retry=False,
+                          countdown=10, now=now)
+        self.assertEqual(
+            prod.publish.call_args[0][0]['eta'],
+            '2013-11-26T16:48:56+00:00',
+        )
+
+    def test_publish_with_countdown_and_timezone(self):
+        # use timezone with fixed offset to be sure it won't be changed
+        self.app.conf.CELERY_TIMEZONE = pytz.FixedOffset(120)
+        prod = self.app.amqp.TaskProducer(Mock())
+        prod.channel.connection.client.declared_entities = set()
+        prod.publish = Mock()
+        now = datetime.datetime(2013, 11, 26, 16, 48, 46)
+        prod.publish_task('tasks.add', (2, 2), {}, retry=False,
+                          countdown=20, now=now)
+        self.assertEqual(
+            prod.publish.call_args[0][0]['eta'],
+            '2013-11-26T18:49:06+02:00',
+        )
+
     def test_event_dispatcher(self):
         prod = self.app.amqp.TaskProducer(Mock())
         self.assertTrue(prod.event_dispatcher)

+ 17 - 9
celery/tests/backends/test_cache.py

@@ -18,6 +18,8 @@ from celery.tests.case import (
     AppCase, Mock, mask_modules, patch, reset_modules,
 )
 
+PY3 = sys.version_info[0] == 3
+
 
 class SomeClass(object):
 
@@ -122,10 +124,15 @@ class MyMemcachedStringEncodingError(Exception):
 class MemcachedClient(DummyClient):
 
     def set(self, key, value, *args, **kwargs):
-        if isinstance(key, text_t):
+        if PY3:
+            key_t, must_be, not_be, cod = bytes, 'string', 'bytes', 'decode'
+        else:
+            key_t, must_be, not_be, cod = text_t, 'bytes', 'string', 'encode'
+        if isinstance(key, key_t):
             raise MyMemcachedStringEncodingError(
-                'Keys must be bytes, not string.  Convert your '
-                'strings using mystring.encode(charset)!')
+                'Keys must be {0}, not {1}.  Convert your '
+                'strings using mystring.{2}(charset)!'.format(
+                    must_be, not_be, cod))
         return super(MemcachedClient, self).set(key, value, *args, **kwargs)
 
 
@@ -164,7 +171,7 @@ class test_get_best_memcache(AppCase, MockCacheMixin):
             with reset_modules('celery.backends.cache'):
                 from celery.backends import cache
                 cache._imp = [None]
-                self.assertEqual(cache.get_best_memcache().__module__,
+                self.assertEqual(cache.get_best_memcache()[0].__module__,
                                  'pylibmc')
 
     def test_memcache(self):
@@ -173,7 +180,7 @@ class test_get_best_memcache(AppCase, MockCacheMixin):
                 with mask_modules('pylibmc'):
                     from celery.backends import cache
                     cache._imp = [None]
-                    self.assertEqual(cache.get_best_memcache().__module__,
+                    self.assertEqual(cache.get_best_memcache()[0]().__module__,
                                      'memcache')
 
     def test_no_implementations(self):
@@ -189,14 +196,15 @@ class test_get_best_memcache(AppCase, MockCacheMixin):
             with reset_modules('celery.backends.cache'):
                 from celery.backends import cache
                 cache._imp = [None]
-                cache.get_best_memcache(behaviors={'foo': 'bar'})
+                cache.get_best_memcache()[0](behaviors={'foo': 'bar'})
                 self.assertTrue(cache._imp[0])
-                cache.get_best_memcache()
+                cache.get_best_memcache()[0]()
 
     def test_backends(self):
         from celery.backends.cache import backends
-        for name, fun in items(backends):
-            self.assertTrue(fun())
+        with self.mock_memcache():
+            for name, fun in items(backends):
+                self.assertTrue(fun())
 
 
 class test_memcache_key(AppCase, MockCacheMixin):

+ 1 - 1
celery/tests/case.py

@@ -534,7 +534,7 @@ def mask_modules(*modnames):
         ...     try:
         ...         import sys
         ...     except ImportError:
-        ...         print 'sys not found'
+        ...         print('sys not found')
         sys not found
 
         >>> import sys  # noqa

+ 2 - 2
celery/tests/compat_modules/test_http.py

@@ -11,7 +11,7 @@ except ImportError:  # py3k
 from anyjson import dumps
 from kombu.utils.encoding import from_utf8
 
-from celery.five import StringIO, items
+from celery.five import WhateverIO, items
 from celery.task import http
 from celery.tests.case import AppCase, Case
 
@@ -24,7 +24,7 @@ def mock_urlopen(response_method):
     @wraps(urlopen)
     def _mocked(url, *args, **kwargs):
         response_data, headers = response_method(url)
-        return addinfourl(StringIO(response_data), headers, url)
+        return addinfourl(WhateverIO(response_data), headers, url)
 
     http.urlopen = _mocked
 

+ 4 - 3
celery/tests/fixups/test_django.py

@@ -21,10 +21,11 @@ class FixupCase(AppCase):
 
     @contextmanager
     def fixup_context(self, app):
-        with patch('celery.fixups.django.import_module') as import_module:
+        with patch('celery.fixups.django.DjangoWorkerFixup.validate_models'):
             with patch('celery.fixups.django.symbol_by_name') as symbyname:
-                f = self.Fixup(app)
-                yield f, import_module, symbyname
+                with patch('celery.fixups.django.import_module') as impmod:
+                    f = self.Fixup(app)
+                    yield f, impmod, symbyname
 
 
 class test_DjangoFixup(FixupCase):

+ 1 - 3
celery/tests/tasks/test_context.py

@@ -1,8 +1,6 @@
 # -*- coding: utf-8 -*-'
 from __future__ import absolute_import
 
-from collections import Callable
-
 from celery.app.task import Context
 from celery.tests.case import AppCase
 
@@ -15,7 +13,7 @@ def get_context_as_dict(ctx, getter=getattr):
         if attr_name.startswith('_'):
             continue   # Ignore pseudo-private attributes
         attr = getter(ctx, attr_name)
-        if isinstance(attr, Callable):
+        if callable(attr):
             continue   # Ignore methods and other non-trivial types
         defaults[attr_name] = attr
     return defaults

+ 3 - 3
celery/tests/tasks/test_tasks.py

@@ -1,6 +1,5 @@
 from __future__ import absolute_import
 
-from collections import Callable
 from datetime import datetime, timedelta
 
 from kombu import Queue
@@ -267,8 +266,9 @@ class test_tasks(TasksCase):
     def test_regular_task(self):
         self.assertIsInstance(self.mytask, Task)
         self.assertTrue(self.mytask.run())
-        self.assertTrue(isinstance(self.mytask, Callable),
-                        'Task class is callable()')
+        self.assertTrue(
+            callable(self.mytask), 'Task class is callable()',
+        )
         self.assertTrue(self.mytask(), 'Task class runs run() when called')
 
         with self.app.connection_or_acquire() as conn:

+ 4 - 1
celery/tests/utils/test_local.py

@@ -11,6 +11,8 @@ from celery.local import (
 )
 from celery.tests.case import Case, Mock
 
+PY3 = sys.version_info[0] == 3
+
 
 class test_try_import(Case):
 
@@ -258,7 +260,8 @@ class test_Proxy(Case):
         x = Proxy(lambda: 10)
         self.assertEqual(type(x.__float__()), float)
         self.assertEqual(type(x.__int__()), int)
-        self.assertEqual(type(x.__long__()), long_t)
+        if not PY3:
+            self.assertEqual(type(x.__long__()), long_t)
         self.assertTrue(hex(x))
         self.assertTrue(oct(x))
 

+ 1 - 0
celery/tests/worker/test_request.py

@@ -762,6 +762,7 @@ class test_Request(AppCase):
             'name': job.name,
             'id': job.id,
             'exc': 'FOOBARBAZ',
+            'description': 'raised unexpected',
             'traceback': 'foobarbaz',
         }
         self.assertTrue(x)

+ 2 - 1
celery/tests/worker/test_worker.py

@@ -27,7 +27,7 @@ from celery.utils import worker_direct
 from celery.utils.serialization import pickle
 from celery.utils.timer2 import Timer
 
-from celery.tests.case import AppCase, Mock, patch, restore_logging
+from celery.tests.case import AppCase, Mock, SkipTest, patch, restore_logging
 
 
 def MockStep(step=None):
@@ -760,6 +760,7 @@ class test_WorkController(AppCase):
             ws.send.assert_called_with(sender=self.worker)
 
     def test_process_shutdown_on_worker_shutdown(self):
+        raise SkipTest('unstable test')
         from celery.concurrency.prefork import process_destructor
         from celery.concurrency.asynpool import Worker
         with patch('celery.signals.worker_process_shutdown') as ws:

+ 8 - 2
celery/utils/__init__.py

@@ -9,6 +9,7 @@
 from __future__ import absolute_import, print_function
 
 import os
+import socket
 import sys
 import traceback
 import warnings
@@ -21,7 +22,7 @@ from pprint import pprint
 from kombu.entity import Exchange, Queue
 
 from celery.exceptions import CPendingDeprecationWarning, CDeprecationWarning
-from celery.five import StringIO, items, reraise, string_t
+from celery.five import WhateverIO, items, reraise, string_t
 
 __all__ = ['worker_direct', 'warn_deprecated', 'deprecated', 'lpmerge',
            'is_iterable', 'isatty', 'cry', 'maybe_reraise', 'strtobool',
@@ -149,7 +150,7 @@ def cry(out=None, sepchr='=', seplen=49):  # pragma: no cover
     taken from https://gist.github.com/737056."""
     import threading
 
-    out = StringIO() if out is None else out
+    out = WhateverIO() if out is None else out
     P = partial(print, file=out)
 
     # get a map of threads by their ID so we can print their names
@@ -270,6 +271,11 @@ def nodename(name, hostname):
     return NODENAME_SEP.join((name, hostname))
 
 
+def anon_nodename(hostname=None, prefix='gen'):
+    return nodename(''.join([prefix, str(os.getpid())]),
+                    hostname or socket.gethostname())
+
+
 def nodesplit(nodename):
     """Split node name into tuple of name/hostname."""
     parts = nodename.split(NODENAME_SEP, 1)

+ 12 - 10
celery/utils/dispatch/saferef.py

@@ -7,13 +7,14 @@ aren't handled by the core weakref module).
 """
 from __future__ import absolute_import
 
-import weakref
+import sys
 import traceback
-
-from collections import Callable
+import weakref
 
 __all__ = ['safe_ref']
 
+PY3 = sys.version_info[0] == 3
+
 
 def safe_ref(target, on_delete=None):  # pragma: no cover
     """Return a *safe* weak reference to a callable target
@@ -35,7 +36,7 @@ def safe_ref(target, on_delete=None):  # pragma: no cover
             don't know how to create reference""".format(target)
         return get_bound_method_weakref(target=target,
                                         on_delete=on_delete)
-    if isinstance(on_delete, Callable):
+    if callable(on_delete):
         return weakref.ref(target, on_delete)
     else:
         return weakref.ref(target)
@@ -140,7 +141,7 @@ class BoundMethodWeakref(object):  # pragma: no cover
                 pass
             for function in methods:
                 try:
-                    if isinstance(function, Callable):
+                    if callable(function):
                         function(self)
                 except Exception as exc:
                     try:
@@ -180,11 +181,12 @@ class BoundMethodWeakref(object):  # pragma: no cover
         return self() is not None
     __nonzero__ = __bool__  # py2
 
-    def __cmp__(self, other):
-        """Compare with another reference"""
-        if not isinstance(other, self.__class__):
-            return cmp(self.__class__, type(other))
-        return cmp(self.key, other.key)
+    if not PY3:
+        def __cmp__(self, other):
+            """Compare with another reference"""
+            if not isinstance(other, self.__class__):
+                return cmp(self.__class__, type(other))  # noqa
+            return cmp(self.key, other.key)              # noqa
 
     def __call__(self):
         """Return a strong reference to the bound method

+ 1 - 2
celery/utils/dispatch/signal.py

@@ -3,7 +3,6 @@
 from __future__ import absolute_import
 
 import weakref
-from collections import Callable
 from . import saferef
 from celery.five import range
 
@@ -95,7 +94,7 @@ class Signal(object):  # pragma: no cover
 
             return _connect_signal
 
-        if args and isinstance(args[0], Callable):
+        if args and callable(args[0]):
             return _handle_options(*args[1:], **kwargs)(args[0])
         return _handle_options(*args, **kwargs)
 

+ 3 - 0
celery/utils/functional.py

@@ -289,6 +289,9 @@ class _regen(UserList, list):
     def __reduce__(self):
         return list, (self.data, )
 
+    def __length_hint__(self):
+        return self.__it.__length_hint__()
+
     @cached_property
     def data(self):
         return list(self.__it)

+ 50 - 50
celery/worker/consumer.py

@@ -120,7 +120,7 @@ MINGLE_GET_FIELDS = itemgetter('clock', 'revoked')
 
 def dump_body(m, body):
     if isinstance(body, buffer_t):
-        body = bytes_t(buffer)
+        body = bytes_t(body)
     return '{0} ({1}b)'.format(truncate(safe_repr(body), 1024),
                                len(m.body))
 
@@ -536,20 +536,6 @@ class Heart(bootsteps.StartStopStep):
     shutdown = stop
 
 
-class Control(bootsteps.StartStopStep):
-    requires = (Events, )
-
-    def __init__(self, c, **kwargs):
-        self.is_green = c.pool is not None and c.pool.is_green
-        self.box = (pidbox.gPidbox if self.is_green else pidbox.Pidbox)(c)
-        self.start = self.box.start
-        self.stop = self.box.stop
-        self.shutdown = self.box.shutdown
-
-    def include_if(self, c):
-        return c.app.conf.CELERY_ENABLE_REMOTE_CONTROL
-
-
 class Tasks(bootsteps.StartStopStep):
     requires = (Events, )
 
@@ -592,9 +578,57 @@ class Agent(bootsteps.StartStopStep):
         return agent
 
 
+class Mingle(bootsteps.StartStopStep):
+    label = 'Mingle'
+    requires = (Events, )
+    compatible_transports = set(['amqp', 'redis'])
+
+    def __init__(self, c, without_mingle=False, **kwargs):
+        self.enabled = not without_mingle and self.compatible_transport(c.app)
+
+    def compatible_transport(self, app):
+        with app.connection() as conn:
+            return conn.transport.driver_type in self.compatible_transports
+
+    def start(self, c):
+        info('mingle: searching for neighbors')
+        I = c.app.control.inspect(timeout=1.0, connection=c.connection)
+        replies = I.hello(c.hostname, revoked._data) or {}
+        replies.pop(c.hostname, None)
+        if replies:
+            info('mingle: sync with %s nodes',
+                 len([reply for reply, value in items(replies) if value]))
+            for reply in values(replies):
+                if reply:
+                    try:
+                        other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
+                    except KeyError:  # reply from pre-3.1 worker
+                        pass
+                    else:
+                        c.app.clock.adjust(other_clock)
+                        revoked.update(other_revoked)
+            info('mingle: sync complete')
+        else:
+            info('mingle: all alone')
+
+
+class Control(bootsteps.StartStopStep):
+    requires = (Mingle, )
+
+    def __init__(self, c, **kwargs):
+        self.is_green = c.pool is not None and c.pool.is_green
+        self.box = (pidbox.gPidbox if self.is_green else pidbox.Pidbox)(c)
+        self.start = self.box.start
+        self.stop = self.box.stop
+        self.shutdown = self.box.shutdown
+
+    def include_if(self, c):
+        return c.app.conf.CELERY_ENABLE_REMOTE_CONTROL
+
+
 class Gossip(bootsteps.ConsumerStep):
     label = 'Gossip'
-    requires = (Events, )
+    requires = (Mingle, )
     _cons_stamp_fields = itemgetter(
         'id', 'clock', 'hostname', 'pid', 'topic', 'action', 'cver',
     )
@@ -747,40 +781,6 @@ class Gossip(bootsteps.ConsumerStep):
             self.clock.forward()
 
 
-class Mingle(bootsteps.StartStopStep):
-    label = 'Mingle'
-    requires = (Gossip, )
-    compatible_transports = set(['amqp', 'redis'])
-
-    def __init__(self, c, without_mingle=False, **kwargs):
-        self.enabled = not without_mingle and self.compatible_transport(c.app)
-
-    def compatible_transport(self, app):
-        with app.connection() as conn:
-            return conn.transport.driver_type in self.compatible_transports
-
-    def start(self, c):
-        info('mingle: searching for neighbors')
-        I = c.app.control.inspect(timeout=1.0, connection=c.connection)
-        replies = I.hello(c.hostname, revoked._data) or {}
-        replies.pop(c.hostname, None)
-        if replies:
-            info('mingle: sync with %s nodes',
-                 len([reply for reply, value in items(replies) if value]))
-            for reply in values(replies):
-                if reply:
-                    try:
-                        other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
-                    except KeyError:  # reply from pre-3.1 worker
-                        pass
-                    else:
-                        c.app.clock.adjust(other_clock)
-                        revoked.update(other_revoked)
-            info('mingle: sync complete')
-        else:
-            info('mingle: all alone')
-
-
 class Evloop(bootsteps.StartStopStep):
     label = 'event loop'
     last = True

+ 3 - 2
celery/worker/control.py

@@ -8,11 +8,12 @@
 """
 from __future__ import absolute_import
 
+import io
 import tempfile
 
 from kombu.utils.encoding import safe_repr
 
-from celery.five import UserDict, items, StringIO
+from celery.five import UserDict, items
 from celery.platforms import signals as _signals
 from celery.utils import timeutils
 from celery.utils.functional import maybe_list
@@ -247,7 +248,7 @@ def memsample(state, **kwargs):  # pragma: no cover
 @Panel.register
 def memdump(state, samples=10, **kwargs):  # pragma: no cover
     from celery.utils.debug import memdump
-    out = StringIO()
+    out = io.StringIO()
     memdump(file=out)
     return out.getvalue()
 

+ 13 - 7
celery/worker/job.py

@@ -101,16 +101,16 @@ class Request(object):
 
     #: Format string used to log task failure.
     error_msg = """\
-        Task %(name)s[%(id)s] raised exception: %(exc)s
+        Task %(name)s[%(id)s] %(description)s: %(exc)s
     """
 
     #: Format string used to log internal error.
     internal_error_msg = """\
-        Task %(name)s[%(id)s] INTERNAL ERROR: %(exc)s
+        Task %(name)s[%(id)s] %(description)s: %(exc)s
     """
 
     ignored_msg = """\
-        Task %(name)s[%(id)s] ignored
+        Task %(name)s[%(id)s] %(description)s
     """
 
     rejected_msg = """\
@@ -437,17 +437,23 @@ class Request(object):
 
     def _log_error(self, einfo, send_failed_event=True):
         einfo.exception = get_pickled_exception(einfo.exception)
+        eobj = einfo.exception
         exception, traceback, exc_info, internal, sargs, skwargs = (
-            safe_repr(einfo.exception),
+            safe_repr(eobj),
             safe_str(einfo.traceback),
             einfo.exc_info,
             einfo.internal,
             safe_repr(self.args),
             safe_repr(self.kwargs),
         )
+        task = self.task
+        if task.throws and isinstance(eobj, task.throws):
+            severity, exc_info = logging.INFO, None
+            description = 'raised expected'
+        else:
+            severity = logging.ERROR
+            description = 'raised unexpected'
         format = self.error_msg
-        description = 'raised exception'
-        severity = logging.ERROR
         if send_failed_event:
             self.send_event(
                 'task-failed', exception=exception, traceback=traceback,
@@ -493,7 +499,7 @@ class Request(object):
                                    'hostname': self.hostname,
                                    'internal': internal}})
 
-        self.task.send_error_email(context, einfo.exception)
+        task.send_error_email(context, einfo.exception)
 
     def acknowledge(self):
         """Acknowledge task."""

+ 1 - 1
docs/includes/introduction.txt

@@ -1,4 +1,4 @@
-:Version: 3.1.5 (Cipater)
+:Version: 3.1.6 (Cipater)
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/celery/celery/

+ 27 - 0
docs/userguide/tasks.rst

@@ -470,6 +470,33 @@ General
     A value of :const:`None` will disable the retry limit and the
     task will retry forever until it succeeds.
 
+.. attribute:: Task.throws
+
+    Optional list of expected error classes that should not be regarded
+    as an actual error.
+
+    Errors in this list will be reported as a failure to the result backend,
+    but the worker will not log the event as an error, and no traceback will
+    be included.
+
+    Example:
+
+    .. code-block:: python
+
+        @task(throws=(KeyError, HttpNotFound)):
+        def get_foo():
+            something()
+
+    Error types:
+
+    - Expected errors (in ``Task.throws``)
+
+        Logged with severity ``INFO``, traceback excluded.
+
+    - Unexpected errors
+
+        Logged with severity ``ERROR``, with traceback included.
+
 .. attribute:: Task.default_retry_delay
 
     Default time in seconds before a retry of the task

+ 4 - 4
docs/userguide/workers.rst

@@ -1088,13 +1088,13 @@ they take a single argument: the current
 From there you have access to the active
 :class:`~celery.worker.consumer.Consumer` if needed.
 
-Here's an example control command that restarts the broker connection:
+Here's an example control command that increments the task prefetch count:
 
 .. code-block:: python
 
     from celery.worker.control import Panel
 
     @Panel.register
-    def reset_connection(state):
-        state.consumer.reset_connection()
-        return {'ok': 'connection reset'}
+    def increase_prefetch_count(state, n=1):
+        state.consumer.qos.increment_eventually(n)
+        return {'ok': 'prefetch count incremented'}

+ 10 - 1
extra/centos/celeryd

@@ -28,7 +28,16 @@
 #
 # Setting `prog` here allows you to symlink this init script, making it easy
 # to run multiple processes on the system.
-prog="$(basename $0)"
+
+# If we're invoked via SysV-style runlevel scripts we need to follow the 
+# link from rcX.d before working out the script name.
+if [[ `dirname $0` == /etc/rc*.d ]]; then
+    target="$(readlink $0)"
+else
+    target=$0
+fi
+
+prog="$(basename $target)"
 
 # Source the centos service helper functions
 source /etc/init.d/functions

+ 2 - 2
requirements/default.txt

@@ -1,3 +1,3 @@
 pytz>dev
-billiard>=3.3.0.8,<3.4
-kombu>=3.0.6,<4.0
+billiard>=3.3.0.10,<3.4
+kombu>=3.0.7,<4.0

+ 2 - 2
setup.cfg

@@ -15,5 +15,5 @@ upload-dir = docs/.build/html
 
 [bdist_rpm]
 requires = pytz >= 2011b
-           billiard >= 3.3.0.8
-           kombu >= 3.0.6
+           billiard >= 3.3.0.10
+           kombu >= 3.0.7

+ 3 - 3
tox.ini

@@ -23,7 +23,7 @@ deps = -r{toxinidir}/requirements/default.txt
        -r{toxinidir}/requirements/test-ci.txt
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
            pip install anyjson
-           nosetests --with-xunit                               \
+           nosetests -v --with-xunit                            \
                      --xunit-file={toxinidir}/nosetests.xml     \
                      --with-coverage3 --cover3-xml              \
                      --cover3-html-dir={toxinidir}/cover        \
@@ -36,7 +36,7 @@ deps = -r{toxinidir}/requirements/default.txt
        -r{toxinidir}/requirements/test-ci.txt
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
            pip install anyjson
-           nosetests --with-xunit                               \
+           nosetests -v --with-xunit                            \
                      --xunit-file={toxinidir}/nosetests.xml     \
                      --with-coverage3 --cover3-xml              \
                      --cover3-html-dir={toxinidir}/cover        \
@@ -49,7 +49,7 @@ deps = -r{toxinidir}/requirements/default.txt
        -r{toxinidir}/requirements/test-ci.txt
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
            pip install anyjson
-           nosetests --with-xunit                               \
+           nosetests -v --with-xunit                            \
                      --xunit-file={toxinidir}/nosetests.xml     \
                      --with-coverage3 --cover3-xml              \
                      --cover3-html-dir={toxinidir}/cover        \