__init__.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. from __future__ import generators
  2. import time
  3. import operator
  4. try:
  5. import ctypes
  6. except ImportError:
  7. ctypes = None
  8. import importlib
  9. import logging
  10. from datetime import datetime
  11. from uuid import UUID, uuid4, _uuid_generate_random
  12. from inspect import getargspec
  13. from itertools import islice
  14. from carrot.utils import rpartition
  15. from celery.utils.compat import all, any, defaultdict
  16. from celery.utils.timeutils import timedelta_seconds # was here before
  17. from celery.utils.functional import curry
# Copy of the logging module's private ``_levelNames`` table, extended so
# that "FATAL" can be looked up by name (-> logging.FATAL) as well as by
# number (logging.FATAL -> "FATAL").
LOG_LEVELS = dict(logging._levelNames)
LOG_LEVELS["FATAL"] = logging.FATAL
LOG_LEVELS[logging.FATAL] = "FATAL"
  21. class promise(object):
  22. """A promise.
  23. Evaluated when called or if the :meth:`evaluate` method is called.
  24. The function is evaluated on every access, so the value is not
  25. memoized (see :class:`mpromise`).
  26. Overloaded operations that will evaluate the promise:
  27. :meth:`__str__`, :meth:`__repr__`, :meth:`__cmp__`.
  28. """
  29. def __init__(self, fun, *args, **kwargs):
  30. self._fun = fun
  31. self._args = args
  32. self._kwargs = kwargs
  33. def __call__(self):
  34. return self.evaluate()
  35. def evaluate(self):
  36. return self._fun(*self._args, **self._kwargs)
  37. def __str__(self):
  38. return str(self())
  39. def __repr__(self):
  40. return repr(self())
  41. def __cmp__(self, rhs):
  42. if isinstance(rhs, self.__class__):
  43. return -cmp(rhs, self())
  44. return cmp(self(), rhs)
  45. def __deepcopy__(self, memo):
  46. memo[id(self)] = self
  47. return self
  48. def __reduce__(self):
  49. return (self.__class__, (self._fun, ), {"_args": self._args,
  50. "_kwargs": self._kwargs})
  51. class mpromise(promise):
  52. """Memoized promise.
  53. The function is only evaluated once, every subsequent access
  54. will return the same value.
  55. .. attribute:: evaluated
  56. Set to to :const:`True` after the promise has been evaluated.
  57. """
  58. evaluated = False
  59. _value = None
  60. def evaluate(self):
  61. if not self.evaluated:
  62. self._value = super(mpromise, self).evaluate()
  63. self.evaluated = True
  64. return self._value
  65. def maybe_promise(value):
  66. """Evaluates if the value is a promise."""
  67. if isinstance(value, promise):
  68. return value.evaluate()
  69. return value
  70. def noop(*args, **kwargs):
  71. """No operation.
  72. Takes any arguments/keyword arguments and does nothing.
  73. """
  74. pass
  75. def kwdict(kwargs):
  76. """Make sure keyword arguments are not in unicode.
  77. This should be fixed in newer Python versions,
  78. see: http://bugs.python.org/issue4978.
  79. """
  80. return dict((key.encode("utf-8"), value)
  81. for key, value in kwargs.items())
  82. def first(predicate, iterable):
  83. """Returns the first element in ``iterable`` that ``predicate`` returns a
  84. ``True`` value for."""
  85. for item in iterable:
  86. if predicate(item):
  87. return item
  88. def firstmethod(method):
  89. """Returns a functions that with a list of instances,
  90. finds the first instance that returns a value for the given method.
  91. The list can also contain promises (:class:`promise`.)
  92. """
  93. def _matcher(seq, *args, **kwargs):
  94. for cls in seq:
  95. try:
  96. answer = getattr(maybe_promise(cls), method)(*args, **kwargs)
  97. if answer is not None:
  98. return answer
  99. except AttributeError:
  100. pass
  101. return _matcher
  102. def chunks(it, n):
  103. """Split an iterator into chunks with ``n`` elements each.
  104. Examples
  105. # n == 2
  106. >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
  107. >>> list(x)
  108. [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]]
  109. # n == 3
  110. >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
  111. >>> list(x)
  112. [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
  113. """
  114. for first in it:
  115. yield [first] + list(islice(it, n - 1))
  116. def gen_unique_id():
  117. """Generate a unique id, having - hopefully - a very small chance of
  118. collission.
  119. For now this is provided by :func:`uuid.uuid4`.
  120. """
  121. # Workaround for http://bugs.python.org/issue4607
  122. if ctypes and _uuid_generate_random:
  123. buffer = ctypes.create_string_buffer(16)
  124. _uuid_generate_random(buffer)
  125. return str(UUID(bytes=buffer.raw))
  126. return str(uuid4())
  127. def padlist(container, size, default=None):
  128. """Pad list with default elements.
  129. Examples:
  130. >>> first, last, city = padlist(["George", "Costanza", "NYC"], 3)
  131. ("George", "Costanza", "NYC")
  132. >>> first, last, city = padlist(["George", "Costanza"], 3)
  133. ("George", "Costanza", None)
  134. >>> first, last, city, planet = padlist(["George", "Costanza",
  135. "NYC"], 4, default="Earth")
  136. ("George", "Costanza", "NYC", "Earth")
  137. """
  138. return list(container)[:size] + [default] * (size - len(container))
  139. def is_iterable(obj):
  140. try:
  141. iter(obj)
  142. except TypeError:
  143. return False
  144. return True
  145. def mitemgetter(*items):
  146. """Like :func:`operator.itemgetter` but returns ``None`` on missing items
  147. instead of raising :exc:`KeyError`."""
  148. return lambda container: map(container.get, items)
  149. def mattrgetter(*attrs):
  150. """Like :func:`operator.itemgetter` but returns ``None`` on missing
  151. attributes instead of raising :exc:`AttributeError`."""
  152. return lambda obj: dict((attr, getattr(obj, attr, None))
  153. for attr in attrs)
  154. def get_full_cls_name(cls):
  155. """With a class, get its full module and class name."""
  156. return ".".join([cls.__module__,
  157. cls.__name__])
  158. def repeatlast(it):
  159. """Iterate over all elements in the iterator, and when its exhausted
  160. yield the last value infinitely."""
  161. for item in it:
  162. yield item
  163. while 1: # pragma: no cover
  164. yield item
  165. def retry_over_time(fun, catch, args=[], kwargs={}, errback=noop,
  166. max_retries=None, interval_start=2, interval_step=2, interval_max=30):
  167. """Retry the function over and over until max retries is exceeded.
  168. For each retry we sleep a for a while before we try again, this interval
  169. is increased for every retry until the max seconds is reached.
  170. :param fun: The function to try
  171. :param catch: Exceptions to catch, can be either tuple or a single
  172. exception class.
  173. :keyword args: Positional arguments passed on to the function.
  174. :keyword kwargs: Keyword arguments passed on to the function.
  175. :keyword errback: Callback for when an exception in ``catch`` is raised.
  176. The callback must take two arguments: ``exc`` and ``interval``, where
  177. ``exc`` is the exception instance, and ``interval`` is the time in
  178. seconds to sleep next..
  179. :keyword max_retries: Maximum number of retries before we give up.
  180. If this is not set, we will retry forever.
  181. :keyword interval_start: How long (in seconds) we start sleeping between
  182. retries.
  183. :keyword interval_step: By how much the interval is increased for each
  184. retry.
  185. :keyword interval_max: Maximum number of seconds to sleep between retries.
  186. """
  187. retries = 0
  188. interval_range = xrange(interval_start,
  189. interval_max + interval_start,
  190. interval_step)
  191. for interval in repeatlast(interval_range):
  192. try:
  193. retval = fun(*args, **kwargs)
  194. except catch, exc:
  195. if max_retries and retries > max_retries:
  196. raise
  197. errback(exc, interval)
  198. retries += 1
  199. time.sleep(interval)
  200. else:
  201. return retval
  202. def fun_takes_kwargs(fun, kwlist=[]):
  203. """With a function, and a list of keyword arguments, returns arguments
  204. in the list which the function takes.
  205. If the object has an ``argspec`` attribute that is used instead
  206. of using the :meth:`inspect.getargspec`` introspection.
  207. :param fun: The function to inspect arguments of.
  208. :param kwlist: The list of keyword arguments.
  209. Examples
  210. >>> def foo(self, x, y, logfile=None, loglevel=None):
  211. ... return x * y
  212. >>> fun_takes_kwargs(foo, ["logfile", "loglevel", "task_id"])
  213. ["logfile", "loglevel"]
  214. >>> def foo(self, x, y, **kwargs):
  215. >>> fun_takes_kwargs(foo, ["logfile", "loglevel", "task_id"])
  216. ["logfile", "loglevel", "task_id"]
  217. """
  218. argspec = getattr(fun, "argspec", getargspec(fun))
  219. args, _varargs, keywords, _defaults = argspec
  220. if keywords != None:
  221. return kwlist
  222. return filter(curry(operator.contains, args), kwlist)
  223. def get_cls_by_name(name, aliases={}):
  224. """Get class by name.
  225. The name should be the full dot-separated path to the class::
  226. modulename.ClassName
  227. Example::
  228. celery.concurrency.processes.TaskPool
  229. ^- class name
  230. If ``aliases`` is provided, a dict containing short name/long name
  231. mappings, the name is looked up in the aliases first.
  232. Examples:
  233. >>> get_cls_by_name("celery.concurrency.processes.TaskPool")
  234. <class 'celery.concurrency.processes.TaskPool'>
  235. >>> get_cls_by_name("default", {
  236. ... "default": "celery.concurrency.processes.TaskPool"})
  237. <class 'celery.concurrency.processes.TaskPool'>
  238. # Does not try to look up non-string names.
  239. >>> from celery.concurrency.processes import TaskPool
  240. >>> get_cls_by_name(TaskPool) is TaskPool
  241. True
  242. """
  243. if not isinstance(name, basestring):
  244. return name # already a class
  245. name = aliases.get(name) or name
  246. module_name, _, cls_name = rpartition(name, ".")
  247. module = importlib.import_module(module_name)
  248. return getattr(module, cls_name)
  249. get_symbol_by_name = get_cls_by_name
  250. def instantiate(name, *args, **kwargs):
  251. """Instantiate class by name.
  252. See :func:`get_cls_by_name`.
  253. """
  254. return get_cls_by_name(name)(*args, **kwargs)
  255. def truncate_text(text, maxlen=128, suffix="..."):
  256. """Truncates text to a maximum number of characters."""
  257. if len(text) >= maxlen:
  258. return text[:maxlen].rsplit(" ", 1)[0] + suffix
  259. return text
  260. def abbr(S, max, ellipsis="..."):
  261. if S is None:
  262. return "???"
  263. if len(S) > max:
  264. return ellipsis and (S[:max-len(ellipsis)] + ellipsis) or S[:max]
  265. return S
  266. def abbrtask(S, max):
  267. if S is None:
  268. return "???"
  269. if len(S) > max:
  270. module, _, cls = rpartition(S, ".")
  271. module = abbr(module, max - len(cls), False)
  272. return module + "[.]" + cls
  273. return S
  274. def isatty(fh):
  275. # Fixes bug with mod_wsgi:
  276. # mod_wsgi.Log object has no attribute isatty.
  277. return getattr(fh, "isatty", None) and fh.isatty()
  278. def textindent(t, indent=0):
  279. """Indent text."""
  280. return "\n".join(" " * indent + p for p in t.split("\n"))