five.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.five
  4. ~~~~~~~~~~~
  5. Compatibility implementations of features
  6. only available in newer Python versions.
  7. """
  8. from __future__ import absolute_import
  9. ############## py3k #########################################################
  10. import sys
  11. PY3 = sys.version_info[0] == 3
  12. try:
  13. reload = reload # noqa
  14. except NameError: # pragma: no cover
  15. from imp import reload # noqa
  16. try:
  17. from UserList import UserList # noqa
  18. except ImportError: # pragma: no cover
  19. from collections import UserList # noqa
  20. try:
  21. from UserDict import UserDict # noqa
  22. except ImportError: # pragma: no cover
  23. from collections import UserDict # noqa
  24. if PY3:
  25. import builtins
  26. from queue import Queue, Empty
  27. from itertools import zip_longest
  28. from io import StringIO, BytesIO
  29. map = map
  30. string = str
  31. string_t = str
  32. long_t = int
  33. text_t = str
  34. range = range
  35. open_fqdn = 'builtins.open'
  36. def items(d):
  37. return d.items()
  38. def keys(d):
  39. return d.keys()
  40. def values(d):
  41. return d.values()
  42. def nextfun(it):
  43. return it.__next__
  44. exec_ = getattr(builtins, 'exec')
  45. def reraise(tp, value, tb=None):
  46. if value.__traceback__ is not tb:
  47. raise value.with_traceback(tb)
  48. raise value
  49. class WhateverIO(StringIO):
  50. def write(self, data):
  51. if isinstance(data, bytes):
  52. data = data.encode()
  53. StringIO.write(self, data)
  54. else:
  55. import __builtin__ as builtins # noqa
  56. from Queue import Queue, Empty # noqa
  57. from itertools import imap as map, izip_longest as zip_longest # noqa
  58. from StringIO import StringIO # noqa
  59. string = unicode # noqa
  60. string_t = basestring # noqa
  61. text_t = unicode
  62. long_t = long # noqa
  63. range = xrange
  64. open_fqdn = '__builtin__.open'
  65. def items(d): # noqa
  66. return d.iteritems()
  67. def keys(d): # noqa
  68. return d.iterkeys()
  69. def values(d): # noqa
  70. return d.itervalues()
  71. def nextfun(it): # noqa
  72. return it.next
  73. def exec_(code, globs=None, locs=None):
  74. """Execute code in a namespace."""
  75. if globs is None:
  76. frame = sys._getframe(1)
  77. globs = frame.f_globals
  78. if locs is None:
  79. locs = frame.f_locals
  80. del frame
  81. elif locs is None:
  82. locs = globs
  83. exec("""exec code in globs, locs""")
  84. exec_("""def reraise(tp, value, tb=None): raise tp, value, tb""")
  85. BytesIO = WhateverIO = StringIO # noqa
  86. def with_metaclass(Type, skip_attrs=set(['__dict__', '__weakref__'])):
  87. """Class decorator to set metaclass.
  88. Works with both Python 3 and Python 3 and it does not add
  89. an extra class in the lookup order like ``six.with_metaclass`` does
  90. (that is -- it copies the original class instead of using inheritance).
  91. """
  92. def _clone_with_metaclass(Class):
  93. attrs = dict((key, value) for key, value in items(vars(Class))
  94. if key not in skip_attrs)
  95. return Type(Class.__name__, Class.__bases__, attrs)
  96. return _clone_with_metaclass
  97. ############## collections.OrderedDict ######################################
  98. # was moved to kombu
  99. from kombu.utils.compat import OrderedDict # noqa
  100. ############## threading.TIMEOUT_MAX #######################################
  101. try:
  102. from threading import TIMEOUT_MAX as THREAD_TIMEOUT_MAX
  103. except ImportError:
  104. THREAD_TIMEOUT_MAX = 1e10 # noqa
  105. ############## format(int, ',d') ##########################
  106. if sys.version_info >= (2, 7): # pragma: no cover
  107. def format_d(i):
  108. return format(i, ',d')
  109. else: # pragma: no cover
  110. def format_d(i): # noqa
  111. s = '%d' % i
  112. groups = []
  113. while s and s[-1].isdigit():
  114. groups.append(s[-3:])
  115. s = s[:-3]
  116. return s + ','.join(reversed(groups))
  117. ############## Module Generation ##########################
  118. # Utilities to dynamically
  119. # recreate modules, either for lazy loading or
  120. # to create old modules at runtime instead of
  121. # having them litter the source tree.
  122. import operator
  123. import sys
  124. from functools import reduce
  125. from importlib import import_module
  126. from types import ModuleType
  127. MODULE_DEPRECATED = """
  128. The module %s is deprecated and will be removed in a future version.
  129. """
  130. DEFAULT_ATTRS = set(['__file__', '__path__', '__doc__', '__all__'])
  131. # im_func is no longer available in Py3.
  132. # instead the unbound method itself can be used.
  133. if sys.version_info[0] == 3: # pragma: no cover
  134. def fun_of_method(method):
  135. return method
  136. else:
  137. def fun_of_method(method): # noqa
  138. return method.im_func
  139. def getappattr(path):
  140. """Gets attribute from the current_app recursively,
  141. e.g. getappattr('amqp.get_task_consumer')``."""
  142. from celery import current_app
  143. return current_app._rgetattr(path)
  144. def _compat_task_decorator(*args, **kwargs):
  145. from celery import current_app
  146. kwargs.setdefault('accept_magic_kwargs', True)
  147. return current_app.task(*args, **kwargs)
  148. def _compat_periodic_task_decorator(*args, **kwargs):
  149. from celery.task import periodic_task
  150. kwargs.setdefault('accept_magic_kwargs', True)
  151. return periodic_task(*args, **kwargs)
  152. COMPAT_MODULES = {
  153. 'celery': {
  154. 'execute': {
  155. 'send_task': 'send_task',
  156. },
  157. 'decorators': {
  158. 'task': _compat_task_decorator,
  159. 'periodic_task': _compat_periodic_task_decorator,
  160. },
  161. 'log': {
  162. 'get_default_logger': 'log.get_default_logger',
  163. 'setup_logger': 'log.setup_logger',
  164. 'setup_loggig_subsystem': 'log.setup_logging_subsystem',
  165. 'redirect_stdouts_to_logger': 'log.redirect_stdouts_to_logger',
  166. },
  167. 'messaging': {
  168. 'TaskPublisher': 'amqp.TaskPublisher',
  169. 'TaskConsumer': 'amqp.TaskConsumer',
  170. 'establish_connection': 'connection',
  171. 'with_connection': 'with_default_connection',
  172. 'get_consumer_set': 'amqp.TaskConsumer',
  173. },
  174. 'registry': {
  175. 'tasks': 'tasks',
  176. },
  177. },
  178. 'celery.task': {
  179. 'control': {
  180. 'broadcast': 'control.broadcast',
  181. 'rate_limit': 'control.rate_limit',
  182. 'time_limit': 'control.time_limit',
  183. 'ping': 'control.ping',
  184. 'revoke': 'control.revoke',
  185. 'discard_all': 'control.purge',
  186. 'inspect': 'control.inspect',
  187. },
  188. 'schedules': 'celery.schedules',
  189. 'chords': 'celery.canvas',
  190. }
  191. }
  192. class class_property(object):
  193. def __init__(self, fget=None, fset=None):
  194. assert fget and isinstance(fget, classmethod)
  195. assert isinstance(fset, classmethod) if fset else True
  196. self.__get = fget
  197. self.__set = fset
  198. info = fget.__get__(object) # just need the info attrs.
  199. self.__doc__ = info.__doc__
  200. self.__name__ = info.__name__
  201. self.__module__ = info.__module__
  202. def __get__(self, obj, type=None):
  203. if obj and type is None:
  204. type = obj.__class__
  205. return self.__get.__get__(obj, type)()
  206. def __set__(self, obj, value):
  207. if obj is None:
  208. return self
  209. return self.__set.__get__(obj)(value)
  210. def reclassmethod(method):
  211. return classmethod(fun_of_method(method))
  212. class MagicModule(ModuleType):
  213. _compat_modules = ()
  214. _all_by_module = {}
  215. _direct = {}
  216. _object_origins = {}
  217. def __getattr__(self, name):
  218. if name in self._object_origins:
  219. module = __import__(self._object_origins[name], None, None, [name])
  220. for item in self._all_by_module[module.__name__]:
  221. setattr(self, item, getattr(module, item))
  222. return getattr(module, name)
  223. elif name in self._direct:
  224. module = __import__(self._direct[name], None, None, [name])
  225. setattr(self, name, module)
  226. return module
  227. return ModuleType.__getattribute__(self, name)
  228. def __dir__(self):
  229. return list(set(self.__all__) | DEFAULT_ATTRS)
  230. def create_module(name, attrs, cls_attrs=None, pkg=None, base=MagicModule,
  231. prepare_attr=None):
  232. fqdn = '.'.join([pkg.__name__, name]) if pkg else name
  233. cls_attrs = {} if cls_attrs is None else cls_attrs
  234. attrs = dict((attr_name, prepare_attr(attr) if prepare_attr else attr)
  235. for attr_name, attr in attrs.items())
  236. module = sys.modules[fqdn] = type(name, (base, ), cls_attrs)(fqdn)
  237. module.__dict__.update(attrs)
  238. return module
  239. def recreate_module(name, compat_modules=(), by_module={}, direct={},
  240. base=MagicModule, **attrs):
  241. old_module = sys.modules[name]
  242. origins = get_origins(by_module)
  243. compat_modules = COMPAT_MODULES.get(name, ())
  244. cattrs = dict(_compat_modules=compat_modules,
  245. _all_by_module=by_module, _direct=direct,
  246. _object_origins=origins,
  247. __all__=tuple(set(reduce(operator.add, map(tuple, [
  248. compat_modules, origins, direct, attrs])))))
  249. new_module = create_module(name, attrs, cls_attrs=cattrs, base=base)
  250. new_module.__dict__.update(dict((mod, get_compat_module(new_module, mod))
  251. for mod in compat_modules))
  252. return old_module, new_module
  253. def get_compat_module(pkg, name):
  254. from .local import Proxy
  255. def prepare(attr):
  256. if isinstance(attr, string_t):
  257. return Proxy(getappattr, (attr, ))
  258. return attr
  259. attrs = COMPAT_MODULES[pkg.__name__][name]
  260. if isinstance(attrs, string_t):
  261. fqdn = '.'.join([pkg.__name__, name])
  262. module = sys.modules[fqdn] = import_module(attrs)
  263. return module
  264. attrs['__all__'] = list(attrs)
  265. return create_module(name, dict(attrs), pkg=pkg, prepare_attr=prepare)
  266. def get_origins(defs):
  267. origins = {}
  268. for module, items in defs.items():
  269. origins.update(dict((item, module) for item in items))
  270. return origins