case.py 25 KB


  1. from __future__ import absolute_import
  2. try:
  3. import unittest # noqa
  4. unittest.skip
  5. from unittest.util import safe_repr, unorderable_list_difference
  6. except AttributeError:
  7. import unittest2 as unittest # noqa
  8. from unittest2.util import safe_repr, unorderable_list_difference # noqa
  9. import importlib
  10. import inspect
  11. import logging
  12. import numbers
  13. import os
  14. import platform
  15. import re
  16. import sys
  17. import threading
  18. import time
  19. import types
  20. import warnings
  21. from contextlib import contextmanager
  22. from copy import deepcopy
  23. from datetime import datetime, timedelta
  24. from functools import partial, wraps
  25. from types import ModuleType
  26. try:
  27. from unittest import mock
  28. except ImportError:
  29. import mock # noqa
  30. from nose import SkipTest
  31. from kombu import Queue
  32. from kombu.log import NullHandler
  33. from kombu.utils import nested, symbol_by_name
  34. from celery import Celery
  35. from celery.app import current_app
  36. from celery.backends.cache import CacheBackend, DummyClient
  37. from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
  38. from celery.five import (
  39. WhateverIO, builtins, items, reraise,
  40. string_t, values, open_fqdn,
  41. )
  42. from celery.utils.functional import noop
  43. from celery.utils.imports import qualname
  44. __all__ = [
  45. 'Case', 'AppCase', 'Mock', 'MagicMock', 'ANY', 'TaskMessage',
  46. 'patch', 'call', 'sentinel', 'skip_unless_module',
  47. 'wrap_logger', 'with_environ', 'sleepdeprived',
  48. 'skip_if_environ', 'todo', 'skip', 'skip_if',
  49. 'skip_unless', 'mask_modules', 'override_stdouts', 'mock_module',
  50. 'replace_module_value', 'sys_platform', 'reset_modules',
  51. 'patch_modules', 'mock_context', 'mock_open', 'patch_many',
  52. 'assert_signal_called', 'skip_if_pypy',
  53. 'skip_if_jython', 'task_message_from_sig', 'restore_logging',
  54. ]
  55. patch = mock.patch
  56. call = mock.call
  57. sentinel = mock.sentinel
  58. MagicMock = mock.MagicMock
  59. ANY = mock.ANY
  60. PY3 = sys.version_info[0] == 3
  61. CASE_REDEFINES_SETUP = """\
  62. {name} (subclass of AppCase) redefines private "setUp", should be: "setup"\
  63. """
  64. CASE_REDEFINES_TEARDOWN = """\
  65. {name} (subclass of AppCase) redefines private "tearDown", \
  66. should be: "teardown"\
  67. """
  68. CASE_LOG_REDIRECT_EFFECT = """\
  69. Test {0} did not disable LoggingProxy for {1}\
  70. """
  71. CASE_LOG_LEVEL_EFFECT = """\
  72. Test {0} Modified the level of the root logger\
  73. """
  74. CASE_LOG_HANDLER_EFFECT = """\
  75. Test {0} Modified handlers for the root logger\
  76. """
  77. CELERY_TEST_CONFIG = {
  78. #: Don't want log output when running suite.
  79. 'CELERYD_HIJACK_ROOT_LOGGER': False,
  80. 'CELERY_SEND_TASK_ERROR_EMAILS': False,
  81. 'CELERY_DEFAULT_QUEUE': 'testcelery',
  82. 'CELERY_DEFAULT_EXCHANGE': 'testcelery',
  83. 'CELERY_DEFAULT_ROUTING_KEY': 'testcelery',
  84. 'CELERY_QUEUES': (
  85. Queue('testcelery', routing_key='testcelery'),
  86. ),
  87. 'CELERY_ENABLE_UTC': True,
  88. 'CELERY_TIMEZONE': 'UTC',
  89. 'CELERYD_LOG_COLOR': False,
  90. # Mongo results tests (only executed if installed and running)
  91. 'CELERY_MONGODB_BACKEND_SETTINGS': {
  92. 'host': os.environ.get('MONGO_HOST') or 'localhost',
  93. 'port': os.environ.get('MONGO_PORT') or 27017,
  94. 'database': os.environ.get('MONGO_DB') or 'celery_unittests',
  95. 'taskmeta_collection': (os.environ.get('MONGO_TASKMETA_COLLECTION')
  96. or 'taskmeta_collection'),
  97. 'user': os.environ.get('MONGO_USER'),
  98. 'password': os.environ.get('MONGO_PASSWORD'),
  99. }
  100. }
  101. class Trap(object):
  102. def __getattr__(self, name):
  103. raise RuntimeError('Test depends on current_app')
  104. class UnitLogging(symbol_by_name(Celery.log_cls)):
  105. def __init__(self, *args, **kwargs):
  106. super(UnitLogging, self).__init__(*args, **kwargs)
  107. self.already_setup = True
  108. def UnitApp(name=None, broker=None, backend=None,
  109. set_as_current=False, log=UnitLogging, **kwargs):
  110. app = Celery(name or 'celery.tests',
  111. broker=broker or 'memory://',
  112. backend=backend or 'cache+memory://',
  113. set_as_current=set_as_current,
  114. log=log,
  115. **kwargs)
  116. app.add_defaults(deepcopy(CELERY_TEST_CONFIG))
  117. return app
  118. class Mock(mock.Mock):
  119. def __init__(self, *args, **kwargs):
  120. attrs = kwargs.pop('attrs', None) or {}
  121. super(Mock, self).__init__(*args, **kwargs)
  122. for attr_name, attr_value in items(attrs):
  123. setattr(self, attr_name, attr_value)
  124. class _ContextMock(Mock):
  125. """Dummy class implementing __enter__ and __exit__
  126. as the with statement requires these to be implemented
  127. in the class, not just the instance."""
  128. def __enter__(self):
  129. pass
  130. def __exit__(self, *exc_info):
  131. pass
  132. def ContextMock(*args, **kwargs):
  133. obj = _ContextMock(*args, **kwargs)
  134. obj.attach_mock(_ContextMock(), '__enter__')
  135. obj.attach_mock(_ContextMock(), '__exit__')
  136. obj.__enter__.return_value = obj
  137. # if __exit__ return a value the exception is ignored,
  138. # so it must return None here.
  139. obj.__exit__.return_value = None
  140. return obj
  141. def _bind(f, o):
  142. @wraps(f)
  143. def bound_meth(*fargs, **fkwargs):
  144. return f(o, *fargs, **fkwargs)
  145. return bound_meth
  146. if PY3: # pragma: no cover
  147. def _get_class_fun(meth):
  148. return meth
  149. else:
  150. def _get_class_fun(meth):
  151. return meth.__func__
  152. class MockCallbacks(object):
  153. def __new__(cls, *args, **kwargs):
  154. r = Mock(name=cls.__name__)
  155. _get_class_fun(cls.__init__)(r, *args, **kwargs)
  156. for key, value in items(vars(cls)):
  157. if key not in ('__dict__', '__weakref__', '__new__', '__init__'):
  158. if inspect.ismethod(value) or inspect.isfunction(value):
  159. r.__getattr__(key).side_effect = _bind(value, r)
  160. else:
  161. r.__setattr__(key, value)
  162. return r
  163. def skip_unless_module(module):
  164. def _inner(fun):
  165. @wraps(fun)
  166. def __inner(*args, **kwargs):
  167. try:
  168. importlib.import_module(module)
  169. except ImportError:
  170. raise SkipTest('Does not have %s' % (module, ))
  171. return fun(*args, **kwargs)
  172. return __inner
  173. return _inner
  174. # -- adds assertWarns from recent unittest2, not in Python 2.7.
  175. class _AssertRaisesBaseContext(object):
  176. def __init__(self, expected, test_case, callable_obj=None,
  177. expected_regex=None):
  178. self.expected = expected
  179. self.failureException = test_case.failureException
  180. self.obj_name = None
  181. if isinstance(expected_regex, string_t):
  182. expected_regex = re.compile(expected_regex)
  183. self.expected_regex = expected_regex
  184. def _is_magic_module(m):
  185. # some libraries create custom module types that are lazily
  186. # lodaded, e.g. Django installs some modules in sys.modules that
  187. # will load _tkinter and other shit when touched.
  188. # pyflakes refuses to accept 'noqa' for this isinstance.
  189. cls, modtype = m.__class__, types.ModuleType
  190. return (cls is not modtype and (
  191. '__getattr__' in vars(m.__class__) or
  192. '__getattribute__' in vars(m.__class__)))
  193. class _AssertWarnsContext(_AssertRaisesBaseContext):
  194. """A context manager used to implement TestCase.assertWarns* methods."""
  195. def __enter__(self):
  196. # The __warningregistry__'s need to be in a pristine state for tests
  197. # to work properly.
  198. warnings.resetwarnings()
  199. for v in list(values(sys.modules)):
  200. # do not evaluate Django moved modules and other lazily
  201. # initialized modules.
  202. if v and not _is_magic_module(v):
  203. # use raw __getattribute__ to protect even better from
  204. # lazily loaded modules
  205. try:
  206. object.__getattribute__(v, '__warningregistry__')
  207. except AttributeError:
  208. pass
  209. else:
  210. object.__setattr__(v, '__warningregistry__', {})
  211. self.warnings_manager = warnings.catch_warnings(record=True)
  212. self.warnings = self.warnings_manager.__enter__()
  213. warnings.simplefilter('always', self.expected)
  214. return self
  215. def __exit__(self, exc_type, exc_value, tb):
  216. self.warnings_manager.__exit__(exc_type, exc_value, tb)
  217. if exc_type is not None:
  218. # let unexpected exceptions pass through
  219. return
  220. try:
  221. exc_name = self.expected.__name__
  222. except AttributeError:
  223. exc_name = str(self.expected)
  224. first_matching = None
  225. for m in self.warnings:
  226. w = m.message
  227. if not isinstance(w, self.expected):
  228. continue
  229. if first_matching is None:
  230. first_matching = w
  231. if (self.expected_regex is not None and
  232. not self.expected_regex.search(str(w))):
  233. continue
  234. # store warning for later retrieval
  235. self.warning = w
  236. self.filename = m.filename
  237. self.lineno = m.lineno
  238. return
  239. # Now we simply try to choose a helpful failure message
  240. if first_matching is not None:
  241. raise self.failureException(
  242. '%r does not match %r' % (
  243. self.expected_regex.pattern, str(first_matching)))
  244. if self.obj_name:
  245. raise self.failureException(
  246. '%s not triggered by %s' % (exc_name, self.obj_name))
  247. else:
  248. raise self.failureException('%s not triggered' % exc_name)
  249. class Case(unittest.TestCase):
  250. def assertWarns(self, expected_warning):
  251. return _AssertWarnsContext(expected_warning, self, None)
  252. def assertWarnsRegex(self, expected_warning, expected_regex):
  253. return _AssertWarnsContext(expected_warning, self,
  254. None, expected_regex)
  255. @contextmanager
  256. def assertDeprecated(self):
  257. with self.assertWarnsRegex(CDeprecationWarning,
  258. r'scheduled for removal'):
  259. yield
  260. @contextmanager
  261. def assertPendingDeprecation(self):
  262. with self.assertWarnsRegex(CPendingDeprecationWarning,
  263. r'scheduled for deprecation'):
  264. yield
  265. def assertDictContainsSubset(self, expected, actual, msg=None):
  266. missing, mismatched = [], []
  267. for key, value in items(expected):
  268. if key not in actual:
  269. missing.append(key)
  270. elif value != actual[key]:
  271. mismatched.append('%s, expected: %s, actual: %s' % (
  272. safe_repr(key), safe_repr(value),
  273. safe_repr(actual[key])))
  274. if not (missing or mismatched):
  275. return
  276. standard_msg = ''
  277. if missing:
  278. standard_msg = 'Missing: %s' % ','.join(map(safe_repr, missing))
  279. if mismatched:
  280. if standard_msg:
  281. standard_msg += '; '
  282. standard_msg += 'Mismatched values: %s' % (
  283. ','.join(mismatched))
  284. self.fail(self._formatMessage(msg, standard_msg))
  285. def assertItemsEqual(self, expected_seq, actual_seq, msg=None):
  286. missing = unexpected = None
  287. try:
  288. expected = sorted(expected_seq)
  289. actual = sorted(actual_seq)
  290. except TypeError:
  291. # Unsortable items (example: set(), complex(), ...)
  292. expected = list(expected_seq)
  293. actual = list(actual_seq)
  294. missing, unexpected = unorderable_list_difference(
  295. expected, actual)
  296. else:
  297. return self.assertSequenceEqual(expected, actual, msg=msg)
  298. errors = []
  299. if missing:
  300. errors.append(
  301. 'Expected, but missing:\n %s' % (safe_repr(missing), )
  302. )
  303. if unexpected:
  304. errors.append(
  305. 'Unexpected, but present:\n %s' % (safe_repr(unexpected), )
  306. )
  307. if errors:
  308. standardMsg = '\n'.join(errors)
  309. self.fail(self._formatMessage(msg, standardMsg))
  310. def depends_on_current_app(fun):
  311. if inspect.isclass(fun):
  312. fun.contained = False
  313. else:
  314. @wraps(fun)
  315. def __inner(self, *args, **kwargs):
  316. self.app.set_current()
  317. return fun(self, *args, **kwargs)
  318. return __inner
  319. class AppCase(Case):
  320. contained = True
  321. def __init__(self, *args, **kwargs):
  322. super(AppCase, self).__init__(*args, **kwargs)
  323. if self.__class__.__dict__.get('setUp'):
  324. raise RuntimeError(
  325. CASE_REDEFINES_SETUP.format(name=qualname(self)),
  326. )
  327. if self.__class__.__dict__.get('tearDown'):
  328. raise RuntimeError(
  329. CASE_REDEFINES_TEARDOWN.format(name=qualname(self)),
  330. )
  331. def Celery(self, *args, **kwargs):
  332. return UnitApp(*args, **kwargs)
  333. def setUp(self):
  334. self._threads_at_setup = list(threading.enumerate())
  335. from celery import _state
  336. from celery import result
  337. result.task_join_will_block = \
  338. _state.task_join_will_block = lambda: False
  339. self._current_app = current_app()
  340. self._default_app = _state.default_app
  341. trap = Trap()
  342. self._prev_tls = _state._tls
  343. _state.set_default_app(trap)
  344. class NonTLS(object):
  345. current_app = trap
  346. _state._tls = NonTLS()
  347. self.app = self.Celery(set_as_current=False)
  348. if not self.contained:
  349. self.app.set_current()
  350. root = logging.getLogger()
  351. self.__rootlevel = root.level
  352. self.__roothandlers = root.handlers
  353. _state._set_task_join_will_block(False)
  354. try:
  355. self.setup()
  356. except:
  357. self._teardown_app()
  358. raise
  359. def _teardown_app(self):
  360. from celery.utils.log import LoggingProxy
  361. assert sys.stdout
  362. assert sys.stderr
  363. assert sys.__stdout__
  364. assert sys.__stderr__
  365. this = self._get_test_name()
  366. if isinstance(sys.stdout, LoggingProxy) or \
  367. isinstance(sys.__stdout__, LoggingProxy):
  368. raise RuntimeError(CASE_LOG_REDIRECT_EFFECT.format(this, 'stdout'))
  369. if isinstance(sys.stderr, LoggingProxy) or \
  370. isinstance(sys.__stderr__, LoggingProxy):
  371. raise RuntimeError(CASE_LOG_REDIRECT_EFFECT.format(this, 'stderr'))
  372. backend = self.app.__dict__.get('backend')
  373. if backend is not None:
  374. if isinstance(backend, CacheBackend):
  375. if isinstance(backend.client, DummyClient):
  376. backend.client.cache.clear()
  377. backend._cache.clear()
  378. from celery import _state
  379. _state._set_task_join_will_block(False)
  380. _state.set_default_app(self._default_app)
  381. _state._tls = self._prev_tls
  382. _state._tls.current_app = self._current_app
  383. if self.app is not self._current_app:
  384. self.app.close()
  385. self.app = None
  386. self.assertEqual(
  387. self._threads_at_setup, list(threading.enumerate()),
  388. )
  389. # Make sure no test left the shutdown flags enabled.
  390. from celery.worker import state as worker_state
  391. # check for EX_OK
  392. self.assertIsNot(worker_state.should_stop, False)
  393. self.assertIsNot(worker_state.should_terminate, False)
  394. # check for other true values
  395. self.assertFalse(worker_state.should_stop)
  396. self.assertFalse(worker_state.should_terminate)
  397. def _get_test_name(self):
  398. return '.'.join([self.__class__.__name__, self._testMethodName])
  399. def tearDown(self):
  400. try:
  401. self.teardown()
  402. finally:
  403. self._teardown_app()
  404. self.assert_no_logging_side_effect()
  405. def assert_no_logging_side_effect(self):
  406. this = self._get_test_name()
  407. root = logging.getLogger()
  408. if root.level != self.__rootlevel:
  409. raise RuntimeError(CASE_LOG_LEVEL_EFFECT.format(this))
  410. if root.handlers != self.__roothandlers:
  411. raise RuntimeError(CASE_LOG_HANDLER_EFFECT.format(this))
  412. def setup(self):
  413. pass
  414. def teardown(self):
  415. pass
  416. def get_handlers(logger):
  417. return [h for h in logger.handlers if not isinstance(h, NullHandler)]
  418. @contextmanager
  419. def wrap_logger(logger, loglevel=logging.ERROR):
  420. old_handlers = get_handlers(logger)
  421. sio = WhateverIO()
  422. siohandler = logging.StreamHandler(sio)
  423. logger.handlers = [siohandler]
  424. try:
  425. yield sio
  426. finally:
  427. logger.handlers = old_handlers
  428. def with_environ(env_name, env_value):
  429. def _envpatched(fun):
  430. @wraps(fun)
  431. def _patch_environ(*args, **kwargs):
  432. prev_val = os.environ.get(env_name)
  433. os.environ[env_name] = env_value
  434. try:
  435. return fun(*args, **kwargs)
  436. finally:
  437. os.environ[env_name] = prev_val or ''
  438. return _patch_environ
  439. return _envpatched
  440. def sleepdeprived(module=time):
  441. def _sleepdeprived(fun):
  442. @wraps(fun)
  443. def __sleepdeprived(*args, **kwargs):
  444. old_sleep = module.sleep
  445. module.sleep = noop
  446. try:
  447. return fun(*args, **kwargs)
  448. finally:
  449. module.sleep = old_sleep
  450. return __sleepdeprived
  451. return _sleepdeprived
  452. def skip_if_environ(env_var_name):
  453. def _wrap_test(fun):
  454. @wraps(fun)
  455. def _skips_if_environ(*args, **kwargs):
  456. if os.environ.get(env_var_name):
  457. raise SkipTest('SKIP %s: %s set\n' % (
  458. fun.__name__, env_var_name))
  459. return fun(*args, **kwargs)
  460. return _skips_if_environ
  461. return _wrap_test
  462. def _skip_test(reason, sign):
  463. def _wrap_test(fun):
  464. @wraps(fun)
  465. def _skipped_test(*args, **kwargs):
  466. raise SkipTest('%s: %s' % (sign, reason))
  467. return _skipped_test
  468. return _wrap_test
  469. def todo(reason):
  470. """TODO test decorator."""
  471. return _skip_test(reason, 'TODO')
  472. def skip(reason):
  473. """Skip test decorator."""
  474. return _skip_test(reason, 'SKIP')
  475. def skip_if(predicate, reason):
  476. """Skip test if predicate is :const:`True`."""
  477. def _inner(fun):
  478. return predicate and skip(reason)(fun) or fun
  479. return _inner
  480. def skip_unless(predicate, reason):
  481. """Skip test if predicate is :const:`False`."""
  482. return skip_if(not predicate, reason)
  483. # Taken from
  484. # http://bitbucket.org/runeh/snippets/src/tip/missing_modules.py
  485. @contextmanager
  486. def mask_modules(*modnames):
  487. """Ban some modules from being importable inside the context
  488. For example:
  489. >>> with mask_modules('sys'):
  490. ... try:
  491. ... import sys
  492. ... except ImportError:
  493. ... print('sys not found')
  494. sys not found
  495. >>> import sys # noqa
  496. >>> sys.version
  497. (2, 5, 2, 'final', 0)
  498. """
  499. realimport = builtins.__import__
  500. def myimp(name, *args, **kwargs):
  501. if name in modnames:
  502. raise ImportError('No module named %s' % name)
  503. else:
  504. return realimport(name, *args, **kwargs)
  505. builtins.__import__ = myimp
  506. try:
  507. yield True
  508. finally:
  509. builtins.__import__ = realimport
  510. @contextmanager
  511. def override_stdouts():
  512. """Override `sys.stdout` and `sys.stderr` with `WhateverIO`."""
  513. prev_out, prev_err = sys.stdout, sys.stderr
  514. mystdout, mystderr = WhateverIO(), WhateverIO()
  515. sys.stdout = sys.__stdout__ = mystdout
  516. sys.stderr = sys.__stderr__ = mystderr
  517. try:
  518. yield mystdout, mystderr
  519. finally:
  520. sys.stdout = sys.__stdout__ = prev_out
  521. sys.stderr = sys.__stderr__ = prev_err
  522. def _old_patch(module, name, mocked):
  523. module = importlib.import_module(module)
  524. def _patch(fun):
  525. @wraps(fun)
  526. def __patched(*args, **kwargs):
  527. prev = getattr(module, name)
  528. setattr(module, name, mocked)
  529. try:
  530. return fun(*args, **kwargs)
  531. finally:
  532. setattr(module, name, prev)
  533. return __patched
  534. return _patch
  535. @contextmanager
  536. def replace_module_value(module, name, value=None):
  537. has_prev = hasattr(module, name)
  538. prev = getattr(module, name, None)
  539. if value:
  540. setattr(module, name, value)
  541. else:
  542. try:
  543. delattr(module, name)
  544. except AttributeError:
  545. pass
  546. try:
  547. yield
  548. finally:
  549. if prev is not None:
  550. setattr(sys, name, prev)
  551. if not has_prev:
  552. try:
  553. delattr(module, name)
  554. except AttributeError:
  555. pass
  556. pypy_version = partial(
  557. replace_module_value, sys, 'pypy_version_info',
  558. )
  559. platform_pyimp = partial(
  560. replace_module_value, platform, 'python_implementation',
  561. )
  562. @contextmanager
  563. def sys_platform(value):
  564. prev, sys.platform = sys.platform, value
  565. try:
  566. yield
  567. finally:
  568. sys.platform = prev
  569. @contextmanager
  570. def reset_modules(*modules):
  571. prev = dict((k, sys.modules.pop(k)) for k in modules if k in sys.modules)
  572. try:
  573. yield
  574. finally:
  575. sys.modules.update(prev)
  576. @contextmanager
  577. def patch_modules(*modules):
  578. prev = {}
  579. for mod in modules:
  580. prev[mod] = sys.modules.get(mod)
  581. sys.modules[mod] = ModuleType(mod)
  582. try:
  583. yield
  584. finally:
  585. for name, mod in items(prev):
  586. if mod is None:
  587. sys.modules.pop(name, None)
  588. else:
  589. sys.modules[name] = mod
  590. @contextmanager
  591. def mock_module(*names):
  592. prev = {}
  593. class MockModule(ModuleType):
  594. def __getattr__(self, attr):
  595. setattr(self, attr, Mock())
  596. return ModuleType.__getattribute__(self, attr)
  597. mods = []
  598. for name in names:
  599. try:
  600. prev[name] = sys.modules[name]
  601. except KeyError:
  602. pass
  603. mod = sys.modules[name] = MockModule(name)
  604. mods.append(mod)
  605. try:
  606. yield mods
  607. finally:
  608. for name in names:
  609. try:
  610. sys.modules[name] = prev[name]
  611. except KeyError:
  612. try:
  613. del(sys.modules[name])
  614. except KeyError:
  615. pass
  616. @contextmanager
  617. def mock_context(mock, typ=Mock):
  618. context = mock.return_value = Mock()
  619. context.__enter__ = typ()
  620. context.__exit__ = typ()
  621. def on_exit(*x):
  622. if x[0]:
  623. reraise(x[0], x[1], x[2])
  624. context.__exit__.side_effect = on_exit
  625. context.__enter__.return_value = context
  626. try:
  627. yield context
  628. finally:
  629. context.reset()
  630. @contextmanager
  631. def mock_open(typ=WhateverIO, side_effect=None):
  632. with patch(open_fqdn) as open_:
  633. with mock_context(open_) as context:
  634. if side_effect is not None:
  635. context.__enter__.side_effect = side_effect
  636. val = context.__enter__.return_value = typ()
  637. val.__exit__ = Mock()
  638. yield val
  639. def patch_many(*targets):
  640. return nested(*[patch(target) for target in targets])
  641. @contextmanager
  642. def assert_signal_called(signal, **expected):
  643. handler = Mock()
  644. call_handler = partial(handler)
  645. signal.connect(call_handler)
  646. try:
  647. yield handler
  648. finally:
  649. signal.disconnect(call_handler)
  650. handler.assert_called_with(signal=signal, **expected)
  651. def skip_if_pypy(fun):
  652. @wraps(fun)
  653. def _inner(*args, **kwargs):
  654. if getattr(sys, 'pypy_version_info', None):
  655. raise SkipTest('does not work on PyPy')
  656. return fun(*args, **kwargs)
  657. return _inner
  658. def skip_if_jython(fun):
  659. @wraps(fun)
  660. def _inner(*args, **kwargs):
  661. if sys.platform.startswith('java'):
  662. raise SkipTest('does not work on Jython')
  663. return fun(*args, **kwargs)
  664. return _inner
  665. def task_message_from_sig(app, sig, utc=True):
  666. sig.freeze()
  667. callbacks = sig.options.pop('link', None)
  668. errbacks = sig.options.pop('link_error', None)
  669. countdown = sig.options.pop('countdown', None)
  670. if countdown:
  671. eta = app.now() + timedelta(seconds=countdown)
  672. else:
  673. eta = sig.options.pop('eta', None)
  674. if eta and isinstance(eta, datetime):
  675. eta = eta.isoformat()
  676. expires = sig.options.pop('expires', None)
  677. if expires and isinstance(expires, numbers.Real):
  678. expires = app.now() + timedelta(seconds=expires)
  679. if expires and isinstance(expires, datetime):
  680. expires = expires.isoformat()
  681. return TaskMessage(
  682. sig.task, id=sig.id, args=sig.args,
  683. kwargs=sig.kwargs,
  684. callbacks=[dict(s) for s in callbacks] if callbacks else None,
  685. errbacks=[dict(s) for s in errbacks] if errbacks else None,
  686. eta=eta,
  687. expires=expires,
  688. )
  689. @contextmanager
  690. def restore_logging():
  691. outs = sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__
  692. root = logging.getLogger()
  693. level = root.level
  694. handlers = root.handlers
  695. try:
  696. yield
  697. finally:
  698. sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__ = outs
  699. root.level = level
  700. root.handlers[:] = handlers
  701. def TaskMessage(name, id=None, args=(), kwargs={}, callbacks=None,
  702. errbacks=None, chain=None, **options):
  703. from celery import uuid
  704. from kombu.serialization import dumps
  705. id = id or uuid()
  706. message = Mock(name='TaskMessage-{0}'.format(id))
  707. message.headers = {
  708. 'id': id,
  709. 'task': name,
  710. }
  711. embed = {'callbacks': callbacks, 'errbacks': errbacks, 'chain': chain}
  712. message.headers.update(options)
  713. message.content_type, message.content_encoding, message.body = dumps(
  714. (args, kwargs, embed), serializer='json',
  715. )
  716. message.payload = (args, kwargs, embed)
  717. return message