functional.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.utils.functional
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. Utilities for functions.
  6. """
  7. from __future__ import absolute_import, print_function
  8. import sys
  9. import threading
  10. from collections import OrderedDict
  11. from functools import partial, wraps
  12. try:
  13. from inspect import isfunction, getfullargspec as getargspec
  14. except ImportError: # Py2
  15. from inspect import isfunction, getargspec # noqa
  16. from itertools import chain, islice
  17. from kombu.utils.functional import (
  18. dictfilter, lazy, maybe_evaluate, is_list, maybe_list,
  19. )
  20. from vine import promise
  21. from celery.five import UserDict, UserList, keys, range
  22. __all__ = ['LRUCache', 'is_list', 'maybe_list', 'memoize', 'mlazy', 'noop',
  23. 'first', 'firstmethod', 'chunks', 'padlist', 'mattrgetter', 'uniq',
  24. 'regen', 'dictfilter', 'lazy', 'maybe_evaluate', 'head_from_fun']
  25. IS_PY3 = sys.version_info[0] == 3
  26. IS_PY2 = sys.version_info[0] == 2
  27. KEYWORD_MARK = object()
  28. FUNHEAD_TEMPLATE = """
  29. def {fun_name}({fun_args}):
  30. return {fun_value}
  31. """
  32. class DummyContext(object):
  33. def __enter__(self):
  34. return self
  35. def __exit__(self, *exc_info):
  36. pass
  37. class LRUCache(UserDict):
  38. """LRU Cache implementation using a doubly linked list to track access.
  39. :keyword limit: The maximum number of keys to keep in the cache.
  40. When a new key is inserted and the limit has been exceeded,
  41. the *Least Recently Used* key will be discarded from the
  42. cache.
  43. """
  44. def __init__(self, limit=None):
  45. self.limit = limit
  46. self.mutex = threading.RLock()
  47. self.data = OrderedDict()
  48. def __getitem__(self, key):
  49. with self.mutex:
  50. value = self[key] = self.data.pop(key)
  51. return value
  52. def update(self, *args, **kwargs):
  53. with self.mutex:
  54. data, limit = self.data, self.limit
  55. data.update(*args, **kwargs)
  56. if limit and len(data) > limit:
  57. # pop additional items in case limit exceeded
  58. for _ in range(len(data) - limit):
  59. data.popitem(last=False)
  60. def popitem(self, last=True):
  61. with self.mutex:
  62. return self.data.popitem(last)
  63. def __setitem__(self, key, value):
  64. # remove least recently used key.
  65. with self.mutex:
  66. if self.limit and len(self.data) >= self.limit:
  67. self.data.pop(next(iter(self.data)))
  68. self.data[key] = value
  69. def __iter__(self):
  70. return iter(self.data)
  71. def _iterate_items(self):
  72. with self.mutex:
  73. for k in self:
  74. try:
  75. yield (k, self.data[k])
  76. except KeyError: # pragma: no cover
  77. pass
  78. iteritems = _iterate_items
  79. def _iterate_values(self):
  80. with self.mutex:
  81. for k in self:
  82. try:
  83. yield self.data[k]
  84. except KeyError: # pragma: no cover
  85. pass
  86. itervalues = _iterate_values
  87. def _iterate_keys(self):
  88. # userdict.keys in py3k calls __getitem__
  89. with self.mutex:
  90. return keys(self.data)
  91. iterkeys = _iterate_keys
  92. def incr(self, key, delta=1):
  93. with self.mutex:
  94. # this acts as memcached does- store as a string, but return a
  95. # integer as long as it exists and we can cast it
  96. newval = int(self.data.pop(key)) + delta
  97. self[key] = str(newval)
  98. return newval
  99. def __getstate__(self):
  100. d = dict(vars(self))
  101. d.pop('mutex')
  102. return d
  103. def __setstate__(self, state):
  104. self.__dict__ = state
  105. self.mutex = threading.RLock()
  106. if sys.version_info[0] == 3: # pragma: no cover
  107. keys = _iterate_keys
  108. values = _iterate_values
  109. items = _iterate_items
  110. else: # noqa
  111. def keys(self):
  112. return list(self._iterate_keys())
  113. def values(self):
  114. return list(self._iterate_values())
  115. def items(self):
  116. return list(self._iterate_items())
  117. def memoize(maxsize=None, keyfun=None, Cache=LRUCache):
  118. def _memoize(fun):
  119. cache = Cache(limit=maxsize)
  120. @wraps(fun)
  121. def _M(*args, **kwargs):
  122. if keyfun:
  123. key = keyfun(args, kwargs)
  124. else:
  125. key = args + (KEYWORD_MARK,) + tuple(sorted(kwargs.items()))
  126. try:
  127. value = cache[key]
  128. except KeyError:
  129. value = fun(*args, **kwargs)
  130. _M.misses += 1
  131. cache[key] = value
  132. else:
  133. _M.hits += 1
  134. return value
  135. def clear():
  136. """Clear the cache and reset cache statistics."""
  137. cache.clear()
  138. _M.hits = _M.misses = 0
  139. _M.hits = _M.misses = 0
  140. _M.clear = clear
  141. _M.original_func = fun
  142. return _M
  143. return _memoize
  144. class mlazy(lazy):
  145. """Memoized lazy evaluation.
  146. The function is only evaluated once, every subsequent access
  147. will return the same value.
  148. .. attribute:: evaluated
  149. Set to to :const:`True` after the object has been evaluated.
  150. """
  151. evaluated = False
  152. _value = None
  153. def evaluate(self):
  154. if not self.evaluated:
  155. self._value = super(mlazy, self).evaluate()
  156. self.evaluated = True
  157. return self._value
  158. def noop(*args, **kwargs):
  159. """No operation.
  160. Takes any arguments/keyword arguments and does nothing.
  161. """
  162. pass
  163. def pass1(arg, *args, **kwargs):
  164. return arg
  165. def evaluate_promises(it):
  166. for value in it:
  167. if isinstance(value, promise):
  168. value = value()
  169. yield value
  170. def first(predicate, it):
  171. """Return the first element in `iterable` that `predicate` Gives a
  172. :const:`True` value for.
  173. If `predicate` is None it will return the first item that is not None.
  174. """
  175. return next(
  176. (v for v in evaluate_promises(it) if (
  177. predicate(v) if predicate is not None else v is not None)),
  178. None,
  179. )
  180. def firstmethod(method):
  181. """Return a function that with a list of instances,
  182. finds the first instance that gives a value for the given method.
  183. The list can also contain lazy instances
  184. (:class:`~kombu.utils.functional.lazy`.)
  185. """
  186. def _matcher(it, *args, **kwargs):
  187. for obj in it:
  188. try:
  189. answer = getattr(maybe_evaluate(obj), method)(*args, **kwargs)
  190. except AttributeError:
  191. pass
  192. else:
  193. if answer is not None:
  194. return answer
  195. return _matcher
  196. def chunks(it, n):
  197. """Split an iterator into chunks with `n` elements each.
  198. Examples
  199. # n == 2
  200. >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
  201. >>> list(x)
  202. [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]]
  203. # n == 3
  204. >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
  205. >>> list(x)
  206. [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
  207. """
  208. for first in it:
  209. yield [first] + list(islice(it, n - 1))
  210. def padlist(container, size, default=None):
  211. """Pad list with default elements.
  212. Examples:
  213. >>> first, last, city = padlist(['George', 'Costanza', 'NYC'], 3)
  214. ('George', 'Costanza', 'NYC')
  215. >>> first, last, city = padlist(['George', 'Costanza'], 3)
  216. ('George', 'Costanza', None)
  217. >>> first, last, city, planet = padlist(
  218. ... ['George', 'Costanza', 'NYC'], 4, default='Earth',
  219. ... )
  220. ('George', 'Costanza', 'NYC', 'Earth')
  221. """
  222. return list(container)[:size] + [default] * (size - len(container))
  223. def mattrgetter(*attrs):
  224. """Like :func:`operator.itemgetter` but return :const:`None` on missing
  225. attributes instead of raising :exc:`AttributeError`."""
  226. return lambda obj: {attr: getattr(obj, attr, None) for attr in attrs}
  227. def uniq(it):
  228. """Return all unique elements in ``it``, preserving order."""
  229. seen = set()
  230. return (seen.add(obj) or obj for obj in it if obj not in seen)
  231. def regen(it):
  232. """Regen takes any iterable, and if the object is an
  233. generator it will cache the evaluated list on first access,
  234. so that the generator can be "consumed" multiple times."""
  235. if isinstance(it, (list, tuple)):
  236. return it
  237. return _regen(it)
  238. class _regen(UserList, list):
  239. # must be subclass of list so that json can encode.
  240. def __init__(self, it):
  241. self.__it = it
  242. self.__index = 0
  243. self.__consumed = []
  244. def __reduce__(self):
  245. return list, (self.data,)
  246. def __length_hint__(self):
  247. return self.__it.__length_hint__()
  248. def __iter__(self):
  249. return chain(self.__consumed, self.__it)
  250. def __getitem__(self, index):
  251. if index < 0:
  252. return self.data[index]
  253. try:
  254. return self.__consumed[index]
  255. except IndexError:
  256. try:
  257. for i in range(self.__index, index + 1):
  258. self.__consumed.append(next(self.__it))
  259. except StopIteration:
  260. raise IndexError(index)
  261. else:
  262. return self.__consumed[index]
  263. @property
  264. def data(self):
  265. try:
  266. self.__consumed.extend(list(self.__it))
  267. except StopIteration:
  268. pass
  269. return self.__consumed
  270. def _argsfromspec(spec, replace_defaults=True):
  271. if spec.defaults:
  272. split = len(spec.defaults)
  273. defaults = (list(range(len(spec.defaults))) if replace_defaults
  274. else spec.defaults)
  275. positional = spec.args[:-split]
  276. optional = list(zip(spec.args[-split:], defaults))
  277. else:
  278. positional, optional = spec.args, []
  279. if IS_PY3: # pragma: no cover
  280. keywords = spec.varkw
  281. elif IS_PY2:
  282. keywords = spec.keywords # noqa
  283. return ', '.join(filter(None, [
  284. ', '.join(positional),
  285. ', '.join('{0}={1}'.format(k, v) for k, v in optional),
  286. '*{0}'.format(spec.varargs) if spec.varargs else None,
  287. '**{0}'.format(keywords) if keywords else None,
  288. ]))
  289. def head_from_fun(fun, bound=False, debug=False):
  290. if not isfunction(fun) and hasattr(fun, '__call__'):
  291. name, fun = fun.__class__.__name__, fun.__call__
  292. else:
  293. name = fun.__name__
  294. definition = FUNHEAD_TEMPLATE.format(
  295. fun_name=name,
  296. fun_args=_argsfromspec(getargspec(fun)),
  297. fun_value=1,
  298. )
  299. if debug: # pragma: no cover
  300. print(definition, file=sys.stderr)
  301. namespace = {'__name__': 'headof_{0}'.format(name)}
  302. exec(definition, namespace)
  303. result = namespace[name]
  304. result._source = definition
  305. if bound:
  306. return partial(result, object())
  307. return result