utils.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. from __future__ import generators
  2. import os
  3. import sys
  4. import __builtin__
  5. from StringIO import StringIO
  6. from nose import SkipTest
  7. from celery.utils.functional import wraps
  8. class GeneratorContextManager(object):
  9. def __init__(self, gen):
  10. self.gen = gen
  11. def __enter__(self):
  12. try:
  13. return self.gen.next()
  14. except StopIteration:
  15. raise RuntimeError("generator didn't yield")
  16. def __exit__(self, type, value, traceback):
  17. if type is None:
  18. try:
  19. self.gen.next()
  20. except StopIteration:
  21. return
  22. else:
  23. raise RuntimeError("generator didn't stop")
  24. else:
  25. try:
  26. self.gen.throw(type, value, traceback)
  27. raise RuntimeError("generator didn't stop after throw()")
  28. except StopIteration:
  29. return True
  30. except AttributeError:
  31. raise value
  32. except:
  33. if sys.exc_info()[1] is not value:
  34. raise
  35. def fallback_contextmanager(fun):
  36. def helper(*args, **kwds):
  37. return GeneratorContextManager(fun(*args, **kwds))
  38. return helper
  39. def execute_context(context, fun):
  40. val = context.__enter__()
  41. exc_info = (None, None, None)
  42. retval = None
  43. try:
  44. retval = fun(val)
  45. except:
  46. exc_info = sys.exc_info()
  47. context.__exit__(*exc_info)
  48. return retval
  49. try:
  50. from contextlib import contextmanager
  51. except ImportError:
  52. contextmanager = fallback_contextmanager
  53. from celery.utils import noop
  54. @contextmanager
  55. def eager_tasks():
  56. from celery import conf
  57. prev = conf.ALWAYS_EAGER
  58. conf.ALWAYS_EAGER = True
  59. yield True
  60. conf.ALWAYS_EAGER = prev
  61. def with_environ(env_name, env_value):
  62. def _envpatched(fun):
  63. @wraps(fun)
  64. def _patch_environ(*args, **kwargs):
  65. prev_val = os.environ.get(env_name)
  66. os.environ[env_name] = env_value
  67. try:
  68. return fun(*args, **kwargs)
  69. finally:
  70. if prev_val is not None:
  71. os.environ[env_name] = prev_val
  72. return _patch_environ
  73. return _envpatched
  74. def sleepdeprived(fun):
  75. @wraps(fun)
  76. def _sleepdeprived(*args, **kwargs):
  77. import time
  78. old_sleep = time.sleep
  79. time.sleep = noop
  80. try:
  81. return fun(*args, **kwargs)
  82. finally:
  83. time.sleep = old_sleep
  84. return _sleepdeprived
  85. def skip_if_environ(env_var_name):
  86. def _wrap_test(fun):
  87. @wraps(fun)
  88. def _skips_if_environ(*args, **kwargs):
  89. if os.environ.get(env_var_name):
  90. raise SkipTest("SKIP %s: %s set\n" % (
  91. fun.__name__, env_var_name))
  92. return fun(*args, **kwargs)
  93. return _skips_if_environ
  94. return _wrap_test
  95. def skip_if_quick(fun):
  96. return skip_if_environ("QUICKTEST")(fun)
  97. def _skip_test(reason, sign):
  98. def _wrap_test(fun):
  99. @wraps(fun)
  100. def _skipped_test(*args, **kwargs):
  101. raise SkipTest("%s: %s" % (sign, reason))
  102. return _skipped_test
  103. return _wrap_test
  104. def todo(reason):
  105. """TODO test decorator."""
  106. return _skip_test(reason, "TODO")
  107. def skip(reason):
  108. """Skip test decorator."""
  109. return _skip_test(reason, "SKIP")
  110. def skip_if(predicate, reason):
  111. """Skip test if predicate is ``True``."""
  112. def _inner(fun):
  113. return predicate and skip(reason)(fun) or fun
  114. return _inner
  115. def skip_unless(predicate, reason):
  116. """Skip test if predicate is ``False``."""
  117. return skip_if(not predicate, reason)
  118. # Taken from
  119. # http://bitbucket.org/runeh/snippets/src/tip/missing_modules.py
  120. @contextmanager
  121. def mask_modules(*modnames):
  122. """Ban some modules from being importable inside the context
  123. For example:
  124. >>> with missing_modules("sys"):
  125. ... try:
  126. ... import sys
  127. ... except ImportError:
  128. ... print "sys not found"
  129. sys not found
  130. >>> import sys
  131. >>> sys.version
  132. (2, 5, 2, 'final', 0)
  133. """
  134. realimport = __builtin__.__import__
  135. def myimp(name, *args, **kwargs):
  136. if name in modnames:
  137. raise ImportError("No module named %s" % name)
  138. else:
  139. return realimport(name, *args, **kwargs)
  140. __builtin__.__import__ = myimp
  141. yield True
  142. __builtin__.__import__ = realimport
  143. @contextmanager
  144. def override_stdouts():
  145. """Override ``sys.stdout`` and ``sys.stderr`` with ``StringIO``."""
  146. prev_out, prev_err = sys.stdout, sys.stderr
  147. mystdout, mystderr = StringIO(), StringIO()
  148. sys.stdout = sys.__stdout__ = mystdout
  149. sys.stderr = sys.__stderr__ = mystderr
  150. yield mystdout, mystderr
  151. sys.stdout = sys.__stdout__ = prev_out
  152. sys.stderr = sys.__stderr__ = prev_err