|
@@ -9,12 +9,9 @@ from pickle import loads, dumps
|
|
|
|
|
|
from kombu import Queue
|
|
|
|
|
|
+from celery import Task
|
|
|
+
|
|
|
from celery.task import (
|
|
|
- current,
|
|
|
- task,
|
|
|
- Task,
|
|
|
- BaseTask,
|
|
|
- TaskSet,
|
|
|
periodic_task,
|
|
|
PeriodicTask
|
|
|
)
|
|
@@ -26,7 +23,7 @@ from celery.schedules import crontab, crontab_parser, ParseException
|
|
|
from celery.utils import uuid
|
|
|
from celery.utils.timeutils import parse_iso8601, timedelta_seconds
|
|
|
|
|
|
-from celery.tests.case import AppCase, eager_tasks, WhateverIO
|
|
|
+from celery.tests.case import AppCase
|
|
|
|
|
|
|
|
|
def return_True(*args, **kwargs):
|
|
@@ -34,213 +31,217 @@ def return_True(*args, **kwargs):
|
|
|
return True
|
|
|
|
|
|
|
|
|
-return_True_task = task()(return_True)
|
|
|
-
|
|
|
-
|
|
|
def raise_exception(self, **kwargs):
|
|
|
raise Exception('%s error' % self.__class__)
|
|
|
|
|
|
|
|
|
class MockApplyTask(Task):
|
|
|
+ abstract = True
|
|
|
applied = 0
|
|
|
|
|
|
def run(self, x, y):
|
|
|
return x * y
|
|
|
|
|
|
- @classmethod
|
|
|
def apply_async(self, *args, **kwargs):
|
|
|
self.applied += 1
|
|
|
|
|
|
|
|
|
-@task(name='c.unittest.increment_counter_task', count=0)
|
|
|
-def increment_counter(increment_by=1):
|
|
|
- increment_counter.count += increment_by or 1
|
|
|
- return increment_counter.count
|
|
|
-
|
|
|
-
|
|
|
-@task(name='c.unittest.raising_task')
|
|
|
-def raising():
|
|
|
- raise KeyError('foo')
|
|
|
+class TasksCase(AppCase):
|
|
|
|
|
|
+ def setup(self):
|
|
|
|
|
|
-@task(max_retries=3, iterations=0)
|
|
|
-def retry_task(arg1, arg2, kwarg=1, max_retries=None, care=True):
|
|
|
- current.iterations += 1
|
|
|
- rmax = current.max_retries if max_retries is None else max_retries
|
|
|
-
|
|
|
- assert repr(current.request)
|
|
|
- retries = current.request.retries
|
|
|
- if care and retries >= rmax:
|
|
|
- return arg1
|
|
|
- else:
|
|
|
- raise current.retry(countdown=0, max_retries=rmax)
|
|
|
+ self.return_True_task = self.app.task(shared=False)(return_True)
|
|
|
|
|
|
+ @self.app.task(bind=True, count=0, shared=False)
|
|
|
+ def increment_counter(self, increment_by=1):
|
|
|
+ self.count += increment_by or 1
|
|
|
+ return self.count
|
|
|
+ self.increment_counter = increment_counter
|
|
|
|
|
|
-@task(max_retries=3, iterations=0, accept_magic_kwargs=True)
|
|
|
-def retry_task_noargs(**kwargs):
|
|
|
- current.iterations += 1
|
|
|
+ @self.app.task(shared=False)
|
|
|
+ def raising():
|
|
|
+ raise KeyError('foo')
|
|
|
+ self.raising = raising
|
|
|
|
|
|
- retries = kwargs['task_retries']
|
|
|
- if retries >= 3:
|
|
|
- return 42
|
|
|
- else:
|
|
|
- raise current.retry(countdown=0)
|
|
|
+ @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
|
|
|
+ def retry_task(self, arg1, arg2, kwarg=1, max_retries=None, care=True):
|
|
|
+ self.iterations += 1
|
|
|
+ rmax = self.max_retries if max_retries is None else max_retries
|
|
|
|
|
|
+ assert repr(self.request)
|
|
|
+ retries = self.request.retries
|
|
|
+ if care and retries >= rmax:
|
|
|
+ return arg1
|
|
|
+ else:
|
|
|
+ raise self.retry(countdown=0, max_retries=rmax)
|
|
|
+ self.retry_task = retry_task
|
|
|
|
|
|
-@task(max_retries=3, iterations=0, base=MockApplyTask,
|
|
|
- accept_magic_kwargs=True)
|
|
|
-def retry_task_mockapply(arg1, arg2, kwarg=1, **kwargs):
|
|
|
- current.iterations += 1
|
|
|
+ @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
|
|
|
+ def retry_task_noargs(self, **kwargs):
|
|
|
+ self.iterations += 1
|
|
|
|
|
|
- retries = kwargs['task_retries']
|
|
|
- if retries >= 3:
|
|
|
- return arg1
|
|
|
- else:
|
|
|
- kwargs.update(kwarg=kwarg)
|
|
|
- raise current.retry(countdown=0)
|
|
|
+ if self.request.retries >= 3:
|
|
|
+ return 42
|
|
|
+ else:
|
|
|
+ raise self.retry(countdown=0)
|
|
|
+ self.retry_task_noargs = retry_task_noargs
|
|
|
+
|
|
|
+ @self.app.task(bind=True, max_retries=3, iterations=0,
|
|
|
+ base=MockApplyTask, shared=False)
|
|
|
+ def retry_task_mockapply(self, arg1, arg2, kwarg=1):
|
|
|
+ self.iterations += 1
|
|
|
+
|
|
|
+ retries = self.request.retries
|
|
|
+ if retries >= 3:
|
|
|
+ return arg1
|
|
|
+ raise self.retry(countdown=0)
|
|
|
+ self.retry_task_mockapply = retry_task_mockapply
|
|
|
+
|
|
|
+ @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
|
|
|
+ def retry_task_customexc(self, arg1, arg2, kwarg=1, **kwargs):
|
|
|
+ self.iterations += 1
|
|
|
+
|
|
|
+ retries = self.request.retries
|
|
|
+ if retries >= 3:
|
|
|
+ return arg1 + kwarg
|
|
|
+ else:
|
|
|
+ try:
|
|
|
+ raise MyCustomException('Elaine Marie Benes')
|
|
|
+ except MyCustomException as exc:
|
|
|
+ kwargs.update(kwarg=kwarg)
|
|
|
+ raise self.retry(countdown=0, exc=exc)
|
|
|
+ self.retry_task_customexc = retry_task_customexc
|
|
|
|
|
|
|
|
|
class MyCustomException(Exception):
|
|
|
"""Random custom exception."""
|
|
|
|
|
|
|
|
|
-@task(max_retries=3, iterations=0, accept_magic_kwargs=True)
|
|
|
-def retry_task_customexc(arg1, arg2, kwarg=1, **kwargs):
|
|
|
- current.iterations += 1
|
|
|
-
|
|
|
- retries = kwargs['task_retries']
|
|
|
- if retries >= 3:
|
|
|
- return arg1 + kwarg
|
|
|
- else:
|
|
|
- try:
|
|
|
- raise MyCustomException('Elaine Marie Benes')
|
|
|
- except MyCustomException as exc:
|
|
|
- kwargs.update(kwarg=kwarg)
|
|
|
- raise current.retry(countdown=0, exc=exc)
|
|
|
-
|
|
|
-
|
|
|
-class test_task_retries(AppCase):
|
|
|
+class test_task_retries(TasksCase):
|
|
|
|
|
|
def test_retry(self):
|
|
|
- retry_task.__class__.max_retries = 3
|
|
|
- retry_task.iterations = 0
|
|
|
- retry_task.apply([0xFF, 0xFFFF])
|
|
|
- self.assertEqual(retry_task.iterations, 4)
|
|
|
+ self.retry_task.max_retries = 3
|
|
|
+ self.retry_task.iterations = 0
|
|
|
+ self.retry_task.apply([0xFF, 0xFFFF])
|
|
|
+ self.assertEqual(self.retry_task.iterations, 4)
|
|
|
|
|
|
- retry_task.__class__.max_retries = 3
|
|
|
- retry_task.iterations = 0
|
|
|
- retry_task.apply([0xFF, 0xFFFF], {'max_retries': 10})
|
|
|
- self.assertEqual(retry_task.iterations, 11)
|
|
|
+ self.retry_task.max_retries = 3
|
|
|
+ self.retry_task.iterations = 0
|
|
|
+ self.retry_task.apply([0xFF, 0xFFFF], {'max_retries': 10})
|
|
|
+ self.assertEqual(self.retry_task.iterations, 11)
|
|
|
|
|
|
def test_retry_no_args(self):
|
|
|
- assert retry_task_noargs.accept_magic_kwargs
|
|
|
- retry_task_noargs.__class__.max_retries = 3
|
|
|
- retry_task_noargs.iterations = 0
|
|
|
- retry_task_noargs.apply()
|
|
|
- self.assertEqual(retry_task_noargs.iterations, 4)
|
|
|
+ self.retry_task_noargs.max_retries = 3
|
|
|
+ self.retry_task_noargs.iterations = 0
|
|
|
+ self.retry_task_noargs.apply(propagate=True).get()
|
|
|
+ self.assertEqual(self.retry_task_noargs.iterations, 4)
|
|
|
|
|
|
def test_retry_kwargs_can_be_empty(self):
|
|
|
- retry_task_mockapply.push_request()
|
|
|
+ self.retry_task_mockapply.push_request()
|
|
|
try:
|
|
|
with self.assertRaises(RetryTaskError):
|
|
|
- retry_task_mockapply.retry(args=[4, 4], kwargs=None)
|
|
|
+ self.retry_task_mockapply.retry(args=[4, 4], kwargs=None)
|
|
|
finally:
|
|
|
- retry_task_mockapply.pop_request()
|
|
|
+ self.retry_task_mockapply.pop_request()
|
|
|
|
|
|
def test_retry_not_eager(self):
|
|
|
- retry_task_mockapply.push_request()
|
|
|
+ self.retry_task_mockapply.push_request()
|
|
|
try:
|
|
|
- retry_task_mockapply.request.called_directly = False
|
|
|
+ self.retry_task_mockapply.request.called_directly = False
|
|
|
exc = Exception('baz')
|
|
|
try:
|
|
|
- retry_task_mockapply.retry(
|
|
|
+ self.retry_task_mockapply.retry(
|
|
|
args=[4, 4], kwargs={'task_retries': 0},
|
|
|
exc=exc, throw=False,
|
|
|
)
|
|
|
- self.assertTrue(retry_task_mockapply.__class__.applied)
|
|
|
+ self.assertTrue(self.retry_task_mockapply.applied)
|
|
|
finally:
|
|
|
- retry_task_mockapply.__class__.applied = 0
|
|
|
+ self.retry_task_mockapply.applied = 0
|
|
|
|
|
|
try:
|
|
|
with self.assertRaises(RetryTaskError):
|
|
|
- retry_task_mockapply.retry(
|
|
|
+ self.retry_task_mockapply.retry(
|
|
|
args=[4, 4], kwargs={'task_retries': 0},
|
|
|
exc=exc, throw=True)
|
|
|
- self.assertTrue(retry_task_mockapply.__class__.applied)
|
|
|
+ self.assertTrue(self.retry_task_mockapply.applied)
|
|
|
finally:
|
|
|
- retry_task_mockapply.__class__.applied = 0
|
|
|
+ self.retry_task_mockapply.applied = 0
|
|
|
finally:
|
|
|
- retry_task_mockapply.pop_request()
|
|
|
+ self.retry_task_mockapply.pop_request()
|
|
|
|
|
|
def test_retry_with_kwargs(self):
|
|
|
- retry_task_customexc.__class__.max_retries = 3
|
|
|
- retry_task_customexc.iterations = 0
|
|
|
- retry_task_customexc.apply([0xFF, 0xFFFF], {'kwarg': 0xF})
|
|
|
- self.assertEqual(retry_task_customexc.iterations, 4)
|
|
|
+ self.retry_task_customexc.max_retries = 3
|
|
|
+ self.retry_task_customexc.iterations = 0
|
|
|
+ self.retry_task_customexc.apply([0xFF, 0xFFFF], {'kwarg': 0xF})
|
|
|
+ self.assertEqual(self.retry_task_customexc.iterations, 4)
|
|
|
|
|
|
def test_retry_with_custom_exception(self):
|
|
|
- retry_task_customexc.__class__.max_retries = 2
|
|
|
- retry_task_customexc.iterations = 0
|
|
|
- result = retry_task_customexc.apply([0xFF, 0xFFFF], {'kwarg': 0xF})
|
|
|
+ self.retry_task_customexc.max_retries = 2
|
|
|
+ self.retry_task_customexc.iterations = 0
|
|
|
+ result = self.retry_task_customexc.apply(
|
|
|
+ [0xFF, 0xFFFF], {'kwarg': 0xF},
|
|
|
+ )
|
|
|
with self.assertRaises(MyCustomException):
|
|
|
result.get()
|
|
|
- self.assertEqual(retry_task_customexc.iterations, 3)
|
|
|
+ self.assertEqual(self.retry_task_customexc.iterations, 3)
|
|
|
|
|
|
def test_max_retries_exceeded(self):
|
|
|
- retry_task.__class__.max_retries = 2
|
|
|
- retry_task.iterations = 0
|
|
|
- result = retry_task.apply([0xFF, 0xFFFF], {'care': False})
|
|
|
- with self.assertRaises(retry_task.MaxRetriesExceededError):
|
|
|
+ self.retry_task.max_retries = 2
|
|
|
+ self.retry_task.iterations = 0
|
|
|
+ result = self.retry_task.apply([0xFF, 0xFFFF], {'care': False})
|
|
|
+ with self.assertRaises(self.retry_task.MaxRetriesExceededError):
|
|
|
result.get()
|
|
|
- self.assertEqual(retry_task.iterations, 3)
|
|
|
+ self.assertEqual(self.retry_task.iterations, 3)
|
|
|
|
|
|
- retry_task.__class__.max_retries = 1
|
|
|
- retry_task.iterations = 0
|
|
|
- result = retry_task.apply([0xFF, 0xFFFF], {'care': False})
|
|
|
- with self.assertRaises(retry_task.MaxRetriesExceededError):
|
|
|
+ self.retry_task.max_retries = 1
|
|
|
+ self.retry_task.iterations = 0
|
|
|
+ result = self.retry_task.apply([0xFF, 0xFFFF], {'care': False})
|
|
|
+ with self.assertRaises(self.retry_task.MaxRetriesExceededError):
|
|
|
result.get()
|
|
|
- self.assertEqual(retry_task.iterations, 2)
|
|
|
+ self.assertEqual(self.retry_task.iterations, 2)
|
|
|
|
|
|
|
|
|
-class test_canvas_utils(AppCase):
|
|
|
+class test_canvas_utils(TasksCase):
|
|
|
|
|
|
def test_si(self):
|
|
|
- self.assertTrue(retry_task.si())
|
|
|
- self.assertTrue(retry_task.si().immutable)
|
|
|
+ self.assertTrue(self.retry_task.si())
|
|
|
+ self.assertTrue(self.retry_task.si().immutable)
|
|
|
|
|
|
def test_chunks(self):
|
|
|
- self.assertTrue(retry_task.chunks(range(100), 10))
|
|
|
+ self.assertTrue(self.retry_task.chunks(range(100), 10))
|
|
|
|
|
|
def test_map(self):
|
|
|
- self.assertTrue(retry_task.map(range(100)))
|
|
|
+ self.assertTrue(self.retry_task.map(range(100)))
|
|
|
|
|
|
def test_starmap(self):
|
|
|
- self.assertTrue(retry_task.starmap(range(100)))
|
|
|
+ self.assertTrue(self.retry_task.starmap(range(100)))
|
|
|
|
|
|
def test_on_success(self):
|
|
|
- retry_task.on_success(1, 1, (), {})
|
|
|
+ self.retry_task.on_success(1, 1, (), {})
|
|
|
|
|
|
|
|
|
-class test_tasks(AppCase):
|
|
|
+class test_tasks(TasksCase):
|
|
|
|
|
|
def now(self):
|
|
|
return self.app.now()
|
|
|
|
|
|
def test_unpickle_task(self):
|
|
|
+ self.app.set_current()
|
|
|
import pickle
|
|
|
|
|
|
- @task
|
|
|
+ @self.app.task(shared=True)
|
|
|
def xxx():
|
|
|
pass
|
|
|
self.assertIs(pickle.loads(pickle.dumps(xxx)), xxx.app.tasks[xxx.name])
|
|
|
|
|
|
- def createTask(self, name):
|
|
|
- return task(__module__=self.__module__, name=name)(return_True)
|
|
|
+ def create_task(self, name):
|
|
|
+ return self.app.task(__module__=self.__module__,
|
|
|
+ shared=False, name=name)(return_True)
|
|
|
|
|
|
def test_AsyncResult(self):
|
|
|
task_id = uuid()
|
|
|
- result = retry_task.AsyncResult(task_id)
|
|
|
- self.assertEqual(result.backend, retry_task.backend)
|
|
|
+ result = self.retry_task.AsyncResult(task_id)
|
|
|
+ self.assertEqual(result.backend, self.retry_task.backend)
|
|
|
self.assertEqual(result.id, task_id)
|
|
|
|
|
|
def assertNextTaskDataEqual(self, consumer, presult, task_name,
|
|
@@ -271,82 +272,80 @@ class test_tasks(AppCase):
|
|
|
|
|
|
def test_task_kwargs_must_be_dictionary(self):
|
|
|
with self.assertRaises(ValueError):
|
|
|
- increment_counter.apply_async([], 'str')
|
|
|
+ self.increment_counter.apply_async([], 'str')
|
|
|
|
|
|
def test_task_args_must_be_list(self):
|
|
|
with self.assertRaises(ValueError):
|
|
|
- increment_counter.apply_async('str', {})
|
|
|
+ self.increment_counter.apply_async('str', {})
|
|
|
|
|
|
def test_regular_task(self):
|
|
|
- T1 = self.createTask('c.unittest.t.t1')
|
|
|
- self.assertIsInstance(T1, BaseTask)
|
|
|
+ T1 = self.create_task('c.unittest.t.t1')
|
|
|
+ self.assertIsInstance(T1, Task)
|
|
|
self.assertTrue(T1.run())
|
|
|
self.assertTrue(isinstance(T1, Callable), 'Task class is callable()')
|
|
|
self.assertTrue(T1(), 'Task class runs run() when called')
|
|
|
|
|
|
- consumer = T1.get_consumer()
|
|
|
- with self.assertRaises(NotImplementedError):
|
|
|
- consumer.receive('foo', 'foo')
|
|
|
- consumer.purge()
|
|
|
- self.assertIsNone(consumer.queues[0].get())
|
|
|
- T1.get_consumer(queues=[Queue('foo')])
|
|
|
-
|
|
|
- # Without arguments.
|
|
|
- presult = T1.delay()
|
|
|
- self.assertNextTaskDataEqual(consumer, presult, T1.name)
|
|
|
-
|
|
|
- # With arguments.
|
|
|
- presult2 = T1.apply_async(kwargs=dict(name='George Costanza'))
|
|
|
- self.assertNextTaskDataEqual(
|
|
|
- consumer, presult2, T1.name, name='George Costanza',
|
|
|
- )
|
|
|
-
|
|
|
- # send_task
|
|
|
- sresult = send_task(T1.name, kwargs=dict(name='Elaine M. Benes'))
|
|
|
- self.assertNextTaskDataEqual(
|
|
|
- consumer, sresult, T1.name, name='Elaine M. Benes',
|
|
|
- )
|
|
|
-
|
|
|
- # With eta.
|
|
|
- presult2 = T1.apply_async(
|
|
|
- kwargs=dict(name='George Costanza'),
|
|
|
- eta=self.now() + timedelta(days=1),
|
|
|
- expires=self.now() + timedelta(days=2),
|
|
|
- )
|
|
|
- self.assertNextTaskDataEqual(
|
|
|
- consumer, presult2, T1.name,
|
|
|
- name='George Costanza', test_eta=True, test_expires=True,
|
|
|
- )
|
|
|
-
|
|
|
- # With countdown.
|
|
|
- presult2 = T1.apply_async(kwargs=dict(name='George Costanza'),
|
|
|
- countdown=10, expires=12)
|
|
|
- self.assertNextTaskDataEqual(
|
|
|
- consumer, presult2, T1.name,
|
|
|
- name='George Costanza', test_eta=True, test_expires=True,
|
|
|
- )
|
|
|
-
|
|
|
- # Discarding all tasks.
|
|
|
- consumer.purge()
|
|
|
- T1.apply_async()
|
|
|
- self.assertEqual(consumer.purge(), 1)
|
|
|
- self.assertIsNone(consumer.queues[0].get())
|
|
|
-
|
|
|
- self.assertFalse(presult.successful())
|
|
|
- T1.backend.mark_as_done(presult.id, result=None)
|
|
|
- self.assertTrue(presult.successful())
|
|
|
-
|
|
|
- publisher = T1.get_publisher()
|
|
|
- self.assertTrue(publisher.exchange)
|
|
|
+ with self.app.connection_or_acquire() as conn:
|
|
|
+ consumer = self.app.amqp.TaskConsumer(conn)
|
|
|
+ with self.assertRaises(NotImplementedError):
|
|
|
+ consumer.receive('foo', 'foo')
|
|
|
+ consumer.purge()
|
|
|
+ self.assertIsNone(consumer.queues[0].get())
|
|
|
+ self.app.amqp.TaskConsumer(conn, queues=[Queue('foo')])
|
|
|
+
|
|
|
+ # Without arguments.
|
|
|
+ presult = T1.delay()
|
|
|
+ self.assertNextTaskDataEqual(consumer, presult, T1.name)
|
|
|
+
|
|
|
+ # With arguments.
|
|
|
+ presult2 = T1.apply_async(kwargs=dict(name='George Costanza'))
|
|
|
+ self.assertNextTaskDataEqual(
|
|
|
+ consumer, presult2, T1.name, name='George Costanza',
|
|
|
+ )
|
|
|
+
|
|
|
+ # send_task
|
|
|
+ sresult = send_task(T1.name, kwargs=dict(name='Elaine M. Benes'))
|
|
|
+ self.assertNextTaskDataEqual(
|
|
|
+ consumer, sresult, T1.name, name='Elaine M. Benes',
|
|
|
+ )
|
|
|
+
|
|
|
+ # With eta.
|
|
|
+ presult2 = T1.apply_async(
|
|
|
+ kwargs=dict(name='George Costanza'),
|
|
|
+ eta=self.now() + timedelta(days=1),
|
|
|
+ expires=self.now() + timedelta(days=2),
|
|
|
+ )
|
|
|
+ self.assertNextTaskDataEqual(
|
|
|
+ consumer, presult2, T1.name,
|
|
|
+ name='George Costanza', test_eta=True, test_expires=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ # With countdown.
|
|
|
+ presult2 = T1.apply_async(kwargs=dict(name='George Costanza'),
|
|
|
+ countdown=10, expires=12)
|
|
|
+ self.assertNextTaskDataEqual(
|
|
|
+ consumer, presult2, T1.name,
|
|
|
+ name='George Costanza', test_eta=True, test_expires=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Discarding all tasks.
|
|
|
+ consumer.purge()
|
|
|
+ T1.apply_async()
|
|
|
+ self.assertEqual(consumer.purge(), 1)
|
|
|
+ self.assertIsNone(consumer.queues[0].get())
|
|
|
+
|
|
|
+ self.assertFalse(presult.successful())
|
|
|
+ T1.backend.mark_as_done(presult.id, result=None)
|
|
|
+ self.assertTrue(presult.successful())
|
|
|
|
|
|
def test_repr_v2_compat(self):
|
|
|
- task = type(self.createTask('c.unittest.v2c'))
|
|
|
+ task = type(self.create_task('c.unittest.v2c')._get_current_object())
|
|
|
task.__v2_compat__ = True
|
|
|
self.assertIn('v2 compatible', repr(task))
|
|
|
|
|
|
def test_apply_with_self(self):
|
|
|
|
|
|
- @task(__self__=42)
|
|
|
+ @self.app.task(__self__=42, shared=False)
|
|
|
def tawself(self):
|
|
|
return self
|
|
|
|
|
@@ -355,7 +354,7 @@ class test_tasks(AppCase):
|
|
|
self.assertEqual(tawself(), 42)
|
|
|
|
|
|
def test_context_get(self):
|
|
|
- task = self.createTask('c.unittest.t.c.g')
|
|
|
+ task = self.create_task('c.unittest.t.c.g')
|
|
|
task.push_request()
|
|
|
try:
|
|
|
request = task.request
|
|
@@ -367,7 +366,7 @@ class test_tasks(AppCase):
|
|
|
task.pop_request()
|
|
|
|
|
|
def test_task_class_repr(self):
|
|
|
- task = self.createTask('c.unittest.t.repr')
|
|
|
+ task = self.create_task('c.unittest.t.repr')
|
|
|
self.assertIn('class Task of', repr(task.app.Task))
|
|
|
prev, task.app.Task._app = task.app.Task._app, None
|
|
|
try:
|
|
@@ -376,8 +375,8 @@ class test_tasks(AppCase):
|
|
|
task.app.Task._app = prev
|
|
|
|
|
|
def test_bind_no_magic_kwargs(self):
|
|
|
- task = self.createTask('c.unittest.t.magic_kwargs')
|
|
|
- task.__class__.accept_magic_kwargs = None
|
|
|
+ task = self.create_task('c.unittest.t.magic_kwargs')
|
|
|
+ task.accept_magic_kwargs = None
|
|
|
task.bind(task.app)
|
|
|
|
|
|
def test_annotate(self):
|
|
@@ -387,40 +386,23 @@ class test_tasks(AppCase):
|
|
|
self.assertEqual(Task.FOO, 'BAR')
|
|
|
|
|
|
def test_after_return(self):
|
|
|
- task = self.createTask('c.unittest.t.after_return')
|
|
|
+ task = self.create_task('c.unittest.t.after_return')
|
|
|
task.push_request()
|
|
|
try:
|
|
|
- task.request.chord = return_True_task.s()
|
|
|
+ task.request.chord = self.return_True_task.s()
|
|
|
task.after_return('SUCCESS', 1.0, 'foobar', (), {}, None)
|
|
|
task.request.clear()
|
|
|
finally:
|
|
|
task.pop_request()
|
|
|
|
|
|
def test_send_task_sent_event(self):
|
|
|
- T1 = self.createTask('c.unittest.t.t1')
|
|
|
- app = T1.app
|
|
|
- with app.connection() as conn:
|
|
|
- app.conf.CELERY_SEND_TASK_SENT_EVENT = True
|
|
|
- del(app.amqp.__dict__['TaskProducer'])
|
|
|
- try:
|
|
|
- self.assertTrue(app.amqp.TaskProducer(conn).send_sent_event)
|
|
|
- finally:
|
|
|
- app.conf.CELERY_SEND_TASK_SENT_EVENT = False
|
|
|
- del(app.amqp.__dict__['TaskProducer'])
|
|
|
-
|
|
|
- def test_get_publisher(self):
|
|
|
- connection = self.app.connection()
|
|
|
- p = increment_counter.get_publisher(connection, auto_declare=False,
|
|
|
- exchange='foo')
|
|
|
- self.assertEqual(p.exchange.name, 'foo')
|
|
|
- p = increment_counter.get_publisher(connection, auto_declare=False,
|
|
|
- exchange='foo',
|
|
|
- exchange_type='fanout')
|
|
|
- self.assertEqual(p.exchange.type, 'fanout')
|
|
|
+ with self.app.connection() as conn:
|
|
|
+ self.app.conf.CELERY_SEND_TASK_SENT_EVENT = True
|
|
|
+ self.assertTrue(self.app.amqp.TaskProducer(conn).send_sent_event)
|
|
|
|
|
|
def test_update_state(self):
|
|
|
|
|
|
- @task
|
|
|
+ @self.app.task(shared=False)
|
|
|
def yyy():
|
|
|
pass
|
|
|
|
|
@@ -440,7 +422,7 @@ class test_tasks(AppCase):
|
|
|
|
|
|
def test_repr(self):
|
|
|
|
|
|
- @task
|
|
|
+ @self.app.task(shared=False)
|
|
|
def task_test_repr():
|
|
|
pass
|
|
|
|
|
@@ -448,113 +430,42 @@ class test_tasks(AppCase):
|
|
|
|
|
|
def test_has___name__(self):
|
|
|
|
|
|
- @task
|
|
|
+ @self.app.task(shared=False)
|
|
|
def yyy2():
|
|
|
pass
|
|
|
|
|
|
self.assertTrue(yyy2.__name__)
|
|
|
|
|
|
- def test_get_logger(self):
|
|
|
- t1 = self.createTask('c.unittest.t.t1')
|
|
|
- t1.push_request()
|
|
|
- try:
|
|
|
- logfh = WhateverIO()
|
|
|
- logger = t1.get_logger(logfile=logfh, loglevel=0)
|
|
|
- self.assertTrue(logger)
|
|
|
|
|
|
- t1.request.loglevel = 3
|
|
|
- logger = t1.get_logger(logfile=logfh, loglevel=None)
|
|
|
- self.assertTrue(logger)
|
|
|
- finally:
|
|
|
- t1.pop_request()
|
|
|
-
|
|
|
-
|
|
|
-class test_TaskSet(AppCase):
|
|
|
-
|
|
|
- def test_function_taskset(self):
|
|
|
- with eager_tasks(self.app):
|
|
|
- subtasks = [return_True_task.s(i) for i in range(1, 6)]
|
|
|
- ts = TaskSet(subtasks)
|
|
|
- res = ts.apply_async()
|
|
|
- self.assertListEqual(res.join(), [True, True, True, True, True])
|
|
|
-
|
|
|
- def test_counter_taskset(self):
|
|
|
- increment_counter.count = 0
|
|
|
- ts = TaskSet(tasks=[
|
|
|
- increment_counter.s(),
|
|
|
- increment_counter.s(increment_by=2),
|
|
|
- increment_counter.s(increment_by=3),
|
|
|
- increment_counter.s(increment_by=4),
|
|
|
- increment_counter.s(increment_by=5),
|
|
|
- increment_counter.s(increment_by=6),
|
|
|
- increment_counter.s(increment_by=7),
|
|
|
- increment_counter.s(increment_by=8),
|
|
|
- increment_counter.s(increment_by=9),
|
|
|
- ])
|
|
|
- self.assertEqual(ts.total, 9)
|
|
|
-
|
|
|
- consumer = increment_counter.get_consumer()
|
|
|
- consumer.purge()
|
|
|
- consumer.close()
|
|
|
- taskset_res = ts.apply_async()
|
|
|
- subtasks = taskset_res.subtasks
|
|
|
- taskset_id = taskset_res.taskset_id
|
|
|
- consumer = increment_counter.get_consumer()
|
|
|
- for subtask in subtasks:
|
|
|
- m = consumer.queues[0].get(accept=['pickle']).payload
|
|
|
- self.assertDictContainsSubset({'taskset': taskset_id,
|
|
|
- 'task': increment_counter.name,
|
|
|
- 'id': subtask.id}, m)
|
|
|
- increment_counter(
|
|
|
- increment_by=m.get('kwargs', {}).get('increment_by'))
|
|
|
- self.assertEqual(increment_counter.count, sum(range(1, 10)))
|
|
|
-
|
|
|
- def test_named_taskset(self):
|
|
|
- prefix = 'test_named_taskset-'
|
|
|
- ts = TaskSet([return_True_task.subtask([1])])
|
|
|
- res = ts.apply(taskset_id=prefix + uuid())
|
|
|
- self.assertTrue(res.taskset_id.startswith(prefix))
|
|
|
-
|
|
|
-
|
|
|
-class test_apply_task(AppCase):
|
|
|
+class test_apply_task(TasksCase):
|
|
|
|
|
|
def test_apply_throw(self):
|
|
|
with self.assertRaises(KeyError):
|
|
|
- raising.apply(throw=True)
|
|
|
-
|
|
|
- def test_apply_no_magic_kwargs(self):
|
|
|
- increment_counter.accept_magic_kwargs = False
|
|
|
- try:
|
|
|
- increment_counter.apply()
|
|
|
- finally:
|
|
|
- increment_counter.accept_magic_kwargs = True
|
|
|
+ self.raising.apply(throw=True)
|
|
|
|
|
|
def test_apply_with_CELERY_EAGER_PROPAGATES_EXCEPTIONS(self):
|
|
|
- raising.app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
|
|
|
- try:
|
|
|
- with self.assertRaises(KeyError):
|
|
|
- raising.apply()
|
|
|
- finally:
|
|
|
- raising.app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = False
|
|
|
+ self.app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
|
|
|
+ with self.assertRaises(KeyError):
|
|
|
+ self.raising.apply()
|
|
|
|
|
|
def test_apply(self):
|
|
|
- increment_counter.count = 0
|
|
|
+ self.increment_counter.count = 0
|
|
|
|
|
|
- e = increment_counter.apply()
|
|
|
+ e = self.increment_counter.apply()
|
|
|
self.assertIsInstance(e, EagerResult)
|
|
|
self.assertEqual(e.get(), 1)
|
|
|
|
|
|
- e = increment_counter.apply(args=[1])
|
|
|
+ e = self.increment_counter.apply(args=[1])
|
|
|
self.assertEqual(e.get(), 2)
|
|
|
|
|
|
- e = increment_counter.apply(kwargs={'increment_by': 4})
|
|
|
+ e = self.increment_counter.apply(kwargs={'increment_by': 4})
|
|
|
self.assertEqual(e.get(), 6)
|
|
|
|
|
|
self.assertTrue(e.successful())
|
|
|
self.assertTrue(e.ready())
|
|
|
self.assertTrue(repr(e).startswith('<EagerResult:'))
|
|
|
|
|
|
- f = raising.apply()
|
|
|
+ f = self.raising.apply()
|
|
|
self.assertTrue(f.ready())
|
|
|
self.assertFalse(f.successful())
|
|
|
self.assertTrue(f.traceback)
|