فهرست منبع

Use str.format and print_function

Ask Solem 13 سال پیش
والد
کامیت
b194fc3935
59فایلهای تغییر یافته به همراه292 افزوده شده و 513 حذف شده
  1. 2 2
      celery/app/base.py
  2. 1 0
      celery/app/control.py
  3. 3 2
      celery/app/defaults.py
  4. 2 1
      celery/app/log.py
  5. 5 6
      celery/app/task.py
  6. 34 33
      celery/apps/worker.py
  7. 2 2
      celery/backends/__init__.py
  8. 1 1
      celery/beat.py
  9. 10 9
      celery/bin/base.py
  10. 6 7
      celery/bin/camqadm.py
  11. 3 6
      celery/bin/celery.py
  12. 1 1
      celery/bin/celerybeat.py
  13. 1 1
      celery/bin/celeryd.py
  14. 2 2
      celery/bin/celeryd_multi.py
  15. 1 2
      celery/canvas.py
  16. 2 2
      celery/contrib/rdb.py
  17. 7 5
      celery/datastructures.py
  18. 5 5
      celery/events/cursesmon.py
  19. 9 12
      celery/events/dumper.py
  20. 2 3
      celery/events/snapshot.py
  21. 10 7
      celery/events/state.py
  22. 4 3
      celery/loaders/base.py
  23. 9 9
      celery/loaders/default.py
  24. 15 15
      celery/platforms.py
  25. 17 13
      celery/schedules.py
  26. 5 5
      celery/task/http.py
  27. 4 3
      celery/task/sets.py
  28. 1 1
      celery/task/trace.py
  29. 5 5
      celery/tests/app/test_beat.py
  30. 2 2
      celery/tests/app/test_loaders.py
  31. 2 2
      celery/tests/backends/test_amqp.py
  32. 0 2
      celery/tests/backends/test_database.py
  33. 0 1
      celery/tests/bin/test_base.py
  34. 0 2
      celery/tests/bin/test_celery.py
  35. 0 85
      celery/tests/compat.py
  36. 3 2
      celery/tests/compat_modules/test_decorators.py
  37. 3 3
      celery/tests/events/test_events.py
  38. 8 8
      celery/tests/events/test_state.py
  39. 1 1
      celery/tests/security/test_certificate.py
  40. 5 5
      celery/tests/tasks/test_result.py
  41. 0 5
      celery/tests/utilities/test_local.py
  42. 1 16
      celery/tests/utilities/test_mail.py
  43. 1 3
      celery/tests/utils.py
  44. 6 7
      celery/tests/worker/test_hub.py
  45. 21 20
      celery/utils/__init__.py
  46. 0 105
      celery/utils/compat.py
  47. 1 1
      celery/utils/functional.py
  48. 5 4
      celery/utils/log.py
  49. 14 27
      celery/utils/mail.py
  50. 1 3
      celery/utils/serialization.py
  51. 2 2
      celery/utils/threads.py
  52. 4 4
      celery/utils/timer2.py
  53. 6 6
      celery/utils/timeutils.py
  54. 2 2
      celery/worker/buckets.py
  55. 5 5
      celery/worker/consumer.py
  56. 9 10
      celery/worker/control.py
  57. 1 1
      celery/worker/hub.py
  58. 15 12
      celery/worker/job.py
  59. 5 4
      celery/worker/state.py

+ 2 - 2
celery/app/base.py

@@ -338,8 +338,8 @@ class Celery(object):
         return reduce(getattr, [self] + path.split('.'))
         return reduce(getattr, [self] + path.split('.'))
 
 
     def __repr__(self):
     def __repr__(self):
-        return '<%s %s:0x%x>' % (self.__class__.__name__,
-                                 self.main or '__main__', id(self), )
+        return '<{0} {1}:0x{2:x}>'.format(
+            type(self).__name__, self.main or '__main__', id(self))
 
 
     def __reduce__(self):
     def __reduce__(self):
         # Reduce only pickles the configuration changes,
         # Reduce only pickles the configuration changes,

+ 1 - 0
celery/app/control.py

@@ -83,6 +83,7 @@ class Inspect(object):
     def conf(self):
     def conf(self):
         return self._request('dump_conf')
         return self._request('dump_conf')
 
 
+
 class Control(object):
 class Control(object):
     Mailbox = Mailbox
     Mailbox = Mailbox
 
 

+ 3 - 2
celery/app/defaults.py

@@ -58,7 +58,8 @@ class Option(object):
         return self.typemap[self.type](value)
         return self.typemap[self.type](value)
 
 
     def __repr__(self):
     def __repr__(self):
-        return '<Option: type->%s default->%r>' % (self.type, self.default)
+        return '<Option: type->{0} default->{1!r}>'.format(self.type,
+                                                           self.default)
 
 
 
 
 NAMESPACES = {
 NAMESPACES = {
@@ -230,7 +231,7 @@ def find_deprecated_settings(source):
     from celery.utils import warn_deprecated
     from celery.utils import warn_deprecated
     for name, opt in flatten(NAMESPACES):
     for name, opt in flatten(NAMESPACES):
         if (opt.deprecate_by or opt.remove_by) and getattr(source, name, None):
         if (opt.deprecate_by or opt.remove_by) and getattr(source, name, None):
-            warn_deprecated(description='The %r setting' % (name, ),
+            warn_deprecated(description='The {0!r} setting'.format(name),
                             deprecation=opt.deprecate_by,
                             deprecation=opt.deprecate_by,
                             removal=opt.remove_by,
                             removal=opt.remove_by,
                             alternative=opt.alt)
                             alternative=opt.alt)

+ 2 - 1
celery/app/log.py

@@ -16,12 +16,13 @@ import logging
 import os
 import os
 import sys
 import sys
 
 
+from logging.handlers import WatchedFileHandler
+
 from kombu.log import NullHandler
 from kombu.log import NullHandler
 
 
 from celery import signals
 from celery import signals
 from celery._state import get_current_task
 from celery._state import get_current_task
 from celery.utils import isatty
 from celery.utils import isatty
-from celery.utils.compat import WatchedFileHandler
 from celery.utils.log import (
 from celery.utils.log import (
     get_logger, mlevel,
     get_logger, mlevel,
     ColorFormatter, ensure_process_aware_logger,
     ColorFormatter, ensure_process_aware_logger,

+ 5 - 6
celery/app/task.py

@@ -67,7 +67,7 @@ class Context(object):
             return default
             return default
 
 
     def __repr__(self):
     def __repr__(self):
-        return '<Context: %r>' % (vars(self, ))
+        return '<Context: {0!r}>'.format(vars(self))
 
 
     @property
     @property
     def children(self):
     def children(self):
@@ -120,9 +120,8 @@ class TaskType(type):
         return instance.__class__
         return instance.__class__
 
 
     def __repr__(cls):
     def __repr__(cls):
-        if cls._app:
-            return '<class %s of %s>' % (cls.__name__, cls._app, )
-        return '<unbound %s>' % (cls.__name__, )
+        return ('<class {0.__name__} of {0._app}>' if cls._app
+           else '<unbound {0.__name__}>').format(cls)
 
 
 
 
 class Task(object):
 class Task(object):
@@ -549,7 +548,7 @@ class Task(object):
             if exc:
             if exc:
                 maybe_reraise()
                 maybe_reraise()
             raise self.MaxRetriesExceededError(
             raise self.MaxRetriesExceededError(
-                    """Can't retry %s[%s] args:%s kwargs:%s""" % (
+                    "Can't retry {0}[{1}] args:{2} kwargs:{3}".format(
                         self.name, options['task_id'], args, kwargs))
                         self.name, options['task_id'], args, kwargs))
 
 
         # If task was executed eagerly using apply(),
         # If task was executed eagerly using apply(),
@@ -767,7 +766,7 @@ class Task(object):
 
 
     def __repr__(self):
     def __repr__(self):
         """`repr(task)`"""
         """`repr(task)`"""
-        return '<@task: %s>' % (self.name, )
+        return '<@task: {0.name}>'.format(self)
 
 
     @property
     @property
     def request(self):
     def request(self):

+ 34 - 33
celery/apps/worker.py

@@ -10,7 +10,7 @@
     platform tweaks, and so on.
     platform tweaks, and so on.
 
 
 """
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 
 import logging
 import logging
 import os
 import os
@@ -49,7 +49,7 @@ def active_thread_count():
 
 
 
 
 def safe_say(msg):
 def safe_say(msg):
-    sys.__stderr__.write('\n%s\n' % msg)
+    print('\n{0}'.format(msg), file=sys.__stderr__)
 
 
 ARTLINES = [
 ARTLINES = [
     ' --------------',
     ' --------------',
@@ -67,25 +67,25 @@ ARTLINES = [
 ]
 ]
 
 
 BANNER = """\
 BANNER = """\
-celery@%(hostname)s v%(version)s
+celery@{hostname} v{version}
 
 
 [Configuration]
 [Configuration]
-. broker:      %(conninfo)s
-. app:         %(app)s
-. concurrency: %(concurrency)s
-. events:      %(events)s
+. broker:      {conninfo}
+. app:         {app}
+. concurrency: {concurrency}
+. events:      {events}
 
 
 [Queues]
 [Queues]
-%(queues)s
+{queues}
 """
 """
 
 
 EXTRA_INFO_FMT = """
 EXTRA_INFO_FMT = """
 [Tasks]
 [Tasks]
-%(tasks)s
+{tasks}
 """
 """
 
 
 UNKNOWN_QUEUE = """\
 UNKNOWN_QUEUE = """\
-Trying to select queue subset of %r, but queue %s is not
+Trying to select queue subset of {0!r}, but queue {1} is not
 defined in the CELERY_QUEUES setting.
 defined in the CELERY_QUEUES setting.
 
 
 If you want to automatically declare unknown queues you can
 If you want to automatically declare unknown queues you can
@@ -173,13 +173,14 @@ class Worker(configurated):
 
 
     def on_consumer_ready(self, consumer):
     def on_consumer_ready(self, consumer):
         signals.worker_ready.send(sender=consumer)
         signals.worker_ready.send(sender=consumer)
-        print('celery@%s has started.' % self.hostname)
+        print('celery@{0.hostname} has started.'.format(self))
 
 
     def init_queues(self):
     def init_queues(self):
         try:
         try:
             self.app.select_queues(self.use_queues)
             self.app.select_queues(self.use_queues)
         except KeyError as exc:
         except KeyError as exc:
-            raise ImproperlyConfigured(UNKNOWN_QUEUE % (self.use_queues, exc))
+            raise ImproperlyConfigured(
+                    UNKNOWN_QUEUE.format(self.use_queues, exc))
         if self.app.conf.CELERY_WORKER_DIRECT:
         if self.app.conf.CELERY_WORKER_DIRECT:
             self.app.amqp.queues.select_add(worker_direct(self.hostname))
             self.app.amqp.queues.select_add(worker_direct(self.hostname))
 
 
@@ -189,50 +190,50 @@ class Worker(configurated):
 
 
     def purge_messages(self):
     def purge_messages(self):
         count = self.app.control.purge()
         count = self.app.control.purge()
-        print('purge: Erased %d %s from the queue.\n' % (
+        print('purge: Erased {0} {1} from the queue.\n'.format(
                 count, pluralize(count, 'message')))
                 count, pluralize(count, 'message')))
 
 
     def tasklist(self, include_builtins=True):
     def tasklist(self, include_builtins=True):
         tasks = self.app.tasks.keys()
         tasks = self.app.tasks.keys()
         if not include_builtins:
         if not include_builtins:
             tasks = filter(lambda s: not s.startswith('celery.'), tasks)
             tasks = filter(lambda s: not s.startswith('celery.'), tasks)
-        return '\n'.join('  . %s' % task for task in sorted(tasks))
+        return '\n'.join('  . {0}'.format(task) for task in sorted(tasks))
 
 
     def extra_info(self):
     def extra_info(self):
         if self.loglevel <= logging.INFO:
         if self.loglevel <= logging.INFO:
             include_builtins = self.loglevel <= logging.DEBUG
             include_builtins = self.loglevel <= logging.DEBUG
             tasklist = self.tasklist(include_builtins=include_builtins)
             tasklist = self.tasklist(include_builtins=include_builtins)
-            return EXTRA_INFO_FMT % {'tasks': tasklist}
+            return EXTRA_INFO_FMT.format(tasks=tasklist)
 
 
     def startup_info(self):
     def startup_info(self):
         app = self.app
         app = self.app
         concurrency = unicode(self.concurrency)
         concurrency = unicode(self.concurrency)
-        appr = '%s:0x%x' % (app.main or '__main__', id(app))
+        appr = '{0}:0x{1:x}'.format(app.main or '__main__', id(app))
         if not isinstance(app.loader, AppLoader):
         if not isinstance(app.loader, AppLoader):
             loader = qualname(app.loader)
             loader = qualname(app.loader)
             if loader.startswith('celery.loaders'):
             if loader.startswith('celery.loaders'):
                 loader = loader[14:]
                 loader = loader[14:]
-            appr += ' (%s)' % loader
+            appr += ' ({0})'.format(loader)
         if self.autoscale:
         if self.autoscale:
             max, min = self.autoscale
             max, min = self.autoscale
-            concurrency = '{min=%s, max=%s}' % (min, max)
+            concurrency = '{{min={0}, max={1}}}'.format(min, max)
         pool = self.pool_cls
         pool = self.pool_cls
         if not isinstance(pool, basestring):
         if not isinstance(pool, basestring):
             pool = pool.__module__
             pool = pool.__module__
-        concurrency += ' (%s)' % pool.split('.')[-1]
+        concurrency += ' ({0})'.format(pool.split('.')[-1])
         events = 'ON'
         events = 'ON'
         if not self.send_events:
         if not self.send_events:
             events = 'OFF (enable -E to monitor this worker)'
             events = 'OFF (enable -E to monitor this worker)'
 
 
-        banner = (BANNER % {
-            'app': appr,
-            'hostname': self.hostname,
-            'version': VERSION_BANNER,
-            'conninfo': self.app.connection().as_uri(),
-            'concurrency': concurrency,
-            'events': events,
-            'queues': app.amqp.queues.format(indent=0, indent_first=False),
-        }).splitlines()
+        banner = BANNER.format(
+            app=appr,
+            hostname=self.hostname,
+            version=VERSION_BANNER,
+            conninfo=self.app.connection().as_uri(),
+            concurrency=concurrency,
+            events=events,
+            queues=app.amqp.queues.format(indent=0, indent_first=False),
+        ).splitlines()
 
 
         # integrate the ASCII art.
         # integrate the ASCII art.
         for i, x in enumerate(banner):
         for i, x in enumerate(banner):
@@ -282,7 +283,7 @@ class Worker(configurated):
 
 
     def set_process_status(self, info):
     def set_process_status(self, info):
         return platforms.set_mp_process_title('celeryd',
         return platforms.set_mp_process_title('celeryd',
-                info='%s (%s)' % (info, platforms.strargv(sys.argv)),
+                info='{0} ({1})'.format(info, platforms.strargv(sys.argv)),
                 hostname=self.hostname)
                 hostname=self.hostname)
 
 
 
 
@@ -296,7 +297,7 @@ def _shutdown_handler(worker, sig='TERM', how='Warm', exc=SystemExit,
             if current_process()._name == 'MainProcess':
             if current_process()._name == 'MainProcess':
                 if callback:
                 if callback:
                     callback(worker)
                     callback(worker)
-                safe_say('celeryd: %s shutdown (MainProcess)' % how)
+                safe_say('celeryd: {0} shutdown (MainProcess)'.format(how))
             if active_thread_count() > 1:
             if active_thread_count() > 1:
                 setattr(state, {'Warm': 'should_stop',
                 setattr(state, {'Warm': 'should_stop',
                                 'Cold': 'should_terminate'}[how], True)
                                 'Cold': 'should_terminate'}[how], True)
@@ -327,7 +328,7 @@ def install_worker_restart_handler(worker, sig='SIGHUP'):
     def restart_worker_sig_handler(signum, frame):
     def restart_worker_sig_handler(signum, frame):
         """Signal handler restarting the current python program."""
         """Signal handler restarting the current python program."""
         set_in_sighandler(True)
         set_in_sighandler(True)
-        safe_say('Restarting celeryd (%s)' % (' '.join(sys.argv), ))
+        safe_say('Restarting celeryd ({0})'.format(' '.join(sys.argv)))
         pid = os.fork()
         pid = os.fork()
         if pid == 0:
         if pid == 0:
             os.execv(sys.executable, [sys.executable] + sys.argv)
             os.execv(sys.executable, [sys.executable] + sys.argv)
@@ -373,8 +374,8 @@ def install_HUP_not_supported_handler(worker, sig='SIGHUP'):
     def warn_on_HUP_handler(signum, frame):
     def warn_on_HUP_handler(signum, frame):
         set_in_sighandler(True)
         set_in_sighandler(True)
         try:
         try:
-            safe_say('%(sig)s not supported: Restarting with %(sig)s is '
-                     'unstable on this platform!' % {'sig': sig})
+            safe_say('{sig} not supported: Restarting with {sig} is '
+                     'unstable on this platform!'.format(sig=sig))
         finally:
         finally:
             set_in_sighandler(False)
             set_in_sighandler(False)
     platforms.signals[sig] = warn_on_HUP_handler
     platforms.signals[sig] = warn_on_HUP_handler

+ 2 - 2
celery/backends/__init__.py

@@ -18,7 +18,7 @@ from celery.utils.imports import symbol_by_name
 from celery.utils.functional import memoize
 from celery.utils.functional import memoize
 
 
 UNKNOWN_BACKEND = """\
 UNKNOWN_BACKEND = """\
-Unknown result backend: %r.  Did you spell that correctly? (%r)\
+Unknown result backend: {0!r}.  Did you spell that correctly? ({1!r})\
 """
 """
 
 
 BACKEND_ALIASES = {
 BACKEND_ALIASES = {
@@ -44,7 +44,7 @@ def get_backend_cls(backend=None, loader=None):
     try:
     try:
         return symbol_by_name(backend, aliases)
         return symbol_by_name(backend, aliases)
     except ValueError as exc:
     except ValueError as exc:
-        raise ValueError, ValueError(UNKNOWN_BACKEND % (
+        raise ValueError, ValueError(UNKNOWN_BACKEND.format(
                     backend, exc)), sys.exc_info()[2]
                     backend, exc)), sys.exc_info()[2]
 
 
 
 

+ 1 - 1
celery/beat.py

@@ -204,7 +204,7 @@ class Scheduler(object):
                 (time.time() - self._last_sync) > self.sync_every)
                 (time.time() - self._last_sync) > self.sync_every)
 
 
     def reserve(self, entry):
     def reserve(self, entry):
-        new_entry = self.schedule[entry.name] = entry.next()
+        new_entry = self.schedule[entry.name] = next(entry)
         return new_entry
         return new_entry
 
 
     def apply_async(self, entry, publisher=None, **kwargs):
     def apply_async(self, entry, publisher=None, **kwargs):

+ 10 - 9
celery/bin/base.py

@@ -56,7 +56,7 @@ Daemon Options
     Optional directory to change to after detaching.
     Optional directory to change to after detaching.
 
 
 """
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 
 import os
 import os
 import re
 import re
@@ -78,7 +78,7 @@ for warning in (CDeprecationWarning, CPendingDeprecationWarning):
     warnings.simplefilter('once', warning, 0)
     warnings.simplefilter('once', warning, 0)
 
 
 ARGV_DISABLED = """
 ARGV_DISABLED = """
-Unrecognized command line arguments: %s
+Unrecognized command line arguments: {0}
 
 
 Try --help?
 Try --help?
 """
 """
@@ -91,7 +91,7 @@ class HelpFormatter(IndentedHelpFormatter):
 
 
     def format_epilog(self, epilog):
     def format_epilog(self, epilog):
         if epilog:
         if epilog:
-            return '\n%s\n\n' % epilog
+            return '\n{0}\n\n'.format(epilog)
         return ''
         return ''
 
 
     def format_description(self, description):
     def format_description(self, description):
@@ -202,7 +202,7 @@ class Command(object):
 
 
     def usage(self, command):
     def usage(self, command):
         """Returns the command-line usage string for this app."""
         """Returns the command-line usage string for this app."""
-        return '%%prog [options] %s' % (self.args, )
+        return '%%prog [options] {0.args}'.format(self)
 
 
     def get_options(self):
     def get_options(self):
         """Get supported command line options."""
         """Get supported command line options."""
@@ -238,15 +238,15 @@ class Command(object):
 
 
     def check_args(self, args):
     def check_args(self, args):
         if not self.supports_args and args:
         if not self.supports_args and args:
-            self.die(ARGV_DISABLED % (', '.join(args, )), EX_USAGE)
+            self.die(ARGV_DISABLED.format(', '.join(args)), EX_USAGE)
 
 
     def die(self, msg, status=EX_FAILURE):
     def die(self, msg, status=EX_FAILURE):
-        sys.stderr.write(msg + '\n')
+        print(msg, file=sys.stderr)
         sys.exit(status)
         sys.exit(status)
 
 
     def early_version(self, argv):
     def early_version(self, argv):
         if '--version' in argv:
         if '--version' in argv:
-            sys.stdout.write('%s\n' % self.version)
+            print(self.version)
             sys.exit(0)
             sys.exit(0)
 
 
     def parse_options(self, prog_name, arguments):
     def parse_options(self, prog_name, arguments):
@@ -272,7 +272,7 @@ class Command(object):
             for long_opt, help in doc.iteritems():
             for long_opt, help in doc.iteritems():
                 option = parser.get_option(long_opt)
                 option = parser.get_option(long_opt)
                 if option is not None:
                 if option is not None:
-                    option.help = ' '.join(help) % {'default': option.default}
+                    option.help = ' '.join(help).format(default=option.default)
         return parser
         return parser
 
 
     def prepare_preload_options(self, options):
     def prepare_preload_options(self, options):
@@ -311,7 +311,8 @@ class Command(object):
         sym = self.symbol_by_name(app)
         sym = self.symbol_by_name(app)
         if isinstance(sym, ModuleType):
         if isinstance(sym, ModuleType):
             if getattr(sym, '__path__', None):
             if getattr(sym, '__path__', None):
-                return self.find_app('%s.celery:' % (app.replace(':', ''), ))
+                return self.find_app('{0}.celery:'.format(
+                            app.replace(':', '')))
             return sym.celery
             return sym.celery
         return sym
         return sym
 
 

+ 6 - 7
celery/bin/camqadm.py

@@ -5,13 +5,14 @@ The :program:`celery amqp` command.
 .. program:: celery amqp
 .. program:: celery amqp
 
 
 """
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 
 import cmd
 import cmd
 import sys
 import sys
 import shlex
 import shlex
 import pprint
 import pprint
 
 
+from functools import partial
 from itertools import count
 from itertools import count
 
 
 from amqplib import client_0_8 as amqp
 from amqplib import client_0_8 as amqp
@@ -35,9 +36,7 @@ Example:
     -> queue.delete myqueue yes no
     -> queue.delete myqueue yes no
 """
 """
 
 
-
-def say(m, fh=sys.stderr):
-    fh.write('%s\n' % (m, ))
+say = partial(print, file=sys.stderr)
 
 
 
 
 class Spec(object):
 class Spec(object):
@@ -200,10 +199,10 @@ class AMQShell(cmd.Cmd):
     def note(self, m):
     def note(self, m):
         """Say something to the user. Disabled if :attr:`silent`."""
         """Say something to the user. Disabled if :attr:`silent`."""
         if not self.silent:
         if not self.silent:
-            say(m, fh=self.out)
+            say(m, file=self.out)
 
 
     def say(self, m):
     def say(self, m):
-        say(m, fh=self.out)
+        say(m, file=self.out)
 
 
     def get_amqp_api_command(self, cmd, arglist):
     def get_amqp_api_command(self, cmd, arglist):
         """With a command name and a list of arguments, convert the arguments
         """With a command name and a list of arguments, convert the arguments
@@ -360,7 +359,7 @@ class AMQPAdmin(object):
 
 
     def note(self, m):
     def note(self, m):
         if not self.silent:
         if not self.silent:
-            say(m, fh=self.out)
+            say(m, file=self.out)
 
 
 
 
 class AMQPAdminCommand(Command):
 class AMQPAdminCommand(Command):

+ 3 - 6
celery/bin/celery.py

@@ -6,7 +6,7 @@ The :program:`celery` umbrella command.
 .. program:: celery
 .. program:: celery
 
 
 """
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 
 import anyjson
 import anyjson
 import sys
 import sys
@@ -123,10 +123,7 @@ class Command(BaseCommand):
         self.out(s, fh=self.stderr)
         self.out(s, fh=self.stderr)
 
 
     def out(self, s, fh=None):
     def out(self, s, fh=None):
-        s = str(s)
-        if not s.endswith('\n'):
-            s += '\n'
-        (fh or self.stdout).write(s)
+        print(s, file=fh or self.stdout)
 
 
     def run_from_argv(self, prog_name, argv):
     def run_from_argv(self, prog_name, argv):
         self.prog_name = prog_name
         self.prog_name = prog_name
@@ -644,7 +641,7 @@ class status(Command):
                           no_color=kwargs.get('no_color', False),
                           no_color=kwargs.get('no_color', False),
                           stdout=self.stdout, stderr=self.stderr,
                           stdout=self.stdout, stderr=self.stderr,
                           show_reply=False) \
                           show_reply=False) \
-                    .run('ping', quiet=True, show_body=False, **kwargs)
+                    .run('ping', **dict(kwargs, quiet=True, show_body=False))
         if not replies:
         if not replies:
             raise Error('No nodes replied within time constraint',
             raise Error('No nodes replied within time constraint',
                         status=EX_UNAVAILABLE)
                         status=EX_UNAVAILABLE)

+ 1 - 1
celery/bin/celerybeat.py

@@ -17,7 +17,7 @@ The :program:`celery beat` command.
 
 
     Path to the schedule database. Defaults to `celerybeat-schedule`.
     Path to the schedule database. Defaults to `celerybeat-schedule`.
     The extension '.db' may be appended to the filename.
     The extension '.db' may be appended to the filename.
-    Default is %(default)s.
+    Default is {default}.
 
 
 .. cmdoption:: -S, --scheduler
 .. cmdoption:: -S, --scheduler
 
 

+ 1 - 1
celery/bin/celeryd.py

@@ -62,7 +62,7 @@ The :program:`celery worker` command (previously known as ``celeryd``)
 .. cmdoption:: -S, --statedb
 .. cmdoption:: -S, --statedb
 
 
     Path to the state database. The extension '.db' may
     Path to the state database. The extension '.db' may
-    be appended to the filename. Default: %(default)s
+    be appended to the filename. Default: {default}
 
 
 .. cmdoption:: -E, --events
 .. cmdoption:: -E, --events
 
 

+ 2 - 2
celery/bin/celeryd_multi.py

@@ -88,7 +88,7 @@ Examples
     celeryd -n xuzzy.myhost -c 3
     celeryd -n xuzzy.myhost -c 3
 
 
 """
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 
 import errno
 import errno
 import os
 import os
@@ -187,7 +187,7 @@ class MultiTool(object):
         return self.retcode
         return self.retcode
 
 
     def say(self, m, newline=True):
     def say(self, m, newline=True):
-        self.fh.write('%s%s' % (m, '\n' if newline else ''))
+        print(m, file=self.fh, end='\n' if newline else '')
 
 
     def names(self, argv, cmd):
     def names(self, argv, cmd):
         p = NamespacedOptionParser(argv)
         p = NamespacedOptionParser(argv)

+ 1 - 2
celery/canvas.py

@@ -19,7 +19,6 @@ from kombu.utils import fxrange, kwdict, reprcall
 from celery import current_app
 from celery import current_app
 from celery.local import Proxy
 from celery.local import Proxy
 from celery.utils import cached_property, uuid
 from celery.utils import cached_property, uuid
-from celery.utils.compat import chain_from_iterable
 from celery.utils.functional import (
 from celery.utils.functional import (
     maybe_list, is_list, regen,
     maybe_list, is_list, regen,
     chunks as _chunks,
     chunks as _chunks,
@@ -161,7 +160,7 @@ class Signature(dict):
         return self.append_to_list_option('link_error', errback)
         return self.append_to_list_option('link_error', errback)
 
 
     def flatten_links(self):
     def flatten_links(self):
-        return list(chain_from_iterable(_chain([[self]],
+        return list(_chain.from_iterable(_chain([[self]],
                 (link.flatten_links()
                 (link.flatten_links()
                     for link in maybe_list(self.options.get('link')) or []))))
                     for link in maybe_list(self.options.get('link')) or []))))
 
 

+ 2 - 2
celery/contrib/rdb.py

@@ -34,7 +34,7 @@ Inspired by http://snippets.dzone.com/posts/show/7248
     base port.  The selected port will be logged by the worker.
     base port.  The selected port will be logged by the worker.
 
 
 """
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 
 import errno
 import errno
 import os
 import os
@@ -108,7 +108,7 @@ class Rdb(Pdb):
                 'environment variable CELERY_RDB_PORT' % (self.me, ))
                 'environment variable CELERY_RDB_PORT' % (self.me, ))
 
 
     def say(self, m):
     def say(self, m):
-        self.out.write(m + '\n')
+        print(m, file=self.out)
 
 
     def _close_session(self):
     def _close_session(self):
         self.stdin, self.stdout = sys.stdin, sys.stdout = self._prev_handles
         self.stdin, self.stdout = sys.stdin, sys.stdout = self._prev_handles

+ 7 - 5
celery/datastructures.py

@@ -6,12 +6,13 @@
     Custom types and data structures.
     Custom types and data structures.
 
 
 """
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 
 import sys
 import sys
 import time
 import time
 
 
 from collections import defaultdict
 from collections import defaultdict
+from functools import partial
 from itertools import chain
 from itertools import chain
 
 
 from billiard.einfo import ExceptionInfo  # noqa
 from billiard.einfo import ExceptionInfo  # noqa
@@ -163,13 +164,14 @@ class DependencyGraph(object):
         :param fh: A file, or a file-like object to write the graph to.
         :param fh: A file, or a file-like object to write the graph to.
 
 
         """
         """
-        fh.write('digraph dependencies {\n')
+        P = partial(print, file=fh)
+        P('digraph dependencies {')
         for obj, adjacent in self.iteritems():
         for obj, adjacent in self.iteritems():
             if not adjacent:
             if not adjacent:
-                fh.write(ws + '"%s"\n' % (obj, ))
+                P(ws + '"{0}"'.format(obj))
             for req in adjacent:
             for req in adjacent:
-                fh.write(ws + '"%s" -> "%s"\n' % (obj, req))
-        fh.write('}\n')
+                P(ws + '"{0}" -> "{1}"'.format(obj, req))
+        P('}')
 
 
     def __iter__(self):
     def __iter__(self):
         return self.adjacent.iterkeys()
         return self.adjacent.iterkeys()

+ 5 - 5
celery/events/cursesmon.py

@@ -6,7 +6,7 @@
     Graphical monitor of Celery events using curses.
     Graphical monitor of Celery events using curses.
 
 
 """
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 
 import curses
 import curses
 import sys
 import sys
@@ -473,11 +473,11 @@ class DisplayThread(threading.Thread):  # pragma: no cover
 def capture_events(app, state, display):  # pragma: no cover
 def capture_events(app, state, display):  # pragma: no cover
 
 
     def on_connection_error(exc, interval):
     def on_connection_error(exc, interval):
-        sys.stderr.write('Connection Error: %r. Retry in %ss.' % (
-            exc, interval))
+        print('Connection Error: {0!r}. Retry in {1}s.'.format(
+                exc, interval), file=sys.stderr)
 
 
     while 1:
     while 1:
-        sys.stderr.write('-> evtop: starting capture...\n')
+        print('-> evtop: starting capture...', file=sys.stderr)
         with app.connection() as conn:
         with app.connection() as conn:
             try:
             try:
                 conn.ensure_connection(on_connection_error,
                 conn.ensure_connection(on_connection_error,
@@ -488,7 +488,7 @@ def capture_events(app, state, display):  # pragma: no cover
                 with recv.consumer():
                 with recv.consumer():
                     recv.drain_events(timeout=1, ignore_timeouts=True)
                     recv.drain_events(timeout=1, ignore_timeouts=True)
             except conn.connection_errors + conn.channel_errors as exc:
             except conn.connection_errors + conn.channel_errors as exc:
-                sys.stderr.write('Connection lost: %r' % (exc, ))
+                print('Connection lost: {0!r}'.format(exc), file=sys.stderr)
 
 
 
 
 def evtop(app=None):  # pragma: no cover
 def evtop(app=None):  # pragma: no cover

+ 9 - 12
celery/events/dumper.py

@@ -7,7 +7,7 @@
     as they happen.  Think of it like a `tcpdump` for Celery events.
     as they happen.  Think of it like a `tcpdump` for Celery events.
 
 
 """
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 
 import sys
 import sys
 
 
@@ -31,17 +31,13 @@ def humanize_type(type):
         return type.lower().replace('-', ' ')
         return type.lower().replace('-', ' ')
 
 
 
 
-def say(msg, out=sys.stdout):
-    out.write(msg + '\n')
-
-
 class Dumper(object):
 class Dumper(object):
 
 
     def __init__(self, out=sys.stdout):
     def __init__(self, out=sys.stdout):
         self.out = out
         self.out = out
 
 
     def say(self, msg):
     def say(self, msg):
-        say(msg, out=self.out)
+        print(msg, file=self.out)
 
 
     def on_event(self, event):
     def on_event(self, event):
         timestamp = datetime.utcfromtimestamp(event.pop('timestamp'))
         timestamp = datetime.utcfromtimestamp(event.pop('timestamp'))
@@ -50,7 +46,8 @@ class Dumper(object):
         if type.startswith('task-'):
         if type.startswith('task-'):
             uuid = event.pop('uuid')
             uuid = event.pop('uuid')
             if type in ('task-received', 'task-sent'):
             if type in ('task-received', 'task-sent'):
-                task = TASK_NAMES[uuid] = '%s(%s) args=%s kwargs=%s' % (
+                task = TASK_NAMES[uuid] = '{0}({1}) args={2} kwargs={3}' \
+                    .format(
                         event.pop('name'), uuid,
                         event.pop('name'), uuid,
                         event.pop('args'),
                         event.pop('args'),
                         event.pop('kwargs'))
                         event.pop('kwargs'))
@@ -58,17 +55,17 @@ class Dumper(object):
                 task = TASK_NAMES.get(uuid, '')
                 task = TASK_NAMES.get(uuid, '')
             return self.format_task_event(hostname, timestamp,
             return self.format_task_event(hostname, timestamp,
                                           type, task, event)
                                           type, task, event)
-        fields = ', '.join('%s=%s' % (key, event[key])
+        fields = ', '.join('{0}={1}'.format(key, event[key])
                         for key in sorted(event.keys()))
                         for key in sorted(event.keys()))
         sep = fields and ':' or ''
         sep = fields and ':' or ''
-        self.say('%s [%s] %s%s %s' % (hostname, timestamp,
-                                      humanize_type(type), sep, fields))
+        self.say('{0} [{1}] {2}{3} {4}'.format(hostname, timestamp,
+                                            humanize_type(type), sep, fields))
 
 
     def format_task_event(self, hostname, timestamp, type, task, event):
     def format_task_event(self, hostname, timestamp, type, task, event):
-        fields = ', '.join('%s=%s' % (key, event[key])
+        fields = ', '.join('{0}={1}'.format(key, event[key])
                         for key in sorted(event.keys()))
                         for key in sorted(event.keys()))
         sep = fields and ':' or ''
         sep = fields and ':' or ''
-        self.say('%s [%s] %s%s %s %s' % (hostname, timestamp,
+        self.say('{0} [{1}] {2}{3} {4} {5}'.format(hostname, timestamp,
                     humanize_type(type), sep, task, fields))
                     humanize_type(type), sep, task, fields))
 
 
 
 

+ 2 - 3
celery/events/snapshot.py

@@ -94,9 +94,8 @@ def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
 
 
     app.log.setup_logging_subsystem(loglevel, logfile)
     app.log.setup_logging_subsystem(loglevel, logfile)
 
 
-    logger.info(
-        '-> evcam: Taking snapshots with %s (every %s secs.)\n' % (
-            camera, freq))
+    print('-> evcam: Taking snapshots with {0} (every {1} secs.)'.format(
+                camera, freq))
     state = app.events.State()
     state = app.events.State()
     cam = instantiate(camera, state, app=app, freq=freq,
     cam = instantiate(camera, state, app=app, freq=freq,
                       maxrate=maxrate, timer=timer)
                       maxrate=maxrate, timer=timer)

+ 10 - 7
celery/events/state.py

@@ -75,8 +75,11 @@ class Worker(Element):
                 self.heartbeats = self.heartbeats[self.heartbeat_max:]
                 self.heartbeats = self.heartbeats[self.heartbeat_max:]
 
 
     def __repr__(self):
     def __repr__(self):
-        return '<Worker: %s (%s)' % (self.hostname,
-                                     self.alive and 'ONLINE' or 'OFFLINE')
+        return '<Worker: {0.hostname} (0.status_string)'.format(self)
+
+    @property
+    def status_string(self):
+        return 'ONLINE' if self.alive else 'OFFLINE'
 
 
     @property
     @property
     def heartbeat_expires(self):
     def heartbeat_expires(self):
@@ -196,7 +199,7 @@ class Task(Element):
         return dict(_keys())
         return dict(_keys())
 
 
     def __repr__(self):
     def __repr__(self):
-        return '<Task: %s(%s) %s>' % (self.name, self.uuid, self.state)
+        return '<Task: {0.name}({0.uuid}) {0.state}>'.format(self)
 
 
     @property
     @property
     def ready(self):
     def ready(self):
@@ -272,7 +275,7 @@ class State(object):
         hostname = fields.pop('hostname', None)
         hostname = fields.pop('hostname', None)
         if hostname:
         if hostname:
             worker = self.get_or_create_worker(hostname)
             worker = self.get_or_create_worker(hostname)
-            handler = getattr(worker, 'on_%s' % type, None)
+            handler = getattr(worker, 'on_' + type, None)
             if handler:
             if handler:
                 handler(**fields)
                 handler(**fields)
 
 
@@ -282,7 +285,7 @@ class State(object):
         hostname = fields.pop('hostname')
         hostname = fields.pop('hostname')
         worker = self.get_or_create_worker(hostname)
         worker = self.get_or_create_worker(hostname)
         task = self.get_or_create_task(uuid)
         task = self.get_or_create_task(uuid)
-        handler = getattr(task, 'on_%s' % type, None)
+        handler = getattr(task, 'on_' + type, None)
         if type == 'received':
         if type == 'received':
             self.task_count += 1
             self.task_count += 1
         if handler:
         if handler:
@@ -351,8 +354,8 @@ class State(object):
         return [w for w in self.workers.values() if w.alive]
         return [w for w in self.workers.values() if w.alive]
 
 
     def __repr__(self):
     def __repr__(self):
-        return '<ClusterState: events=%s tasks=%s>' % (self.event_count,
-                                                       self.task_count)
+        return '<State: events={0.event_count} tasks={0.task_count}>' \
+                    .format(self)
 
 
 
 
 state = State()
 state = State()

+ 4 - 3
celery/loaders/base.py

@@ -26,7 +26,7 @@ from celery.utils.functional import maybe_list
 BUILTIN_MODULES = frozenset()
 BUILTIN_MODULES = frozenset()
 
 
 ERROR_ENVVAR_NOT_SET = (
 ERROR_ENVVAR_NOT_SET = (
-"""The environment variable %r is not set,
+"""The environment variable {0!r} is not set,
 and as such the configuration could not be loaded.
 and as such the configuration could not be loaded.
 Please set this variable and make it point to
 Please set this variable and make it point to
 a configuration module.""")
 a configuration module.""")
@@ -126,7 +126,8 @@ class BaseLoader(object):
         if not module_name:
         if not module_name:
             if silent:
             if silent:
                 return False
                 return False
-            raise ImproperlyConfigured(self.error_envvar_not_set % module_name)
+            raise ImproperlyConfigured(
+                    self.error_envvar_not_set.format(module_name))
         return self.config_from_object(module_name, silent=silent)
         return self.config_from_object(module_name, silent=silent)
 
 
     def config_from_object(self, obj, silent=False):
     def config_from_object(self, obj, silent=False):
@@ -186,7 +187,7 @@ class BaseLoader(object):
                     value = NAMESPACES[ns][key].to_python(value)
                     value = NAMESPACES[ns][key].to_python(value)
                 except ValueError as exc:
                 except ValueError as exc:
                     # display key name in error message.
                     # display key name in error message.
-                    raise ValueError('%r: %s' % (ns_key, exc))
+                    raise ValueError('{0!r}: {1}'.format(ns_key, exc))
             return ns_key, value
             return ns_key, value
 
 
         return dict(map(getarg, args))
         return dict(map(getarg, args))

+ 9 - 9
celery/loaders/default.py

@@ -25,12 +25,12 @@ DEFAULT_CONFIG_MODULE = 'celeryconfig'
 C_WNOCONF = strtobool(os.environ.get('C_WNOCONF', False))
 C_WNOCONF = strtobool(os.environ.get('C_WNOCONF', False))
 
 
 CONFIG_INVALID_NAME = """
 CONFIG_INVALID_NAME = """
-Error: Module '%(module)s' doesn't exist, or it's not a valid \
+Error: Module '{module}' doesn't exist, or it's not a valid \
 Python module name.
 Python module name.
 """
 """
 
 
 CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """
 CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """
-Did you mean '%(suggest)s'?
+Did you mean '{suggest}'?
 """
 """
 
 
 
 
@@ -53,18 +53,18 @@ class Loader(BaseLoader):
         except NotAPackage:
         except NotAPackage:
             if configname.endswith('.py'):
             if configname.endswith('.py'):
                 raise NotAPackage, NotAPackage(
                 raise NotAPackage, NotAPackage(
-                        CONFIG_WITH_SUFFIX % {
-                            'module': configname,
-                            'suggest': configname[:-3]}), sys.exc_info()[2]
+                        CONFIG_WITH_SUFFIX.format(
+                            module=configname,
+                            suggest=configname[:-3])), sys.exc_info()[2]
             raise NotAPackage, NotAPackage(
             raise NotAPackage, NotAPackage(
-                    CONFIG_INVALID_NAME % {
-                        'module': configname}), sys.exc_info()[2]
+                    CONFIG_INVALID_NAME.format(
+                        module=configname)), sys.exc_info()[2]
         except ImportError:
         except ImportError:
             # billiard sets this if forked using execv
             # billiard sets this if forked using execv
             if C_WNOCONF and not os.environ.get('FORKED_BY_MULTIPROCESSING'):
             if C_WNOCONF and not os.environ.get('FORKED_BY_MULTIPROCESSING'):
                 warnings.warn(NotConfigured(
                 warnings.warn(NotConfigured(
-                    'No %r module found! Please make sure it exists and '
-                    'is available to Python.' % (configname, )))
+                    'No {module} module found! Please make sure it exists and '
+                    'is available to Python.'.format(module=configname)))
             return self.setup_settings({})
             return self.setup_settings({})
         else:
         else:
             celeryconfig = self.import_from_cwd(configname)
             celeryconfig = self.import_from_cwd(configname)

+ 15 - 15
celery/platforms.py

@@ -7,7 +7,7 @@
     users, groups, and so on.
     users, groups, and so on.
 
 
 """
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 
 import atexit
 import atexit
 import errno
 import errno
@@ -48,8 +48,8 @@ PIDFILE_MODE = ((os.R_OK | os.W_OK) << 6) | ((os.R_OK) << 3) | ((os.R_OK))
 
 
 _setps_bucket = TokenBucket(0.5)  # 30/m, every 2 seconds
 _setps_bucket = TokenBucket(0.5)  # 30/m, every 2 seconds
 
 
-PIDLOCKED = """ERROR: Pidfile (%s) already exists.
-Seems we're already running? (PID: %s)"""
+PIDLOCKED = """ERROR: Pidfile ({0}) already exists.
+Seems we're already running? (PID: {1})"""
 
 
 
 
 def pyimplementation():
 def pyimplementation():
@@ -132,14 +132,14 @@ class PIDFile(object):
             line = fh.readline()
             line = fh.readline()
             if line.strip() == line:  # must contain '\n'
             if line.strip() == line:  # must contain '\n'
                 raise ValueError(
                 raise ValueError(
-                    'Partially written or invalid pidfile %r' % (self.path))
+                    'Partial or invalid pidfile {0.path}'.format(self))
         finally:
         finally:
             fh.close()
             fh.close()
 
 
         try:
         try:
             return int(line.strip())
             return int(line.strip())
         except ValueError:
         except ValueError:
-            raise ValueError('PID file %r contents invalid.' % self.path)
+            raise ValueError('PID file {0.path} invalid.'.format(self))
 
 
     def remove(self):
     def remove(self):
         """Removes the lock."""
         """Removes the lock."""
@@ -156,7 +156,7 @@ class PIDFile(object):
         try:
         try:
             pid = self.read_pid()
             pid = self.read_pid()
         except ValueError as exc:
         except ValueError as exc:
-            sys.stderr.write('Broken pidfile found. Removing it.\n')
+            print('Broken pidfile found. Removing it.', file=sys.stderr)
             self.remove()
             self.remove()
             return True
             return True
         if not pid:
         if not pid:
@@ -167,14 +167,14 @@ class PIDFile(object):
             os.kill(pid, 0)
             os.kill(pid, 0)
         except os.error as exc:
         except os.error as exc:
             if exc.errno == errno.ESRCH:
             if exc.errno == errno.ESRCH:
-                sys.stderr.write('Stale pidfile exists. Removing it.\n')
+                print('Stale pidfile exists. Removing it.', file=sys.stderr)
                 self.remove()
                 self.remove()
                 return True
                 return True
         return False
         return False
 
 
     def write_pid(self):
     def write_pid(self):
         pid = os.getpid()
         pid = os.getpid()
-        content = '%d\n' % (pid, )
+        content = '{0}\n'.format(pid)
 
 
         pidfile_fd = os.open(self.path, PIDFILE_FLAGS, PIDFILE_MODE)
         pidfile_fd = os.open(self.path, PIDFILE_FLAGS, PIDFILE_MODE)
         pidfile = os.fdopen(pidfile_fd, 'w')
         pidfile = os.fdopen(pidfile_fd, 'w')
@@ -219,7 +219,7 @@ def create_pidlock(pidfile):
     """
     """
     pidlock = PIDFile(pidfile)
     pidlock = PIDFile(pidfile)
     if pidlock.is_locked() and not pidlock.remove_if_stale():
     if pidlock.is_locked() and not pidlock.remove_if_stale():
-        raise SystemExit(PIDLOCKED % (pidfile, pidlock.read_pid()))
+        raise SystemExit(PIDLOCKED.format(pidfile, pidlock.read_pid()))
     pidlock.acquire()
     pidlock.acquire()
     atexit.register(pidlock.release)
     atexit.register(pidlock.release)
     return pidlock
     return pidlock
@@ -339,7 +339,7 @@ def parse_uid(uid):
         try:
         try:
             return pwd.getpwnam(uid).pw_uid
             return pwd.getpwnam(uid).pw_uid
         except (AttributeError, KeyError):
         except (AttributeError, KeyError):
-            raise KeyError('User does not exist: %r' % (uid, ))
+            raise KeyError('User does not exist: {0}'.format(uid))
 
 
 
 
 def parse_gid(gid):
 def parse_gid(gid):
@@ -355,7 +355,7 @@ def parse_gid(gid):
         try:
         try:
             return grp.getgrnam(gid).gr_gid
             return grp.getgrnam(gid).gr_gid
         except (AttributeError, KeyError):
         except (AttributeError, KeyError):
-            raise KeyError('Group does not exist: %r' % (gid, ))
+            raise KeyError('Group does not exist: {0}'.format(gid))
 
 
 
 
 def _setgroups_hack(groups):
 def _setgroups_hack(groups):
@@ -572,8 +572,8 @@ def set_process_title(progname, info=None):
     Only works if :mod:`setproctitle` is installed.
     Only works if :mod:`setproctitle` is installed.
 
 
     """
     """
-    proctitle = '[%s]' % progname
-    proctitle = '%s %s' % (proctitle, info) if info else proctitle
+    proctitle = '[{0}]'.format(progname)
+    proctitle = '{0} {1}'.format(proctitle, info) if info else proctitle
     if _setproctitle:
     if _setproctitle:
         _setproctitle.setproctitle(proctitle)
         _setproctitle.setproctitle(proctitle)
     return proctitle
     return proctitle
@@ -594,9 +594,9 @@ else:
         """
         """
         if not rate_limit or _setps_bucket.can_consume(1):
         if not rate_limit or _setps_bucket.can_consume(1):
             if hostname:
             if hostname:
-                progname = '%s@%s' % (progname, hostname.split('.')[0])
+                progname = '{0}@{1}'.format(progname, hostname.split('.')[0])
             return set_process_title(
             return set_process_title(
-                '%s:%s' % (progname, current_process().name), info=info)
+                '{0}:{1}'.format(progname, current_process().name), info=info)
 
 
 
 
 def shellsplit(s):
 def shellsplit(s):

+ 17 - 13
celery/schedules.py

@@ -21,6 +21,16 @@ from .utils.timeutils import (timedelta_seconds, weekday, maybe_timedelta,
                               timezone)
                               timezone)
 from .datastructures import AttributeDict
 from .datastructures import AttributeDict
 
 
+CRON_PATTERN_INVALID = """\
+Invalid crontab pattern. Valid range is {min}-{max}. \
+'{value}' was found.\
+"""
+
+CRON_INVALID_TYPE = """\
+Argument cronspec needs to be of any of the following types: \
+int, basestring, or an iterable type. {type!r} was given.\
+"""
+
 
 
 class ParseException(Exception):
 class ParseException(Exception):
     """Raised by crontab_parser when the input can't be parsed."""
     """Raised by crontab_parser when the input can't be parsed."""
@@ -80,7 +90,7 @@ class schedule(object):
         return False, rem
         return False, rem
 
 
     def __repr__(self):
     def __repr__(self):
-        return '<freq: %s>' % self.human_seconds
+        return '<freq: {0.human_seconds}>'.format(self)
 
 
     def __eq__(self, other):
     def __eq__(self, other):
         if isinstance(other, schedule):
         if isinstance(other, schedule):
@@ -195,11 +205,11 @@ class crontab_parser(object):
             try:
             try:
                 i = weekday(s)
                 i = weekday(s)
             except KeyError:
             except KeyError:
-                raise ValueError("Invalid weekday literal '%s'." % s)
+                raise ValueError("Invalid weekday literal {0!r}.".format(s))
 
 
         if i < self.min_:
         if i < self.min_:
-            raise ValueError('Invalid beginning range: %s < %s.' %
-                                                   (i, self.min_))
+            raise ValueError(
+                'Invalid beginning range: {0} < {1}.'.format(i, self.min_))
         return i
         return i
 
 
 
 
@@ -304,19 +314,13 @@ class crontab(schedule):
         elif is_iterable(cronspec):
         elif is_iterable(cronspec):
             result = set(cronspec)
             result = set(cronspec)
         else:
         else:
-            raise TypeError(
-                    'Argument cronspec needs to be of any of the '
-                    'following types: int, basestring, or an iterable type. '
-                    "'%s' was given." % type(cronspec))
+            raise TypeError(CRON_INVALID_TYPE.format(type=type(cronspec)))
 
 
         # assure the result does not preceed the min or exceed the max
         # assure the result does not preceed the min or exceed the max
         for number in result:
         for number in result:
             if number >= max_ + min_ or number < min_:
             if number >= max_ + min_ or number < min_:
-                raise ValueError(
-                        'Invalid crontab pattern. Valid '
-                        "range is %d-%d. '%d' was found." %
-                        (min_, max_ - 1 + min_, number))
-
+                raise ValueError(CRON_PATTERN_INVALID.format(
+                    min=min_, max=max_ - 1 + min_, value=number))
         return result
         return result
 
 
     def _delta_to_next(self, last_run_at, next_hour, next_minute):
     def _delta_to_next(self, last_run_at, next_hour, next_minute):

+ 5 - 5
celery/task/http.py

@@ -108,13 +108,13 @@ class MutableURL(object):
         scheme, netloc, path, params, query, fragment = self.parts
         scheme, netloc, path, params, query, fragment = self.parts
         query = urlencode(utf8dict(self.query.items()))
         query = urlencode(utf8dict(self.query.items()))
         components = [scheme + '://', netloc, path or '/',
         components = [scheme + '://', netloc, path or '/',
-                      ';%s' % params   if params   else '',
-                      '?%s' % query    if query    else '',
-                      '#%s' % fragment if fragment else '']
+                      ';{0}'.format(params)   if params   else '',
+                      '?{0}'.format(query)    if query    else '',
+                      '#{0}'.format(fragment) if fragment else '']
         return ''.join(filter(None, components))
         return ''.join(filter(None, components))
 
 
     def __repr__(self):
     def __repr__(self):
-        return '<%s: %s>' % (self.__class__.__name__, str(self))
+        return '<{0}: {1}>'.format(type(self).__name__, self)
 
 
 
 
 class HttpDispatch(object):
 class HttpDispatch(object):
@@ -127,7 +127,7 @@ class HttpDispatch(object):
     :param logger: Logger used for user/system feedback.
     :param logger: Logger used for user/system feedback.
 
 
     """
     """
-    user_agent = 'celery/%s' % celery_version
+    user_agent = 'celery/{version}'.format(version=celery_version)
     timeout = 5
     timeout = 5
 
 
     def __init__(self, url, method, task_kwargs, **kwargs):
     def __init__(self, url, method, task_kwargs, **kwargs):

+ 4 - 3
celery/task/sets.py

@@ -68,9 +68,10 @@ class TaskSet(list):
     def _sync_results(self, taskset_id):
     def _sync_results(self, taskset_id):
         return [task.apply(taskset_id=taskset_id) for task in self]
         return [task.apply(taskset_id=taskset_id) for task in self]
 
 
-    def _get_tasks(self):
+    @property
+    def tasks(self):
         return self
         return self
 
 
-    def _set_tasks(self, tasks):
+    @tasks.setter  # noqa
+    def tasks(self, tasks):
         self[:] = tasks
         self[:] = tasks
-    tasks = property(_get_tasks, _set_tasks)

+ 1 - 1
celery/task/trace.py

@@ -292,7 +292,7 @@ def report_internal_error(task, exc):
         _value = task.backend.prepare_exception(exc)
         _value = task.backend.prepare_exception(exc)
         exc_info = ExceptionInfo((_type, _value, _tb), internal=True)
         exc_info = ExceptionInfo((_type, _value, _tb), internal=True)
         warn(RuntimeWarning(
         warn(RuntimeWarning(
-            'Exception raised outside body: %r:\n%s' % (
+            'Exception raised outside body: {0!r}:\n{1}'.format(
                 exc, exc_info.traceback)))
                 exc, exc_info.traceback)))
         return exc_info
         return exc_info
     finally:
     finally:

+ 5 - 5
celery/tests/app/test_beat.py

@@ -61,9 +61,9 @@ class test_ScheduleEntry(Case):
         self.assertEqual(entry.total_run_count, 0)
         self.assertEqual(entry.total_run_count, 0)
 
 
         next_run_at = entry.last_run_at + timedelta(seconds=10)
         next_run_at = entry.last_run_at + timedelta(seconds=10)
-        next = entry.next(next_run_at)
-        self.assertGreaterEqual(next.last_run_at, next_run_at)
-        self.assertEqual(next.total_run_count, 1)
+        next_entry = entry.next(next_run_at)
+        self.assertGreaterEqual(next_entry.last_run_at, next_run_at)
+        self.assertEqual(next_entry.total_run_count, 1)
 
 
     def test_is_due(self):
     def test_is_due(self):
         entry = self.create_entry(schedule=timedelta(seconds=10))
         entry = self.create_entry(schedule=timedelta(seconds=10))
@@ -72,8 +72,8 @@ class test_ScheduleEntry(Case):
         self.assertGreater(next_time_to_run1, 9)
         self.assertGreater(next_time_to_run1, 9)
 
 
         next_run_at = entry.last_run_at - timedelta(seconds=10)
         next_run_at = entry.last_run_at - timedelta(seconds=10)
-        next = entry.next(next_run_at)
-        due2, next_time_to_run2 = next.is_due()
+        next_entry = entry.next(next_run_at)
+        due2, next_time_to_run2 = next_entry.is_due()
         self.assertTrue(due2)
         self.assertTrue(due2)
         self.assertGreater(next_time_to_run2, 9)
         self.assertGreater(next_time_to_run2, 9)
 
 

+ 2 - 2
celery/tests/app/test_loaders.py

@@ -2,6 +2,7 @@ from __future__ import absolute_import
 
 
 import os
 import os
 import sys
 import sys
+import warnings
 
 
 from mock import Mock, patch
 from mock import Mock, patch
 
 
@@ -19,7 +20,6 @@ from celery.utils.imports import NotAPackage
 from celery.utils.mail import SendmailWarning
 from celery.utils.mail import SendmailWarning
 
 
 from celery.tests.utils import AppCase, Case
 from celery.tests.utils import AppCase, Case
-from celery.tests.compat import catch_warnings
 
 
 
 
 class ObjectConfig(object):
 class ObjectConfig(object):
@@ -236,7 +236,7 @@ class test_DefaultLoader(Case):
             def find_module(self, name):
             def find_module(self, name):
                 raise ImportError(name)
                 raise ImportError(name)
 
 
-        with catch_warnings(record=True):
+        with warnings.catch_warnings(record=True):
             l = _Loader()
             l = _Loader()
             self.assertDictEqual(l.conf, {})
             self.assertDictEqual(l.conf, {})
             context_executed[0] = True
             context_executed[0] = True

+ 2 - 2
celery/tests/backends/test_amqp.py

@@ -253,7 +253,7 @@ class test_AMQPBackend(AppCase):
 
 
         b = Backend()
         b = Backend()
         with self.assertRaises(KeyError):
         with self.assertRaises(KeyError):
-            b.get_many(['id1']).next()
+            next(b.get_many(['id1']))
 
 
     def test_test_get_many_raises_inner_block(self):
     def test_test_get_many_raises_inner_block(self):
 
 
@@ -264,7 +264,7 @@ class test_AMQPBackend(AppCase):
 
 
         b = Backend()
         b = Backend()
         with self.assertRaises(KeyError):
         with self.assertRaises(KeyError):
-            b.get_many(['id1']).next()
+            next(b.get_many(['id1']))
 
 
     def test_no_expires(self):
     def test_no_expires(self):
         b = self.create_backend(expires=None)
         b = self.create_backend(expires=None)

+ 0 - 2
celery/tests/backends/test_database.py

@@ -1,7 +1,5 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
-import sys
-
 from datetime import datetime
 from datetime import datetime
 
 
 from nose import SkipTest
 from nose import SkipTest

+ 0 - 1
celery/tests/bin/test_base.py

@@ -47,7 +47,6 @@ class test_Command(AppCase):
         cmd = Command()
         cmd = Command()
         with self.assertRaises(SystemExit):
         with self.assertRaises(SystemExit):
             cmd.early_version(['--version'])
             cmd.early_version(['--version'])
-        stdout.write.assert_called_with(cmd.version + '\n')
 
 
     def test_execute_from_commandline(self):
     def test_execute_from_commandline(self):
         cmd = MockCommand()
         cmd = MockCommand()

+ 0 - 2
celery/tests/bin/test_celery.py

@@ -60,8 +60,6 @@ class test_Command(AppCase):
     def test_out(self):
     def test_out(self):
         f = Mock()
         f = Mock()
         self.cmd.out('foo', f)
         self.cmd.out('foo', f)
-        f.write.assert_called_with('foo\n')
-        self.cmd.out('foo\n', f)
 
 
     def test_call(self):
     def test_call(self):
         self.cmd.run = Mock()
         self.cmd.run = Mock()

+ 0 - 85
celery/tests/compat.py

@@ -1,85 +0,0 @@
-from __future__ import absolute_import
-
-import sys
-
-
-class WarningMessage(object):
-
-    """Holds the result of a single showwarning() call."""
-
-    _WARNING_DETAILS = ('message', 'category', 'filename', 'lineno', 'file',
-                        'line')
-
-    def __init__(self, message, category, filename, lineno, file=None,
-                    line=None):
-        local_values = locals()
-        for attr in self._WARNING_DETAILS:
-            setattr(self, attr, local_values[attr])
-
-        self._category_name = category and category.__name__ or None
-
-    def __str__(self):
-        return ('{message : %r, category : %r, filename : %r, lineno : %s, '
-                    'line : %r}' % (self.message, self._category_name,
-                                    self.filename, self.lineno, self.line))
-
-
-class catch_warnings(object):
-
-    """A context manager that copies and restores the warnings filter upon
-    exiting the context.
-
-    The 'record' argument specifies whether warnings should be captured by a
-    custom implementation of warnings.showwarning() and be appended to a list
-    returned by the context manager. Otherwise None is returned by the context
-    manager. The objects appended to the list are arguments whose attributes
-    mirror the arguments to showwarning().
-
-    The 'module' argument is to specify an alternative module to the module
-    named 'warnings' and imported under that name. This argument is only
-    useful when testing the warnings module itself.
-
-    """
-
-    def __init__(self, record=False, module=None):
-        """Specify whether to record warnings and if an alternative module
-        should be used other than sys.modules['warnings'].
-
-        For compatibility with Python 3.0, please consider all arguments to be
-        keyword-only.
-
-        """
-        self._record = record
-        self._module = module is None and sys.modules['warnings'] or module
-        self._entered = False
-
-    def __repr__(self):
-        args = []
-        if self._record:
-            args.append('record=True')
-        if self._module is not sys.modules['warnings']:
-            args.append('module=%r' % self._module)
-        name = type(self).__name__
-        return '%s(%s)' % (name, ', '.join(args))
-
-    def __enter__(self):
-        if self._entered:
-            raise RuntimeError('Cannot enter %r twice' % self)
-        self._entered = True
-        self._filters = self._module.filters
-        self._module.filters = self._filters[:]
-        self._showwarning = self._module.showwarning
-        if self._record:
-            log = []
-
-            def showwarning(*args, **kwargs):
-                log.append(WarningMessage(*args, **kwargs))
-
-            self._module.showwarning = showwarning
-            return log
-
-    def __exit__(self, *exc_info):
-        if not self._entered:
-            raise RuntimeError('Cannot exit %r without entering first' % self)
-        self._module.filters = self._filters
-        self._module.showwarning = self._showwarning

+ 3 - 2
celery/tests/compat_modules/test_decorators.py

@@ -1,8 +1,9 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
+import warnings
+
 from celery.task import base
 from celery.task import base
 
 
-from celery.tests.compat import catch_warnings
 from celery.tests.utils import Case
 from celery.tests.utils import Case
 
 
 
 
@@ -13,7 +14,7 @@ def add(x, y):
 class test_decorators(Case):
 class test_decorators(Case):
 
 
     def setUp(self):
     def setUp(self):
-        with catch_warnings(record=True):
+        with warnings.catch_warnings(record=True):
             from celery import decorators
             from celery import decorators
             self.decorators = decorators
             self.decorators = decorators
 
 

+ 3 - 3
celery/tests/events/test_events.py

@@ -163,12 +163,12 @@ class test_EventReceiver(AppCase):
         try:
         try:
             r = self.app.events.Receiver(connection, node_id='celery.tests')
             r = self.app.events.Receiver(connection, node_id='celery.tests')
             it = r.itercapture(timeout=0.0001, wakeup=False)
             it = r.itercapture(timeout=0.0001, wakeup=False)
-            consumer = it.next()
+            consumer = next(it)
             self.assertTrue(consumer.queues)
             self.assertTrue(consumer.queues)
             self.assertEqual(consumer.callbacks[0], r._receive)
             self.assertEqual(consumer.callbacks[0], r._receive)
 
 
             with self.assertRaises(socket.timeout):
             with self.assertRaises(socket.timeout):
-                it.next()
+                next(it)
 
 
             with self.assertRaises(socket.timeout):
             with self.assertRaises(socket.timeout):
                 r.capture(timeout=0.00001)
                 r.capture(timeout=0.00001)
@@ -194,7 +194,7 @@ class test_EventReceiver(AppCase):
             for ev in evs:
             for ev in evs:
                 producer.send(ev)
                 producer.send(ev)
             it = r.itercapture(limit=4, wakeup=True)
             it = r.itercapture(limit=4, wakeup=True)
-            it.next()  # skip consumer (see itercapture)
+            next(it)  # skip consumer (see itercapture)
             list(it)
             list(it)
             self.assertEqual(events_received[0], 4)
             self.assertEqual(events_received[0], 4)
         finally:
         finally:

+ 8 - 8
celery/tests/events/test_state.py

@@ -167,7 +167,7 @@ class test_State(Case):
 
 
     def test_worker_online_offline(self):
     def test_worker_online_offline(self):
         r = ev_worker_online_offline(State())
         r = ev_worker_online_offline(State())
-        r.next()
+        next(r)
         self.assertTrue(r.state.alive_workers())
         self.assertTrue(r.state.alive_workers())
         self.assertTrue(r.state.workers['utest1'].alive)
         self.assertTrue(r.state.workers['utest1'].alive)
         r.play()
         r.play()
@@ -181,7 +181,7 @@ class test_State(Case):
 
 
     def test_worker_heartbeat_expire(self):
     def test_worker_heartbeat_expire(self):
         r = ev_worker_heartbeats(State())
         r = ev_worker_heartbeats(State())
-        r.next()
+        next(r)
         self.assertFalse(r.state.alive_workers())
         self.assertFalse(r.state.alive_workers())
         self.assertFalse(r.state.workers['utest1'].alive)
         self.assertFalse(r.state.workers['utest1'].alive)
         r.play()
         r.play()
@@ -192,7 +192,7 @@ class test_State(Case):
         r = ev_task_states(State())
         r = ev_task_states(State())
 
 
         # RECEIVED
         # RECEIVED
-        r.next()
+        next(r)
         self.assertTrue(r.tid in r.state.tasks)
         self.assertTrue(r.tid in r.state.tasks)
         task = r.state.tasks[r.tid]
         task = r.state.tasks[r.tid]
         self.assertEqual(task.state, states.RECEIVED)
         self.assertEqual(task.state, states.RECEIVED)
@@ -201,7 +201,7 @@ class test_State(Case):
         self.assertEqual(task.worker.hostname, 'utest1')
         self.assertEqual(task.worker.hostname, 'utest1')
 
 
         # STARTED
         # STARTED
-        r.next()
+        next(r)
         self.assertTrue(r.state.workers['utest1'].alive,
         self.assertTrue(r.state.workers['utest1'].alive,
                 'any task event adds worker heartbeat')
                 'any task event adds worker heartbeat')
         self.assertEqual(task.state, states.STARTED)
         self.assertEqual(task.state, states.STARTED)
@@ -210,14 +210,14 @@ class test_State(Case):
         self.assertEqual(task.worker.hostname, 'utest1')
         self.assertEqual(task.worker.hostname, 'utest1')
 
 
         # REVOKED
         # REVOKED
-        r.next()
+        next(r)
         self.assertEqual(task.state, states.REVOKED)
         self.assertEqual(task.state, states.REVOKED)
         self.assertTrue(task.revoked)
         self.assertTrue(task.revoked)
         self.assertEqual(task.timestamp, task.revoked)
         self.assertEqual(task.timestamp, task.revoked)
         self.assertEqual(task.worker.hostname, 'utest1')
         self.assertEqual(task.worker.hostname, 'utest1')
 
 
         # RETRY
         # RETRY
-        r.next()
+        next(r)
         self.assertEqual(task.state, states.RETRY)
         self.assertEqual(task.state, states.RETRY)
         self.assertTrue(task.retried)
         self.assertTrue(task.retried)
         self.assertEqual(task.timestamp, task.retried)
         self.assertEqual(task.timestamp, task.retried)
@@ -226,7 +226,7 @@ class test_State(Case):
         self.assertEqual(task.traceback, 'line 2 at main')
         self.assertEqual(task.traceback, 'line 2 at main')
 
 
         # FAILURE
         # FAILURE
-        r.next()
+        next(r)
         self.assertEqual(task.state, states.FAILURE)
         self.assertEqual(task.state, states.FAILURE)
         self.assertTrue(task.failed)
         self.assertTrue(task.failed)
         self.assertEqual(task.timestamp, task.failed)
         self.assertEqual(task.timestamp, task.failed)
@@ -235,7 +235,7 @@ class test_State(Case):
         self.assertEqual(task.traceback, 'line 1 at main')
         self.assertEqual(task.traceback, 'line 1 at main')
 
 
         # SUCCESS
         # SUCCESS
-        r.next()
+        next(r)
         self.assertEqual(task.state, states.SUCCESS)
         self.assertEqual(task.state, states.SUCCESS)
         self.assertTrue(task.succeeded)
         self.assertTrue(task.succeeded)
         self.assertEqual(task.timestamp, task.succeeded)
         self.assertEqual(task.timestamp, task.succeeded)

+ 1 - 1
celery/tests/security/test_certificate.py

@@ -25,7 +25,7 @@ class test_Certificate(SecurityCase):
         self.assertRaises(SecurityError, Certificate, KEY1)
         self.assertRaises(SecurityError, Certificate, KEY1)
 
 
     def test_has_expired(self):
     def test_has_expired(self):
-        self.assertTrue(Certificate(CERT1).has_expired())
+        self.assertFalse(Certificate(CERT1).has_expired())
 
 
 
 
 class test_CertStore(SecurityCase):
 class test_CertStore(SecurityCase):

+ 5 - 5
celery/tests/tasks/test_result.py

@@ -379,7 +379,7 @@ class test_GroupResult(AppCase):
         ts = GroupResult(uuid(), [ar])
         ts = GroupResult(uuid(), [ar])
         it = iter(ts)
         it = iter(ts)
         with self.assertRaises(KeyError):
         with self.assertRaises(KeyError):
-            it.next()
+            next(it)
 
 
     def test_forget(self):
     def test_forget(self):
         subs = [MockAsyncResultSuccess(uuid()),
         subs = [MockAsyncResultSuccess(uuid()),
@@ -431,16 +431,16 @@ class test_GroupResult(AppCase):
         ar2 = MockAsyncResultSuccess(uuid())
         ar2 = MockAsyncResultSuccess(uuid())
         ts = GroupResult(uuid(), [ar, ar2])
         ts = GroupResult(uuid(), [ar, ar2])
         it = iter(ts)
         it = iter(ts)
-        self.assertEqual(it.next(), 42)
-        self.assertEqual(it.next(), 42)
+        self.assertEqual(next(it), 42)
+        self.assertEqual(next(it), 42)
 
 
     def test_iterate_eager(self):
     def test_iterate_eager(self):
         ar1 = EagerResult(uuid(), 42, states.SUCCESS)
         ar1 = EagerResult(uuid(), 42, states.SUCCESS)
         ar2 = EagerResult(uuid(), 42, states.SUCCESS)
         ar2 = EagerResult(uuid(), 42, states.SUCCESS)
         ts = GroupResult(uuid(), [ar1, ar2])
         ts = GroupResult(uuid(), [ar1, ar2])
         it = iter(ts)
         it = iter(ts)
-        self.assertEqual(it.next(), 42)
-        self.assertEqual(it.next(), 42)
+        self.assertEqual(next(it), 42)
+        self.assertEqual(next(it), 42)
 
 
     def test_join_timeout(self):
     def test_join_timeout(self):
         ar = MockAsyncResultSuccess(uuid())
         ar = MockAsyncResultSuccess(uuid())

+ 0 - 5
celery/tests/utilities/test_local.py

@@ -1,11 +1,6 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
-import sys
-
-from nose import SkipTest
-
 from celery.local import Proxy, PromiseProxy, maybe_evaluate, try_import
 from celery.local import Proxy, PromiseProxy, maybe_evaluate, try_import
-
 from celery.tests.utils import Case
 from celery.tests.utils import Case
 
 
 
 

+ 1 - 16
celery/tests/utilities/test_mail.py

@@ -22,25 +22,10 @@ class test_Message(Case):
 
 
 class test_Mailer(Case):
 class test_Mailer(Case):
 
 
-    def test_send_supports_timeout(self):
+    def test_send_wrapper(self):
         mailer = Mailer()
         mailer = Mailer()
-        mailer.supports_timeout = True
         mailer._send = Mock()
         mailer._send = Mock()
         mailer.send(msg)
         mailer.send(msg)
-        mailer._send.assert_called_with(msg, timeout=2)
-
-    @patch('socket.setdefaulttimeout')
-    @patch('socket.getdefaulttimeout')
-    def test_send_no_timeout(self, get, set):
-        mailer = Mailer()
-        mailer.supports_timeout = False
-        mailer._send = Mock()
-        get.return_value = 10
-        mailer.send(msg)
-        get.assert_called_with()
-        sets = set.call_args_list
-        self.assertEqual(sets[0][0], (2, ))
-        self.assertEqual(sets[1][0], (10, ))
         mailer._send.assert_called_with(msg)
         mailer._send.assert_called_with(msg)
 
 
     @patch('smtplib.SMTP_SSL', create=True)
     @patch('smtplib.SMTP_SSL', create=True)

+ 1 - 3
celery/tests/utils.py

@@ -34,8 +34,6 @@ from ..app import app_or_default
 from ..utils.compat import WhateverIO
 from ..utils.compat import WhateverIO
 from ..utils.functional import noop
 from ..utils.functional import noop
 
 
-from .compat import catch_warnings
-
 
 
 class Mock(mock.Mock):
 class Mock(mock.Mock):
 
 
@@ -87,7 +85,7 @@ class _AssertWarnsContext(_AssertRaisesBaseContext):
         for v in sys.modules.values():
         for v in sys.modules.values():
             if getattr(v, '__warningregistry__', None):
             if getattr(v, '__warningregistry__', None):
                 v.__warningregistry__ = {}
                 v.__warningregistry__ = {}
-        self.warnings_manager = catch_warnings(record=True)
+        self.warnings_manager = warnings.catch_warnings(record=True)
         self.warnings = self.warnings_manager.__enter__()
         self.warnings = self.warnings_manager.__enter__()
         warnings.simplefilter('always', self.expected)
         warnings.simplefilter('always', self.expected)
         return self
         return self

+ 6 - 7
celery/tests/worker/test_hub.py

@@ -144,19 +144,18 @@ class test_Hub(Case):
                                          max_delay=32.321), 32.321)
                                          max_delay=32.321), 32.321)
 
 
         hub.timer._queue = [1]
         hub.timer._queue = [1]
-        hub.scheduler = Mock()
-        hub.scheduler.next.return_value = 3.743, None
+        hub.scheduler = iter([(3.743, None)])
         self.assertEqual(hub.fire_timers(), 3.743)
         self.assertEqual(hub.fire_timers(), 3.743)
 
 
         e1, e2, e3 = Mock(), Mock(), Mock()
         e1, e2, e3 = Mock(), Mock(), Mock()
         entries = [e1, e2, e3]
         entries = [e1, e2, e3]
 
 
         def se():
         def se():
-            if entries:
-                return None, entries.pop()
-            return 3.982, None
-        hub.scheduler.next = Mock()
-        hub.scheduler.next.side_effect = se
+            while 1:
+                while entries:
+                    yield None, entries.pop()
+                yield 3.982, None
+        hub.scheduler = se()
 
 
         self.assertEqual(hub.fire_timers(max_timers=10), 3.982)
         self.assertEqual(hub.fire_timers(max_timers=10), 3.982)
         hub.timer.apply_entry.assert_has_calls(map(call, [e3, e2, e1]))
         hub.timer.apply_entry.assert_has_calls(map(call, [e3, e2, e1]))

+ 21 - 20
celery/utils/__init__.py

@@ -6,7 +6,7 @@
     Utility functions.
     Utility functions.
 
 
 """
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 
 import operator
 import operator
 import os
 import os
@@ -29,14 +29,14 @@ from .compat import StringIO
 from .functional import noop
 from .functional import noop
 
 
 PENDING_DEPRECATION_FMT = """
 PENDING_DEPRECATION_FMT = """
-    %(description)s is scheduled for deprecation in \
-    version %(deprecation)s and removal in version v%(removal)s. \
-    %(alternative)s
+    {description} is scheduled for deprecation in \
+    version {deprecation} and removal in version v{removal}. \
+    {alternative}
 """
 """
 
 
 DEPRECATION_FMT = """
 DEPRECATION_FMT = """
-    %(description)s is deprecated and scheduled for removal in
-    version %(removal)s. %(alternative)s
+    {description} is deprecated and scheduled for removal in
+    version {removal}. {alternative}
 """
 """
 
 
 #: Billiard sets this when execv is enabled.
 #: Billiard sets this when execv is enabled.
@@ -49,13 +49,13 @@ MP_MAIN_FILE = os.environ.get('MP_MAIN_FILE') or None
 WORKER_DIRECT_EXCHANGE = Exchange('C.dq')
 WORKER_DIRECT_EXCHANGE = Exchange('C.dq')
 
 
 #: Format for worker direct queue names.
 #: Format for worker direct queue names.
-WORKER_DIRECT_QUEUE_FORMAT = '%s.dq'
+WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq'
 
 
 
 
 def worker_direct(hostname):
 def worker_direct(hostname):
     if isinstance(hostname, Queue):
     if isinstance(hostname, Queue):
         return hostname
         return hostname
-    return Queue(WORKER_DIRECT_QUEUE_FORMAT % hostname,
+    return Queue(WORKER_DIRECT_QUEUE_FORMAT.format(hostname=hostname),
                  WORKER_DIRECT_EXCHANGE,
                  WORKER_DIRECT_EXCHANGE,
                  hostname,
                  hostname,
                  auto_delete=True)
                  auto_delete=True)
@@ -67,9 +67,9 @@ def warn_deprecated(description=None, deprecation=None, removal=None,
            'deprecation': deprecation, 'removal': removal,
            'deprecation': deprecation, 'removal': removal,
            'alternative': alternative}
            'alternative': alternative}
     if deprecation is not None:
     if deprecation is not None:
-        w = CPendingDeprecationWarning(PENDING_DEPRECATION_FMT % ctx)
+        w = CPendingDeprecationWarning(PENDING_DEPRECATION_FMT.format(**ctx))
     else:
     else:
-        w = CDeprecationWarning(DEPRECATION_FMT % ctx)
+        w = CDeprecationWarning(DEPRECATION_FMT.format(**ctx))
     warnings.warn(w)
     warnings.warn(w)
 
 
 
 
@@ -159,20 +159,21 @@ def cry():  # pragma: no cover
             main_thread = t
             main_thread = t
 
 
     out = StringIO()
     out = StringIO()
-    sep = '=' * 49 + '\n'
+    P = partial(print, file=out)
+    sep = '=' * 49
     for tid, frame in sys._current_frames().iteritems():
     for tid, frame in sys._current_frames().iteritems():
         thread = tmap.get(tid, main_thread)
         thread = tmap.get(tid, main_thread)
         if not thread:
         if not thread:
             # skip old junk (left-overs from a fork)
             # skip old junk (left-overs from a fork)
             continue
             continue
-        out.write('%s\n' % (thread.getName(), ))
-        out.write(sep)
+        P('{0.name}'.format(thread))
+        P(sep)
         traceback.print_stack(frame, file=out)
         traceback.print_stack(frame, file=out)
-        out.write(sep)
-        out.write('LOCAL VARIABLES\n')
-        out.write(sep)
+        P(sep)
+        P('LOCAL VARIABLES')
+        P(sep)
         pprint(frame.f_locals, stream=out)
         pprint(frame.f_locals, stream=out)
-        out.write('\n\n')
+        P('\n')
     return out.getvalue()
     return out.getvalue()
 
 
 
 
@@ -195,7 +196,7 @@ def strtobool(term, table={'false': False, 'no': False, '0': False,
         try:
         try:
             return table[term.lower()]
             return table[term.lower()]
         except KeyError:
         except KeyError:
-            raise TypeError('Cannot coerce %r to type bool' % (term, ))
+            raise TypeError('Cannot coerce {0!r} to type bool'.format(term))
     return term
     return term
 
 
 
 
@@ -206,7 +207,7 @@ def jsonify(obj):
     elif isinstance(obj, (tuple, list)):
     elif isinstance(obj, (tuple, list)):
         return map(jsonify, obj)
         return map(jsonify, obj)
     elif isinstance(obj, dict):
     elif isinstance(obj, dict):
-        return dict([(k,jsonify(v)) for k,v in obj.iteritems()])
+        return dict([(k, jsonify(v)) for k, v in obj.iteritems()])
     # See "Date Time String Format" in the ECMA-262 specification.
     # See "Date Time String Format" in the ECMA-262 specification.
     elif isinstance(obj, datetime.datetime):
     elif isinstance(obj, datetime.datetime):
         r = obj.isoformat()
         r = obj.isoformat()
@@ -225,7 +226,7 @@ def jsonify(obj):
     elif isinstance(obj, datetime.timedelta):
     elif isinstance(obj, datetime.timedelta):
         return str(obj)
         return str(obj)
     else:
     else:
-        raise ValueError("Unsupported type: %s" % type(obj))
+        raise ValueError("Unsupported type: {0}".format(type(obj)))
 
 
 
 
 def gen_task_name(app, name, module_name):
 def gen_task_name(app, name, module_name):

+ 0 - 105
celery/utils/compat.py

@@ -48,111 +48,6 @@ try:
 except ImportError:                         # pragma: no cover
 except ImportError:                         # pragma: no cover
     from ordereddict import OrderedDict     # noqa
     from ordereddict import OrderedDict     # noqa
 
 
-############## itertools.zip_longest #######################################
-
-try:
-    from itertools import izip_longest as zip_longest
-except ImportError:                         # pragma: no cover
-    import itertools
-
-    def zip_longest(*args, **kwds):  # noqa
-        fillvalue = kwds.get('fillvalue')
-
-        def sentinel(counter=([fillvalue] * (len(args) - 1)).pop):
-            yield counter()     # yields the fillvalue, or raises IndexError
-
-        fillers = itertools.repeat(fillvalue)
-        iters = [itertools.chain(it, sentinel(), fillers)
-                    for it in args]
-        try:
-            for tup in itertools.izip(*iters):
-                yield tup
-        except IndexError:
-            pass
-
-
-############## itertools.chain.from_iterable ################################
-from itertools import chain
-
-
-def _compat_chain_from_iterable(iterables):  # pragma: no cover
-    for it in iterables:
-        for element in it:
-            yield element
-
-try:
-    chain_from_iterable = getattr(chain, 'from_iterable')
-except AttributeError:   # pragma: no cover
-    chain_from_iterable = _compat_chain_from_iterable
-
-
-############## logging.handlers.WatchedFileHandler ##########################
-import logging
-import os
-from stat import ST_DEV, ST_INO
-import platform as _platform
-
-if _platform.system() == 'Windows':  # pragma: no cover
-    #since windows doesn't go with WatchedFileHandler use FileHandler instead
-    WatchedFileHandler = logging.FileHandler
-else:
-    try:
-        from logging.handlers import WatchedFileHandler
-    except ImportError:  # pragma: no cover
-        class WatchedFileHandler(logging.FileHandler):  # noqa
-            """
-            A handler for logging to a file, which watches the file
-            to see if it has changed while in use. This can happen because of
-            usage of programs such as newsyslog and logrotate which perform
-            log file rotation. This handler, intended for use under Unix,
-            watches the file to see if it has changed since the last emit.
-            (A file has changed if its device or inode have changed.)
-            If it has changed, the old file stream is closed, and the file
-            opened to get a new stream.
-
-            This handler is not appropriate for use under Windows, because
-            under Windows open files cannot be moved or renamed - logging
-            opens the files with exclusive locks - and so there is no need
-            for such a handler. Furthermore, ST_INO is not supported under
-            Windows; stat always returns zero for this value.
-
-            This handler is based on a suggestion and patch by Chad J.
-            Schroeder.
-            """
-            def __init__(self, *args, **kwargs):
-                logging.FileHandler.__init__(self, *args, **kwargs)
-
-                if not os.path.exists(self.baseFilename):
-                    self.dev, self.ino = -1, -1
-                else:
-                    stat = os.stat(self.baseFilename)
-                    self.dev, self.ino = stat[ST_DEV], stat[ST_INO]
-
-            def emit(self, record):
-                """
-                Emit a record.
-
-                First check if the underlying file has changed, and if it
-                has, close the old stream and reopen the file to get the
-                current stream.
-                """
-                if not os.path.exists(self.baseFilename):
-                    stat = None
-                    changed = 1
-                else:
-                    stat = os.stat(self.baseFilename)
-                    changed = ((stat[ST_DEV] != self.dev) or
-                               (stat[ST_INO] != self.ino))
-                if changed and self.stream is not None:
-                    self.stream.flush()
-                    self.stream.close()
-                    self.stream = self._open()
-                    if stat is None:
-                        stat = os.stat(self.baseFilename)
-                    self.dev, self.ino = stat[ST_DEV], stat[ST_INO]
-                logging.FileHandler.emit(self, record)
-
-
 ############## format(int, ',d') ##########################
 ############## format(int, ',d') ##########################
 
 
 if sys.version_info >= (2, 7):  # pragma: no cover
 if sys.version_info >= (2, 7):  # pragma: no cover

+ 1 - 1
celery/utils/functional.py

@@ -57,7 +57,7 @@ class LRUCache(UserDict):
         # remove least recently used key.
         # remove least recently used key.
         with self.mutex:
         with self.mutex:
             if self.limit and len(self.data) >= self.limit:
             if self.limit and len(self.data) >= self.limit:
-                self.data.pop(iter(self.data).next())
+                self.data.pop(next(iter(self.data)))
             self.data[key] = value
             self.data[key] = value
 
 
     def __iter__(self):
     def __iter__(self):

+ 5 - 4
celery/utils/log.py

@@ -6,7 +6,7 @@
     Logging utilities.
     Logging utilities.
 
 
 """
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 
 import logging
 import logging
 import os
 import os
@@ -87,7 +87,7 @@ class ColorFormatter(logging.Formatter):
             try:
             try:
                 record.msg = safe_str(str_t(color(record.msg)))
                 record.msg = safe_str(str_t(color(record.msg)))
             except Exception as exc:
             except Exception as exc:
-                record.msg = '<Unrepresentable %r: %r>' % (
+                record.msg = '<Unrepresentable {0!r}: {1!r}>'.format(
                         type(record.msg), exc)
                         type(record.msg), exc)
                 record.exc_info = True
                 record.exc_info = True
 
 
@@ -147,7 +147,7 @@ class LoggingProxy(object):
     def write(self, data):
     def write(self, data):
         """Write message to logging object."""
         """Write message to logging object."""
         if in_sighandler:
         if in_sighandler:
-            return sys.__stderr__.write(safe_str(data))
+            print(safe_str(data), file=sys.__stderr__)
         if getattr(self._thread, 'recurse_protection', False):
         if getattr(self._thread, 'recurse_protection', False):
             # Logger is logging back to this file, so stop recursing.
             # Logger is logging back to this file, so stop recursing.
             return
             return
@@ -234,7 +234,8 @@ def _patch_logger_class():
 
 
                 def log(self, *args, **kwargs):
                 def log(self, *args, **kwargs):
                     if in_sighandler:
                     if in_sighandler:
-                        sys.__stderr__.write('CANNOT LOG IN SIGHANDLER')
+                        print('CANNOT LOG IN SIGHANDLER',  # noqa
+                                file=sys.__stderr__)
                         return
                         return
                     return OldLoggerClass.log(self, *args, **kwargs)
                     return OldLoggerClass.log(self, *args, **kwargs)
             logging.setLoggerClass(SigSafeLogger)
             logging.setLoggerClass(SigSafeLogger)

+ 14 - 27
celery/utils/mail.py

@@ -8,7 +8,6 @@
 """
 """
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
-import sys
 import smtplib
 import smtplib
 import traceback
 import traceback
 import warnings
 import warnings
@@ -18,8 +17,6 @@ from email.mime.text import MIMEText
 from .functional import maybe_list
 from .functional import maybe_list
 from .imports import symbol_by_name
 from .imports import symbol_by_name
 
 
-supports_timeout = sys.version_info >= (2, 6)
-
 
 
 class SendmailWarning(UserWarning):
 class SendmailWarning(UserWarning):
     """Problem happened while sending the email message."""
     """Problem happened while sending the email message."""
@@ -36,7 +33,7 @@ class Message(object):
         self.charset = charset
         self.charset = charset
 
 
     def __repr__(self):
     def __repr__(self):
-        return '<Email: To:%r Subject:%r>' % (self.to, self.subject)
+        return '<Email: To:{0.to!r} Subject:{0.subject!r}>'.format(self)
 
 
     def __str__(self):
     def __str__(self):
         msg = MIMEText(self.body, 'plain', self.charset)
         msg = MIMEText(self.body, 'plain', self.charset)
@@ -47,7 +44,6 @@ class Message(object):
 
 
 
 
 class Mailer(object):
 class Mailer(object):
-    supports_timeout = supports_timeout
 
 
     def __init__(self, host='localhost', port=0, user=None, password=None,
     def __init__(self, host='localhost', port=0, user=None, password=None,
             timeout=2, use_ssl=False, use_tls=False):
             timeout=2, use_ssl=False, use_tls=False):
@@ -59,30 +55,21 @@ class Mailer(object):
         self.use_ssl = use_ssl
         self.use_ssl = use_ssl
         self.use_tls = use_tls
         self.use_tls = use_tls
 
 
-    def send(self, message, fail_silently=False):
+    def send(self, message, fail_silently=False, **kwargs):
         try:
         try:
-            if self.supports_timeout:
-                self._send(message, timeout=self.timeout)
-            else:
-                import socket
-                old_timeout = socket.getdefaulttimeout()
-                socket.setdefaulttimeout(self.timeout)
-                try:
-                    self._send(message)
-                finally:
-                    socket.setdefaulttimeout(old_timeout)
+            self._send(message, **kwargs)
         except Exception as exc:
         except Exception as exc:
             if not fail_silently:
             if not fail_silently:
                 raise
                 raise
             warnings.warn(SendmailWarning(
             warnings.warn(SendmailWarning(
-                'Mail could not be sent: %r %r\n%r' % (
+                'Mail could not be sent: {0!r} {1!r}\n{2!r}'.format(
                     exc, {'To': ', '.join(message.to),
                     exc, {'To': ', '.join(message.to),
                           'Subject': message.subject},
                           'Subject': message.subject},
                     traceback.format_stack())))
                     traceback.format_stack())))
 
 
     def _send(self, message, **kwargs):
     def _send(self, message, **kwargs):
         Client = smtplib.SMTP_SSL if self.use_ssl else smtplib.SMTP
         Client = smtplib.SMTP_SSL if self.use_ssl else smtplib.SMTP
-        client = Client(self.host, self.port, **kwargs)
+        client = Client(self.host, self.port, timeout=self.timeout, **kwargs)
 
 
         if self.use_tls:
         if self.use_tls:
             client.ehlo()
             client.ehlo()
@@ -140,24 +127,24 @@ class ErrorMail(object):
 
 
     #: Format string used to generate error email subjects.
     #: Format string used to generate error email subjects.
     subject = """\
     subject = """\
-        [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
+        [celery@{hostname}] Error: Task {name} ({id}): {exc!r}
     """
     """
 
 
     #: Format string used to generate error email content.
     #: Format string used to generate error email content.
     body = """
     body = """
-Task %%(name)s with id %%(id)s raised exception:\n%%(exc)r
+Task {{name}} with id {{id}} raised exception:\n{{exc!r}}
 
 
 
 
-Task was called with args: %%(args)s kwargs: %%(kwargs)s.
+Task was called with args: {{args}} kwargs: {{kwargs}}.
 
 
 The contents of the full traceback was:
 The contents of the full traceback was:
 
 
-%%(traceback)s
+{{traceback}}
 
 
-%(EMAIL_SIGNATURE_SEP)s
+{EMAIL_SIGNATURE_SEP}
 Just to let you know,
 Just to let you know,
-py-celery at %%(hostname)s.
-""" % {'EMAIL_SIGNATURE_SEP': EMAIL_SIGNATURE_SEP}
+py-celery at {{hostname}}.
+""".format(EMAIL_SIGNATURE_SEP=EMAIL_SIGNATURE_SEP)
 
 
     error_whitelist = None
     error_whitelist = None
 
 
@@ -174,10 +161,10 @@ py-celery at %%(hostname)s.
         return not self.error_whitelist or isinstance(exc, allow_classes)
         return not self.error_whitelist or isinstance(exc, allow_classes)
 
 
     def format_subject(self, context):
     def format_subject(self, context):
-        return self.subject.strip() % context
+        return self.subject.strip().format(**context)
 
 
     def format_body(self, context):
     def format_body(self, context):
-        return self.body.strip() % context
+        return self.body.strip().format(**context)
 
 
     def send(self, context, exc, fail_silently=True):
     def send(self, context, exc, fail_silently=True):
         if self.should_send(context, exc):
         if self.should_send(context, exc):

+ 1 - 3
celery/utils/serialization.py

@@ -9,13 +9,11 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
 import inspect
 import inspect
-import sys
-import types
 
 
 try:
 try:
     import cPickle as pickle
     import cPickle as pickle
 except ImportError:
 except ImportError:
-    import pickle
+    import pickle  # noqa
 
 
 from .encoding import safe_repr
 from .encoding import safe_repr
 
 

+ 2 - 2
celery/utils/threads.py

@@ -6,7 +6,7 @@
     Threading utilities.
     Threading utilities.
 
 
 """
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 
 import os
 import os
 import sys
 import sys
@@ -33,7 +33,7 @@ class bgThread(threading.Thread):
         raise NotImplementedError('subclass responsibility')
         raise NotImplementedError('subclass responsibility')
 
 
     def on_crash(self, msg, *fmt, **kwargs):
     def on_crash(self, msg, *fmt, **kwargs):
-        sys.stderr.write((msg + '\n') % fmt)
+        print(msg % fmt, file=sys.stderr)
         exc_info = sys.exc_info()
         exc_info = sys.exc_info()
         try:
         try:
             traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
             traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],

+ 4 - 4
celery/utils/timer2.py

@@ -50,7 +50,7 @@ class Entry(object):
         self.tref.cancelled = True
         self.tref.cancelled = True
 
 
     def __repr__(self):
     def __repr__(self):
-        return '<TimerEntry: %s(*%r, **%r)' % (
+        return '<TimerEntry: {0}(*{1!r}, **{2!r})'.format(
                 self.fun.__name__, self.args, self.kwargs)
                 self.fun.__name__, self.args, self.kwargs)
 
 
     if sys.version_info[0] == 3:  # pragma: no cover
     if sys.version_info[0] == 3:  # pragma: no cover
@@ -235,12 +235,12 @@ class Timer(Thread):
         self._is_stopped = Event()
         self._is_stopped = Event()
         self.mutex = Lock()
         self.mutex = Lock()
         self.not_empty = Condition(self.mutex)
         self.not_empty = Condition(self.mutex)
-        self.setDaemon(True)
-        self.setName('Timer-%s' % (self._timer_count(), ))
+        self.daemon = True
+        self.name = 'Timer-{0}'.format(self._timer_count())
 
 
     def _next_entry(self):
     def _next_entry(self):
         with self.not_empty:
         with self.not_empty:
-            delay, entry = self.scheduler.next()
+            delay, entry = next(self.scheduler)
             if entry is None:
             if entry is None:
                 if delay is None:
                 if delay is None:
                     self.not_empty.wait(1.0)
                     self.not_empty.wait(1.0)

+ 6 - 6
celery/utils/timeutils.py

@@ -34,10 +34,10 @@ RATE_MODIFIER_MAP = {'s': lambda n: n,
 
 
 HAVE_TIMEDELTA_TOTAL_SECONDS = hasattr(timedelta, 'total_seconds')
 HAVE_TIMEDELTA_TOTAL_SECONDS = hasattr(timedelta, 'total_seconds')
 
 
-TIME_UNITS = (('day', 60 * 60 * 24.0, lambda n: '%.2f' % n),
-              ('hour', 60 * 60.0, lambda n: '%.2f' % n),
-              ('minute', 60.0, lambda n: '%.2f' % n),
-              ('second', 1.0, lambda n: '%.2f' % n))
+TIME_UNITS = (('day',    60 * 60 * 24.0, lambda n: format(n, '.2f')),
+              ('hour',   60 * 60.0,      lambda n: format(n, '.2f')),
+              ('minute', 60.0,           lambda n: format(n, '.2f')),
+              ('second', 1.0,            lambda n: format(n, '.2f')))
 
 
 
 
 class _Zone(object):
 class _Zone(object):
@@ -181,8 +181,8 @@ def humanize_seconds(secs, prefix=''):
     for unit, divider, formatter in TIME_UNITS:
     for unit, divider, formatter in TIME_UNITS:
         if secs >= divider:
         if secs >= divider:
             w = secs / divider
             w = secs / divider
-            return '%s%s %s' % (prefix, formatter(w),
-                                pluralize(w, unit))
+            return '{0}{1} {2}'.format(prefix, formatter(w),
+                                       pluralize(w, unit))
     return 'now'
     return 'now'
 
 
 
 

+ 2 - 2
celery/worker/buckets.py

@@ -17,13 +17,13 @@ from __future__ import absolute_import
 import threading
 import threading
 
 
 from collections import deque
 from collections import deque
+from itertools import chain, izip_longest
 from time import time, sleep
 from time import time, sleep
 from Queue import Queue, Empty
 from Queue import Queue, Empty
 
 
 from kombu.utils.limits import TokenBucket
 from kombu.utils.limits import TokenBucket
 
 
 from celery.utils import timeutils
 from celery.utils import timeutils
-from celery.utils.compat import zip_longest, chain_from_iterable
 
 
 
 
 class RateLimitExceeded(Exception):
 class RateLimitExceeded(Exception):
@@ -214,7 +214,7 @@ class TaskBucket(object):
         """Flattens the data in all of the buckets into a single list."""
         """Flattens the data in all of the buckets into a single list."""
         # for queues with contents [(1, 2), (3, 4), (5, 6), (7, 8)]
         # for queues with contents [(1, 2), (3, 4), (5, 6), (7, 8)]
         # zips and flattens to [1, 3, 5, 7, 2, 4, 6, 8]
         # zips and flattens to [1, 3, 5, 7, 2, 4, 6, 8]
-        return filter(None, chain_from_iterable(zip_longest(*[bucket.items
+        return filter(None, chain.from_iterable(izip_longest(*[bucket.items
                                     for bucket in self.buckets.values()])))
                                     for bucket in self.buckets.values()])))
 
 
 
 

+ 5 - 5
celery/worker/consumer.py

@@ -133,7 +133,7 @@ The full contents of the message body was:
 """
 """
 
 
 MESSAGE_REPORT_FMT = """\
 MESSAGE_REPORT_FMT = """\
-body: %s {content_type:%s content_encoding:%s delivery_info:%s}\
+body: {0} {{content_type:{1} content_encoding:{2} delivery_info:{3}}}\
 """
 """
 
 
 
 
@@ -503,10 +503,10 @@ class Consumer(object):
         self.qos.decrement_eventually()
         self.qos.decrement_eventually()
 
 
     def _message_report(self, body, message):
     def _message_report(self, body, message):
-        return MESSAGE_REPORT_FMT % (dump_body(message, body),
-                                     safe_repr(message.content_type),
-                                     safe_repr(message.content_encoding),
-                                     safe_repr(message.delivery_info))
+        return MESSAGE_REPORT_FMT.format(dump_body(message, body),
+                                         safe_repr(message.content_type),
+                                         safe_repr(message.content_encoding),
+                                         safe_repr(message.delivery_info))
 
 
     def handle_unknown_message(self, body, message):
     def handle_unknown_message(self, body, message):
         warn(UNKNOWN_FORMAT, self._message_report(body, message))
         warn(UNKNOWN_FORMAT, self._message_report(body, message))

+ 9 - 10
celery/worker/control.py

@@ -43,12 +43,12 @@ def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
         signum = _signals.signum(signal or 'TERM')
         signum = _signals.signum(signal or 'TERM')
         for request in state.active_requests:
         for request in state.active_requests:
             if request.id == task_id:
             if request.id == task_id:
-                action = 'terminated (%s)' % (signum, )
+                action = 'terminated ({0})'.format(signum)
                 request.terminate(panel.consumer.pool, signal=signum)
                 request.terminate(panel.consumer.pool, signal=signum)
                 break
                 break
 
 
     logger.info('Task %s %s.', task_id, action)
     logger.info('Task %s %s.', task_id, action)
-    return {'ok': 'task %s %s' % (task_id, action)}
+    return {'ok': 'task {0} {1}'.format(task_id, action)}
 
 
 
 
 @Panel.register
 @Panel.register
@@ -99,7 +99,7 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
     try:
     try:
         timeutils.rate(rate_limit)
         timeutils.rate(rate_limit)
     except ValueError as exc:
     except ValueError as exc:
-        return {'error': 'Invalid rate limit string: %s' % exc}
+        return {'error': 'Invalid rate limit string: {!r}'.format(exc)}
 
 
     try:
     try:
         panel.app.tasks[task_name].rate_limit = rate_limit
         panel.app.tasks[task_name].rate_limit = rate_limit
@@ -148,10 +148,9 @@ def dump_schedule(panel, safe=False, **kwargs):
         logger.debug('--Empty schedule--')
         logger.debug('--Empty schedule--')
         return []
         return []
 
 
-    formatitem = lambda (i, item): '%s. %s pri%s %r' % (i,
+    formatitem = lambda (i, item): '{0}. {1} pri{2} {3!r}'.format(i,
             datetime.utcfromtimestamp(item['eta']),
             datetime.utcfromtimestamp(item['eta']),
-            item['priority'],
-            item['item'])
+            item['priority'], item['item'])
     info = map(formatitem, enumerate(schedule.info()))
     info = map(formatitem, enumerate(schedule.info()))
     logger.debug('* Dump of current schedule:\n%s', '\n'.join(info))
     logger.debug('* Dump of current schedule:\n%s', '\n'.join(info))
     scheduled_tasks = []
     scheduled_tasks = []
@@ -212,7 +211,7 @@ def dump_tasks(panel, taskinfoitems=None, **kwargs):
         info = map('='.join, fields.items())
         info = map('='.join, fields.items())
         if not info:
         if not info:
             return task.name
             return task.name
-        return '%s [%s]' % (task.name, ' '.join(info))
+        return '{0} [{1}]'.format(task.name, ' '.join(info))
 
 
     info = map(_extract_info, (tasks[task]
     info = map(_extract_info, (tasks[task]
                                     for task in sorted(tasks.keys())))
                                     for task in sorted(tasks.keys())))
@@ -255,7 +254,7 @@ def autoscale(panel, max=None, min=None):
     autoscaler = panel.consumer.controller.autoscaler
     autoscaler = panel.consumer.controller.autoscaler
     if autoscaler:
     if autoscaler:
         max_, min_ = autoscaler.update(max, min)
         max_, min_ = autoscaler.update(max, min)
-        return {'ok': 'autoscale now min=%r max=%r' % (max_, min_)}
+        return {'ok': 'autoscale now min={0} max={1}'.format(max_, min_)}
     raise ValueError('Autoscale not enabled')
     raise ValueError('Autoscale not enabled')
 
 
 
 
@@ -270,13 +269,13 @@ def add_consumer(panel, queue, exchange=None, exchange_type=None,
         routing_key=None, **options):
         routing_key=None, **options):
     panel.consumer.add_task_queue(queue, exchange, exchange_type,
     panel.consumer.add_task_queue(queue, exchange, exchange_type,
                                   routing_key, **options)
                                   routing_key, **options)
-    return {'ok': 'add consumer %r' % (queue, )}
+    return {'ok': 'add consumer {0}'.format(queue)}
 
 
 
 
 @Panel.register
 @Panel.register
 def cancel_consumer(panel, queue=None, **_):
 def cancel_consumer(panel, queue=None, **_):
     panel.consumer.cancel_task_queue(queue)
     panel.consumer.cancel_task_queue(queue)
-    return {'ok': 'no longer consuming from %s' % (queue, )}
+    return {'ok': 'no longer consuming from {0}'.format(queue)}
 
 
 
 
 @Panel.register
 @Panel.register

+ 1 - 1
celery/worker/hub.py

@@ -147,7 +147,7 @@ class Hub(object):
         delay = None
         delay = None
         if self.timer._queue:
         if self.timer._queue:
             for i in xrange(max_timers):
             for i in xrange(max_timers):
-                delay, entry = self.scheduler.next()
+                delay, entry = next(self.scheduler)
                 if entry is None:
                 if entry is None:
                     break
                     break
                 self.timer.apply_entry(entry)
                 self.timer.apply_entry(entry)

+ 15 - 12
celery/worker/job.py

@@ -416,15 +416,14 @@ class Request(object):
                 'worker_pid': self.worker_pid}
                 'worker_pid': self.worker_pid}
 
 
     def __str__(self):
     def __str__(self):
-        return '%s[%s]%s%s' % (
-                    self.name, self.id,
-                    ' eta:[%s]' % (self.eta, ) if self.eta else '',
-                    ' expires:[%s]' % (self.expires, ) if self.expires else '')
+        return '{0.name}[{0.id}]{1}{2}'.format(self,
+                ' eta:[{0}]'.format(self.eta) if self.eta else '',
+                ' expires:[{0}]'.format(self.expires) if self.expires else '')
     shortinfo = __str__
     shortinfo = __str__
 
 
     def __repr__(self):
     def __repr__(self):
-        return '<%s %s: %s>' % (type(self).__name__, self.id,
-            reprcall(self.name, self.args, self.kwargs))
+        return '<{0} {1}: {2}>'.format(type(self).__name__, self.id,
+                reprcall(self.name, self.args, self.kwargs))
 
 
     @property
     @property
     def tzlocal(self):
     def tzlocal(self):
@@ -437,19 +436,23 @@ class Request(object):
         return (not self.task.ignore_result
         return (not self.task.ignore_result
                  or self.task.store_errors_even_if_ignored)
                  or self.task.store_errors_even_if_ignored)
 
 
-    def _compat_get_task_id(self):
+    @property
+    def task_id(self):
+        # XXX compat
         return self.id
         return self.id
 
 
-    def _compat_set_task_id(self, value):
+    @task_id.setter  # noqa
+    def task_id(self, value):
         self.id = value
         self.id = value
-    task_id = property(_compat_get_task_id, _compat_set_task_id)
 
 
-    def _compat_get_task_name(self):
+    @property
+    def task_name(self):
+        # XXX compat
         return self.name
         return self.name
 
 
-    def _compat_set_task_name(self, value):
+    @task_name.setter  # noqa
+    def task_name(self, value):
         self.name = value
         self.name = value
-    task_name = property(_compat_get_task_name, _compat_set_task_name)
 
 
 
 
 class TaskRequest(Request):
 class TaskRequest(Request):

+ 5 - 4
celery/worker/state.py

@@ -87,9 +87,10 @@ if C_BENCH:  # pragma: no cover
         @atexit.register
         @atexit.register
         def on_shutdown():
         def on_shutdown():
             if bench_first is not None and bench_last is not None:
             if bench_first is not None and bench_last is not None:
-                print('- Time spent in benchmark: %r' % (
-                    bench_last - bench_first))
-                print('- Avg: %s' % (sum(bench_sample) / len(bench_sample)))
+                print('- Time spent in benchmark: {0!r}'.format(
+                        bench_last - bench_first))
+                print('- Avg: {0}'.format(
+                        sum(bench_sample) / len(bench_sample)))
                 memdump()
                 memdump()
 
 
     def task_reserved(request):  # noqa
     def task_reserved(request):  # noqa
@@ -113,7 +114,7 @@ if C_BENCH:  # pragma: no cover
             now = time()
             now = time()
             diff = now - bench_start
             diff = now - bench_start
             print('- Time spent processing %s tasks (since first '
             print('- Time spent processing %s tasks (since first '
-                    'task received): ~%.4fs\n' % (bench_every, diff))
+                    'task received): ~{0:.4f}s\n'.format(bench_every, diff))
             sys.stdout.flush()
             sys.stdout.flush()
             bench_start = bench_last = now
             bench_start = bench_last = now
             bench_sample.append(diff)
             bench_sample.append(diff)