# test_basic.py (2.2 KB)
  import operator
  import os
  import sys

  # funtest config: put the current working directory and its parent at the
  # front of sys.path *before* importing `suite`, so that module is looked up
  # there first.  `suite` is never referenced by name in this file (hence the
  # noqa); presumably it is imported for its side effects -- TODO confirm.
  sys.path.insert(0, os.getcwd())
  sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
  import suite  # noqa

  from celery.five import range
  from celery.tests.case import unittest
  from celery.tests.functional import tasks
  from celery.tests.functional.case import WorkerCase


  class test_basic(WorkerCase):
      """Functional tests run against the worker provided by WorkerCase.

      The tests publish tasks from ``tasks`` and then query the worker through
      ``self.inspect()``; each inspect reply is indexed by
      ``self.worker.hostname`` to get that worker's list of entries.
      """

      def test_started(self):
          """Run the WorkerCase liveness assertion."""
          self.assertWorkerAlive()

      def test_roundtrip_simple_task(self):
          """Publish 100 ``add`` tasks and check every returned sum."""
          publisher = tasks.add.get_publisher()
          # Keep each result paired with the argument tuple that produced it
          # so the expected value can be recomputed locally below.
          results = [
              (tasks.add.apply_async(args, publisher=publisher), args)
              for args in zip(range(100), range(100))
          ]
          for result, args in results:
              self.assertEqual(result.get(timeout=10), operator.add(*args))

      def test_dump_active(self, sleep=1):
          """Start two sleep tasks and check the ``active`` inspect reply.

          :param sleep: argument passed to each ``sleeptask`` invocation.
          """
          r1 = tasks.sleeptask.delay(sleep)
          tasks.sleeptask.delay(sleep)
          # Presumably blocks until the worker has accepted r1 -- see
          # WorkerCase.ensure_accepted.
          self.ensure_accepted(r1.id)
          active = self.inspect().active(safe=True)
          self.assertTrue(active)
          active = active[self.worker.hostname]
          self.assertEqual(len(active), 2)
          self.assertEqual(active[0]['name'], tasks.sleeptask.name)
          self.assertEqual(active[0]['args'], [sleep])

      def test_dump_reserved(self, sleep=1):
          """Start four sleep tasks and check the ``reserved`` inspect reply.

          :param sleep: argument passed to each ``sleeptask`` invocation.
          """
          r1 = tasks.sleeptask.delay(sleep)
          # Three more tasks whose results are not needed.
          for _ in range(3):
              tasks.sleeptask.delay(sleep)
          self.ensure_accepted(r1.id)
          reserved = self.inspect().reserved(safe=True)
          self.assertTrue(reserved)
          reserved = reserved[self.worker.hostname]
          self.assertEqual(reserved[0]['name'], tasks.sleeptask.name)
          self.assertEqual(reserved[0]['args'], [sleep])

      def test_dump_schedule(self, countdown=1):
          """Schedule two ``add`` tasks and check the ``scheduled`` reply.

          :param countdown: ``countdown`` value given to ``apply_async``.
          """
          r1 = tasks.add.apply_async((2, 2), countdown=countdown)
          tasks.add.apply_async((2, 2), countdown=countdown)
          self.ensure_scheduled(r1.id, interval=0.1)
          schedule = self.inspect().scheduled(safe=True)
          self.assertTrue(schedule)
          schedule = schedule[self.worker.hostname]
          # Was assertTrue(len(schedule), 2): that treats 2 as the failure
          # message and passes for any non-empty list.  Compare the length
          # for real, as test_dump_active does.
          self.assertEqual(len(schedule), 2)
          self.assertEqual(schedule[0]['request']['name'], tasks.add.name)
          self.assertEqual(schedule[0]['request']['args'], [2, 2])


  # Run the tests in this module when the file is executed as a script.
  if __name__ == '__main__':
      unittest.main()