functional.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.utils.functional
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. Utilities for functions.
  6. """
  7. from __future__ import absolute_import, print_function
  8. import sys
  9. import threading
  10. from collections import OrderedDict
  11. from functools import partial, wraps
  12. from inspect import getargspec, isfunction
  13. from itertools import islice
  14. from kombu.utils import cached_property
  15. from kombu.utils.functional import lazy, maybe_evaluate, is_list, maybe_list
  16. from celery.five import UserDict, UserList, items, keys
  17. __all__ = ['LRUCache', 'is_list', 'maybe_list', 'memoize', 'mlazy', 'noop',
  18. 'first', 'firstmethod', 'chunks', 'padlist', 'mattrgetter', 'uniq',
  19. 'regen', 'dictfilter', 'lazy', 'maybe_evaluate', 'head_from_fun']
  20. IS_PYPY = hasattr(sys, 'pypy_version_info')
  21. KEYWORD_MARK = object()
  22. FUNHEAD_TEMPLATE = """
  23. def {fun_name}({fun_args}):
  24. return {fun_value}
  25. """
  26. class DummyContext(object):
  27. def __enter__(self):
  28. return self
  29. def __exit__(self, *exc_info):
  30. pass
  31. class LRUCache(UserDict):
  32. """LRU Cache implementation using a doubly linked list to track access.
  33. :keyword limit: The maximum number of keys to keep in the cache.
  34. When a new key is inserted and the limit has been exceeded,
  35. the *Least Recently Used* key will be discarded from the
  36. cache.
  37. """
  38. def __init__(self, limit=None):
  39. self.limit = limit
  40. self.mutex = threading.RLock()
  41. self.data = OrderedDict()
  42. def __getitem__(self, key):
  43. with self.mutex:
  44. value = self[key] = self.data.pop(key)
  45. return value
  46. def update(self, *args, **kwargs):
  47. with self.mutex:
  48. data, limit = self.data, self.limit
  49. data.update(*args, **kwargs)
  50. if limit and len(data) > limit:
  51. # pop additional items in case limit exceeded
  52. # negative overflow will lead to an empty list
  53. for item in islice(iter(data), len(data) - limit):
  54. data.pop(item)
  55. def popitem(self, last=True, _needs_lock=IS_PYPY):
  56. if not _needs_lock:
  57. return self.data.popitem(last)
  58. with self.mutex:
  59. return self.data.popitem(last)
  60. def __setitem__(self, key, value):
  61. # remove least recently used key.
  62. with self.mutex:
  63. if self.limit and len(self.data) >= self.limit:
  64. self.data.pop(next(iter(self.data)))
  65. self.data[key] = value
  66. def __iter__(self):
  67. return iter(self.data)
  68. def _iterate_items(self, _need_lock=IS_PYPY):
  69. with self.mutex if _need_lock else DummyContext():
  70. for k in self:
  71. try:
  72. yield (k, self.data[k])
  73. except KeyError: # pragma: no cover
  74. pass
  75. iteritems = _iterate_items
  76. def _iterate_values(self, _need_lock=IS_PYPY):
  77. with self.mutex if _need_lock else DummyContext():
  78. for k in self:
  79. try:
  80. yield self.data[k]
  81. except KeyError: # pragma: no cover
  82. pass
  83. itervalues = _iterate_values
  84. def _iterate_keys(self):
  85. # userdict.keys in py3k calls __getitem__
  86. return keys(self.data)
  87. iterkeys = _iterate_keys
  88. def incr(self, key, delta=1):
  89. with self.mutex:
  90. # this acts as memcached does- store as a string, but return a
  91. # integer as long as it exists and we can cast it
  92. newval = int(self.data.pop(key)) + delta
  93. self[key] = str(newval)
  94. return newval
  95. def __getstate__(self):
  96. d = dict(vars(self))
  97. d.pop('mutex')
  98. return d
  99. def __setstate__(self, state):
  100. self.__dict__ = state
  101. self.mutex = threading.RLock()
  102. if sys.version_info[0] == 3: # pragma: no cover
  103. keys = _iterate_keys
  104. values = _iterate_values
  105. items = _iterate_items
  106. else: # noqa
  107. def keys(self):
  108. return list(self._iterate_keys())
  109. def values(self):
  110. return list(self._iterate_values())
  111. def items(self):
  112. return list(self._iterate_items())
  113. def memoize(maxsize=None, keyfun=None, Cache=LRUCache):
  114. def _memoize(fun):
  115. mutex = threading.Lock()
  116. cache = Cache(limit=maxsize)
  117. @wraps(fun)
  118. def _M(*args, **kwargs):
  119. if keyfun:
  120. key = keyfun(args, kwargs)
  121. else:
  122. key = args + (KEYWORD_MARK,) + tuple(sorted(kwargs.items()))
  123. try:
  124. with mutex:
  125. value = cache[key]
  126. except KeyError:
  127. value = fun(*args, **kwargs)
  128. _M.misses += 1
  129. with mutex:
  130. cache[key] = value
  131. else:
  132. _M.hits += 1
  133. return value
  134. def clear():
  135. """Clear the cache and reset cache statistics."""
  136. cache.clear()
  137. _M.hits = _M.misses = 0
  138. _M.hits = _M.misses = 0
  139. _M.clear = clear
  140. _M.original_func = fun
  141. return _M
  142. return _memoize
  143. class mlazy(lazy):
  144. """Memoized lazy evaluation.
  145. The function is only evaluated once, every subsequent access
  146. will return the same value.
  147. .. attribute:: evaluated
  148. Set to to :const:`True` after the object has been evaluated.
  149. """
  150. evaluated = False
  151. _value = None
  152. def evaluate(self):
  153. if not self.evaluated:
  154. self._value = super(mlazy, self).evaluate()
  155. self.evaluated = True
  156. return self._value
  157. def noop(*args, **kwargs):
  158. """No operation.
  159. Takes any arguments/keyword arguments and does nothing.
  160. """
  161. pass
  162. def first(predicate, it):
  163. """Return the first element in `iterable` that `predicate` Gives a
  164. :const:`True` value for.
  165. If `predicate` is None it will return the first item that is not None.
  166. """
  167. return next(
  168. (v for v in it if (predicate(v) if predicate else v is not None)),
  169. None,
  170. )
  171. def firstmethod(method):
  172. """Return a function that with a list of instances,
  173. finds the first instance that gives a value for the given method.
  174. The list can also contain lazy instances
  175. (:class:`~kombu.utils.functional.lazy`.)
  176. """
  177. def _matcher(it, *args, **kwargs):
  178. for obj in it:
  179. try:
  180. answer = getattr(maybe_evaluate(obj), method)(*args, **kwargs)
  181. except AttributeError:
  182. pass
  183. else:
  184. if answer is not None:
  185. return answer
  186. return _matcher
  187. def chunks(it, n):
  188. """Split an iterator into chunks with `n` elements each.
  189. Examples
  190. # n == 2
  191. >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
  192. >>> list(x)
  193. [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]]
  194. # n == 3
  195. >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
  196. >>> list(x)
  197. [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
  198. """
  199. for first in it:
  200. yield [first] + list(islice(it, n - 1))
  201. def padlist(container, size, default=None):
  202. """Pad list with default elements.
  203. Examples:
  204. >>> first, last, city = padlist(['George', 'Costanza', 'NYC'], 3)
  205. ('George', 'Costanza', 'NYC')
  206. >>> first, last, city = padlist(['George', 'Costanza'], 3)
  207. ('George', 'Costanza', None)
  208. >>> first, last, city, planet = padlist(
  209. ... ['George', 'Costanza', 'NYC'], 4, default='Earth',
  210. ... )
  211. ('George', 'Costanza', 'NYC', 'Earth')
  212. """
  213. return list(container)[:size] + [default] * (size - len(container))
  214. def mattrgetter(*attrs):
  215. """Like :func:`operator.itemgetter` but return :const:`None` on missing
  216. attributes instead of raising :exc:`AttributeError`."""
  217. return lambda obj: {attr: getattr(obj, attr, None) for attr in attrs}
  218. def uniq(it):
  219. """Return all unique elements in ``it``, preserving order."""
  220. seen = set()
  221. return (seen.add(obj) or obj for obj in it if obj not in seen)
  222. def regen(it):
  223. """Regen takes any iterable, and if the object is an
  224. generator it will cache the evaluated list on first access,
  225. so that the generator can be "consumed" multiple times."""
  226. if isinstance(it, (list, tuple)):
  227. return it
  228. return _regen(it)
  229. class _regen(UserList, list):
  230. # must be subclass of list so that json can encode.
  231. def __init__(self, it):
  232. self.__it = it
  233. def __reduce__(self):
  234. return list, (self.data,)
  235. def __length_hint__(self):
  236. return self.__it.__length_hint__()
  237. @cached_property
  238. def data(self):
  239. return list(self.__it)
  240. def dictfilter(d=None, **kw):
  241. """Remove all keys from dict ``d`` whose value is :const:`None`"""
  242. d = kw if d is None else (dict(d, **kw) if kw else d)
  243. return {k: v for k, v in items(d) if v is not None}
  244. def _argsfromspec(spec, replace_defaults=True):
  245. if spec.defaults:
  246. split = len(spec.defaults)
  247. defaults = (list(range(len(spec.defaults))) if replace_defaults
  248. else spec.defaults)
  249. positional = spec.args[:-split]
  250. optional = list(zip(spec.args[-split:], defaults))
  251. else:
  252. positional, optional = spec.args, []
  253. return ', '.join(filter(None, [
  254. ', '.join(positional),
  255. ', '.join('{0}={1}'.format(k, v) for k, v in optional),
  256. '*{0}'.format(spec.varargs) if spec.varargs else None,
  257. '**{0}'.format(spec.keywords) if spec.keywords else None,
  258. ]))
  259. def head_from_fun(fun, bound=False, debug=False):
  260. if not isfunction(fun) and hasattr(fun, '__call__'):
  261. name, fun = fun.__class__.__name__, fun.__call__
  262. else:
  263. name = fun.__name__
  264. definition = FUNHEAD_TEMPLATE.format(
  265. fun_name=name,
  266. fun_args=_argsfromspec(getargspec(fun)),
  267. fun_value=1,
  268. )
  269. if debug:
  270. print(definition, file=sys.stderr)
  271. namespace = {'__name__': 'headof_{0}'.format(name)}
  272. exec(definition, namespace)
  273. result = namespace[name]
  274. result._source = definition
  275. if bound:
  276. return partial(result, object())
  277. return result