test_basic.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. import operator
  2. import os
  3. import sys
  4. import time
  5. # funtest config
  6. sys.path.insert(0, os.getcwd())
  7. sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
  8. import suite
  9. from celery.tests.utils import unittest
  10. from celery.tests.functional import tasks
  11. from celery.tests.functional.case import WorkerCase
  12. from celery.task.control import broadcast
  class test_basic(WorkerCase):
      """Functional tests exercising task round-trips and worker inspection.

      The helpers used here (``assertWorkerAlive``, ``ensure_accepted``,
      ``ensure_scheduled``, ``inspect`` and the ``worker`` attribute) are
      not defined in this class; presumably they come from ``WorkerCase``.
      """

      def test_started(self):
          """Run the ``assertWorkerAlive`` check."""
          self.assertWorkerAlive()

      def test_roundtrip_simple_task(self):
          """Publish 100 ``add`` tasks and verify every returned sum."""
          publisher = tasks.add.get_publisher()
          # Each task receives the argument pair (i, i); the pair is kept
          # next to its result so the expected sum can be recomputed below.
          results = [(tasks.add.apply_async(args, publisher=publisher), args)
                     for args in ((i, i) for i in range(100))]
          for result, args in results:
              self.assertEqual(result.get(timeout=10), operator.add(*args))

      def test_dump_active(self, sleep=1):
          """Start two sleep tasks and check the ``active`` inspect reply.

          :param sleep: argument passed to each ``sleeptask`` call.
          """
          first = tasks.sleeptask.delay(sleep)
          tasks.sleeptask.delay(sleep)
          self.ensure_accepted(first.id)
          active = self.inspect().active(safe=True)
          self.assertTrue(active)
          # The reply is indexed by worker hostname; keep only our worker.
          active = active[self.worker.hostname]
          self.assertEqual(len(active), 2)
          self.assertEqual(active[0]["name"], tasks.sleeptask.name)
          self.assertEqual(active[0]["args"], [sleep])

      def test_dump_reserved(self, sleep=1):
          """Start four sleep tasks and check the ``reserved`` inspect reply.

          :param sleep: argument passed to each ``sleeptask`` call.
          """
          first = tasks.sleeptask.delay(sleep)
          # Three further tasks; only the first result's id is needed below.
          for _ in range(3):
              tasks.sleeptask.delay(sleep)
          self.ensure_accepted(first.id)
          reserved = self.inspect().reserved(safe=True)
          self.assertTrue(reserved)
          reserved = reserved[self.worker.hostname]
          self.assertEqual(reserved[0]["name"], tasks.sleeptask.name)
          self.assertEqual(reserved[0]["args"], [sleep])

      def test_dump_schedule(self, countdown=1):
          """Schedule two ``add`` tasks and check the ``scheduled`` reply.

          :param countdown: ``countdown`` value passed to ``apply_async``.
          """
          first = tasks.add.apply_async((2, 2), countdown=countdown)
          tasks.add.apply_async((2, 2), countdown=countdown)
          self.ensure_scheduled(first.id, interval=0.1)
          schedule = self.inspect().scheduled(safe=True)
          self.assertTrue(schedule)
          schedule = schedule[self.worker.hostname]
          # assertTrue(len(schedule), 2) would treat 2 as the failure
          # message and pass for any non-empty schedule; compare the count.
          self.assertEqual(len(schedule), 2)
          self.assertEqual(schedule[0]["request"]["name"], tasks.add.name)
          self.assertEqual(schedule[0]["request"]["args"], [2, 2])
  if __name__ == "__main__":
      # Executed directly as a script: hand control to the unittest runner.
      unittest.main()