Browse Source

Merge branch 'master' into asyncmove

Ask Solem 11 years ago
parent
commit
3ca2519ae9

+ 70 - 3
celery/__init__.py

@@ -38,12 +38,12 @@ if os.environ.get('C_IMPDEBUG'):  # pragma: no cover
         return real_import(name, locals, globals, fromlist, level)
     builtins.__import__ = debug_import
 
+# This is never executed, but tricks static analyzers (PyDev, PyCharm,
+# pylint, etc.) into knowing the types of these symbols, and what
+# they contain.
 STATICA_HACK = True
 globals()['kcah_acitats'[::-1].upper()] = False
 if STATICA_HACK:  # pragma: no cover
-    # This is never executed, but tricks static analyzers (PyDev, PyCharm,
-    # pylint, etc.) into knowing the types of these symbols, and what
-    # they contain.
     from celery.app import shared_task                   # noqa
     from celery.app.base import Celery                   # noqa
     from celery.app.utils import bugreport               # noqa
@@ -54,6 +54,71 @@ if STATICA_HACK:  # pragma: no cover
     )
     from celery.utils import uuid                        # noqa
 
+# Eventlet/gevent patching must happen before importing
+# anything else, so these tools must be at top-level.
+
+
+def _find_option_with_arg(argv, short_opts=None, long_opts=None):
+    """Search argv for option specifying its short and longopt
+    alternatives.
+
+    Returns the value of the option if found.
+
+    """
+    for i, arg in enumerate(argv):
+        if arg.startswith('-'):
+            if long_opts and arg.startswith('--'):
+                name, _, val = arg.partition('=')
+                if name in long_opts:
+                    return val
+            if short_opts and arg in short_opts:
+                return argv[i + 1]
+    raise KeyError('|'.join(short_opts or [] + long_opts or []))
+
+
+def _patch_eventlet():
+    import eventlet
+    import eventlet.debug
+    eventlet.monkey_patch()
+    EVENTLET_DBLOCK = int(os.environ.get('EVENTLET_NOBLOCK', 0))
+    if EVENTLET_DBLOCK:
+        eventlet.debug.hub_blocking_detection(EVENTLET_DBLOCK)
+
+
+def _patch_gevent():
+    from gevent import monkey, version_info
+    monkey.patch_all()
+    if version_info[0] == 0:  # pragma: no cover
+        # Signals aren't working in gevent versions <1.0,
+        # and are not monkey patched by patch_all()
+        from gevent import signal as _gevent_signal
+        _signal = __import__('signal')
+        _signal.signal = _gevent_signal
+
+
+def maybe_patch_concurrency(argv=sys.argv,
+                            short_opts=['-P'], long_opts=['--pool'],
+                            patches={'eventlet': _patch_eventlet,
+                                     'gevent': _patch_gevent}):
+    """With short and long opt alternatives that specify the command line
+    option to set the pool, this makes sure that anything that needs
+    to be patched is completed as early as possible.
+    (e.g. eventlet/gevent monkey patches)."""
+    try:
+        pool = _find_option_with_arg(argv, short_opts, long_opts)
+    except KeyError:
+        pass
+    else:
+        try:
+            patcher = patches[pool]
+        except KeyError:
+            pass
+        else:
+            patcher()
+        # set up eventlet/gevent environments ASAP.
+        from celery import concurrency
+        concurrency.get_implementation(pool)
+
 # Lazy loading
 from .five import recreate_module
 
@@ -73,6 +138,8 @@ old_module, new_module = recreate_module(  # pragma: no cover
     __author__=__author__, __contact__=__contact__,
     __homepage__=__homepage__, __docformat__=__docformat__,
     VERSION=VERSION, SERIES=SERIES, VERSION_BANNER=VERSION_BANNER,
+    maybe_patch_concurrency=maybe_patch_concurrency,
+    _find_option_with_arg=_find_option_with_arg,
 )
 
 

+ 2 - 5
celery/__main__.py

@@ -4,6 +4,8 @@ import sys
 
 from os.path import basename
 
+from . import maybe_patch_concurrency
+
 __all__ = ['main']
 
 DEPRECATED_FMT = """
@@ -21,11 +23,6 @@ def _warn_deprecated(new):
     )
 
 
-def maybe_patch_concurrency():
-    from celery.platforms import maybe_patch_concurrency
-    maybe_patch_concurrency(sys.argv, ['-P'], ['--pool'])
-
-
 def main():
     maybe_patch_concurrency()
     from celery.bin.celery import main

+ 2 - 2
celery/app/control.py

@@ -104,8 +104,8 @@ class Inspect(object):
     def conf(self):
         return self._request('dump_conf')
 
-    def hello(self):
-        return self._request('hello')
+    def hello(self, from_node, revoked=None):
+        return self._request('hello', from_node=from_node, revoked=revoked)
 
     def memsample(self):
         return self._request('memsample')

+ 3 - 2
celery/app/defaults.py

@@ -39,7 +39,8 @@ DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s] %(message)s'
 DEFAULT_TASK_LOG_FMT = """[%(asctime)s: %(levelname)s/%(processName)s] \
 %(task_name)s[%(task_id)s]: %(message)s"""
 
-_BROKER_OLD = {'deprecate_by': '2.5', 'remove_by': '4.0', 'alt': 'BROKER_URL'}
+_BROKER_OLD = {'deprecate_by': '2.5', 'remove_by': '4.0',
+               'alt': 'BROKER_URL setting'}
 _REDIS_OLD = {'deprecate_by': '2.5', 'remove_by': '4.0',
               'alt': 'URL form of CELERY_RESULT_BACKEND'}
 
@@ -241,7 +242,7 @@ def find_deprecated_settings(source):
             warn_deprecated(description='The {0!r} setting'.format(name),
                             deprecation=opt.deprecate_by,
                             removal=opt.remove_by,
-                            alternative='Use {0.alt} instead'.format(opt))
+                            alternative='Use the {0.alt} instead'.format(opt))
     return source
 
 

+ 1 - 0
celery/backends/__init__.py

@@ -30,6 +30,7 @@ BACKEND_ALIASES = {
     'cache': 'celery.backends.cache:CacheBackend',
     'redis': 'celery.backends.redis:RedisBackend',
     'mongodb': 'celery.backends.mongodb:MongoBackend',
+    'db': 'celery.backends.database:DatabaseBackend',
     'database': 'celery.backends.database:DatabaseBackend',
     'cassandra': 'celery.backends.cassandra:CassandraBackend',
     'couchbase': 'celery.backends.couchbase:CouchBaseBackend',

+ 10 - 8
celery/backends/base.py

@@ -19,8 +19,9 @@ import sys
 from datetime import timedelta
 
 from billiard.einfo import ExceptionInfo
+from kombu.async import maybe_block
 from kombu.serialization import (
-    encode, decode, prepare_accept_content,
+    dumps, loads, prepare_accept_content,
     registry as serializer_registry,
 )
 from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
@@ -138,15 +139,15 @@ class BaseBackend(object):
         return result
 
     def encode(self, data):
-        _, _, payload = encode(data, serializer=self.serializer)
+        _, _, payload = dumps(data, serializer=self.serializer)
         return payload
 
     def decode(self, payload):
         payload = PY3 and payload or str(payload)
-        return decode(payload,
-                      content_type=self.content_type,
-                      content_encoding=self.content_encoding,
-                      accept=self.accept)
+        return loads(payload,
+                     content_type=self.content_type,
+                     content_encoding=self.content_encoding,
+                     accept=self.accept)
 
     def wait_for(self, task_id, timeout=None, propagate=True, interval=0.5):
         """Wait for task and return its result.
@@ -198,8 +199,9 @@ class BaseBackend(object):
     def store_result(self, task_id, result, status, traceback=None, **kwargs):
         """Update task state and result."""
         result = self.encode_result(result, status)
-        self._store_result(task_id, result, status, traceback, **kwargs)
-        return result
+        with maybe_block():
+            self._store_result(task_id, result, status, traceback, **kwargs)
+            return result
 
     def forget(self, task_id):
         self._cache.pop(task_id, None)

+ 4 - 8
celery/bin/base.py

@@ -79,13 +79,10 @@ from optparse import OptionParser, IndentedHelpFormatter, make_option as Option
 from pprint import pformat
 from types import ModuleType
 
-import celery
+from celery import VERSION_BANNER, Celery, maybe_patch_concurrency
 from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
 from celery.five import items, string, string_t, values
-from celery.platforms import (
-    EX_FAILURE, EX_OK, EX_USAGE,
-    maybe_patch_concurrency,
-)
+from celery.platforms import EX_FAILURE, EX_OK, EX_USAGE
 from celery.utils import term
 from celery.utils import text
 from celery.utils.imports import symbol_by_name, import_from_cwd
@@ -181,7 +178,7 @@ class Command(object):
     args = ''
 
     #: Application version.
-    version = celery.VERSION_BANNER
+    version = VERSION_BANNER
 
     #: If false the parser will raise an exception if positional
     #: args are provided.
@@ -429,7 +426,7 @@ class Command(object):
             if self.enable_config_from_cmdline:
                 argv = self.process_cmdline_config(argv)
         else:
-            self.app = celery.Celery()
+            self.app = Celery()
         return argv
 
     def find_app(self, app):
@@ -448,7 +445,6 @@ class Command(object):
                                              app.replace(':', '')))
                     except ImportError:
                         pass
-                from celery.app.base import Celery
                 for suspect in values(vars(sym)):
                     if isinstance(suspect, Celery):
                         return suspect

+ 3 - 0
celery/bin/celeryd_detach.py

@@ -27,6 +27,8 @@ __all__ = ['detached_celeryd', 'detach']
 
 logger = get_logger(__name__)
 
+C_FAKEFORK = os.environ.get('C_FAKEFORK')
+
 OPTION_LIST = daemon_options(default_pidfile='celeryd.pid') + (
     Option('--fake',
            default=False, action='store_true', dest='fake',
@@ -36,6 +38,7 @@ OPTION_LIST = daemon_options(default_pidfile='celeryd.pid') + (
 
 def detach(path, argv, logfile=None, pidfile=None, uid=None,
            gid=None, umask=0, working_directory=None, fake=False, app=None):
+    fake = 1 if C_FAKEFORK else fake
     with detached(logfile, pidfile, uid, gid, umask, working_directory, fake):
         try:
             os.execv(path, [path] + argv)

+ 0 - 12
celery/concurrency/eventlet.py

@@ -8,13 +8,10 @@
 """
 from __future__ import absolute_import
 
-import os
 import sys
 
 __all__ = ['TaskPool']
 
-EVENTLET_NOPATCH = os.environ.get('EVENTLET_NOPATCH', False)
-EVENTLET_DBLOCK = int(os.environ.get('EVENTLET_NOBLOCK', 0))
 W_RACE = """\
 Celery module with %s imported before eventlet patched\
 """
@@ -30,15 +27,6 @@ for mod in (mod for mod in sys.modules if mod.startswith(RACE_MODS)):
             warnings.warn(RuntimeWarning(W_RACE % side))
 
 
-PATCHED = [0]
-if not EVENTLET_NOPATCH and not PATCHED[0]:  # pragma: no cover
-    PATCHED[0] += 1
-    import eventlet
-    import eventlet.debug
-    eventlet.monkey_patch()
-    if EVENTLET_DBLOCK:
-        eventlet.debug.hub_blocking_detection(EVENTLET_DBLOCK)
-
 from time import time
 
 from celery import signals

+ 0 - 14
celery/concurrency/gevent.py

@@ -8,20 +8,6 @@
 """
 from __future__ import absolute_import
 
-import os
-
-PATCHED = [0]
-if not os.environ.get('GEVENT_NOPATCH') and not PATCHED[0]:
-    PATCHED[0] += 1
-    from gevent import monkey, version_info
-    monkey.patch_all()
-    if version_info[0] == 0:  # pragma: no cover
-        # Signals aren't working in gevent versions <1.0,
-        # and are not monkey patched by patch_all()
-        from gevent import signal as _gevent_signal
-        _signal = __import__('signal')
-        _signal.signal = _gevent_signal
-
 try:
     from gevent import Timeout
 except ImportError:  # pragma: no cover

+ 1 - 2
celery/contrib/migrate.py

@@ -15,7 +15,6 @@ from itertools import cycle, islice
 
 from kombu import eventloop, Queue
 from kombu.common import maybe_declare
-from kombu.exceptions import StdChannelError
 from kombu.utils.encoding import ensure_bytes
 
 from celery.app import app_or_default
@@ -288,7 +287,7 @@ def start_filter(app, conn, filter, limit=None, timeout=1.0,
             _, mcount, _ = queue(consumer.channel).queue_declare(passive=True)
             if mcount:
                 state.total_apx += mcount
-        except conn.channel_errors + (StdChannelError, ):
+        except conn.channel_errors:
             pass
 
     # start migrating messages.

+ 0 - 33
celery/platforms.py

@@ -75,39 +75,6 @@ def pyimplementation():
         return 'CPython'
 
 
-def _find_option_with_arg(argv, short_opts=None, long_opts=None):
-    """Search argv for option specifying its short and longopt
-    alternatives.
-
-    Returns the value of the option if found.
-
-    """
-    for i, arg in enumerate(argv):
-        if arg.startswith('-'):
-            if long_opts and arg.startswith('--'):
-                name, _, val = arg.partition('=')
-                if name in long_opts:
-                    return val
-            if short_opts and arg in short_opts:
-                return argv[i + 1]
-    raise KeyError('|'.join(short_opts or [] + long_opts or []))
-
-
-def maybe_patch_concurrency(argv, short_opts=None, long_opts=None):
-    """With short and long opt alternatives that specify the command line
-    option to set the pool, this makes sure that anything that needs
-    to be patched is completed as early as possible.
-    (e.g. eventlet/gevent monkey patches)."""
-    try:
-        pool = _find_option_with_arg(argv, short_opts, long_opts)
-    except KeyError:
-        pass
-    else:
-        # set up eventlet/gevent environments ASAP.
-        from celery import concurrency
-        concurrency.get_implementation(pool)
-
-
 class LockFailed(Exception):
     """Raised if a pidlock can't be acquired."""
 

+ 4 - 4
celery/security/serialization.py

@@ -10,7 +10,7 @@ from __future__ import absolute_import
 
 import base64
 
-from kombu.serialization import registry, encode, decode
+from kombu.serialization import registry, dumps, loads
 from kombu.utils.encoding import bytes_to_str, str_to_bytes, ensure_bytes
 
 from .certificate import Certificate, FSCertStore
@@ -43,7 +43,7 @@ class SecureSerializer(object):
         assert self._key is not None
         assert self._cert is not None
         with reraise_errors('Unable to serialize: {0!r}', (Exception, )):
-            content_type, content_encoding, body = encode(
+            content_type, content_encoding, body = dumps(
                 data, serializer=self._serializer)
             # What we sign is the serialized body, not the body itself.
             # this way the receiver doesn't have to decode the contents
@@ -63,8 +63,8 @@ class SecureSerializer(object):
                                        payload['signer'],
                                        payload['body'])
             self._cert_store[signer].verify(body, signature, self._digest)
-        return decode(bytes_to_str(body), payload['content_type'],
-                      payload['content_encoding'], force=True)
+        return loads(bytes_to_str(body), payload['content_type'],
+                     payload['content_encoding'], force=True)
 
     def _pack(self, body, content_type, content_encoding, signer, signature,
               sep=str_to_bytes('\x00\x01')):

+ 6 - 10
celery/tests/__init__.py

@@ -14,18 +14,14 @@ except NameError:
     class WindowsError(Exception):
         pass
 
-os.environ.update(
-    #: warn if config module not found
-    C_WNOCONF='yes',
-    EVENTLET_NOPATCH='yes',
-    GEVENT_NOPATCH='yes',
-    KOMBU_DISABLE_LIMIT_PROTECTION='yes',
-    # virtual.QoS will not do sanity assertions when this is set.
-    KOMBU_UNITTEST='yes',
-)
-
 
 def setup():
+    os.environ.update(
+        # warn if config module not found
+        C_WNOCONF='yes',
+        KOMBU_DISABLE_LIMIT_PROTECTION='yes',
+    )
+
     if os.environ.get('COVER_ALL_MODULES') or '--with-coverage3' in sys.argv:
         from warnings import catch_warnings
         with catch_warnings(record=True):

+ 1 - 1
celery/tests/app/test_control.py

@@ -91,7 +91,7 @@ class test_inspect(AppCase):
 
     @with_mock_broadcast
     def test_hello(self):
-        self.i.hello()
+        self.i.hello('george@vandelay.com')
         self.assertIn('hello', MockMailbox.sent)
 
     @with_mock_broadcast

+ 1 - 1
celery/tests/bin/test_base.py

@@ -222,7 +222,7 @@ class test_Command(AppCase):
     def test_setup_app_no_respect(self):
         cmd = MockCommand()
         cmd.respects_app_option = False
-        with patch('celery.Celery') as cp:
+        with patch('celery.bin.base.Celery') as cp:
             cmd.setup_app_from_commandline(['--app=x.y:z'])
             self.assertTrue(cp.called)
 

+ 0 - 5
celery/tests/bin/test_celery.py

@@ -40,11 +40,6 @@ class test__main__(AppCase):
             self.assertIn('command is deprecated', stdout.getvalue())
             self.assertIn('YADDA YADDA', stdout.getvalue())
 
-    def test_maybe_patch_concurrency(self):
-        with patch('celery.platforms.maybe_patch_concurrency') as _mpc:
-            __main__.maybe_patch_concurrency()
-            _mpc.assert_called_with(sys.argv, ['-P'], ['--pool'])
-
     def test_main(self):
         with patch('celery.__main__.maybe_patch_concurrency') as mpc:
             with patch('celery.bin.celery.main') as main:

+ 4 - 14
celery/tests/concurrency/test_eventlet.py

@@ -1,6 +1,5 @@
 from __future__ import absolute_import
 
-import os
 import sys
 
 from nose import SkipTest
@@ -41,19 +40,10 @@ class EventletCase(AppCase):
 class test_aaa_eventlet_patch(EventletCase):
 
     def test_aaa_is_patched(self):
-        raise SkipTest('side effects')
-        monkey_patched = []
-        prev_monkey_patch = self.eventlet.monkey_patch
-        self.eventlet.monkey_patch = lambda: monkey_patched.append(True)
-        prev_eventlet = sys.modules.pop('celery.concurrency.eventlet', None)
-        os.environ.pop('EVENTLET_NOPATCH')
-        try:
-            import celery.concurrency.eventlet  # noqa
-            self.assertTrue(monkey_patched)
-        finally:
-            sys.modules['celery.concurrency.eventlet'] = prev_eventlet
-            os.environ['EVENTLET_NOPATCH'] = 'yes'
-            self.eventlet.monkey_patch = prev_monkey_patch
+        with patch('eventlet.monkey_patch', create=True) as monkey_patch:
+            from celery import maybe_patch_concurrency
+            maybe_patch_concurrency(['x', '-P', 'eventlet'])
+            monkey_patch.assert_called_with()
 
 
 eventlet_modules = (

+ 7 - 19
celery/tests/concurrency/test_gevent.py

@@ -1,8 +1,5 @@
 from __future__ import absolute_import
 
-import os
-import sys
-
 from nose import SkipTest
 from mock import Mock
 
@@ -14,7 +11,7 @@ from celery.concurrency.gevent import (
 )
 
 from celery.tests.case import (
-    AppCase, mock_module, patch_many, skip_if_pypy,
+    AppCase, mock_module, patch, patch_many, skip_if_pypy,
 )
 
 gevent_modules = (
@@ -41,21 +38,12 @@ class test_gevent_patch(GeventCase):
 
     def test_is_patched(self):
         with mock_module(*gevent_modules):
-            monkey_patched = []
-            import gevent
-            from gevent import monkey
-            gevent.version_info = (1, 0, 0)
-            prev_monkey_patch = monkey.patch_all
-            monkey.patch_all = lambda: monkey_patched.append(True)
-            prev_gevent = sys.modules.pop('celery.concurrency.gevent', None)
-            os.environ.pop('GEVENT_NOPATCH')
-            try:
-                import celery.concurrency.gevent  # noqa
-                self.assertTrue(monkey_patched)
-            finally:
-                sys.modules['celery.concurrency.gevent'] = prev_gevent
-                os.environ['GEVENT_NOPATCH'] = 'yes'
-                monkey.patch_all = prev_monkey_patch
+            with patch('gevent.monkey.patch_all', create=True) as patch_all:
+                import gevent
+                gevent.version_info = (1, 0, 0)
+                from celery import maybe_patch_concurrency
+                maybe_patch_concurrency(['x', '-P', 'gevent'])
+                self.assertTrue(patch_all.called)
 
 
 class test_Schedule(AppCase):

+ 3 - 2
celery/tests/contrib/test_migrate.py

@@ -3,8 +3,9 @@ from __future__ import absolute_import, unicode_literals
 from contextlib import contextmanager
 from mock import patch
 
+from amqp import ChannelError
+
 from kombu import Connection, Producer, Queue, Exchange
-from kombu.exceptions import StdChannelError
 
 from kombu.transport.virtual import QoS
 
@@ -300,7 +301,7 @@ class test_migrate_tasks(AppCase):
 
             def effect(*args, **kwargs):
                 if kwargs.get('passive'):
-                    raise StdChannelError()
+                    raise ChannelError('some channel error')
                 return 0, 3, 0
             qd.side_effect = effect
             migrate_tasks(x, y, app=self.app)

+ 1 - 1
celery/tests/utils/test_platforms.py

@@ -7,6 +7,7 @@ import signal
 
 from mock import Mock, patch, call
 
+from celery import _find_option_with_arg
 from celery import platforms
 from celery.five import open_fqdn
 from celery.platforms import (
@@ -27,7 +28,6 @@ from celery.platforms import (
     LockFailed,
     setgroups,
     _setgroups_hack,
-    _find_option_with_arg,
     close_open_fds,
 )
 

+ 1 - 1
celery/tests/worker/test_consumer.py

@@ -247,7 +247,7 @@ class test_Mingle(AppCase):
             }
 
             mingle.start(c)
-            I.hello.assert_called_with()
+            I.hello.assert_called_with(c.hostname, worker_state.revoked._data)
             c.app.clock.adjust.assert_has_calls([
                 call(312), call(29),
             ], any_order=True)

+ 2 - 1
celery/tests/worker/test_control.py

@@ -125,6 +125,7 @@ class test_ControlPanel(AppCase):
 
     def create_state(self, **kwargs):
         kwargs.setdefault('app', self.app)
+        kwargs.setdefault('hostname', hostname)
         return AttributeDict(kwargs)
 
     def create_panel(self, **kwargs):
@@ -165,7 +166,7 @@ class test_ControlPanel(AppCase):
         panel.state.app.clock.value = 313
         worker_state.revoked.add('revoked1')
         try:
-            x = panel.handle('hello')
+            x = panel.handle('hello', {'from_node': 'george@vandelay.com'})
             self.assertIn('revoked1', x['revoked'])
             self.assertEqual(x['clock'], 314)  # incremented
         finally:

+ 3 - 3
celery/tests/worker/test_worker.py

@@ -7,11 +7,11 @@ from collections import deque
 from datetime import datetime, timedelta
 from threading import Event
 
+from amqp import ChannelError
 from billiard.exceptions import WorkerLostError
 from kombu import Connection
 from kombu.async import READ, ERR
 from kombu.common import QoS, ignore_errors
-from kombu.exceptions import StdChannelError
 from kombu.transport.base import Message
 from mock import call, Mock, patch
 
@@ -624,12 +624,12 @@ class test_Consumer(AppCase):
     def test_connect_errback(self, sleep, connect):
         l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
         from kombu.transport.memory import Transport
-        Transport.connection_errors = (StdChannelError, )
+        Transport.connection_errors = (ChannelError, )
 
         def effect():
             if connect.call_count > 1:
                 return
-            raise StdChannelError()
+            raise ChannelError()
         connect.side_effect = effect
         l.connect()
         connect.assert_called_with()

+ 6 - 3
celery/utils/mail.py

@@ -15,6 +15,8 @@ import warnings
 
 from email.mime.text import MIMEText
 
+from kombu.async import maybe_block
+
 from .functional import maybe_list
 
 try:
@@ -185,6 +187,7 @@ py-celery at {{hostname}}.
 
     def send(self, context, exc, fail_silently=True):
         if self.should_send(context, exc):
-            self.task.app.mail_admins(self.format_subject(context),
-                                      self.format_body(context),
-                                      fail_silently=fail_silently)
+            with maybe_block():
+                self.task.app.mail_admins(self.format_subject(context),
+                                          self.format_body(context),
+                                          fail_silently=fail_silently)

+ 2 - 0
celery/utils/serialization.py

@@ -151,6 +151,8 @@ def get_pickleable_etype(cls, loads=pickle.loads, dumps=pickle.dumps):
         loads(dumps(cls))
     except:
         return Exception
+    else:
+        return cls
 
 
 def get_pickled_exception(exc):

+ 28 - 25
celery/worker/consumer.py

@@ -544,31 +544,6 @@ class Agent(bootsteps.StartStopStep):
         return agent
 
 
-class Mingle(bootsteps.StartStopStep):
-    label = 'Mingle'
-    requires = (Connection, )
-
-    def __init__(self, c, enable_mingle=True, **kwargs):
-        self.enabled = enable_mingle
-
-    def start(self, c):
-        info('mingle: searching for neighbors')
-        I = c.app.control.inspect(timeout=1.0, connection=c.connection)
-        replies = I.hello()
-        if replies:
-            for reply in values(replies):
-                try:
-                    other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
-                except KeyError:  # reply from pre-3.1 worker
-                    pass
-                else:
-                    c.app.clock.adjust(other_clock)
-                    revoked.update(other_revoked)
-            info('mingle: synced with %s', ', '.join(replies))
-        else:
-            info('mingle: no one here')
-
-
 class Gossip(bootsteps.ConsumerStep):
     label = 'Gossip'
     requires = (Events, )
@@ -719,6 +694,34 @@ class Gossip(bootsteps.ConsumerStep):
             self.clock.forward()
 
 
+class Mingle(bootsteps.StartStopStep):
+    label = 'Mingle'
+    requires = (Gossip, )
+
+    def __init__(self, c, enable_mingle=True, **kwargs):
+        self.enabled = enable_mingle
+
+    def start(self, c):
+        info('mingle: searching for neighbors')
+        I = c.app.control.inspect(timeout=1.0, connection=c.connection)
+        replies = I.hello(c.hostname, revoked._data)
+        replies.pop(c.hostname, None)
+        if replies:
+            info('mingle: hello %s! sync with me',
+                 ', '.join(reply for reply, value in items(replies) if value))
+            for reply in values(replies):
+                if reply:
+                    try:
+                        other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
+                    except KeyError:  # reply from pre-3.1 worker
+                        pass
+                    else:
+                        c.app.clock.adjust(other_clock)
+                        revoked.update(other_revoked)
+        else:
+            info('mingle: all alone')
+
+
 class Evloop(bootsteps.StartStopStep):
     label = 'event loop'
     last = True

+ 7 - 3
celery/worker/control.py

@@ -246,9 +246,13 @@ def dump_revoked(state, **kwargs):
 
 
 @Panel.register
-def hello(state, **kwargs):
-    return {'revoked': worker_state.revoked._data,
-            'clock': state.app.clock.forward()}
+def hello(state, from_node, revoked=None, **kwargs):
+    if from_node != state.hostname:
+        logger.info('sync with %s', from_node)
+        if revoked:
+            worker_state.revoked.update(revoked)
+        return {'revoked': worker_state.revoked._data,
+                'clock': state.app.clock.forward()}
 
 
 @Panel.register

+ 3 - 1
celery/worker/pidbox.py

@@ -3,6 +3,7 @@ from __future__ import absolute_import
 import socket
 import threading
 
+from kombu.async import maybe_block
 from kombu.common import ignore_errors
 from kombu.utils.encoding import safe_str
 
@@ -34,7 +35,8 @@ class Pidbox(object):
         self._forward_clock()  # just increase clock as clients usually don't
                                # have a valid clock to adjust with.
         try:
-            self.node.handle_message(body, message)
+            with maybe_block():
+                self.node.handle_message(body, message)
         except KeyError as exc:
             error('No such control command: %s', exc)
         except Exception as exc:

+ 0 - 15
docs/conf.py

@@ -3,11 +3,6 @@
 import sys
 import os
 
-# eventlet/gevent should not monkey patch anything.
-os.environ["GEVENT_NOPATCH"] = "yes"
-os.environ["EVENTLET_NOPATCH"] = "yes"
-os.environ["CELERY_LOADER"] = "default"
-
 this = os.path.dirname(os.path.abspath(__file__))
 
 # If your extensions are in another directory, add it here. If the directory
@@ -17,16 +12,6 @@ sys.path.append(os.path.join(os.pardir, "tests"))
 sys.path.append(os.path.join(this, "_ext"))
 import celery
 
-
-# use app loader
-from celery import Celery
-app = Celery(set_as_current=True)
-app.conf.update(BROKER_URL="memory://",
-                CELERY_RESULT_BACKEND="cache",
-                CELERY_CACHE_BACKEND="memory",
-                CELERYD_HIJACK_ROOT_LOGGER=False,
-                CELERYD_LOG_COLOR=False)
-
 # General configuration
 # ---------------------
 

+ 105 - 94
docs/configuration.rst

@@ -24,16 +24,15 @@ It should contain all you need to run a basic Celery set-up.
 .. code-block:: python
 
     ## Broker settings.
-    BROKER_URL = "amqp://guest:guest@localhost:5672//"
+    BROKER_URL = 'amqp://guest:guest@localhost:5672//'
 
     # List of modules to import when celery starts.
-    CELERY_IMPORTS = ("myapp.tasks", )
+    CELERY_IMPORTS = ('myapp.tasks', )
 
     ## Using the database to store task state and results.
-    CELERY_RESULT_BACKEND = "database"
-    CELERY_RESULT_DBURI = "sqlite:///mydatabase.db"
+    CELERY_RESULT_BACKEND = 'db+sqlite:///results.db'
 
-    CELERY_ANNOTATIONS = {"tasks.add": {"rate_limit": "10/s"}}
+    CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
 
 
 Configuration Directives
@@ -98,13 +97,13 @@ task:
 
 .. code-block:: python
 
-    CELERY_ANNOTATIONS = {"tasks.add": {"rate_limit": "10/s"}}
+    CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
 
 or change the same for all tasks:
 
 .. code-block:: python
 
-    CELERY_ANNOTATIONS = {"*": {"rate_limit": "10/s"}}
+    CELERY_ANNOTATIONS = {'*': {'rate_limit': '10/s'}}
 
 
 You can change methods too, for example the ``on_failure`` handler:
@@ -112,9 +111,9 @@ You can change methods too, for example the ``on_failure`` handler:
 .. code-block:: python
 
     def my_on_failure(self, exc, task_id, args, kwargs, einfo):
-        print("Oh no! Task failed: {0!r}".format(exc))
+        print('Oh no! Task failed: {0!r}'.format(exc))
 
-    CELERY_ANNOTATIONS = {"*": {"on_failure": my_on_failure}}
+    CELERY_ANNOTATIONS = {'*': {'on_failure': my_on_failure}}
 
 
 If you need more flexibility then you can use objects
@@ -125,8 +124,8 @@ instead of a dict to choose which tasks to annotate:
     class MyAnnotate(object):
 
         def annotate(self, task):
-            if task.name.startswith("tasks."):
-                return {"rate_limit": "10/s"}
+            if task.name.startswith('tasks.'):
+                return {'rate_limit': '10/s'}
 
     CELERY_ANNOTATIONS = (MyAnnotate(), {...})
 
@@ -235,7 +234,7 @@ Can be one of the following:
 CELERY_RESULT_SERIALIZER
 ~~~~~~~~~~~~~~~~~~~~~~~~
 
-Result serialization format.  Default is `"pickle"`. See
+Result serialization format.  Default is ``pickle``. See
 :ref:`calling-serializers` for information about supported
 serialization formats.
 
@@ -244,31 +243,36 @@ serialization formats.
 Database backend settings
 -------------------------
 
-.. setting:: CELERY_RESULT_DBURI
-
-CELERY_RESULT_DBURI
-~~~~~~~~~~~~~~~~~~~
+Database URL Examples
+~~~~~~~~~~~~~~~~~~~~~
 
-Please see `Supported Databases`_ for a table of supported databases.
-To use this backend you need to configure it with an
-`Connection String`_, some examples include:
+To use the database backend you have to configure the
+:setting:`CELERY_RESULT_BACKEND` setting with a connection URL and the ``db+``
+prefix:
 
 .. code-block:: python
 
+    CELERY_RESULT_BACKEND = 'db+scheme://user:password@host:port/dbname'
+
+Examples:
+
     # sqlite (filename)
-    CELERY_RESULT_DBURI = "sqlite:///celerydb.sqlite"
+    CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
 
     # mysql
-    CELERY_RESULT_DBURI = "mysql://scott:tiger@localhost/foo"
+    CELERY_RESULT_BACKEND = 'db+mysql://scott:tiger@localhost/foo'
 
     # postgresql
-    CELERY_RESULT_DBURI = "postgresql://scott:tiger@localhost/mydatabase"
+    CELERY_RESULT_BACKEND = 'db+postgresql://scott:tiger@localhost/mydatabase'
 
     # oracle
-    CELERY_RESULT_DBURI = "oracle://scott:tiger@127.0.0.1:1521/sidname"
+    CELERY_RESULT_BACKEND = 'db+oracle://scott:tiger@127.0.0.1:1521/sidname'
+
+.. code-block:: python
 
-See `Connection String`_ for more information about connection
-strings.
+Please see `Supported Databases`_ for a table of supported databases,
+and `Connection String`_ for more information about connection
+strings (which is the part of the URI that comes after the ``db+`` prefix).
 
 .. _`Supported Databases`:
     http://www.sqlalchemy.org/docs/core/engines.html#supported-databases
@@ -276,6 +280,14 @@ strings.
 .. _`Connection String`:
     http://www.sqlalchemy.org/docs/core/engines.html#database-urls
 
+.. setting:: CELERY_RESULT_DBURI
+
+CELERY_RESULT_DBURI
+~~~~~~~~~~~~~~~~~~~
+
+This setting is no longer used as it's now possible to specify
+the database URL directly in the :setting:`CELERY_RESULT_BACKEND` setting.
+
 .. setting:: CELERY_RESULT_ENGINE_OPTIONS
 
 CELERY_RESULT_ENGINE_OPTIONS
@@ -285,7 +297,7 @@ To specify additional SQLAlchemy database engine options you can use
 the :setting:`CELERY_RESULT_ENGINE_OPTIONS` setting::
 
     # echo enables verbose logging from SQLAlchemy.
-    CELERY_RESULT_ENGINE_OPTIONS = {"echo": True}
+    CELERY_RESULT_ENGINE_OPTIONS = {'echo': True}
 
 
 .. setting:: CELERY_RESULT_DB_SHORT_LIVED_SESSIONS
@@ -315,16 +327,6 @@ you to customize the table names:
         'group': 'myapp_groupmeta',
     }
 
-Example configuration
-~~~~~~~~~~~~~~~~~~~~~
-
-.. code-block:: python
-
-    CELERY_RESULT_BACKEND = "database"
-    CELERY_RESULT_DBURI = "mysql://user:password@host/dbname"
-    CELERY_RESULT_DB_TASK_TABLENAME = "myapp_taskmeta"
-    CELERY_RESULT_DB_TASKSET_TABLENAME = "myapp_tasksetmeta"
-
 .. _conf-amqp-result-backend:
 
 AMQP backend settings
@@ -343,7 +345,7 @@ AMQP backend settings
 CELERY_RESULT_EXCHANGE
 ~~~~~~~~~~~~~~~~~~~~~~
 
-Name of the exchange to publish results in.  Default is `"celeryresults"`.
+Name of the exchange to publish results in.  Default is `celeryresults`.
 
 .. setting:: CELERY_RESULT_EXCHANGE_TYPE
 
@@ -367,7 +369,7 @@ Example configuration
 
 .. code-block:: python
 
-    CELERY_RESULT_BACKEND = "amqp"
+    CELERY_RESULT_BACKEND = 'amqp'
     CELERY_TASK_RESULT_EXPIRES = 18000  # 5 hours.
 
 .. _conf-cache-result-backend:
@@ -380,30 +382,25 @@ Cache backend settings
     The cache backend supports the `pylibmc`_ and `python-memcached`
     libraries.  The latter is used only if `pylibmc`_ is not installed.
 
-.. setting:: CELERY_CACHE_BACKEND
-
-CELERY_CACHE_BACKEND
-~~~~~~~~~~~~~~~~~~~~
-
 Using a single memcached server:
 
 .. code-block:: python
 
-    CELERY_CACHE_BACKEND = 'memcached://127.0.0.1:11211/'
+    CELERY_RESULT_BACKEND = 'cache+memcached://127.0.0.1:11211/'
 
 Using multiple memcached servers:
 
 .. code-block:: python
 
-    CELERY_RESULT_BACKEND = "cache"
-    CELERY_CACHE_BACKEND = 'memcached://172.19.26.240:11211;172.19.26.242:11211/'
+    CELERY_RESULT_BACKEND = """
+        cache+memcached://172.19.26.240:11211;172.19.26.242:11211/
+    """.strip()
 
 .. setting:: CELERY_CACHE_BACKEND_OPTIONS
 
-
 The "memory" backend stores the cache in memory only:
 
-    CELERY_CACHE_BACKEND = "memory"
+    CELERY_CACHE_BACKEND = 'memory'
 
 CELERY_CACHE_BACKEND_OPTIONS
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -413,11 +410,19 @@ setting:
 
 .. code-block:: python
 
-    CELERY_CACHE_BACKEND_OPTIONS = {"binary": True,
-                                    "behaviors": {"tcp_nodelay": True}}
+    CELERY_CACHE_BACKEND_OPTIONS = {'binary': True,
+                                    'behaviors': {'tcp_nodelay': True}}
 
 .. _`pylibmc`: http://sendapatch.se/projects/pylibmc/
 
+.. setting:: CELERY_CACHE_BACKEND
+
+CELERY_CACHE_BACKEND
+~~~~~~~~~~~~~~~~~~~~
+
+This setting is no longer used as it's now possible to specify
+the cache backend directly in the :setting:`CELERY_RESULT_BACKEND` setting.
+
 .. _conf-redis-result-backend:
 
 Redis backend settings
@@ -440,21 +445,21 @@ Configuring the backend URL
 This backend requires the :setting:`CELERY_RESULT_BACKEND`
 setting to be set to a Redis URL::
 
-    CELERY_RESULT_BACKEND = "redis://:password@host:port/db"
+    CELERY_RESULT_BACKEND = 'redis://:password@host:port/db'
 
 For example::
 
-    CELERY_RESULT_BACKEND = "redis://localhost/0"
+    CELERY_RESULT_BACKEND = 'redis://localhost/0'
 
 which is the same as::
 
-    CELERY_RESULT_BACKEND = "redis://"
+    CELERY_RESULT_BACKEND = 'redis://'
 
 The fields of the URL is defined as folows:
 
 - *host*
 
-Host name or IP address of the Redis server. e.g. `"localhost"`.
+Host name or IP address of the Redis server. e.g. `localhost`.
 
 - *port*
 
@@ -495,7 +500,7 @@ CELERY_MONGODB_BACKEND_SETTINGS
 This is a dict supporting the following keys:
 
 * host
-    Host name of the MongoDB server. Defaults to "localhost".
+    Host name of the MongoDB server. Defaults to ``localhost``.
 
 * port
     The port the MongoDB server is listening to. Defaults to 27017.
@@ -507,11 +512,11 @@ This is a dict supporting the following keys:
     Password to authenticate to the MongoDB server (optional).
 
 * database
-    The database name to connect to. Defaults to "celery".
+    The database name to connect to. Defaults to ``celery``.
 
 * taskmeta_collection
     The collection name to store task meta data.
-    Defaults to "celery_taskmeta".
+    Defaults to ``celery_taskmeta``.
 
 * max_pool_size
     Passed as max_pool_size to PyMongo's Connection or MongoClient
@@ -533,12 +538,12 @@ Example configuration
 
 .. code-block:: python
 
-    CELERY_RESULT_BACKEND = "mongodb"
+    CELERY_RESULT_BACKEND = 'mongodb'
     CELERY_MONGODB_BACKEND_SETTINGS = {
-        "host": "192.168.1.100",
-        "port": 30000,
-        "database": "mydb",
-        "taskmeta_collection": "my_taskmeta_collection",
+        'host': '192.168.1.100',
+        'port': 30000,
+        'database': 'mydb',
+        'taskmeta_collection': 'my_taskmeta_collection',
     }
 
 .. _conf-cassandra-result-backend:
@@ -564,35 +569,41 @@ This backend requires the following configuration directives to be set.
 CASSANDRA_SERVERS
 ~~~~~~~~~~~~~~~~~
 
-List of ``host:port`` Cassandra servers. e.g. ``["localhost:9160]"``.
+List of ``host:port`` Cassandra servers. e.g.::
+
+    CASSANDRA_SERVERS = ['localhost:9160']
 
 .. setting:: CASSANDRA_KEYSPACE
 
 CASSANDRA_KEYSPACE
 ~~~~~~~~~~~~~~~~~~
 
-The keyspace in which to store the results. e.g. ``"tasks_keyspace"``.
+The keyspace in which to store the results. e.g.::
+
+    CASSANDRA_KEYSPACE = 'tasks_keyspace'
 
 .. setting:: CASSANDRA_COLUMN_FAMILY
 
 CASSANDRA_COLUMN_FAMILY
 ~~~~~~~~~~~~~~~~~~~~~~~
 
-The column family in which to store the results. eg ``"tasks"``
+The column family in which to store the results. e.g.::
+
+    CASSANDRA_COLUMN_FAMILY = 'tasks'
 
 .. setting:: CASSANDRA_READ_CONSISTENCY
 
 CASSANDRA_READ_CONSISTENCY
 ~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-The read consistency used. Values can be ``"ONE"``, ``"QUORUM"`` or ``"ALL"``.
+The read consistency used. Values can be ``ONE``, ``QUORUM`` or ``ALL``.
 
 .. setting:: CASSANDRA_WRITE_CONSISTENCY
 
 CASSANDRA_WRITE_CONSISTENCY
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-The write consistency used. Values can be ``"ONE"``, ``"QUORUM"`` or ``"ALL"``.
+The write consistency used. Values can be ``ONE``, ``QUORUM`` or ``ALL``.
 
 .. setting:: CASSANDRA_DETAILED_MODE
 
@@ -620,11 +631,11 @@ Example configuration
 
 .. code-block:: python
 
-    CASSANDRA_SERVERS = ["localhost:9160"]
-    CASSANDRA_KEYSPACE = "celery"
-    CASSANDRA_COLUMN_FAMILY = "task_results"
-    CASSANDRA_READ_CONSISTENCY = "ONE"
-    CASSANDRA_WRITE_CONSISTENCY = "ONE"
+    CASSANDRA_SERVERS = ['localhost:9160']
+    CASSANDRA_KEYSPACE = 'celery'
+    CASSANDRA_COLUMN_FAMILY = 'task_results'
+    CASSANDRA_READ_CONSISTENCY = 'ONE'
+    CASSANDRA_WRITE_CONSISTENCY = 'ONE'
     CASSANDRA_DETAILED_MODE = True
     CASSANDRA_OPTIONS = {
         'timeout': 300,
@@ -678,7 +689,7 @@ Couchbase backend settings
 This backend can be configured via the :setting:`CELERY_RESULT_BACKEND`
 set to a couchbase URL::
 
-    CELERY_RESULT_BACKEND = "couchbase://username:password@host:port/bucket"
+    CELERY_RESULT_BACKEND = 'couchbase://username:password@host:port/bucket'
 
 
 .. setting:: CELERY_COUCHBASE_BACKEND_SETTINGS
@@ -689,13 +700,14 @@ CELERY_COUCHBASE_BACKEND_SETTINGS
 This is a dict supporting the following keys:
 
 * host
-    Host name of the Couchbase server. Defaults to "localhost".
+    Host name of the Couchbase server. Defaults to ``localhost``.
 
 * port
-    The port the Couchbase server is listening to. Defaults to 8091.
+    The port the Couchbase server is listening to. Defaults to ``8091``.
 
 * bucket
-    The default bucket the Couchbase server is writing to. Defaults to "default".
+    The default bucket the Couchbase server is writing to.
+    Defaults to ``default``.
 
 * username
     User name to authenticate to the Couchbase server as (optional).
@@ -719,8 +731,8 @@ CELERY_QUEUES
 The mapping of queues the worker consumes from.  This is a dictionary
 of queue name/options.  See :ref:`guide-routing` for more information.
 
-The default is a queue/exchange/binding key of `"celery"`, with
-exchange type `direct`.
+The default is a queue/exchange/binding key of ``celery``, with
+exchange type ``direct``.
 
 You don't have to care about this unless you want custom routing facilities.
 
@@ -1066,7 +1078,7 @@ CELERY_MESSAGE_COMPRESSION
 ~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 Default compression used for task messages.
-Can be ``"gzip"``, ``"bzip2"`` (if available), or any custom
+Can be ``gzip``, ``bzip2`` (if available), or any custom
 compression schemes registered in the Kombu compression registry.
 
 The default is to send uncompressed messages.
@@ -1383,7 +1395,7 @@ Default is celery@localhost.
 EMAIL_HOST
 ~~~~~~~~~~
 
-The mail server to use.  Default is `"localhost"`.
+The mail server to use.  Default is ``localhost``.
 
 .. setting:: EMAIL_HOST_USER
 
@@ -1446,18 +1458,18 @@ george@vandelay.com and kramer@vandelay.com:
 
     # Name and email addresses of recipients
     ADMINS = (
-        ("George Costanza", "george@vandelay.com"),
-        ("Cosmo Kramer", "kosmo@vandelay.com"),
+        ('George Costanza', 'george@vandelay.com'),
+        ('Cosmo Kramer', 'kosmo@vandelay.com'),
     )
 
     # Email address used as sender (From field).
-    SERVER_EMAIL = "no-reply@vandelay.com"
+    SERVER_EMAIL = 'no-reply@vandelay.com'
 
     # Mailserver configuration
-    EMAIL_HOST = "mail.vandelay.com"
+    EMAIL_HOST = 'mail.vandelay.com'
     EMAIL_PORT = 25
-    # EMAIL_HOST_USER = "servers"
-    # EMAIL_HOST_PASSWORD = "s3cr3t"
+    # EMAIL_HOST_USER = 'servers'
+    # EMAIL_HOST_PASSWORD = 's3cr3t'
 
 .. _conf-events:
 
@@ -1515,7 +1527,7 @@ CELERY_EVENT_SERIALIZER
 ~~~~~~~~~~~~~~~~~~~~~~~
 
 Message serialization format used when sending event messages.
-Default is `"json"`. See :ref:`calling-serializers`.
+Default is ``json``. See :ref:`calling-serializers`.
 
 .. _conf-broadcast:
 
@@ -1531,7 +1543,7 @@ Name prefix for the queue used when listening for broadcast messages.
 The workers host name will be appended to the prefix to create the final
 queue name.
 
-Default is `"celeryctl"`.
+Default is ``celeryctl``.
 
 .. setting:: CELERY_BROADCAST_EXCHANGE
 
@@ -1540,14 +1552,14 @@ CELERY_BROADCAST_EXCHANGE
 
 Name of the exchange used for broadcast messages.
 
-Default is `"celeryctl"`.
+Default is ``celeryctl``.
 
 .. setting:: CELERY_BROADCAST_EXCHANGE_TYPE
 
 CELERY_BROADCAST_EXCHANGE_TYPE
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-Exchange type used for broadcast messages.  Default is `"fanout"`.
+Exchange type used for broadcast messages.  Default is ``fanout``.
 
 .. _conf-logging:
 
@@ -1722,7 +1734,7 @@ CELERYD_AUTOSCALER
 
 Name of the autoscaler class to use.
 
-Default is ``"celery.worker.autoscale.Autoscaler"``.
+Default is ``celery.worker.autoscale:Autoscaler``.
 
 .. setting:: CELERYD_AUTORELOADER
 
@@ -1732,7 +1744,7 @@ CELERYD_AUTORELOADER
 Name of the autoreloader class used by the worker to reload
 Python modules and files that have changed.
 
-Default is: ``"celery.worker.autoreload.Autoreloader"``.
+Default is: ``celery.worker.autoreload:Autoreloader``.
 
 .. setting:: CELERYD_CONSUMER
 
@@ -1769,8 +1781,7 @@ See :ref:`beat-entries`.
 CELERYBEAT_SCHEDULER
 ~~~~~~~~~~~~~~~~~~~~
 
-The default scheduler class.  Default is
-`"celery.beat.PersistentScheduler"`.
+The default scheduler class.  Default is ``celery.beat:PersistentScheduler``.
 
 Can also be set via the :option:`-S` argument to
 :mod:`~celery.bin.beat`.