Browse Source

Merge branch 'master' into kombuRPC

Ask Solem 12 years ago
parent
commit
4ea4aab5ce
72 changed files with 913 additions and 556 deletions
  1. Changelog (+90 -1)
  2. celery/__main__.py (+12 -0)
  3. celery/app/amqp.py (+1 -1)
  4. celery/app/builtins.py (+7 -6)
  5. celery/app/task.py (+9 -3)
  6. celery/apps/worker.py (+8 -1)
  7. celery/backends/cache.py (+2 -1)
  8. celery/backends/redis.py (+1 -1)
  9. celery/beat.py (+3 -6)
  10. celery/bin/celeryd_multi.py (+4 -3)
  11. celery/canvas.py (+10 -8)
  12. celery/concurrency/__init__.py (+4 -1)
  13. celery/concurrency/gevent.py (+1 -0)
  14. celery/concurrency/threads.py (+0 -6)
  15. celery/contrib/rdb.py (+3 -5)
  16. celery/datastructures.py (+1 -1)
  17. celery/loaders/base.py (+1 -1)
  18. celery/platforms.py (+83 -76)
  19. celery/result.py (+3 -2)
  20. celery/schedules.py (+41 -36)
  21. celery/states.py (+25 -7)
  22. celery/task/trace.py (+89 -7)
  23. celery/tests/backends/test_redis.py (+2 -1)
  24. celery/tests/bin/test_celeryd.py (+11 -4)
  25. celery/tests/bin/test_celeryd_multi.py (+8 -8)
  26. celery/tests/concurrency/test_eventlet.py (+1 -1)
  27. celery/tests/concurrency/test_gevent.py (+7 -6)
  28. celery/tests/tasks/test_chord.py (+4 -1)
  29. celery/tests/tasks/test_tasks.py (+22 -0)
  30. celery/tests/utilities/test_datastructures.py (+2 -2)
  31. celery/tests/utilities/test_platforms.py (+55 -98)
  32. celery/tests/utilities/test_timeutils.py (+1 -17)
  33. celery/tests/utils.py (+1 -0)
  34. celery/tests/worker/test_request.py (+31 -5)
  35. celery/tests/worker/test_worker.py (+3 -0)
  36. celery/utils/__init__.py (+1 -0)
  37. celery/utils/compat.py (+7 -3)
  38. celery/utils/dispatch/saferef.py (+7 -7)
  39. celery/utils/functional.py (+7 -1)
  40. celery/utils/imports.py (+1 -4)
  41. celery/utils/iso8601.py (+72 -0)
  42. celery/utils/threads.py (+11 -11)
  43. celery/utils/timer2.py (+2 -2)
  44. celery/utils/timeutils.py (+68 -26)
  45. celery/worker/__init__.py (+8 -11)
  46. celery/worker/autoreload.py (+3 -7)
  47. celery/worker/consumer.py (+43 -18)
  48. celery/worker/job.py (+1 -0)
  49. celery/worker/state.py (+2 -1)
  50. docs/configuration.rst (+4 -18)
  51. docs/django/first-steps-with-django.rst (+0 -11)
  52. docs/faq.rst (+5 -20)
  53. docs/getting-started/first-steps-with-celery.rst (+5 -4)
  54. docs/getting-started/next-steps.rst (+1 -9)
  55. docs/internals/protocol.rst (+9 -8)
  56. docs/internals/reference/celery.worker.components.rst (+11 -0)
  57. docs/internals/reference/index.rst (+1 -0)
  58. docs/userguide/application.rst (+0 -7)
  59. docs/userguide/monitoring.rst (+1 -1)
  60. docs/userguide/periodic-tasks.rst (+0 -22)
  61. docs/userguide/tasks.rst (+30 -4)
  62. extra/generic-init.d/celerybeat (+1 -1)
  63. extra/release/verify-reference-index.sh (+32 -8)
  64. funtests/benchmarks/bench_worker.py (+4 -1)
  65. requirements/README.rst (+14 -11)
  66. requirements/default-py3k.txt (+0 -4)
  67. requirements/default.txt (+3 -3)
  68. requirements/extra-py3k.txt (+1 -0)
  69. requirements/py26.txt (+0 -2)
  70. setup.cfg (+3 -6)
  71. setup.py (+5 -6)
  72. tox.ini (+4 -3)

+ 90 - 1
Changelog

@@ -20,7 +20,96 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
 - `Task.apply_async` now supports timeout and soft_timeout arguments (Issue #802)
 - `App.control.Inspect.conf` can be used for inspecting worker configuration
 
-.. _version-3.0.9:
+.. _version-3.0.10:
+
+3.0.10
+======
+:release-date: TBA
+
+- Now depends on kombu 2.4.7
+
+- Now depends on billiard 2.7.3.13
+
+    - Fixes crash at startup when using Django and pre-1.4 projects
+      (setup_environ).
+
+    - Hard time limits now send the KILL signal shortly after TERM,
+      to terminate processes that have signal handlers blocked by C extensions.
+
+    - Billiard now installs even if the C extension cannot be built.
+
+        It's still recommended to build the C extension if you are using
+        a transport other than rabbitmq/redis, or if you use force_execv
+        for some other reason.
+
+    - Pool now sets a ``current_process().index`` attribute that can be used to create
+      as many log files as there are processes in the pool.
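+
+      A minimal sketch of opening a per-process log file with this
+      attribute (assuming the billiard pool and stdlib logging)::
+
+          import logging
+          from billiard import current_process
+
+          def open_process_log(basename='worker'):
+              # index is 0 .. n-1 for the n processes in the pool
+              i = current_process().index
+              handler = logging.FileHandler(
+                  '{0}-{1}.log'.format(basename, i))
+              logging.getLogger().addHandler(handler)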
+
+- Canvas: chord/group/chain no longer modify state when called
+
+    Previously calling a chord/group/chain would modify the ids of subtasks
+    so that::
+
+        >>> c = chord([add.s(2, 2), add.s(4, 4)], xsum.s())
+        >>> c()
+        >>> c() <-- call again
+
+    the second invocation would reuse the same task ids as the
+    first.  This is now fixed, so that calling a subtask
+    won't mutate any options.
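+
+    Each invocation now generates fresh ids, e.g. (a sketch using the
+    tasks from the example above)::
+
+        >>> c = chord([add.s(2, 2), add.s(4, 4)], xsum.s())
+        >>> r1 = c()
+        >>> r2 = c()
+        >>> r1.id != r2.id
+        True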
+
+- Canvas: Chaining a chord to another task now works.
+
+- Worker: Fixed a bug where the request stack could be corrupted if
+  relative imports are used.
+
+    The problem usually manifested itself as an exception while trying to
+    send a failed task result (``NoneType`` does not have an ``id``
+    attribute).
+
+    Fix contributed by Sam Cooke.
+
+- The worker now makes sure the request/task stacks are not modified
+  by the initial ``Task.__call__``.
+
+    This would previously be a problem if a custom task class defined
+    ``__call__`` and also called ``super()``.
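+
+    An example of the pattern that is now safe (a sketch)::
+
+        from celery import Task
+
+        class DebugTask(Task):
+            abstract = True
+
+            def __call__(self, *args, **kwargs):
+                print('starting: {0.name}'.format(self))
+                return super(DebugTask, self).__call__(*args, **kwargs)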
+
+- Because of many bugs the fast local optimization has been disabled,
+  and can only be enabled by setting the :envvar:`USE_FAST_LOCALS`
+  environment variable.
+
+- Worker: Now sets a default socket timeout of 5 seconds at shutdown
+  so that broken socket reads do not hinder proper shutdown (Issue #975).
+
+- More fixes related to late eventlet/gevent patching.
+
+- Documentation for the following settings was out of sync with reality:
+
+    - :setting:`CELERY_TASK_PUBLISH_RETRY`
+
+        Documented as disabled by default, but it was enabled by default
+        since 2.5, as stated in the 2.5 changelog.
+
+    - :setting:`CELERY_TASK_PUBLISH_RETRY_POLICY`
+
+        The default max_retries had been set to 100, but documented as being
+        3, and the interval_max was set to 1 but documented as 0.2.
+        The defaults are now set to 3 and 0.2, as originally
+        documented.
+
+    Fix contributed by Matt Long.
+
+- Worker: Log messages for established and lost connections have been
+  improved, so that they are more useful together with the multiple-broker
+  failover host list coming in the next Kombu version.
+
+- The repr of a crontab schedule value of '0' should be '*' (Issue #972).
+
+- Revoked tasks are now removed from reserved/active state in the worker
+  (Issue #969)
+
+    Fix contributed by Alexey Zatelepin.
+
+- gevent: Now supports hard time limits using ``gevent.Timeout``.
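+
+    A sketch of the mechanism (assuming gevent is installed)::
+
+        import gevent
+        from gevent import Timeout
+
+        # Timeout raises gevent.Timeout when the block exceeds the limit
+        with Timeout(3):
+            gevent.sleep(10)   # stands in for the task body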
 
 3.0.9
 =====

+ 12 - 0
celery/__main__.py

@@ -20,5 +20,17 @@ def _compat_worker():
     main()
 
 
+def _compat_multi():
+    maybe_patch_concurrency()
+    from celery.bin.celeryd_multi import main
+    main()
+
+
+def _compat_beat():
+    maybe_patch_concurrency()
+    from celery.bin.celerybeat import main
+    main()
+
+
 if __name__ == '__main__':
     main()
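
For context, these ``_compat_*`` entry points are what the old
``celeryd-multi`` and ``celerybeat`` console scripts can be pointed at; a
sketch of the (assumed, not shown in this diff) setup.py wiring::

    entry_points={
        'console_scripts': [
            'celeryd-multi = celery.__main__:_compat_multi',
            'celerybeat = celery.__main__:_compat_beat',
        ],
    }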

+ 1 - 1
celery/app/amqp.py

@@ -14,10 +14,10 @@ from weakref import WeakValueDictionary
 from kombu import Connection, Consumer, Exchange, Producer, Queue
 from kombu.common import entry_to_queue
 from kombu.pools import ProducerPool
+from kombu.utils import cached_property, uuid
 from kombu.utils.encoding import safe_repr
 
 from celery import signals
-from celery.utils import cached_property, uuid
 from celery.utils.text import indent as textindent
 
 from . import app_or_default

+ 7 - 6
celery/app/builtins.py

@@ -213,16 +213,17 @@ def add_chain_task(app):
                         next_step = steps.popleft()
                     except IndexError:
                         next_step = None
-                if next_step is not None:
-                    task = chord(task, body=next_step, task_id=tid)
+                    if next_step is not None:
+                        task = chord(task, body=next_step, task_id=tid)
                 if prev_task:
                     # link previous task to this task.
                     prev_task.link(task)
                     # set the results parent attribute.
                     res.parent = prev_res
 
-                results.append(res)
-                tasks.append(task)
+                if not isinstance(prev_task, chord):
+                    results.append(res)
+                    tasks.append(task)
                 prev_task, prev_res = task, res
 
             return tasks, results
@@ -275,8 +276,8 @@ def add_chord_task(app):
             prepare_member = self._prepare_member
 
             # - convert back to group if serialized
-            if not isinstance(header, group):
-                header = group([maybe_subtask(t) for t in  header])
+            tasks = header.tasks if isinstance(header, group) else header
+            header = group([maybe_subtask(s).clone() for s in tasks])
             # - eager applies the group inline
             if eager:
                 return header.apply(args=partial_args, task_id=group_id)

+ 9 - 3
celery/app/task.py

@@ -43,15 +43,20 @@ class Context(object):
     args = None
     kwargs = None
     retries = 0
+    eta = None
+    expires = None
     is_eager = False
     delivery_info = None
     taskset = None   # compat alias to group
     group = None
     chord = None
+    utc = None
     called_directly = True
     callbacks = None
     errbacks = None
+    timeouts = None
     _children = None   # see property
+    _protected = 0
 
     def __init__(self, *args, **kwargs):
         self.update(*args, **kwargs)
@@ -267,9 +272,9 @@ class Task(object):
         if not was_bound:
             self.annotate()
 
-        from celery.utils.threads import LocalStack
-        self.request_stack = LocalStack()
-        self.request_stack.push(Context())
+            from celery.utils.threads import LocalStack
+            self.request_stack = LocalStack()
+            self.request_stack.push(Context())
 
         # PeriodicTask uses this to add itself to the PeriodicTask schedule.
         self.on_bound(app)
@@ -544,6 +549,7 @@ class Task(object):
         if delivery_info:
             options.setdefault('exchange', delivery_info.get('exchange'))
             options.setdefault('routing_key', delivery_info.get('routing_key'))
+        options.setdefault('expires', request.expires)
 
         if not eta and countdown is None:
             countdown = self.default_retry_delay

+ 8 - 1
celery/apps/worker.py

@@ -24,6 +24,7 @@ from billiard import current_process
 from celery import VERSION_BANNER, platforms, signals
 from celery.exceptions import SystemTerminate
 from celery.loaders.app import AppLoader
+from celery.task import trace
 from celery.utils import cry, isatty
 from celery.utils.imports import qualname
 from celery.utils.log import get_logger, in_sighandler, set_in_sighandler
@@ -82,6 +83,9 @@ class Worker(WorkController):
 
     def on_before_init(self, purge=False, redirect_stdouts=None,
             redirect_stdouts_level=None, **kwargs):
+        # apply task execution optimizations
+        trace.setup_worker_optimizations(self.app)
+
         # this signal can be used to set up configuration for
         # workers by name.
         conf = self.app.conf
@@ -98,6 +102,9 @@ class Worker(WorkController):
         self.redirect_stdouts_level = redirect_stdouts_level
 
     def on_start(self):
+        # apply task execution optimizations
+        trace.setup_worker_optimizations(self.app)
+
         # this signal can be used to e.g. change queues after
         # the -Q option has been applied.
         signals.celeryd_after_setup.send(sender=self.hostname, instance=self,
@@ -120,7 +127,7 @@ class Worker(WorkController):
 
     def on_consumer_ready(self, consumer):
         signals.worker_ready.send(sender=consumer)
-        print('celery@{0.hostname} has started.'.format(self))
+        print('celery@{0.hostname} ready.'.format(self))
 
     def redirect_stdouts_to_logger(self):
         self.app.log.setup(self.loglevel, self.logfile,

+ 2 - 1
celery/backends/cache.py

@@ -8,9 +8,10 @@
 """
 from __future__ import absolute_import
 
+from kombu.utils import cached_property
+
 from celery.datastructures import LRUCache
 from celery.exceptions import ImproperlyConfigured
-from celery.utils import cached_property
 
 from .base import KeyValueStoreBackend
 

+ 1 - 1
celery/backends/redis.py

@@ -8,10 +8,10 @@
 """
 from __future__ import absolute_import
 
+from kombu.utils import cached_property
 from kombu.utils.url import _parse_url
 
 from celery.exceptions import ImproperlyConfigured
-from celery.utils import cached_property
 
 from .base import KeyValueStoreBackend
 

+ 3 - 6
celery/beat.py

@@ -7,6 +7,7 @@
 
 """
 from __future__ import absolute_import
+from __future__ import with_statement
 
 import errno
 import os
@@ -18,7 +19,7 @@ import traceback
 from threading import Event, Thread
 
 from billiard import Process, ensure_multiprocessing
-from kombu.utils import reprcall
+from kombu.utils import cached_property, reprcall
 from kombu.utils.functional import maybe_promise
 
 from . import __version__
@@ -27,7 +28,6 @@ from . import signals
 from . import current_app
 from .app import app_or_default
 from .schedules import maybe_schedule, crontab
-from .utils import cached_property
 from .utils.imports import instantiate
 from .utils.timeutils import humanize_seconds
 from .utils.log import get_logger
@@ -323,11 +323,8 @@ class PersistentScheduler(Scheduler):
 
     def _remove_db(self):
         for suffix in self.known_suffixes:
-            try:
+            with platforms.ignore_errno(errno.ENOENT):
                 os.remove(self.schedule_filename + suffix)
-            except OSError as exc:
-                if exc.errno != errno.ENOENT:
-                    raise
 
     def setup_schedule(self):
         try:

+ 4 - 3
celery/bin/celeryd_multi.py

@@ -92,6 +92,7 @@ from __future__ import absolute_import, print_function
 
 import errno
 import os
+import shlex
 import signal
 import socket
 import sys
@@ -105,7 +106,7 @@ from kombu.utils import cached_property
 from kombu.utils.encoding import from_utf8
 
 from celery import VERSION_BANNER
-from celery.platforms import PIDFile, shellsplit
+from celery.platforms import Pidfile, IS_WINDOWS
 from celery.utils import term
 from celery.utils.text import pluralize
 
@@ -299,7 +300,7 @@ class MultiTool(object):
             pid = None
             pidfile = expander(pidfile_template)
             try:
-                pid = PIDFile(pidfile).read_pid()
+                pid = Pidfile(pidfile).read_pid()
             except ValueError:
                 pass
             if pid:
@@ -373,7 +374,7 @@ class MultiTool(object):
 
     def waitexec(self, argv, path=sys.executable):
         args = ' '.join([path] + list(argv))
-        argstr = shellsplit(from_utf8(args))
+        argstr = shlex.split(from_utf8(args), posix=not IS_WINDOWS)
         pipe = Popen(argstr, env=self.env)
         self.info('  {0}'.format(' '.join(argstr)))
         retcode = pipe.wait()

+ 10 - 8
celery/canvas.py

@@ -11,14 +11,14 @@
 """
 from __future__ import absolute_import
 
+from copy import deepcopy
 from operator import itemgetter
 from itertools import chain as _chain, imap
 
-from kombu.utils import fxrange, kwdict, reprcall
+from kombu.utils import cached_property, fxrange, kwdict, reprcall, uuid
 
 from celery import current_app
 from celery.local import Proxy
-from celery.utils import cached_property, uuid
 from celery.utils.functional import (
     maybe_list, is_list, regen,
     chunks as _chunks,
@@ -116,10 +116,11 @@ class Signature(dict):
                 dict(self.kwargs, **kwargs) if kwargs else self.kwargs,
                 dict(self.options, **options) if options else self.options)
 
-    def clone(self, args=(), kwargs={}, **options):
-        args, kwargs, options = self._merge(args, kwargs, options)
-        s = Signature.from_dict({'task': self.task, 'args': args,
-                                 'kwargs': kwargs, 'options': options,
+    def clone(self, args=(), kwargs={}, **opts):
+        # need to deepcopy options so that links etc. in the original
+        # are not modified.
+        args, kwargs, opts = self._merge(args, kwargs, opts)
+        s = Signature.from_dict({'task': self.task, 'args': tuple(args),
+                                 'kwargs': kwargs, 'options': deepcopy(opts),
                                  'subtask_type': self.subtask_type,
                                  'immutable': self.immutable})
         s._type = self._type
@@ -352,11 +353,12 @@ class chord(Signature):
 
     def __call__(self, body=None, **kwargs):
         _chord = self.Chord
-        body = self.kwargs['body'] = body or self.kwargs['body']
+        body = (body or self.kwargs['body']).clone()
+        kwargs = dict(self.kwargs, body=body, **kwargs)
         if _chord.app.conf.CELERY_ALWAYS_EAGER:
             return self.apply((), kwargs)
         callback_id = body.options.setdefault('task_id', uuid())
-        _chord(**dict(self.kwargs, **kwargs))
+        _chord(**kwargs)
         return _chord.AsyncResult(callback_id)
 
     def clone(self, *args, **kwargs):

+ 4 - 1
celery/concurrency/__init__.py

@@ -8,7 +8,10 @@
 """
 from __future__ import absolute_import
 
-from celery.local import symbol_by_name
+# Import from kombu directly, as this module is used
+# early in the import stage, where celery.utils would load
+# too much (a problem for e.g. eventlet patching).
+from kombu.utils import symbol_by_name
 
 ALIASES = {
     'processes': 'celery.concurrency.processes:TaskPool',

+ 1 - 0
celery/concurrency/gevent.py

@@ -7,6 +7,7 @@
 
 """
 from __future__ import absolute_import
+from __future__ import with_statement
 
 import os
 

+ 0 - 6
celery/concurrency/threads.py

@@ -8,16 +8,10 @@
 """
 from __future__ import absolute_import
 
-import os
-
 from celery.utils.compat import UserDict
 
 from .base import apply_target, BasePool
 
-#: Makes sure we don't use threading.local for stacks
-#: since apparently they don't work properly.
-os.environ['USE_PURE_LOCALS'] = '1'
-
 
 class NullDict(UserDict):
 

+ 3 - 5
celery/contrib/rdb.py

@@ -46,6 +46,8 @@ from pdb import Pdb
 
 from billiard import current_process
 
+from celery.platforms import ignore_errno
+
 default_port = 6899
 
 CELERY_RDB_HOST = os.environ.get('CELERY_RDB_HOST') or '127.0.0.1'
@@ -148,12 +150,8 @@ class Rdb(Pdb):
     def set_trace(self, frame=None):
         if frame is None:
             frame = _frame().f_back
-        try:
+        with ignore_errno(errno.ECONNRESET):
             Pdb.set_trace(self, frame)
-        except socket.error as exc:
-            # connection reset by peer.
-            if exc.errno != errno.ECONNRESET:
-                raise
 
     def set_quit(self):
         # this raises a BdbQuit exception that we are unable to catch.

+ 1 - 1
celery/datastructures.py

@@ -215,7 +215,7 @@ class AttributeDictMixin(object):
             return self[k]
         except KeyError:
             raise AttributeError(
-                "{0!r} object has no attribute {1!r}".format(
+                '{0!r} object has no attribute {1!r}'.format(
                     type(self).__name__, k))
 
     def __setattr__(self, key, value):

+ 1 - 1
celery/loaders/base.py

@@ -16,11 +16,11 @@ import re
 from datetime import datetime
 from itertools import imap
 
+from kombu.utils import cached_property
 from kombu.utils.encoding import safe_str
 
 from celery.datastructures import DictAttribute
 from celery.exceptions import ImproperlyConfigured
-from celery.utils import cached_property
 from celery.utils.imports import import_from_cwd, symbol_by_name
 from celery.utils.functional import maybe_list
 

+ 83 - 76
celery/platforms.py

@@ -13,22 +13,21 @@ import atexit
 import errno
 import os
 import platform as _platform
-import shlex
 import signal as _signal
 import sys
 
+from billiard import current_process
 from contextlib import contextmanager
 from itertools import imap
 
 from .local import try_import
 
-from kombu.utils.limits import TokenBucket
-
 _setproctitle = try_import('setproctitle')
 resource = try_import('resource')
 pwd = try_import('pwd')
 grp = try_import('grp')
 
+# exitcodes
 EX_OK = getattr(os, 'EX_OK', 0)
 EX_FAILURE = 1
 EX_UNAVAILABLE = getattr(os, 'EX_UNAVAILABLE', 69)
@@ -44,13 +43,12 @@ DAEMON_WORKDIR = '/'
 PIDFILE_FLAGS = os.O_CREAT | os.O_EXCL | os.O_WRONLY
 PIDFILE_MODE = ((os.R_OK | os.W_OK) << 6) | ((os.R_OK) << 3) | ((os.R_OK))
 
-_setps_bucket = TokenBucket(0.5)  # 30/m, every 2 seconds
-
 PIDLOCKED = """ERROR: Pidfile ({0}) already exists.
-Seems we're already running? (PID: {1})"""
+Seems we're already running? (pid: {1})"""
 
 
 def pyimplementation():
+    """Returns string identifying the current Python implementation."""
     if hasattr(_platform, 'python_implementation'):
         return _platform.python_implementation()
     elif sys.platform.startswith('java'):
@@ -65,6 +63,12 @@ def pyimplementation():
 
 
 def _find_option_with_arg(argv, short_opts=None, long_opts=None):
+    """Search argv for option specifying its short and longopt
+    alternatives.
+
+    Returns the value of the option if found.
+
+    """
     for i, arg in enumerate(argv):
         if arg.startswith('-'):
             if long_opts and arg.startswith('--'):
@@ -77,6 +81,10 @@ def _find_option_with_arg(argv, short_opts=None, long_opts=None):
 
 
 def maybe_patch_concurrency(argv, short_opts=None, long_opts=None):
+    """With short and long opt alternatives that specify the command-line
+    option to set the pool, this makes sure that anything that needs
+    to be patched is completed as early as possible.
+    (e.g. eventlet/gevent monkey patches)."""
     try:
         pool = _find_option_with_arg(argv, short_opts, long_opts)
     except KeyError:
@@ -89,7 +97,6 @@ def maybe_patch_concurrency(argv, short_opts=None, long_opts=None):
 
 class LockFailed(Exception):
     """Raised if a pidlock can't be acquired."""
-    pass
 
 
 def get_fdmax(default=None):
@@ -106,13 +113,14 @@ def get_fdmax(default=None):
     return fdmax
 
 
-class PIDFile(object):
-    """PID lock file.
+class Pidfile(object):
+    """Pidfile
 
     This is the type returned by :func:`create_pidlock`.
 
-    **Should not be used directly, use the :func:`create_pidlock`
-    context instead**
+    TIP: Use the :func:`create_pidlock` function instead,
+    which is more convenient and also removes stale pidfiles (when
+    the process holding the lock is no longer running).
 
     """
 
@@ -142,34 +150,23 @@ class PIDFile(object):
 
     def read_pid(self):
         """Reads and returns the current pid."""
-        try:
-            fh = open(self.path, 'r')
-        except IOError as exc:
-            if exc.errno == errno.ENOENT:
-                return
-            raise
-
-        try:
-            line = fh.readline()
-            if line.strip() == line:  # must contain '\n'
-                raise ValueError(
-                    'Partial or invalid pidfile {0.path}'.format(self))
-        finally:
-            fh.close()
-
-        try:
-            return int(line.strip())
-        except ValueError:
-            raise ValueError('PID file {0.path} invalid.'.format(self))
+        with ignore_errno('ENOENT'):
+            with open(self.path, 'r') as fh:
+                line = fh.readline()
+                if line.strip() == line:  # must contain '\n'
+                    raise ValueError(
+                        'Partial or invalid pidfile {0.path}'.format(self))
+
+                try:
+                    return int(line.strip())
+                except ValueError:
+                    raise ValueError(
+                        'pidfile {0.path} contents invalid.'.format(self))
 
     def remove(self):
         """Removes the lock."""
-        try:
+        with ignore_errno(errno.ENOENT, errno.EACCES):
             os.unlink(self.path)
-        except OSError as exc:
-            if exc.errno in (errno.ENOENT, errno.EACCES):
-                return
-            raise
 
     def remove_if_stale(self):
         """Removes the lock if the process is not running.
@@ -217,19 +214,21 @@ class PIDFile(object):
                     "Inconsistency: Pidfile content doesn't match at re-read")
         finally:
             rfh.close()
+PIDFile = Pidfile  # compat alias
 
 
 def create_pidlock(pidfile):
-    """Create and verify pid file.
+    """Create and verify pidfile.
 
-    If the pid file already exists the program exits with an error message,
-    however if the process it refers to is not running anymore, the pid file
+    If the pidfile already exists the program exits with an error message,
+    however if the process it refers to is not running anymore, the pidfile
     is deleted and the program continues.
 
-    The caller is responsible for releasing the lock before the program
-    exits.
+    This function will automatically install an :mod:`atexit` handler
+    to release the lock at exit; you can skip this by calling
+    :func:`_create_pidlock` instead.
 
-    :returns: :class:`PIDFile`.
+    :returns: :class:`Pidfile`.
 
     **Example**:
 
@@ -244,7 +243,7 @@ def create_pidlock(pidfile):
 
 
 def _create_pidlock(pidfile):
-    pidlock = PIDFile(pidfile)
+    pidlock = Pidfile(pidfile)
     if pidlock.is_locked() and not pidlock.remove_if_stale():
         raise SystemExit(PIDLOCKED.format(pidfile, pidlock.read_pid()))
     pidlock.acquire()
@@ -252,6 +251,7 @@ def _create_pidlock(pidfile):
 
 
 def fileno(f):
+    """Get object fileno, or :const:`None` if not defined."""
     try:
         return f.fileno()
     except AttributeError:
@@ -260,13 +260,11 @@ def fileno(f):
 
 class DaemonContext(object):
     _is_open = False
-    workdir = DAEMON_WORKDIR
-    umask = DAEMON_UMASK
 
     def __init__(self, pidfile=None, workdir=None, umask=None,
             fake=False, **kwargs):
-        self.workdir = workdir or self.workdir
-        self.umask = self.umask if umask is None else umask
+        self.workdir = workdir or DAEMON_WORKDIR
+        self.umask = DAEMON_UMASK if umask is None else umask
         self.fake = fake
         self.stdfds = (sys.stdin, sys.stdout, sys.stderr)
 
@@ -286,7 +284,7 @@ class DaemonContext(object):
             preserve = [fileno(f) for f in self.stdfds if fileno(f)]
             for fd in reversed(range(get_fdmax(default=2048))):
                 if fd not in preserve:
-                    with ignore_EBADF():
+                    with ignore_errno(errno.EBADF):
                         os.close(fd)
 
             for fd in self.stdfds:
@@ -316,7 +314,7 @@ def detached(logfile=None, pidfile=None, uid=None, gid=None, umask=0,
 
     :keyword logfile: Optional log file.  The ability to write to this file
        will be verified before the process is detached.
-    :keyword pidfile: Optional pid file.  The pid file will not be created,
+    :keyword pidfile: Optional pidfile.  The pidfile will not be created,
       as this is the responsibility of the child.  But the process will
       exit if the pid lock exists and the pid written is still running.
     :keyword uid: Optional user id or user name to change
@@ -332,7 +330,6 @@ def detached(logfile=None, pidfile=None, uid=None, gid=None, umask=0,
 
     .. code-block:: python
 
-        import atexit
         from celery.platforms import detached, create_pidlock
 
         with detached(logfile='/var/log/app.log', pidfile='/var/run/app.pid',
@@ -418,6 +415,7 @@ def _setgroups_hack(groups):
 
 
 def setgroups(groups):
+    """Set active groups from a list of group ids."""
     max_groups = None
     try:
         max_groups = os.sysconf('SC_NGROUPS_MAX')
@@ -434,6 +432,8 @@ def setgroups(groups):
 
 
 def initgroups(uid, gid):
+    """Compat version of :func:`os.initgroups` which was first
+    added to Python 2.7."""
     if not pwd:  # pragma: no cover
         return
     username = pwd.getpwuid(uid)[0]
@@ -444,25 +444,13 @@ def initgroups(uid, gid):
     setgroups(groups)
 
 
-def setegid(gid):
-    """Set effective group id."""
-    gid = parse_gid(gid)
-    if gid != os.getegid():
-        os.setegid(gid)
-
-
-def seteuid(uid):
-    """Set effective user id."""
-    uid = parse_uid(uid)
-    if uid != os.geteuid():
-        os.seteuid(uid)
-
-
 def setgid(gid):
+    """Version of :func:`os.setgid` supporting group names."""
     os.setgid(parse_gid(gid))
 
 
 def setuid(uid):
+    """Version of :func:`os.setuid` supporting usernames."""
     os.setuid(parse_uid(uid))
 
 
@@ -625,29 +613,48 @@ if os.environ.get('NOSETPS'):  # pragma: no cover
         pass
 else:
 
-    def set_mp_process_title(progname, info=None, hostname=None,  # noqa
-            rate_limit=False):
+    def set_mp_process_title(progname, info=None, hostname=None):  # noqa
         """Set the ps name using the multiprocessing process name.
 
         Only works if :mod:`setproctitle` is installed.
 
         """
-        if not rate_limit or _setps_bucket.can_consume(1):
-            from billiard import current_process
-            if hostname:
-                progname = '{0}@{1}'.format(progname, hostname.split('.')[0])
-            return set_process_title(
-                '{0}:{1}'.format(progname, current_process().name), info=info)
+        if hostname:
+            progname = '{0}@{1}'.format(progname, hostname.split('.')[0])
+        return set_process_title(
+            '{0}:{1}'.format(progname, current_process().name), info=info)
 
 
-def shellsplit(s):
-    return shlex.split(s, posix=not IS_WINDOWS)
+def get_errno(n):
+    """Get errno for string, e.g. ``ENOENT``."""
+    if isinstance(n, basestring):
+        return getattr(errno, n)
+    return n
 
 
 @contextmanager
-def ignore_EBADF():
+def ignore_errno(*errnos, **kwargs):
+    """Context manager to ignore specific POSIX error codes.
+
+    Takes a list of error codes to ignore, which can be either
+    the name of the code, or the code integer itself::
+
+        >>> with ignore_errno('ENOENT'):
+        ...     with open('foo', 'r') as fh:
+        ...         return fh.read()
+
+        >>> with ignore_errno(errno.ENOENT, errno.EPERM):
+        ...    pass
+
+    :keyword types: A tuple of exceptions to ignore (when the errno matches),
+                    defaults to :exc:`Exception`.
+    """
+    types = kwargs.get('types') or (Exception, )
+    errnos = [get_errno(errno) for errno in errnos]
     try:
         yield
-    except OSError as exc:
-        if exc.errno != errno.EBADF:
+    except types, exc:
+        if not hasattr(exc, 'errno'):
+            raise
+        if exc.errno not in errnos:
             raise
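
A usage sketch of the new ``ignore_errno`` ``types`` keyword (``sock`` here
is a placeholder for an already-connected socket)::

    import errno
    import socket

    from celery.platforms import ignore_errno

    # only suppress ECONNRESET, and only when raised as socket.error
    with ignore_errno(errno.ECONNRESET, types=(socket.error, )):
        data = sock.recv(1024)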

+ 3 - 2
celery/result.py

@@ -14,13 +14,14 @@ from collections import deque
 from copy import copy
 from itertools import imap
 
+from kombu.utils import cached_property
+from kombu.utils.compat import OrderedDict
+
 from . import current_app
 from . import states
 from .app import app_or_default
 from .datastructures import DependencyGraph
 from .exceptions import IncompleteStream, TimeoutError
-from .utils import cached_property
-from .utils.compat import OrderedDict
 
 
 def from_serializable(r):

+ 41 - 36
celery/schedules.py

@@ -13,14 +13,13 @@ import re
 
 from datetime import datetime, timedelta
 
-from dateutil.relativedelta import relativedelta
 from kombu.utils import cached_property
 
 from . import current_app
 from .utils import is_iterable
 from .utils.timeutils import (
     timedelta_seconds, weekday, maybe_timedelta, remaining,
-    humanize_seconds, timezone, maybe_make_aware
+    humanize_seconds, timezone, maybe_make_aware, ffwd
 )
 from .datastructures import AttributeDict
 
@@ -35,6 +34,10 @@ int, basestring, or an iterable type. {type!r} was given.\
 """
 
 
+def _weak_bool(s):
+    return 0 if s == '0' else s
+
+
 class ParseException(Exception):
     """Raised by crontab_parser when the input can't be parsed."""
 
@@ -122,10 +125,10 @@ class schedule(object):
     def utc_enabled(self):
         return self.app.conf.CELERY_ENABLE_UTC
 
-    @cached_property
-    def to_local(self):
-        return (timezone.to_local if self.utc_enabled
-                                  else timezone.to_local_fallback)
+    def to_local(self, dt):
+        if not self.utc_enabled:
+            return timezone.to_local_fallback(dt, self.tz)
+        return dt
 
 
 class crontab_parser(object):
@@ -396,13 +399,13 @@ class crontab(schedule):
             datedata.dom += 1
             roll_over()
 
-        return relativedelta(year=datedata.year,
-                             month=months_of_year[datedata.moy],
-                             day=days_of_month[datedata.dom],
-                             hour=next_hour,
-                             minute=next_minute,
-                             second=0,
-                             microsecond=0)
+        return ffwd(year=datedata.year,
+                    month=months_of_year[datedata.moy],
+                    day=days_of_month[datedata.dom],
+                    hour=next_hour,
+                    minute=next_minute,
+                    second=0,
+                    microsecond=0)
 
     def __init__(self, minute='*', hour='*', day_of_week='*',
             day_of_month='*', month_of_year='*', nowfun=None):
@@ -423,11 +426,11 @@ class crontab(schedule):
 
     def __repr__(self):
         return ('<crontab: %s %s %s %s %s (m/h/d/dM/MY)>' %
-                                            (self._orig_minute or '*',
-                                             self._orig_hour or '*',
-                                             self._orig_day_of_week or '*',
-                                             self._orig_day_of_month or '*',
-                                             self._orig_month_of_year or '*'))
+                        (_weak_bool(self._orig_minute) or '*',
+                         _weak_bool(self._orig_hour) or '*',
+                         _weak_bool(self._orig_day_of_week) or '*',
+                         _weak_bool(self._orig_day_of_month) or '*',
+                         _weak_bool(self._orig_month_of_year) or '*'))
 
     def __reduce__(self):
         return (self.__class__, (self._orig_minute,
@@ -436,9 +439,7 @@ class crontab(schedule):
                                  self._orig_day_of_month,
                                  self._orig_month_of_year), None)
 
-    def remaining_estimate(self, last_run_at, tz=None):
-        """Returns when the periodic task should run next as a timedelta."""
-        tz = tz or self.tz
+    def remaining_delta(self, last_run_at, ffwd=ffwd):
         last_run_at = self.maybe_make_aware(last_run_at)
         dow_num = last_run_at.isoweekday() % 7  # Sunday is day 0, not day 7
 
@@ -453,9 +454,9 @@ class crontab(schedule):
         if execute_this_hour:
             next_minute = min(minute for minute in self.minute
                                         if minute > last_run_at.minute)
-            delta = relativedelta(minute=next_minute,
-                                  second=0,
-                                  microsecond=0)
+            delta = ffwd(minute=next_minute,
+                         second=0,
+                         microsecond=0)
         else:
             next_minute = min(self.minute)
             execute_today = (execute_this_date and
@@ -464,10 +465,10 @@ class crontab(schedule):
             if execute_today:
                 next_hour = min(hour for hour in self.hour
                                         if hour > last_run_at.hour)
-                delta = relativedelta(hour=next_hour,
-                                      minute=next_minute,
-                                      second=0,
-                                      microsecond=0)
+                delta = ffwd(hour=next_hour,
+                             minute=next_minute,
+                             second=0,
+                             microsecond=0)
             else:
                 next_hour = min(self.hour)
                 all_dom_moy = (self._orig_day_of_month == '*' and
@@ -478,18 +479,22 @@ class crontab(schedule):
                                 self.day_of_week)
                     add_week = next_day == dow_num
 
-                    delta = relativedelta(weeks=add_week and 1 or 0,
-                                          weekday=(next_day - 1) % 7,
-                                          hour=next_hour,
-                                          minute=next_minute,
-                                          second=0,
-                                          microsecond=0)
+                    delta = ffwd(weeks=add_week and 1 or 0,
+                                 weekday=(next_day - 1) % 7,
+                                 hour=next_hour,
+                                 minute=next_minute,
+                                 second=0,
+                                 microsecond=0)
                 else:
                     delta = self._delta_to_next(last_run_at,
                                                 next_hour, next_minute)
 
-        return remaining(self.to_local(last_run_at, tz),
-                         delta, self.to_local(self.now(), tz))
+        now = self.maybe_make_aware(self.now())
+        return self.to_local(last_run_at), delta, self.to_local(now)
+
+    def remaining_estimate(self, last_run_at, ffwd=ffwd):
+        """Returns when the periodic task should run next as a timedelta."""
+        return remaining(*self.remaining_delta(last_run_at, ffwd=ffwd))
 
     def is_due(self, last_run_at):
         """Returns tuple of two items `(is_due, next_time_to_run)`,

+ 25 - 7
celery/states.py

@@ -3,7 +3,7 @@
 celery.states
 =============
 
-Built-in Task States.
+Built-in task states.
 
 .. _states:
 
@@ -12,6 +12,8 @@ States
 
 See :ref:`task-states`.
 
+.. _statesets:
+
 Sets
 ----
 
@@ -84,22 +86,38 @@ def precedence(state):
 
 class state(str):
     """State is a subclass of :class:`str`, implementing comparison
-    methods adhering to state precedence rules."""
+    methods adhering to state precedence rules::
+
+        >>> from celery.states import state, PENDING, STARTED, SUCCESS
+
+        >>> state(PENDING) < state(SUCCESS)
+        True
+
+    Any custom state is considered to be lower than :state:`FAILURE` and
+    :state:`SUCCESS`, but higher than any of the other built-in states::
+
+        >>> state('PROGRESS') > state(STARTED)
+        True
+
+        >>> state('PROGRESS') > state('SUCCESS')
+        False
+
+    """
 
-    def compare(self, other, fun, default=False):
+    def compare(self, other, fun):
         return fun(precedence(self), precedence(other))
 
     def __gt__(self, other):
-        return self.compare(other, lambda a, b: a < b, True)
+        return self.compare(other, lambda a, b: a < b)
 
     def __ge__(self, other):
-        return self.compare(other, lambda a, b: a <= b, True)
+        return self.compare(other, lambda a, b: a <= b)
 
     def __lt__(self, other):
-        return self.compare(other, lambda a, b: a > b, False)
+        return self.compare(other, lambda a, b: a > b)
 
     def __le__(self, other):
-        return self.compare(other, lambda a, b: a >= b, False)
+        return self.compare(other, lambda a, b: a >= b)
 
 #: Task state is unknown (assumed pending since you know the id).
 PENDING = 'PENDING'

+ 89 - 7
celery/task/trace.py

@@ -25,7 +25,8 @@ from kombu.utils import kwdict
 
 from celery import current_app
 from celery import states, signals
-from celery._state import _task_stack, default_app
+from celery._state import _task_stack
+from celery.app import set_default_app
 from celery.app.task import Task as BaseTask, Context
 from celery.datastructures import ExceptionInfo
 from celery.exceptions import RetryTaskError
@@ -46,11 +47,9 @@ RETRY = states.RETRY
 FAILURE = states.FAILURE
 EXCEPTION_STATES = states.EXCEPTION_STATES
 
-try:
-    _tasks = default_app._tasks
-except AttributeError:
-    # Windows: will be set later by concurrency.processes.
-    pass
+#: set by :func:`setup_worker_optimizations`
+_tasks = None
+_patched = {}
 
 
 def mro_lookup(cls, attr, stop=(), monkey_patched=[]):
@@ -288,7 +287,15 @@ def trace_task(task, uuid, args, kwargs, request={}, **opts):
         return report_internal_error(task, exc)
 
 
-def trace_task_ret(task, uuid, args, kwargs, request={}):
+def _trace_task_ret(name, uuid, args, kwargs, request={}, **opts):
+    return trace_task(current_app.tasks[name],
+                      uuid, args, kwargs, request, **opts)
+trace_task_ret = _trace_task_ret
+
+
+def _fast_trace_task(task, uuid, args, kwargs, request={}):
+    # setup_worker_optimizations will point trace_task_ret to here,
+    # so this is the function used in the worker.
     return _tasks[task].__trace__(uuid, args, kwargs, request)[0]
 
 
@@ -309,3 +316,78 @@ def report_internal_error(task, exc):
         return exc_info
     finally:
         del(_tb)
+
+
+def setup_worker_optimizations(app):
+    global _tasks
+    global trace_task_ret
+
+    # make sure custom Task.__call__ methods that call super
+    # will not mess up the request/task stack.
+    _install_stack_protection()
+
+    # all new threads start without a current app, so if an app is not
+    # passed on to the thread it will fall back to the "default app",
+    # which then could be the wrong app.  So for the worker
+    # we set this to always return our app.  This is a hack,
+    # and means that only a single app can be used for workers
+    # running in the same process.
+    set_default_app(app)
+
+    # evaluate all task classes by finalizing the app.
+    app.finalize()
+
+    # set fast shortcut to task registry
+    _tasks = app._tasks
+
+    trace_task_ret = _fast_trace_task
+    try:
+        sys.modules['celery.worker.job'].trace_task_ret = _fast_trace_task
+    except KeyError:
+        pass
+
+
+def reset_worker_optimizations():
+    global trace_task_ret
+    trace_task_ret = _trace_task_ret
+    try:
+        delattr(BaseTask, '_stackprotected')
+    except AttributeError:
+        pass
+    try:
+        BaseTask.__call__ = _patched.pop('BaseTask.__call__')
+    except KeyError:
+        pass
+    try:
+        sys.modules['celery.worker.job'].trace_task_ret = _trace_task_ret
+    except KeyError:
+        pass
+
+
+def _install_stack_protection():
+    # Patches BaseTask.__call__ in the worker to handle the edge case
+    # where people override it and also call super.
+    #
+    # - The worker optimizes away BaseTask.__call__ and instead
+    #   calls task.run directly.
+    # - so with the addition of current_task and the request stack
+    #   BaseTask.__call__ now pushes to those stacks so that
+    #   they work when tasks are called directly.
+    #
+    # The worker only optimizes away __call__ in the case
+    # where it has not been overridden, so the request/task stack
+    # will blow if a custom task class defines __call__ and also
+    # calls super().
+    if not getattr(BaseTask, '_stackprotected', False):
+        _patched['BaseTask.__call__'] = orig = BaseTask.__call__
+
+        def __protected_call__(self, *args, **kwargs):
+            stack = self.request_stack
+            req = stack.top
+            if req and not req._protected and len(stack) == 2 and \
+                    not req.called_directly:
+                req._protected = 1
+                return self.run(*args, **kwargs)
+            return orig(self, *args, **kwargs)
+        BaseTask.__call__ = __protected_call__
+        BaseTask._stackprotected = True

+ 2 - 1
celery/tests/backends/test_redis.py

@@ -6,13 +6,14 @@ from mock import Mock, patch
 from nose import SkipTest
 from pickle import loads, dumps
 
+from kombu.utils import cached_property, uuid
+
 from celery import current_app
 from celery import states
 from celery.datastructures import AttributeDict
 from celery.exceptions import ImproperlyConfigured
 from celery.result import AsyncResult
 from celery.task import subtask
-from celery.utils import cached_property, uuid
 from celery.utils.timeutils import timedelta_seconds
 
 from celery.tests.utils import Case

+ 11 - 4
celery/tests/bin/test_celeryd.py

@@ -19,6 +19,7 @@ from celery import current_app
 from celery.apps import worker as cd
 from celery.bin.celeryd import WorkerCommand, main as celeryd_main
 from celery.exceptions import ImproperlyConfigured, SystemTerminate
+from celery.task import trace
 from celery.utils.log import ensure_process_aware_logger
 from celery.worker import state
 
@@ -32,6 +33,13 @@ from celery.tests.utils import (
 ensure_process_aware_logger()
 
 
+class WorkerAppCase(AppCase):
+
+    def tearDown(self):
+        super(WorkerAppCase, self).tearDown()
+        trace.reset_worker_optimizations()
+
+
 def disable_stdouts(fun):
 
     @wraps(fun)
@@ -61,8 +69,7 @@ class Worker(cd.Worker):
         self.on_start()
 
 
-class test_Worker(AppCase):
-
+class test_Worker(WorkerAppCase):
     Worker = Worker
 
     def teardown(self):
@@ -369,7 +376,7 @@ class test_Worker(AppCase):
         self.assertTrue(worker_ready_sent[0])
 
 
-class test_funs(AppCase):
+class test_funs(WorkerAppCase):
 
     def test_active_thread_count(self):
         self.assertTrue(cd.active_thread_count())
@@ -417,7 +424,7 @@ class test_funs(AppCase):
             sys.argv = s
 
 
-class test_signal_handlers(AppCase):
+class test_signal_handlers(WorkerAppCase):
 
     class _Worker(object):
         stopped = False

+ 8 - 8
celery/tests/bin/test_celeryd_multi.py

@@ -261,7 +261,7 @@ class test_MultiTool(Case):
         self.assertEqual(sigs[1][0], ('b', 11, signal.SIGKILL))
         self.assertEqual(sigs[2][0], ('c', 12, signal.SIGKILL))
 
-    def prepare_pidfile_for_getpids(self, PIDFile):
+    def prepare_pidfile_for_getpids(self, Pidfile):
         class pids(object):
 
             def __init__(self, path):
@@ -273,13 +273,13 @@ class test_MultiTool(Case):
                             'celeryd@bar.pid': 11}[self.path]
                 except KeyError:
                     raise ValueError()
-        PIDFile.side_effect = pids
+        Pidfile.side_effect = pids
 
-    @patch('celery.bin.celeryd_multi.PIDFile')
+    @patch('celery.bin.celeryd_multi.Pidfile')
     @patch('socket.gethostname')
-    def test_getpids(self, gethostname, PIDFile):
+    def test_getpids(self, gethostname, Pidfile):
         gethostname.return_value = 'e.com'
-        self.prepare_pidfile_for_getpids(PIDFile)
+        self.prepare_pidfile_for_getpids(Pidfile)
         callback = Mock()
 
         p = NamespacedOptionParser(['foo', 'bar', 'baz'])
@@ -303,12 +303,12 @@ class test_MultiTool(Case):
         # without callback, should work
         nodes = self.t.getpids(p, 'celeryd', callback=None)
 
-    @patch('celery.bin.celeryd_multi.PIDFile')
+    @patch('celery.bin.celeryd_multi.Pidfile')
     @patch('socket.gethostname')
     @patch('celery.bin.celeryd_multi.sleep')
-    def test_shutdown_nodes(self, slepp, gethostname, PIDFile):
+    def test_shutdown_nodes(self, slepp, gethostname, Pidfile):
         gethostname.return_value = 'e.com'
-        self.prepare_pidfile_for_getpids(PIDFile)
+        self.prepare_pidfile_for_getpids(Pidfile)
         self.assertIsNone(self.t.shutdown_nodes([]))
         self.t.signal_node = Mock()
         node_alive = self.t.node_alive = Mock()

+ 1 - 1
celery/tests/concurrency/test_eventlet.py

@@ -41,7 +41,7 @@ class EventletCase(Case):
 class test_aaa_eventlet_patch(EventletCase):
 
     def test_aaa_is_patched(self):
-        raise SkipTest("side effects")
+        raise SkipTest('side effects')
         monkey_patched = []
         prev_monkey_patch = self.eventlet.monkey_patch
         self.eventlet.monkey_patch = lambda: monkey_patched.append(True)

+ 7 - 6
celery/tests/concurrency/test_gevent.py

@@ -111,9 +111,10 @@ class test_TasKPool(Case):
 class test_Timer(Case):
 
     def test_timer(self):
-        x = Timer()
-        x.ensure_started()
-        x.schedule = Mock()
-        x.start()
-        x.stop()
-        x.schedule.clear.assert_called_with()
+        with mock_module(*gevent_modules):
+            x = Timer()
+            x.ensure_started()
+            x.schedule = Mock()
+            x.start()
+            x.stop()
+            x.schedule.clear.assert_called_with()

+ 4 - 1
celery/tests/tasks/test_chord.py

@@ -132,7 +132,10 @@ class test_chord(AppCase):
             x = chord(add.s(i, i) for i in xrange(10))
             body = add.s(2)
             result = x(body)
-            self.assertEqual(result.id, body.options['task_id'])
+            self.assertTrue(result.id)
+            # does not modify original subtask
+            with self.assertRaises(KeyError):
+                body.options['task_id']
             self.assertTrue(chord.Chord.called)
         finally:
             chord.Chord = prev

+ 22 - 0
celery/tests/tasks/test_tasks.py

@@ -1035,9 +1035,31 @@ class test_crontab_is_due(Case):
             else:
                 break
 
+    def assertRelativedelta(self, due, last_ran):
+        try:
+            from dateutil.relativedelta import relativedelta
+        except ImportError:
+            return
+        l1, d1, n1 = due.run_every.remaining_delta(last_ran)
+        l2, d2, n2 = due.run_every.remaining_delta(last_ran,
+                                                   ffwd=relativedelta)
+        if not isinstance(d1, relativedelta):
+            self.assertEqual(l1, l2)
+            for field, value in d1._fields().iteritems():
+                self.assertEqual(getattr(d1, field), value)
+            self.assertFalse(d2.years)
+            self.assertFalse(d2.months)
+            self.assertFalse(d2.days)
+            self.assertFalse(d2.leapdays)
+            self.assertFalse(d2.hours)
+            self.assertFalse(d2.minutes)
+            self.assertFalse(d2.seconds)
+            self.assertFalse(d2.microseconds)
+
     def test_every_minute_execution_is_due(self):
         last_ran = self.now - timedelta(seconds=61)
         due, remaining = every_minute.run_every.is_due(last_ran)
+        self.assertRelativedelta(every_minute, last_ran)
         self.assertTrue(due)
         self.seconds_almost_equal(remaining, self.next_minute, 1)
 

+ 2 - 2
celery/tests/utilities/test_datastructures.py

@@ -9,7 +9,7 @@ from celery.datastructures import (
     ConfigurationView,
     DependencyGraph,
 )
-from celery.utils.threads import TIMEOUT_MAX
+from celery.utils.compat import THREAD_TIMEOUT_MAX
 from celery.tests.utils import Case, WhateverIO
 
 
@@ -222,7 +222,7 @@ class test_LRUCache(Case):
             def stop(self):
                 self._is_shutdown.set()
                 self._is_stopped.wait()
-                self.join(TIMEOUT_MAX)
+                self.join(THREAD_TIMEOUT_MAX)
 
         burglar = Burglar(x)
         burglar.start()

+ 55 - 98
celery/tests/utilities/test_platforms.py

@@ -11,53 +11,43 @@ from celery import current_app
 from celery import platforms
 from celery.platforms import (
     get_fdmax,
-    shellsplit,
-    ignore_EBADF,
+    ignore_errno,
     set_process_title,
     signals,
     maybe_drop_privileges,
     setuid,
     setgid,
-    seteuid,
-    setegid,
     initgroups,
     parse_uid,
     parse_gid,
     detached,
     DaemonContext,
     create_pidlock,
-    PIDFile,
+    Pidfile,
     LockFailed,
     setgroups,
     _setgroups_hack
 )
 
-from celery.tests.utils import Case, WhateverIO, override_stdouts
+from celery.tests.utils import Case, WhateverIO, override_stdouts, mock_open
 
 
-class test_ignore_EBADF(Case):
+class test_ignore_errno(Case):
 
     def test_raises_EBADF(self):
-        with ignore_EBADF():
+        with ignore_errno('EBADF'):
             exc = OSError()
             exc.errno = errno.EBADF
             raise exc
 
     def test_otherwise(self):
         with self.assertRaises(OSError):
-            with ignore_EBADF():
+            with ignore_errno('EBADF'):
                 exc = OSError()
                 exc.errno = errno.ENOENT
                 raise exc
 
 
-class test_shellsplit(Case):
-
-    def test_split(self):
-        self.assertEqual(shellsplit("the 'quick' brown fox"),
-                ['the', 'quick', 'brown', 'fox'])
-
-
 class test_set_process_title(Case):
 
     def when_no_setps(self):
@@ -178,20 +168,6 @@ if not current_app.IS_WINDOWS:
             parse_uid.assert_called_with('user')
             _setuid.assert_called_with(5001)
 
-        @patch('celery.platforms.parse_uid')
-        @patch('os.geteuid')
-        @patch('os.seteuid')
-        def test_seteuid(self, _seteuid, _geteuid, parse_uid):
-            parse_uid.return_value = 5001
-            _geteuid.return_value = 5001
-            seteuid('user')
-            parse_uid.assert_called_with('user')
-            self.assertFalse(_seteuid.called)
-
-            _geteuid.return_value = 1
-            seteuid('user')
-            _seteuid.assert_called_with(5001)
-
         @patch('celery.platforms.parse_gid')
         @patch('os.setgid')
         def test_setgid(self, _setgid, parse_gid):
@@ -200,20 +176,6 @@ if not current_app.IS_WINDOWS:
             parse_gid.assert_called_with('group')
             _setgid.assert_called_with(50001)
 
-        @patch('celery.platforms.parse_gid')
-        @patch('os.getegid')
-        @patch('os.setegid')
-        def test_setegid(self, _setegid, _getegid, parse_gid):
-            parse_gid.return_value = 50001
-            _getegid.return_value = 50001
-            setegid('group')
-            parse_gid.assert_called_with('group')
-            self.assertFalse(_setegid.called)
-
-            _getegid.return_value = 1
-            setegid('group')
-            _setegid.assert_called_with(50001)
-
         def test_parse_uid_when_int(self):
             self.assertEqual(parse_uid(5001), 5001)
 
@@ -362,11 +324,11 @@ if not current_app.IS_WINDOWS:
                 pass
             self.assertFalse(x._detach.called)
 
-    class test_PIDFile(Case):
+    class test_Pidfile(Case):
 
-        @patch('celery.platforms.PIDFile')
-        def test_create_pidlock(self, PIDFile):
-            p = PIDFile.return_value = Mock()
+        @patch('celery.platforms.Pidfile')
+        def test_create_pidlock(self, Pidfile):
+            p = Pidfile.return_value = Mock()
             p.is_locked.return_value = True
             p.remove_if_stale.return_value = False
             with self.assertRaises(SystemExit):
@@ -377,7 +339,7 @@ if not current_app.IS_WINDOWS:
             self.assertIs(ret, p)
 
         def test_context(self):
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.write_pid = Mock()
             p.remove = Mock()
 
@@ -387,7 +349,7 @@ if not current_app.IS_WINDOWS:
             p.remove.assert_called_with()
 
         def test_acquire_raises_LockFailed(self):
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.write_pid = Mock()
             p.write_pid.side_effect = OSError()
 
@@ -397,59 +359,54 @@ if not current_app.IS_WINDOWS:
 
         @patch('os.path.exists')
         def test_is_locked(self, exists):
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             exists.return_value = True
             self.assertTrue(p.is_locked())
             exists.return_value = False
             self.assertFalse(p.is_locked())
 
-        @patch('__builtin__.open')
-        def test_read_pid(self, open_):
-            s = open_.return_value = WhateverIO()
-            s.write('1816\n')
-            s.seek(0)
-            p = PIDFile('/var/pid')
-            self.assertEqual(p.read_pid(), 1816)
-
-        @patch('__builtin__.open')
-        def test_read_pid_partially_written(self, open_):
-            s = open_.return_value = WhateverIO()
-            s.write('1816')
-            s.seek(0)
-            p = PIDFile('/var/pid')
-            with self.assertRaises(ValueError):
-                p.read_pid()
-
-        @patch('__builtin__.open')
-        def test_read_pid_raises_ENOENT(self, open_):
+        def test_read_pid(self):
+            with mock_open() as s:
+                s.write('1816\n')
+                s.seek(0)
+                p = Pidfile('/var/pid')
+                self.assertEqual(p.read_pid(), 1816)
+
+        def test_read_pid_partially_written(self):
+            with mock_open() as s:
+                s.write('1816')
+                s.seek(0)
+                p = Pidfile('/var/pid')
+                with self.assertRaises(ValueError):
+                    p.read_pid()
+
+        def test_read_pid_raises_ENOENT(self):
             exc = IOError()
             exc.errno = errno.ENOENT
-            open_.side_effect = exc
-            p = PIDFile('/var/pid')
-            self.assertIsNone(p.read_pid())
+            with mock_open(side_effect=exc):
+                p = Pidfile('/var/pid')
+                self.assertIsNone(p.read_pid())
 
-        @patch('__builtin__.open')
-        def test_read_pid_raises_IOError(self, open_):
+        def test_read_pid_raises_IOError(self):
             exc = IOError()
             exc.errno = errno.EAGAIN
-            open_.side_effect = exc
-            p = PIDFile('/var/pid')
-            with self.assertRaises(IOError):
-                p.read_pid()
-
-        @patch('__builtin__.open')
-        def test_read_pid_bogus_pidfile(self, open_):
-            s = open_.return_value = WhateverIO()
-            s.write('eighteensixteen\n')
-            s.seek(0)
-            p = PIDFile('/var/pid')
-            with self.assertRaises(ValueError):
-                p.read_pid()
+            with mock_open(side_effect=exc):
+                p = Pidfile('/var/pid')
+                with self.assertRaises(IOError):
+                    p.read_pid()
+
+        def test_read_pid_bogus_pidfile(self):
+            with mock_open() as s:
+                s.write('eighteensixteen\n')
+                s.seek(0)
+                p = Pidfile('/var/pid')
+                with self.assertRaises(ValueError):
+                    p.read_pid()
 
         @patch('os.unlink')
         def test_remove(self, unlink):
             unlink.return_value = True
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.remove()
             unlink.assert_called_with(p.path)
 
@@ -458,7 +415,7 @@ if not current_app.IS_WINDOWS:
             exc = OSError()
             exc.errno = errno.ENOENT
             unlink.side_effect = exc
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.remove()
             unlink.assert_called_with(p.path)
 
@@ -467,7 +424,7 @@ if not current_app.IS_WINDOWS:
             exc = OSError()
             exc.errno = errno.EACCES
             unlink.side_effect = exc
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.remove()
             unlink.assert_called_with(p.path)
 
@@ -476,14 +433,14 @@ if not current_app.IS_WINDOWS:
             exc = OSError()
             exc.errno = errno.EAGAIN
             unlink.side_effect = exc
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             with self.assertRaises(OSError):
                 p.remove()
             unlink.assert_called_with(p.path)
 
         @patch('os.kill')
         def test_remove_if_stale_process_alive(self, kill):
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.read_pid = Mock()
             p.read_pid.return_value = 1816
             kill.return_value = 0
@@ -498,7 +455,7 @@ if not current_app.IS_WINDOWS:
         @patch('os.kill')
         def test_remove_if_stale_process_dead(self, kill):
             with override_stdouts():
-                p = PIDFile('/var/pid')
+                p = Pidfile('/var/pid')
                 p.read_pid = Mock()
                 p.read_pid.return_value = 1816
                 p.remove = Mock()
@@ -511,7 +468,7 @@ if not current_app.IS_WINDOWS:
 
         def test_remove_if_stale_broken_pid(self):
             with override_stdouts():
-                p = PIDFile('/var/pid')
+                p = Pidfile('/var/pid')
                 p.read_pid = Mock()
                 p.read_pid.side_effect = ValueError()
                 p.remove = Mock()
@@ -520,7 +477,7 @@ if not current_app.IS_WINDOWS:
                 p.remove.assert_called_with()
 
         def test_remove_if_stale_no_pidfile(self):
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.read_pid = Mock()
             p.read_pid.return_value = None
             p.remove = Mock()
@@ -542,7 +499,7 @@ if not current_app.IS_WINDOWS:
             r.write('1816\n')
             r.seek(0)
 
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             p.write_pid()
             w.seek(0)
             self.assertEqual(w.readline(), '1816\n')
@@ -569,7 +526,7 @@ if not current_app.IS_WINDOWS:
             r.write('11816\n')
             r.seek(0)
 
-            p = PIDFile('/var/pid')
+            p = Pidfile('/var/pid')
             with self.assertRaises(LockFailed):
                 p.write_pid()
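
For reference, a hedged sketch of how the renamed `Pidfile` class is used,
based only on the behavior exercised by the tests above (the context manager
writes the pid on enter and removes the file on exit; `create_pidlock` exits
the process if a fresh lock is already held)::

    # Sketch only -- method names taken from the tests above, everything
    # else is an assumption.
    from celery.platforms import Pidfile, create_pidlock

    pidlock = create_pidlock('/var/run/celeryd.pid')  # removes stale locks
    try:
        pass  # ... daemon main loop ...
    finally:
        pidlock.release()

    # Or used directly as a context manager:
    with Pidfile('/var/run/celeryd.pid') as p:
        print(p.read_pid())  # the pid just written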
 

+ 1 - 17
celery/tests/utilities/test_timeutils.py

@@ -2,9 +2,6 @@ from __future__ import absolute_import
 
 from datetime import datetime, timedelta
 
-from mock import Mock
-
-from celery.exceptions import ImproperlyConfigured
 from celery.utils import timeutils
 from celery.utils.timeutils import timezone
 from celery.tests.utils import Case
@@ -74,17 +71,4 @@ class test_timeutils(Case):
 class test_timezone(Case):
 
     def test_get_timezone_with_pytz(self):
-        prev, timeutils.pytz = timeutils.pytz, Mock()
-        try:
-            self.assertTrue(timezone.get_timezone('UTC'))
-        finally:
-            timeutils.pytz = prev
-
-    def test_get_timezone_without_pytz(self):
-        prev, timeutils.pytz = timeutils.pytz, None
-        try:
-            self.assertTrue(timezone.get_timezone('UTC'))
-            with self.assertRaises(ImproperlyConfigured):
-                timezone.get_timezone('Europe/Oslo')
-        finally:
-            timeutils.pytz = prev
+        self.assertTrue(timezone.get_timezone('UTC'))

+ 1 - 0
celery/tests/utils.py

@@ -514,6 +514,7 @@ def mock_open(typ=WhateverIO, side_effect=None):
             if side_effect is not None:
                 context.__enter__.side_effect = side_effect
             val = context.__enter__.return_value = typ()
+            val.__exit__ = Mock()
             yield val
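
The helper patched here is what drives the new `Pidfile` tests above; a
minimal self-contained sketch of the same idea (hypothetical name and
details, not Celery's exact implementation)::

    from contextlib import contextmanager
    from StringIO import StringIO

    from mock import patch, MagicMock, Mock

    @contextmanager
    def simple_mock_open(typ=StringIO, side_effect=None):
        # Replace the builtin open() so `with open(...)` yields an
        # in-memory file-like object instead of touching the filesystem.
        with patch('__builtin__.open') as open_:
            context = MagicMock()
            if side_effect is not None:
                context.__enter__.side_effect = side_effect
            val = context.__enter__.return_value = typ()
            val.__exit__ = Mock()  # let `with open(...)` exit cleanly
            open_.return_value = context
            yield val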
 
 

+ 31 - 5
celery/tests/worker/test_request.py

@@ -27,10 +27,12 @@ from celery.exceptions import (
 )
 from celery.task.trace import (
     trace_task,
-    trace_task_ret,
+    _trace_task_ret,
     TraceInfo,
     mro_lookup,
     build_tracer,
+    setup_worker_optimizations,
+    reset_worker_optimizations,
 )
 from celery.result import AsyncResult
 from celery.signals import task_revoked
@@ -41,7 +43,7 @@ from celery.worker import job as module
 from celery.worker.job import Request, TaskRequest
 from celery.worker.state import revoked
 
-from celery.tests.utils import Case, assert_signal_called
+from celery.tests.utils import AppCase, Case, assert_signal_called
 
 scratch = {'ACK': False}
 some_kwargs_scratchpad = {}
@@ -231,7 +233,7 @@ class MockEventDispatcher(object):
         self.sent.append(event)
 
 
-class test_TaskRequest(Case):
+class test_TaskRequest(AppCase):
 
     def test_task_wrapper_repr(self):
         tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
@@ -570,10 +572,34 @@ class test_TaskRequest(Case):
         finally:
             mytask.ignore_result = False
 
+    def test_fast_trace_task(self):
+        from celery.task import trace
+        setup_worker_optimizations(self.app)
+        self.assertIs(trace.trace_task_ret, trace._fast_trace_task)
+        try:
+            mytask.__trace__ = build_tracer(mytask.name, mytask,
+                                            self.app.loader, 'test')
+            res = trace.trace_task_ret(mytask.name, uuid(), [4], {})
+            self.assertEqual(res, 4 ** 4)
+        finally:
+            reset_worker_optimizations()
+            self.assertIs(trace.trace_task_ret, trace._trace_task_ret)
+        delattr(mytask, '__trace__')
+        res = trace.trace_task_ret(mytask.name, uuid(), [4], {})
+        self.assertEqual(res, 4 ** 4)
+
     def test_trace_task_ret(self):
         mytask.__trace__ = build_tracer(mytask.name, mytask,
-                                        current_app.loader, 'test')
-        res = trace_task_ret(mytask.name, uuid(), [4], {})
+                                        self.app.loader, 'test')
+        res = _trace_task_ret(mytask.name, uuid(), [4], {})
+        self.assertEqual(res, 4 ** 4)
+
+    def test_trace_task_ret__no_trace(self):
+        try:
+            delattr(mytask, '__trace__')
+        except AttributeError:
+            pass
+        res = _trace_task_ret(mytask.name, uuid(), [4], {})
         self.assertEqual(res, 4 ** 4)
 
     def test_execute_safe_catches_exception(self):

+ 3 - 0
celery/tests/worker/test_worker.py

@@ -631,6 +631,9 @@ class test_Consumer(Case):
             def channel(self):
                 return Mock()
 
+            def as_uri(self):
+                return 'dummy://'
+
             def drain_events(self, **kwargs):
                 if not self.calls:
                     self.calls += 1

+ 1 - 0
celery/utils/__init__.py

@@ -247,6 +247,7 @@ def gen_task_name(app, name, module_name):
         return '.'.join([app.main, name])
     return '.'.join(filter(None, [module_name, name]))
 
+
 # ------------------------------------------------------------------------ #
 # > XXX Compat
 from .log import LOG_LEVELS     # noqa

+ 7 - 3
celery/utils/compat.py

@@ -43,10 +43,14 @@ else:
 
 
 ############## collections.OrderedDict ######################################
+# OrderedDict was moved to kombu
+from kombu.utils.compat import OrderedDict  # noqa
+
+############## threading.TIMEOUT_MAX #######################################
 try:
-    from collections import OrderedDict
-except ImportError:                         # pragma: no cover
-    from ordereddict import OrderedDict     # noqa
+    from threading import TIMEOUT_MAX as THREAD_TIMEOUT_MAX
+except ImportError:
+    THREAD_TIMEOUT_MAX = 1e10  # noqa
 
 ############## format(int, ',d') ##########################
 

+ 7 - 7
celery/utils/dispatch/saferef.py

@@ -23,12 +23,12 @@ def safe_ref(target, on_delete=None):  # pragma: no cover
         goes out of scope with the reference object, (either a
         :class:`weakref.ref` or a :class:`BoundMethodWeakref`) as argument.
     """
-    if getattr(target, "im_self", None) is not None:
+    if getattr(target, 'im_self', None) is not None:
         # Turn a bound method into a BoundMethodWeakref instance.
         # Keep track of these instances for lookup by disconnect().
         assert hasattr(target, 'im_func'), \
-            """safe_ref target {0!r} has im_self, but no im_func, " \
-            "don't know how to create reference""".format(target)
+            """safe_ref target {0!r} has im_self, but no im_func: \
+            don't know how to create reference""".format(target)
         return get_bound_method_weakref(target=target,
                                         on_delete=on_delete)
     if callable(on_delete):
@@ -142,8 +142,8 @@ class BoundMethodWeakref(object):  # pragma: no cover
                     try:
                         traceback.print_exc()
                     except AttributeError:
-                        print("Exception during saferef {0} cleanup function "
-                              "{1}: {2}".format(self, function, exc))
+                        print('Exception during saferef {0} cleanup function '
+                              '{1}: {2}'.format(self, function, exc))
 
         self.deletion_methods = [on_delete]
         self.key = self.calculate_key(target)
@@ -163,7 +163,7 @@ class BoundMethodWeakref(object):  # pragma: no cover
 
     def __str__(self):
         """Give a friendly representation of the object"""
-        return """{0}( {1}.{2} )""".format(
+        return '{0}( {1}.{2} )'.format(
             type(self).__name__,
             self.self_name,
             self.func_name,
@@ -212,7 +212,7 @@ class BoundNonDescriptorMethodWeakref(BoundMethodWeakref):  # pragma: no cover
         ...     pass
 
         >>> def foo(self):
-        ...     return "foo"
+        ...     return 'foo'
         >>> A.bar = foo
 
     But this shouldn't be a common use case. So, on platforms where methods

+ 7 - 1
celery/utils/functional.py

@@ -16,8 +16,9 @@ from itertools import islice
 
 from kombu.utils import cached_property
 from kombu.utils.functional import promise, maybe_promise
+from kombu.utils.compat import OrderedDict
 
-from .compat import UserDict, UserList, OrderedDict
+from .compat import UserDict, UserList
 
 KEYWORD_MARK = object()
 is_not_None = partial(operator.is_not, None)
@@ -260,3 +261,8 @@ class _regen(UserList, list):
     @cached_property
     def data(self):
         return list(self.__it)
+
+
+def dictfilter(d, **keys):
+    d = dict(d, **keys) if keys else d
+    return dict((k, v) for k, v in d.iteritems() if v is not None)
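
Going by the definition just above, `dictfilter` removes ``None`` values,
merging any extra keyword arguments in first; for example::

    from celery.utils.functional import dictfilter

    dictfilter({'x': 1, 'y': None})    # -> {'x': 1}
    dictfilter({'x': 1}, y=None, z=3)  # -> {'x': 1, 'z': 3}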

+ 1 - 4
celery/utils/imports.py

@@ -15,10 +15,7 @@ import sys
 
 from contextlib import contextmanager
 
-# symbol_by_name was moved to local because it's used
-# early in the import stage, where celery.utils loads
-# too much (e.g. for eventlet patching)
-from celery.local import symbol_by_name
+from kombu.utils import symbol_by_name
 
 from .compat import reload
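
`symbol_by_name` now comes from kombu; it resolves a qualified name to an
object (a sketch, assuming kombu's ``'module:attribute'`` syntax)::

    from kombu.utils import symbol_by_name

    join = symbol_by_name('os.path:join')
    print(join('a', 'b'))  # 'a/b' on POSIX systems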
 

+ 72 - 0
celery/utils/iso8601.py

@@ -0,0 +1,72 @@
+"""
+Originally taken from pyiso8601 (http://code.google.com/p/pyiso8601/)
+Modified to match the behavior of dateutil.parser:
+    - raise ValueError instead of ParseError
+    - return naive datetimes by default
+    - use pytz.FixedOffset
+
+This is the original License:
+
+Copyright (c) 2007 Michael Twomey
+
+Permission is hereby granted, free of charge, to any person obtaining a
+copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+"""
+from __future__ import absolute_import
+
+import re
+
+from datetime import datetime
+from pytz import FixedOffset
+
+# Adapted from http://delete.me.uk/2005/03/iso8601.html
+ISO8601_REGEX = re.compile(
+    r'(?P<year>[0-9]{4})(-(?P<month>[0-9]{1,2})(-(?P<day>[0-9]{1,2})'
+    r'((?P<separator>.)(?P<hour>[0-9]{2}):(?P<minute>[0-9]{2})'
+    r'(:(?P<second>[0-9]{2})(\.(?P<fraction>[0-9]+))?)?'
+    r'(?P<timezone>Z|(([-+])([0-9]{2}):([0-9]{2})))?)?)?)?'
+)
+TIMEZONE_REGEX = re.compile(
+    '(?P<prefix>[+-])(?P<hours>[0-9]{2}).(?P<minutes>[0-9]{2})'
+)
+
+
+def parse_iso8601(datestring):
+    """Parses ISO 8601 dates into datetime objects"""
+    m = ISO8601_REGEX.match(datestring)
+    if not m:
+        raise ValueError('unable to parse date string %r' % datestring)
+    groups = m.groupdict()
+    tz = groups['timezone']
+    if tz and tz != 'Z':
+        m = TIMEZONE_REGEX.match(tz)
+        prefix, hours, minutes = m.groups()
+        hours, minutes = int(hours), int(minutes)
+        if prefix == '-':
+            hours = -hours
+            minutes = -minutes
+        tz = FixedOffset(minutes + hours * 60)
+    frac = groups['fraction']
+    groups['fraction'] = int(float('0.%s' % frac) * 1e6) if frac else 0
+    return datetime(
+        int(groups['year']), int(groups['month']), int(groups['day']),
+        int(groups['hour']), int(groups['minute']), int(groups['second']),
+        int(groups['fraction']), tz
+    )

+ 11 - 11
celery/utils/threads.py

@@ -13,17 +13,10 @@ import sys
 import threading
 import traceback
 
-from kombu.syn import _detect_environment
-
 from celery.local import Proxy
+from celery.utils.compat import THREAD_TIMEOUT_MAX
 
-USE_PURE_LOCALS = os.environ.get('USE_PURE_LOCALS')
-
-
-try:
-    TIMEOUT_MAX = threading.TIMEOUT_MAX
-except AttributeError:
-    TIMEOUT_MAX = 1e10  # noqa
+USE_FAST_LOCALS = os.environ.get('USE_FAST_LOCALS')
 
 
 class bgThread(threading.Thread):
@@ -76,7 +69,7 @@ class bgThread(threading.Thread):
         self._is_shutdown.set()
         self._is_stopped.wait()
         if self.is_alive():
-            self.join(TIMEOUT_MAX)
+            self.join(THREAD_TIMEOUT_MAX)
 
 try:
     from greenlet import getcurrent as get_ident
@@ -220,6 +213,10 @@ class _LocalStack(object):
         else:
             return stack.pop()
 
+    def __len__(self):
+        stack = getattr(self._local, 'stack', None)
+        return len(stack) if stack else 0
+
     @property
     def stack(self):
         """get_current_worker_task uses this to find
@@ -301,7 +298,10 @@ class _FastLocalStack(threading.local):
         except (AttributeError, IndexError):
             return None
 
-if _detect_environment() == 'default' and not USE_PURE_LOCALS:
+    def __len__(self):
+        return len(self.stack)
+
+if USE_FAST_LOCALS:
     LocalStack = _FastLocalStack
 else:
     # - See #706
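
With the new `__len__` methods both stack implementations can be sized; a
small usage sketch (push/pop/top as used elsewhere in Celery)::

    from celery.utils.threads import LocalStack

    stack = LocalStack()
    stack.push('first')
    stack.push('second')
    len(stack)   # -> 2
    stack.top    # -> 'second'
    stack.pop()
    len(stack)   # -> 1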

+ 2 - 2
celery/utils/timer2.py

@@ -19,7 +19,7 @@ from functools import wraps
 from itertools import count, imap
 from time import time, sleep, mktime
 
-from celery.utils.threads import TIMEOUT_MAX
+from celery.utils.compat import THREAD_TIMEOUT_MAX
 from kombu.log import get_logger
 
 VERSION = (1, 0, 0)
@@ -274,7 +274,7 @@ class Timer(threading.Thread):
         if self.running:
             self._is_shutdown.set()
             self._is_stopped.wait()
-            self.join(TIMEOUT_MAX)
+            self.join(THREAD_TIMEOUT_MAX)
             self.running = False
 
     def ensure_started(self):

+ 68 - 26
celery/utils/timeutils.py

@@ -8,24 +8,23 @@
 """
 from __future__ import absolute_import
 
+import os
 import time as _time
 from itertools import izip
 
-from datetime import datetime, timedelta, tzinfo
+from calendar import monthrange
+from datetime import date, datetime, timedelta, tzinfo
 
-from dateutil import tz
-from dateutil.parser import parse as parse_iso8601
-from kombu.utils import cached_property
+from kombu.utils import cached_property, reprcall
 
-from celery.exceptions import ImproperlyConfigured
+from pytz import timezone as _timezone
 
+from .functional import dictfilter
+from .iso8601 import parse_iso8601
 from .text import pluralize
 
-try:
-    import pytz
-except ImportError:     # pragma: no cover
-    pytz = None         # noqa
 
+C_REMDEBUG = os.environ.get('C_REMDEBUG', False)
 
 DAYNAMES = 'sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat'
 WEEKDAYS = dict(izip(DAYNAMES, range(7)))
@@ -48,11 +47,9 @@ _local_timezone = None
 
 
 class LocalTimezone(tzinfo):
-    """
-    Local time implementation taken from Python's docs.
+    """Local time implementation taken from Python's docs.
 
-    Used only when pytz isn't available, and most likely inaccurate. If you're
-    having trouble with this class, don't waste your time, just install pytz.
+    Used only when UTC is not enabled.
     """
 
     def __init__(self):
@@ -67,7 +64,7 @@ class LocalTimezone(tzinfo):
         tzinfo.__init__(self)
 
     def __repr__(self):
-        return "<LocalTimezone>"
+        return '<LocalTimezone>'
 
     def utcoffset(self, dt):
         if self._isdst(dt):
@@ -119,17 +116,12 @@ class _Zone(object):
 
     def get_timezone(self, zone):
         if isinstance(zone, basestring):
-            if pytz is None:
-                if zone == 'UTC':
-                    return tz.gettz('UTC')
-                raise ImproperlyConfigured(
-                    'Timezones requires the pytz library')
-            return pytz.timezone(zone)
+            return _timezone(zone)
         return zone
 
     @cached_property
     def local(self):
-        return tz.tzlocal()
+        return _get_local_timezone()
 
     @cached_property
     def utc(self):
@@ -207,7 +199,11 @@ def remaining(start, ends_in, now=None, relative=False):
     end_date = start + ends_in
     if relative:
         end_date = delta_resolution(end_date, ends_in)
-    return end_date - now
+    ret = end_date - now
+    if C_REMDEBUG:
+        print('rem: NOW:%r START:%r ENDS_IN:%r END_DATE:%s REM:%s' % (
+            now, start, ends_in, end_date, ret))
+    return ret
 
 
 def rate(rate):
@@ -238,15 +234,20 @@ def weekday(name):
         raise KeyError(name)
 
 
-def humanize_seconds(secs, prefix=''):
+def humanize_seconds(secs, prefix='', sep=''):
     """Show seconds in human form, e.g. 60 is "1 minute", 7200 is "2
-    hours"."""
+    hours".
+
+    :keyword prefix: Can be used to add a preposition to the output,
+        e.g. 'in' will give 'in 1 second', but add nothing to 'now'.
+    :keyword sep: Separator between the prefix and the number, e.g.
+        ' ' gives 'in 1 second'.
+
+    """
     secs = float(secs)
     for unit, divider, formatter in TIME_UNITS:
         if secs >= divider:
             w = secs / divider
-            return '{0}{1} {2}'.format(prefix, formatter(w),
-                                       pluralize(w, unit))
+            return '{0}{1}{2} {3}'.format(prefix, sep, formatter(w),
+                                          pluralize(w, unit))
     return 'now'
 
 
@@ -297,3 +298,44 @@ def maybe_make_aware(dt, tz=None):
         dt = to_utc(dt)
     return localize(dt,
         timezone.utc if tz is None else timezone.tz_or_local(tz))
+
+
+class ffwd(object):
+    """Version of relativedelta that only supports addition."""
+
+    def __init__(self, year=None, month=None, weeks=0, weekday=None, day=None,
+            hour=None, minute=None, second=None, microsecond=None, **kwargs):
+        self.year = year
+        self.month = month
+        self.weeks = weeks
+        self.weekday = weekday
+        self.day = day
+        self.hour = hour
+        self.minute = minute
+        self.second = second
+        self.microsecond = microsecond
+        self.days = weeks * 7
+        self._has_time = self.hour is not None or self.minute is not None
+
+    def __repr__(self):
+        return reprcall('ffwd', (), self._fields(weeks=self.weeks,
+                                                 weekday=self.weekday))
+
+    def __radd__(self, other):
+        if not isinstance(other, date):
+            return NotImplemented
+        year = self.year or other.year
+        month = self.month or other.month
+        day = min(monthrange(year, month)[1], self.day or other.day)
+        ret = other.replace(**dict(dictfilter(self._fields()),
+                            year=year, month=month, day=day))
+        if self.weekday is not None:
+            ret += timedelta(days=(7 - ret.weekday() + self.weekday) % 7)
+        return ret + timedelta(days=self.days)
+
+    def _fields(self, **extra):
+        return dictfilter({
+            'year': self.year, 'month': self.month, 'day': self.day,
+            'hour': self.hour, 'minute': self.minute,
+            'second': self.second, 'microsecond': self.microsecond,
+        }, **extra)
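
A sketch of `ffwd` in use, following the `__radd__` definition above:
absolute fields replace the corresponding fields of the date, then any
weekday shift is applied forward::

    from datetime import datetime
    from celery.utils.timeutils import ffwd

    d = datetime(2012, 11, 6, 14, 30)      # a Tuesday
    d + ffwd(hour=0, minute=0, second=0)   # same day at midnight
    d + ffwd(weekday=0, hour=0, minute=0)  # next Monday (2012-11-12 00:00)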

+ 8 - 11
celery/worker/__init__.py

@@ -24,12 +24,11 @@ from kombu.utils.finalize import Finalize
 from celery import concurrency as _concurrency
 from celery import platforms
 from celery import signals
-from celery.app import app_or_default, set_default_app
+from celery.app import app_or_default
 from celery.app.abstract import configurated, from_config
 from celery.exceptions import (
     ImproperlyConfigured, SystemTerminate, TaskRevokedError,
 )
-from celery.task import trace
 from celery.utils import worker_direct
 from celery.utils.imports import qualname, reload_from_cwd
 from celery.utils.log import mlevel, worker_logger as logger
@@ -43,6 +42,7 @@ try:
 except ImportError:  # pragma: no cover
     IGNORE_ERRORS = ()
 
+#: Worker states
 RUN = 0x1
 CLOSE = 0x2
 TERMINATE = 0x3
@@ -55,6 +55,9 @@ If you want to automatically declare unknown queues you can
 enable the CELERY_CREATE_MISSING_QUEUES setting.
 """
 
+#: Default socket timeout at shutdown.
+SHUTDOWN_SOCKET_TIMEOUT = 5.0
+
 
 class Namespace(bootsteps.Namespace):
     """This is the boot-step namespace of the :class:`WorkController`.
@@ -111,15 +114,6 @@ class WorkController(configurated):
 
     def __init__(self, app=None, hostname=None, **kwargs):
         self.app = app_or_default(app or self.app)
-        # all new threads start without a current app, so if an app is not
-        # passed on to the thread it will fall back to the "default app",
-        # which then could be the wrong app.  So for the worker
-        # we set this to always return our app.  This is a hack,
-        # and means that only a single app can be used for workers
-        # running in the same process.
-        set_default_app(self.app)
-        self.app.finalize()
-        trace._tasks = self.app._tasks   # optimization
         self.hostname = hostname or socket.gethostname()
         self.on_before_init(**kwargs)
 
@@ -267,6 +261,8 @@ class WorkController(configurated):
 
     def _shutdown(self, warm=True):
         what = 'Stopping' if warm else 'Terminating'
+        socket_timeout = socket.getdefaulttimeout()
+        socket.setdefaulttimeout(SHUTDOWN_SOCKET_TIMEOUT)  # Issue 975
 
         if self._state in (self.CLOSE, self.TERMINATE):
             return
@@ -297,6 +293,7 @@ class WorkController(configurated):
         if self.pidlock:
             self.pidlock.release()
         self._state = self.TERMINATE
+        socket.setdefaulttimeout(socket_timeout)
         self._shutdown_complete.set()
 
     def reload(self, modules=None, reload=False, reloader=None):

+ 3 - 7
celery/worker/autoreload.py

@@ -7,7 +7,6 @@
 """
 from __future__ import absolute_import
 
-import errno
 import hashlib
 import os
 import select
@@ -19,7 +18,7 @@ from threading import Event
 
 from kombu.utils import eventio
 
-from celery.platforms import ignore_EBADF
+from celery.platforms import ignore_errno
 from celery.utils.imports import module_file
 from celery.utils.log import get_logger
 from celery.utils.threads import bgThread
@@ -149,7 +148,7 @@ class KQueueMonitor(BaseMonitor):
         for f, fd in self.filemap.iteritems():
             if fd is not None:
                 poller.unregister(fd)
-                with ignore_EBADF():  # pragma: no cover
+                with ignore_errno('EBADF'):  # pragma: no cover
                     os.close(fd)
         self.filemap.clear()
         self.fdmap.clear()
@@ -247,11 +246,8 @@ class Autoreloader(bgThread):
 
     def body(self):
         self.on_init()
-        try:
+        with ignore_errno('EINTR', 'EAGAIN'):
             self._monitor.start()
-        except OSError as exc:
-            if exc.errno not in (errno.EINTR, errno.EAGAIN):
-                raise
 
     def _maybe_modified(self, f):
         digest = file_hash(f)
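
`ignore_errno` generalizes the old `ignore_EBADF` helper: it takes errno
names (or numbers) and suppresses only the matching errors. A usage sketch
under that assumption::

    import os
    from celery.platforms import ignore_errno

    with ignore_errno('ENOENT'):
        os.unlink('/nonexistent/pidfile')  # ENOENT swallowed, others raise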

+ 43 - 18
celery/worker/consumer.py

@@ -79,6 +79,7 @@ import threading
 from time import sleep
 from Queue import Empty
 
+from kombu.syn import _detect_environment
 from kombu.utils.encoding import safe_repr
 from kombu.utils.eventio import READ, WRITE, ERR
 
@@ -86,10 +87,11 @@ from celery.app import app_or_default
 from celery.datastructures import AttributeDict
 from celery.exceptions import InvalidTaskError, SystemTerminate
 from celery.task.trace import build_tracer
+from celery.utils import text
 from celery.utils import timer2
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
-from celery.utils import text
+from celery.utils.timeutils import humanize_seconds
 
 from . import state
 from .bootsteps import StartStopComponent
@@ -110,6 +112,7 @@ Received and deleted unknown message. Wrong destination?!?
 
 The full contents of the message body was: %s
 """
+
 #: Error message for when an unregistered task is received.
 UNKNOWN_TASK_ERROR = """\
 Received unregistered task of type %s.
@@ -135,16 +138,29 @@ The full contents of the message body was:
 %s
 """
 
-MESSAGE_REPORT_FMT = """\
+MESSAGE_REPORT = """\
 body: {0} {{content_type:{1} content_encoding:{2} delivery_info:{3}}}\
 """
 
 
 RETRY_CONNECTION = """\
-Consumer: Connection to broker lost. \
+consumer: Connection to broker lost. \
 Trying to re-establish the connection...\
 """
 
+CONNECTION_ERROR = """\
+consumer: Cannot connect to %s: %s.
+%s
+"""
+
+CONNECTION_RETRY = """\
+Trying again {when}...\
+"""
+
+CONNECTION_FAILOVER = """\
+Will retry using next failover.\
+"""
+
 task_reserved = state.task_reserved
 
 logger = get_logger(__name__)
@@ -153,7 +169,7 @@ info, warn, error, crit = (logger.info, logger.warn,
 
 
 def debug(msg, *args, **kwargs):
-    logger.debug('Consumer: {0}'.format(msg), *args, **kwargs)
+    logger.debug('consumer: {0}'.format(msg), *args, **kwargs)
 
 
 def dump_body(m, body):
@@ -342,6 +358,12 @@ class Consumer(object):
         if not hub:
             self.amqheartbeat = 0
 
+        if _detect_environment() == 'gevent':
+            # there's a gevent bug that causes timeouts to not be reset,
+            # so if the connection timeout is exceeded once, it can NEVER
+            # connect again.
+            self.app.conf.BROKER_CONNECTION_TIMEOUT = None
+
     def update_strategies(self):
         S = self.strategies
         app = self.app
@@ -527,10 +549,10 @@ class Consumer(object):
         self.qos.decrement_eventually()
 
     def _message_report(self, body, message):
-        return MESSAGE_REPORT_FMT.format(dump_body(message, body),
-                                         safe_repr(message.content_type),
-                                         safe_repr(message.content_encoding),
-                                         safe_repr(message.delivery_info))
+        return MESSAGE_REPORT.format(dump_body(message, body),
+                                     safe_repr(message.content_type),
+                                     safe_repr(message.content_encoding),
+                                     safe_repr(message.delivery_info))
 
     def handle_unknown_message(self, body, message):
         warn(UNKNOWN_FORMAT, self._message_report(body, message))
@@ -591,7 +613,7 @@ class Consumer(object):
             debug('Closing broker connection...')
             self.maybe_conn_error(connection.close)
 
-    def stop_consumers(self, close_connection=True):
+    def stop_consumers(self, close_connection=True, join=True):
         """Stop consuming tasks and broadcast commands, also stops
         the heartbeat thread and event dispatcher.
 
@@ -608,7 +630,7 @@ class Consumer(object):
             self.heart = self.heart.stop()
 
         debug('Cancelling task consumer...')
-        if self.task_consumer:
+        if join and self.task_consumer:
             self.maybe_conn_error(self.task_consumer.cancel)
 
         if self.event_dispatcher:
@@ -617,7 +639,7 @@ class Consumer(object):
                     self.maybe_conn_error(self.event_dispatcher.close)
 
         debug('Cancelling broadcast consumer...')
-        if self.broadcast_consumer:
+        if join and self.broadcast_consumer:
             self.maybe_conn_error(self.broadcast_consumer.cancel)
 
         if close_connection:
@@ -675,6 +697,7 @@ class Consumer(object):
         self._pidbox_node_stopped = threading.Event()
         try:
             with self._open_connection() as conn:
+                info('pidbox: Connected to %s.', conn.as_uri())
                 self.pidbox_node.channel = conn.default_channel
                 self.broadcast_consumer = self.pidbox_node.listen(
                                             callback=self.on_control)
@@ -691,7 +714,7 @@ class Consumer(object):
         """Re-establish the broker connection and set up consumers,
         heartbeat and the event dispatcher."""
         debug('Re-establishing connection to the broker...')
-        self.stop_consumers()
+        self.stop_consumers(join=False)
 
         # Clear internal queues to get rid of old messages.
         # They can't be acked anyway, as a delivery tag is specific
@@ -701,7 +724,7 @@ class Consumer(object):
 
         # Re-establish the broker connection and setup the task consumer.
         self.connection = self._open_connection()
-        debug('Connection established.')
+        info('consumer: Connected to %s.', self.connection.as_uri())
         self.task_consumer = self.app.amqp.TaskConsumer(self.connection,
                                     on_decode_error=self.on_decode_error)
         # QoS: Reset prefetch window.
@@ -746,16 +769,18 @@ class Consumer(object):
         :setting:`BROKER_CONNECTION_RETRY` setting is enabled
 
         """
+        conn = self.app.connection(heartbeat=self.amqheartbeat)
 
         # Callback called for each retry while the connection
         # can't be established.
-        def _error_handler(exc, interval):
-            error('Consumer: Connection Error: %s. '
-                  'Trying again in %d seconds...', exc, interval)
+        def _error_handler(exc, interval, next_step=CONNECTION_RETRY):
+            if getattr(conn, 'alt', None) and interval == 0:
+                next_step = CONNECTION_FAILOVER
+            error(CONNECTION_ERROR, conn.as_uri(), exc,
+                  next_step.format(when=humanize_seconds(interval, 'in', ' ')))
 
         # remember that the connection is lazy, it won't establish
         # until it's needed.
-        conn = self.app.connection(heartbeat=self.amqheartbeat)
         if not self.app.conf.BROKER_CONNECTION_RETRY:
             # retry disabled, just call connect directly.
             conn.connect()
@@ -776,7 +801,7 @@ class Consumer(object):
         # anymore.
         self.close()
         debug('Stopping consumers...')
-        self.stop_consumers(close_connection=False)
+        self.stop_consumers(close_connection=False, join=True)
 
     def close(self):
         self._state = CLOSE
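
With these changes a failed connect now logs the broker URI and a humanized
retry countdown; roughly (a sketch of the message formatting only, reusing
the constants defined above)::

    from celery.utils.timeutils import humanize_seconds

    CONNECTION_ERROR = 'consumer: Cannot connect to %s: %s.\n%s\n'
    CONNECTION_RETRY = 'Trying again {when}...'

    print(CONNECTION_ERROR % (
        'amqp://guest@localhost:5672//', IOError('Connection refused'),
        CONNECTION_RETRY.format(when=humanize_seconds(4.0, 'in', ' ')),
    ))
    # -> consumer: Cannot connect to amqp://guest@localhost:5672//: ...
    #    Trying again in 4.00 seconds...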

+ 1 - 0
celery/worker/job.py

@@ -249,6 +249,7 @@ class Request(object):
             self._terminate_on_ack = pool, signal
 
     def _announce_revoked(self, reason, terminated, signum, expired):
+        task_ready(self)
         self.send_event('task-revoked',
                         terminated=terminated, signum=signum, expired=expired)
         if self.store_errors:

+ 2 - 1
celery/worker/state.py

@@ -17,9 +17,10 @@ import shelve
 
 from collections import defaultdict
 
+from kombu.utils import cached_property
+
 from celery import __version__
 from celery.datastructures import LimitedSet
-from celery.utils import cached_property
 
 #: Worker software/platform information.
 SOFTWARE_INFO = {'sw_ident': 'py-celery',

+ 4 - 18
docs/configuration.rst

@@ -67,24 +67,10 @@ CELERY_TIMEZONE
 
 Configure Celery to use a custom time zone.
 The timezone value can be any time zone supported by the :mod:`pytz`
-library.  :mod:`pytz` must be installed for the selected zone
-to be used.
+library.
 
-If not set then the systems default local time zone is used.
-
-.. warning::
-
-    Celery requires the :mod:`pytz` library to be installed,
-    when using custom time zones (other than UTC).  You can
-    install it using :program:`pip` or :program:`easy_install`:
-
-    .. code-block:: bash
-
-        $ pip install pytz
-
-    Pytz is a library that defines the timzones of the world,
-    it changes quite frequently so it is not included in the Python Standard
-    Library.
+If not set, the UTC timezone is used when :setting:`CELERY_ENABLE_UTC` is
+enabled, otherwise it falls back to the local timezone.
 
 .. _conf-tasks:
 
@@ -944,7 +930,7 @@ Decides if publishing task messages will be retried in the case
 of connection loss or other connection errors.
 See also :setting:`CELERY_TASK_PUBLISH_RETRY_POLICY`.
 
-Disabled by default.
+Enabled by default.
 
 .. setting:: CELERY_TASK_PUBLISH_RETRY_POLICY
 

+ 0 - 11
docs/django/first-steps-with-django.rst

@@ -120,17 +120,6 @@ use the help command:
 
     $ python manage.py celery help
 
-.. admonition:: Help, it's crashing!
-
-    If the worker crashes and spews out a lot of output when it starts
-    then you should try specifying the settings manually:
-
-    .. code-block:: bash
-
-        $ python manage.py celery worker --loglevel=info --settings=settings
-
-    This is usually happens when using older Django project layouts.
-
 Calling our task
 ================
 

+ 5 - 20
docs/faq.rst

@@ -98,17 +98,16 @@ Billiard is a fork of the Python multiprocessing module containing
 many performance and stability improvements.  It is an eventual goal
 that these improvements will be merged back into Python one day.
 
-It is also used for compatibility with older Python versions.
+It is also used for compatibility with older Python versions
+that don't come with the multiprocessing module.
 
 .. _`billiard`: http://pypi.python.org/pypi/billiard
 
-- `python-dateutil`_
+- `pytz`_
 
-The dateutil module is used by Celery to parse ISO-8601 formatted time strings,
-as well as its ``relativedelta`` class which is used in the implementation
-of crontab style periodic tasks.
+The pytz module provides timezone definitions and related tools.
 
-.. _`python-dateutil`: http://pypi.python.org/pypi/python-dateutil
+.. _`pytz`: http://pypi.python.org/pypi/pytz
 
 django-celery
 ~~~~~~~~~~~~~
@@ -914,17 +913,3 @@ The `-B` / `--beat` option to celeryd doesn't work?
 ----------------------------------------------------------------
 **Answer**: That's right. Run `celerybeat` and `celeryd` as separate
 services instead.
-
-.. _faq-windows-django-settings:
-
-`django-celery` can't find settings?
---------------------------------------
-
-**Answer**: You need to specify the :option:`--settings` argument to
-:program:`manage.py`:
-
-.. code-block:: bash
-
-    $ python manage.py celeryd start --settings=settings
-
-See http://bit.ly/bo9RSw

+ 5 - 4
docs/getting-started/first-steps-with-celery.rst

@@ -277,12 +277,13 @@ Configuration
 Celery, like a consumer appliance, doesn't need much to be operated.
 It has an input and an output, where you must connect the input to a broker and maybe
 the output to a result backend if so wanted.  But if you look closely at the back
-there is a lid revealing lots of sliders, dials and buttons: this is the configuration.
+there's a lid revealing loads of sliders, dials and buttons: this is the configuration.
 
-The default configuration should be good enough for most uses, but there
-are many things to tweak so that Celery works just the way you want it to.
+The default configuration should be good enough for most uses, but there are
+many things to tweak so Celery works just the way you want it to.
 Reading about the options available is a good idea to get familiar with what
-can be configured, see the :ref:`configuration` reference.
+can be configured. You can read about the options in the
+:ref:`configuration` reference.
 
 The configuration can be set on the app directly or by using a dedicated
 configuration module.

+ 1 - 9
docs/getting-started/next-steps.rst

 All times and dates, internally and in messages, use the UTC timezone.
 When the worker receives a message, for example with a countdown set, it
 converts that UTC time to local time.  If you wish to use
 a different timezone than the system timezone then you must
-configure that using the :setting:`CELERY_TIMEZONE` setting.
-
-To use custom timezones you also have to install the :mod:`pytz` library:
-
-.. code-block:: bash
-
-    $ pip install pytz
-
-Setting a custom timezone::
+configure that using the :setting:`CELERY_TIMEZONE` setting::
 
     celery.conf.CELERY_TIMEZONE = 'Europe/London'
 

+ 9 - 8
docs/internals/protocol.rst

@@ -54,14 +54,6 @@ Message format
     will be expired when the message is received and the expiration date
     has been exceeded.
 
-* timeouts
-    :`tuple`:
-
-    .. versionadded:: 2.7
-
-    Task execution timeouts. This is a tuple of hard and soft timeouts.
-    Timeout values are `int` or `float`.
-
 Extensions
 ==========
 
@@ -107,6 +99,15 @@ to process it.
 
     A list of subtasks to apply if an error occurs while executing the task.
 
+* timeouts
+    :`tuple`:
+
+    .. versionadded:: 3.1
+
+    Task execution timeouts. This is a tuple of hard and soft timeouts.
+    Timeout values are `int` or `float`.
+
+
 Example message
 ===============
 

+ 11 - 0
docs/internals/reference/celery.worker.components.rst

@@ -0,0 +1,11 @@
+========================================
+ celery.worker.components
+========================================
+
+.. contents::
+    :local:
+.. currentmodule:: celery.worker.components
+
+.. automodule:: celery.worker.components
+    :members:
+    :undoc-members:

+ 1 - 0
docs/internals/reference/index.rst

@@ -9,6 +9,7 @@
     :maxdepth: 1
 
     celery.worker
+    celery.worker.components
     celery.worker.consumer
     celery.worker.job
     celery.worker.mediator

+ 0 - 7
docs/userguide/application.rst

@@ -146,13 +146,6 @@ that are consulted in order:
 ``config_from_object``
 ----------------------
 
-.. sidebar:: Timezones & pytz
-
-    Setting a time zone other than UTC requires the :mod:`pytz` library
-    to be installed, see the :setting:`CELERY_TIMEZONE` setting for more
-    information.
-
-
 The :meth:`@Celery.config_from_object` method loads configuration
 from a configuration object.
 

+ 1 - 1
docs/userguide/monitoring.rst

@@ -217,7 +217,7 @@ Install Celery Flower:
 
     $ pip install flower
 
-Launch Celery Flower and open http://localhost:8008 in browser:
+Launch Celery Flower and open http://localhost:5555 in your browser:
 
 .. code-block:: bash
 

+ 0 - 22
docs/userguide/periodic-tasks.rst

@@ -31,15 +31,6 @@ The periodic task schedules uses the UTC time zone by default,
 but you can change the time zone used using the :setting:`CELERY_TIMEZONE`
 setting.
 
-If you use a time zone other than UTC it's recommended to install the
-:mod:`pytz` library as this can improve the accuracy and keep your timezone
-specifications up to date:
-
-.. code-block:: bash
-
-    $ pip install -U pytz
-
-
 An example time zone could be `Europe/London`:
 
 .. code-block:: python
@@ -231,19 +222,6 @@ the :setting:`CELERY_TIMEZONE` setting:
     Celery is also compatible with the new ``USE_TZ`` setting introduced
     in Django 1.4.
 
-.. note::
-
-    The `pytz`_ library is recommended when setting a default timezone.
-    If :mod:`pytz` is not installed it will fallback to the mod:`dateutil`
-    library, which depends on a system timezone file being available for
-    the timezone selected.
-
-    Timezone definitions change frequently, so for the best results
-    an up to date :mod:`pytz` installation should be used.
-
-
-.. _`pytz`: http://pypi.python.org/pypi/pytz/
-
 .. _beat-starting:
 
 Starting the Scheduler

+ 30 - 4
docs/userguide/tasks.rst

@@ -199,6 +199,9 @@ The request defines the following attributes:
 
 :taskset: The unique id of the taskset this task is a member of (if any).
 
+:chord: The unique id of the chord this task belongs to (if the task
+        is part of the header).
+
 :args: Positional arguments.
 
 :kwargs: Keyword arguments.
@@ -209,6 +212,14 @@ The request defines the following attributes:
 :is_eager: Set to :const:`True` if the task is executed locally in
            the client, and not by a worker.
 
+:eta: The original ETA of the task (if any).
+      This is in UTC time (depending on the :setting:`CELERY_ENABLE_UTC`
+      setting).
+
+:expires: The original expiry time of the task (if any).
+          This is in UTC time (depending on the :setting:`CELERY_ENABLE_UTC`
+          setting).
+
 :logfile: The file the worker logs to.  See `Logging`_.
 
 :loglevel: The current log level used.
@@ -222,6 +233,15 @@ The request defines the following attributes:
                 Availability of keys in this dict depends on the
                 message broker used.
 
+:called_directly: This flag is set to true if the task was not
+                  executed by the worker.
+
+:callbacks: A list of subtasks to be called if this task returns successfully.
+
+:errback: A list of subtasks to be called if this task fails.
+
+:utc: Set to true if the caller has utc enabled (:setting:`CELERY_ENABLE_UTC`).
+
 
 An example task accessing information in the context is:
 
@@ -390,6 +410,10 @@ General
     exception will be raised.  *NOTE:* You have to call :meth:`~@Task.retry`
     manually, as it will not automatically retry on exception.
 
+    The default value is 3.
+    A value of :const:`None` will disable the retry limit and the
+    task will retry forever until it succeeds.
+
 .. attribute:: Task.default_retry_delay
 
     Default time in seconds before a retry of the task
@@ -398,8 +422,10 @@ General
 
 .. attribute:: Task.rate_limit
 
-    Set the rate limit for this task type, i.e. how many times in
-    a given period of time is the task allowed to run.
+    Set the rate limit for this task type, i.e. the maximum number of tasks
+    that can be run in a given time frame.  Tasks will still complete when
+    a rate limit is in effect, but they may take some time before being
+    allowed to start.
 
     If this is :const:`None` no rate limit is in effect.
     If it is an integer, it is interpreted as "tasks per second".
@@ -699,8 +725,8 @@ state metadata.  This can then be used to create e.g. progress bars.
 Creating pickleable exceptions
 ------------------------------
 
-A little known Python fact is that exceptions must behave a certain
-way to support being pickled.
+A lesser-known Python fact is that exceptions must conform to some
+simple rules to support being serialized by the pickle module.
 
 Tasks that raise exceptions that are not pickleable will not work
 properly when Pickle is used as the serializer.
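
A sketch of what those rules mean in practice: an exception that takes
custom arguments should pass them all on to ``Exception.__init__`` so that
pickle can recreate it (illustrative example, not taken from the docs)::

    class HttpError(Exception):

        def __init__(self, status_code, message):
            self.status_code = status_code
            self.message = message
            # Passing the original args upwards is what makes
            # the exception pickleable.
            Exception.__init__(self, status_code, message)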

+ 1 - 1
extra/generic-init.d/celerybeat

@@ -26,7 +26,7 @@ DEFAULT_LOG_FILE="/var/log/celerybeat.log"
 DEFAULT_LOG_LEVEL="INFO"
 DEFAULT_CELERYBEAT="celery beat"
 
-# /etc/init.d/ssh: start and stop the celery task worker daemon.
+# /etc/init.d/celerybeat: start and stop the celery periodic task scheduler daemon.
 
 if test -f /etc/default/celeryd; then
     . /etc/default/celeryd

+ 32 - 8
extra/release/verify-reference-index.sh

@@ -1,21 +1,45 @@
 #!/bin/bash
 
+RETVAL=0
+
 verify_index() {
+    retval=0
+    for refdir in $*; do
+        verify_modules_in_index "$refdir/index.rst"
+        verify_files "$refdir"
+    done
+    return $RETVAL
+}
+
+verify_files() {
+    for path in $1/*.rst; do
+        rst=${path##*/}
+        modname=${rst%*.rst}
+        if [ $modname != "index" ]; then
+            modpath=$(echo $modname | tr . /)
+            pkg="$modpath/__init__.py"
+            mod="$modpath.py"
+            if [ ! -f "$pkg" ]; then
+                if [ ! -f "$mod" ]; then
+                    echo "*** NO MODULE $modname for reference '$path'"
+                    RETVAL=1
+                fi
+            fi
+        fi
+    done
+}
+
+verify_modules_in_index() {
     modules=$(grep "celery." "$1" | \
                 perl -ple's/^\s*|\s*$//g;s{\.}{/}g;')
-    retval=0
     for module in $modules; do
         if [ ! -f "$module.py" ]; then
             if [ ! -f "$module/__init__.py" ]; then
-                echo "Outdated reference: $module"
-                retval=1
+                echo "*** IN INDEX BUT NO MODULE: $module"
+                RETVAL=1
             fi
         fi
     done
-
-    return $retval
 }
 
-verify_index docs/reference/index.rst && \
-    verify_index docs/internals/reference/index.rst
-
+verify_index docs/reference docs/internals/reference

+ 4 - 1
funtests/benchmarks/bench_worker.py

@@ -4,7 +4,10 @@ import os
 import sys
 import time
 
-os.environ['NOSETPS'] = 'yes'
+os.environ.update(
+    NOSETPS='yes',
+    USE_FAST_LOCALS='yes',
+)
 
 import anyjson
 JSONIMP = os.environ.get('JSONIMP')

+ 14 - 11
requirements/README.rst

@@ -10,13 +10,9 @@ Index
 
     Default requirements for Python 2.7+.
 
-* :file:`requirements/default-py3k.txt`
+* :file:`requirements/extra-py3k.txt`
 
-    Default requirements for Python 3.2+.
-
-* :file:`requirements/py26.txt`
-
-    Extra requirements needed to run on Python 2.6.
+    Extra requirements for Python 3.2+.
 
 * :file:`requirements/jython.txt`
 
@@ -47,15 +43,22 @@ Index
 
     Requirement file installing the current master branch of Celery and deps.
 
-
 Examples
 ========
 
-Running the tests using Python 2.6
-----------------------------------
+Installing requirements for running on Python 3
+------------------------------------------------
 
 ::
 
-    $ pip -E $VIRTUAL_ENV install -U -r requirements/default.txt
-    $ pip -E $VIRTUAL_ENV install -U -r requirements/test.txt
+    $ pip install -U -r requirements/default.txt
+    $ pip install -U -r requirements/extra-py3k.txt
+
+
+Running the tests
+-----------------
+
+::
 
+    $ pip install -U -r requirements/default.txt
+    $ pip install -U -r requirements/test.txt

+ 0 - 4
requirements/default-py3k.txt

@@ -1,4 +0,0 @@
-billiard>=2.7.3.12
-python-dateutil>=2.1
-pytz
-kombu>=2.4.5,<3.0

+ 3 - 3
requirements/default.txt

@@ -1,3 +1,3 @@
-billiard>=2.7.3.12
-python-dateutil>=2.1
-kombu>=2.4.5,<3.0
+pytz
+billiard>=2.7.3.13
+kombu>=2.4.7,<3.0

+ 1 - 0
requirements/extra-py3k.txt

@@ -0,0 +1 @@
+pytz

+ 0 - 2
requirements/py26.txt

@@ -1,2 +0,0 @@
-importlib
-ordereddict

+ 3 - 6
setup.cfg

@@ -14,9 +14,6 @@ all_files = 1
 upload-dir = docs/.build/html
 
 [bdist_rpm]
-requires = uuid
-           importlib
-           billiard >= 2.7.3.12
-           python-dateutil >= 2.1
-           kombu >= 2.4.5
-           ordereddict
+requires = pytz
+           billiard>=2.7.3.13
+           kombu >= 2.4.7

+ 5 - 6
setup.py

@@ -151,12 +151,11 @@ def reqs(f):
     return filter(None, [strip_comments(l) for l in open(
         os.path.join(os.getcwd(), 'requirements', f)).readlines()])
 
-install_requires = reqs('default-py3k.txt' if is_py3k else 'default.txt')
-
+install_requires = reqs('default.txt')
+if is_py3k:
+    install_requires.extend(reqs('extra-py3k.txt'))
 if is_jython:
     install_requires.extend(reqs('jython.txt'))
-if py_version[0:2] == (2, 6):
-    install_requires.extend(reqs('py26.txt'))
 
 # -*- Tests Requires -*-
 
@@ -178,11 +177,11 @@ console_scripts = entrypoints['console_scripts'] = [
 if CELERY_COMPAT_PROGRAMS:
     console_scripts.extend([
         'celeryd = celery.__main__:_compat_worker',
-        'celerybeat = celery.bin.celerybeat:main',
+        'celerybeat = celery.__main__:_compat_beat',
         'camqadm = celery.bin.camqadm:main',
         'celeryev = celery.bin.celeryev:main',
         'celeryctl = celery.bin.celeryctl:main',
-        'celeryd-multi = celery.bin.celeryd_multi:main',
+        'celeryd-multi = celery.__main__:_compat_multi',
     ])
 
 # bundles: Only relevant for Celery developers.

+ 4 - 3
tox.ini

@@ -10,7 +10,8 @@ commands = nosetests
 recreate = True
 basepython = python3.2
 changedir = .tox
-deps = -r{toxinidir}/requirements/default-py3k.txt
+deps = -r{toxinidir}/requirements/default.txt
+       -r{toxinidir}/requirements/extra-py3k.txt
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
            {envbindir}/easy_install -U distribute
            {envbindir}/pip install                              \
@@ -24,7 +25,8 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
 recreate = True
 basepython = python3.3
 changedir = .tox
-deps = -r{toxinidir}/requirements/default-py3k.txt
+deps = -r{toxinidir}/requirements/default.txt
+       -r{toxinidir}/requirements/extra-py3k.txt
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
            {envbindir}/easy_install -U distribute
            {envbindir}/pip install                              \
@@ -49,7 +51,6 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
 [testenv:py26]
 basepython = python2.6
 deps = -r{toxinidir}/requirements/default.txt
-       -r{toxinidir}/requirements/py26.txt
        -r{toxinidir}/requirements/test.txt
        -r{toxinidir}/requirements/test-ci.txt
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}