|
@@ -36,7 +36,9 @@ from celery.five import monotonic
|
|
|
from celery.signals import task_revoked
|
|
|
from celery.utils import uuid
|
|
|
from celery.worker import request as module
|
|
|
-from celery.worker.request import Request, logger as req_logger
|
|
|
+from celery.worker.request import (
|
|
|
+ Request, create_request_cls, logger as req_logger,
|
|
|
+)
|
|
|
from celery.worker.state import revoked
|
|
|
|
|
|
from celery.tests.case import (
|
|
@@ -51,6 +53,39 @@ from celery.tests.case import (
|
|
|
)
|
|
|
|
|
|
|
|
|
+class RequestCase(AppCase):
|
|
|
+
|
|
|
+ def setup(self):
|
|
|
+ self.app.conf.result_serializer = 'pickle'
|
|
|
+
|
|
|
+ @self.app.task(shared=False)
|
|
|
+ def add(x, y, **kw_):
|
|
|
+ return x + y
|
|
|
+ self.add = add
|
|
|
+
|
|
|
+ @self.app.task(shared=False)
|
|
|
+ def mytask(i, **kwargs):
|
|
|
+ return i ** i
|
|
|
+ self.mytask = mytask
|
|
|
+
|
|
|
+ @self.app.task(shared=False)
|
|
|
+ def mytask_raising(i):
|
|
|
+ raise KeyError(i)
|
|
|
+ self.mytask_raising = mytask_raising
|
|
|
+
|
|
|
+ def xRequest(self, name=None, id=None, args=None, kwargs=None,
|
|
|
+ on_ack=None, on_reject=None, Request=Request, **head):
|
|
|
+ args = [1] if args is None else args
|
|
|
+ kwargs = {'f': 'x'} if kwargs is None else kwargs
|
|
|
+ on_ack = on_ack or Mock(name='on_ack')
|
|
|
+ on_reject = on_reject or Mock(name='on_reject')
|
|
|
+ message = TaskMessage(
|
|
|
+ name or self.mytask.name, id, args=args, kwargs=kwargs, **head
|
|
|
+ )
|
|
|
+ return Request(message, app=self.app,
|
|
|
+ on_ack=on_ack, on_reject=on_reject)
|
|
|
+
|
|
|
+
|
|
|
class test_mro_lookup(Case):
|
|
|
|
|
|
def test_order(self):
|
|
@@ -125,7 +160,7 @@ class test_Retry(AppCase):
|
|
|
self.assertEqual(ret.exc, exc)
|
|
|
|
|
|
|
|
|
-class test_trace_task(AppCase):
|
|
|
+class test_trace_task(RequestCase):
|
|
|
|
|
|
def setup(self):
|
|
|
|
|
@@ -162,7 +197,7 @@ class test_trace_task(AppCase):
|
|
|
def test_marked_as_started(self):
|
|
|
_started = []
|
|
|
|
|
|
- def store_result(tid, meta, state, **kwars):
|
|
|
+ def store_result(tid, meta, state, **kwargs):
|
|
|
if state == states.STARTED:
|
|
|
_started.append(tid)
|
|
|
self.mytask.backend.store_result = Mock(name='store_result')
|
|
@@ -207,25 +242,7 @@ class MockEventDispatcher(object):
|
|
|
self.sent.append(event)
|
|
|
|
|
|
|
|
|
-class test_Request(AppCase):
|
|
|
-
|
|
|
- def setup(self):
|
|
|
- self.app.conf.result_serializer = 'pickle'
|
|
|
-
|
|
|
- @self.app.task(shared=False)
|
|
|
- def add(x, y, **kw_):
|
|
|
- return x + y
|
|
|
- self.add = add
|
|
|
-
|
|
|
- @self.app.task(shared=False)
|
|
|
- def mytask(i, **kwargs):
|
|
|
- return i ** i
|
|
|
- self.mytask = mytask
|
|
|
-
|
|
|
- @self.app.task(shared=False)
|
|
|
- def mytask_raising(i):
|
|
|
- raise KeyError(i)
|
|
|
- self.mytask_raising = mytask_raising
|
|
|
+class test_Request(RequestCase):
|
|
|
|
|
|
def get_request(self, sig, Request=Request, **kwargs):
|
|
|
return Request(
|
|
@@ -239,6 +256,12 @@ class test_Request(AppCase):
|
|
|
**kwargs
|
|
|
)
|
|
|
|
|
|
+ def test_shadow(self):
|
|
|
+ self.assertEqual(
|
|
|
+ self.get_request(self.add.s(2, 2).set(shadow='fooxyz')).name,
|
|
|
+ 'fooxyz',
|
|
|
+ )
|
|
|
+
|
|
|
def test_invalid_eta_raises_InvalidTaskError(self):
|
|
|
with self.assertRaises(InvalidTaskError):
|
|
|
self.get_request(self.add.s(2, 2).set(eta='12345'))
|
|
@@ -358,18 +381,6 @@ class test_Request(AppCase):
|
|
|
req._tzlocal = 'foo'
|
|
|
self.assertEqual(req.tzlocal, 'foo')
|
|
|
|
|
|
- def xRequest(self, name=None, id=None, args=None, kwargs=None,
|
|
|
- on_ack=None, on_reject=None, **head):
|
|
|
- args = [1] if args is None else args
|
|
|
- kwargs = {'f': 'x'} if kwargs is None else kwargs
|
|
|
- on_ack = on_ack or Mock(name='on_ack')
|
|
|
- on_reject = on_reject or Mock(name='on_reject')
|
|
|
- message = TaskMessage(
|
|
|
- name or self.mytask.name, id, args=args, kwargs=kwargs, **head
|
|
|
- )
|
|
|
- return Request(message, app=self.app,
|
|
|
- on_ack=on_ack, on_reject=on_reject)
|
|
|
-
|
|
|
def test_task_wrapper_repr(self):
|
|
|
self.assertTrue(repr(self.xRequest()))
|
|
|
|
|
@@ -414,6 +425,23 @@ class test_Request(AppCase):
|
|
|
job.task_name = 'NAME'
|
|
|
self.assertEqual(job.name, 'NAME')
|
|
|
|
|
|
+ def test_terminate__pool_ref(self):
|
|
|
+ pool = Mock()
|
|
|
+ signum = signal.SIGTERM
|
|
|
+ job = self.get_request(self.mytask.s(1, f='x'))
|
|
|
+ job._apply_result = Mock(name='_apply_result')
|
|
|
+ with assert_signal_called(
|
|
|
+ task_revoked, sender=job.task, request=job,
|
|
|
+ terminated=True, expired=False, signum=signum):
|
|
|
+ job.time_start = monotonic()
|
|
|
+ job.worker_pid = 314
|
|
|
+ job.terminate(pool, signal='TERM')
|
|
|
+ job._apply_result().terminate.assert_called_with(signum)
|
|
|
+
|
|
|
+ job._apply_result = Mock(name='_apply_result2')
|
|
|
+ job._apply_result.return_value = None
|
|
|
+ job.terminate(pool, signal='TERM')
|
|
|
+
|
|
|
def test_terminate__task_started(self):
|
|
|
pool = Mock()
|
|
|
signum = signal.SIGTERM
|
|
@@ -627,6 +655,8 @@ class test_Request(AppCase):
|
|
|
def test_on_timeout(self, warn, error):
|
|
|
|
|
|
job = self.xRequest()
|
|
|
+ job.acknowledge = Mock(name='ack')
|
|
|
+ job.task.acks_late = True
|
|
|
job.on_timeout(soft=True, timeout=1337)
|
|
|
self.assertIn('Soft time limit', warn.call_args[0][0])
|
|
|
job.on_timeout(soft=False, timeout=1337)
|
|
@@ -634,6 +664,7 @@ class test_Request(AppCase):
|
|
|
self.assertEqual(
|
|
|
self.mytask.backend.get_status(job.id), states.FAILURE,
|
|
|
)
|
|
|
+ job.acknowledge.assert_called_with()
|
|
|
|
|
|
self.mytask.ignore_result = True
|
|
|
job = self.xRequest()
|
|
@@ -642,6 +673,12 @@ class test_Request(AppCase):
|
|
|
self.mytask.backend.get_status(job.id), states.PENDING,
|
|
|
)
|
|
|
|
|
|
+ job = self.xRequest()
|
|
|
+ job.acknowledge = Mock(name='ack')
|
|
|
+ job.task.acks_late = False
|
|
|
+ job.on_timeout(soft=True, timeout=1335)
|
|
|
+ self.assertFalse(job.acknowledge.called)
|
|
|
+
|
|
|
def test_fast_trace_task(self):
|
|
|
from celery.app import trace
|
|
|
setup_worker_optimizations(self.app)
|
|
@@ -874,23 +911,163 @@ class test_Request(AppCase):
|
|
|
self.assertEqual(p.args[1], tid)
|
|
|
self.assertEqual(p.args[3], job.message.body)
|
|
|
|
|
|
- def _test_on_failure(self, exception):
|
|
|
+ def _test_on_failure(self, exception, **kwargs):
|
|
|
tid = uuid()
|
|
|
job = self.xRequest(id=tid, args=[4])
|
|
|
job.send_event = Mock(name='send_event')
|
|
|
+ job.task.backend.mark_as_failure = Mock(name='mark_as_failure')
|
|
|
try:
|
|
|
raise exception
|
|
|
- except Exception:
|
|
|
+ except type(exception):
|
|
|
exc_info = ExceptionInfo()
|
|
|
- job.on_failure(exc_info)
|
|
|
+ job.on_failure(exc_info, **kwargs)
|
|
|
self.assertTrue(job.send_event.called)
|
|
|
+ return job
|
|
|
|
|
|
def test_on_failure(self):
|
|
|
self._test_on_failure(Exception('Inside unit tests'))
|
|
|
|
|
|
- def test_on_failure_unicode_exception(self):
|
|
|
+ def test_on_failure__unicode_exception(self):
|
|
|
self._test_on_failure(Exception('Бобры атакуют'))
|
|
|
|
|
|
- def test_on_failure_utf8_exception(self):
|
|
|
+ def test_on_failure__utf8_exception(self):
|
|
|
self._test_on_failure(Exception(
|
|
|
from_utf8('Бобры атакуют')))
|
|
|
+
|
|
|
+ def test_on_failure__WorkerLostError(self):
|
|
|
+ exc = WorkerLostError()
|
|
|
+ job = self._test_on_failure(exc)
|
|
|
+ job.task.backend.mark_as_failure.assert_called_with(
|
|
|
+ job.id, exc, request=job, store_result=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_on_failure__return_ok(self):
|
|
|
+ self._test_on_failure(KeyError(), return_ok=True)
|
|
|
+
|
|
|
+ def test_reject(self):
|
|
|
+ job = self.xRequest(id=uuid())
|
|
|
+ job.on_reject = Mock(name='on_reject')
|
|
|
+ job.acknowleged = False
|
|
|
+ job.reject(requeue=True)
|
|
|
+ job.on_reject.assert_called_with(
|
|
|
+ req_logger, job.connection_errors, True,
|
|
|
+ )
|
|
|
+ self.assertTrue(job.acknowledged)
|
|
|
+ job.on_reject.reset_mock()
|
|
|
+ job.reject(requeue=True)
|
|
|
+ self.assertFalse(job.on_reject.called)
|
|
|
+
|
|
|
+ def test_group(self):
|
|
|
+ gid = uuid()
|
|
|
+ job = self.xRequest(id=uuid(), group=gid)
|
|
|
+ self.assertEqual(job.group, gid)
|
|
|
+
|
|
|
+
|
|
|
+class test_create_request_class(RequestCase):
|
|
|
+
|
|
|
+ def setup(self):
|
|
|
+ RequestCase.setup(self)
|
|
|
+ self.task = Mock(name='task')
|
|
|
+ self.pool = Mock(name='pool')
|
|
|
+ self.eventer = Mock(name='eventer')
|
|
|
+
|
|
|
+ def create_request_cls(self, **kwargs):
|
|
|
+ return create_request_cls(
|
|
|
+ Request, self.task, self.pool, 'foo', self.eventer, **kwargs
|
|
|
+ )
|
|
|
+
|
|
|
+ def zRequest(self, Request=None, revoked_tasks=None, ref=None, **kwargs):
|
|
|
+ return self.xRequest(
|
|
|
+ Request=Request or self.create_request_cls(
|
|
|
+ ref=ref,
|
|
|
+ revoked_tasks=revoked_tasks,
|
|
|
+ ),
|
|
|
+ **kwargs)
|
|
|
+
|
|
|
+ def test_on_success(self):
|
|
|
+ self.zRequest(id=uuid()).on_success((False, "hey", 3.1222))
|
|
|
+
|
|
|
+ def test_on_success__SystemExit(self,
|
|
|
+ errors=(SystemExit, KeyboardInterrupt)):
|
|
|
+ for exc in errors:
|
|
|
+ einfo = None
|
|
|
+ try:
|
|
|
+ raise exc()
|
|
|
+ except exc:
|
|
|
+ einfo = ExceptionInfo()
|
|
|
+ with self.assertRaises(exc):
|
|
|
+ self.zRequest(id=uuid()).on_success((True, einfo, 1.0))
|
|
|
+
|
|
|
+ def test_on_success__calls_failure(self):
|
|
|
+ job = self.zRequest(id=uuid())
|
|
|
+ einfo = Mock(name='einfo')
|
|
|
+ job.on_failure = Mock(name='on_failure')
|
|
|
+ job.on_success((True, einfo, 1.0))
|
|
|
+ job.on_failure.assert_called_with(einfo, return_ok=True)
|
|
|
+
|
|
|
+ def test_on_success__acks_late_enabled(self):
|
|
|
+ self.task.acks_late = True
|
|
|
+ job = self.zRequest(id=uuid())
|
|
|
+ job.acknowledge = Mock(name='ack')
|
|
|
+ job.on_success((False, 'foo', 1.0))
|
|
|
+ job.acknowledge.assert_called_with()
|
|
|
+
|
|
|
+ def test_on_success__acks_late_disabled(self):
|
|
|
+ self.task.acks_late = False
|
|
|
+ job = self.zRequest(id=uuid())
|
|
|
+ job.acknowledge = Mock(name='ack')
|
|
|
+ job.on_success((False, 'foo', 1.0))
|
|
|
+ self.assertFalse(job.acknowledge.called)
|
|
|
+
|
|
|
+ def test_on_success__no_events(self):
|
|
|
+ self.eventer = None
|
|
|
+ job = self.zRequest(id=uuid())
|
|
|
+ job.send_event = Mock(name='send_event')
|
|
|
+ job.on_success((False, 'foo', 1.0))
|
|
|
+ self.assertFalse(job.send_event.called)
|
|
|
+
|
|
|
+ def test_on_success__with_events(self):
|
|
|
+ job = self.zRequest(id=uuid())
|
|
|
+ job.send_event = Mock(name='send_event')
|
|
|
+ job.on_success((False, 'foo', 1.0))
|
|
|
+ job.send_event.assert_called_with(
|
|
|
+ 'task-succeeded', result='foo', runtime=1.0,
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_execute_using_pool__revoked(self):
|
|
|
+ tid = uuid()
|
|
|
+ job = self.zRequest(id=tid, revoked_tasks={tid})
|
|
|
+ job.revoked = Mock()
|
|
|
+ job.revoked.return_value = True
|
|
|
+ with self.assertRaises(TaskRevokedError):
|
|
|
+ job.execute_using_pool(self.pool)
|
|
|
+
|
|
|
+ def test_execute_using_pool__expired(self):
|
|
|
+ tid = uuid()
|
|
|
+ job = self.zRequest(id=tid, revoked_tasks=set())
|
|
|
+ job.expires = 1232133
|
|
|
+ job.revoked = Mock()
|
|
|
+ job.revoked.return_value = True
|
|
|
+ with self.assertRaises(TaskRevokedError):
|
|
|
+ job.execute_using_pool(self.pool)
|
|
|
+
|
|
|
+ def test_execute_using_pool(self):
|
|
|
+ from celery.app.trace import trace_task_ret as trace
|
|
|
+ weakref_ref = Mock(name='weakref.ref')
|
|
|
+ job = self.zRequest(id=uuid(), revoked_tasks=set(), ref=weakref_ref)
|
|
|
+ job.execute_using_pool(self.pool)
|
|
|
+ self.pool.apply_async.assert_called_with(
|
|
|
+ trace,
|
|
|
+ args=(job.type, job.id, job.request_dict, job.body,
|
|
|
+ job.content_type, job.content_encoding),
|
|
|
+ accept_callback=job.on_accepted,
|
|
|
+ timeout_callback=job.on_timeout,
|
|
|
+ callback=job.on_success,
|
|
|
+ error_callback=job.on_failure,
|
|
|
+ soft_timeout=self.task.soft_time_limit,
|
|
|
+ timeout=self.task.time_limit,
|
|
|
+ correlation_id=job.id,
|
|
|
+ )
|
|
|
+ self.assertTrue(job._apply_result)
|
|
|
+ weakref_ref.assert_called_with(self.pool.apply_async())
|
|
|
+ self.assertIs(job._apply_result, weakref_ref())
|