test_basic.py 1007 B

1234567891011121314151617181920212223242526272829303132
  1. import operator
  2. import time
  3. from celery.task.control import broadcast
  4. from celery.tests.functional import tasks
  5. from celery.tests.functional.case import WorkerCase
  6. class test_basic(WorkerCase):
  7. def test_started(self):
  8. self.assertWorkerAlive()
  9. def test_roundtrip_simple_task(self):
  10. publisher = tasks.add.get_publisher()
  11. results = [(tasks.add.apply_async(i, publisher=publisher), i)
  12. for i in zip(xrange(100), xrange(100))]
  13. for result, i in results:
  14. self.assertEqual(result.get(timeout=10), operator.add(*i))
  15. def test_dump_active(self):
  16. tasks.sleeptask.delay(2)
  17. tasks.sleeptask.delay(2)
  18. time.sleep(0.2)
  19. r = broadcast("dump_active",
  20. arguments={"safe": True}, reply=True)
  21. active = self.my_response(r)
  22. self.assertEqual(len(active), 2)
  23. self.assertEqual(active[0]["name"], tasks.sleeptask.name)
  24. self.assertEqual(active[0]["args"], [2])