__init__.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.utils
  4. ~~~~~~~~~~~~
  5. Utility functions.
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import operator
  10. import os
  11. import sys
  12. import traceback
  13. import warnings
  14. import types
  15. import datetime
  16. from functools import partial, wraps
  17. from inspect import getargspec
  18. from pprint import pprint
  19. from kombu.entity import Exchange, Queue
  20. from celery.exceptions import CPendingDeprecationWarning, CDeprecationWarning
  21. from .compat import StringIO
  22. from .functional import noop
#: Message template used by :func:`warn_deprecated` when a ``deprecation``
#: version is given.  Interpolated with a mapping holding the keys
#: ``description``, ``deprecation``, ``removal`` and ``alternative``.
PENDING_DEPRECATION_FMT = """
%(description)s is scheduled for deprecation in \
version %(deprecation)s and removal in version v%(removal)s. \
%(alternative)s
"""

#: Message template used by :func:`warn_deprecated` when no
#: ``deprecation`` version is given.  Interpolated with the same mapping
#: as :data:`PENDING_DEPRECATION_FMT`.
DEPRECATION_FMT = """
%(description)s is deprecated and scheduled for removal in
version %(removal)s. %(alternative)s
"""

#: Billiard sets this when execv is enabled.
#: We use it to find out the name of the original ``__main__``
#: module, so that we can properly rewrite the name of the
#: task to be that of ``App.main``.
#: An unset or empty environment variable is normalized to :const:`None`.
MP_MAIN_FILE = os.environ.get('MP_MAIN_FILE') or None

#: Exchange for worker direct queues.
WORKER_DIRECT_EXCHANGE = Exchange('C.dq')

#: Format for worker direct queue names.
#: Interpolated with the worker hostname by :func:`worker_direct`.
WORKER_DIRECT_QUEUE_FORMAT = '%s.dq'
  41. def worker_direct(hostname):
  42. if isinstance(hostname, Queue):
  43. return hostname
  44. return Queue(WORKER_DIRECT_QUEUE_FORMAT % hostname,
  45. WORKER_DIRECT_EXCHANGE,
  46. hostname,
  47. auto_delete=True)
  48. def warn_deprecated(description=None, deprecation=None,
  49. removal=None, alternative=None):
  50. ctx = {'description': description,
  51. 'deprecation': deprecation, 'removal': removal,
  52. 'alternative': alternative}
  53. if deprecation is not None:
  54. w = CPendingDeprecationWarning(PENDING_DEPRECATION_FMT % ctx)
  55. else:
  56. w = CDeprecationWarning(DEPRECATION_FMT % ctx)
  57. warnings.warn(w)
  58. def deprecated(description=None, deprecation=None,
  59. removal=None, alternative=None):
  60. def _inner(fun):
  61. @wraps(fun)
  62. def __inner(*args, **kwargs):
  63. from .imports import qualname
  64. warn_deprecated(description=description or qualname(fun),
  65. deprecation=deprecation,
  66. removal=removal,
  67. alternative=alternative)
  68. return fun(*args, **kwargs)
  69. return __inner
  70. return _inner
  71. def lpmerge(L, R):
  72. """In place left precedent dictionary merge.
  73. Keeps values from `L`, if the value in `R` is :const:`None`."""
  74. set = L.__setitem__
  75. [set(k, v) for k, v in R.iteritems() if v is not None]
  76. return L
  77. def is_iterable(obj):
  78. try:
  79. iter(obj)
  80. except TypeError:
  81. return False
  82. return True
  83. def fun_takes_kwargs(fun, kwlist=[]):
  84. """With a function, and a list of keyword arguments, returns arguments
  85. in the list which the function takes.
  86. If the object has an `argspec` attribute that is used instead
  87. of using the :meth:`inspect.getargspec` introspection.
  88. :param fun: The function to inspect arguments of.
  89. :param kwlist: The list of keyword arguments.
  90. Examples
  91. >>> def foo(self, x, y, logfile=None, loglevel=None):
  92. ... return x * y
  93. >>> fun_takes_kwargs(foo, ['logfile', 'loglevel', 'task_id'])
  94. ['logfile', 'loglevel']
  95. >>> def foo(self, x, y, **kwargs):
  96. >>> fun_takes_kwargs(foo, ['logfile', 'loglevel', 'task_id'])
  97. ['logfile', 'loglevel', 'task_id']
  98. """
  99. argspec = getattr(fun, 'argspec', getargspec(fun))
  100. args, _varargs, keywords, _defaults = argspec
  101. if keywords is not None:
  102. return kwlist
  103. return filter(partial(operator.contains, args), kwlist)
  104. def isatty(fh):
  105. # Fixes bug with mod_wsgi:
  106. # mod_wsgi.Log object has no attribute isatty.
  107. return getattr(fh, 'isatty', None) and fh.isatty()
  108. def cry(): # pragma: no cover
  109. """Return stacktrace of all active threads.
  110. From https://gist.github.com/737056
  111. """
  112. import threading
  113. tmap = {}
  114. main_thread = None
  115. # get a map of threads by their ID so we can print their names
  116. # during the traceback dump
  117. for t in threading.enumerate():
  118. if getattr(t, 'ident', None):
  119. tmap[t.ident] = t
  120. else:
  121. main_thread = t
  122. out = StringIO()
  123. sep = '=' * 49 + '\n'
  124. for tid, frame in sys._current_frames().iteritems():
  125. thread = tmap.get(tid, main_thread)
  126. if not thread:
  127. # skip old junk (left-overs from a fork)
  128. continue
  129. out.write('%s\n' % (thread.getName(), ))
  130. out.write(sep)
  131. traceback.print_stack(frame, file=out)
  132. out.write(sep)
  133. out.write('LOCAL VARIABLES\n')
  134. out.write(sep)
  135. pprint(frame.f_locals, stream=out)
  136. out.write('\n\n')
  137. return out.getvalue()
  138. def maybe_reraise():
  139. """Reraise if an exception is currently being handled, or return
  140. otherwise."""
  141. exc_info = sys.exc_info()
  142. try:
  143. if exc_info[2]:
  144. raise exc_info[0], exc_info[1], exc_info[2]
  145. finally:
  146. # see http://docs.python.org/library/sys.html#sys.exc_info
  147. del(exc_info)
  148. def strtobool(term, table={'false': False, 'no': False, '0': False,
  149. 'true': True, 'yes': True, '1': True,
  150. 'on': True, 'off': False}):
  151. if isinstance(term, basestring):
  152. try:
  153. return table[term.lower()]
  154. except KeyError:
  155. raise TypeError('Cannot coerce %r to type bool' % (term, ))
  156. return term
  157. def jsonify(obj):
  158. "Transforms object making it suitable for json serialization"
  159. if isinstance(obj, (int, float, basestring, types.NoneType)):
  160. return obj
  161. elif isinstance(obj, (tuple, list)):
  162. return map(jsonify, obj)
  163. elif isinstance(obj, dict):
  164. return dict([(k,jsonify(v)) for k,v in obj.iteritems()])
  165. # See "Date Time String Format" in the ECMA-262 specification.
  166. elif isinstance(obj, datetime.datetime):
  167. r = obj.isoformat()
  168. if obj.microsecond:
  169. r = r[:23] + r[26:]
  170. if r.endswith('+00:00'):
  171. r = r[:-6] + 'Z'
  172. return r
  173. elif isinstance(obj, datetime.date):
  174. return obj.isoformat()
  175. elif isinstance(obj, datetime.time):
  176. r = obj.isoformat()
  177. if obj.microsecond:
  178. r = r[:12]
  179. return r
  180. elif isinstance(obj, datetime.timedelta):
  181. return str(obj)
  182. else:
  183. raise ValueError("Unsupported type: %s" % type(obj))
  184. def gen_task_name(app, name, module_name):
  185. try:
  186. module = sys.modules[module_name]
  187. except KeyError:
  188. # Fix for manage.py shell_plus (Issue #366)
  189. module = None
  190. if module is not None:
  191. module_name = module.__name__
  192. # - If the task module is used as the __main__ script
  193. # - we need to rewrite the module part of the task name
  194. # - to match App.main.
  195. if MP_MAIN_FILE and module.__file__ == MP_MAIN_FILE:
  196. # - see comment about :envvar:`MP_MAIN_FILE` above.
  197. module_name = '__main__'
  198. if module_name == '__main__' and app.main:
  199. return '.'.join([app.main, name])
  200. return '.'.join(filter(None, [module_name, name]))
# ------------------------------------------------------------------------ #
# > XXX Compat
# The names below are imported only so they can be accessed through this
# module (hence the ``noqa`` markers on otherwise unused imports).
# NOTE(review): presumably kept for backward compatibility with code
# importing them from here -- confirm before removing any.
from .log import LOG_LEVELS  # noqa
from .imports import (  # noqa
    qualname as get_full_cls_name, symbol_by_name as get_cls_by_name,
    instantiate, import_from_cwd
)
from .functional import chunks, noop  # noqa
from kombu.utils import cached_property, kwdict, uuid  # noqa

#: Alias of the ``uuid`` function imported from :mod:`kombu.utils`.
gen_unique_id = uuid