__init__.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.utils
  4. ~~~~~~~~~~~~
  5. Utility functions.
  6. :copyright: (c) 2009 - 2011 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from __future__ import with_statement
  11. import os
  12. import sys
  13. import operator
  14. import imp as _imp
  15. import importlib
  16. import logging
  17. import threading
  18. import traceback
  19. import warnings
  20. from contextlib import contextmanager
  21. from functools import partial, wraps
  22. from inspect import getargspec
  23. from itertools import islice
  24. from pprint import pprint
  25. from kombu.utils import cached_property, gen_unique_id # noqa
  26. uuid = gen_unique_id
  27. from ..exceptions import CPendingDeprecationWarning, CDeprecationWarning
  28. from .compat import StringIO
  29. from .encoding import safe_repr as _safe_repr
  30. LOG_LEVELS = dict(logging._levelNames)
  31. LOG_LEVELS["FATAL"] = logging.FATAL
  32. LOG_LEVELS[logging.FATAL] = "FATAL"
  33. PENDING_DEPRECATION_FMT = """
  34. %(description)s is scheduled for deprecation in \
  35. version %(deprecation)s and removal in version v%(removal)s. \
  36. %(alternative)s
  37. """
  38. DEPRECATION_FMT = """
  39. %(description)s is deprecated and scheduled for removal in
  40. version %(removal)s. %(alternative)s
  41. """
  42. def warn_deprecated(description=None, deprecation=None, removal=None,
  43. alternative=None):
  44. ctx = {"description": description,
  45. "deprecation": deprecation, "removal": removal,
  46. "alternative": alternative}
  47. if deprecation is not None:
  48. w = CPendingDeprecationWarning(PENDING_DEPRECATION_FMT % ctx)
  49. else:
  50. w = CDeprecationWarning(DEPRECATION_FMT % ctx)
  51. warnings.warn(w)
  52. def deprecated(description=None, deprecation=None, removal=None,
  53. alternative=None):
  54. def _inner(fun):
  55. @wraps(fun)
  56. def __inner(*args, **kwargs):
  57. warn_deprecated(description=description or qualname(fun),
  58. deprecation=deprecation,
  59. removal=removal,
  60. alternative=alternative)
  61. return fun(*args, **kwargs)
  62. return __inner
  63. return _inner
  64. def lpmerge(L, R):
  65. """Left precedent dictionary merge. Keeps values from `l`, if the value
  66. in `r` is :const:`None`."""
  67. return dict(L, **dict((k, v) for k, v in R.iteritems() if v is not None))
  68. class promise(object):
  69. """A promise.
  70. Evaluated when called or if the :meth:`evaluate` method is called.
  71. The function is evaluated on every access, so the value is not
  72. memoized (see :class:`mpromise`).
  73. Overloaded operations that will evaluate the promise:
  74. :meth:`__str__`, :meth:`__repr__`, :meth:`__cmp__`.
  75. """
  76. def __init__(self, fun, *args, **kwargs):
  77. self._fun = fun
  78. self._args = args
  79. self._kwargs = kwargs
  80. def __call__(self):
  81. return self.evaluate()
  82. def evaluate(self):
  83. return self._fun(*self._args, **self._kwargs)
  84. def __str__(self):
  85. return str(self())
  86. def __repr__(self):
  87. return repr(self())
  88. def __cmp__(self, rhs):
  89. if isinstance(rhs, self.__class__):
  90. return -cmp(rhs, self())
  91. return cmp(self(), rhs)
  92. def __eq__(self, rhs):
  93. return self() == rhs
  94. def __deepcopy__(self, memo):
  95. memo[id(self)] = self
  96. return self
  97. def __reduce__(self):
  98. return (self.__class__, (self._fun, ), {"_args": self._args,
  99. "_kwargs": self._kwargs})
  100. class mpromise(promise):
  101. """Memoized promise.
  102. The function is only evaluated once, every subsequent access
  103. will return the same value.
  104. .. attribute:: evaluated
  105. Set to to :const:`True` after the promise has been evaluated.
  106. """
  107. evaluated = False
  108. _value = None
  109. def evaluate(self):
  110. if not self.evaluated:
  111. self._value = super(mpromise, self).evaluate()
  112. self.evaluated = True
  113. return self._value
  114. def maybe_promise(value):
  115. """Evaluates if the value is a promise."""
  116. if isinstance(value, promise):
  117. return value.evaluate()
  118. return value
  119. def noop(*args, **kwargs):
  120. """No operation.
  121. Takes any arguments/keyword arguments and does nothing.
  122. """
  123. pass
  124. if sys.version_info >= (2, 6):
  125. def kwdict(kwargs):
  126. return kwargs
  127. else:
  128. def kwdict(kwargs): # noqa
  129. """Make sure keyword arguments are not in unicode.
  130. This should be fixed in newer Python versions,
  131. see: http://bugs.python.org/issue4978.
  132. """
  133. return dict((key.encode("utf-8"), value)
  134. for key, value in kwargs.items())
  135. def first(predicate, iterable):
  136. """Returns the first element in `iterable` that `predicate` returns a
  137. :const:`True` value for."""
  138. for item in iterable:
  139. if predicate(item):
  140. return item
  141. def firstmethod(method):
  142. """Returns a functions that with a list of instances,
  143. finds the first instance that returns a value for the given method.
  144. The list can also contain promises (:class:`promise`.)
  145. """
  146. def _matcher(seq, *args, **kwargs):
  147. for cls in seq:
  148. try:
  149. answer = getattr(maybe_promise(cls), method)(*args, **kwargs)
  150. if answer is not None:
  151. return answer
  152. except AttributeError:
  153. pass
  154. return _matcher
  155. def chunks(it, n):
  156. """Split an iterator into chunks with `n` elements each.
  157. Examples
  158. # n == 2
  159. >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
  160. >>> list(x)
  161. [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]]
  162. # n == 3
  163. >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
  164. >>> list(x)
  165. [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
  166. """
  167. for first in it:
  168. yield [first] + list(islice(it, n - 1))
  169. def padlist(container, size, default=None):
  170. """Pad list with default elements.
  171. Examples:
  172. >>> first, last, city = padlist(["George", "Costanza", "NYC"], 3)
  173. ("George", "Costanza", "NYC")
  174. >>> first, last, city = padlist(["George", "Costanza"], 3)
  175. ("George", "Costanza", None)
  176. >>> first, last, city, planet = padlist(["George", "Costanza",
  177. "NYC"], 4, default="Earth")
  178. ("George", "Costanza", "NYC", "Earth")
  179. """
  180. return list(container)[:size] + [default] * (size - len(container))
  181. def is_iterable(obj):
  182. try:
  183. iter(obj)
  184. except TypeError:
  185. return False
  186. return True
  187. def mattrgetter(*attrs):
  188. """Like :func:`operator.itemgetter` but returns :const:`None` on missing
  189. attributes instead of raising :exc:`AttributeError`."""
  190. return lambda obj: dict((attr, getattr(obj, attr, None))
  191. for attr in attrs)
  192. if sys.version_info >= (3, 3):
  193. def qualname(obj):
  194. return obj.__qualname__
  195. else:
  196. def qualname(obj): # noqa
  197. if not hasattr(obj, "__name__") and hasattr(obj, "__class__"):
  198. return qualname(obj.__class__)
  199. return '.'.join([obj.__module__, obj.__name__])
  200. get_full_cls_name = qualname # XXX Compat
  201. def fun_takes_kwargs(fun, kwlist=[]):
  202. """With a function, and a list of keyword arguments, returns arguments
  203. in the list which the function takes.
  204. If the object has an `argspec` attribute that is used instead
  205. of using the :meth:`inspect.getargspec` introspection.
  206. :param fun: The function to inspect arguments of.
  207. :param kwlist: The list of keyword arguments.
  208. Examples
  209. >>> def foo(self, x, y, logfile=None, loglevel=None):
  210. ... return x * y
  211. >>> fun_takes_kwargs(foo, ["logfile", "loglevel", "task_id"])
  212. ["logfile", "loglevel"]
  213. >>> def foo(self, x, y, **kwargs):
  214. >>> fun_takes_kwargs(foo, ["logfile", "loglevel", "task_id"])
  215. ["logfile", "loglevel", "task_id"]
  216. """
  217. argspec = getattr(fun, "argspec", getargspec(fun))
  218. args, _varargs, keywords, _defaults = argspec
  219. if keywords != None:
  220. return kwlist
  221. return filter(partial(operator.contains, args), kwlist)
  222. def get_cls_by_name(name, aliases={}, imp=None, package=None,
  223. sep='.', **kwargs):
  224. """Get class by name.
  225. The name should be the full dot-separated path to the class::
  226. modulename.ClassName
  227. Example::
  228. celery.concurrency.processes.TaskPool
  229. ^- class name
  230. or using ':' to separate module and symbol::
  231. celery.concurrency.processes:TaskPool
  232. If `aliases` is provided, a dict containing short name/long name
  233. mappings, the name is looked up in the aliases first.
  234. Examples:
  235. >>> get_cls_by_name("celery.concurrency.processes.TaskPool")
  236. <class 'celery.concurrency.processes.TaskPool'>
  237. >>> get_cls_by_name("default", {
  238. ... "default": "celery.concurrency.processes.TaskPool"})
  239. <class 'celery.concurrency.processes.TaskPool'>
  240. # Does not try to look up non-string names.
  241. >>> from celery.concurrency.processes import TaskPool
  242. >>> get_cls_by_name(TaskPool) is TaskPool
  243. True
  244. """
  245. if imp is None:
  246. imp = importlib.import_module
  247. if not isinstance(name, basestring):
  248. return name # already a class
  249. name = aliases.get(name) or name
  250. sep = ':' if ':' in name else sep
  251. module_name, _, cls_name = name.rpartition(sep)
  252. if not module_name and package:
  253. module_name = package
  254. try:
  255. module = imp(module_name, package=package, **kwargs)
  256. except ValueError, exc:
  257. raise ValueError("Couldn't import %r: %s" % (name, exc))
  258. return getattr(module, cls_name)
  259. get_symbol_by_name = get_cls_by_name
  260. def instantiate(name, *args, **kwargs):
  261. """Instantiate class by name.
  262. See :func:`get_cls_by_name`.
  263. """
  264. return get_cls_by_name(name)(*args, **kwargs)
  265. def truncate_text(text, maxlen=128, suffix="..."):
  266. """Truncates text to a maximum number of characters."""
  267. if len(text) >= maxlen:
  268. return text[:maxlen].rsplit(" ", 1)[0] + suffix
  269. return text
  270. def abbr(S, max, ellipsis="..."):
  271. if S is None:
  272. return "???"
  273. if len(S) > max:
  274. return ellipsis and (S[:max - len(ellipsis)] + ellipsis) or S[:max]
  275. return S
  276. def abbrtask(S, max):
  277. if S is None:
  278. return "???"
  279. if len(S) > max:
  280. module, _, cls = S.rpartition(".")
  281. module = abbr(module, max - len(cls) - 3, False)
  282. return module + "[.]" + cls
  283. return S
  284. def isatty(fh):
  285. # Fixes bug with mod_wsgi:
  286. # mod_wsgi.Log object has no attribute isatty.
  287. return getattr(fh, "isatty", None) and fh.isatty()
  288. def textindent(t, indent=0):
  289. """Indent text."""
  290. return "\n".join(" " * indent + p for p in t.split("\n"))
  291. @contextmanager
  292. def cwd_in_path():
  293. cwd = os.getcwd()
  294. if cwd in sys.path:
  295. yield
  296. else:
  297. sys.path.insert(0, cwd)
  298. try:
  299. yield cwd
  300. finally:
  301. try:
  302. sys.path.remove(cwd)
  303. except ValueError:
  304. pass
  305. def find_module(module, path=None, imp=None):
  306. """Version of :func:`imp.find_module` supporting dots."""
  307. if imp is None:
  308. imp = importlib.import_module
  309. with cwd_in_path():
  310. if "." in module:
  311. last = None
  312. parts = module.split(".")
  313. for i, part in enumerate(parts[:-1]):
  314. path = imp(".".join(parts[:i + 1])).__path__
  315. last = _imp.find_module(parts[i + 1], path)
  316. return last
  317. return _imp.find_module(module)
  318. def import_from_cwd(module, imp=None, package=None):
  319. """Import module, but make sure it finds modules
  320. located in the current directory.
  321. Modules located in the current directory has
  322. precedence over modules located in `sys.path`.
  323. """
  324. if imp is None:
  325. imp = importlib.import_module
  326. with cwd_in_path():
  327. return imp(module, package=package)
  328. def cry(): # pragma: no cover
  329. """Return stacktrace of all active threads.
  330. From https://gist.github.com/737056
  331. """
  332. tmap = {}
  333. main_thread = None
  334. # get a map of threads by their ID so we can print their names
  335. # during the traceback dump
  336. for t in threading.enumerate():
  337. if getattr(t, "ident", None):
  338. tmap[t.ident] = t
  339. else:
  340. main_thread = t
  341. out = StringIO()
  342. sep = "=" * 49 + "\n"
  343. for tid, frame in sys._current_frames().iteritems():
  344. thread = tmap.get(tid, main_thread)
  345. if not thread:
  346. # skip old junk (left-overs from a fork)
  347. continue
  348. out.write("%s\n" % (thread.getName(), ))
  349. out.write(sep)
  350. traceback.print_stack(frame, file=out)
  351. out.write(sep)
  352. out.write("LOCAL VARIABLES\n")
  353. out.write(sep)
  354. pprint(frame.f_locals, stream=out)
  355. out.write("\n\n")
  356. return out.getvalue()
  357. def reprkwargs(kwargs, sep=', ', fmt="%s=%s"):
  358. return sep.join(fmt % (k, _safe_repr(v)) for k, v in kwargs.iteritems())
  359. def reprcall(name, args=(), kwargs=(), sep=', '):
  360. return "%s(%s%s%s)" % (name, sep.join(map(_safe_repr, args)),
  361. (args and kwargs) and sep or "",
  362. reprkwargs(kwargs, sep))