test_basic.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. import operator
  2. import time
  3. from celery.tests.functional import tasks
  4. from celery.tests.functional.case import WorkerCase
  5. from celery.task.control import broadcast
  6. class test_basic(WorkerCase):
  7. def test_started(self):
  8. self.assertWorkerAlive()
  9. def test_roundtrip_simple_task(self):
  10. publisher = tasks.add.get_publisher()
  11. results = [(tasks.add.apply_async(i, publisher=publisher), i)
  12. for i in zip(xrange(100), xrange(100))]
  13. for result, i in results:
  14. self.assertEqual(result.get(timeout=10), operator.add(*i))
  15. def test_dump_active(self, sleep=1):
  16. r1 = tasks.sleeptask.delay(sleep)
  17. r2 = tasks.sleeptask.delay(sleep)
  18. self.ensure_accepted(r1.task_id)
  19. active = self.inspect().active(safe=True)
  20. self.assertTrue(active)
  21. active = active[self.worker.hostname]
  22. self.assertEqual(len(active), 2)
  23. self.assertEqual(active[0]["name"], tasks.sleeptask.name)
  24. self.assertEqual(active[0]["args"], [sleep])
  25. def test_dump_reserved(self, sleep=1):
  26. r1 = tasks.sleeptask.delay(sleep)
  27. r2 = tasks.sleeptask.delay(sleep)
  28. r3 = tasks.sleeptask.delay(sleep)
  29. r4 = tasks.sleeptask.delay(sleep)
  30. self.ensure_accepted(r1.task_id)
  31. reserved = self.inspect().reserved(safe=True)
  32. self.assertTrue(reserved)
  33. reserved = reserved[self.worker.hostname]
  34. self.assertEqual(reserved[0]["name"], tasks.sleeptask.name)
  35. self.assertEqual(reserved[0]["args"], [sleep])
  36. def test_dump_schedule(self, countdown=1):
  37. r1 = tasks.add.apply_async((2, 2), countdown=countdown)
  38. r2 = tasks.add.apply_async((2, 2), countdown=countdown)
  39. self.ensure_scheduled(r1.task_id, interval=0.1)
  40. schedule = self.inspect().scheduled(safe=True)
  41. self.assertTrue(schedule)
  42. schedule = schedule[self.worker.hostname]
  43. self.assertTrue(len(schedule), 2)
  44. self.assertEqual(schedule[0]["request"]["name"], tasks.add.name)
  45. self.assertEqual(schedule[0]["request"]["args"], [2, 2])
  46. if __name__ == "__main__":
  47. from unittest2 import main
  48. main()