|
@@ -5,6 +5,7 @@ from datetime import datetime, timedelta
|
|
|
from functools import wraps
|
|
|
|
|
|
from celery import task
|
|
|
+from celery.task import current
|
|
|
from celery.app import app_or_default
|
|
|
from celery.task import task as task_dec
|
|
|
from celery.exceptions import RetryTaskError
|
|
@@ -30,178 +31,155 @@ def raise_exception(self, **kwargs):
|
|
|
|
|
|
|
|
|
class MockApplyTask(task.Task):
|
|
|
+ applied = 0
|
|
|
|
|
|
def run(self, x, y):
|
|
|
return x * y
|
|
|
|
|
|
@classmethod
|
|
|
def apply_async(self, *args, **kwargs):
|
|
|
- pass
|
|
|
-
|
|
|
-
|
|
|
-class IncrementCounterTask(task.Task):
|
|
|
- name = "c.unittest.increment_counter_task"
|
|
|
- count = 0
|
|
|
-
|
|
|
- def run(self, increment_by=1, **kwargs):
|
|
|
- increment_by = increment_by or 1
|
|
|
- self.__class__.count += increment_by
|
|
|
- return self.__class__.count
|
|
|
+ self.applied += 1
|
|
|
|
|
|
|
|
|
-class RaisingTask(task.Task):
|
|
|
- name = "c.unittest.raising_task"
|
|
|
+@task.task(name="c.unittest.increment_counter_task", count=0)
|
|
|
+def increment_counter(increment_by=1):
|
|
|
+ increment_counter.count += increment_by or 1
|
|
|
+ return increment_counter.count
|
|
|
|
|
|
- def run(self, **kwargs):
|
|
|
- raise KeyError("foo")
|
|
|
|
|
|
+@task.task(name="c.unittest.raising_task")
|
|
|
+def raising():
|
|
|
+ raise KeyError("foo")
|
|
|
|
|
|
-class RetryTask(task.Task):
|
|
|
- max_retries = 3
|
|
|
- iterations = 0
|
|
|
|
|
|
- def run(self, arg1, arg2, kwarg=1, max_retries=None, care=True):
|
|
|
- self.__class__.iterations += 1
|
|
|
- rmax = self.max_retries if max_retries is None else max_retries
|
|
|
+@task.task(max_retries=3, iterations=0)
|
|
|
+def retry_task(arg1, arg2, kwarg=1, max_retries=None, care=True):
|
|
|
+ current.iterations += 1
|
|
|
+ rmax = current.max_retries if max_retries is None else max_retries
|
|
|
|
|
|
- retries = self.request.retries
|
|
|
- if care and retries >= rmax:
|
|
|
- return arg1
|
|
|
- else:
|
|
|
- return self.retry(countdown=0, max_retries=max_retries)
|
|
|
+ retries = current.request.retries
|
|
|
+ if care and retries >= rmax:
|
|
|
+ return arg1
|
|
|
+ else:
|
|
|
+ return current.retry(countdown=0, max_retries=rmax)
|
|
|
|
|
|
|
|
|
-class RetryTaskNoArgs(task.Task):
|
|
|
- max_retries = 3
|
|
|
- iterations = 0
|
|
|
+@task.task(max_retries=3, iterations=0)
|
|
|
+def retry_task_noargs(**kwargs):
|
|
|
+ current.iterations += 1
|
|
|
|
|
|
- def run(self, **kwargs):
|
|
|
- self.__class__.iterations += 1
|
|
|
+ retries = kwargs["task_retries"]
|
|
|
+ if retries >= 3:
|
|
|
+ return 42
|
|
|
+ else:
|
|
|
+ return current.retry(countdown=0)
|
|
|
|
|
|
- retries = kwargs["task_retries"]
|
|
|
- if retries >= 3:
|
|
|
- return 42
|
|
|
- else:
|
|
|
- return self.retry(kwargs=kwargs, countdown=0)
|
|
|
|
|
|
+@task.task(max_retries=3, iterations=0, base=MockApplyTask)
|
|
|
+def retry_task_mockapply(arg1, arg2, kwarg=1, **kwargs):
|
|
|
+ current.iterations += 1
|
|
|
|
|
|
-class RetryTaskMockApply(task.Task):
|
|
|
- max_retries = 3
|
|
|
- iterations = 0
|
|
|
- applied = 0
|
|
|
-
|
|
|
- def run(self, arg1, arg2, kwarg=1, **kwargs):
|
|
|
- self.__class__.iterations += 1
|
|
|
-
|
|
|
- retries = kwargs["task_retries"]
|
|
|
- if retries >= 3:
|
|
|
- return arg1
|
|
|
- else:
|
|
|
- kwargs.update({"kwarg": kwarg})
|
|
|
- return self.retry(args=[arg1, arg2], kwargs=kwargs, countdown=0)
|
|
|
-
|
|
|
- @classmethod
|
|
|
- def apply_async(self, *args, **kwargs):
|
|
|
- self.applied = 1
|
|
|
+ retries = kwargs["task_retries"]
|
|
|
+ if retries >= 3:
|
|
|
+ return arg1
|
|
|
+ else:
|
|
|
+ kwargs.update(kwarg=kwarg)
|
|
|
+ return current.retry(countdown=0)
|
|
|
|
|
|
|
|
|
class MyCustomException(Exception):
|
|
|
"""Random custom exception."""
|
|
|
|
|
|
|
|
|
-class RetryTaskCustomExc(task.Task):
|
|
|
- max_retries = 3
|
|
|
- iterations = 0
|
|
|
+@task.task(max_retries=3, iterations=0, accept_magic_kwargs=True)
|
|
|
+def retry_task_customexc(arg1, arg2, kwarg=1, **kwargs):
|
|
|
+ current.iterations += 1
|
|
|
|
|
|
- def run(self, arg1, arg2, kwarg=1, **kwargs):
|
|
|
- self.__class__.iterations += 1
|
|
|
-
|
|
|
- retries = kwargs["task_retries"]
|
|
|
- if retries >= 3:
|
|
|
- return arg1 + kwarg
|
|
|
- else:
|
|
|
- try:
|
|
|
- raise MyCustomException("Elaine Marie Benes")
|
|
|
- except MyCustomException, exc:
|
|
|
- kwargs.update({"kwarg": kwarg})
|
|
|
- return self.retry(args=[arg1, arg2], kwargs=kwargs,
|
|
|
- countdown=0, exc=exc)
|
|
|
+ retries = kwargs["task_retries"]
|
|
|
+ if retries >= 3:
|
|
|
+ return arg1 + kwarg
|
|
|
+ else:
|
|
|
+ try:
|
|
|
+ raise MyCustomException("Elaine Marie Benes")
|
|
|
+ except MyCustomException, exc:
|
|
|
+ kwargs.update(kwarg=kwarg)
|
|
|
+ return current.retry(countdown=0, exc=exc)
|
|
|
|
|
|
|
|
|
class TestTaskRetries(Case):
|
|
|
|
|
|
def test_retry(self):
|
|
|
- RetryTask.max_retries = 3
|
|
|
- RetryTask.iterations = 0
|
|
|
- result = RetryTask.apply([0xFF, 0xFFFF])
|
|
|
+ retry_task.__class__.max_retries = 3
|
|
|
+ retry_task.iterations = 0
|
|
|
+ result = retry_task.apply([0xFF, 0xFFFF])
|
|
|
self.assertEqual(result.get(), 0xFF)
|
|
|
- self.assertEqual(RetryTask.iterations, 4)
|
|
|
+ self.assertEqual(retry_task.iterations, 4)
|
|
|
|
|
|
- RetryTask.max_retries = 3
|
|
|
- RetryTask.iterations = 0
|
|
|
- result = RetryTask.apply([0xFF, 0xFFFF], {"max_retries": 10})
|
|
|
+ retry_task.__class__.max_retries = 3
|
|
|
+ retry_task.iterations = 0
|
|
|
+ result = retry_task.apply([0xFF, 0xFFFF], {"max_retries": 10})
|
|
|
self.assertEqual(result.get(), 0xFF)
|
|
|
- self.assertEqual(RetryTask.iterations, 11)
|
|
|
+ self.assertEqual(retry_task.iterations, 11)
|
|
|
|
|
|
def test_retry_no_args(self):
|
|
|
- RetryTaskNoArgs.max_retries = 3
|
|
|
- RetryTaskNoArgs.iterations = 0
|
|
|
- result = RetryTaskNoArgs.apply()
|
|
|
+ retry_task_noargs.__class__.max_retries = 3
|
|
|
+ retry_task_noargs.iterations = 0
|
|
|
+ result = retry_task_noargs.apply()
|
|
|
self.assertEqual(result.get(), 42)
|
|
|
- self.assertEqual(RetryTaskNoArgs.iterations, 4)
|
|
|
+ self.assertEqual(retry_task_noargs.iterations, 4)
|
|
|
|
|
|
def test_retry_kwargs_can_be_empty(self):
|
|
|
with self.assertRaises(RetryTaskError):
|
|
|
- RetryTaskMockApply.retry(args=[4, 4], kwargs=None)
|
|
|
+ retry_task_mockapply.retry(args=[4, 4], kwargs=None)
|
|
|
|
|
|
def test_retry_not_eager(self):
|
|
|
- RetryTaskMockApply.request.called_directly = False
|
|
|
+ retry_task_mockapply.request.called_directly = False
|
|
|
exc = Exception("baz")
|
|
|
try:
|
|
|
- RetryTaskMockApply.retry(args=[4, 4], kwargs={"task_retries": 0},
|
|
|
- exc=exc, throw=False)
|
|
|
- self.assertTrue(RetryTaskMockApply.applied)
|
|
|
+ retry_task_mockapply.retry(args=[4, 4], kwargs={"task_retries": 0},
|
|
|
+ exc=exc, throw=False)
|
|
|
+ self.assertTrue(retry_task_mockapply.__class__.applied)
|
|
|
finally:
|
|
|
- RetryTaskMockApply.applied = 0
|
|
|
+ retry_task_mockapply.__class__.applied = 0
|
|
|
|
|
|
try:
|
|
|
with self.assertRaises(RetryTaskError):
|
|
|
- RetryTaskMockApply.retry(
|
|
|
+ retry_task_mockapply.retry(
|
|
|
args=[4, 4], kwargs={"task_retries": 0},
|
|
|
exc=exc, throw=True)
|
|
|
- self.assertTrue(RetryTaskMockApply.applied)
|
|
|
+ self.assertTrue(retry_task_mockapply.__class__.applied)
|
|
|
finally:
|
|
|
- RetryTaskMockApply.applied = 0
|
|
|
+ retry_task_mockapply.__class__.applied = 0
|
|
|
|
|
|
def test_retry_with_kwargs(self):
|
|
|
- RetryTaskCustomExc.max_retries = 3
|
|
|
- RetryTaskCustomExc.iterations = 0
|
|
|
- result = RetryTaskCustomExc.apply([0xFF, 0xFFFF], {"kwarg": 0xF})
|
|
|
+ retry_task_customexc.__class__.max_retries = 3
|
|
|
+ retry_task_customexc.iterations = 0
|
|
|
+ result = retry_task_customexc.apply([0xFF, 0xFFFF], {"kwarg": 0xF})
|
|
|
self.assertEqual(result.get(), 0xFF + 0xF)
|
|
|
- self.assertEqual(RetryTaskCustomExc.iterations, 4)
|
|
|
+ self.assertEqual(retry_task_customexc.iterations, 4)
|
|
|
|
|
|
def test_retry_with_custom_exception(self):
|
|
|
- RetryTaskCustomExc.max_retries = 2
|
|
|
- RetryTaskCustomExc.iterations = 0
|
|
|
- result = RetryTaskCustomExc.apply([0xFF, 0xFFFF], {"kwarg": 0xF})
|
|
|
+ retry_task_customexc.__class__.max_retries = 2
|
|
|
+ retry_task_customexc.iterations = 0
|
|
|
+ result = retry_task_customexc.apply([0xFF, 0xFFFF], {"kwarg": 0xF})
|
|
|
with self.assertRaises(MyCustomException):
|
|
|
result.get()
|
|
|
- self.assertEqual(RetryTaskCustomExc.iterations, 3)
|
|
|
+ self.assertEqual(retry_task_customexc.iterations, 3)
|
|
|
|
|
|
def test_max_retries_exceeded(self):
|
|
|
- RetryTask.max_retries = 2
|
|
|
- RetryTask.iterations = 0
|
|
|
- result = RetryTask.apply([0xFF, 0xFFFF], {"care": False})
|
|
|
- with self.assertRaises(RetryTask.MaxRetriesExceededError):
|
|
|
+ retry_task.__class__.max_retries = 2
|
|
|
+ retry_task.iterations = 0
|
|
|
+ result = retry_task.apply([0xFF, 0xFFFF], {"care": False})
|
|
|
+ with self.assertRaises(retry_task.MaxRetriesExceededError):
|
|
|
result.get()
|
|
|
- self.assertEqual(RetryTask.iterations, 3)
|
|
|
+ self.assertEqual(retry_task.iterations, 3)
|
|
|
|
|
|
- RetryTask.max_retries = 1
|
|
|
- RetryTask.iterations = 0
|
|
|
- result = RetryTask.apply([0xFF, 0xFFFF], {"care": False})
|
|
|
- with self.assertRaises(RetryTask.MaxRetriesExceededError):
|
|
|
+ retry_task.__class__.max_retries = 1
|
|
|
+ retry_task.iterations = 0
|
|
|
+ result = retry_task.apply([0xFF, 0xFFFF], {"care": False})
|
|
|
+ with self.assertRaises(retry_task.MaxRetriesExceededError):
|
|
|
result.get()
|
|
|
- self.assertEqual(RetryTask.iterations, 2)
|
|
|
+ self.assertEqual(retry_task.iterations, 2)
|
|
|
|
|
|
|
|
|
class TestCeleryTasks(Case):
|
|
@@ -215,19 +193,13 @@ class TestCeleryTasks(Case):
|
|
|
|
|
|
self.assertIs(pickle.loads(pickle.dumps(xxx)), xxx)
|
|
|
|
|
|
- def createTaskCls(self, cls_name, task_name=None):
|
|
|
- attrs = {"__module__": self.__module__}
|
|
|
- if task_name:
|
|
|
- attrs["name"] = task_name
|
|
|
-
|
|
|
- cls = type(cls_name, (task.Task, ), attrs)
|
|
|
- cls.run = return_True
|
|
|
- return cls
|
|
|
+ def createTask(self, name):
|
|
|
+ return task.task(__module__=self.__module__, name=name)(return_True)
|
|
|
|
|
|
def test_AsyncResult(self):
|
|
|
task_id = uuid()
|
|
|
- result = RetryTask.AsyncResult(task_id)
|
|
|
- self.assertEqual(result.backend, RetryTask.backend)
|
|
|
+ result = retry_task.AsyncResult(task_id)
|
|
|
+ self.assertEqual(result.backend, retry_task.backend)
|
|
|
self.assertEqual(result.task_id, task_id)
|
|
|
|
|
|
def assertNextTaskDataEqual(self, consumer, presult, task_name,
|
|
@@ -258,92 +230,87 @@ class TestCeleryTasks(Case):
|
|
|
|
|
|
def test_task_kwargs_must_be_dictionary(self):
|
|
|
with self.assertRaises(ValueError):
|
|
|
- IncrementCounterTask.apply_async([], "str")
|
|
|
+ increment_counter.apply_async([], "str")
|
|
|
|
|
|
def test_task_args_must_be_list(self):
|
|
|
with self.assertRaises(ValueError):
|
|
|
- IncrementCounterTask.apply_async("str", {})
|
|
|
+ increment_counter.apply_async("str", {})
|
|
|
|
|
|
def test_regular_task(self):
|
|
|
- T1 = self.createTaskCls("T1", "c.unittest.t.t1")
|
|
|
- self.assertIsInstance(T1(), T1)
|
|
|
- self.assertTrue(T1().run())
|
|
|
- self.assertTrue(callable(T1()),
|
|
|
+ T1 = self.createTask("c.unittest.t.t1")
|
|
|
+ self.assertIsInstance(T1, task.BaseTask)
|
|
|
+ self.assertTrue(T1.run())
|
|
|
+ self.assertTrue(callable(T1),
|
|
|
"Task class is callable()")
|
|
|
- self.assertTrue(T1()(),
|
|
|
+ self.assertTrue(T1(),
|
|
|
"Task class runs run() when called")
|
|
|
|
|
|
- # task name generated out of class module + name.
|
|
|
- T2 = self.createTaskCls("T2")
|
|
|
- self.assertTrue(T2().name.endswith("test_task.T2"))
|
|
|
-
|
|
|
- t1 = T1()
|
|
|
- consumer = t1.get_consumer()
|
|
|
+ consumer = T1.get_consumer()
|
|
|
with self.assertRaises(NotImplementedError):
|
|
|
consumer.receive("foo", "foo")
|
|
|
consumer.discard_all()
|
|
|
self.assertIsNone(consumer.fetch())
|
|
|
|
|
|
# Without arguments.
|
|
|
- presult = t1.delay()
|
|
|
- self.assertNextTaskDataEqual(consumer, presult, t1.name)
|
|
|
+ presult = T1.delay()
|
|
|
+ self.assertNextTaskDataEqual(consumer, presult, T1.name)
|
|
|
|
|
|
# With arguments.
|
|
|
- presult2 = t1.apply_async(kwargs=dict(name="George Costanza"))
|
|
|
- self.assertNextTaskDataEqual(consumer, presult2, t1.name,
|
|
|
+ presult2 = T1.apply_async(kwargs=dict(name="George Costanza"))
|
|
|
+ self.assertNextTaskDataEqual(consumer, presult2, T1.name,
|
|
|
name="George Costanza")
|
|
|
|
|
|
# send_task
|
|
|
- sresult = send_task(t1.name, kwargs=dict(name="Elaine M. Benes"))
|
|
|
- self.assertNextTaskDataEqual(consumer, sresult, t1.name,
|
|
|
+ sresult = send_task(T1.name, kwargs=dict(name="Elaine M. Benes"))
|
|
|
+ self.assertNextTaskDataEqual(consumer, sresult, T1.name,
|
|
|
name="Elaine M. Benes")
|
|
|
|
|
|
# With eta.
|
|
|
- presult2 = t1.apply_async(kwargs=dict(name="George Costanza"),
|
|
|
+ presult2 = T1.apply_async(kwargs=dict(name="George Costanza"),
|
|
|
eta=datetime.utcnow() + timedelta(days=1),
|
|
|
expires=datetime.utcnow() + timedelta(days=2))
|
|
|
- self.assertNextTaskDataEqual(consumer, presult2, t1.name,
|
|
|
+ self.assertNextTaskDataEqual(consumer, presult2, T1.name,
|
|
|
name="George Costanza", test_eta=True, test_expires=True)
|
|
|
|
|
|
# With countdown.
|
|
|
- presult2 = t1.apply_async(kwargs=dict(name="George Costanza"),
|
|
|
+ presult2 = T1.apply_async(kwargs=dict(name="George Costanza"),
|
|
|
countdown=10, expires=12)
|
|
|
- self.assertNextTaskDataEqual(consumer, presult2, t1.name,
|
|
|
+ self.assertNextTaskDataEqual(consumer, presult2, T1.name,
|
|
|
name="George Costanza", test_eta=True, test_expires=True)
|
|
|
|
|
|
# Discarding all tasks.
|
|
|
consumer.discard_all()
|
|
|
- t1.apply_async()
|
|
|
+ T1.apply_async()
|
|
|
self.assertEqual(consumer.discard_all(), 1)
|
|
|
self.assertIsNone(consumer.fetch())
|
|
|
|
|
|
self.assertFalse(presult.successful())
|
|
|
- t1.backend.mark_as_done(presult.task_id, result=None)
|
|
|
+ T1.backend.mark_as_done(presult.task_id, result=None)
|
|
|
self.assertTrue(presult.successful())
|
|
|
|
|
|
- publisher = t1.get_publisher()
|
|
|
+ publisher = T1.get_publisher()
|
|
|
self.assertTrue(publisher.exchange)
|
|
|
|
|
|
def test_context_get(self):
|
|
|
- request = self.createTaskCls("T1", "c.unittest.t.c.g").request
|
|
|
+ request = self.createTask("c.unittest.t.c.g").request
|
|
|
request.foo = 32
|
|
|
self.assertEqual(request.get("foo"), 32)
|
|
|
self.assertEqual(request.get("bar", 36), 36)
|
|
|
request.clear()
|
|
|
|
|
|
def test_task_class_repr(self):
|
|
|
- task = self.createTaskCls("T1", "c.unittest.t.repr")
|
|
|
- self.assertIn("class Task of", repr(task._get_app().Task))
|
|
|
+ task = self.createTask("c.unittest.t.repr")
|
|
|
+ self.assertIn("class Task of", repr(task.app.Task))
|
|
|
|
|
|
def test_after_return(self):
|
|
|
- task = self.createTaskCls("T1", "c.unittest.t.after_return")()
|
|
|
+ task = self.createTask("c.unittest.t.after_return")
|
|
|
task.request.chord = return_True_task.subtask()
|
|
|
task.after_return("SUCCESS", 1.0, "foobar", (), {}, None)
|
|
|
task.request.clear()
|
|
|
|
|
|
def test_send_task_sent_event(self):
|
|
|
- T1 = self.createTaskCls("T1", "c.unittest.t.t1")
|
|
|
- app = T1._get_app()
|
|
|
+ T1 = self.createTask("c.unittest.t.t1")
|
|
|
+ app = T1.app
|
|
|
conn = app.broker_connection()
|
|
|
chan = conn.channel()
|
|
|
app.conf.CELERY_SEND_TASK_SENT_EVENT = True
|
|
@@ -366,11 +333,11 @@ class TestCeleryTasks(Case):
|
|
|
|
|
|
def test_get_publisher(self):
|
|
|
connection = app_or_default().broker_connection()
|
|
|
- p = IncrementCounterTask.get_publisher(connection, auto_declare=False,
|
|
|
- exchange="foo")
|
|
|
+ p = increment_counter.get_publisher(connection, auto_declare=False,
|
|
|
+ exchange="foo")
|
|
|
self.assertEqual(p.exchange.name, "foo")
|
|
|
- p = IncrementCounterTask.get_publisher(connection, auto_declare=False,
|
|
|
- exchange_type="fanout")
|
|
|
+ p = increment_counter.get_publisher(connection, auto_declare=False,
|
|
|
+ exchange_type="fanout")
|
|
|
self.assertEqual(p.exchange.type, "fanout")
|
|
|
|
|
|
def test_update_state(self):
|
|
@@ -406,13 +373,12 @@ class TestCeleryTasks(Case):
|
|
|
self.assertTrue(yyy2.__name__)
|
|
|
|
|
|
def test_get_logger(self):
|
|
|
- T1 = self.createTaskCls("T1", "c.unittest.t.t1")
|
|
|
- t1 = T1()
|
|
|
+ t1 = self.createTask("c.unittest.t.t1")
|
|
|
logfh = WhateverIO()
|
|
|
logger = t1.get_logger(logfile=logfh, loglevel=0)
|
|
|
self.assertTrue(logger)
|
|
|
|
|
|
- T1.request.loglevel = 3
|
|
|
+ t1.request.loglevel = 3
|
|
|
logger = t1.get_logger(logfile=logfh, loglevel=None)
|
|
|
self.assertTrue(logger)
|
|
|
|
|
@@ -427,35 +393,35 @@ class TestTaskSet(Case):
|
|
|
self.assertListEqual(res.join(), [True, True, True, True, True])
|
|
|
|
|
|
def test_counter_taskset(self):
|
|
|
- IncrementCounterTask.count = 0
|
|
|
+ increment_counter.count = 0
|
|
|
ts = task.TaskSet(tasks=[
|
|
|
- IncrementCounterTask.subtask((), {}),
|
|
|
- IncrementCounterTask.subtask((), {"increment_by": 2}),
|
|
|
- IncrementCounterTask.subtask((), {"increment_by": 3}),
|
|
|
- IncrementCounterTask.subtask((), {"increment_by": 4}),
|
|
|
- IncrementCounterTask.subtask((), {"increment_by": 5}),
|
|
|
- IncrementCounterTask.subtask((), {"increment_by": 6}),
|
|
|
- IncrementCounterTask.subtask((), {"increment_by": 7}),
|
|
|
- IncrementCounterTask.subtask((), {"increment_by": 8}),
|
|
|
- IncrementCounterTask.subtask((), {"increment_by": 9}),
|
|
|
+ increment_counter.subtask((), {}),
|
|
|
+ increment_counter.subtask((), {"increment_by": 2}),
|
|
|
+ increment_counter.subtask((), {"increment_by": 3}),
|
|
|
+ increment_counter.subtask((), {"increment_by": 4}),
|
|
|
+ increment_counter.subtask((), {"increment_by": 5}),
|
|
|
+ increment_counter.subtask((), {"increment_by": 6}),
|
|
|
+ increment_counter.subtask((), {"increment_by": 7}),
|
|
|
+ increment_counter.subtask((), {"increment_by": 8}),
|
|
|
+ increment_counter.subtask((), {"increment_by": 9}),
|
|
|
])
|
|
|
self.assertEqual(ts.total, 9)
|
|
|
|
|
|
- consumer = IncrementCounterTask().get_consumer()
|
|
|
+ consumer = increment_counter.get_consumer()
|
|
|
consumer.purge()
|
|
|
consumer.close()
|
|
|
taskset_res = ts.apply_async()
|
|
|
subtasks = taskset_res.subtasks
|
|
|
taskset_id = taskset_res.taskset_id
|
|
|
- consumer = IncrementCounterTask().get_consumer()
|
|
|
+ consumer = increment_counter.get_consumer()
|
|
|
for subtask in subtasks:
|
|
|
m = consumer.fetch().payload
|
|
|
self.assertDictContainsSubset({"taskset": taskset_id,
|
|
|
- "task": IncrementCounterTask.name,
|
|
|
+ "task": increment_counter.name,
|
|
|
"id": subtask.task_id}, m)
|
|
|
- IncrementCounterTask().run(
|
|
|
+ increment_counter(
|
|
|
increment_by=m.get("kwargs", {}).get("increment_by"))
|
|
|
- self.assertEqual(IncrementCounterTask.count, sum(xrange(1, 10)))
|
|
|
+ self.assertEqual(increment_counter.count, sum(xrange(1, 10)))
|
|
|
|
|
|
def test_named_taskset(self):
|
|
|
prefix = "test_named_taskset-"
|
|
@@ -468,35 +434,34 @@ class TestTaskApply(Case):
|
|
|
|
|
|
def test_apply_throw(self):
|
|
|
with self.assertRaises(KeyError):
|
|
|
- RaisingTask.apply(throw=True)
|
|
|
+ raising.apply(throw=True)
|
|
|
|
|
|
def test_apply_with_CELERY_EAGER_PROPAGATES_EXCEPTIONS(self):
|
|
|
- app = RaisingTask._get_app()
|
|
|
- app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
|
|
|
+ raising.app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
|
|
|
try:
|
|
|
with self.assertRaises(KeyError):
|
|
|
- RaisingTask.apply()
|
|
|
+ raising.apply()
|
|
|
finally:
|
|
|
- app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = False
|
|
|
+ raising.app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = False
|
|
|
|
|
|
def test_apply(self):
|
|
|
- IncrementCounterTask.count = 0
|
|
|
+ increment_counter.count = 0
|
|
|
|
|
|
- e = IncrementCounterTask.apply()
|
|
|
+ e = increment_counter.apply()
|
|
|
self.assertIsInstance(e, EagerResult)
|
|
|
self.assertEqual(e.get(), 1)
|
|
|
|
|
|
- e = IncrementCounterTask.apply(args=[1])
|
|
|
+ e = increment_counter.apply(args=[1])
|
|
|
self.assertEqual(e.get(), 2)
|
|
|
|
|
|
- e = IncrementCounterTask.apply(kwargs={"increment_by": 4})
|
|
|
+ e = increment_counter.apply(kwargs={"increment_by": 4})
|
|
|
self.assertEqual(e.get(), 6)
|
|
|
|
|
|
self.assertTrue(e.successful())
|
|
|
self.assertTrue(e.ready())
|
|
|
self.assertTrue(repr(e).startswith("<EagerResult:"))
|
|
|
|
|
|
- f = RaisingTask.apply()
|
|
|
+ f = raising.apply()
|
|
|
self.assertTrue(f.ready())
|
|
|
self.assertFalse(f.successful())
|
|
|
self.assertTrue(f.traceback)
|
|
@@ -504,8 +469,9 @@ class TestTaskApply(Case):
|
|
|
f.get()
|
|
|
|
|
|
|
|
|
-class MyPeriodic(task.PeriodicTask):
|
|
|
- run_every = timedelta(hours=1)
|
|
|
+@task.periodic_task(run_every=timedelta(hours=1))
|
|
|
+def my_periodic():
|
|
|
+ pass
|
|
|
|
|
|
|
|
|
class TestPeriodicTask(Case):
|
|
@@ -516,18 +482,18 @@ class TestPeriodicTask(Case):
|
|
|
|
|
|
def test_remaining_estimate(self):
|
|
|
self.assertIsInstance(
|
|
|
- MyPeriodic().run_every.remaining_estimate(datetime.utcnow()),
|
|
|
+ my_periodic.run_every.remaining_estimate(datetime.utcnow()),
|
|
|
timedelta)
|
|
|
|
|
|
def test_is_due_not_due(self):
|
|
|
- due, remaining = MyPeriodic().run_every.is_due(datetime.utcnow())
|
|
|
+ due, remaining = my_periodic.run_every.is_due(datetime.utcnow())
|
|
|
self.assertFalse(due)
|
|
|
# This assertion may fail if executed in the
|
|
|
# first minute of an hour, thus 59 instead of 60
|
|
|
self.assertGreater(remaining, 59)
|
|
|
|
|
|
def test_is_due(self):
|
|
|
- p = MyPeriodic()
|
|
|
+ p = my_periodic
|
|
|
due, remaining = p.run_every.is_due(
|
|
|
datetime.utcnow() - p.run_every.run_every)
|
|
|
self.assertTrue(due)
|
|
@@ -535,28 +501,34 @@ class TestPeriodicTask(Case):
|
|
|
timedelta_seconds(p.run_every.run_every))
|
|
|
|
|
|
def test_schedule_repr(self):
|
|
|
- p = MyPeriodic()
|
|
|
+ p = my_periodic
|
|
|
self.assertTrue(repr(p.run_every))
|
|
|
|
|
|
|
|
|
-class EveryMinutePeriodic(task.PeriodicTask):
|
|
|
- run_every = crontab()
|
|
|
+@task.periodic_task(run_every=crontab())
|
|
|
+def every_minute():
|
|
|
+ pass
|
|
|
|
|
|
|
|
|
-class QuarterlyPeriodic(task.PeriodicTask):
|
|
|
- run_every = crontab(minute="*/15")
|
|
|
+@task.periodic_task(run_every=crontab(minute="*/15"))
|
|
|
+def quarterly():
|
|
|
+ pass
|
|
|
|
|
|
|
|
|
-class HourlyPeriodic(task.PeriodicTask):
|
|
|
- run_every = crontab(minute=30)
|
|
|
+@task.periodic_task(run_every=crontab(minute=30))
|
|
|
+def hourly():
|
|
|
+ pass
|
|
|
|
|
|
|
|
|
-class DailyPeriodic(task.PeriodicTask):
|
|
|
- run_every = crontab(hour=7, minute=30)
|
|
|
+@task.periodic_task(run_every=crontab(hour=7, minute=30))
|
|
|
+def daily():
|
|
|
+ pass
|
|
|
|
|
|
|
|
|
-class WeeklyPeriodic(task.PeriodicTask):
|
|
|
- run_every = crontab(hour=7, minute=30, day_of_week="thursday")
|
|
|
+@task.periodic_task(run_every=crontab(hour=7, minute=30,
|
|
|
+ day_of_week="thursday"))
|
|
|
+def weekly():
|
|
|
+ pass
|
|
|
|
|
|
|
|
|
def patch_crontab_nowfun(cls, retval):
|
|
@@ -788,100 +760,106 @@ class test_crontab_is_due(Case):
|
|
|
|
|
|
def test_every_minute_execution_is_due(self):
|
|
|
last_ran = self.now - timedelta(seconds=61)
|
|
|
- due, remaining = EveryMinutePeriodic().run_every.is_due(last_ran)
|
|
|
+ due, remaining = every_minute.run_every.is_due(last_ran)
|
|
|
self.assertTrue(due)
|
|
|
self.seconds_almost_equal(remaining, self.next_minute, 1)
|
|
|
|
|
|
def test_every_minute_execution_is_not_due(self):
|
|
|
last_ran = self.now - timedelta(seconds=self.now.second)
|
|
|
- due, remaining = EveryMinutePeriodic().run_every.is_due(last_ran)
|
|
|
+ due, remaining = every_minute.run_every.is_due(last_ran)
|
|
|
self.assertFalse(due)
|
|
|
self.seconds_almost_equal(remaining, self.next_minute, 1)
|
|
|
|
|
|
# 29th of May 2010 is a saturday
|
|
|
- @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 29, 10, 30))
|
|
|
+ @patch_crontab_nowfun(hourly, datetime(2010, 5, 29, 10, 30))
|
|
|
def test_execution_is_due_on_saturday(self):
|
|
|
last_ran = self.now - timedelta(seconds=61)
|
|
|
- due, remaining = EveryMinutePeriodic().run_every.is_due(last_ran)
|
|
|
+ due, remaining = every_minute.run_every.is_due(last_ran)
|
|
|
self.assertTrue(due)
|
|
|
self.seconds_almost_equal(remaining, self.next_minute, 1)
|
|
|
|
|
|
# 30th of May 2010 is a sunday
|
|
|
- @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 30, 10, 30))
|
|
|
+ @patch_crontab_nowfun(hourly, datetime(2010, 5, 30, 10, 30))
|
|
|
def test_execution_is_due_on_sunday(self):
|
|
|
last_ran = self.now - timedelta(seconds=61)
|
|
|
- due, remaining = EveryMinutePeriodic().run_every.is_due(last_ran)
|
|
|
+ due, remaining = every_minute.run_every.is_due(last_ran)
|
|
|
self.assertTrue(due)
|
|
|
self.seconds_almost_equal(remaining, self.next_minute, 1)
|
|
|
|
|
|
# 31st of May 2010 is a monday
|
|
|
- @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 31, 10, 30))
|
|
|
+ @patch_crontab_nowfun(hourly, datetime(2010, 5, 31, 10, 30))
|
|
|
def test_execution_is_due_on_monday(self):
|
|
|
last_ran = self.now - timedelta(seconds=61)
|
|
|
- due, remaining = EveryMinutePeriodic().run_every.is_due(last_ran)
|
|
|
+ due, remaining = every_minute.run_every.is_due(last_ran)
|
|
|
self.assertTrue(due)
|
|
|
self.seconds_almost_equal(remaining, self.next_minute, 1)
|
|
|
|
|
|
- @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 10, 10, 30))
|
|
|
+ @patch_crontab_nowfun(hourly, datetime(2010, 5, 10, 10, 30))
|
|
|
def test_every_hour_execution_is_due(self):
|
|
|
- due, remaining = HourlyPeriodic().run_every.is_due(datetime(2010, 5, 10, 6, 30))
|
|
|
+ due, remaining = hourly.run_every.is_due(
|
|
|
+ datetime(2010, 5, 10, 6, 30))
|
|
|
self.assertTrue(due)
|
|
|
self.assertEqual(remaining, 60 * 60)
|
|
|
|
|
|
- @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 10, 10, 29))
|
|
|
+ @patch_crontab_nowfun(hourly, datetime(2010, 5, 10, 10, 29))
|
|
|
def test_every_hour_execution_is_not_due(self):
|
|
|
- due, remaining = HourlyPeriodic().run_every.is_due(datetime(2010, 5, 10, 9, 30))
|
|
|
+ due, remaining = hourly.run_every.is_due(
|
|
|
+ datetime(2010, 5, 10, 9, 30))
|
|
|
self.assertFalse(due)
|
|
|
self.assertEqual(remaining, 60)
|
|
|
|
|
|
- @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 15))
|
|
|
+ @patch_crontab_nowfun(quarterly, datetime(2010, 5, 10, 10, 15))
|
|
|
def test_first_quarter_execution_is_due(self):
|
|
|
- due, remaining = QuarterlyPeriodic().run_every.is_due(
|
|
|
+ due, remaining = quarterly.run_every.is_due(
|
|
|
datetime(2010, 5, 10, 6, 30))
|
|
|
self.assertTrue(due)
|
|
|
self.assertEqual(remaining, 15 * 60)
|
|
|
|
|
|
- @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 30))
|
|
|
+ @patch_crontab_nowfun(quarterly, datetime(2010, 5, 10, 10, 30))
|
|
|
def test_second_quarter_execution_is_due(self):
|
|
|
- due, remaining = QuarterlyPeriodic().run_every.is_due(
|
|
|
+ due, remaining = quarterly.run_every.is_due(
|
|
|
datetime(2010, 5, 10, 6, 30))
|
|
|
self.assertTrue(due)
|
|
|
self.assertEqual(remaining, 15 * 60)
|
|
|
|
|
|
- @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 14))
|
|
|
+ @patch_crontab_nowfun(quarterly, datetime(2010, 5, 10, 10, 14))
|
|
|
def test_first_quarter_execution_is_not_due(self):
|
|
|
- due, remaining = QuarterlyPeriodic().run_every.is_due(
|
|
|
+ due, remaining = quarterly.run_every.is_due(
|
|
|
datetime(2010, 5, 10, 10, 0))
|
|
|
self.assertFalse(due)
|
|
|
self.assertEqual(remaining, 60)
|
|
|
|
|
|
- @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 29))
|
|
|
+ @patch_crontab_nowfun(quarterly, datetime(2010, 5, 10, 10, 29))
|
|
|
def test_second_quarter_execution_is_not_due(self):
|
|
|
- due, remaining = QuarterlyPeriodic().run_every.is_due(
|
|
|
+ due, remaining = quarterly.run_every.is_due(
|
|
|
datetime(2010, 5, 10, 10, 15))
|
|
|
self.assertFalse(due)
|
|
|
self.assertEqual(remaining, 60)
|
|
|
|
|
|
- @patch_crontab_nowfun(DailyPeriodic, datetime(2010, 5, 10, 7, 30))
|
|
|
+ @patch_crontab_nowfun(daily, datetime(2010, 5, 10, 7, 30))
|
|
|
def test_daily_execution_is_due(self):
|
|
|
- due, remaining = DailyPeriodic().run_every.is_due(datetime(2010, 5, 9, 7, 30))
|
|
|
+ due, remaining = daily.run_every.is_due(
|
|
|
+ datetime(2010, 5, 9, 7, 30))
|
|
|
self.assertTrue(due)
|
|
|
self.assertEqual(remaining, 24 * 60 * 60)
|
|
|
|
|
|
- @patch_crontab_nowfun(DailyPeriodic, datetime(2010, 5, 10, 10, 30))
|
|
|
+ @patch_crontab_nowfun(daily, datetime(2010, 5, 10, 10, 30))
|
|
|
def test_daily_execution_is_not_due(self):
|
|
|
- due, remaining = DailyPeriodic().run_every.is_due(datetime(2010, 5, 10, 7, 30))
|
|
|
+ due, remaining = daily.run_every.is_due(
|
|
|
+ datetime(2010, 5, 10, 7, 30))
|
|
|
self.assertFalse(due)
|
|
|
self.assertEqual(remaining, 21 * 60 * 60)
|
|
|
|
|
|
- @patch_crontab_nowfun(WeeklyPeriodic, datetime(2010, 5, 6, 7, 30))
|
|
|
+ @patch_crontab_nowfun(weekly, datetime(2010, 5, 6, 7, 30))
|
|
|
def test_weekly_execution_is_due(self):
|
|
|
- due, remaining = WeeklyPeriodic().run_every.is_due(datetime(2010, 4, 30, 7, 30))
|
|
|
+ due, remaining = weekly.run_every.is_due(
|
|
|
+ datetime(2010, 4, 30, 7, 30))
|
|
|
self.assertTrue(due)
|
|
|
self.assertEqual(remaining, 7 * 24 * 60 * 60)
|
|
|
|
|
|
- @patch_crontab_nowfun(WeeklyPeriodic, datetime(2010, 5, 7, 10, 30))
|
|
|
+ @patch_crontab_nowfun(weekly, datetime(2010, 5, 7, 10, 30))
|
|
|
def test_weekly_execution_is_not_due(self):
|
|
|
- due, remaining = WeeklyPeriodic().run_every.is_due(datetime(2010, 5, 6, 7, 30))
|
|
|
+ due, remaining = weekly.run_every.is_due(
|
|
|
+ datetime(2010, 5, 6, 7, 30))
|
|
|
self.assertFalse(due)
|
|
|
self.assertEqual(remaining, 6 * 24 * 60 * 60 - 3 * 60 * 60)
|