case.py 26 KB


  1. from __future__ import absolute_import
  2. try:
  3. import unittest # noqa
  4. unittest.skip
  5. from unittest.util import safe_repr, unorderable_list_difference
  6. except AttributeError:
  7. import unittest2 as unittest # noqa
  8. from unittest2.util import safe_repr, unorderable_list_difference # noqa
  9. import importlib
  10. import inspect
  11. import logging
  12. import numbers
  13. import os
  14. import platform
  15. import re
  16. import sys
  17. import threading
  18. import time
  19. import types
  20. import warnings
  21. from contextlib import contextmanager
  22. from copy import deepcopy
  23. from datetime import datetime, timedelta
  24. from functools import partial, wraps
  25. from types import ModuleType
  26. try:
  27. from unittest import mock
  28. except ImportError:
  29. import mock # noqa
  30. from nose import SkipTest
  31. from kombu import Queue
  32. from kombu.log import NullHandler
  33. from kombu.utils import nested, symbol_by_name
  34. from celery import Celery
  35. from celery.app import current_app
  36. from celery.backends.cache import CacheBackend, DummyClient
  37. from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
  38. from celery.five import (
  39. WhateverIO, builtins, items, reraise,
  40. string_t, values, open_fqdn,
  41. )
  42. from celery.utils.functional import noop
  43. from celery.utils.imports import qualname
  44. __all__ = [
  45. 'Case', 'AppCase', 'Mock', 'MagicMock', 'ANY', 'TaskMessage',
  46. 'patch', 'call', 'sentinel', 'skip_unless_module',
  47. 'wrap_logger', 'with_environ', 'sleepdeprived',
  48. 'skip_if_environ', 'todo', 'skip', 'skip_if',
  49. 'skip_unless', 'mask_modules', 'override_stdouts', 'mock_module',
  50. 'replace_module_value', 'sys_platform', 'reset_modules',
  51. 'patch_modules', 'mock_context', 'mock_open', 'patch_many',
  52. 'assert_signal_called', 'skip_if_pypy',
  53. 'skip_if_jython', 'task_message_from_sig', 'restore_logging',
  54. ]
  55. patch = mock.patch
  56. call = mock.call
  57. sentinel = mock.sentinel
  58. MagicMock = mock.MagicMock
  59. ANY = mock.ANY
  60. PY3 = sys.version_info[0] == 3
  61. CASE_REDEFINES_SETUP = """\
  62. {name} (subclass of AppCase) redefines private "setUp", should be: "setup"\
  63. """
  64. CASE_REDEFINES_TEARDOWN = """\
  65. {name} (subclass of AppCase) redefines private "tearDown", \
  66. should be: "teardown"\
  67. """
  68. CASE_LOG_REDIRECT_EFFECT = """\
  69. Test {0} did not disable LoggingProxy for {1}\
  70. """
  71. CASE_LOG_LEVEL_EFFECT = """\
  72. Test {0} Modified the level of the root logger\
  73. """
  74. CASE_LOG_HANDLER_EFFECT = """\
  75. Test {0} Modified handlers for the root logger\
  76. """
  77. CELERY_TEST_CONFIG = {
  78. #: Don't want log output when running suite.
  79. 'worker_hijack_root_logger': False,
  80. 'worker_log_color': False,
  81. 'task_send_error_emails': False,
  82. 'task_default_queue': 'testcelery',
  83. 'task_default_exchange': 'testcelery',
  84. 'task_default_routing_key': 'testcelery',
  85. 'task_queues': (
  86. Queue('testcelery', routing_key='testcelery'),
  87. ),
  88. 'accept_content': ('json', 'pickle'),
  89. 'enable_utc': True,
  90. 'timezone': 'UTC',
  91. # Mongo results tests (only executed if installed and running)
  92. 'mongodb_backend_settings': {
  93. 'host': os.environ.get('MONGO_HOST') or 'localhost',
  94. 'port': os.environ.get('MONGO_PORT') or 27017,
  95. 'database': os.environ.get('MONGO_DB') or 'celery_unittests',
  96. 'taskmeta_collection': (os.environ.get('MONGO_TASKMETA_COLLECTION') or
  97. 'taskmeta_collection'),
  98. 'user': os.environ.get('MONGO_USER'),
  99. 'password': os.environ.get('MONGO_PASSWORD'),
  100. }
  101. }
  102. class Trap(object):
  103. def __getattr__(self, name):
  104. raise RuntimeError('Test depends on current_app')
  105. class UnitLogging(symbol_by_name(Celery.log_cls)):
  106. def __init__(self, *args, **kwargs):
  107. super(UnitLogging, self).__init__(*args, **kwargs)
  108. self.already_setup = True
  109. def UnitApp(name=None, set_as_current=False, log=UnitLogging,
  110. broker='memory://', backend='cache+memory://', **kwargs):
  111. app = Celery(name or 'celery.tests',
  112. set_as_current=set_as_current,
  113. log=log, broker=broker, backend=backend,
  114. **kwargs)
  115. app.add_defaults(deepcopy(CELERY_TEST_CONFIG))
  116. return app
  117. class Mock(mock.Mock):
  118. def __init__(self, *args, **kwargs):
  119. attrs = kwargs.pop('attrs', None) or {}
  120. super(Mock, self).__init__(*args, **kwargs)
  121. for attr_name, attr_value in items(attrs):
  122. setattr(self, attr_name, attr_value)
  123. class _ContextMock(Mock):
  124. """Dummy class implementing __enter__ and __exit__
  125. as the with statement requires these to be implemented
  126. in the class, not just the instance."""
  127. def __enter__(self):
  128. return self
  129. def __exit__(self, *exc_info):
  130. pass
  131. def ContextMock(*args, **kwargs):
  132. obj = _ContextMock(*args, **kwargs)
  133. obj.attach_mock(_ContextMock(), '__enter__')
  134. obj.attach_mock(_ContextMock(), '__exit__')
  135. obj.__enter__.return_value = obj
  136. # if __exit__ return a value the exception is ignored,
  137. # so it must return None here.
  138. obj.__exit__.return_value = None
  139. return obj
  140. def _bind(f, o):
  141. @wraps(f)
  142. def bound_meth(*fargs, **fkwargs):
  143. return f(o, *fargs, **fkwargs)
  144. return bound_meth
  145. if PY3: # pragma: no cover
  146. def _get_class_fun(meth):
  147. return meth
  148. else:
  149. def _get_class_fun(meth):
  150. return meth.__func__
  151. class MockCallbacks(object):
  152. def __new__(cls, *args, **kwargs):
  153. r = Mock(name=cls.__name__)
  154. _get_class_fun(cls.__init__)(r, *args, **kwargs)
  155. for key, value in items(vars(cls)):
  156. if key not in ('__dict__', '__weakref__', '__new__', '__init__'):
  157. if inspect.ismethod(value) or inspect.isfunction(value):
  158. r.__getattr__(key).side_effect = _bind(value, r)
  159. else:
  160. r.__setattr__(key, value)
  161. return r
  162. def skip_unless_module(module):
  163. def _inner(fun):
  164. @wraps(fun)
  165. def __inner(*args, **kwargs):
  166. try:
  167. importlib.import_module(module)
  168. except ImportError:
  169. raise SkipTest('Does not have %s' % (module,))
  170. return fun(*args, **kwargs)
  171. return __inner
  172. return _inner
  173. # -- adds assertWarns from recent unittest2, not in Python 2.7.
  174. class _AssertRaisesBaseContext(object):
  175. def __init__(self, expected, test_case, callable_obj=None,
  176. expected_regex=None):
  177. self.expected = expected
  178. self.failureException = test_case.failureException
  179. self.obj_name = None
  180. if isinstance(expected_regex, string_t):
  181. expected_regex = re.compile(expected_regex)
  182. self.expected_regex = expected_regex
  183. def _is_magic_module(m):
  184. # some libraries create custom module types that are lazily
  185. # lodaded, e.g. Django installs some modules in sys.modules that
  186. # will load _tkinter and other shit when touched.
  187. # pyflakes refuses to accept 'noqa' for this isinstance.
  188. cls, modtype = type(m), types.ModuleType
  189. try:
  190. variables = vars(cls)
  191. except TypeError:
  192. return True
  193. else:
  194. return (cls is not modtype and (
  195. '__getattr__' in variables or
  196. '__getattribute__' in variables))
  197. class _AssertWarnsContext(_AssertRaisesBaseContext):
  198. """A context manager used to implement TestCase.assertWarns* methods."""
  199. def __enter__(self):
  200. # The __warningregistry__'s need to be in a pristine state for tests
  201. # to work properly.
  202. warnings.resetwarnings()
  203. for v in list(values(sys.modules)):
  204. # do not evaluate Django moved modules and other lazily
  205. # initialized modules.
  206. if v and not _is_magic_module(v):
  207. # use raw __getattribute__ to protect even better from
  208. # lazily loaded modules
  209. try:
  210. object.__getattribute__(v, '__warningregistry__')
  211. except AttributeError:
  212. pass
  213. else:
  214. object.__setattr__(v, '__warningregistry__', {})
  215. self.warnings_manager = warnings.catch_warnings(record=True)
  216. self.warnings = self.warnings_manager.__enter__()
  217. warnings.simplefilter('always', self.expected)
  218. return self
  219. def __exit__(self, exc_type, exc_value, tb):
  220. self.warnings_manager.__exit__(exc_type, exc_value, tb)
  221. if exc_type is not None:
  222. # let unexpected exceptions pass through
  223. return
  224. try:
  225. exc_name = self.expected.__name__
  226. except AttributeError:
  227. exc_name = str(self.expected)
  228. first_matching = None
  229. for m in self.warnings:
  230. w = m.message
  231. if not isinstance(w, self.expected):
  232. continue
  233. if first_matching is None:
  234. first_matching = w
  235. if (self.expected_regex is not None and
  236. not self.expected_regex.search(str(w))):
  237. continue
  238. # store warning for later retrieval
  239. self.warning = w
  240. self.filename = m.filename
  241. self.lineno = m.lineno
  242. return
  243. # Now we simply try to choose a helpful failure message
  244. if first_matching is not None:
  245. raise self.failureException(
  246. '%r does not match %r' % (
  247. self.expected_regex.pattern, str(first_matching)))
  248. if self.obj_name:
  249. raise self.failureException(
  250. '%s not triggered by %s' % (exc_name, self.obj_name))
  251. else:
  252. raise self.failureException('%s not triggered' % exc_name)
  253. def alive_threads():
  254. return [thread for thread in threading.enumerate() if thread.is_alive()]
  255. class Case(unittest.TestCase):
  256. def assertWarns(self, expected_warning):
  257. return _AssertWarnsContext(expected_warning, self, None)
  258. def assertWarnsRegex(self, expected_warning, expected_regex):
  259. return _AssertWarnsContext(expected_warning, self,
  260. None, expected_regex)
  261. @contextmanager
  262. def assertDeprecated(self):
  263. with self.assertWarnsRegex(CDeprecationWarning,
  264. r'scheduled for removal'):
  265. yield
  266. @contextmanager
  267. def assertPendingDeprecation(self):
  268. with self.assertWarnsRegex(CPendingDeprecationWarning,
  269. r'scheduled for deprecation'):
  270. yield
  271. def assertDictContainsSubset(self, expected, actual, msg=None):
  272. missing, mismatched = [], []
  273. for key, value in items(expected):
  274. if key not in actual:
  275. missing.append(key)
  276. elif value != actual[key]:
  277. mismatched.append('%s, expected: %s, actual: %s' % (
  278. safe_repr(key), safe_repr(value),
  279. safe_repr(actual[key])))
  280. if not (missing or mismatched):
  281. return
  282. standard_msg = ''
  283. if missing:
  284. standard_msg = 'Missing: %s' % ','.join(map(safe_repr, missing))
  285. if mismatched:
  286. if standard_msg:
  287. standard_msg += '; '
  288. standard_msg += 'Mismatched values: %s' % (
  289. ','.join(mismatched))
  290. self.fail(self._formatMessage(msg, standard_msg))
  291. def assertItemsEqual(self, expected_seq, actual_seq, msg=None):
  292. missing = unexpected = None
  293. try:
  294. expected = sorted(expected_seq)
  295. actual = sorted(actual_seq)
  296. except TypeError:
  297. # Unsortable items (example: set(), complex(), ...)
  298. expected = list(expected_seq)
  299. actual = list(actual_seq)
  300. missing, unexpected = unorderable_list_difference(
  301. expected, actual)
  302. else:
  303. return self.assertSequenceEqual(expected, actual, msg=msg)
  304. errors = []
  305. if missing:
  306. errors.append(
  307. 'Expected, but missing:\n %s' % (safe_repr(missing),)
  308. )
  309. if unexpected:
  310. errors.append(
  311. 'Unexpected, but present:\n %s' % (safe_repr(unexpected),)
  312. )
  313. if errors:
  314. standardMsg = '\n'.join(errors)
  315. self.fail(self._formatMessage(msg, standardMsg))
  316. def depends_on_current_app(fun):
  317. if inspect.isclass(fun):
  318. fun.contained = False
  319. else:
  320. @wraps(fun)
  321. def __inner(self, *args, **kwargs):
  322. self.app.set_current()
  323. return fun(self, *args, **kwargs)
  324. return __inner
  325. class AppCase(Case):
  326. contained = True
  327. _threads_at_startup = [None]
  328. def __init__(self, *args, **kwargs):
  329. super(AppCase, self).__init__(*args, **kwargs)
  330. if self.__class__.__dict__.get('setUp'):
  331. raise RuntimeError(
  332. CASE_REDEFINES_SETUP.format(name=qualname(self)),
  333. )
  334. if self.__class__.__dict__.get('tearDown'):
  335. raise RuntimeError(
  336. CASE_REDEFINES_TEARDOWN.format(name=qualname(self)),
  337. )
  338. def Celery(self, *args, **kwargs):
  339. return UnitApp(*args, **kwargs)
  340. def threads_at_startup(self):
  341. if self._threads_at_startup[0] is None:
  342. self._threads_at_startup[0] = alive_threads()
  343. return self._threads_at_startup[0]
  344. def setUp(self):
  345. self._threads_at_setup = self.threads_at_startup()
  346. from celery import _state
  347. from celery import result
  348. result.task_join_will_block = \
  349. _state.task_join_will_block = lambda: False
  350. self._current_app = current_app()
  351. self._default_app = _state.default_app
  352. trap = Trap()
  353. self._prev_tls = _state._tls
  354. _state.set_default_app(trap)
  355. class NonTLS(object):
  356. current_app = trap
  357. _state._tls = NonTLS()
  358. self.app = self.Celery(set_as_current=False)
  359. if not self.contained:
  360. self.app.set_current()
  361. root = logging.getLogger()
  362. self.__rootlevel = root.level
  363. self.__roothandlers = root.handlers
  364. _state._set_task_join_will_block(False)
  365. try:
  366. self.setup()
  367. except:
  368. self._teardown_app()
  369. raise
  370. def _teardown_app(self):
  371. from celery.utils.log import LoggingProxy
  372. assert sys.stdout
  373. assert sys.stderr
  374. assert sys.__stdout__
  375. assert sys.__stderr__
  376. this = self._get_test_name()
  377. if isinstance(sys.stdout, (LoggingProxy, Mock)) or \
  378. isinstance(sys.__stdout__, (LoggingProxy, Mock)):
  379. raise RuntimeError(CASE_LOG_REDIRECT_EFFECT.format(this, 'stdout'))
  380. if isinstance(sys.stderr, (LoggingProxy, Mock)) or \
  381. isinstance(sys.__stderr__, (LoggingProxy, Mock)):
  382. raise RuntimeError(CASE_LOG_REDIRECT_EFFECT.format(this, 'stderr'))
  383. backend = self.app.__dict__.get('backend')
  384. if backend is not None:
  385. if isinstance(backend, CacheBackend):
  386. if isinstance(backend.client, DummyClient):
  387. backend.client.cache.clear()
  388. backend._cache.clear()
  389. from celery import _state
  390. _state._set_task_join_will_block(False)
  391. _state.set_default_app(self._default_app)
  392. _state._tls = self._prev_tls
  393. _state._tls.current_app = self._current_app
  394. if self.app is not self._current_app:
  395. self.app.close()
  396. self.app = None
  397. self.assertEqual(self._threads_at_setup, alive_threads())
  398. # Make sure no test left the shutdown flags enabled.
  399. from celery.worker import state as worker_state
  400. # check for EX_OK
  401. self.assertIsNot(worker_state.should_stop, False)
  402. self.assertIsNot(worker_state.should_terminate, False)
  403. # check for other true values
  404. self.assertFalse(worker_state.should_stop)
  405. self.assertFalse(worker_state.should_terminate)
  406. def _get_test_name(self):
  407. return '.'.join([self.__class__.__name__, self._testMethodName])
  408. def tearDown(self):
  409. try:
  410. self.teardown()
  411. finally:
  412. self._teardown_app()
  413. self.assert_no_logging_side_effect()
  414. def assert_no_logging_side_effect(self):
  415. this = self._get_test_name()
  416. root = logging.getLogger()
  417. if root.level != self.__rootlevel:
  418. raise RuntimeError(CASE_LOG_LEVEL_EFFECT.format(this))
  419. if root.handlers != self.__roothandlers:
  420. raise RuntimeError(CASE_LOG_HANDLER_EFFECT.format(this))
  421. def setup(self):
  422. pass
  423. def teardown(self):
  424. pass
  425. def get_handlers(logger):
  426. return [h for h in logger.handlers if not isinstance(h, NullHandler)]
  427. @contextmanager
  428. def wrap_logger(logger, loglevel=logging.ERROR):
  429. old_handlers = get_handlers(logger)
  430. sio = WhateverIO()
  431. siohandler = logging.StreamHandler(sio)
  432. logger.handlers = [siohandler]
  433. try:
  434. yield sio
  435. finally:
  436. logger.handlers = old_handlers
  437. def with_environ(env_name, env_value):
  438. def _envpatched(fun):
  439. @wraps(fun)
  440. def _patch_environ(*args, **kwargs):
  441. prev_val = os.environ.get(env_name)
  442. os.environ[env_name] = env_value
  443. try:
  444. return fun(*args, **kwargs)
  445. finally:
  446. os.environ[env_name] = prev_val or ''
  447. return _patch_environ
  448. return _envpatched
  449. def sleepdeprived(module=time):
  450. def _sleepdeprived(fun):
  451. @wraps(fun)
  452. def __sleepdeprived(*args, **kwargs):
  453. old_sleep = module.sleep
  454. module.sleep = noop
  455. try:
  456. return fun(*args, **kwargs)
  457. finally:
  458. module.sleep = old_sleep
  459. return __sleepdeprived
  460. return _sleepdeprived
  461. def skip_if_environ(env_var_name):
  462. def _wrap_test(fun):
  463. @wraps(fun)
  464. def _skips_if_environ(*args, **kwargs):
  465. if os.environ.get(env_var_name):
  466. raise SkipTest('SKIP %s: %s set\n' % (
  467. fun.__name__, env_var_name))
  468. return fun(*args, **kwargs)
  469. return _skips_if_environ
  470. return _wrap_test
  471. def _skip_test(reason, sign):
  472. def _wrap_test(fun):
  473. @wraps(fun)
  474. def _skipped_test(*args, **kwargs):
  475. raise SkipTest('%s: %s' % (sign, reason))
  476. return _skipped_test
  477. return _wrap_test
  478. def todo(reason):
  479. """TODO test decorator."""
  480. return _skip_test(reason, 'TODO')
  481. def skip(reason):
  482. """Skip test decorator."""
  483. return _skip_test(reason, 'SKIP')
  484. def skip_if(predicate, reason):
  485. """Skip test if predicate is :const:`True`."""
  486. def _inner(fun):
  487. return predicate and skip(reason)(fun) or fun
  488. return _inner
  489. def skip_unless(predicate, reason):
  490. """Skip test if predicate is :const:`False`."""
  491. return skip_if(not predicate, reason)
  492. # Taken from
  493. # http://bitbucket.org/runeh/snippets/src/tip/missing_modules.py
  494. @contextmanager
  495. def mask_modules(*modnames):
  496. """Ban some modules from being importable inside the context
  497. For example:
  498. >>> with mask_modules('sys'):
  499. ... try:
  500. ... import sys
  501. ... except ImportError:
  502. ... print('sys not found')
  503. sys not found
  504. >>> import sys # noqa
  505. >>> sys.version
  506. (2, 5, 2, 'final', 0)
  507. """
  508. realimport = builtins.__import__
  509. def myimp(name, *args, **kwargs):
  510. if name in modnames:
  511. raise ImportError('No module named %s' % name)
  512. else:
  513. return realimport(name, *args, **kwargs)
  514. builtins.__import__ = myimp
  515. try:
  516. yield True
  517. finally:
  518. builtins.__import__ = realimport
  519. @contextmanager
  520. def override_stdouts():
  521. """Override `sys.stdout` and `sys.stderr` with `WhateverIO`."""
  522. prev_out, prev_err = sys.stdout, sys.stderr
  523. mystdout, mystderr = WhateverIO(), WhateverIO()
  524. sys.stdout = sys.__stdout__ = mystdout
  525. sys.stderr = sys.__stderr__ = mystderr
  526. try:
  527. yield mystdout, mystderr
  528. finally:
  529. sys.stdout = sys.__stdout__ = prev_out
  530. sys.stderr = sys.__stderr__ = prev_err
  531. def _old_patch(module, name, mocked):
  532. module = importlib.import_module(module)
  533. def _patch(fun):
  534. @wraps(fun)
  535. def __patched(*args, **kwargs):
  536. prev = getattr(module, name)
  537. setattr(module, name, mocked)
  538. try:
  539. return fun(*args, **kwargs)
  540. finally:
  541. setattr(module, name, prev)
  542. return __patched
  543. return _patch
  544. @contextmanager
  545. def replace_module_value(module, name, value=None):
  546. has_prev = hasattr(module, name)
  547. prev = getattr(module, name, None)
  548. if value:
  549. setattr(module, name, value)
  550. else:
  551. try:
  552. delattr(module, name)
  553. except AttributeError:
  554. pass
  555. try:
  556. yield
  557. finally:
  558. if prev is not None:
  559. setattr(module, name, prev)
  560. if not has_prev:
  561. try:
  562. delattr(module, name)
  563. except AttributeError:
  564. pass
  565. pypy_version = partial(
  566. replace_module_value, sys, 'pypy_version_info',
  567. )
  568. platform_pyimp = partial(
  569. replace_module_value, platform, 'python_implementation',
  570. )
  571. @contextmanager
  572. def sys_platform(value):
  573. prev, sys.platform = sys.platform, value
  574. try:
  575. yield
  576. finally:
  577. sys.platform = prev
  578. @contextmanager
  579. def reset_modules(*modules):
  580. prev = {k: sys.modules.pop(k) for k in modules if k in sys.modules}
  581. try:
  582. yield
  583. finally:
  584. sys.modules.update(prev)
  585. @contextmanager
  586. def patch_modules(*modules):
  587. prev = {}
  588. for mod in modules:
  589. prev[mod] = sys.modules.get(mod)
  590. sys.modules[mod] = ModuleType(mod)
  591. try:
  592. yield
  593. finally:
  594. for name, mod in items(prev):
  595. if mod is None:
  596. sys.modules.pop(name, None)
  597. else:
  598. sys.modules[name] = mod
  599. @contextmanager
  600. def mock_module(*names):
  601. prev = {}
  602. class MockModule(ModuleType):
  603. def __getattr__(self, attr):
  604. setattr(self, attr, Mock())
  605. return ModuleType.__getattribute__(self, attr)
  606. mods = []
  607. for name in names:
  608. try:
  609. prev[name] = sys.modules[name]
  610. except KeyError:
  611. pass
  612. mod = sys.modules[name] = MockModule(name)
  613. mods.append(mod)
  614. try:
  615. yield mods
  616. finally:
  617. for name in names:
  618. try:
  619. sys.modules[name] = prev[name]
  620. except KeyError:
  621. try:
  622. del(sys.modules[name])
  623. except KeyError:
  624. pass
  625. @contextmanager
  626. def mock_context(mock, typ=Mock):
  627. context = mock.return_value = Mock()
  628. context.__enter__ = typ()
  629. context.__exit__ = typ()
  630. def on_exit(*x):
  631. if x[0]:
  632. reraise(x[0], x[1], x[2])
  633. context.__exit__.side_effect = on_exit
  634. context.__enter__.return_value = context
  635. try:
  636. yield context
  637. finally:
  638. context.reset()
  639. @contextmanager
  640. def mock_open(typ=WhateverIO, side_effect=None):
  641. with patch(open_fqdn) as open_:
  642. with mock_context(open_) as context:
  643. if side_effect is not None:
  644. context.__enter__.side_effect = side_effect
  645. val = context.__enter__.return_value = typ()
  646. val.__exit__ = Mock()
  647. yield val
  648. def patch_many(*targets):
  649. return nested(*[patch(target) for target in targets])
  650. @contextmanager
  651. def assert_signal_called(signal, **expected):
  652. handler = Mock()
  653. call_handler = partial(handler)
  654. signal.connect(call_handler)
  655. try:
  656. yield handler
  657. finally:
  658. signal.disconnect(call_handler)
  659. handler.assert_called_with(signal=signal, **expected)
  660. def skip_if_pypy(fun):
  661. @wraps(fun)
  662. def _inner(*args, **kwargs):
  663. if getattr(sys, 'pypy_version_info', None):
  664. raise SkipTest('does not work on PyPy')
  665. return fun(*args, **kwargs)
  666. return _inner
  667. def skip_if_jython(fun):
  668. @wraps(fun)
  669. def _inner(*args, **kwargs):
  670. if sys.platform.startswith('java'):
  671. raise SkipTest('does not work on Jython')
  672. return fun(*args, **kwargs)
  673. return _inner
  674. def task_message_from_sig(app, sig, utc=True):
  675. sig.freeze()
  676. callbacks = sig.options.pop('link', None)
  677. errbacks = sig.options.pop('link_error', None)
  678. countdown = sig.options.pop('countdown', None)
  679. if countdown:
  680. eta = app.now() + timedelta(seconds=countdown)
  681. else:
  682. eta = sig.options.pop('eta', None)
  683. if eta and isinstance(eta, datetime):
  684. eta = eta.isoformat()
  685. expires = sig.options.pop('expires', None)
  686. if expires and isinstance(expires, numbers.Real):
  687. expires = app.now() + timedelta(seconds=expires)
  688. if expires and isinstance(expires, datetime):
  689. expires = expires.isoformat()
  690. return TaskMessage(
  691. sig.task, id=sig.id, args=sig.args,
  692. kwargs=sig.kwargs,
  693. callbacks=[dict(s) for s in callbacks] if callbacks else None,
  694. errbacks=[dict(s) for s in errbacks] if errbacks else None,
  695. eta=eta,
  696. expires=expires,
  697. )
  698. @contextmanager
  699. def restore_logging():
  700. outs = sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__
  701. root = logging.getLogger()
  702. level = root.level
  703. handlers = root.handlers
  704. try:
  705. yield
  706. finally:
  707. sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__ = outs
  708. root.level = level
  709. root.handlers[:] = handlers
  710. def TaskMessage(name, id=None, args=(), kwargs={}, callbacks=None,
  711. errbacks=None, chain=None, **options):
  712. from celery import uuid
  713. from kombu.serialization import dumps
  714. id = id or uuid()
  715. message = Mock(name='TaskMessage-{0}'.format(id))
  716. message.headers = {
  717. 'id': id,
  718. 'task': name,
  719. }
  720. embed = {'callbacks': callbacks, 'errbacks': errbacks, 'chain': chain}
  721. message.headers.update(options)
  722. message.content_type, message.content_encoding, message.body = dumps(
  723. (args, kwargs, embed), serializer='json',
  724. )
  725. message.payload = (args, kwargs, embed)
  726. return message