functional.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.utils.functional
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. Utilities for functions.
  6. """
  7. from __future__ import absolute_import
  8. import threading
  9. from functools import wraps
  10. from itertools import islice
  11. from kombu.utils import cached_property
  12. from kombu.utils.functional import promise, maybe_promise
  13. from kombu.utils.compat import OrderedDict
  14. from celery.five import UserDict, UserList, items, keys, string_t
# Unique sentinel that `memoize` places between the positional arguments
# and the sorted keyword items when it builds a cache key.
KEYWORD_MARK = object()
  16. class LRUCache(UserDict):
  17. """LRU Cache implementation using a doubly linked list to track access.
  18. :keyword limit: The maximum number of keys to keep in the cache.
  19. When a new key is inserted and the limit has been exceeded,
  20. the *Least Recently Used* key will be discarded from the
  21. cache.
  22. """
  23. def __init__(self, limit=None):
  24. self.limit = limit
  25. self.mutex = threading.RLock()
  26. self.data = OrderedDict()
  27. def __getitem__(self, key):
  28. with self.mutex:
  29. value = self[key] = self.data.pop(key)
  30. return value
  31. def keys(self):
  32. # userdict.keys in py3k calls __getitem__
  33. return keys(self.data)
  34. def values(self):
  35. return list(self._iterate_values())
  36. def items(self):
  37. return list(self._iterate_items())
  38. def update(self, *args, **kwargs):
  39. with self.mutex:
  40. data, limit = self.data, self.limit
  41. data.update(*args, **kwargs)
  42. if limit and len(data) > limit:
  43. # pop additional items in case limit exceeded
  44. # negative overflow will lead to an empty list
  45. for item in islice(iter(data), len(data) - limit):
  46. data.pop(item)
  47. def __setitem__(self, key, value):
  48. # remove least recently used key.
  49. with self.mutex:
  50. if self.limit and len(self.data) >= self.limit:
  51. self.data.pop(next(iter(self.data)))
  52. self.data[key] = value
  53. def __iter__(self):
  54. return iter(self.data)
  55. def _iterate_items(self):
  56. for k in self:
  57. try:
  58. yield (k, self.data[k])
  59. except KeyError: # pragma: no cover
  60. pass
  61. iteritems = _iterate_items
  62. def _iterate_values(self):
  63. for k in self:
  64. try:
  65. yield self.data[k]
  66. except KeyError: # pragma: no cover
  67. pass
  68. itervalues = _iterate_values
  69. def incr(self, key, delta=1):
  70. with self.mutex:
  71. # this acts as memcached does- store as a string, but return a
  72. # integer as long as it exists and we can cast it
  73. newval = int(self.data.pop(key)) + delta
  74. self[key] = str(newval)
  75. return newval
  76. def __getstate__(self):
  77. d = dict(vars(self))
  78. d.pop('mutex')
  79. return d
  80. def __setstate__(self, state):
  81. self.__dict__ = state
  82. self.mutex = threading.RLock()
  83. def is_list(l, scalars=(dict, string_t)):
  84. """Returns true if object is list-like, but not a dict or string."""
  85. return hasattr(l, '__iter__') and not isinstance(l, scalars or ())
  86. def maybe_list(l, scalars=(dict, string_t)):
  87. """Returns list of one element if ``l`` is a scalar."""
  88. return l if l is None or is_list(l, scalars) else [l]
  89. def memoize(maxsize=None, Cache=LRUCache):
  90. def _memoize(fun):
  91. mutex = threading.Lock()
  92. cache = Cache(limit=maxsize)
  93. @wraps(fun)
  94. def _M(*args, **kwargs):
  95. key = args + (KEYWORD_MARK, ) + tuple(sorted(kwargs.items()))
  96. try:
  97. with mutex:
  98. value = cache[key]
  99. except KeyError:
  100. value = fun(*args, **kwargs)
  101. _M.misses += 1
  102. with mutex:
  103. cache[key] = value
  104. else:
  105. _M.hits += 1
  106. return value
  107. def clear():
  108. """Clear the cache and reset cache statistics."""
  109. cache.clear()
  110. _M.hits = _M.misses = 0
  111. _M.hits = _M.misses = 0
  112. _M.clear = clear
  113. _M.original_func = fun
  114. return _M
  115. return _memoize
  116. class mpromise(promise):
  117. """Memoized promise.
  118. The function is only evaluated once, every subsequent access
  119. will return the same value.
  120. .. attribute:: evaluated
  121. Set to to :const:`True` after the promise has been evaluated.
  122. """
  123. evaluated = False
  124. _value = None
  125. def evaluate(self):
  126. if not self.evaluated:
  127. self._value = super(mpromise, self).evaluate()
  128. self.evaluated = True
  129. return self._value
  130. def noop(*args, **kwargs):
  131. """No operation.
  132. Takes any arguments/keyword arguments and does nothing.
  133. """
  134. pass
  135. def first(predicate, it):
  136. """Returns the first element in `iterable` that `predicate` returns a
  137. :const:`True` value for.
  138. If `predicate` is None it will return the first item that is not None.
  139. """
  140. return next(
  141. (v for v in it if (predicate(v) if predicate else v is not None)),
  142. None,
  143. )
  144. def firstmethod(method):
  145. """Returns a function that with a list of instances,
  146. finds the first instance that returns a value for the given method.
  147. The list can also contain promises (:class:`promise`.)
  148. """
  149. def _matcher(it, *args, **kwargs):
  150. for obj in it:
  151. try:
  152. answer = getattr(maybe_promise(obj), method)(*args, **kwargs)
  153. except AttributeError:
  154. pass
  155. else:
  156. if answer is not None:
  157. return answer
  158. return _matcher
  159. def chunks(it, n):
  160. """Split an iterator into chunks with `n` elements each.
  161. Examples
  162. # n == 2
  163. >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
  164. >>> list(x)
  165. [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]]
  166. # n == 3
  167. >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
  168. >>> list(x)
  169. [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
  170. """
  171. # XXX This function is not used anymore, at least not by Celery itself.
  172. for first in it:
  173. yield [first] + list(islice(it, n - 1))
  174. def padlist(container, size, default=None):
  175. """Pad list with default elements.
  176. Examples:
  177. >>> first, last, city = padlist(['George', 'Costanza', 'NYC'], 3)
  178. ('George', 'Costanza', 'NYC')
  179. >>> first, last, city = padlist(['George', 'Costanza'], 3)
  180. ('George', 'Costanza', None)
  181. >>> first, last, city, planet = padlist(['George', 'Costanza',
  182. 'NYC'], 4, default='Earth')
  183. ('George', 'Costanza', 'NYC', 'Earth')
  184. """
  185. return list(container)[:size] + [default] * (size - len(container))
  186. def mattrgetter(*attrs):
  187. """Like :func:`operator.itemgetter` but returns :const:`None` on missing
  188. attributes instead of raising :exc:`AttributeError`."""
  189. return lambda obj: dict((attr, getattr(obj, attr, None))
  190. for attr in attrs)
  191. def uniq(it):
  192. """Returns all unique elements in ``it``, preserving order."""
  193. seen = set()
  194. return (seen.add(obj) or obj for obj in it if obj not in seen)
  195. def regen(it):
  196. """Regen takes any iterable, and if the object is an
  197. generator it will cache the evaluated list on first access,
  198. so that the generator can be "consumed" multiple times."""
  199. if isinstance(it, (list, tuple)):
  200. return it
  201. return _regen(it)
  202. class _regen(UserList, list):
  203. # must be subclass of list so that json can encode.
  204. def __init__(self, it):
  205. self.__it = it
  206. @cached_property
  207. def data(self):
  208. return list(self.__it)
  209. def dictfilter(d, **filterkeys):
  210. d = dict(d, **filterkeys) if filterkeys else d
  211. return dict((k, v) for k, v in items(d) if v is not None)