test_basic.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. from __future__ import absolute_import
  2. import operator
  3. # funtest config
  4. import suite # noqa
  5. from celery.five import range
  6. from celery.tests.case import unittest
  7. from celery.tests.functional import tasks
  8. from celery.tests.functional.case import WorkerCase
  9. class test_basic(WorkerCase):
  10. def test_started(self):
  11. self.assertWorkerAlive()
  12. def test_roundtrip_simple_task(self):
  13. publisher = tasks.add.get_publisher()
  14. results = [(tasks.add.apply_async(i, publisher=publisher), i)
  15. for i in zip(range(100), range(100))]
  16. for result, i in results:
  17. self.assertEqual(result.get(timeout=10), operator.add(*i))
  18. def test_dump_active(self, sleep=1):
  19. r1 = tasks.sleeptask.delay(sleep)
  20. tasks.sleeptask.delay(sleep)
  21. self.ensure_accepted(r1.id)
  22. active = self.inspect().active(safe=True)
  23. self.assertTrue(active)
  24. active = active[self.worker.hostname]
  25. self.assertEqual(len(active), 2)
  26. self.assertEqual(active[0]['name'], tasks.sleeptask.name)
  27. self.assertEqual(active[0]['args'], [sleep])
  28. def test_dump_reserved(self, sleep=1):
  29. r1 = tasks.sleeptask.delay(sleep)
  30. tasks.sleeptask.delay(sleep)
  31. tasks.sleeptask.delay(sleep)
  32. tasks.sleeptask.delay(sleep)
  33. self.ensure_accepted(r1.id)
  34. reserved = self.inspect().reserved(safe=True)
  35. self.assertTrue(reserved)
  36. reserved = reserved[self.worker.hostname]
  37. self.assertEqual(reserved[0]['name'], tasks.sleeptask.name)
  38. self.assertEqual(reserved[0]['args'], [sleep])
  39. def test_dump_schedule(self, countdown=1):
  40. r1 = tasks.add.apply_async((2, 2), countdown=countdown)
  41. tasks.add.apply_async((2, 2), countdown=countdown)
  42. self.ensure_scheduled(r1.id, interval=0.1)
  43. schedule = self.inspect().scheduled(safe=True)
  44. self.assertTrue(schedule)
  45. schedule = schedule[self.worker.hostname]
  46. self.assertTrue(len(schedule), 2)
  47. self.assertEqual(schedule[0]['request']['name'], tasks.add.name)
  48. self.assertEqual(schedule[0]['request']['args'], [2, 2])
  49. if __name__ == '__main__':
  50. unittest.main()