12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- import operator
- import os
- import sys
- # funtest config
- sys.path.insert(0, os.getcwd())
- sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
- import suite # noqa
- from celery.five import range
- from celery.tests.case import unittest
- from celery.tests.functional import tasks
- from celery.tests.functional.case import WorkerCase
- class test_basic(WorkerCase):
- def test_started(self):
- self.assertWorkerAlive()
- def test_roundtrip_simple_task(self):
- publisher = tasks.add.get_publisher()
- results = [(tasks.add.apply_async(i, publisher=publisher), i)
- for i in zip(range(100), range(100))]
- for result, i in results:
- self.assertEqual(result.get(timeout=10), operator.add(*i))
- def test_dump_active(self, sleep=1):
- r1 = tasks.sleeptask.delay(sleep)
- tasks.sleeptask.delay(sleep)
- self.ensure_accepted(r1.id)
- active = self.inspect().active(safe=True)
- self.assertTrue(active)
- active = active[self.worker.hostname]
- self.assertEqual(len(active), 2)
- self.assertEqual(active[0]['name'], tasks.sleeptask.name)
- self.assertEqual(active[0]['args'], [sleep])
- def test_dump_reserved(self, sleep=1):
- r1 = tasks.sleeptask.delay(sleep)
- tasks.sleeptask.delay(sleep)
- tasks.sleeptask.delay(sleep)
- tasks.sleeptask.delay(sleep)
- self.ensure_accepted(r1.id)
- reserved = self.inspect().reserved(safe=True)
- self.assertTrue(reserved)
- reserved = reserved[self.worker.hostname]
- self.assertEqual(reserved[0]['name'], tasks.sleeptask.name)
- self.assertEqual(reserved[0]['args'], [sleep])
- def test_dump_schedule(self, countdown=1):
- r1 = tasks.add.apply_async((2, 2), countdown=countdown)
- tasks.add.apply_async((2, 2), countdown=countdown)
- self.ensure_scheduled(r1.id, interval=0.1)
- schedule = self.inspect().scheduled(safe=True)
- self.assertTrue(schedule)
- schedule = schedule[self.worker.hostname]
- self.assertTrue(len(schedule), 2)
- self.assertEqual(schedule[0]['request']['name'], tasks.add.name)
- self.assertEqual(schedule[0]['request']['args'], [2, 2])
- if __name__ == '__main__':
- unittest.main()
|