test_basic.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import operator
  2. from celery.five import range
  3. from celery.tests.case import unittest
  4. from celery.tests.functional import tasks
  5. from celery.tests.functional.case import WorkerCase
  6. # funtest config
  7. import suite # noqa
  8. class test_basic(WorkerCase):
  9. def test_started(self):
  10. self.assertWorkerAlive()
  11. def test_roundtrip_simple_task(self):
  12. publisher = tasks.add.get_publisher()
  13. results = [(tasks.add.apply_async(i, publisher=publisher), i)
  14. for i in zip(range(100), range(100))]
  15. for result, i in results:
  16. self.assertEqual(result.get(timeout=10), operator.add(*i))
  17. def test_dump_active(self, sleep=1):
  18. r1 = tasks.sleeptask.delay(sleep)
  19. tasks.sleeptask.delay(sleep)
  20. self.ensure_accepted(r1.id)
  21. active = self.inspect().active(safe=True)
  22. self.assertTrue(active)
  23. active = active[self.worker.hostname]
  24. self.assertEqual(len(active), 2)
  25. self.assertEqual(active[0]['name'], tasks.sleeptask.name)
  26. self.assertEqual(active[0]['args'], [sleep])
  27. def test_dump_reserved(self, sleep=1):
  28. r1 = tasks.sleeptask.delay(sleep)
  29. tasks.sleeptask.delay(sleep)
  30. tasks.sleeptask.delay(sleep)
  31. tasks.sleeptask.delay(sleep)
  32. self.ensure_accepted(r1.id)
  33. reserved = self.inspect().reserved(safe=True)
  34. self.assertTrue(reserved)
  35. reserved = reserved[self.worker.hostname]
  36. self.assertEqual(reserved[0]['name'], tasks.sleeptask.name)
  37. self.assertEqual(reserved[0]['args'], [sleep])
  38. def test_dump_schedule(self, countdown=1):
  39. r1 = tasks.add.apply_async((2, 2), countdown=countdown)
  40. tasks.add.apply_async((2, 2), countdown=countdown)
  41. self.ensure_scheduled(r1.id, interval=0.1)
  42. schedule = self.inspect().scheduled(safe=True)
  43. self.assertTrue(schedule)
  44. schedule = schedule[self.worker.hostname]
  45. self.assertTrue(len(schedule), 2)
  46. self.assertEqual(schedule[0]['request']['name'], tasks.add.name)
  47. self.assertEqual(schedule[0]['request']['args'], [2, 2])
  48. if __name__ == '__main__':
  49. unittest.main()