utils.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. from __future__ import with_statement, generators
  2. import os
  3. import sys
  4. import __builtin__
  5. from StringIO import StringIO
  6. from functools import wraps
  7. class GeneratorContextManager(object):
  8. def __init__(self, gen):
  9. self.gen = gen
  10. def __enter__(self):
  11. try:
  12. return self.gen.next()
  13. except StopIteration:
  14. raise RuntimeError("generator didn't yield")
  15. def __exit__(self, type, value, traceback):
  16. if type is None:
  17. try:
  18. self.gen.next()
  19. except StopIteration:
  20. return
  21. else:
  22. raise RuntimeError("generator didn't stop")
  23. else:
  24. try:
  25. self.gen.throw(type, value, traceback)
  26. raise RuntimeError("generator didn't stop after throw()")
  27. except StopIteration:
  28. return True
  29. except:
  30. if sys.exc_info()[1] is not value:
  31. raise
  32. def fallback_contextmanager(fun):
  33. def helper(*args, **kwds):
  34. return GeneratorContextManager(fun(*args, **kwds))
  35. return helper
  36. def execute_context(context, fun):
  37. val = context.__enter__()
  38. exc_info = (None, None, None)
  39. retval = None
  40. try:
  41. retval = fun(val)
  42. except:
  43. exc_info = sys.exc_info()
  44. context.__exit__(*exc_info)
  45. return retval
  46. try:
  47. from contextlib import contextmanager
  48. except ImportError:
  49. contextmanager = fallback_contextmanager
  50. from celery.utils import noop
  51. @contextmanager
  52. def eager_tasks():
  53. from celery import conf
  54. prev = conf.ALWAYS_EAGER
  55. conf.ALWAYS_EAGER = True
  56. yield True
  57. conf.ALWAYS_EAGER = prev
  58. def with_environ(env_name, env_value):
  59. def _envpatched(fun):
  60. @wraps(fun)
  61. def _patch_environ(*args, **kwargs):
  62. prev_val = os.environ.get(env_name)
  63. os.environ[env_name] = env_value
  64. try:
  65. return fun(*args, **kwargs)
  66. finally:
  67. if prev_val is not None:
  68. os.environ[env_name] = prev_val
  69. return _patch_environ
  70. return _envpatched
  71. def sleepdeprived(fun):
  72. @wraps(fun)
  73. def _sleepdeprived(*args, **kwargs):
  74. import time
  75. old_sleep = time.sleep
  76. time.sleep = noop
  77. try:
  78. return fun(*args, **kwargs)
  79. finally:
  80. time.sleep = old_sleep
  81. return _sleepdeprived
  82. def skip_if_environ(env_var_name):
  83. def _wrap_test(fun):
  84. @wraps(fun)
  85. def _skips_if_environ(*args, **kwargs):
  86. if os.environ.get(env_var_name):
  87. sys.stderr.write("SKIP %s: %s set\n" % (
  88. fun.__name__, env_var_name))
  89. return
  90. return fun(*args, **kwargs)
  91. return _skips_if_environ
  92. return _wrap_test
  93. def skip_if_quick(fun):
  94. return skip_if_environ("QUICKTEST")(fun)
  95. def _skip_test(reason, sign):
  96. def _wrap_test(fun):
  97. @wraps(fun)
  98. def _skipped_test(*args, **kwargs):
  99. sys.stderr.write("%s: %s " % (sign, reason))
  100. return _skipped_test
  101. return _wrap_test
  102. def todo(reason):
  103. """TODO test decorator."""
  104. return _skip_test(reason, "TODO")
  105. def skip(reason):
  106. """Skip test decorator."""
  107. return _skip_test(reason, "SKIP")
  108. def skip_if(predicate, reason):
  109. """Skip test if predicate is ``True``."""
  110. def _inner(fun):
  111. return predicate and skip(reason)(fun) or fun
  112. return _inner
  113. def skip_unless(predicate, reason):
  114. """Skip test if predicate is ``False``."""
  115. return skip_if(not predicate, reason)
  116. # Taken from
  117. # http://bitbucket.org/runeh/snippets/src/tip/missing_modules.py
  118. @contextmanager
  119. def mask_modules(*modnames):
  120. """Ban some modules from being importable inside the context
  121. For example:
  122. >>> with missing_modules("sys"):
  123. ... try:
  124. ... import sys
  125. ... except ImportError:
  126. ... print "sys not found"
  127. sys not found
  128. >>> import sys
  129. >>> sys.version
  130. (2, 5, 2, 'final', 0)
  131. """
  132. realimport = __builtin__.__import__
  133. def myimp(name, *args, **kwargs):
  134. if name in modnames:
  135. raise ImportError("No module named %s" % name)
  136. else:
  137. return realimport(name, *args, **kwargs)
  138. __builtin__.__import__ = myimp
  139. yield True
  140. __builtin__.__import__ = realimport
  141. @contextmanager
  142. def override_stdouts():
  143. """Override ``sys.stdout`` and ``sys.stderr`` with ``StringIO``."""
  144. prev_out, prev_err = sys.stdout, sys.stderr
  145. mystdout, mystderr = StringIO(), StringIO()
  146. sys.stdout = sys.__stdout__ = mystdout
  147. sys.stderr = sys.__stderr__ = mystderr
  148. yield mystdout, mystderr
  149. sys.stdout = sys.__stdout__ = prev_out
  150. sys.stderr = sys.__stderr__ = prev_err