import operator
import os
import sys
import time

# funtest config
# The current directory and its parent are put on sys.path before
# ``suite`` is imported, so the import order below is deliberate.
sys.path.insert(0, os.getcwd())
sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
import suite

from celery.tests.utils import unittest
from celery.tests.functional import tasks
from celery.tests.functional.case import WorkerCase
from celery.task.control import broadcast


class test_basic(WorkerCase):
    """Basic functional tests built on ``WorkerCase``.

    Each test publishes tasks from ``celery.tests.functional.tasks`` and
    checks either the task results or what the worker reports through
    ``self.inspect()``.
    """

    def test_started(self):
        self.assertWorkerAlive()

    def test_roundtrip_simple_task(self):
        # One publisher is shared by all 100 sends.
        publisher = tasks.add.get_publisher()
        # Keep each argument pair next to its result so the expected sum
        # can be recomputed when the result is collected.
        pending = [(tasks.add.apply_async(pair, publisher=publisher), pair)
                   for pair in zip(xrange(100), xrange(100))]
        for result, pair in pending:
            self.assertEqual(result.get(timeout=10), operator.add(*pair))

    def test_dump_active(self, sleep=1):
        first = tasks.sleeptask.delay(sleep)
        tasks.sleeptask.delay(sleep)
        # Only the first task is explicitly waited on.
        # NOTE(review): presumably ensure_accepted blocks until the worker
        # has accepted the task -- confirm against WorkerCase.
        self.ensure_accepted(first.task_id)

        active = self.inspect().active(safe=True)
        self.assertTrue(active)
        # The reply is indexed by worker hostname.
        active = active[self.worker.hostname]
        self.assertEqual(len(active), 2)
        self.assertEqual(active[0]["name"], tasks.sleeptask.name)
        self.assertEqual(active[0]["args"], [sleep])

    def test_dump_reserved(self, sleep=1):
        first = tasks.sleeptask.delay(sleep)
        # Three more tasks are sent; their results are never read.
        for _ in range(3):
            tasks.sleeptask.delay(sleep)
        self.ensure_accepted(first.task_id)

        reserved = self.inspect().reserved(safe=True)
        self.assertTrue(reserved)
        reserved = reserved[self.worker.hostname]
        self.assertEqual(reserved[0]["name"], tasks.sleeptask.name)
        self.assertEqual(reserved[0]["args"], [sleep])

    def test_dump_schedule(self, countdown=1):
        first = tasks.add.apply_async((2, 2), countdown=countdown)
        tasks.add.apply_async((2, 2), countdown=countdown)
        self.ensure_scheduled(first.task_id, interval=0.1)

        schedule = self.inspect().scheduled(safe=True)
        self.assertTrue(schedule)
        schedule = schedule[self.worker.hostname]
        # assertEqual, not assertTrue: assertTrue(len(schedule), 2) would
        # treat 2 as the failure message and only check for a non-empty
        # schedule.
        # NOTE(review): only the first task is explicitly waited on (same
        # as test_dump_active); confirm the second is always scheduled by
        # the time the worker replies.
        self.assertEqual(len(schedule), 2)
        self.assertEqual(schedule[0]["request"]["name"], tasks.add.name)
        self.assertEqual(schedule[0]["request"]["args"], [2, 2])


if __name__ == "__main__":
    unittest.main()