case.py 28 KB


  1. from __future__ import absolute_import
  2. try:
  3. import unittest # noqa
  4. unittest.skip
  5. from unittest.util import safe_repr, unorderable_list_difference
  6. except AttributeError:
  7. import unittest2 as unittest # noqa
  8. from unittest2.util import safe_repr, unorderable_list_difference # noqa
  9. import importlib
  10. import inspect
  11. import logging
  12. import numbers
  13. import os
  14. import platform
  15. import re
  16. import sys
  17. import threading
  18. import time
  19. import types
  20. import warnings
  21. from contextlib import contextmanager
  22. from copy import deepcopy
  23. from datetime import datetime, timedelta
  24. from functools import partial, wraps
  25. from types import ModuleType
  26. try:
  27. from unittest import mock
  28. except ImportError:
  29. import mock # noqa
  30. from nose import SkipTest
  31. from kombu import Queue
  32. from kombu.utils import symbol_by_name
  33. from celery import Celery
  34. from celery.app import current_app
  35. from celery.backends.cache import CacheBackend, DummyClient
  36. from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
  37. from celery.five import (
  38. WhateverIO, builtins, items, reraise,
  39. string_t, values, open_fqdn,
  40. )
  41. from celery.utils.functional import noop
  42. from celery.utils.imports import qualname
  43. __all__ = [
  44. 'Case', 'AppCase', 'Mock', 'MagicMock', 'ANY', 'TaskMessage',
  45. 'patch', 'call', 'sentinel', 'skip_unless_module',
  46. 'wrap_logger', 'with_environ', 'sleepdeprived',
  47. 'skip_if_environ', 'todo', 'skip', 'skip_if',
  48. 'skip_unless', 'mask_modules', 'override_stdouts', 'mock_module',
  49. 'replace_module_value', 'sys_platform', 'reset_modules',
  50. 'patch_modules', 'mock_context', 'mock_open',
  51. 'assert_signal_called', 'skip_if_pypy',
  52. 'skip_if_jython', 'task_message_from_sig', 'restore_logging',
  53. ]
  54. patch = mock.patch
  55. call = mock.call
  56. sentinel = mock.sentinel
  57. MagicMock = mock.MagicMock
  58. ANY = mock.ANY
  59. PY3 = sys.version_info[0] == 3
  60. CASE_REDEFINES_SETUP = """\
  61. {name} (subclass of AppCase) redefines private "setUp", should be: "setup"\
  62. """
  63. CASE_REDEFINES_TEARDOWN = """\
  64. {name} (subclass of AppCase) redefines private "tearDown", \
  65. should be: "teardown"\
  66. """
  67. CASE_LOG_REDIRECT_EFFECT = """\
  68. Test {0} did not disable LoggingProxy for {1}\
  69. """
  70. CASE_LOG_LEVEL_EFFECT = """\
  71. Test {0} Modified the level of the root logger\
  72. """
  73. CASE_LOG_HANDLER_EFFECT = """\
  74. Test {0} Modified handlers for the root logger\
  75. """
  76. CELERY_TEST_CONFIG = {
  77. #: Don't want log output when running suite.
  78. 'worker_hijack_root_logger': False,
  79. 'worker_log_color': False,
  80. 'task_send_error_emails': False,
  81. 'task_default_queue': 'testcelery',
  82. 'task_default_exchange': 'testcelery',
  83. 'task_default_routing_key': 'testcelery',
  84. 'task_queues': (
  85. Queue('testcelery', routing_key='testcelery'),
  86. ),
  87. 'accept_content': ('json', 'pickle'),
  88. 'enable_utc': True,
  89. 'timezone': 'UTC',
  90. # Mongo results tests (only executed if installed and running)
  91. 'mongodb_backend_settings': {
  92. 'host': os.environ.get('MONGO_HOST') or 'localhost',
  93. 'port': os.environ.get('MONGO_PORT') or 27017,
  94. 'database': os.environ.get('MONGO_DB') or 'celery_unittests',
  95. 'taskmeta_collection': (os.environ.get('MONGO_TASKMETA_COLLECTION') or
  96. 'taskmeta_collection'),
  97. 'user': os.environ.get('MONGO_USER'),
  98. 'password': os.environ.get('MONGO_PASSWORD'),
  99. }
  100. }
  101. class Trap(object):
  102. def __getattr__(self, name):
  103. raise RuntimeError('Test depends on current_app')
  104. class UnitLogging(symbol_by_name(Celery.log_cls)):
  105. def __init__(self, *args, **kwargs):
  106. super(UnitLogging, self).__init__(*args, **kwargs)
  107. self.already_setup = True
  108. def UnitApp(name=None, set_as_current=False, log=UnitLogging,
  109. broker='memory://', backend='cache+memory://', **kwargs):
  110. app = Celery(name or 'celery.tests',
  111. set_as_current=set_as_current,
  112. log=log, broker=broker, backend=backend,
  113. **kwargs)
  114. app.add_defaults(deepcopy(CELERY_TEST_CONFIG))
  115. return app
  116. class Mock(mock.Mock):
  117. def __init__(self, *args, **kwargs):
  118. attrs = kwargs.pop('attrs', None) or {}
  119. super(Mock, self).__init__(*args, **kwargs)
  120. for attr_name, attr_value in items(attrs):
  121. setattr(self, attr_name, attr_value)
  122. class _ContextMock(Mock):
  123. """Dummy class implementing __enter__ and __exit__
  124. as the with statement requires these to be implemented
  125. in the class, not just the instance."""
  126. def __enter__(self):
  127. return self
  128. def __exit__(self, *exc_info):
  129. pass
  130. def ContextMock(*args, **kwargs):
  131. obj = _ContextMock(*args, **kwargs)
  132. obj.attach_mock(_ContextMock(), '__enter__')
  133. obj.attach_mock(_ContextMock(), '__exit__')
  134. obj.__enter__.return_value = obj
  135. # if __exit__ return a value the exception is ignored,
  136. # so it must return None here.
  137. obj.__exit__.return_value = None
  138. return obj
  139. def _bind(f, o):
  140. @wraps(f)
  141. def bound_meth(*fargs, **fkwargs):
  142. return f(o, *fargs, **fkwargs)
  143. return bound_meth
  144. if PY3: # pragma: no cover
  145. def _get_class_fun(meth):
  146. return meth
  147. else:
  148. def _get_class_fun(meth):
  149. return meth.__func__
  150. class MockCallbacks(object):
  151. def __new__(cls, *args, **kwargs):
  152. r = Mock(name=cls.__name__)
  153. _get_class_fun(cls.__init__)(r, *args, **kwargs)
  154. for key, value in items(vars(cls)):
  155. if key not in ('__dict__', '__weakref__', '__new__', '__init__'):
  156. if inspect.ismethod(value) or inspect.isfunction(value):
  157. r.__getattr__(key).side_effect = _bind(value, r)
  158. else:
  159. r.__setattr__(key, value)
  160. return r
  161. def skip_unless_module(module):
  162. def _inner(fun):
  163. @wraps(fun)
  164. def __inner(*args, **kwargs):
  165. try:
  166. importlib.import_module(module)
  167. except ImportError:
  168. raise SkipTest('Does not have %s' % (module,))
  169. return fun(*args, **kwargs)
  170. return __inner
  171. return _inner
  172. # -- adds assertWarns from recent unittest2, not in Python 2.7.
  173. class _AssertRaisesBaseContext(object):
  174. def __init__(self, expected, test_case, callable_obj=None,
  175. expected_regex=None):
  176. self.expected = expected
  177. self.failureException = test_case.failureException
  178. self.obj_name = None
  179. if isinstance(expected_regex, string_t):
  180. expected_regex = re.compile(expected_regex)
  181. self.expected_regex = expected_regex
  182. def _is_magic_module(m):
  183. # some libraries create custom module types that are lazily
  184. # lodaded, e.g. Django installs some modules in sys.modules that
  185. # will load _tkinter and other shit when touched.
  186. # pyflakes refuses to accept 'noqa' for this isinstance.
  187. cls, modtype = type(m), types.ModuleType
  188. try:
  189. variables = vars(cls)
  190. except TypeError:
  191. return True
  192. else:
  193. return (cls is not modtype and (
  194. '__getattr__' in variables or
  195. '__getattribute__' in variables))
  196. class _AssertWarnsContext(_AssertRaisesBaseContext):
  197. """A context manager used to implement TestCase.assertWarns* methods."""
  198. def __enter__(self):
  199. # The __warningregistry__'s need to be in a pristine state for tests
  200. # to work properly.
  201. warnings.resetwarnings()
  202. for v in list(values(sys.modules)):
  203. # do not evaluate Django moved modules and other lazily
  204. # initialized modules.
  205. if v and not _is_magic_module(v):
  206. # use raw __getattribute__ to protect even better from
  207. # lazily loaded modules
  208. try:
  209. object.__getattribute__(v, '__warningregistry__')
  210. except AttributeError:
  211. pass
  212. else:
  213. object.__setattr__(v, '__warningregistry__', {})
  214. self.warnings_manager = warnings.catch_warnings(record=True)
  215. self.warnings = self.warnings_manager.__enter__()
  216. warnings.simplefilter('always', self.expected)
  217. return self
  218. def __exit__(self, exc_type, exc_value, tb):
  219. self.warnings_manager.__exit__(exc_type, exc_value, tb)
  220. if exc_type is not None:
  221. # let unexpected exceptions pass through
  222. return
  223. try:
  224. exc_name = self.expected.__name__
  225. except AttributeError:
  226. exc_name = str(self.expected)
  227. first_matching = None
  228. for m in self.warnings:
  229. w = m.message
  230. if not isinstance(w, self.expected):
  231. continue
  232. if first_matching is None:
  233. first_matching = w
  234. if (self.expected_regex is not None and
  235. not self.expected_regex.search(str(w))):
  236. continue
  237. # store warning for later retrieval
  238. self.warning = w
  239. self.filename = m.filename
  240. self.lineno = m.lineno
  241. return
  242. # Now we simply try to choose a helpful failure message
  243. if first_matching is not None:
  244. raise self.failureException(
  245. '%r does not match %r' % (
  246. self.expected_regex.pattern, str(first_matching)))
  247. if self.obj_name:
  248. raise self.failureException(
  249. '%s not triggered by %s' % (exc_name, self.obj_name))
  250. else:
  251. raise self.failureException('%s not triggered' % exc_name)
  252. def alive_threads():
  253. return [thread for thread in threading.enumerate() if thread.is_alive()]
  254. class Case(unittest.TestCase):
  255. def patch(self, *path, **options):
  256. manager = patch('.'.join(path), **options)
  257. patched = manager.start()
  258. self.addCleanup(manager.stop)
  259. return patched
  260. def mock_modules(self, *mods):
  261. modules = []
  262. for mod in mods:
  263. mod = mod.split('.')
  264. modules.extend(reversed([
  265. '.'.join(mod[:-i] if i else mod) for i in range(len(mod))
  266. ]))
  267. modules = sorted(set(modules))
  268. return self.wrap_context(mock_module(*modules))
  269. def on_nth_call_do(self, mock, side_effect, n=1):
  270. def on_call(*args, **kwargs):
  271. if mock.call_count >= n:
  272. mock.side_effect = side_effect
  273. return mock.return_value
  274. mock.side_effect = on_call
  275. return mock
  276. def on_nth_call_return(self, mock, retval, n=1):
  277. def on_call(*args, **kwargs):
  278. if mock.call_count >= n:
  279. mock.return_value = retval
  280. return mock.return_value
  281. mock.side_effect = on_call
  282. return mock
  283. def mask_modules(self, *modules):
  284. self.wrap_context(mask_modules(*modules))
  285. def wrap_context(self, context):
  286. ret = context.__enter__()
  287. self.addCleanup(partial(context.__exit__, None, None, None))
  288. return ret
  289. def mock_environ(self, env_name, env_value):
  290. return self.wrap_context(mock_environ(env_name, env_value))
  291. def assertWarns(self, expected_warning):
  292. return _AssertWarnsContext(expected_warning, self, None)
  293. def assertWarnsRegex(self, expected_warning, expected_regex):
  294. return _AssertWarnsContext(expected_warning, self,
  295. None, expected_regex)
  296. @contextmanager
  297. def assertDeprecated(self):
  298. with self.assertWarnsRegex(CDeprecationWarning,
  299. r'scheduled for removal'):
  300. yield
  301. @contextmanager
  302. def assertPendingDeprecation(self):
  303. with self.assertWarnsRegex(CPendingDeprecationWarning,
  304. r'scheduled for deprecation'):
  305. yield
  306. def assertDictContainsSubset(self, expected, actual, msg=None):
  307. missing, mismatched = [], []
  308. for key, value in items(expected):
  309. if key not in actual:
  310. missing.append(key)
  311. elif value != actual[key]:
  312. mismatched.append('%s, expected: %s, actual: %s' % (
  313. safe_repr(key), safe_repr(value),
  314. safe_repr(actual[key])))
  315. if not (missing or mismatched):
  316. return
  317. standard_msg = ''
  318. if missing:
  319. standard_msg = 'Missing: %s' % ','.join(map(safe_repr, missing))
  320. if mismatched:
  321. if standard_msg:
  322. standard_msg += '; '
  323. standard_msg += 'Mismatched values: %s' % (
  324. ','.join(mismatched))
  325. self.fail(self._formatMessage(msg, standard_msg))
  326. def assertItemsEqual(self, expected_seq, actual_seq, msg=None):
  327. missing = unexpected = None
  328. try:
  329. expected = sorted(expected_seq)
  330. actual = sorted(actual_seq)
  331. except TypeError:
  332. # Unsortable items (example: set(), complex(), ...)
  333. expected = list(expected_seq)
  334. actual = list(actual_seq)
  335. missing, unexpected = unorderable_list_difference(
  336. expected, actual)
  337. else:
  338. return self.assertSequenceEqual(expected, actual, msg=msg)
  339. errors = []
  340. if missing:
  341. errors.append(
  342. 'Expected, but missing:\n %s' % (safe_repr(missing),)
  343. )
  344. if unexpected:
  345. errors.append(
  346. 'Unexpected, but present:\n %s' % (safe_repr(unexpected),)
  347. )
  348. if errors:
  349. standardMsg = '\n'.join(errors)
  350. self.fail(self._formatMessage(msg, standardMsg))
  351. def depends_on_current_app(fun):
  352. if inspect.isclass(fun):
  353. fun.contained = False
  354. else:
  355. @wraps(fun)
  356. def __inner(self, *args, **kwargs):
  357. self.app.set_current()
  358. return fun(self, *args, **kwargs)
  359. return __inner
  360. class AppCase(Case):
  361. contained = True
  362. _threads_at_startup = [None]
  363. def __init__(self, *args, **kwargs):
  364. super(AppCase, self).__init__(*args, **kwargs)
  365. if self.__class__.__dict__.get('setUp'):
  366. raise RuntimeError(
  367. CASE_REDEFINES_SETUP.format(name=qualname(self)),
  368. )
  369. if self.__class__.__dict__.get('tearDown'):
  370. raise RuntimeError(
  371. CASE_REDEFINES_TEARDOWN.format(name=qualname(self)),
  372. )
  373. def Celery(self, *args, **kwargs):
  374. return UnitApp(*args, **kwargs)
  375. def threads_at_startup(self):
  376. if self._threads_at_startup[0] is None:
  377. self._threads_at_startup[0] = alive_threads()
  378. return self._threads_at_startup[0]
  379. def setUp(self):
  380. self._threads_at_setup = self.threads_at_startup()
  381. from celery import _state
  382. from celery import result
  383. self._prev_res_join_block = result.task_join_will_block
  384. self._prev_state_join_block = _state.task_join_will_block
  385. result.task_join_will_block = \
  386. _state.task_join_will_block = lambda: False
  387. self._current_app = current_app()
  388. self._default_app = _state.default_app
  389. trap = Trap()
  390. self._prev_tls = _state._tls
  391. _state.set_default_app(trap)
  392. class NonTLS(object):
  393. current_app = trap
  394. _state._tls = NonTLS()
  395. self.app = self.Celery(set_as_current=False)
  396. if not self.contained:
  397. self.app.set_current()
  398. root = logging.getLogger()
  399. self.__rootlevel = root.level
  400. self.__roothandlers = root.handlers
  401. _state._set_task_join_will_block(False)
  402. try:
  403. self.setup()
  404. except:
  405. self._teardown_app()
  406. raise
  407. def _teardown_app(self):
  408. from celery import _state
  409. from celery import result
  410. from celery.utils.log import LoggingProxy
  411. assert sys.stdout
  412. assert sys.stderr
  413. assert sys.__stdout__
  414. assert sys.__stderr__
  415. this = self._get_test_name()
  416. result.task_join_will_block = self._prev_res_join_block
  417. _state.task_join_will_block = self._prev_state_join_block
  418. if isinstance(sys.stdout, (LoggingProxy, Mock)) or \
  419. isinstance(sys.__stdout__, (LoggingProxy, Mock)):
  420. raise RuntimeError(CASE_LOG_REDIRECT_EFFECT.format(this, 'stdout'))
  421. if isinstance(sys.stderr, (LoggingProxy, Mock)) or \
  422. isinstance(sys.__stderr__, (LoggingProxy, Mock)):
  423. raise RuntimeError(CASE_LOG_REDIRECT_EFFECT.format(this, 'stderr'))
  424. backend = self.app.__dict__.get('backend')
  425. if backend is not None:
  426. if isinstance(backend, CacheBackend):
  427. if isinstance(backend.client, DummyClient):
  428. backend.client.cache.clear()
  429. backend._cache.clear()
  430. from celery import _state
  431. _state._set_task_join_will_block(False)
  432. _state.set_default_app(self._default_app)
  433. _state._tls = self._prev_tls
  434. _state._tls.current_app = self._current_app
  435. if self.app is not self._current_app:
  436. self.app.close()
  437. self.app = None
  438. self.assertEqual(self._threads_at_setup, alive_threads())
  439. # Make sure no test left the shutdown flags enabled.
  440. from celery.worker import state as worker_state
  441. # check for EX_OK
  442. self.assertIsNot(worker_state.should_stop, False)
  443. self.assertIsNot(worker_state.should_terminate, False)
  444. # check for other true values
  445. self.assertFalse(worker_state.should_stop)
  446. self.assertFalse(worker_state.should_terminate)
  447. def _get_test_name(self):
  448. return '.'.join([self.__class__.__name__, self._testMethodName])
  449. def tearDown(self):
  450. try:
  451. self.teardown()
  452. finally:
  453. self._teardown_app()
  454. self.assert_no_logging_side_effect()
  455. def assert_no_logging_side_effect(self):
  456. this = self._get_test_name()
  457. root = logging.getLogger()
  458. if root.level != self.__rootlevel:
  459. raise RuntimeError(CASE_LOG_LEVEL_EFFECT.format(this))
  460. if root.handlers != self.__roothandlers:
  461. raise RuntimeError(CASE_LOG_HANDLER_EFFECT.format(this))
  462. def setup(self):
  463. pass
  464. def teardown(self):
  465. pass
  466. def get_handlers(logger):
  467. return [
  468. h for h in logger.handlers
  469. if not isinstance(h, logging.NullHandler)
  470. ]
  471. @contextmanager
  472. def wrap_logger(logger, loglevel=logging.ERROR):
  473. old_handlers = get_handlers(logger)
  474. sio = WhateverIO()
  475. siohandler = logging.StreamHandler(sio)
  476. logger.handlers = [siohandler]
  477. try:
  478. yield sio
  479. finally:
  480. logger.handlers = old_handlers
  481. @contextmanager
  482. def mock_environ(env_name, env_value):
  483. sentinel = object()
  484. prev_val = os.environ.get(env_name, sentinel)
  485. os.environ[env_name] = env_value
  486. try:
  487. yield env_value
  488. finally:
  489. if prev_val is sentinel:
  490. os.environ.pop(env_name, None)
  491. else:
  492. os.environ[env_name] = prev_val
  493. def with_environ(env_name, env_value):
  494. def _envpatched(fun):
  495. @wraps(fun)
  496. def _patch_environ(*args, **kwargs):
  497. with mock_environ(env_name, env_value):
  498. return fun(*args, **kwargs)
  499. return _patch_environ
  500. return _envpatched
  501. def sleepdeprived(module=time):
  502. def _sleepdeprived(fun):
  503. @wraps(fun)
  504. def __sleepdeprived(*args, **kwargs):
  505. old_sleep = module.sleep
  506. module.sleep = noop
  507. try:
  508. return fun(*args, **kwargs)
  509. finally:
  510. module.sleep = old_sleep
  511. return __sleepdeprived
  512. return _sleepdeprived
  513. def skip_if_environ(env_var_name):
  514. def _wrap_test(fun):
  515. @wraps(fun)
  516. def _skips_if_environ(*args, **kwargs):
  517. if os.environ.get(env_var_name):
  518. raise SkipTest('SKIP %s: %s set\n' % (
  519. fun.__name__, env_var_name))
  520. return fun(*args, **kwargs)
  521. return _skips_if_environ
  522. return _wrap_test
  523. def _skip_test(reason, sign):
  524. def _wrap_test(fun):
  525. @wraps(fun)
  526. def _skipped_test(*args, **kwargs):
  527. raise SkipTest('%s: %s' % (sign, reason))
  528. return _skipped_test
  529. return _wrap_test
  530. def todo(reason):
  531. """TODO test decorator."""
  532. return _skip_test(reason, 'TODO')
  533. def skip(reason):
  534. """Skip test decorator."""
  535. return _skip_test(reason, 'SKIP')
  536. def skip_if(predicate, reason):
  537. """Skip test if predicate is :const:`True`."""
  538. def _inner(fun):
  539. return predicate and skip(reason)(fun) or fun
  540. return _inner
  541. def skip_unless(predicate, reason):
  542. """Skip test if predicate is :const:`False`."""
  543. return skip_if(not predicate, reason)
  544. # Taken from
  545. # http://bitbucket.org/runeh/snippets/src/tip/missing_modules.py
  546. @contextmanager
  547. def mask_modules(*modnames):
  548. """Ban some modules from being importable inside the context
  549. For example:
  550. >>> with mask_modules('sys'):
  551. ... try:
  552. ... import sys
  553. ... except ImportError:
  554. ... print('sys not found')
  555. sys not found
  556. >>> import sys # noqa
  557. >>> sys.version
  558. (2, 5, 2, 'final', 0)
  559. """
  560. realimport = builtins.__import__
  561. def myimp(name, *args, **kwargs):
  562. if name in modnames:
  563. raise ImportError('No module named %s' % name)
  564. else:
  565. return realimport(name, *args, **kwargs)
  566. builtins.__import__ = myimp
  567. try:
  568. yield True
  569. finally:
  570. builtins.__import__ = realimport
  571. @contextmanager
  572. def override_stdouts():
  573. """Override `sys.stdout` and `sys.stderr` with `WhateverIO`."""
  574. prev_out, prev_err = sys.stdout, sys.stderr
  575. prev_rout, prev_rerr = sys.__stdout__, sys.__stderr__
  576. mystdout, mystderr = WhateverIO(), WhateverIO()
  577. sys.stdout = sys.__stdout__ = mystdout
  578. sys.stderr = sys.__stderr__ = mystderr
  579. try:
  580. yield mystdout, mystderr
  581. finally:
  582. sys.stdout = prev_out
  583. sys.stderr = prev_err
  584. sys.__stdout__ = prev_rout
  585. sys.__stderr__ = prev_rerr
  586. def disable_stdouts(fun):
  587. @wraps(fun)
  588. def disable(*args, **kwargs):
  589. with override_stdouts():
  590. return fun(*args, **kwargs)
  591. return disable
  592. def _old_patch(module, name, mocked):
  593. module = importlib.import_module(module)
  594. def _patch(fun):
  595. @wraps(fun)
  596. def __patched(*args, **kwargs):
  597. prev = getattr(module, name)
  598. setattr(module, name, mocked)
  599. try:
  600. return fun(*args, **kwargs)
  601. finally:
  602. setattr(module, name, prev)
  603. return __patched
  604. return _patch
  605. @contextmanager
  606. def replace_module_value(module, name, value=None):
  607. has_prev = hasattr(module, name)
  608. prev = getattr(module, name, None)
  609. if value:
  610. setattr(module, name, value)
  611. else:
  612. try:
  613. delattr(module, name)
  614. except AttributeError:
  615. pass
  616. try:
  617. yield
  618. finally:
  619. if prev is not None:
  620. setattr(module, name, prev)
  621. if not has_prev:
  622. try:
  623. delattr(module, name)
  624. except AttributeError:
  625. pass
  626. pypy_version = partial(
  627. replace_module_value, sys, 'pypy_version_info',
  628. )
  629. platform_pyimp = partial(
  630. replace_module_value, platform, 'python_implementation',
  631. )
  632. @contextmanager
  633. def sys_platform(value):
  634. prev, sys.platform = sys.platform, value
  635. try:
  636. yield
  637. finally:
  638. sys.platform = prev
  639. @contextmanager
  640. def reset_modules(*modules):
  641. prev = {k: sys.modules.pop(k) for k in modules if k in sys.modules}
  642. try:
  643. yield
  644. finally:
  645. sys.modules.update(prev)
  646. @contextmanager
  647. def patch_modules(*modules):
  648. prev = {}
  649. for mod in modules:
  650. prev[mod] = sys.modules.get(mod)
  651. sys.modules[mod] = ModuleType(mod)
  652. try:
  653. yield
  654. finally:
  655. for name, mod in items(prev):
  656. if mod is None:
  657. sys.modules.pop(name, None)
  658. else:
  659. sys.modules[name] = mod
  660. @contextmanager
  661. def mock_module(*names):
  662. prev = {}
  663. class MockModule(ModuleType):
  664. def __getattr__(self, attr):
  665. setattr(self, attr, Mock())
  666. return ModuleType.__getattribute__(self, attr)
  667. mods = []
  668. for name in names:
  669. try:
  670. prev[name] = sys.modules[name]
  671. except KeyError:
  672. pass
  673. mod = sys.modules[name] = MockModule(name)
  674. mods.append(mod)
  675. try:
  676. yield mods
  677. finally:
  678. for name in names:
  679. try:
  680. sys.modules[name] = prev[name]
  681. except KeyError:
  682. try:
  683. del(sys.modules[name])
  684. except KeyError:
  685. pass
  686. @contextmanager
  687. def mock_context(mock, typ=Mock):
  688. context = mock.return_value = Mock()
  689. context.__enter__ = typ()
  690. context.__exit__ = typ()
  691. def on_exit(*x):
  692. if x[0]:
  693. reraise(x[0], x[1], x[2])
  694. context.__exit__.side_effect = on_exit
  695. context.__enter__.return_value = context
  696. try:
  697. yield context
  698. finally:
  699. context.reset()
  700. @contextmanager
  701. def mock_open(typ=WhateverIO, side_effect=None):
  702. with patch(open_fqdn) as open_:
  703. with mock_context(open_) as context:
  704. if side_effect is not None:
  705. context.__enter__.side_effect = side_effect
  706. val = context.__enter__.return_value = typ()
  707. val.__exit__ = Mock()
  708. yield val
  709. @contextmanager
  710. def assert_signal_called(signal, **expected):
  711. handler = Mock()
  712. call_handler = partial(handler)
  713. signal.connect(call_handler)
  714. try:
  715. yield handler
  716. finally:
  717. signal.disconnect(call_handler)
  718. handler.assert_called_with(signal=signal, **expected)
  719. def skip_if_pypy(fun):
  720. @wraps(fun)
  721. def _inner(*args, **kwargs):
  722. if getattr(sys, 'pypy_version_info', None):
  723. raise SkipTest('does not work on PyPy')
  724. return fun(*args, **kwargs)
  725. return _inner
  726. def skip_if_jython(fun):
  727. @wraps(fun)
  728. def _inner(*args, **kwargs):
  729. if sys.platform.startswith('java'):
  730. raise SkipTest('does not work on Jython')
  731. return fun(*args, **kwargs)
  732. return _inner
  733. def TaskMessage(name, id=None, args=(), kwargs={}, callbacks=None,
  734. errbacks=None, chain=None, shadow=None, utc=None, **options):
  735. from celery import uuid
  736. from kombu.serialization import dumps
  737. id = id or uuid()
  738. message = Mock(name='TaskMessage-{0}'.format(id))
  739. message.headers = {
  740. 'id': id,
  741. 'task': name,
  742. 'shadow': shadow,
  743. }
  744. embed = {'callbacks': callbacks, 'errbacks': errbacks, 'chain': chain}
  745. message.headers.update(options)
  746. message.content_type, message.content_encoding, message.body = dumps(
  747. (args, kwargs, embed), serializer='json',
  748. )
  749. message.payload = (args, kwargs, embed)
  750. return message
  751. def TaskMessage1(name, id=None, args=(), kwargs={}, callbacks=None,
  752. errbacks=None, chain=None, **options):
  753. from celery import uuid
  754. from kombu.serialization import dumps
  755. id = id or uuid()
  756. message = Mock(name='TaskMessage-{0}'.format(id))
  757. message.headers = {}
  758. message.payload = {
  759. 'task': name,
  760. 'id': id,
  761. 'args': args,
  762. 'kwargs': kwargs,
  763. 'callbacks': callbacks,
  764. 'errbacks': errbacks,
  765. }
  766. message.payload.update(options)
  767. message.content_type, message.content_encoding, message.body = dumps(
  768. message.payload,
  769. )
  770. return message
  771. def task_message_from_sig(app, sig, utc=True, TaskMessage=TaskMessage):
  772. sig.freeze()
  773. callbacks = sig.options.pop('link', None)
  774. errbacks = sig.options.pop('link_error', None)
  775. countdown = sig.options.pop('countdown', None)
  776. if countdown:
  777. eta = app.now() + timedelta(seconds=countdown)
  778. else:
  779. eta = sig.options.pop('eta', None)
  780. if eta and isinstance(eta, datetime):
  781. eta = eta.isoformat()
  782. expires = sig.options.pop('expires', None)
  783. if expires and isinstance(expires, numbers.Real):
  784. expires = app.now() + timedelta(seconds=expires)
  785. if expires and isinstance(expires, datetime):
  786. expires = expires.isoformat()
  787. return TaskMessage(
  788. sig.task, id=sig.id, args=sig.args,
  789. kwargs=sig.kwargs,
  790. callbacks=[dict(s) for s in callbacks] if callbacks else None,
  791. errbacks=[dict(s) for s in errbacks] if errbacks else None,
  792. eta=eta,
  793. expires=expires,
  794. utc=utc,
  795. **sig.options
  796. )
  797. @contextmanager
  798. def restore_logging():
  799. outs = sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__
  800. root = logging.getLogger()
  801. level = root.level
  802. handlers = root.handlers
  803. try:
  804. yield
  805. finally:
  806. sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__ = outs
  807. root.level = level
  808. root.handlers[:] = handlers