|
@@ -4,8 +4,10 @@ from celery.tests.utils import unittest
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
from celery import beat
|
|
|
+from celery import registry
|
|
|
from celery.result import AsyncResult
|
|
|
from celery.schedules import schedule
|
|
|
+from celery.task.base import Task
|
|
|
from celery.utils import gen_unique_id
|
|
|
|
|
|
|
|
@@ -147,6 +149,30 @@ always_pending = mocked_schedule(False, 1)
|
|
|
|
|
|
class test_Scheduler(unittest.TestCase):
|
|
|
|
|
|
+ def test_custom_schedule_dict(self):
|
|
|
+ custom = {"foo": "bar"}
|
|
|
+ scheduler = mScheduler(schedule=custom, lazy=True)
|
|
|
+ self.assertIs(scheduler.data, custom)
|
|
|
+
|
|
|
+ def test_apply_async_uses_registered_task_instances(self):
|
|
|
+ through_task = [False]
|
|
|
+
|
|
|
+ class MockTask(Task):
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def apply_async(cls, *args, **kwargs):
|
|
|
+ through_task[0] = True
|
|
|
+
|
|
|
+ assert MockTask.name in registry.tasks
|
|
|
+
|
|
|
+ scheduler = mScheduler()
|
|
|
+ scheduler.apply_async(scheduler.Entry(task=MockTask.name))
|
|
|
+ self.assertTrue(through_task[0])
|
|
|
+
|
|
|
+ def test_info(self):
|
|
|
+ scheduler = mScheduler()
|
|
|
+ self.assertIsInstance(scheduler.info, basestring)
|
|
|
+
|
|
|
def test_due_tick(self):
|
|
|
scheduler = mScheduler()
|
|
|
scheduler.add(name="test_due_tick",
|
|
@@ -227,14 +253,26 @@ class test_Scheduler(unittest.TestCase):
|
|
|
|
|
|
class test_Service(unittest.TestCase):
|
|
|
|
|
|
- def test_start(self):
|
|
|
+ def get_service(self):
|
|
|
sh = MockShelve()
|
|
|
|
|
|
class PersistentScheduler(beat.PersistentScheduler):
|
|
|
persistence = Object()
|
|
|
persistence.open = lambda *a, **kw: sh
|
|
|
+ tick_raises_exit = False
|
|
|
+ shutdown_service = None
|
|
|
|
|
|
- s = beat.Service(scheduler_cls=PersistentScheduler)
|
|
|
+ def tick(self):
|
|
|
+ if self.tick_raises_exit:
|
|
|
+ raise SystemExit()
|
|
|
+ if self.shutdown_service:
|
|
|
+ self.shutdown_service._shutdown.set()
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ return beat.Service(scheduler_cls=PersistentScheduler), sh
|
|
|
+
|
|
|
+ def test_start(self):
|
|
|
+ s, sh = self.get_service()
|
|
|
self.assertIsInstance(s.schedule, dict)
|
|
|
self.assertIsInstance(s.scheduler, beat.Scheduler)
|
|
|
scheduled = s.schedule.keys()
|
|
@@ -258,6 +296,28 @@ class test_Service(unittest.TestCase):
|
|
|
finally:
|
|
|
s.scheduler._store = p
|
|
|
|
|
|
+ def test_start_embedded_process(self):
|
|
|
+ s, sh = self.get_service()
|
|
|
+ s._shutdown.set()
|
|
|
+ s.start(embedded_process=True)
|
|
|
+
|
|
|
+ def test_start_thread(self):
|
|
|
+ s, sh = self.get_service()
|
|
|
+ s._shutdown.set()
|
|
|
+ s.start(embedded_process=False)
|
|
|
+
|
|
|
+ def test_start_tick_raises_exit_error(self):
|
|
|
+ s, sh = self.get_service()
|
|
|
+ s.scheduler.tick_raises_exit = True
|
|
|
+ s.start()
|
|
|
+ self.assertTrue(s._shutdown.isSet())
|
|
|
+
|
|
|
+ def test_start_manages_one_tick_before_shutdown(self):
|
|
|
+ s, sh = self.get_service()
|
|
|
+ s.scheduler.shutdown_service = s
|
|
|
+ s.start()
|
|
|
+ self.assertTrue(s._shutdown.isSet())
|
|
|
+
|
|
|
|
|
|
class test_EmbeddedService(unittest.TestCase):
|
|
|
|