test_basic.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. import operator
  2. import os
  3. import sys
  4. import time
  5. # funtest config: put the working directory and its parent on sys.path before importing `suite`.
  6. sys.path.insert(0, os.getcwd())
  7. sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
  8. import suite
  9. from celery.five import range
  10. from celery.tests.utils import unittest
  11. from celery.tests.functional import tasks
  12. from celery.tests.functional.case import WorkerCase
  class test_basic(WorkerCase):
      """Functional tests that drive a worker through the ``WorkerCase`` helpers.

      Each test publishes tasks from ``celery.tests.functional.tasks`` and
      checks either the task results or what the worker reports through
      ``self.inspect()``.
      """

      def test_started(self):
          """Assert that the worker is alive."""
          self.assertWorkerAlive()

      def test_roundtrip_simple_task(self):
          """Publish 100 ``add`` tasks on one publisher and verify every sum."""
          publisher = tasks.add.get_publisher()
          # Keep each argument pair next to its result so the expected value
          # can be recomputed when the result is checked.
          results = [
              (tasks.add.apply_async(args, publisher=publisher), args)
              for args in zip(range(100), range(100))
          ]
          for result, args in results:
              self.assertEqual(result.get(timeout=10), operator.add(*args))

      def test_dump_active(self, sleep=1):
          """Check that two submitted ``sleeptask`` calls are listed as active.

          :param sleep: argument passed to each ``sleeptask`` call.
          """
          r1 = tasks.sleeptask.delay(sleep)
          # Second task: only its presence in the listing matters, so the
          # returned result object is not kept.
          tasks.sleeptask.delay(sleep)
          self.ensure_accepted(r1.id)
          active = self.inspect().active(safe=True)
          self.assertTrue(active)
          active = active[self.worker.hostname]
          self.assertEqual(len(active), 2)
          self.assertEqual(active[0]['name'], tasks.sleeptask.name)
          self.assertEqual(active[0]['args'], [sleep])

      def test_dump_reserved(self, sleep=1):
          """Check that the worker reports reserved ``sleeptask`` requests.

          Four tasks are submitted; the first reserved entry is inspected.

          :param sleep: argument passed to each ``sleeptask`` call.
          """
          r1 = tasks.sleeptask.delay(sleep)
          # The remaining three submissions are fire-and-forget here.
          for _ in range(3):
              tasks.sleeptask.delay(sleep)
          self.ensure_accepted(r1.id)
          reserved = self.inspect().reserved(safe=True)
          self.assertTrue(reserved)
          reserved = reserved[self.worker.hostname]
          self.assertEqual(reserved[0]['name'], tasks.sleeptask.name)
          self.assertEqual(reserved[0]['args'], [sleep])

      def test_dump_schedule(self, countdown=1):
          """Check that two ``add`` calls sent with a countdown are scheduled.

          :param countdown: ``countdown`` value passed to ``apply_async``.
          """
          r1 = tasks.add.apply_async((2, 2), countdown=countdown)
          tasks.add.apply_async((2, 2), countdown=countdown)
          self.ensure_scheduled(r1.id, interval=0.1)
          schedule = self.inspect().scheduled(safe=True)
          self.assertTrue(schedule)
          schedule = schedule[self.worker.hostname]
          # This used to be ``assertTrue(len(schedule), 2)``, which treats 2
          # as the failure message and passes for any non-empty schedule.
          self.assertEqual(len(schedule), 2)
          self.assertEqual(schedule[0]['request']['name'], tasks.add.name)
          self.assertEqual(schedule[0]['request']['args'], [2, 2])
  # Run the test cases in this module when the file is executed as a script.
  if __name__ == '__main__':
      unittest.main()