| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 | 
							- import operator
 
- import os
 
- import sys
 
- import time
 
- # funtest config
 
- sys.path.insert(0, os.getcwd())
 
- sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
 
- import suite
 
- from celery.tests.utils import unittest
 
- from celery.tests.functional import tasks
 
- from celery.tests.functional.case import WorkerCase
 
- from celery.task.control import broadcast
 
class test_basic(WorkerCase):
    """Functional tests run against the worker provided by ``WorkerCase``.

    Each test publishes tasks from ``celery.tests.functional.tasks`` and
    then checks either the task results or what the worker reports through
    ``self.inspect()``.
    """

    def test_started(self):
        """The worker must report itself as alive."""
        self.assertWorkerAlive()

    def test_roundtrip_simple_task(self):
        """Publish 100 ``add`` tasks and verify every returned sum."""
        # One publisher is shared by all apply_async calls.
        publisher = tasks.add.get_publisher()
        pending = []
        for n in range(100):
            args = (n, n)
            pending.append(
                (tasks.add.apply_async(args, publisher=publisher), args))
        for result, args in pending:
            self.assertEqual(result.get(timeout=10), operator.add(*args))

    def test_dump_active(self, sleep=1):
        """Two sleeping tasks must both show up in the active listing.

        :param sleep: argument passed to each ``sleeptask``.
        """
        # Keep every result referenced for the duration of the test.
        results = [tasks.sleeptask.delay(sleep) for _ in range(2)]
        self.ensure_accepted(results[0].task_id)
        active = self.inspect().active(safe=True)
        self.assertTrue(active)
        active = active[self.worker.hostname]
        self.assertEqual(len(active), 2)
        self.assertEqual(active[0]["name"], tasks.sleeptask.name)
        self.assertEqual(active[0]["args"], [sleep])

    def test_dump_reserved(self, sleep=1):
        """With four sleeping tasks sent, the reserved listing is non-empty.

        :param sleep: argument passed to each ``sleeptask``.
        """
        # Keep every result referenced for the duration of the test.
        results = [tasks.sleeptask.delay(sleep) for _ in range(4)]
        self.ensure_accepted(results[0].task_id)
        reserved = self.inspect().reserved(safe=True)
        self.assertTrue(reserved)
        reserved = reserved[self.worker.hostname]
        self.assertEqual(reserved[0]["name"], tasks.sleeptask.name)
        self.assertEqual(reserved[0]["args"], [sleep])

    def test_dump_schedule(self, countdown=1):
        """Two countdown tasks must both show up in the scheduled listing.

        :param countdown: ``countdown`` value passed to ``apply_async``.
        """
        results = [tasks.add.apply_async((2, 2), countdown=countdown)
                   for _ in range(2)]
        self.ensure_scheduled(results[0].task_id, interval=0.1)
        schedule = self.inspect().scheduled(safe=True)
        self.assertTrue(schedule)
        schedule = schedule[self.worker.hostname]
        # assertTrue(len(schedule), 2) would treat ``2`` as the failure
        # message and never compare the length; assertEqual does.
        self.assertEqual(len(schedule), 2)
        self.assertEqual(schedule[0]["request"]["name"], tasks.add.name)
        self.assertEqual(schedule[0]["request"]["args"], [2, 2])
 
# Run the test cases in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
 
 
  |