Browse Source

Removes most of .five use

Ask Solem 8 years ago
parent
commit
adaf153de8
100 changed files with 265 additions and 409 deletions
  1. 1 1
      celery/__init__.py
  2. 4 5
      celery/app/amqp.py
  3. 1 2
      celery/app/annotations.py
  4. 3 4
      celery/app/base.py
  5. 6 7
      celery/app/defaults.py
  6. 2 2
      celery/app/log.py
  7. 1 2
      celery/app/registry.py
  8. 4 5
      celery/app/routes.py
  9. 2 2
      celery/app/task.py
  10. 1 1
      celery/app/trace.py
  11. 8 9
      celery/app/utils.py
  12. 1 2
      celery/apps/beat.py
  13. 4 5
      celery/apps/worker.py
  14. 2 3
      celery/backends/__init__.py
  15. 2 2
      celery/backends/async.py
  16. 4 5
      celery/backends/base.py
  17. 0 1
      celery/backends/database/__init__.py
  18. 3 4
      celery/backends/mongodb.py
  19. 1 2
      celery/backends/redis.py
  20. 1 2
      celery/backends/rpc.py
  21. 7 7
      celery/beat.py
  22. 3 4
      celery/bin/amqp.py
  23. 6 6
      celery/bin/base.py
  24. 6 7
      celery/bin/celery.py
  25. 1 2
      celery/bin/graph.py
  26. 3 4
      celery/bin/multi.py
  27. 1 2
      celery/bin/worker.py
  28. 8 10
      celery/bootsteps.py
  29. 8 9
      celery/concurrency/asynpool.py
  30. 3 3
      celery/concurrency/base.py
  31. 1 2
      celery/concurrency/prefork.py
  32. 5 6
      celery/contrib/migrate.py
  33. 0 2
      celery/contrib/rdb.py
  34. 1 2
      celery/contrib/sphinx.py
  35. 1 2
      celery/events/__init__.py
  36. 7 6
      celery/events/cursesmon.py
  37. 10 11
      celery/events/state.py
  38. 1 3
      celery/exceptions.py
  39. 5 6
      celery/loaders/base.py
  40. 2 4
      celery/local.py
  41. 6 7
      celery/platforms.py
  42. 4 4
      celery/result.py
  43. 2 3
      celery/schedules.py
  44. 1 2
      celery/security/certificate.py
  45. 1 4
      celery/security/utils.py
  46. 2 3
      celery/task/base.py
  47. 2 3
      celery/tests/app/test_amqp.py
  48. 1 2
      celery/tests/app/test_app.py
  49. 2 3
      celery/tests/app/test_beat.py
  50. 0 1
      celery/tests/app/test_builtins.py
  51. 2 3
      celery/tests/app/test_defaults.py
  52. 1 2
      celery/tests/app/test_loaders.py
  53. 1 2
      celery/tests/app/test_schedules.py
  54. 1 1
      celery/tests/backends/test_amqp.py
  55. 4 20
      celery/tests/backends/test_base.py
  56. 6 7
      celery/tests/backends/test_cache.py
  57. 1 2
      celery/tests/bin/test_base.py
  58. 0 1
      celery/tests/concurrency/test_prefork.py
  59. 0 1
      celery/tests/events/test_state.py
  60. 2 1
      celery/tests/security/test_security.py
  61. 0 1
      celery/tests/tasks/test_chord.py
  62. 0 1
      celery/tests/tasks/test_result.py
  63. 3 4
      celery/tests/tasks/test_tasks.py
  64. 1 2
      celery/tests/utils/test_collections.py
  65. 1 2
      celery/tests/utils/test_functional.py
  66. 2 2
      celery/tests/utils/test_graph.py
  67. 1 3
      celery/tests/utils/test_imports.py
  68. 2 5
      celery/tests/utils/test_local.py
  69. 0 1
      celery/tests/utils/test_saferef.py
  70. 4 7
      celery/tests/utils/test_saferepr.py
  71. 1 2
      celery/tests/utils/test_term.py
  72. 1 1
      celery/tests/worker/test_control.py
  73. 2 1
      celery/tests/worker/test_loops.py
  74. 1 1
      celery/tests/worker/test_request.py
  75. 1 1
      celery/tests/worker/test_worker.py
  76. 1 4
      celery/utils/abstract.py
  77. 18 55
      celery/utils/collections.py
  78. 2 2
      celery/utils/debug.py
  79. 1 2
      celery/utils/dispatch/signal.py
  80. 2 3
      celery/utils/functional.py
  81. 5 8
      celery/utils/graph.py
  82. 1 2
      celery/utils/imports.py
  83. 3 6
      celery/utils/log.py
  84. 3 5
      celery/utils/saferepr.py
  85. 5 7
      celery/utils/serialization.py
  86. 8 10
      celery/utils/term.py
  87. 1 3
      celery/utils/text.py
  88. 2 3
      celery/utils/threads.py
  89. 1 3
      celery/utils/timer2.py
  90. 2 4
      celery/utils/timeutils.py
  91. 1 2
      celery/worker/__init__.py
  92. 4 4
      celery/worker/consumer/consumer.py
  93. 1 2
      celery/worker/consumer/gossip.py
  94. 2 3
      celery/worker/consumer/mingle.py
  95. 4 3
      celery/worker/control.py
  96. 1 2
      celery/worker/request.py
  97. 4 2
      celery/worker/state.py
  98. 2 1
      docs/tutorials/task-cookbook.rst
  99. 2 2
      funtests/benchmarks/bench_worker.py
  100. 0 1
      funtests/suite/test_leak.py

+ 1 - 1
celery/__init__.py

@@ -47,7 +47,7 @@ del(_temp)
 del(re)
 
 if os.environ.get('C_IMPDEBUG'):  # pragma: no cover
-    from .five import builtins
+    import builtins
 
     def debug_import(name, locals=None, globals=None,
                      fromlist=None, level=-1, real_import=builtins.__import__):

+ 4 - 5
celery/app/amqp.py

@@ -16,7 +16,6 @@ from kombu.utils import cached_property
 from kombu.utils.functional import maybe_list
 
 from celery import signals
-from celery.five import items, string_t
 from celery.local import try_import
 from celery.utils.nodenames import anon_nodename
 from celery.utils.saferepr import saferepr
@@ -47,7 +46,7 @@ task_message = namedtuple('task_message',
 
 def utf8dict(d, encoding='utf-8'):
     return {k.decode(encoding) if isinstance(k, bytes) else k: v
-            for k, v in items(d)}
+            for k, v in d.items()}
 
 
 class Queues(dict):
@@ -77,7 +76,7 @@ class Queues(dict):
         self.max_priority = max_priority
         if isinstance(queues, (tuple, list)):
             queues = {q.name: q for q in queues}
-        for name, q in items(queues or {}):
+        for name, q in (queues or {}).items():
             self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
 
     def __getitem__(self, name):
@@ -157,7 +156,7 @@ class Queues(dict):
         if not active:
             return ''
         info = [QUEUE_FORMAT.strip().format(q)
-                for _, q in sorted(items(active))]
+                for _, q in sorted(active.items())]
         if indent_first:
             return textindent('\n'.join(info), indent)
         return info[0] + '\n' + textindent('\n'.join(info[1:]), indent)
@@ -495,7 +494,7 @@ class AMQP:
             if queue is None and exchange is None:
                 queue = default_queue
             if queue is not None:
-                if isinstance(queue, string_t):
+                if isinstance(queue, str):
                     qname, queue = queue, queues[queue]
                 else:
                     qname = queue.name

+ 1 - 2
celery/app/annotations.py

@@ -9,7 +9,6 @@ This prepares and performs the annotations in the
 """
 from __future__ import absolute_import, unicode_literals
 
-from celery.five import string_t
 from celery.utils.functional import firstmethod, mlazy
 from celery.utils.imports import instantiate
 
@@ -40,7 +39,7 @@ def prepare(annotations):
     def expand_annotation(annotation):
         if isinstance(annotation, dict):
             return MapAnnotation(annotation)
-        elif isinstance(annotation, string_t):
+        elif isinstance(annotation, str):
             return mlazy(instantiate, annotation)
         return annotation
 

+ 3 - 4
celery/app/base.py

@@ -6,7 +6,7 @@ import os
 import threading
 import warnings
 
-from collections import defaultdict, deque
+from collections import UserDict, defaultdict, deque
 from operator import attrgetter
 
 from kombu import pools
@@ -25,7 +25,6 @@ from celery._state import (
     _announce_app_finalized,
 )
 from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
-from celery.five import UserDict, bytes_if_py2, values
 from celery.loaders import get_loader_cls
 from celery.local import PromiseProxy, maybe_evaluate
 from celery.utils import abstract
@@ -437,7 +436,7 @@ class Celery:
                 while pending:
                     maybe_evaluate(pending.popleft())
 
-                for task in values(self._tasks):
+                for task in self._tasks.values():
                     task.bind(self)
 
                 self.on_after_finalize.send(sender=self)
@@ -930,7 +929,7 @@ class Celery:
         if not keep_reduce:
             attrs['__reduce__'] = __reduce__
 
-        return type(bytes_if_py2(name or Class.__name__), (Class,), attrs)
+        return type(name or Class.__name__, (Class,), attrs)
 
     def _rgetattr(self, path):
         return attrgetter(path)(self)

+ 6 - 7
celery/app/defaults.py

@@ -7,7 +7,6 @@ import sys
 from collections import deque, namedtuple
 from datetime import timedelta
 
-from celery.five import items, keys, values
 from celery.utils.functional import memoize
 from celery.utils.serialization import strtobool
 
@@ -41,7 +40,7 @@ searchresult = namedtuple('searchresult', ('namespace', 'key', 'type'))
 
 def Namespace(__old__=None, **options):
     if __old__ is not None:
-        for opt in values(options):
+        for opt in options.values():
             if not opt.old:
                 opt.old = __old__
     return options
@@ -62,7 +61,7 @@ class Option:
     def __init__(self, default=None, *args, **kwargs):
         self.default = default
         self.type = kwargs.get('type') or 'string'
-        for attr, value in items(kwargs):
+        for attr, value in kwargs.items():
             setattr(self, attr, value)
 
     def to_python(self, value):
@@ -297,7 +296,7 @@ def flatten(d, root='', keyfilter=_flatten_keys):
     stack = deque([(root, d)])
     while stack:
         ns, options = stack.popleft()
-        for key, opt in items(options):
+        for key, opt in options.items():
             if isinstance(opt, dict):
                 stack.append((ns + key + '_', opt))
             else:
@@ -312,8 +311,8 @@ _TO_OLD_KEY = {new_key: old_key for old_key, new_key, _ in __compat}
 _TO_NEW_KEY = {old_key: new_key for old_key, new_key, _ in __compat}
 __compat = None
 
-SETTING_KEYS = set(keys(DEFAULTS))
-_OLD_SETTING_KEYS = set(keys(_TO_NEW_KEY))
+SETTING_KEYS = set(DEFAULTS)
+_OLD_SETTING_KEYS = set(_TO_NEW_KEY)
 
 
 def find_deprecated_settings(source):  # pragma: no cover
@@ -337,7 +336,7 @@ def find(name, namespace='celery'):
         )
     except KeyError:
         # - Try all the other namespaces.
-        for ns, opts in items(NAMESPACES):
+        for ns, opts in NAMESPACES.items():
             if ns.lower() == name.lower():
                 return searchresult(None, ns, opts)
             elif isinstance(opts, dict):

+ 2 - 2
celery/app/log.py

@@ -19,7 +19,7 @@ from kombu.utils.encoding import set_default_encoding_file
 
 from celery import signals
 from celery._state import get_current_task
-from celery.five import class_property, string_t
+from celery.five import class_property
 from celery.platforms import isatty
 from celery.utils.log import (
     get_logger, mlevel,
@@ -138,7 +138,7 @@ class Logging:
 
         # This is a hack for multiprocessing's fork+exec, so that
         # logging before Process.run works.
-        logfile_name = logfile if isinstance(logfile, string_t) else ''
+        logfile_name = logfile if isinstance(logfile, str) else ''
         os.environ.update(_MP_FORK_LOGLEVEL_=str(loglevel),
                           _MP_FORK_LOGFILE_=logfile_name,
                           _MP_FORK_LOGFORMAT_=format)

+ 1 - 2
celery/app/registry.py

@@ -8,7 +8,6 @@ from importlib import import_module
 
 from celery._state import get_current_app
 from celery.exceptions import NotRegistered
-from celery.five import items
 
 __all__ = ['TaskRegistry']
 
@@ -50,7 +49,7 @@ class TaskRegistry(dict):
         return self.filter_types('periodic')
 
     def filter_types(self, type):
-        return {name: task for name, task in items(self)
+        return {name: task for name, task in self.items()
                 if getattr(task, 'type', 'regular') == type}
 
 

+ 4 - 5
celery/app/routes.py

@@ -13,7 +13,6 @@ from collections import Mapping, OrderedDict
 from kombu import Queue
 
 from celery.exceptions import QueueNotFound
-from celery.five import items, string_t
 from celery.utils.collections import lpmerge
 from celery.utils.functional import maybe_evaluate, mlazy
 from celery.utils.imports import symbol_by_name
@@ -30,7 +29,7 @@ class MapRoute:
     """Creates a router out of a :class:`dict`."""
 
     def __init__(self, map):
-        map = items(map) if isinstance(map, Mapping) else map
+        map = map.items() if isinstance(map, Mapping) else map
         self.map = {}
         self.patterns = OrderedDict()
         for k, v in map:
@@ -48,7 +47,7 @@ class MapRoute:
             pass
         except ValueError:
             return {'queue': self.map[name]}
-        for regex, route in items(self.patterns):
+        for regex, route in self.patterns.items():
             if regex.match(name):
                 try:
                     return dict(route)
@@ -78,7 +77,7 @@ class Router:
 
     def expand_destination(self, route):
         # Route can be a queue name: convenient for direct exchanges.
-        if isinstance(route, string_t):
+        if isinstance(route, str):
             queue, route = route, {}
         else:
             # can use defaults from configured queue, but override specific
@@ -126,7 +125,7 @@ def prepare(routes):
     def expand_route(route):
         if isinstance(route, (Mapping, list, tuple)):
             return MapRoute(route)
-        if isinstance(route, string_t):
+        if isinstance(route, str):
             return mlazy(expand_router_string, route)
         return route
 

+ 2 - 2
celery/app/task.py

@@ -12,7 +12,7 @@ from celery import states
 from celery._state import _task_stack
 from celery.canvas import signature
 from celery.exceptions import Ignore, MaxRetriesExceededError, Reject, Retry
-from celery.five import class_property, items
+from celery.five import class_property
 from celery.result import EagerResult
 from celery.utils import abstract
 from celery.utils.functional import mattrgetter, maybe_list
@@ -326,7 +326,7 @@ class Task:
     @classmethod
     def annotate(self):
         for d in resolve_all_annotations(self.app.annotations, self):
-            for key, value in items(d):
+            for key, value in d.items():
                 if key.startswith('@'):
                     self.add_around(key[1:], value)
                 else:

+ 1 - 1
celery/app/trace.py

@@ -17,6 +17,7 @@ import os
 import sys
 
 from collections import namedtuple
+from time import monotonic
 from warnings import warn
 
 from billiard.einfo import ExceptionInfo
@@ -30,7 +31,6 @@ from celery._state import _task_stack
 from celery.app import set_default_app
 from celery.app.task import Task as BaseTask, Context
 from celery.exceptions import Ignore, Reject, Retry, InvalidTaskError
-from celery.five import monotonic
 from celery.utils.log import get_logger
 from celery.utils.nodenames import gethostname
 from celery.utils.objects import mro_lookup

+ 8 - 9
celery/app/utils.py

@@ -13,7 +13,6 @@ from types import ModuleType
 from kombu.utils.url import maybe_sanitize_url
 
 from celery.exceptions import ImproperlyConfigured
-from celery.five import items, keys, string_t, values
 from celery.platforms import pyimplementation
 from celery.utils.collections import ConfigurationView
 from celery.utils.text import pretty
@@ -152,8 +151,8 @@ class Settings(ConfigurationView):
     def table(self, with_defaults=False, censored=True):
         filt = filter_hidden_settings if censored else lambda v: v
         return filt({
-            k: v for k, v in items(
-                self if with_defaults else self.without_defaults())
+            k: v for k, v in (
+                self if with_defaults else self.without_defaults()).items()
             if not k.startswith('_')
         })
 
@@ -162,7 +161,7 @@ class Settings(ConfigurationView):
         configuration."""
         return '\n'.join(
             '{0}: {1}'.format(key, pretty(value, width=50))
-            for key, value in items(self.table(with_defaults, censored)))
+            for key, value in self.table(with_defaults, censored.items()))
 
 
 def _new_key_to_old(key, convert=_TO_OLD_KEY.get):
@@ -190,7 +189,7 @@ def detect_settings(conf, preconf={}, ignore_keys=set(), prefix=None,
     source = conf
     if conf is None:
         source, conf = preconf, {}
-    have = set(keys(source)) - ignore_keys
+    have = set(source) - ignore_keys
     is_in_new = have.intersection(all_keys)
     is_in_old = have.intersection(old_keys)
 
@@ -227,7 +226,7 @@ def detect_settings(conf, preconf={}, ignore_keys=set(), prefix=None,
             for key in sorted(really_left)
         )))
 
-    preconf = {info.convert.get(k, k): v for k, v in items(preconf)}
+    preconf = {info.convert.get(k, k): v for k, v in preconf.items()}
     defaults = dict(deepcopy(info.defaults), **preconf)
     return Settings(preconf, [conf, defaults], info.key_t, prefix=prefix)
 
@@ -275,7 +274,7 @@ def filter_hidden_settings(conf):
     def maybe_censor(key, value, mask='*' * 8):
         if isinstance(value, Mapping):
             return filter_hidden_settings(value)
-        if isinstance(key, string_t):
+        if isinstance(key, str):
             if HIDDEN_SETTINGS.search(key):
                 return mask
             elif 'broker_url' in key.lower():
@@ -286,7 +285,7 @@ def filter_hidden_settings(conf):
 
         return value
 
-    return {k: maybe_censor(k, v) for k, v in items(conf)}
+    return {k: maybe_censor(k, v) for k, v in conf.items()}
 
 
 def bugreport(app):
@@ -346,7 +345,7 @@ def find_app(app, symbol_by_name=symbol_by_name, imp=import_from_cwd):
                         )
                     except ImportError:
                         pass
-                for suspect in values(vars(sym)):
+                for suspect in vars(sym).values():
                     if isinstance(suspect, Celery):
                         return suspect
                 raise

+ 1 - 2
celery/apps/beat.py

@@ -14,7 +14,6 @@ import sys
 from datetime import datetime
 
 from celery import VERSION_BANNER, platforms, beat
-from celery.five import text_t
 from celery.utils.imports import qualname
 from celery.utils.log import LOG_LEVELS, get_logger
 from celery.utils.timeutils import humanize_seconds
@@ -110,7 +109,7 @@ class Beat:
 
     def banner(self, service):
         c = self.colored
-        return text_t(  # flake8: noqa
+        return str(  # flake8: noqa
             c.blue('__    ', c.magenta('-'),
             c.blue('    ... __   '), c.magenta('-'),
             c.blue('        _\n'),

+ 4 - 5
celery/apps/worker.py

@@ -21,7 +21,6 @@ from kombu.utils.encoding import safe_str
 from celery import VERSION_BANNER, platforms, signals
 from celery.app import trace
 from celery.exceptions import WorkerShutdown, WorkerTerminate
-from celery.five import string, string_t
 from celery.loaders.app import AppLoader
 from celery.platforms import EX_FAILURE, EX_OK, check_privileges, isatty
 from celery.utils.debug import cry
@@ -136,8 +135,8 @@ class Worker(WorkController):
         # Dump configuration to screen so we have some basic information
         # for when users sends bug reports.
         print(safe_str(''.join([
-            string(self.colored.cyan(' \n', self.startup_info())),
-            string(self.colored.reset(self.extra_info() or '')),
+            str(self.colored.cyan(' \n', self.startup_info())),
+            str(self.colored.reset(self.extra_info() or '')),
         ])), file=sys.__stdout__)
         self.set_process_status('-active-')
         self.install_platform_tweaks(self)
@@ -175,7 +174,7 @@ class Worker(WorkController):
 
     def startup_info(self):
         app = self.app
-        concurrency = string(self.concurrency)
+        concurrency = str(self.concurrency)
         appr = '{0}:{1:#x}'.format(app.main or '__main__', id(app))
         if not isinstance(app.loader, AppLoader):
             loader = qualname(app.loader)
@@ -183,7 +182,7 @@ class Worker(WorkController):
                 loader = loader[14:]
             appr += ' ({0})'.format(loader)
         pool = self.pool_cls
-        if not isinstance(pool, string_t):
+        if not isinstance(pool, str):
             pool = pool.__module__
         concurrency += ' ({0})'.format(pool.split('.')[-1])
         events = 'ON'

+ 2 - 3
celery/backends/__init__.py

@@ -8,7 +8,6 @@ import types
 from celery.exceptions import ImproperlyConfigured
 from celery.local import Proxy
 from celery._state import current_app
-from celery.five import reraise
 from celery.utils.imports import symbol_by_name
 
 __all__ = ['get_backend_cls', 'get_backend_by_url']
@@ -47,8 +46,8 @@ def get_backend_cls(backend=None, loader=None):
     try:
         cls = symbol_by_name(backend, aliases)
     except ValueError as exc:
-        reraise(ImproperlyConfigured, ImproperlyConfigured(
-            UNKNOWN_BACKEND.format(backend, exc)), sys.exc_info()[2])
+        raise ImproperlyConfigured(UNKNOWN_BACKEND.format(
+            backend, exc)).with_traceback(sys.exc_info()[2])
     if isinstance(cls, types.ModuleType):
         raise ImproperlyConfigured(UNKNOWN_BACKEND.format(
             backend, 'is a Python module, not a backend class.'))

+ 2 - 2
celery/backends/async.py

@@ -4,15 +4,15 @@ from __future__ import absolute_import, unicode_literals
 import socket
 
 from collections import deque
-from time import sleep
+from time import monotonic, sleep
 from weakref import WeakKeyDictionary
+from queue import Empty
 
 from kombu.syn import detect_environment
 from kombu.utils import cached_property
 
 from celery import states
 from celery.exceptions import TimeoutError
-from celery.five import Empty, monotonic
 
 drainers = {}
 

+ 4 - 5
celery/backends/base.py

@@ -27,7 +27,6 @@ from celery import states
 from celery import current_app, group, maybe_signature
 from celery.app import current_task
 from celery.exceptions import ChordError, TimeoutError, TaskRevokedError
-from celery.five import items
 from celery.result import (
     GroupResult, ResultBase, allow_join_result, result_from_tuple,
 )
@@ -390,7 +389,7 @@ class Backend:
 
     def apply_chord(self, header, partial_args, group_id, body,
                     options={}, **kwargs):
-        fixed_options = {k: v for k, v in items(options) if k != 'task_id'}
+        fixed_options = {k: v for k, v in options.items() if k != 'task_id'}
         result = header(*partial_args, task_id=group_id, **fixed_options or {})
         self.fallback_chord_unlock(group_id, body, **kwargs)
         return result
@@ -555,7 +554,7 @@ class BaseKeyValueStoreBackend(Backend):
             # client returns dict so mapping preserved.
             return {
                 self._strip_prefix(k): v
-                for k, v in self._filter_ready(items(values))
+                for k, v in self._filter_ready(values.items())
             }
         else:
             # client returns list so need to recreate mapping.
@@ -589,7 +588,7 @@ class BaseKeyValueStoreBackend(Backend):
                                                  for k in keys]), keys)
             cache.update(r)
             ids.difference_update({bytes_to_str(v) for v in r})
-            for key, value in items(r):
+            for key, value in r.items():
                 if on_message is not None:
                     on_message(value)
                 yield bytes_to_str(key), value
@@ -644,7 +643,7 @@ class BaseKeyValueStoreBackend(Backend):
                           result=None, options={}, **kwargs):
         self.save_group(group_id, self.app.GroupResult(group_id, result))
 
-        fixed_options = {k: v for k, v in items(options) if k != 'task_id'}
+        fixed_options = {k: v for k, v in options.items() if k != 'task_id'}
 
         return header(*partial_args, task_id=group_id, **fixed_options or {})
 

+ 0 - 1
celery/backends/database/__init__.py

@@ -11,7 +11,6 @@ from vine.utils import wraps
 from celery import states
 from celery.backends.base import BaseBackend
 from celery.exceptions import ImproperlyConfigured
-from celery.five import range
 from celery.utils.timeutils import maybe_timedelta
 
 from .models import Task

+ 3 - 4
celery/backends/mongodb.py

@@ -9,7 +9,6 @@ from kombu.utils.url import maybe_sanitize_url
 from kombu.exceptions import EncodeError
 from celery import states
 from celery.exceptions import ImproperlyConfigured
-from celery.five import string_t, items
 
 from .base import BaseBackend
 
@@ -67,7 +66,7 @@ class MongoBackend(BaseBackend):
                 'MongoDB backend.')
 
         # Set option defaults
-        for key, value in items(self._prepare_client_options()):
+        for key, value in self._prepare_client_options().items():
             self.options.setdefault(key, value)
 
         # update conf with mongo uri data, only if uri was given
@@ -138,8 +137,8 @@ class MongoBackend(BaseBackend):
                 # This enables the use of replica sets and sharding.
                 # See pymongo.Connection() for more info.
                 host = self.host
-                if isinstance(host, string_t) \
-                   and not host.startswith('mongodb://'):
+                if (isinstance(host, str) and
+                        not host.startswith('mongodb://')):
                     host = 'mongodb://{0}:{1}'.format(host, self.port)
             # don't change self.options
             conf = dict(self.options)

+ 1 - 2
celery/backends/redis.py

@@ -11,7 +11,6 @@ from celery import states
 from celery._state import task_join_will_block
 from celery.canvas import maybe_signature
 from celery.exceptions import ChordError, ImproperlyConfigured
-from celery.five import string_t
 from celery.utils import deprecated
 from celery.utils.functional import dictfilter
 from celery.utils.log import get_logger
@@ -165,7 +164,7 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
 
         # db may be string and start with / like in kombu.
         db = connparams.get('db') or 0
-        db = db.strip('/') if isinstance(db, string_t) else db
+        db = db.strip('/') if isinstance(db, str) else db
         connparams['db'] = int(db)
 
         # Query parameters override other parameters

+ 1 - 2
celery/backends/rpc.py

@@ -12,7 +12,6 @@ from kombu.utils import cached_property, register_after_fork
 from celery import current_task
 from celery import states
 from celery._state import task_join_will_block
-from celery.five import items, range
 from celery.utils.functional import dictfilter
 from celery.utils.timeutils import maybe_s_to_ms
 
@@ -176,7 +175,7 @@ class BaseRPCBackend(base.Backend, AsyncBackendMixin):
                 prev = None
 
         latest = latest_by_id.pop(task_id, None)
-        for tid, msg in items(latest_by_id):
+        for tid, msg in latest_by_id.items():
             self.on_out_of_band_result(tid, msg)
 
         if latest:

+ 7 - 7
celery/beat.py

@@ -13,6 +13,7 @@ import traceback
 from collections import namedtuple
 from functools import total_ordering
 from threading import Event, Thread
+from time import monotonic
 
 from billiard import ensure_multiprocessing
 from billiard.context import Process
@@ -23,7 +24,6 @@ from kombu.utils.functional import maybe_evaluate
 from . import __version__
 from . import platforms
 from . import signals
-from .five import items, monotonic, reraise, values
 from .schedules import maybe_schedule, crontab
 from .utils.imports import instantiate
 from .utils.timeutils import humanize_seconds
@@ -132,7 +132,7 @@ class ScheduleEntry:
         return self.schedule.is_due(self.last_run_at)
 
     def __iter__(self):
-        return iter(items(vars(self)))
+        return iter(vars(self).items())
 
     def __repr__(self):
         return '<{name} {0.name} {call} {0.schedule}'.format(
@@ -250,7 +250,7 @@ class Scheduler:
         H = self._heap
         if H is None:
             H = self._heap = [event_t(_when(e, e.is_due()[1]) or 0, 5, e)
-                              for e in values(self.schedule)]
+                              for e in self.schedule.values()]
             heapify(H)
         if not H:
             return max_interval
@@ -300,9 +300,9 @@ class Scheduler:
                                       producer=producer,
                                       **entry.options)
         except Exception as exc:
-            reraise(SchedulingError, SchedulingError(
+            raise SchedulingError(
                 "Couldn't apply scheduled task {0.name}: {exc}".format(
-                    entry, exc=exc)), sys.exc_info()[2])
+                    entry, exc=exc)).with_traceback(sys.exc_info()[2])
         finally:
             self._tasks_since_sync += 1
             if self.should_sync():
@@ -342,7 +342,7 @@ class Scheduler:
     def update_from_dict(self, dict_):
         self.schedule.update({
             name: self._maybe_entry(name, entry)
-            for name, entry in items(dict_)
+            for name, entry in dict_.items()
         })
 
     def merge_inplace(self, b):
@@ -468,7 +468,7 @@ class PersistentScheduler(Scheduler):
         self._store.update(__version__=__version__, tz=tz, utc_enabled=utc)
         self.sync()
         debug('Current schedule:\n' + '\n'.join(
-            repr(entry) for entry in values(entries)))
+            repr(entry) for entry in entries.values()))
 
     def get_schedule(self):
         return self._store['entries']

+ 3 - 4
celery/bin/amqp.py

@@ -18,7 +18,6 @@ from kombu.utils.encoding import safe_str
 from celery.utils.functional import padlist
 
 from celery.bin.base import Command
-from celery.five import string_t
 from celery.utils.serialization import strtobool
 
 __all__ = ['AMQPAdmin', 'AMQShell', 'Spec', 'amqp']
@@ -251,7 +250,7 @@ class AMQShell(cmd.Cmd):
 
         Look-up order is: :attr:`builtins` -> :attr:`amqp`.
         """
-        if isinstance(arglist, string_t):
+        if isinstance(arglist, str):
             arglist = shlex.split(safe_str(arglist))
         if cmd in self.builtins:
             return getattr(self, self.builtins[cmd])(*arglist)
@@ -271,7 +270,7 @@ class AMQShell(cmd.Cmd):
 
     def onecmd(self, line):
         """Parse line and execute command."""
-        if isinstance(line, string_t):
+        if isinstance(line, str):
             line = shlex.split(safe_str(line))
         cmd, arg, line = self.parseline(line)
         if not line:
@@ -289,7 +288,7 @@ class AMQShell(cmd.Cmd):
     def respond(self, retval):
         """What to do with the return value of a command."""
         if retval is not None:
-            if isinstance(retval, string_t):
+            if isinstance(retval, str):
                 self.say(retval)
             else:
                 self.say(pprint.pformat(retval))

+ 6 - 6
celery/bin/base.py

@@ -11,6 +11,7 @@ import json
 
 from collections import defaultdict
 from heapq import heappush
+from inspect import getfullargspec
 from optparse import (
     OptionParser, OptionGroup, IndentedHelpFormatter, make_option as Option,
 )
@@ -19,7 +20,6 @@ from pprint import pformat
 from celery import VERSION_BANNER, Celery, maybe_patch_concurrency
 from celery import signals
 from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
-from celery.five import getfullargspec, items, string, string_t
 from celery.platforms import EX_FAILURE, EX_OK, EX_USAGE
 from celery.utils import term
 from celery.utils import text
@@ -269,7 +269,7 @@ class Command:
         pass
 
     def expanduser(self, value):
-        if isinstance(value, string_t):
+        if isinstance(value, str):
             return os.path.expanduser(value)
         return value
 
@@ -321,7 +321,7 @@ class Command:
         if options:
             options = {
                 k: self.expanduser(v)
-                for k, v in items(vars(options)) if not k.startswith('_')
+                for k, v in vars(options).items() if not k.startswith('_')
             }
         args = [self.expanduser(arg) for arg in args]
         self.check_args(args)
@@ -376,7 +376,7 @@ class Command:
     def prepare_parser(self, parser):
         docs = [self.parse_doc(doc) for doc in (self.doc, __doc__) if doc]
         for doc in docs:
-            for long_opt, help in items(doc):
+            for long_opt, help in doc.items():
                 option = parser.get_option(long_opt)
                 if option is not None:
                     option.help = ' '.join(help).format(default=option.default)
@@ -582,8 +582,8 @@ class Command:
                 return self.pretty_dict_ok_error(n)
             else:
                 return OK, json.dumps(n, sort_keys=True, indent=4)
-        if isinstance(n, string_t):
-            return OK, string(n)
+        if isinstance(n, str):
+            return OK, str(n)
         return OK, pformat(n)
 
     def say_chat(self, direction, title, body=''):

+ 6 - 7
celery/bin/celery.py

@@ -267,7 +267,6 @@ from importlib import import_module
 from kombu.utils import json
 
 from celery.app import defaults
-from celery.five import keys, string_t, values
 from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
 from celery.utils import term
 from celery.utils import text
@@ -424,10 +423,10 @@ class call(Command):
                    queue=None, exchange=None, routing_key=None,
                    eta=None, expires=None):
         # arguments
-        args = json.loads(args) if isinstance(args, string_t) else args
-        kwargs = json.loads(kwargs) if isinstance(kwargs, string_t) else kwargs
+        args = json.loads(args) if isinstance(args, str) else args
+        kwargs = json.loads(kwargs) if isinstance(kwargs, str) else kwargs
 
-        # Expires can be int/float.
+        # expires can be int/float.
         try:
             expires = float(expires)
         except (TypeError, ValueError):
@@ -480,7 +479,7 @@ class purge(Command):
     def run(self, force=False, queues=None, exclude_queues=None, **kwargs):
         queues = set(str_to_list(queues or []))
         exclude = set(str_to_list(exclude_queues or []))
-        names = (queues or set(keys(self.app.amqp.queues))) - exclude
+        names = (queues or set(self.app.amqp.queues.keys())) - exclude
         qnum = len(names)
 
         messages = None
@@ -624,7 +623,7 @@ class _RemoteControl(Command):
         output_json = kwargs.get('json')
         destination = kwargs.get('destination')
         timeout = kwargs.get('timeout') or self.choices[method][0]
-        if destination and isinstance(destination, string_t):
+        if destination and isinstance(destination, str):
             destination = [dest.strip() for dest in destination.split(',')]
 
         handler = getattr(self, method, self.call)
@@ -875,7 +874,7 @@ class shell(Command):  # pragma: no cover
 
         if not without_tasks:
             self.locals.update({
-                task.__name__: task for task in values(self.app.tasks)
+                task.__name__: task for task in self.app.tasks.values()
                 if not task.name.startswith('celery.')
             })
 

+ 1 - 2
celery/bin/graph.py

@@ -7,7 +7,6 @@ from __future__ import absolute_import, unicode_literals
 
 from operator import itemgetter
 
-from celery.five import items
 from celery.utils.graph import DependencyGraph, GraphFormatter
 
 from .base import Command
@@ -148,7 +147,7 @@ class graph(Command):
         except KeyError:
             replies = self.app.control.inspect().stats()
             workers, threads = [], []
-            for worker, reply in items(replies):
+            for worker, reply in replies.items():
                 workers.append(worker)
                 threads.append(reply['pool']['max-concurrency'])
 

+ 3 - 4
celery/bin/multi.py

@@ -110,7 +110,6 @@ from kombu.utils import cached_property
 from kombu.utils.encoding import from_utf8
 
 from celery import VERSION_BANNER
-from celery.five import items
 from celery.platforms import Pidfile, IS_WINDOWS
 from celery.utils import term
 from celery.utils.nodenames import (
@@ -468,7 +467,7 @@ def _args_for_node(p, name, prefix, suffix, cmd, append, options):
 
     argv = ([expand(cmd)] +
             [format_opt(opt, expand(value))
-                for opt, value in items(p.optmerge(ns, options))] +
+                for opt, value in p.optmerge(ns, options).items())] +
             [p.passthrough])
     if append:
         argv.append(expand(append))
@@ -508,7 +507,7 @@ def _get_ranges(names):
 def _update_ns_opts(p, names):
     # Numbers in args always refers to the index in the list of names.
     # (e.g. `start foo bar baz -c:1` where 1 is foo, 2 is bar, and so on).
-    for ns_name, ns_opts in list(items(p.namespaces)):
+    for ns_name, ns_opts in list(p.namespaces.items()):
         if ns_name.isdigit():
             ns_index = int(ns_name) - 1
             if ns_index < 0:
@@ -520,7 +519,7 @@ def _update_ns_opts(p, names):
 
 
 def _update_ns_ranges(p, ranges):
-    for ns_name, ns_opts in list(items(p.namespaces)):
+    for ns_name, ns_opts in list(p.namespaces.items()):
         if ',' in ns_name or (ranges and '-' in ns_name):
             for subns in parse_ns_range(ns_name, ranges):
                 p.namespaces[subns].update(ns_opts)

+ 1 - 2
celery/bin/worker.py

@@ -170,7 +170,6 @@ from optparse import OptionGroup
 from celery import concurrency
 from celery.bin.base import Command, daemon_options
 from celery.bin.celeryd_detach import detached_celeryd
-from celery.five import string_t
 from celery.platforms import maybe_drop_privileges
 from celery.utils.log import LOG_LEVELS, mlevel
 from celery.utils.nodenames import default_nodename
@@ -231,7 +230,7 @@ class worker(Command):
             except KeyError:  # pragma: no cover
                 self.die('Unknown level {0!r}. Please use one of {1}.'.format(
                     loglevel, '|'.join(
-                        l for l in LOG_LEVELS if isinstance(l, string_t))))
+                        l for l in LOG_LEVELS if isinstance(l, str))))
 
         worker = self.app.Worker(
             hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,

+ 8 - 10
celery/bootsteps.py

@@ -9,7 +9,6 @@ from kombu.common import ignore_errors
 from kombu.utils import symbol_by_name
 from kombu.utils.encoding import bytes_to_str
 
-from .five import bytes_if_py2, values, with_metaclass
 from .utils.graph import DependencyGraph, GraphFormatter
 from .utils.imports import instantiate, qualname
 from .utils.log import get_logger
@@ -225,12 +224,12 @@ class Blueprint:
         return self.steps[name]
 
     def _find_last(self):
-        return next((C for C in values(self.steps) if C.last), None)
+        return next((C for C in self.steps.values() if C.last), None)
 
     def _firstpass(self, steps):
-        for step in values(steps):
+        for step in steps.values():
             step.requires = [symbol_by_name(dep) for dep in step.requires]
-        stream = deque(step.requires for step in values(steps))
+        stream = deque(step.requires for step in steps.values())
         while stream:
             for node in stream.popleft():
                 node = symbol_by_name(node)
@@ -241,7 +240,7 @@ class Blueprint:
     def _finalize_steps(self, steps):
         last = self._find_last()
         self._firstpass(steps)
-        it = ((C, C.requires) for C in values(steps))
+        it = ((C, C.requires) for C in steps.values())
         G = self.graph = DependencyGraph(
             it, formatter=self.GraphFormatter(root=last),
         )
@@ -285,14 +284,13 @@ class StepType(type):
         return super(StepType, cls).__new__(cls, name, bases, attrs)
 
     def __str__(self):
-        return bytes_if_py2(self.name)
+        return self.name
 
     def __repr__(self):
-        return bytes_if_py2('step:{0.name}{{{0.requires!r}}}'.format(self))
+        return 'step:{0.name}{{{0.requires!r}}}'.format(self)
 
 
-@with_metaclass(StepType)
-class Step:
+class Step(metaclass=StepType):
     """A Bootstep.
 
     The :meth:`__init__` method is called when the step
@@ -347,7 +345,7 @@ class Step:
         pass
 
     def __repr__(self):
-        return bytes_if_py2('<step: {0.alias}>'.format(self))
+        return '<step: {0.alias}>'.format(self)
 
     @property
     def alias(self):

+ 8 - 9
celery/concurrency/asynpool.py

@@ -24,7 +24,7 @@ import struct
 import sys
 import time
 
-from collections import deque, namedtuple
+from collections import Counter, deque, namedtuple
 from io import BytesIO
 from numbers import Integral
 from pickle import HIGHEST_PROTOCOL
@@ -41,7 +41,6 @@ from kombu.utils import fxrange
 from kombu.utils.eventio import SELECT_BAD_FD
 from vine import promise
 
-from celery.five import Counter, items, values
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
 from celery.worker import state as worker_state
@@ -469,7 +468,7 @@ class AsynPool(_pool.Pool):
 
         # Timers include calling maintain_pool at a regular interval
         # to be certain processes are restarted.
-        for handler, interval in items(self.timers):
+        for handler, interval in self.timers.items():
             hub.call_repeatedly(interval, handler)
 
         hub.on_tick.add(self.on_poll_start)
@@ -569,7 +568,7 @@ class AsynPool(_pool.Pool):
             # job._write_to and job._scheduled_for attributes used to recover
             # message boundaries when processes exit.
             infd = proc.inqW_fd
-            for job in values(cache):
+            for job in cache.values():
                 if job._write_to and job._write_to.inqW_fd == infd:
                     job._write_to = proc
                 if job._scheduled_for and job._scheduled_for.inqW_fd == infd:
@@ -928,7 +927,7 @@ class AsynPool(_pool.Pool):
         if self._state == TERMINATE:
             return
         # cancel all tasks that have not been accepted so that NACK is sent.
-        for job in values(self._cache):
+        for job in self._cache.values():
             if not job._accepted:
                 job._cancel()
 
@@ -947,7 +946,7 @@ class AsynPool(_pool.Pool):
                 # flush outgoing buffers
                 intervals = fxrange(0.01, 0.1, 0.01, repeatlast=True)
                 owned_by = {}
-                for job in values(self._cache):
+                for job in self._cache.values():
                     writer = _get_job_writer(job)
                     if writer is not None:
                         owned_by[writer] = job
@@ -1009,7 +1008,7 @@ class AsynPool(_pool.Pool):
         Here we will find an unused slot, as there should always
         be one available when we start a new process.
         """
-        return next(q for q, owner in items(self._queues)
+        return next(q for q, owner in self._queues.items()
                     if owner is None)
 
     def on_grow(self, n):
@@ -1078,7 +1077,7 @@ class AsynPool(_pool.Pool):
     def human_write_stats(self):
         if self.write_stats is None:
             return 'N/A'
-        vals = list(values(self.write_stats))
+        vals = list(self.write_stats.values())
         total = sum(vals)
 
         def per(v, total):
@@ -1136,7 +1135,7 @@ class AsynPool(_pool.Pool):
     def _find_worker_queues(self, proc):
         """Find the queues owned by ``proc``."""
         try:
-            return next(q for q, owner in items(self._queues)
+            return next(q for q, owner in self._queues.items()
                         if owner == proc)
         except StopIteration:
             raise ValueError(proc)

+ 3 - 3
celery/concurrency/base.py

@@ -6,12 +6,13 @@ import logging
 import os
 import sys
 
+from time import monotonic
+
 from billiard.einfo import ExceptionInfo
 from billiard.exceptions import WorkerLostError
 from kombu.utils.encoding import safe_repr
 
 from celery.exceptions import WorkerShutdown, WorkerTerminate
-from celery.five import monotonic, reraise
 from celery.utils import timer2
 from celery.utils.text import truncate
 from celery.utils.log import get_logger
@@ -36,8 +37,7 @@ def apply_target(target, args=(), kwargs={}, callback=None,
         raise
     except BaseException as exc:
         try:
-            reraise(WorkerLostError, WorkerLostError(repr(exc)),
-                    sys.exc_info()[2])
+            raise WorkerLostError(repr(exc)).with_traceback(sys.exc_info()[2])
         except WorkerLostError:
             callback(ExceptionInfo())
     else:

+ 1 - 2
celery/concurrency/prefork.py

@@ -16,7 +16,6 @@ from celery import signals
 from celery._state import set_default_app, _set_task_join_will_block
 from celery.app import trace
 from celery.concurrency.base import BasePool
-from celery.five import items
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
 
@@ -73,7 +72,7 @@ def process_initializer(app, hostname):
         trace._tasks = app._tasks  # enables fast_trace_task optimization.
     # rebuild execution handler for all tasks.
     from celery.app.trace import build_tracer
-    for name, task in items(app.tasks):
+    for name, task in app.tasks.items():
         task.__trace__ = build_tracer(name, task, app.loader, hostname,
                                       app=app)
     from celery.worker import state as worker_state

+ 5 - 6
celery/contrib/migrate.py

@@ -12,7 +12,6 @@ from kombu.common import maybe_declare
 from kombu.utils.encoding import ensure_bytes
 
 from celery.app import app_or_default
-from celery.five import string, string_t
 from celery.utils.nodenames import worker_direct
 
 __all__ = [
@@ -41,7 +40,7 @@ class State:
     def strtotal(self):
         if not self.total_apx:
             return '?'
-        return string(self.total_apx)
+        return str(self.total_apx)
 
     def __repr__(self):
         if self.filtered:
@@ -113,7 +112,7 @@ def migrate_tasks(source, dest, migrate=migrate_task, app=None,
 
 
 def _maybe_queue(app, q):
-    if isinstance(q, string_t):
+    if isinstance(q, str):
         return app.amqp.queues[q]
     return q
 
@@ -167,7 +166,7 @@ def move(predicate, connection=None, exchange=None, routing_key=None,
     .. code-block:: python
 
         def transform(value):
-            if isinstance(value, string_t):
+            if isinstance(value, str):
                 return Queue(value, Exchange(value), value)
             return value
 
@@ -225,7 +224,7 @@ def task_id_in(ids, body, message):
 
 
 def prepare_queues(queues):
-    if isinstance(queues, string_t):
+    if isinstance(queues, str):
         queues = queues.split(',')
     if isinstance(queues, list):
         queues = dict(tuple(islice(cycle(q.split(':')), None, 2))
@@ -243,7 +242,7 @@ def start_filter(app, conn, filter, limit=None, timeout=1.0,
     queues = prepare_queues(queues)
     consume_from = [_maybe_queue(app, q)
                     for q in consume_from or list(queues)]
-    if isinstance(tasks, string_t):
+    if isinstance(tasks, str):
         tasks = set(tasks.split(','))
     if tasks is None:
         tasks = set()

+ 0 - 2
celery/contrib/rdb.py

@@ -52,8 +52,6 @@ from pdb import Pdb
 
 from billiard.process import current_process
 
-from celery.five import range
-
 __all__ = [
     'CELERY_RDB_HOST', 'CELERY_RDB_PORT', 'DEFAULT_PORT',
     'Rdb', 'debugger', 'set_trace',

+ 1 - 2
celery/contrib/sphinx.py

@@ -30,13 +30,12 @@ Use ``.. autotask::`` to manually document a task.
 """
 from __future__ import absolute_import, unicode_literals
 
-from inspect import formatargspec
+from inspect import formatargspec, getfullargspec
 
 from sphinx.domains.python import PyModulelevel
 from sphinx.ext.autodoc import FunctionDocumenter
 
 from celery.app.task import BaseTask
-from celery.five import getfullargspec
 
 
 class TaskDocumenter(FunctionDocumenter):

+ 1 - 2
celery/events/__init__.py

@@ -22,7 +22,6 @@ from kombu.mixins import ConsumerMixin
 from kombu.utils import cached_property, uuid
 
 from celery.app import app_or_default
-from celery.five import items
 from celery.utils.functional import dictfilter
 from celery.utils.nodenames import anon_nodename
 from celery.utils.timeutils import adjust_timestamp, utcoffset, maybe_s_to_ms
@@ -262,7 +261,7 @@ class EventDispatcher:
                 self._outbound_buffer.clear()
         if groups:
             with self.mutex:
-                for group, events in items(self._group_buffer):
+                for group, events in self._group_buffer.items():
                     self._publish(events, self.producer, '%s.multi' % group)
                     events[:] = []  # list.clear
 

+ 7 - 6
celery/events/cursesmon.py

@@ -15,7 +15,6 @@ from math import ceil
 from celery import VERSION_BANNER
 from celery import states
 from celery.app import app_or_default
-from celery.five import items, values
 from celery.utils.text import abbr, abbrtask
 
 __all__ = ['CursesMonitor', 'evtop']
@@ -207,7 +206,7 @@ class CursesMonitor:  # pragma: no cover
             for subreply in reply:
                 curline = next(y)
 
-                host, response = next(items(subreply))
+                host, response = next(subreply.items())
                 host = '{0}: '.format(host)
                 self.win.addstr(curline, 3, host, curses.A_BOLD)
                 attr = curses.A_NORMAL
@@ -390,7 +389,7 @@ class CursesMonitor:  # pragma: no cover
                         info['result'] = abbr(info['result'], 16)
                     info = ' '.join(
                         '{0}={1}'.format(key, value)
-                        for key, value in items(info)
+                        for key, value in info.items()
                     )
                     detail = '... -> key i'
                 infowin = abbr(info,
@@ -419,8 +418,10 @@ class CursesMonitor:  # pragma: no cover
                 my - 3, x + len(self.info_str),
                 STATUS_SCREEN.format(
                     s=self.state,
-                    w_alive=len([w for w in values(self.state.workers)
-                                 if w.alive]),
+                    w_alive=len([
+                        w for w in self.state.workers.values()
+                        if w.alive
+                    ]),
                     w_all=len(self.state.workers),
                 ),
                 curses.A_DIM,
@@ -479,7 +480,7 @@ class CursesMonitor:  # pragma: no cover
 
     @property
     def workers(self):
-        return [hostname for hostname, w in items(self.state.workers)
+        return [hostname for hostname, w in self.state.workers.items()
                 if w.alive]
 
 

+ 10 - 11
celery/events/state.py

@@ -31,7 +31,6 @@ from kombu.clocks import timetuple
 from kombu.utils import cached_property
 
 from celery import states
-from celery.five import items, values
 from celery.utils.functional import LRUCache, memoize, pass1
 from celery.utils.log import get_logger
 
@@ -187,10 +186,10 @@ class Worker:
 
         def event(type_, timestamp=None,
                   local_received=None, fields=None,
-                  max_drift=HEARTBEAT_DRIFT_MAX, items=items, abs=abs, int=int,
+                  max_drift=HEARTBEAT_DRIFT_MAX, abs=abs, int=int,
                   insort=bisect.insort, len=len):
             fields = fields or {}
-            for k, v in items(fields):
+            for k, v in fields.items():
                 _set(self, k, v)
             if type_ == 'offline':
                 heartbeats[:] = []
@@ -212,7 +211,7 @@ class Worker:
         return event
 
     def update(self, f, **kw):
-        for k, v in items(dict(f, **kw) if kw else f):
+        for k, v in (dict(f, **kw) if kw else f).items():
             setattr(self, k, v)
 
     def __repr__(self):
@@ -294,7 +293,7 @@ class Task:
             self.__dict__.update(kwargs)
 
     def event(self, type_, timestamp=None, local_received=None, fields=None,
-              precedence=states.precedence, items=items,
+              precedence=states.precedence,
               setattr=setattr, task_event_to_state=TASK_EVENT_TO_STATE.get,
               RETRY=states.RETRY):
         fields = fields or {}
@@ -315,7 +314,7 @@ class Task:
             keep = self.merge_rules.get(state)
             if keep is not None:
                 fields = {
-                    k: v for k, v in items(fields) if k in keep
+                    k: v for k, v in fields.items() if k in keep
                 }
         else:
             fields.update(state=state, timestamp=timestamp)
@@ -618,12 +617,12 @@ class State:
     def rebuild_taskheap(self, timetuple=timetuple):
         heap = self._taskheap[:] = [
             timetuple(t.clock, t.timestamp, t.origin, ref(t))
-            for t in values(self.tasks)
+            for t in self.tasks.values()
         ]
         heap.sort()
 
     def itertasks(self, limit=None):
-        for index, row in enumerate(items(self.tasks)):
+        for index, row in enumerate(self.tasks.items()):
             yield row
             if limit and index + 1 >= limit:
                 break
@@ -677,7 +676,7 @@ class State:
 
     def alive_workers(self):
         """Return a list of (seemingly) alive workers."""
-        return (w for w in values(self.workers) if w.alive)
+        return (w for w in self.workers.values() if w.alive)
 
     def __repr__(self):
         return R_STATE.format(self)
@@ -693,9 +692,9 @@ class State:
 
 
 def _serialize_Task_WeakSet_Mapping(mapping):
-    return {name: [t.id for t in tasks] for name, tasks in items(mapping)}
+    return {name: [t.id for t in tasks] for name, tasks in mapping.items()}
 
 
 def _deserialize_Task_WeakSet_Mapping(mapping, tasks):
     return {name: WeakSet(tasks[i] for i in ids if i in tasks)
-            for name, ids in items(mapping or {})}
+            for name, ids in (mapping or {}).items()}

+ 1 - 3
celery/exceptions.py

@@ -4,8 +4,6 @@ from __future__ import absolute_import, unicode_literals
 
 import numbers
 
-from .five import string_t
-
 from billiard.exceptions import (  # noqa
     SoftTimeLimitExceeded, TimeLimitExceeded, WorkerLostError, Terminated,
 )
@@ -64,7 +62,7 @@ class Retry(TaskPredicate):
     def __init__(self, message=None, exc=None, when=None, **kwargs):
         from kombu.utils.encoding import safe_repr
         self.message = message
-        if isinstance(exc, string_t):
+        if isinstance(exc, str):
             self.exc, self.excs = None, exc
         else:
             self.exc, self.excs = exc, safe_repr(exc) if exc else None

+ 5 - 6
celery/loaders/base.py

@@ -13,7 +13,6 @@ from datetime import datetime
 from kombu.utils import json
 
 from celery import signals
-from celery.five import reraise, string_t
 from celery.utils.collections import DictAttribute, force_mapping
 from celery.utils.functional import maybe_list
 from celery.utils.imports import (
@@ -129,7 +128,7 @@ class BaseLoader:
         self.on_worker_process_init()
 
     def config_from_object(self, obj, silent=False):
-        if isinstance(obj, string_t):
+        if isinstance(obj, str):
             try:
                 obj = self._smart_import(obj, imp=self.import_from_cwd)
             except (ImportError, AttributeError):
@@ -159,10 +158,10 @@ class BaseLoader:
             self.find_module(name)
         except NotAPackage:
             if name.endswith('.py'):
-                reraise(NotAPackage, NotAPackage(CONFIG_WITH_SUFFIX.format(
-                    module=name, suggest=name[:-3])), sys.exc_info()[2])
-            reraise(NotAPackage, NotAPackage(CONFIG_INVALID_NAME.format(
-                module=name)), sys.exc_info()[2])
+                 raise NotAPackage(CONFIG_WITH_SUFFIX.format(
+                    module=name, suggest=name[:-3])).with_traceback(sys.exc_info()[2])
+            raise NotAPackage(CONFIG_INVALID_NAME.format(
+                module=name)).with_traceback(sys.exc_info()[2])
         else:
             return self.import_from_cwd(name)
 

+ 2 - 4
celery/local.py

@@ -11,8 +11,6 @@ from __future__ import absolute_import, unicode_literals
 import importlib
 import sys
 
-from .five import bytes_if_py2, string
-
 __all__ = ['Proxy', 'PromiseProxy', 'try_import', 'maybe_evaluate']
 
 __module__ = __name__  # used by Proxy class body
@@ -35,7 +33,7 @@ def _default_cls_attr(name, type_, cls_value):
     def __get__(self, obj, cls=None):
         return self.__getter(obj) if obj is not None else self
 
-    return type(bytes_if_py2(name), (type_,), {
+    return type(name, (type_,), {
         '__new__': __new__, '__get__': __get__,
     })
 
@@ -292,7 +290,7 @@ class Proxy:
 
         def __unicode__(self):
             try:
-                return string(self._get_current_object())
+                return str(self._get_current_object())
             except RuntimeError:  # pragma: no cover
                 return repr(self)
 

+ 6 - 7
celery/platforms.py

@@ -22,7 +22,6 @@ from kombu.utils.encoding import safe_str
 from contextlib import contextmanager
 
 from .local import try_import
-from .five import items, reraise, string_t
 
 try:
     from billiard.process import current_process
@@ -138,7 +137,7 @@ class Pidfile:
         try:
             self.write_pid()
         except OSError as exc:
-            reraise(LockFailed, LockFailed(str(exc)), sys.exc_info()[2])
+            raise LockFailed(str(exc)).with_traceback(sys.exc_info()[2])
         return self
     __enter__ = acquire
 
@@ -293,7 +292,7 @@ class DaemonContext:
     def __init__(self, pidfile=None, workdir=None, umask=None,
                  fake=False, after_chdir=None, after_forkers=True,
                  **kwargs):
-        if isinstance(umask, string_t):
+        if isinstance(umask, str):
             # octal or decimal, depending on initial zero.
             umask = int(umask, 8 if umask.startswith('0') else 10)
         self.workdir = workdir or DAEMON_WORKDIR
@@ -619,8 +618,8 @@ class Signals:
         """Get signal number from signal name."""
         if isinstance(signal_name, numbers.Integral):
             return signal_name
-        if not isinstance(signal_name, string_t) \
-                or not signal_name.isupper():
+        if (not isinstance(signal_name, str) or
+                not signal_name.isupper()):
             raise TypeError('signal name must be uppercase string.')
         if not signal_name.startswith('SIG'):
             signal_name = 'SIG' + signal_name
@@ -658,7 +657,7 @@ class Signals:
 
     def update(self, _d_=None, **sigmap):
         """Set signal handlers from a mapping."""
-        for signal_name, handler in items(dict(_d_ or {}, **sigmap)):
+        for signal_name, handler in dict(_d_ or {}, **sigmap).items():
             self[signal_name] = handler
 
 signals = Signals()
@@ -707,7 +706,7 @@ else:
 
 def get_errno_name(n):
     """Get errno for string, e.g. ``ENOENT``."""
-    if isinstance(n, string_t):
+    if isinstance(n, str):
         return getattr(errno, n)
     return n
 

+ 4 - 4
celery/result.py

@@ -7,6 +7,7 @@ import time
 from collections import OrderedDict, deque
 from contextlib import contextmanager
 from copy import copy
+from time import monotonic
 
 from kombu.utils import cached_property
 from vine import Thenable, barrier, promise
@@ -16,7 +17,6 @@ from . import states
 from ._state import _set_task_join_will_block, task_join_will_block
 from .app import app_or_default
 from .exceptions import ImproperlyConfigured, IncompleteStream, TimeoutError
-from .five import items, range, string_t, monotonic
 from .utils import deprecated
 from .utils.graph import DependencyGraph, GraphFormatter
 
@@ -317,7 +317,7 @@ class AsyncResult(ResultBase):
     def __eq__(self, other):
         if isinstance(other, AsyncResult):
             return other.id == self.id
-        elif isinstance(other, string_t):
+        elif isinstance(other, str):
             return other == self.id
         return NotImplemented
 
@@ -472,7 +472,7 @@ class ResultSet(ResultBase):
         Raises:
             KeyError: if the result is not a member.
         """
-        if isinstance(result, string_t):
+        if isinstance(result, str):
             result = self.app.AsyncResult(result)
         try:
             self.results.remove(result)
@@ -583,7 +583,7 @@ class ResultSet(ResultBase):
 
         while results:
             removed = set()
-            for task_id, result in items(results):
+            for task_id, result in results.items():
                 if result.ready():
                     yield result.get(timeout=timeout and timeout - elapsed,
                                      propagate=propagate)

+ 2 - 3
celery/schedules.py

@@ -12,7 +12,6 @@ from datetime import datetime, timedelta
 from kombu.utils import cached_property
 
 from . import current_app
-from .five import range, string_t
 from .utils.collections import AttributeDict
 from .utils.timeutils import (
     weekday, maybe_timedelta, remaining, humanize_seconds,
@@ -272,7 +271,7 @@ class crontab_parser:
         return list(range(self.min_, self.max_ + self.min_))
 
     def _expand_number(self, s):
-        if isinstance(s, string_t) and s[0] == '-':
+        if isinstance(s, str) and s[0] == '-':
             raise self.ParseException('negative numbers not supported')
         try:
             i = int(s)
@@ -411,7 +410,7 @@ class crontab(schedule):
         """
         if isinstance(cronspec, numbers.Integral):
             result = {cronspec}
-        elif isinstance(cronspec, string_t):
+        elif isinstance(cronspec, str):
             result = crontab_parser(max_, min_).parse(cronspec)
         elif isinstance(cronspec, set):
             result = cronspec

+ 1 - 2
celery/security/certificate.py

@@ -8,7 +8,6 @@ import os
 from kombu.utils.encoding import bytes_to_str
 
 from celery.exceptions import SecurityError
-from celery.five import values
 
 from .utils import crypto, reraise_errors
 
@@ -54,7 +53,7 @@ class CertStore:
 
     def itercerts(self):
         """an iterator over the certificates"""
-        for c in values(self._certs):
+        for c in self._certs.values():
             yield c
 
     def __getitem__(self, id):

+ 1 - 4
celery/security/utils.py

@@ -7,7 +7,6 @@ import sys
 from contextlib import contextmanager
 
 from celery.exceptions import SecurityError
-from celery.five import reraise
 
 try:
     from OpenSSL import crypto
@@ -24,6 +23,4 @@ def reraise_errors(msg='{0!r}', errors=None):
     try:
         yield
     except errors as exc:
-        reraise(SecurityError,
-                SecurityError(msg.format(exc)),
-                sys.exc_info()[2])
+        raise SecurityError(msg.format(exc)).with_traceback(sys.exc_info()[2])

+ 2 - 3
celery/task/base.py

@@ -12,7 +12,7 @@ from kombu import Exchange
 
 from celery import current_app
 from celery.app.task import Context, Task as BaseTask, _reprtask
-from celery.five import class_property, reclassmethod, with_metaclass
+from celery.five import class_property, reclassmethod
 from celery.local import Proxy
 from celery.schedules import maybe_schedule
 from celery.utils.log import get_task_logger
@@ -119,8 +119,7 @@ class TaskType(type):
         return _reprtask(cls)
 
 
-@with_metaclass(TaskType)
-class Task(BaseTask):
+class Task(BaseTask, metaclass=TaskType):
     """Deprecated Task base class.
 
     Modern applications should use :class:`celery.Task` instead.

+ 2 - 3
celery/tests/app/test_amqp.py

@@ -6,7 +6,6 @@ from kombu import Exchange, Queue
 
 from celery import uuid
 from celery.app.amqp import Queues, utf8dict
-from celery.five import keys
 from celery.utils.timeutils import to_utc
 
 from celery.tests.case import AppCase, Mock
@@ -116,13 +115,13 @@ class test_Queues(AppCase):
         q = Queues()
         q.select(['foo', 'bar'])
         q.select_add('baz')
-        self.assertItemsEqual(keys(q._consume_from), ['foo', 'bar', 'baz'])
+        self.assertItemsEqual(q._consume_from.keys(), ['foo', 'bar', 'baz'])
 
     def test_deselect(self):
         q = Queues()
         q.select(['foo', 'bar'])
         q.deselect('bar')
-        self.assertItemsEqual(keys(q._consume_from), ['foo'])
+        self.assertItemsEqual(q._consume_from.keys(), ['foo'])
 
     def test_with_ha_policy_compat(self):
         q = Queues(ha_policy='all')

+ 1 - 2
celery/tests/app/test_app.py

@@ -16,7 +16,6 @@ from celery import _state
 from celery.app import base as _appbase
 from celery.app import defaults
 from celery.exceptions import ImproperlyConfigured
-from celery.five import keys
 from celery.loaders.base import unconfigured
 from celery.platforms import pyimplementation
 from celery.utils.serialization import pickle
@@ -323,7 +322,7 @@ class test_App(AppCase):
         with self.Celery(broker='foo://bar') as app:
             app.conf.worker_agent = 'foo:Bar'
             self.assertFalse(app.configured)
-            self.assertTrue(list(keys(app.conf)))
+            self.assertTrue(list(app.conf.keys()))
             self.assertFalse(app.configured)
             self.assertIn('worker_agent', app.conf)
             self.assertFalse(app.configured)

+ 2 - 3
celery/tests/app/test_beat.py

@@ -7,7 +7,6 @@ from pickle import dumps, loads
 
 from celery import beat
 from celery import uuid
-from celery.five import keys, string_t
 from celery.schedules import schedule
 from celery.utils.objects import Bunch
 
@@ -238,7 +237,7 @@ class test_Scheduler(AppCase):
 
     def test_info(self):
         scheduler = mScheduler(app=self.app)
-        self.assertIsInstance(scheduler.info, string_t)
+        self.assertIsInstance(scheduler.info, str)
 
     def test_maybe_entry(self):
         s = mScheduler(app=self.app)
@@ -440,7 +439,7 @@ class test_Service(AppCase):
         self.assertIsInstance(schedule, dict)
         self.assertIsInstance(s.scheduler, beat.Scheduler)
         scheduled = list(schedule.keys())
-        for task_name in keys(sh['entries']):
+        for task_name in sh['entries'].keys():
             self.assertIn(task_name, scheduled)
 
         s.sync()

+ 0 - 1
celery/tests/app/test_builtins.py

@@ -2,7 +2,6 @@ from __future__ import absolute_import, unicode_literals
 
 from celery import group, chord
 from celery.app import builtins
-from celery.five import range
 from celery.utils.functional import pass1
 
 from celery.tests.case import AppCase, ContextMock, Mock, patch

+ 2 - 3
celery/tests/app/test_defaults.py

@@ -8,7 +8,6 @@ from celery.app.defaults import (
     _OLD_DEFAULTS, _OLD_SETTING_KEYS, _TO_NEW_KEY, _TO_OLD_KEY,
     DEFAULTS, NAMESPACES, SETTING_KEYS
 )
-from celery.five import values
 
 from celery.tests.case import AppCase, mock
 
@@ -46,8 +45,8 @@ class test_defaults(AppCase):
         self.assertFalse(any(key.islower() for key in _TO_NEW_KEY))
         self.assertFalse(any(key.isupper() for key in SETTING_KEYS))
         self.assertFalse(any(key.islower() for key in _OLD_SETTING_KEYS))
-        self.assertFalse(any(value.isupper() for value in values(_TO_NEW_KEY)))
-        self.assertFalse(any(value.islower() for value in values(_TO_OLD_KEY)))
+        self.assertFalse(any(value.isupper() for value in _TO_NEW_KEY.values()))
+        self.assertFalse(any(value.islower() for value in _TO_OLD_KEY.values()))
 
         for key in _TO_NEW_KEY:
             self.assertIn(key, _OLD_SETTING_KEYS)

+ 1 - 2
celery/tests/app/test_loaders.py

@@ -6,7 +6,6 @@ import warnings
 
 from celery import loaders
 from celery.exceptions import NotConfigured
-from celery.five import bytes_if_py2
 from celery.loaders import base
 from celery.loaders import default
 from celery.loaders.app import AppLoader
@@ -136,7 +135,7 @@ class test_DefaultLoader(AppCase):
             pass
 
         configname = os.environ.get('CELERY_CONFIG_MODULE') or 'celeryconfig'
-        celeryconfig = ConfigModule(bytes_if_py2(configname))
+        celeryconfig = ConfigModule(configname)
         celeryconfig.imports = ('os', 'sys')
 
         prevconfig = sys.modules.get(configname)

+ 1 - 2
celery/tests/app/test_schedules.py

@@ -6,7 +6,6 @@ from contextlib import contextmanager
 from datetime import datetime, timedelta
 from pickle import dumps, loads
 
-from celery.five import items
 from celery.schedules import (
     ParseException, crontab, crontab_parser, schedule, solar,
 )
@@ -610,7 +609,7 @@ class test_crontab_is_due(AppCase):
         l2, d2, n2 = due.remaining_delta(last_ran, ffwd=relativedelta)
         if not isinstance(d1, relativedelta):
             self.assertEqual(l1, l2)
-            for field, value in items(d1._fields()):
+            for field, value in d1._fields().items():
                 self.assertEqual(getattr(d1, field), value)
             self.assertFalse(d2.years)
             self.assertFalse(d2.months)

+ 1 - 1
celery/tests/backends/test_amqp.py

@@ -5,13 +5,13 @@ import pickle
 from contextlib import contextmanager
 from datetime import timedelta
 from pickle import dumps, loads
+from queue import Empty, Queue
 
 from billiard.einfo import ExceptionInfo
 
 from celery import states
 from celery import uuid
 from celery.backends.amqp import AMQPBackend
-from celery.five import Empty, Queue, range
 from celery.result import AsyncResult
 
 from celery.tests.case import AppCase, Mock, depends_on_current_app, mock

+ 4 - 20
celery/tests/backends/test_base.py

@@ -6,7 +6,6 @@ import types
 from contextlib import contextmanager
 
 from celery.exceptions import ChordError, TimeoutError
-from celery.five import items, bytes_if_py2, range
 from celery.utils import serialization
 from celery.utils.serialization import subclass_exception
 from celery.utils.serialization import find_pickleable_exception as fnpe
@@ -32,19 +31,9 @@ class wrapobject:
     def __init__(self, *args, **kwargs):
         self.args = args
 
-if sys.version_info[0] == 3 or getattr(sys, 'pypy_version_info', None):
-    Oldstyle = None
-else:
-    Oldstyle = types.ClassType(bytes_if_py2('Oldstyle'), (), {})
-Unpickleable = subclass_exception(
-    bytes_if_py2('Unpickleable'), KeyError, 'foo.module',
-)
-Impossible = subclass_exception(
-    bytes_if_py2('Impossible'), object, 'foo.module',
-)
-Lookalike = subclass_exception(
-    bytes_if_py2('Lookalike'), wrapobject, 'foo.module',
-)
+Unpickleable = subclass_exception('Unpickleable', KeyError, 'foo.module')
+Impossible = subclass_exception('Impossible', object, 'foo.module')
+Lookalike = subclass_exception('Lookalike', wrapobject, 'foo.module')
 
 
 class test_nulldict(Case):
@@ -91,11 +80,6 @@ class test_BaseBackend_interface(AppCase):
 
 class test_exception_pickle(AppCase):
 
-    @skip.if_python3(reason='does not support old style classes')
-    @skip.if_pypy()
-    def test_oldstyle(self):
-        self.assertTrue(fnpe(Oldstyle()))
-
     def test_BaseException(self):
         self.assertIsNone(fnpe(Exception()))
 
@@ -355,7 +339,7 @@ class test_KeyValueStoreBackend(AppCase):
         for is_dict in True, False:
             self.b.mget_returns_dict = is_dict
             ids = {uuid(): i for i in range(10)}
-            for id, i in items(ids):
+            for id, i in ids.items():
                 self.b.mark_as_done(id, i)
             it = self.b.get_many(list(ids))
             for i, (got_id, got_state) in enumerate(it):

+ 6 - 7
celery/tests/backends/test_cache.py

@@ -11,7 +11,6 @@ from celery import states
 from celery import group, signature, uuid
 from celery.backends.cache import CacheBackend, DummyClient, backends
 from celery.exceptions import ImproperlyConfigured
-from celery.five import items, bytes_if_py2, string, text_t
 
 from celery.tests.case import AppCase, Mock, mock, patch, skip
 
@@ -153,7 +152,7 @@ class MemcachedClient(DummyClient):
         if PY3:
             key_t, must_be, not_be, cod = bytes, 'string', 'bytes', 'decode'
         else:
-            key_t, must_be, not_be, cod = text_t, 'bytes', 'string', 'encode'
+            key_t, must_be, not_be, cod = str, 'bytes', 'string', 'encode'
         if isinstance(key, key_t):
             raise MyMemcachedStringEncodingError(
                 'Keys must be {0}, not {1}.  Convert your '
@@ -166,7 +165,7 @@ class MockCacheMixin:
 
     @contextmanager
     def mock_memcache(self):
-        memcache = types.ModuleType(bytes_if_py2('memcache'))
+        memcache = types.ModuleType('memcache')
         memcache.Client = MemcachedClient
         memcache.Client.__module__ = memcache.__name__
         prev, sys.modules['memcache'] = sys.modules.get('memcache'), memcache
@@ -178,7 +177,7 @@ class MockCacheMixin:
 
     @contextmanager
     def mock_pylibmc(self):
-        pylibmc = types.ModuleType(bytes_if_py2('pylibmc'))
+        pylibmc = types.ModuleType('pylibmc')
         pylibmc.Client = MemcachedClient
         pylibmc.Client.__module__ = pylibmc.__name__
         prev = sys.modules.get('pylibmc')
@@ -229,7 +228,7 @@ class test_get_best_memcache(AppCase, MockCacheMixin):
     def test_backends(self):
         from celery.backends.cache import backends
         with self.mock_memcache():
-            for name, fun in items(backends):
+            for name, fun in backends.items():
                 self.assertTrue(fun())
 
 
@@ -241,7 +240,7 @@ class test_memcache_key(AppCase, MockCacheMixin):
                 with mock.mask_modules('pylibmc'):
                     from celery.backends import cache
                     cache._imp = [None]
-                    task_id, result = string(uuid()), 42
+                    task_id, result = str(uuid()), 42
                     b = cache.CacheBackend(backend='memcache', app=self.app)
                     b.store_result(task_id, result, state=states.SUCCESS)
                     self.assertEqual(b.get_result(task_id), result)
@@ -262,7 +261,7 @@ class test_memcache_key(AppCase, MockCacheMixin):
             with self.mock_pylibmc():
                 from celery.backends import cache
                 cache._imp = [None]
-                task_id, result = string(uuid()), 42
+                task_id, result = str(uuid()), 42
                 b = cache.CacheBackend(backend='memcache', app=self.app)
                 b.store_result(task_id, result, state=states.SUCCESS)
                 self.assertEqual(b.get_result(task_id), result)

+ 1 - 2
celery/tests/bin/test_base.py

@@ -8,7 +8,6 @@ from celery.bin.base import (
     Extensions,
     HelpFormatter,
 )
-from celery.five import bytes_if_py2
 from celery.utils.objects import Bunch
 
 from celery.tests.case import (
@@ -353,7 +352,7 @@ class test_Command(AppCase):
         cmd = MockCommand(app=self.app)
         with patch('celery.bin.base.symbol_by_name') as sbn:
             from types import ModuleType
-            x = ModuleType(bytes_if_py2('proj'))
+            x = ModuleType('proj')
 
             def on_sbn(*args, **kwargs):
 

+ 0 - 1
celery/tests/concurrency/test_prefork.py

@@ -7,7 +7,6 @@ import socket
 from itertools import cycle
 
 from celery.app.defaults import DEFAULTS
-from celery.five import range
 from celery.utils.collections import AttributeDict
 from celery.utils.functional import noop
 from celery.utils.objects import Bunch

+ 0 - 1
celery/tests/events/test_state.py

@@ -18,7 +18,6 @@ from celery.events.state import (
     Task,
     heartbeat_expires,
 )
-from celery.five import range
 from celery.tests.case import AppCase, Mock, patch, skip
 
 try:

+ 2 - 1
celery/tests/security/test_security.py

@@ -14,10 +14,11 @@ Generated with:
 """
 from __future__ import absolute_import, unicode_literals
 
+import builtins
+
 from kombu.serialization import disable_insecure_serializers
 
 from celery.exceptions import ImproperlyConfigured, SecurityError
-from celery.five import builtins
 from celery.security import disable_untrusted_serializers, setup_security
 from celery.security.utils import reraise_errors
 from kombu.serialization import registry

+ 0 - 1
celery/tests/tasks/test_chord.py

@@ -6,7 +6,6 @@ from celery import group, uuid
 from celery import canvas
 from celery import result
 from celery.exceptions import ChordError, Retry
-from celery.five import range
 from celery.result import AsyncResult, GroupResult, EagerResult
 from celery.tests.case import AppCase, Mock
 

+ 0 - 1
celery/tests/tasks/test_result.py

@@ -9,7 +9,6 @@ from celery.backends.base import SyncBackendMixin
 from celery.exceptions import (
     ImproperlyConfigured, IncompleteStream, TimeoutError,
 )
-from celery.five import range
 from celery.result import (
     AsyncResult,
     EagerResult,

+ 3 - 4
celery/tests/tasks/test_tasks.py

@@ -9,7 +9,6 @@ from celery import Task
 from celery import group, uuid
 from celery.app.task import _reprtask
 from celery.exceptions import Ignore, Retry
-from celery.five import items, range, string_t
 from celery.result import EagerResult
 from celery.utils.timeutils import parse_iso8601
 
@@ -307,14 +306,14 @@ class test_tasks(TasksCase):
         self.assertEqual(task_data['task'], task_name)
         task_kwargs = task_data.get('kwargs', {})
         if test_eta:
-            self.assertIsInstance(task_data.get('eta'), string_t)
+            self.assertIsInstance(task_data.get('eta'), str)
             to_datetime = parse_iso8601(task_data.get('eta'))
             self.assertIsInstance(to_datetime, datetime)
         if test_expires:
-            self.assertIsInstance(task_data.get('expires'), string_t)
+            self.assertIsInstance(task_data.get('expires'), str)
             to_datetime = parse_iso8601(task_data.get('expires'))
             self.assertIsInstance(to_datetime, datetime)
-        for arg_name, arg_value in items(kwargs):
+        for arg_name, arg_value in kwargs.items():
             self.assertEqual(task_kwargs.get(arg_name), arg_value)
 
     def test_incomplete_task_cls(self):

+ 1 - 2
celery/tests/utils/test_collections.py

@@ -16,7 +16,6 @@ from celery.utils.collections import (
     LimitedSet,
     Messagebuffer,
 )
-from celery.five import items
 from celery.utils.objects import Bunch
 
 from celery.tests.case import Case, skip
@@ -101,7 +100,7 @@ class test_ConfigurationView(Case):
         expected = {'changed_key': 1,
                     'default_key': 1,
                     'both': 2}
-        self.assertDictEqual(dict(items(self.view)), expected)
+        self.assertDictEqual(dict(self.view.items()), expected)
         self.assertItemsEqual(list(iter(self.view)),
                               list(expected.keys()))
         self.assertItemsEqual(list(self.view.keys()), list(expected.keys()))

+ 1 - 2
celery/tests/utils/test_functional.py

@@ -2,7 +2,6 @@ from __future__ import absolute_import, unicode_literals
 
 from kombu.utils.functional import lazy
 
-from celery.five import range, nextfun
 from celery.utils.functional import (
     DummyContext,
     fun_takes_argument,
@@ -89,7 +88,7 @@ class test_mlazy(Case):
     def test_is_memoized(self):
 
         it = iter(range(20, 30))
-        p = mlazy(nextfun(it))
+        p = mlazy(next(it))
         self.assertEqual(p(), 20)
         self.assertTrue(p.evaluated)
         self.assertEqual(p(), 20)

+ 2 - 2
celery/tests/utils/test_graph.py

@@ -1,6 +1,6 @@
 from __future__ import absolute_import, unicode_literals
 
-from celery.five import WhateverIO, items
+from celery.five import WhateverIO
 from celery.utils.graph import DependencyGraph
 
 from celery.tests.case import Case, Mock
@@ -53,7 +53,7 @@ class test_DependencyGraph(Case):
 
     def test_items(self):
         self.assertDictEqual(
-            dict(items(self.graph1())),
+            dict(self.graph1().items()),
             {'A': [], 'B': [], 'C': ['A'], 'D': ['C', 'B']},
         )
 

+ 1 - 3
celery/tests/utils/test_imports.py

@@ -1,7 +1,5 @@
 from __future__ import absolute_import, unicode_literals
 
-from celery.five import bytes_if_py2
-
 from celery.utils.imports import (
     NotAPackage,
     qualname,
@@ -25,7 +23,7 @@ class test_import_utils(Case):
         self.assertTrue(find_module('celery.worker.request'))
 
     def test_qualname(self):
-        Class = type(bytes_if_py2('Fox'), (object,), {
+        Class = type('Fox', (object,), {
             '__module__': 'quick.brown',
         })
         self.assertEqual(qualname(Class), 'quick.brown.Fox')

+ 2 - 5
celery/tests/utils/test_local.py

@@ -2,7 +2,6 @@ from __future__ import absolute_import, unicode_literals
 
 import sys
 
-from celery.five import string, long_t
 from celery.local import (
     Proxy,
     PromiseProxy,
@@ -94,10 +93,10 @@ class test_Proxy(Case):
                 return 'REPR'
 
         x = Proxy(lambda: X())
-        self.assertEqual(string(x), 'UNICODE')
+        self.assertEqual(str(x), 'UNICODE')
         del(X.__unicode__)
         del(X.__str__)
-        self.assertEqual(string(x), 'REPR')
+        self.assertEqual(str(x), 'REPR')
 
     def test_dir(self):
 
@@ -267,8 +266,6 @@ class test_Proxy(Case):
         x = Proxy(lambda: 10)
         self.assertEqual(type(x.__float__()), float)
         self.assertEqual(type(x.__int__()), int)
-        if not PY3:
-            self.assertEqual(type(x.__long__()), long_t)
         self.assertTrue(hex(x))
         self.assertTrue(oct(x))
 

+ 0 - 1
celery/tests/utils/test_saferef.py

@@ -1,6 +1,5 @@
 from __future__ import absolute_import, unicode_literals
 
-from celery.five import range
 from celery.utils.dispatch.saferef import safe_ref
 from celery.tests.case import Case
 

+ 4 - 7
celery/tests/utils/test_saferepr.py

@@ -5,8 +5,6 @@ import re
 from decimal import Decimal
 from pprint import pprint
 
-from celery.five import items, long_t, text_t, values
-
 from celery.utils.saferepr import saferepr
 
 from celery.tests.case import Case
@@ -15,10 +13,9 @@ D_NUMBERS = {
     b'integer': 1,
     b'float': 1.3,
     b'decimal': Decimal('1.3'),
-    b'long': long_t(4),
     b'complex': complex(13.3),
 }
-D_INT_KEYS = {v: k for k, v in items(D_NUMBERS)}
+D_INT_KEYS = {v: k for k, v in D_NUMBERS.items()}
 
 QUICK_BROWN_FOX = 'The quick brown fox jumps over the lazy dog.'
 B_QUICK_BROWN_FOX = b'The quick brown fox jumps over the lazy dog.'
@@ -30,7 +27,7 @@ D_TEXT = {
     b'xuzzy': B_QUICK_BROWN_FOX,
 }
 
-L_NUMBERS = list(values(D_NUMBERS))
+L_NUMBERS = list(D_NUMBERS.values())
 
 D_TEXT_LARGE = {
     b'bazxuzzyfoobarlongverylonglong': QUICK_BROWN_FOX * 30,
@@ -55,7 +52,7 @@ RE_LONG_SUFFIX = re.compile(r'(\d)+L')
 
 
 def old_repr(s):
-    return text_t(RE_LONG_SUFFIX.sub(
+    return str(RE_LONG_SUFFIX.sub(
         r'\1',
         RE_EMPTY_SET_REPR.sub(
             RE_EMPTY_SET_REPR_REPLACE,
@@ -122,7 +119,7 @@ class dict3(dict):
 class test_saferepr(Case):
 
     def test_safe_types(self):
-        for value in values(D_NUMBERS):
+        for value in D_NUMBERS.values():
             self.assertEqual(saferepr(value), old_repr(value))
 
     def test_numbers_dict(self):

+ 1 - 2
celery/tests/utils/test_term.py

@@ -5,7 +5,6 @@ import sys
 
 from celery.utils import term
 from celery.utils.term import colored, fg
-from celery.five import text_t
 
 from celery.tests.case import Case, skip
 
@@ -53,7 +52,7 @@ class test_colored(Case):
         self.assertTrue(str(colored().iwhite('f')))
         self.assertTrue(str(colored().reset('f')))
 
-        self.assertTrue(text_t(colored().green('∂bar')))
+        self.assertTrue(str(colored().green('∂bar')))
 
         self.assertTrue(
             colored().red('éefoo') + colored().green('∂bar'))

+ 1 - 1
celery/tests/worker/test_control.py

@@ -5,11 +5,11 @@ import socket
 
 from collections import defaultdict
 from datetime import datetime, timedelta
+from queue import Queue as FastQueue
 
 from kombu import pidbox
 from kombu.utils import uuid
 
-from celery.five import Queue as FastQueue
 from celery.utils.timer2 import Timer
 from celery.worker import WorkController as _WC
 from celery.worker import consumer

+ 2 - 1
celery/tests/worker/test_loops.py

@@ -3,13 +3,14 @@ from __future__ import absolute_import, unicode_literals
 import errno
 import socket
 
+from queue import Empty
+
 from kombu.async import Hub, READ, WRITE, ERR
 
 from celery.bootsteps import CLOSE, RUN
 from celery.exceptions import (
     InvalidTaskError, WorkerLostError, WorkerShutdown, WorkerTerminate,
 )
-from celery.five import Empty
 from celery.platforms import EX_FAILURE
 from celery.worker import state
 from celery.worker.consumer import Consumer

+ 1 - 1
celery/tests/worker/test_request.py

@@ -8,6 +8,7 @@ import socket
 import sys
 
 from datetime import datetime, timedelta
+from time import monotonic
 
 from billiard.einfo import ExceptionInfo
 from kombu.utils import uuid
@@ -33,7 +34,6 @@ from celery.exceptions import (
     Terminated,
     WorkerLostError,
 )
-from celery.five import monotonic
 from celery.signals import task_revoked
 from celery.worker import request as module
 from celery.worker.request import (

+ 1 - 1
celery/tests/worker/test_worker.py

@@ -8,6 +8,7 @@ from collections import deque
 from datetime import datetime, timedelta
 from functools import partial
 from threading import Event
+from queue import Empty, Queue as FastQueue
 
 from amqp import ChannelError
 from kombu import Connection
@@ -22,7 +23,6 @@ from celery.exceptions import (
     WorkerShutdown, WorkerTerminate, TaskRevokedError,
     InvalidTaskError, ImproperlyConfigured,
 )
-from celery.five import Empty, range, Queue as FastQueue
 from celery.platforms import EX_FAILURE
 from celery import worker as worker_module
 from celery.worker import components

+ 1 - 4
celery/utils/abstract.py

@@ -5,8 +5,6 @@ from __future__ import absolute_import, unicode_literals
 from abc import ABCMeta, abstractmethod, abstractproperty
 from collections import Callable
 
-from celery.five import with_metaclass
-
 __all__ = ['CallableTask', 'CallableSignature']
 
 
@@ -14,8 +12,7 @@ def _hasattr(C, attr):
     return any(attr in B.__dict__ for B in C.__mro__)
 
 
-@with_metaclass(ABCMeta)
-class _AbstractClass:
+class _AbstractClass(metaclass=ABCMeta)
     __required_attributes__ = frozenset()
 
     @classmethod

+ 18 - 55
celery/utils/collections.py

@@ -11,8 +11,7 @@ from collections import (
 )
 from heapq import heapify, heappush, heappop
 from itertools import chain, count
-
-from celery.five import Empty, items, keys, values
+from queue import Empty
 
 from .functional import first, uniq
 from .text import match_case
@@ -56,7 +55,7 @@ def lpmerge(L, R):
     Keeps values from `L`, if the value in `R` is :const:`None`.
     """
     setitem = L.__setitem__
-    [setitem(k, v) for k, v in items(R) if v is not None]
+    [setitem(k, v) for k, v in R.items() if v is not None]
     return L
 
 
@@ -66,7 +65,7 @@ class OrderedDict(_OrderedDict):
         def _LRUkey(self):
             # return value of od.keys does not support __next__,
             # but this version will also not create a copy of the list.
-            return next(iter(keys(self)))
+            return next(iter(self.keys()))
     else:
         if _dict_is_ordered:  # pragma: no cover
             def _LRUkey(self):
@@ -172,37 +171,19 @@ class DictAttribute:
     def __contains__(self, key):
         return hasattr(self.obj, key)
 
-    def _iterate_keys(self):
+    def keys(self):
         return iter(dir(self.obj))
-    iterkeys = _iterate_keys
 
     def __iter__(self):
-        return self._iterate_keys()
+        return self.keys()
 
-    def _iterate_items(self):
-        for key in self._iterate_keys():
+    def items(self):
+        for key in self.keys():
             yield key, getattr(self.obj, key)
-    iteritems = _iterate_items
 
-    def _iterate_values(self):
-        for key in self._iterate_keys():
+    def values(self):
+        for key in self.keys():
             yield getattr(self.obj, key)
-    itervalues = _iterate_values
-
-    if sys.version_info[0] == 3:  # pragma: no cover
-        items = _iterate_items
-        keys = _iterate_keys
-        values = _iterate_values
-    else:
-
-        def keys(self):
-            return list(self)
-
-        def items(self):
-            return list(self._iterate_items())
-
-        def values(self):
-            return list(self._iterate_values())
 MutableMapping.register(DictAttribute)
 
 
@@ -271,7 +252,7 @@ class ChainMap(MutableMapping):
         return len(set().union(*self.maps))
 
     def __iter__(self):
-        return self._iterate_keys()
+        return self.keys()
 
     def __contains__(self, key):
         key = self._key(key)
@@ -309,32 +290,14 @@ class ChainMap(MutableMapping):
         # changes take precedence.
         return chain(*[op(d) for d in reversed(self.maps)])
 
-    def _iterate_keys(self):
+    def keys(self):
         return uniq(self._iter(lambda d: d.keys()))
-    iterkeys = _iterate_keys
 
-    def _iterate_items(self):
+    def items(self):
         return ((key, self[key]) for key in self)
-    iteritems = _iterate_items
 
-    def _iterate_values(self):
+    def values(self):
         return (self[key] for key in self)
-    itervalues = _iterate_values
-
-    if sys.version_info[0] == 3:  # pragma: no cover
-        keys = _iterate_keys
-        items = _iterate_items
-        values = _iterate_values
-
-    else:  # noqa
-        def keys(self):
-            return list(self._iterate_keys())
-
-        def items(self):
-            return list(self._iterate_items())
-
-        def values(self):
-            return list(self._iterate_values())
 
 
 class ConfigurationView(ChainMap, AttributeDictMixin):
@@ -494,7 +457,7 @@ class LimitedSet:
 
     def _refresh_heap(self):
         """Time consuming recreating of heap. Do not run this too often."""
-        self._heap[:] = [entry for entry in values(self._data)]
+        self._heap[:] = [entry for entry in self._data.values()]
         heapify(self._heap)
 
     def _maybe_refresh_heap(self):
@@ -527,7 +490,7 @@ class LimitedSet:
             self.purge()
         elif isinstance(other, dict):
             # revokes are sent as a dict
-            for key, inserted in items(other):
+            for key, inserted in other.items():
                 if isinstance(inserted, (tuple, list)):
                     # in case someone uses ._data directly for sending update
                     inserted = inserted[0]
@@ -594,7 +557,7 @@ class LimitedSet:
             >>> r == s
             True
         """
-        return {key: inserted for inserted, key in values(self._data)}
+        return {key: inserted for inserted, key in self._data.values()}
 
     def __eq__(self, other):
         return self._data == other._data
@@ -608,7 +571,7 @@ class LimitedSet:
         )
 
     def __iter__(self):
-        return (i for _, i in sorted(values(self._data)))
+        return (i for _, i in sorted(self._data.values()))
 
     def __len__(self):
         return len(self._data)
@@ -730,7 +693,7 @@ class BufferMap(OrderedDict, Evictable):
         self.bufmaxsize = 1000
         if iterable:
             self.update(iterable)
-        self.total = sum(len(buf) for buf in items(self))
+        self.total = sum(len(buf) for buf in self.items())
 
     def put(self, key, item):
         self._get_or_create_buffer(key).put(item)

+ 2 - 2
celery/utils/debug.py

@@ -10,7 +10,7 @@ from contextlib import contextmanager
 from functools import partial
 from pprint import pprint
 
-from celery.five import WhateverIO, items, range
+from celery.five import WhateverIO
 from celery.platforms import signals
 
 try:
@@ -175,7 +175,7 @@ def cry(out=None, sepchr='=', seplen=49):  # pragma: no cover
     tmap = {t.ident: t for t in threading.enumerate()}
 
     sep = sepchr * seplen
-    for tid, frame in items(sys._current_frames()):
+    for tid, frame in sys._current_frames().items():
         thread = tmap.get(tid)
         if not thread:
             # skip old junk (left-overs from a fork)

+ 1 - 2
celery/utils/dispatch/signal.py

@@ -4,7 +4,6 @@ from __future__ import absolute_import, unicode_literals
 
 import weakref
 
-from celery.five import range, text_t
 from celery.local import PromiseProxy, Proxy
 from celery.utils.log import get_logger
 
@@ -20,7 +19,7 @@ WEAKREF_TYPES = (weakref.ReferenceType, saferef.BoundMethodWeakref)
 def _make_id(target):  # pragma: no cover
     if isinstance(target, Proxy):
         target = target._get_current_object()
-    if isinstance(target, (bytes, text_t)):
+    if isinstance(target, (bytes, str)):
         # see Issue #2475
         return target
     if hasattr(target, '__func__'):

+ 2 - 3
celery/utils/functional.py

@@ -4,8 +4,9 @@ from __future__ import absolute_import, print_function, unicode_literals
 
 import sys
 
+from collections import UserList
 from functools import partial
-from inspect import isfunction
+from inspect import getfullargspec, isfunction
 from itertools import chain, islice
 
 from kombu.utils.functional import (
@@ -14,8 +15,6 @@ from kombu.utils.functional import (
 )
 from vine import promise
 
-from celery.five import UserList, getfullargspec, range
-
 __all__ = [
     'LRUCache', 'is_list', 'maybe_list', 'memoize', 'mlazy', 'noop',
     'first', 'firstmethod', 'chunks', 'padlist', 'mattrgetter', 'uniq',

+ 5 - 8
celery/utils/graph.py

@@ -7,8 +7,6 @@ from textwrap import dedent
 
 from kombu.utils.encoding import safe_str, bytes_to_str
 
-from celery.five import items
-
 __all__ = ['DOT', 'CycleError', 'DependencyGraph', 'GraphFormatter']
 
 
@@ -105,7 +103,7 @@ class DependencyGraph:
 
     def edges(self):
         """Return generator that yields for all edges in the graph."""
-        return (obj for obj, adj in items(self) if adj)
+        return (obj for obj, adj in self.items() if adj)
 
     def _khan62(self):
         """Khans simple topological sort algorithm from '62
@@ -183,7 +181,7 @@ class DependencyGraph:
                 seen.add(draw.label(obj))
 
         P(draw.head())
-        for obj, adjacent in items(self):
+        for obj, adjacent in self.items():
             if not adjacent:
                 if_not_seen(draw.terminal_node, obj)
             for req in adjacent:
@@ -206,9 +204,8 @@ class DependencyGraph:
     def __contains__(self, obj):
         return obj in self.adjacent
 
-    def _iterate_items(self):
-        return items(self.adjacent)
-    items = iteritems = _iterate_items
+    def items(self):
+        return self.adjacent.items()
 
     def __repr__(self):
         return '\n'.join(self.repr_node(N) for N in self)
@@ -265,7 +262,7 @@ class GraphFormatter:
     def attrs(self, d, scheme=None):
         d = dict(self.scheme, **dict(scheme, **d or {}) if scheme else d)
         return self._attrsep.join(
-            safe_str(self.attr(k, v)) for k, v in items(d)
+            safe_str(self.attr(k, v)) for k, v in d.items()
         )
 
     def head(self, **attrs):

+ 1 - 2
celery/utils/imports.py

@@ -8,11 +8,10 @@ import os
 import sys
 
 from contextlib import contextmanager
+from imp import reload
 
 from kombu.utils import symbol_by_name
 
-from celery.five import reload
-
 #: Billiard sets this when execv is enabled.
 #: We use it to find out the name of the original ``__main__``
 #: module, so that we can properly rewrite the name of the

+ 3 - 6
celery/utils/log.py

@@ -10,12 +10,9 @@ import threading
 import traceback
 
 from contextlib import contextmanager
-from kombu.five import values
 from kombu.log import get_logger as _get_logger, LOG_LEVELS
 from kombu.utils.encoding import safe_str
 
-from celery.five import string_t, text_t
-
 from .term import colored
 
 __all__ = [
@@ -46,7 +43,7 @@ def set_in_sighandler(value):
 
 def iter_open_logger_fds():
     seen = set()
-    loggers = (list(values(logging.Logger.manager.loggerDict)) +
+    loggers = (list(logging.Logger.manager.loggerDict.values()) +
                [logging.getLogger(None)])
     for logger in loggers:
         try:
@@ -148,8 +145,8 @@ class ColorFormatter(logging.Formatter):
                 # so need to reorder calls based on type.
                 # Issue #427
                 try:
-                    if isinstance(msg, string_t):
-                        return text_t(color(safe_str(msg)))
+                    if isinstance(msg, str):
+                        return str(color(safe_str(msg)))
                     return safe_str(color(msg))
                 except UnicodeDecodeError:  # pragma: no cover
                     return safe_str(msg)  # skip colors

+ 3 - 5
celery/utils/saferepr.py

@@ -24,8 +24,6 @@ from pprint import _recursion
 
 from kombu.utils.encoding import bytes_to_str
 
-from celery.five import items, text_t
-
 from .text import truncate, truncate_bytes
 
 __all__ = ['saferepr', 'reprstream']
@@ -43,7 +41,7 @@ _key = namedtuple('_key', ('value',))
 _quoted = namedtuple('_quoted', ('value',))
 _dirty = namedtuple('_dirty', ('objid',))
 
-chars_t = (bytes, text_t)
+chars_t = (bytes, str)
 safe_t = (Number,)
 set_t = (frozenset, set)
 
@@ -70,7 +68,7 @@ def _chaindict(mapping,
                LIT_DICT_KVSEP=LIT_DICT_KVSEP,
                LIT_LIST_SEP=LIT_LIST_SEP):
     size = len(mapping)
-    for i, (k, v) in enumerate(items(mapping)):
+    for i, (k, v) in enumerate(mapping.items()):
         yield _key(k)
         yield LIT_DICT_KVSEP
         yield v
@@ -155,7 +153,7 @@ def reprstream(stack, seen=None, maxlevels=3, level=0, isinstance=isinstance):
             elif isinstance(val, Decimal):
                 yield repr(val), it
             elif isinstance(val, safe_t):
-                yield text_t(val), it
+                yield str(val), it
             elif isinstance(val, chars_t):
                 yield _quoted(val), it
             elif isinstance(val, range_t):  # pragma: no cover

+ 5 - 7
celery/utils/serialization.py

@@ -13,8 +13,6 @@ from itertools import takewhile
 
 from kombu.utils.encoding import bytes_to_str, str_to_bytes
 
-from celery.five import bytes_if_py2, items, reraise, string_t
-
 from .encoding import safe_repr
 
 try:
@@ -37,7 +35,7 @@ except NameError:  # pragma: no cover
 
 
 def subclass_exception(name, parent, module):  # noqa
-    return type(bytes_if_py2(name), (parent,), {'__module__': module})
+    return type(name, (parent,), {'__module__': module})
 
 
 def find_pickleable_exception(exc, loads=pickle.loads,
@@ -181,7 +179,7 @@ def strtobool(term, table={'false': False, 'no': False, '0': False,
                            'on': True, 'off': False}):
     """Convert common terms for true/false to bool
     (true/false/yes/no/on/off/1/0)."""
-    if isinstance(term, string_t):
+    if isinstance(term, str):
         try:
             return table[term.lower()]
         except KeyError:
@@ -190,7 +188,7 @@ def strtobool(term, table={'false': False, 'no': False, '0': False,
 
 
 def jsonify(obj,
-            builtin_types=(numbers.Real, string_t), key=None,
+            builtin_types=(numbers.Real, str), key=None,
             keyfilter=None,
             unknown_type_filter=None):
     """Transforms object making it suitable for json serialization"""
@@ -208,7 +206,7 @@ def jsonify(obj,
         return [_jsonify(v) for v in obj]
     elif isinstance(obj, dict):
         return {
-            k: _jsonify(v, key=k) for k, v in items(obj)
+            k: _jsonify(v, key=k) for k, v in obj.items()
             if (keyfilter(k) if keyfilter else 1)
         }
     elif isinstance(obj, datetime.datetime):
@@ -242,7 +240,7 @@ def maybe_reraise():
     exc_info = sys.exc_info()
     try:
         if exc_info[2]:
-            reraise(exc_info[0], exc_info[1], exc_info[2])
+            raise
     finally:
         # see http://docs.python.org/library/sys.html#sys.exc_info
         del(exc_info)

+ 8 - 10
celery/utils/term.py

@@ -6,8 +6,6 @@ import platform
 
 from functools import reduce
 
-from celery.five import string
-
 __all__ = ['colored']
 
 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
@@ -47,36 +45,36 @@ class colored:
                       'white': self.white}
 
     def _add(self, a, b):
-        return string(a) + string(b)
+        return str(a) + str(b)
 
     def _fold_no_color(self, a, b):
         try:
             A = a.no_color()
         except AttributeError:
-            A = string(a)
+            A = str(a)
         try:
             B = b.no_color()
         except AttributeError:
-            B = string(b)
+            B = str(b)
 
-        return ''.join((string(A), string(B)))
+        return ''.join((str(A), str(B)))
 
     def no_color(self):
         if self.s:
-            return string(reduce(self._fold_no_color, self.s))
+            return str(reduce(self._fold_no_color, self.s))
         return ''
 
     def embed(self):
         prefix = ''
         if self.enabled:
             prefix = self.op
-        return ''.join((string(prefix), string(reduce(self._add, self.s))))
+        return ''.join((str(prefix), str(reduce(self._add, self.s))))
 
     def __str__(self):
         suffix = ''
         if self.enabled:
             suffix = RESET_SEQ
-        return string(''.join((self.embed(), string(suffix))))
+        return str(''.join((self.embed(), str(suffix))))
 
     def node(self, s, op):
         return self.__class__(enabled=self.enabled, op=op, *s)
@@ -148,4 +146,4 @@ class colored:
         return self.node(s or [''], RESET_SEQ)
 
     def __add__(self, other):
-        return string(self) + string(other)
+        return str(self) + str(other)

+ 1 - 3
celery/utils/text.py

@@ -10,8 +10,6 @@ from textwrap import fill
 
 from pprint import pformat
 
-from celery.five import string_t
-
 __all__ = [
     'abbr', 'abbrtask', 'dedent', 'dedent_initial',
     'ensure_newlines', 'ensure_sep',
@@ -29,7 +27,7 @@ RE_FORMAT = re.compile(r'%(\w)')
 
 
 def str_to_list(s):
-    if isinstance(s, string_t):
+    if isinstance(s, str):
         return s.split(',')
     return s
 

+ 2 - 3
celery/utils/threads.py

@@ -11,7 +11,6 @@ import traceback
 from contextlib import contextmanager
 
 from celery.local import Proxy
-from celery.five import THREAD_TIMEOUT_MAX, items
 
 try:
     from greenlet import getcurrent as get_ident
@@ -90,7 +89,7 @@ class bgThread(threading.Thread):
         self._is_shutdown.set()
         self._is_stopped.wait()
         if self.is_alive():
-            self.join(THREAD_TIMEOUT_MAX)
+            self.join(threading.TIMEOUT_MAX)
 
 
 def release_local(local):
@@ -121,7 +120,7 @@ class Local:
         object.__setattr__(self, '__ident_func__', get_ident)
 
     def __iter__(self):
-        return iter(items(self.__storage__))
+        return iter(self.__storage__.items())
 
     def __call__(self, proxy):
         """Create a proxy for a name."""

+ 1 - 3
celery/utils/timer2.py

@@ -15,8 +15,6 @@ import threading
 from itertools import count
 from time import sleep
 
-from celery.five import THREAD_TIMEOUT_MAX
-
 from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger
 
 TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
@@ -92,7 +90,7 @@ class Timer(threading.Thread):
         self._is_shutdown.set()
         if self.running:
             self._is_stopped.wait()
-            self.join(THREAD_TIMEOUT_MAX)
+            self.join(threading.TIMEOUT_MAX)
             self.running = False
 
     def ensure_started(self):

+ 2 - 4
celery/utils/timeutils.py

@@ -14,8 +14,6 @@ from kombu.utils import cached_property, reprcall
 
 from pytz import timezone as _timezone, AmbiguousTimeError, FixedOffset
 
-from celery.five import string_t
-
 from .functional import dictfilter
 from .iso8601 import parse_iso8601
 from .text import pluralize
@@ -140,7 +138,7 @@ class _Zone:
         return localize(dt, self.local)
 
     def get_timezone(self, zone):
-        if isinstance(zone, string_t):
+        if isinstance(zone, str):
             return _timezone(zone)
         return zone
 
@@ -219,7 +217,7 @@ def rate(rate):
     """Parse rate strings, such as `"100/m"`, `"2/h"` or `"0.5/s"`
     and convert them to seconds."""
     if rate:
-        if isinstance(rate, string_t):
+        if isinstance(rate, str):
             ops, _, modifier = rate.partition('/')
             return RATE_MODIFIER_MAP[modifier or 's'](float(ops)) or 0
         return rate or 0

+ 1 - 2
celery/worker/__init__.py

@@ -33,7 +33,6 @@ from celery import signals
 from celery.exceptions import (
     ImproperlyConfigured, WorkerTerminate, TaskRevokedError,
 )
-from celery.five import values
 from celery.platforms import EX_FAILURE, create_pidlock
 from celery.utils.imports import reload_from_cwd
 from celery.utils.log import mlevel, worker_logger as logger
@@ -190,7 +189,7 @@ class WorkController:
             [self.app.loader.import_task_module(m) for m in includes]
         self.include = includes
         task_modules = {task.__class__.__module__
-                        for task in values(self.app.tasks)}
+                        for task in self.app.tasks.values()}
         self.app.conf.include = tuple(set(prev) | task_modules)
 
     def prepare_args(self, **kwargs):

+ 4 - 4
celery/worker/consumer/consumer.py

@@ -26,7 +26,7 @@ from celery import bootsteps
 from celery import signals
 from celery.app.trace import build_tracer
 from celery.exceptions import InvalidTaskError, NotRegistered
-from celery.five import buffer_t, items, values
+from celery.five import buffer_t
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
 from celery.utils.nodenames import gethostname
@@ -230,7 +230,7 @@ class Consumer:
 
     def reset_rate_limits(self):
         self.task_buckets.update(
-            (n, self.bucket_for_task(t)) for n, t in items(self.app.tasks)
+            (n, self.bucket_for_task(t)) for n, t in self.app.tasks.items()
         )
 
     def _update_prefetch_count(self, index=0):
@@ -381,7 +381,7 @@ class Consumer:
             self.controller.semaphore.clear()
         if self.timer:
             self.timer.clear()
-        for bucket in values(self.task_buckets):
+        for bucket in self.task_buckets.values():
             if bucket:
                 bucket.clear_pending()
         reserved_requests.clear()
@@ -507,7 +507,7 @@ class Consumer:
 
     def update_strategies(self):
         loader = self.app.loader
-        for name, task in items(self.app.tasks):
+        for name, task in self.app.tasks.items():
             self.strategies[name] = task.start_strategy(self.app, self)
             task.__trace__ = build_tracer(name, task, loader, self.hostname,
                                           app=self.app)

+ 1 - 2
celery/worker/consumer/gossip.py

@@ -10,7 +10,6 @@ from kombu import Consumer
 from kombu.async.semaphore import DummyLock
 
 from celery import bootsteps
-from celery.five import values
 from celery.utils.log import get_logger
 from celery.utils.objects import Bunch
 
@@ -156,7 +155,7 @@ class Gossip(bootsteps.ConsumerStep):
     def periodic(self):
         workers = self.state.workers
         dirty = set()
-        for worker in values(workers):
+        for worker in workers.values():
             if not worker.alive:
                 dirty.add(worker)
                 self.on_node_lost(worker)

+ 2 - 3
celery/worker/consumer/mingle.py

@@ -2,7 +2,6 @@
 from __future__ import absolute_import, unicode_literals
 
 from celery import bootsteps
-from celery.five import items
 from celery.utils.log import get_logger
 
 from .events import Events
@@ -34,9 +33,9 @@ class Mingle(bootsteps.StartStopStep):
         replies.pop(c.hostname, None)  # delete my own response
         if replies:
             info('mingle: sync with %s nodes',
-                 len([reply for reply, value in items(replies) if value]))
+                 len([reply for reply, value in replies.items() if value]))
             [self.on_node_reply(c, nodename, reply)
-             for nodename, reply in items(replies) if reply]
+             for nodename, reply in replies.items() if reply]
             info('mingle: sync complete')
         else:
             info('mingle: all alone')

+ 4 - 3
celery/worker/control.py

@@ -5,11 +5,12 @@ from __future__ import absolute_import, unicode_literals
 import io
 import tempfile
 
+from collections import UserDict
+
 from billiard.common import TERM_SIGNAME
 from kombu.utils.encoding import safe_repr
 
 from celery.exceptions import WorkerShutdown
-from celery.five import UserDict, items, string_t
 from celery.platforms import signals as _signals
 from celery.utils import timeutils
 from celery.utils.functional import maybe_list
@@ -59,7 +60,7 @@ def dump_conf(state, with_defaults=False, **kwargs):
 
 
 def _wanted_config_key(key):
-    return isinstance(key, string_t) and not key.startswith('__')
+    return isinstance(key, str) and not key.startswith('__')
 
 
 # -- Task
@@ -292,7 +293,7 @@ def dump_tasks(state, taskinfoitems=None, builtins=False, **kwargs):
             if getattr(task, field, None) is not None
         }
         if fields:
-            info = ['='.join(f) for f in items(fields)]
+            info = ['='.join(f) for f in fields.items()]
             return '{0} [{1}]'.format(task.name, ' '.join(info))
         return task.name
 

+ 1 - 2
celery/worker/request.py

@@ -20,7 +20,6 @@ from celery.exceptions import (
     SoftTimeLimitExceeded, TimeLimitExceeded,
     WorkerLostError, Terminated, Retry, Reject,
 )
-from celery.five import string
 from celery.platforms import signals as _signals
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
@@ -344,7 +343,7 @@ class Request:
         # time to write the result.
         if isinstance(exc, Terminated):
             self._announce_revoked(
-                'terminated', True, string(exc), False)
+                'terminated', True, str(exc), False)
             send_failed_event = False  # already sent revoked event
         elif isinstance(exc, WorkerLostError) or not return_ok:
             self.task.backend.mark_as_failure(

+ 4 - 2
celery/worker/state.py

@@ -13,12 +13,13 @@ import shelve
 import weakref
 import zlib
 
+from collections import Counter
+
 from kombu.serialization import pickle, pickle_protocol
 from kombu.utils import cached_property
 
 from celery import __version__
 from celery.exceptions import WorkerShutdown, WorkerTerminate
-from celery.five import Counter
 from celery.utils.collections import LimitedSet
 
 __all__ = [
@@ -113,8 +114,9 @@ C_BENCH_EVERY = int(os.environ.get('C_BENCH_EVERY') or
 if C_BENCH:  # pragma: no cover
     import atexit
 
+    from time import monotonic
+
     from billiard.process import current_process
-    from celery.five import monotonic
     from celery.utils.debug import memdump, sample_mem
 
     all_count = 0

+ 2 - 1
docs/tutorials/task-cookbook.rst

@@ -37,8 +37,9 @@ For this reason your tasks run-time should not exceed the timeout.
 
 .. code-block:: python
 
+    from time import monotonic
+
     from celery import task
-    from celery.five import monotonic
     from celery.utils.log import get_task_logger
     from contextlib import contextmanager
     from django.core.cache import cache

+ 2 - 2
funtests/benchmarks/bench_worker.py

@@ -3,14 +3,14 @@ from __future__ import absolute_import, print_function, unicode_literals
 import os
 import sys
 
+from time import monotonic
+
 os.environ.update(
     NOSETPS='yes',
     USE_FAST_LOCALS='yes',
 )
 
 from celery import Celery  # noqa
-from celery.five import range  # noqa
-from kombu.five import monotonic  # noqa
 
 DEFAULT_ITS = 40000
 

+ 0 - 1
funtests/suite/test_leak.py

@@ -7,7 +7,6 @@ import shlex
 import subprocess
 
 from celery import current_app
-from celery.five import range
 from celery.tests.case import SkipTest, unittest
 
 import suite  # noqa