case.py 21 KB


  1. from __future__ import absolute_import
  2. try:
  3. import unittest # noqa
  4. unittest.skip
  5. from unittest.util import safe_repr, unorderable_list_difference
  6. except AttributeError:
  7. import unittest2 as unittest # noqa
  8. from unittest2.util import safe_repr, unorderable_list_difference # noqa
  9. import importlib
  10. import inspect
  11. import logging
  12. import os
  13. import platform
  14. import re
  15. import sys
  16. import threading
  17. import time
  18. import warnings
  19. from contextlib import contextmanager
  20. from copy import deepcopy
  21. from datetime import datetime, timedelta
  22. from functools import partial, wraps
  23. from types import ModuleType
  24. try:
  25. from unittest import mock
  26. except ImportError:
  27. import mock # noqa
  28. from nose import SkipTest
  29. from kombu import Queue
  30. from kombu.log import NullHandler
  31. from kombu.utils import nested, symbol_by_name
  32. from celery import Celery
  33. from celery.app import current_app
  34. from celery.backends.cache import CacheBackend, DummyClient
  35. from celery.five import (
  36. WhateverIO, builtins, items, reraise,
  37. string_t, values, open_fqdn,
  38. )
  39. from celery.utils.functional import noop
  40. from celery.utils.imports import qualname
  41. __all__ = [
  42. 'Case', 'AppCase', 'Mock', 'MagicMock',
  43. 'patch', 'call', 'sentinel', 'skip_unless_module',
  44. 'wrap_logger', 'with_environ', 'sleepdeprived',
  45. 'skip_if_environ', 'todo', 'skip', 'skip_if',
  46. 'skip_unless', 'mask_modules', 'override_stdouts', 'mock_module',
  47. 'replace_module_value', 'sys_platform', 'reset_modules',
  48. 'patch_modules', 'mock_context', 'mock_open', 'patch_many',
  49. 'assert_signal_called', 'skip_if_pypy',
  50. 'skip_if_jython', 'body_from_sig', 'restore_logging',
  51. ]
  52. patch = mock.patch
  53. call = mock.call
  54. sentinel = mock.sentinel
  55. MagicMock = mock.MagicMock
  56. CASE_REDEFINES_SETUP = """\
  57. {name} (subclass of AppCase) redefines private "setUp", should be: "setup"\
  58. """
  59. CASE_REDEFINES_TEARDOWN = """\
  60. {name} (subclass of AppCase) redefines private "tearDown", \
  61. should be: "teardown"\
  62. """
  63. CASE_LOG_REDIRECT_EFFECT = """\
  64. Test {0} did not disable LoggingProxy for {1}\
  65. """
  66. CASE_LOG_LEVEL_EFFECT = """\
  67. Test {0} Modified the level of the root logger\
  68. """
  69. CASE_LOG_HANDLER_EFFECT = """\
  70. Test {0} Modified handlers for the root logger\
  71. """
  72. CELERY_TEST_CONFIG = {
  73. #: Don't want log output when running suite.
  74. 'CELERYD_HIJACK_ROOT_LOGGER': False,
  75. 'CELERY_SEND_TASK_ERROR_EMAILS': False,
  76. 'CELERY_DEFAULT_QUEUE': 'testcelery',
  77. 'CELERY_DEFAULT_EXCHANGE': 'testcelery',
  78. 'CELERY_DEFAULT_ROUTING_KEY': 'testcelery',
  79. 'CELERY_QUEUES': (
  80. Queue('testcelery', routing_key='testcelery'),
  81. ),
  82. 'CELERY_ENABLE_UTC': True,
  83. 'CELERY_TIMEZONE': 'UTC',
  84. 'CELERYD_LOG_COLOR': False,
  85. # Mongo results tests (only executed if installed and running)
  86. 'CELERY_MONGODB_BACKEND_SETTINGS': {
  87. 'host': os.environ.get('MONGO_HOST') or 'localhost',
  88. 'port': os.environ.get('MONGO_PORT') or 27017,
  89. 'database': os.environ.get('MONGO_DB') or 'celery_unittests',
  90. 'taskmeta_collection': (os.environ.get('MONGO_TASKMETA_COLLECTION')
  91. or 'taskmeta_collection'),
  92. 'user': os.environ.get('MONGO_USER'),
  93. 'password': os.environ.get('MONGO_PASSWORD'),
  94. }
  95. }
  96. class Trap(object):
  97. def __getattr__(self, name):
  98. raise RuntimeError('Test depends on current_app')
  99. class UnitLogging(symbol_by_name(Celery.log_cls)):
  100. def __init__(self, *args, **kwargs):
  101. super(UnitLogging, self).__init__(*args, **kwargs)
  102. self.already_setup = True
  103. def UnitApp(name=None, broker=None, backend=None,
  104. set_as_current=False, log=UnitLogging, **kwargs):
  105. app = Celery(name or 'celery.tests',
  106. broker=broker or 'memory://',
  107. backend=backend or 'cache+memory://',
  108. set_as_current=set_as_current,
  109. log=log,
  110. **kwargs)
  111. app.add_defaults(deepcopy(CELERY_TEST_CONFIG))
  112. return app
  113. class Mock(mock.Mock):
  114. def __init__(self, *args, **kwargs):
  115. attrs = kwargs.pop('attrs', None) or {}
  116. super(Mock, self).__init__(*args, **kwargs)
  117. for attr_name, attr_value in items(attrs):
  118. setattr(self, attr_name, attr_value)
  119. def skip_unless_module(module):
  120. def _inner(fun):
  121. @wraps(fun)
  122. def __inner(*args, **kwargs):
  123. try:
  124. importlib.import_module(module)
  125. except ImportError:
  126. raise SkipTest('Does not have %s' % (module, ))
  127. return fun(*args, **kwargs)
  128. return __inner
  129. return _inner
  130. # -- adds assertWarns from recent unittest2, not in Python 2.7.
  131. class _AssertRaisesBaseContext(object):
  132. def __init__(self, expected, test_case, callable_obj=None,
  133. expected_regex=None):
  134. self.expected = expected
  135. self.failureException = test_case.failureException
  136. self.obj_name = None
  137. if isinstance(expected_regex, string_t):
  138. expected_regex = re.compile(expected_regex)
  139. self.expected_regex = expected_regex
  140. class _AssertWarnsContext(_AssertRaisesBaseContext):
  141. """A context manager used to implement TestCase.assertWarns* methods."""
  142. def __enter__(self):
  143. # The __warningregistry__'s need to be in a pristine state for tests
  144. # to work properly.
  145. warnings.resetwarnings()
  146. for v in list(values(sys.modules)):
  147. if getattr(v, '__warningregistry__', None):
  148. v.__warningregistry__ = {}
  149. self.warnings_manager = warnings.catch_warnings(record=True)
  150. self.warnings = self.warnings_manager.__enter__()
  151. warnings.simplefilter('always', self.expected)
  152. return self
  153. def __exit__(self, exc_type, exc_value, tb):
  154. self.warnings_manager.__exit__(exc_type, exc_value, tb)
  155. if exc_type is not None:
  156. # let unexpected exceptions pass through
  157. return
  158. try:
  159. exc_name = self.expected.__name__
  160. except AttributeError:
  161. exc_name = str(self.expected)
  162. first_matching = None
  163. for m in self.warnings:
  164. w = m.message
  165. if not isinstance(w, self.expected):
  166. continue
  167. if first_matching is None:
  168. first_matching = w
  169. if (self.expected_regex is not None and
  170. not self.expected_regex.search(str(w))):
  171. continue
  172. # store warning for later retrieval
  173. self.warning = w
  174. self.filename = m.filename
  175. self.lineno = m.lineno
  176. return
  177. # Now we simply try to choose a helpful failure message
  178. if first_matching is not None:
  179. raise self.failureException(
  180. '%r does not match %r' % (
  181. self.expected_regex.pattern, str(first_matching)))
  182. if self.obj_name:
  183. raise self.failureException(
  184. '%s not triggered by %s' % (exc_name, self.obj_name))
  185. else:
  186. raise self.failureException('%s not triggered' % exc_name)
  187. class Case(unittest.TestCase):
  188. def assertWarns(self, expected_warning):
  189. return _AssertWarnsContext(expected_warning, self, None)
  190. def assertWarnsRegex(self, expected_warning, expected_regex):
  191. return _AssertWarnsContext(expected_warning, self,
  192. None, expected_regex)
  193. def assertDictContainsSubset(self, expected, actual, msg=None):
  194. missing, mismatched = [], []
  195. for key, value in items(expected):
  196. if key not in actual:
  197. missing.append(key)
  198. elif value != actual[key]:
  199. mismatched.append('%s, expected: %s, actual: %s' % (
  200. safe_repr(key), safe_repr(value),
  201. safe_repr(actual[key])))
  202. if not (missing or mismatched):
  203. return
  204. standard_msg = ''
  205. if missing:
  206. standard_msg = 'Missing: %s' % ','.join(map(safe_repr, missing))
  207. if mismatched:
  208. if standard_msg:
  209. standard_msg += '; '
  210. standard_msg += 'Mismatched values: %s' % (
  211. ','.join(mismatched))
  212. self.fail(self._formatMessage(msg, standard_msg))
  213. def assertItemsEqual(self, expected_seq, actual_seq, msg=None):
  214. missing = unexpected = None
  215. try:
  216. expected = sorted(expected_seq)
  217. actual = sorted(actual_seq)
  218. except TypeError:
  219. # Unsortable items (example: set(), complex(), ...)
  220. expected = list(expected_seq)
  221. actual = list(actual_seq)
  222. missing, unexpected = unorderable_list_difference(
  223. expected, actual)
  224. else:
  225. return self.assertSequenceEqual(expected, actual, msg=msg)
  226. errors = []
  227. if missing:
  228. errors.append(
  229. 'Expected, but missing:\n %s' % (safe_repr(missing), )
  230. )
  231. if unexpected:
  232. errors.append(
  233. 'Unexpected, but present:\n %s' % (safe_repr(unexpected), )
  234. )
  235. if errors:
  236. standardMsg = '\n'.join(errors)
  237. self.fail(self._formatMessage(msg, standardMsg))
  238. def depends_on_current_app(fun):
  239. if inspect.isclass(fun):
  240. fun.contained = False
  241. else:
  242. @wraps(fun)
  243. def __inner(self, *args, **kwargs):
  244. self.app.set_current()
  245. return fun(self, *args, **kwargs)
  246. return __inner
  247. class AppCase(Case):
  248. contained = True
  249. def __init__(self, *args, **kwargs):
  250. super(AppCase, self).__init__(*args, **kwargs)
  251. if self.__class__.__dict__.get('setUp'):
  252. raise RuntimeError(
  253. CASE_REDEFINES_SETUP.format(name=qualname(self)),
  254. )
  255. if self.__class__.__dict__.get('tearDown'):
  256. raise RuntimeError(
  257. CASE_REDEFINES_TEARDOWN.format(name=qualname(self)),
  258. )
  259. def Celery(self, *args, **kwargs):
  260. return UnitApp(*args, **kwargs)
  261. def setUp(self):
  262. self._threads_at_setup = list(threading.enumerate())
  263. from celery import _state
  264. from celery import result
  265. result.task_join_will_block = \
  266. _state.task_join_will_block = lambda: False
  267. self._current_app = current_app()
  268. self._default_app = _state.default_app
  269. trap = Trap()
  270. _state.set_default_app(trap)
  271. _state._tls.current_app = trap
  272. self.app = self.Celery(set_as_current=False)
  273. if not self.contained:
  274. self.app.set_current()
  275. root = logging.getLogger()
  276. self.__rootlevel = root.level
  277. self.__roothandlers = root.handlers
  278. _state._set_task_join_will_block(False)
  279. try:
  280. self.setup()
  281. except:
  282. self._teardown_app()
  283. raise
  284. def _teardown_app(self):
  285. from celery.utils.log import LoggingProxy
  286. assert sys.stdout
  287. assert sys.stderr
  288. assert sys.__stdout__
  289. assert sys.__stderr__
  290. this = self._get_test_name()
  291. if isinstance(sys.stdout, LoggingProxy) or \
  292. isinstance(sys.__stdout__, LoggingProxy):
  293. raise RuntimeError(CASE_LOG_REDIRECT_EFFECT.format(this, 'stdout'))
  294. if isinstance(sys.stderr, LoggingProxy) or \
  295. isinstance(sys.__stderr__, LoggingProxy):
  296. raise RuntimeError(CASE_LOG_REDIRECT_EFFECT.format(this, 'stderr'))
  297. backend = self.app.__dict__.get('backend')
  298. if backend is not None:
  299. if isinstance(backend, CacheBackend):
  300. if isinstance(backend.client, DummyClient):
  301. backend.client.cache.clear()
  302. backend._cache.clear()
  303. from celery._state import (
  304. _tls, set_default_app, _set_task_join_will_block,
  305. )
  306. _set_task_join_will_block(False)
  307. set_default_app(self._default_app)
  308. _tls.current_app = self._current_app
  309. if self.app is not self._current_app:
  310. self.app.close()
  311. self.app = None
  312. self.assertEqual(
  313. self._threads_at_setup, list(threading.enumerate()),
  314. )
  315. def _get_test_name(self):
  316. return '.'.join([self.__class__.__name__, self._testMethodName])
  317. def tearDown(self):
  318. try:
  319. self.teardown()
  320. finally:
  321. self._teardown_app()
  322. self.assert_no_logging_side_effect()
  323. def assert_no_logging_side_effect(self):
  324. this = self._get_test_name()
  325. root = logging.getLogger()
  326. if root.level != self.__rootlevel:
  327. raise RuntimeError(CASE_LOG_LEVEL_EFFECT.format(this))
  328. if root.handlers != self.__roothandlers:
  329. raise RuntimeError(CASE_LOG_HANDLER_EFFECT.format(this))
  330. def setup(self):
  331. pass
  332. def teardown(self):
  333. pass
  334. def get_handlers(logger):
  335. return [h for h in logger.handlers if not isinstance(h, NullHandler)]
  336. @contextmanager
  337. def wrap_logger(logger, loglevel=logging.ERROR):
  338. old_handlers = get_handlers(logger)
  339. sio = WhateverIO()
  340. siohandler = logging.StreamHandler(sio)
  341. logger.handlers = [siohandler]
  342. try:
  343. yield sio
  344. finally:
  345. logger.handlers = old_handlers
  346. def with_environ(env_name, env_value):
  347. def _envpatched(fun):
  348. @wraps(fun)
  349. def _patch_environ(*args, **kwargs):
  350. prev_val = os.environ.get(env_name)
  351. os.environ[env_name] = env_value
  352. try:
  353. return fun(*args, **kwargs)
  354. finally:
  355. os.environ[env_name] = prev_val or ''
  356. return _patch_environ
  357. return _envpatched
  358. def sleepdeprived(module=time):
  359. def _sleepdeprived(fun):
  360. @wraps(fun)
  361. def __sleepdeprived(*args, **kwargs):
  362. old_sleep = module.sleep
  363. module.sleep = noop
  364. try:
  365. return fun(*args, **kwargs)
  366. finally:
  367. module.sleep = old_sleep
  368. return __sleepdeprived
  369. return _sleepdeprived
  370. def skip_if_environ(env_var_name):
  371. def _wrap_test(fun):
  372. @wraps(fun)
  373. def _skips_if_environ(*args, **kwargs):
  374. if os.environ.get(env_var_name):
  375. raise SkipTest('SKIP %s: %s set\n' % (
  376. fun.__name__, env_var_name))
  377. return fun(*args, **kwargs)
  378. return _skips_if_environ
  379. return _wrap_test
  380. def _skip_test(reason, sign):
  381. def _wrap_test(fun):
  382. @wraps(fun)
  383. def _skipped_test(*args, **kwargs):
  384. raise SkipTest('%s: %s' % (sign, reason))
  385. return _skipped_test
  386. return _wrap_test
  387. def todo(reason):
  388. """TODO test decorator."""
  389. return _skip_test(reason, 'TODO')
  390. def skip(reason):
  391. """Skip test decorator."""
  392. return _skip_test(reason, 'SKIP')
  393. def skip_if(predicate, reason):
  394. """Skip test if predicate is :const:`True`."""
  395. def _inner(fun):
  396. return predicate and skip(reason)(fun) or fun
  397. return _inner
  398. def skip_unless(predicate, reason):
  399. """Skip test if predicate is :const:`False`."""
  400. return skip_if(not predicate, reason)
  401. # Taken from
  402. # http://bitbucket.org/runeh/snippets/src/tip/missing_modules.py
  403. @contextmanager
  404. def mask_modules(*modnames):
  405. """Ban some modules from being importable inside the context
  406. For example:
  407. >>> with missing_modules('sys'):
  408. ... try:
  409. ... import sys
  410. ... except ImportError:
  411. ... print 'sys not found'
  412. sys not found
  413. >>> import sys
  414. >>> sys.version
  415. (2, 5, 2, 'final', 0)
  416. """
  417. realimport = builtins.__import__
  418. def myimp(name, *args, **kwargs):
  419. if name in modnames:
  420. raise ImportError('No module named %s' % name)
  421. else:
  422. return realimport(name, *args, **kwargs)
  423. builtins.__import__ = myimp
  424. try:
  425. yield True
  426. finally:
  427. builtins.__import__ = realimport
  428. @contextmanager
  429. def override_stdouts():
  430. """Override `sys.stdout` and `sys.stderr` with `WhateverIO`."""
  431. prev_out, prev_err = sys.stdout, sys.stderr
  432. mystdout, mystderr = WhateverIO(), WhateverIO()
  433. sys.stdout = sys.__stdout__ = mystdout
  434. sys.stderr = sys.__stderr__ = mystderr
  435. try:
  436. yield mystdout, mystderr
  437. finally:
  438. sys.stdout = sys.__stdout__ = prev_out
  439. sys.stderr = sys.__stderr__ = prev_err
  440. def _old_patch(module, name, mocked):
  441. module = importlib.import_module(module)
  442. def _patch(fun):
  443. @wraps(fun)
  444. def __patched(*args, **kwargs):
  445. prev = getattr(module, name)
  446. setattr(module, name, mocked)
  447. try:
  448. return fun(*args, **kwargs)
  449. finally:
  450. setattr(module, name, prev)
  451. return __patched
  452. return _patch
  453. @contextmanager
  454. def replace_module_value(module, name, value=None):
  455. has_prev = hasattr(module, name)
  456. prev = getattr(module, name, None)
  457. if value:
  458. setattr(module, name, value)
  459. else:
  460. try:
  461. delattr(module, name)
  462. except AttributeError:
  463. pass
  464. try:
  465. yield
  466. finally:
  467. if prev is not None:
  468. setattr(sys, name, prev)
  469. if not has_prev:
  470. try:
  471. delattr(module, name)
  472. except AttributeError:
  473. pass
  474. pypy_version = partial(
  475. replace_module_value, sys, 'pypy_version_info',
  476. )
  477. platform_pyimp = partial(
  478. replace_module_value, platform, 'python_implementation',
  479. )
  480. @contextmanager
  481. def sys_platform(value):
  482. prev, sys.platform = sys.platform, value
  483. try:
  484. yield
  485. finally:
  486. sys.platform = prev
  487. @contextmanager
  488. def reset_modules(*modules):
  489. prev = dict((k, sys.modules.pop(k)) for k in modules if k in sys.modules)
  490. try:
  491. yield
  492. finally:
  493. sys.modules.update(prev)
  494. @contextmanager
  495. def patch_modules(*modules):
  496. prev = {}
  497. for mod in modules:
  498. prev[mod] = sys.modules.get(mod)
  499. sys.modules[mod] = ModuleType(mod)
  500. try:
  501. yield
  502. finally:
  503. for name, mod in items(prev):
  504. if mod is None:
  505. sys.modules.pop(name, None)
  506. else:
  507. sys.modules[name] = mod
  508. @contextmanager
  509. def mock_module(*names):
  510. prev = {}
  511. class MockModule(ModuleType):
  512. def __getattr__(self, attr):
  513. setattr(self, attr, Mock())
  514. return ModuleType.__getattribute__(self, attr)
  515. mods = []
  516. for name in names:
  517. try:
  518. prev[name] = sys.modules[name]
  519. except KeyError:
  520. pass
  521. mod = sys.modules[name] = MockModule(name)
  522. mods.append(mod)
  523. try:
  524. yield mods
  525. finally:
  526. for name in names:
  527. try:
  528. sys.modules[name] = prev[name]
  529. except KeyError:
  530. try:
  531. del(sys.modules[name])
  532. except KeyError:
  533. pass
  534. @contextmanager
  535. def mock_context(mock, typ=Mock):
  536. context = mock.return_value = Mock()
  537. context.__enter__ = typ()
  538. context.__exit__ = typ()
  539. def on_exit(*x):
  540. if x[0]:
  541. reraise(x[0], x[1], x[2])
  542. context.__exit__.side_effect = on_exit
  543. context.__enter__.return_value = context
  544. try:
  545. yield context
  546. finally:
  547. context.reset()
  548. @contextmanager
  549. def mock_open(typ=WhateverIO, side_effect=None):
  550. with patch(open_fqdn) as open_:
  551. with mock_context(open_) as context:
  552. if side_effect is not None:
  553. context.__enter__.side_effect = side_effect
  554. val = context.__enter__.return_value = typ()
  555. val.__exit__ = Mock()
  556. yield val
  557. def patch_many(*targets):
  558. return nested(*[patch(target) for target in targets])
  559. @contextmanager
  560. def assert_signal_called(signal, **expected):
  561. handler = Mock()
  562. call_handler = partial(handler)
  563. signal.connect(call_handler)
  564. try:
  565. yield handler
  566. finally:
  567. signal.disconnect(call_handler)
  568. handler.assert_called_with(signal=signal, **expected)
  569. def skip_if_pypy(fun):
  570. @wraps(fun)
  571. def _inner(*args, **kwargs):
  572. if getattr(sys, 'pypy_version_info', None):
  573. raise SkipTest('does not work on PyPy')
  574. return fun(*args, **kwargs)
  575. return _inner
  576. def skip_if_jython(fun):
  577. @wraps(fun)
  578. def _inner(*args, **kwargs):
  579. if sys.platform.startswith('java'):
  580. raise SkipTest('does not work on Jython')
  581. return fun(*args, **kwargs)
  582. return _inner
  583. def body_from_sig(app, sig, utc=True):
  584. sig.freeze()
  585. callbacks = sig.options.pop('link', None)
  586. errbacks = sig.options.pop('link_error', None)
  587. countdown = sig.options.pop('countdown', None)
  588. if countdown:
  589. eta = app.now() + timedelta(seconds=countdown)
  590. else:
  591. eta = sig.options.pop('eta', None)
  592. if eta and isinstance(eta, datetime):
  593. eta = eta.isoformat()
  594. expires = sig.options.pop('expires', None)
  595. if expires and isinstance(expires, int):
  596. expires = app.now() + timedelta(seconds=expires)
  597. if expires and isinstance(expires, datetime):
  598. expires = expires.isoformat()
  599. return {
  600. 'task': sig.task,
  601. 'id': sig.id,
  602. 'args': sig.args,
  603. 'kwargs': sig.kwargs,
  604. 'callbacks': [dict(s) for s in callbacks] if callbacks else None,
  605. 'errbacks': [dict(s) for s in errbacks] if errbacks else None,
  606. 'eta': eta,
  607. 'utc': utc,
  608. 'expires': expires,
  609. }
  610. @contextmanager
  611. def restore_logging():
  612. outs = sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__
  613. root = logging.getLogger()
  614. level = root.level
  615. handlers = root.handlers
  616. try:
  617. yield
  618. finally:
  619. sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__ = outs
  620. root.level = level
  621. root.handlers[:] = handlers