123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- # -*- coding: utf-8 -*-
- """
- celery.five
- ~~~~~~~~~~~
- Compatibility implementations of features
- only available in newer Python versions.
- """
- from __future__ import absolute_import
- __all__ = ['Counter', 'reload', 'UserList', 'UserDict', 'Queue', 'Empty',
- 'zip_longest', 'map', 'string', 'string_t',
- 'long_t', 'text_t', 'range', 'int_types', 'items', 'keys', 'values',
- 'nextfun', 'reraise', 'WhateverIO', 'with_metaclass',
- 'OrderedDict', 'THREAD_TIMEOUT_MAX', 'format_d',
- 'class_property', 'reclassmethod', 'create_module',
- 'recreate_module', 'monotonic']
- import io
- try:
- from collections import Counter
- except ImportError: # pragma: no cover
- from collections import defaultdict
- def Counter(): # noqa
- return defaultdict(int)
- ############## py3k #########################################################
- import sys
- PY3 = sys.version_info[0] == 3
- try:
- reload = reload # noqa
- except NameError: # pragma: no cover
- from imp import reload # noqa
- try:
- from UserList import UserList # noqa
- except ImportError: # pragma: no cover
- from collections import UserList # noqa
- try:
- from UserDict import UserDict # noqa
- except ImportError: # pragma: no cover
- from collections import UserDict # noqa
- from kombu.five import monotonic
- if PY3: # pragma: no cover
- import builtins
- from queue import Queue, Empty
- from itertools import zip_longest
- map = map
- string = str
- string_t = str
- long_t = int
- text_t = str
- range = range
- int_types = (int, )
- _byte_t = bytes
- open_fqdn = 'builtins.open'
- def items(d):
- return d.items()
- def keys(d):
- return d.keys()
- def values(d):
- return d.values()
- def nextfun(it):
- return it.__next__
- exec_ = getattr(builtins, 'exec')
- def reraise(tp, value, tb=None):
- if value.__traceback__ is not tb:
- raise value.with_traceback(tb)
- raise value
- else:
- import __builtin__ as builtins # noqa
- from Queue import Queue, Empty # noqa
- from itertools import imap as map, izip_longest as zip_longest # noqa
- string = unicode # noqa
- string_t = basestring # noqa
- text_t = unicode # noqa
- long_t = long # noqa
- range = xrange # noqa
- int_types = (int, long) # noqa
- _byte_t = (str, bytes) # noqa
- open_fqdn = '__builtin__.open'
- def items(d): # noqa
- return d.iteritems()
- def keys(d): # noqa
- return d.iterkeys()
- def values(d): # noqa
- return d.itervalues()
- def nextfun(it): # noqa
- return it.next
- def exec_(code, globs=None, locs=None): # pragma: no cover
- """Execute code in a namespace."""
- if globs is None:
- frame = sys._getframe(1)
- globs = frame.f_globals
- if locs is None:
- locs = frame.f_locals
- del frame
- elif locs is None:
- locs = globs
- exec("""exec code in globs, locs""")
- exec_("""def reraise(tp, value, tb=None): raise tp, value, tb""")
- def with_metaclass(Type, skip_attrs=set(['__dict__', '__weakref__'])):
- """Class decorator to set metaclass.
- Works with both Python 2 and Python 3 and it does not add
- an extra class in the lookup order like ``six.with_metaclass`` does
- (that is -- it copies the original class instead of using inheritance).
- """
- def _clone_with_metaclass(Class):
- attrs = dict((key, value) for key, value in items(vars(Class))
- if key not in skip_attrs)
- return Type(Class.__name__, Class.__bases__, attrs)
- return _clone_with_metaclass
- ############## collections.OrderedDict ######################################
- # was moved to kombu
- from kombu.utils.compat import OrderedDict # noqa
- ############## threading.TIMEOUT_MAX #######################################
- try:
- from threading import TIMEOUT_MAX as THREAD_TIMEOUT_MAX
- except ImportError:
- THREAD_TIMEOUT_MAX = 1e10 # noqa
- ############## format(int, ',d') ##########################
- if sys.version_info >= (2, 7): # pragma: no cover
- def format_d(i):
- return format(i, ',d')
- else: # pragma: no cover
- def format_d(i): # noqa
- s = '%d' % i
- groups = []
- while s and s[-1].isdigit():
- groups.append(s[-3:])
- s = s[:-3]
- return s + ','.join(reversed(groups))
- ############## Module Generation ##########################
- # Utilities to dynamically
- # recreate modules, either for lazy loading or
- # to create old modules at runtime instead of
- # having them litter the source tree.
- import operator
- import sys
- # import fails in python 2.5. fallback to reduce in stdlib
- try:
- from functools import reduce
- except ImportError:
- pass
- from importlib import import_module
- from types import ModuleType
- MODULE_DEPRECATED = """
- The module %s is deprecated and will be removed in a future version.
- """
- DEFAULT_ATTRS = set(['__file__', '__path__', '__doc__', '__all__'])
- # im_func is no longer available in Py3.
- # instead the unbound method itself can be used.
- if sys.version_info[0] == 3: # pragma: no cover
- def fun_of_method(method):
- return method
- else:
- def fun_of_method(method): # noqa
- return method.im_func
- def getappattr(path):
- """Gets attribute from the current_app recursively,
- e.g. getappattr('amqp.get_task_consumer')``."""
- from celery import current_app
- return current_app._rgetattr(path)
- def _compat_task_decorator(*args, **kwargs):
- from celery import current_app
- kwargs.setdefault('accept_magic_kwargs', True)
- return current_app.task(*args, **kwargs)
- def _compat_periodic_task_decorator(*args, **kwargs):
- from celery.task import periodic_task
- kwargs.setdefault('accept_magic_kwargs', True)
- return periodic_task(*args, **kwargs)
- COMPAT_MODULES = {
- 'celery': {
- 'execute': {
- 'send_task': 'send_task',
- },
- 'decorators': {
- 'task': _compat_task_decorator,
- 'periodic_task': _compat_periodic_task_decorator,
- },
- 'log': {
- 'get_default_logger': 'log.get_default_logger',
- 'setup_logger': 'log.setup_logger',
- 'setup_loggig_subsystem': 'log.setup_logging_subsystem',
- 'redirect_stdouts_to_logger': 'log.redirect_stdouts_to_logger',
- },
- 'messaging': {
- 'TaskPublisher': 'amqp.TaskPublisher',
- 'TaskConsumer': 'amqp.TaskConsumer',
- 'establish_connection': 'connection',
- 'get_consumer_set': 'amqp.TaskConsumer',
- },
- 'registry': {
- 'tasks': 'tasks',
- },
- },
- 'celery.task': {
- 'control': {
- 'broadcast': 'control.broadcast',
- 'rate_limit': 'control.rate_limit',
- 'time_limit': 'control.time_limit',
- 'ping': 'control.ping',
- 'revoke': 'control.revoke',
- 'discard_all': 'control.purge',
- 'inspect': 'control.inspect',
- },
- 'schedules': 'celery.schedules',
- 'chords': 'celery.canvas',
- }
- }
- class class_property(object):
- def __init__(self, getter=None, setter=None):
- if getter is not None and not isinstance(getter, classmethod):
- getter = classmethod(getter)
- if setter is not None and not isinstance(setter, classmethod):
- setter = classmethod(setter)
- self.__get = getter
- self.__set = setter
- info = getter.__get__(object) # just need the info attrs.
- self.__doc__ = info.__doc__
- self.__name__ = info.__name__
- self.__module__ = info.__module__
- def __get__(self, obj, type=None):
- if obj and type is None:
- type = obj.__class__
- return self.__get.__get__(obj, type)()
- def __set__(self, obj, value):
- if obj is None:
- return self
- return self.__set.__get__(obj)(value)
- def setter(self, setter):
- return self.__class__(self.__get, setter)
- def reclassmethod(method):
- return classmethod(fun_of_method(method))
- class MagicModule(ModuleType):
- _compat_modules = ()
- _all_by_module = {}
- _direct = {}
- _object_origins = {}
- def __getattr__(self, name):
- if name in self._object_origins:
- module = __import__(self._object_origins[name], None, None, [name])
- for item in self._all_by_module[module.__name__]:
- setattr(self, item, getattr(module, item))
- return getattr(module, name)
- elif name in self._direct: # pragma: no cover
- module = __import__(self._direct[name], None, None, [name])
- setattr(self, name, module)
- return module
- return ModuleType.__getattribute__(self, name)
- def __dir__(self):
- return list(set(self.__all__) | DEFAULT_ATTRS)
- def __reduce__(self):
- return import_module, (self.__name__, )
- def create_module(name, attrs, cls_attrs=None, pkg=None,
- base=MagicModule, prepare_attr=None):
- fqdn = '.'.join([pkg.__name__, name]) if pkg else name
- cls_attrs = {} if cls_attrs is None else cls_attrs
- pkg, _, modname = name.rpartition('.')
- cls_attrs['__module__'] = pkg
- attrs = dict((attr_name, prepare_attr(attr) if prepare_attr else attr)
- for attr_name, attr in items(attrs))
- module = sys.modules[fqdn] = type(modname, (base, ), cls_attrs)(fqdn)
- module.__dict__.update(attrs)
- return module
- def recreate_module(name, compat_modules=(), by_module={}, direct={},
- base=MagicModule, **attrs):
- old_module = sys.modules[name]
- origins = get_origins(by_module)
- compat_modules = COMPAT_MODULES.get(name, ())
- cattrs = dict(
- _compat_modules=compat_modules,
- _all_by_module=by_module, _direct=direct,
- _object_origins=origins,
- __all__=tuple(set(reduce(
- operator.add,
- [tuple(v) for v in [compat_modules, origins, direct, attrs]],
- ))),
- )
- new_module = create_module(name, attrs, cls_attrs=cattrs, base=base)
- new_module.__dict__.update(dict((mod, get_compat_module(new_module, mod))
- for mod in compat_modules))
- return old_module, new_module
- def get_compat_module(pkg, name):
- from .local import Proxy
- def prepare(attr):
- if isinstance(attr, string_t):
- return Proxy(getappattr, (attr, ))
- return attr
- attrs = COMPAT_MODULES[pkg.__name__][name]
- if isinstance(attrs, string_t):
- fqdn = '.'.join([pkg.__name__, name])
- module = sys.modules[fqdn] = import_module(attrs)
- return module
- attrs['__all__'] = list(attrs)
- return create_module(name, dict(attrs), pkg=pkg, prepare_attr=prepare)
- def get_origins(defs):
- origins = {}
- for module, attrs in items(defs):
- origins.update(dict((attr, module) for attr in attrs))
- return origins
- _SIO_write = io.StringIO.write
- _SIO_init = io.StringIO.__init__
- class WhateverIO(io.StringIO):
- def __init__(self, v=None, *a, **kw):
- _SIO_init(self, v.decode() if isinstance(v, _byte_t) else v, *a, **kw)
- def write(self, data):
- _SIO_write(self, data.decode() if isinstance(data, _byte_t) else data)
|