utils.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. from __future__ import generators
  2. import os
  3. import sys
  4. import __builtin__
  5. from StringIO import StringIO
  6. from nose import SkipTest
  7. from celery.app import app_or_default
  8. from celery.utils.functional import wraps
  9. class GeneratorContextManager(object):
  10. def __init__(self, gen):
  11. self.gen = gen
  12. def __enter__(self):
  13. try:
  14. return self.gen.next()
  15. except StopIteration:
  16. raise RuntimeError("generator didn't yield")
  17. def __exit__(self, type, value, traceback):
  18. if type is None:
  19. try:
  20. self.gen.next()
  21. except StopIteration:
  22. return
  23. else:
  24. raise RuntimeError("generator didn't stop")
  25. else:
  26. try:
  27. self.gen.throw(type, value, traceback)
  28. raise RuntimeError("generator didn't stop after throw()")
  29. except StopIteration:
  30. return True
  31. except AttributeError:
  32. raise value
  33. except:
  34. if sys.exc_info()[1] is not value:
  35. raise
  36. def fallback_contextmanager(fun):
  37. def helper(*args, **kwds):
  38. return GeneratorContextManager(fun(*args, **kwds))
  39. return helper
  40. def execute_context(context, fun):
  41. val = context.__enter__()
  42. exc_info = (None, None, None)
  43. retval = None
  44. try:
  45. retval = fun(val)
  46. except:
  47. exc_info = sys.exc_info()
  48. context.__exit__(*exc_info)
  49. return retval
  50. try:
  51. from contextlib import contextmanager
  52. except ImportError:
  53. contextmanager = fallback_contextmanager
  54. from celery.utils import noop
  55. @contextmanager
  56. def eager_tasks():
  57. app = app_or_default()
  58. prev = app.conf.CELERY_ALWAYS_EAGER
  59. app.conf.CELERY_ALWAYS_EAGER = True
  60. yield True
  61. app.conf.CELERY_ALWAYS_EAGER = prev
  62. def with_eager_tasks(fun):
  63. @wraps(fun)
  64. def _inner(*args, **kwargs):
  65. app = app_or_default()
  66. prev = app.conf.CELERY_ALWAYS_EAGER
  67. app.conf.CELERY_ALWAYS_EAGER = True
  68. try:
  69. return fun(*args, **kwargs)
  70. finally:
  71. app.conf.CELERY_ALWAYS_EAGER = prev
  72. def with_environ(env_name, env_value):
  73. def _envpatched(fun):
  74. @wraps(fun)
  75. def _patch_environ(*args, **kwargs):
  76. prev_val = os.environ.get(env_name)
  77. os.environ[env_name] = env_value
  78. try:
  79. return fun(*args, **kwargs)
  80. finally:
  81. if prev_val is not None:
  82. os.environ[env_name] = prev_val
  83. return _patch_environ
  84. return _envpatched
  85. def sleepdeprived(fun):
  86. @wraps(fun)
  87. def _sleepdeprived(*args, **kwargs):
  88. import time
  89. old_sleep = time.sleep
  90. time.sleep = noop
  91. try:
  92. return fun(*args, **kwargs)
  93. finally:
  94. time.sleep = old_sleep
  95. return _sleepdeprived
  96. def skip_if_environ(env_var_name):
  97. def _wrap_test(fun):
  98. @wraps(fun)
  99. def _skips_if_environ(*args, **kwargs):
  100. if os.environ.get(env_var_name):
  101. raise SkipTest("SKIP %s: %s set\n" % (
  102. fun.__name__, env_var_name))
  103. return fun(*args, **kwargs)
  104. return _skips_if_environ
  105. return _wrap_test
  106. def skip_if_quick(fun):
  107. return skip_if_environ("QUICKTEST")(fun)
  108. def _skip_test(reason, sign):
  109. def _wrap_test(fun):
  110. @wraps(fun)
  111. def _skipped_test(*args, **kwargs):
  112. raise SkipTest("%s: %s" % (sign, reason))
  113. return _skipped_test
  114. return _wrap_test
  115. def todo(reason):
  116. """TODO test decorator."""
  117. return _skip_test(reason, "TODO")
  118. def skip(reason):
  119. """Skip test decorator."""
  120. return _skip_test(reason, "SKIP")
  121. def skip_if(predicate, reason):
  122. """Skip test if predicate is :const:`True`."""
  123. def _inner(fun):
  124. return predicate and skip(reason)(fun) or fun
  125. return _inner
  126. def skip_unless(predicate, reason):
  127. """Skip test if predicate is :const:`False`."""
  128. return skip_if(not predicate, reason)
  129. # Taken from
  130. # http://bitbucket.org/runeh/snippets/src/tip/missing_modules.py
  131. @contextmanager
  132. def mask_modules(*modnames):
  133. """Ban some modules from being importable inside the context
  134. For example:
  135. >>> with missing_modules("sys"):
  136. ... try:
  137. ... import sys
  138. ... except ImportError:
  139. ... print "sys not found"
  140. sys not found
  141. >>> import sys
  142. >>> sys.version
  143. (2, 5, 2, 'final', 0)
  144. """
  145. realimport = __builtin__.__import__
  146. def myimp(name, *args, **kwargs):
  147. if name in modnames:
  148. raise ImportError("No module named %s" % name)
  149. else:
  150. return realimport(name, *args, **kwargs)
  151. __builtin__.__import__ = myimp
  152. yield True
  153. __builtin__.__import__ = realimport
  154. @contextmanager
  155. def override_stdouts():
  156. """Override `sys.stdout` and `sys.stderr` with `StringIO`."""
  157. prev_out, prev_err = sys.stdout, sys.stderr
  158. mystdout, mystderr = StringIO(), StringIO()
  159. sys.stdout = sys.__stdout__ = mystdout
  160. sys.stderr = sys.__stderr__ = mystderr
  161. yield mystdout, mystderr
  162. sys.stdout = sys.__stdout__ = prev_out
  163. sys.stderr = sys.__stderr__ = prev_err