test_task_builtins.py 720 B

1234567891011121314151617181920212223242526272829
  1. import unittest2 as unittest
  2. from celery.task.builtins import ExecuteRemoteTask
  3. from celery.task.builtins import PingTask, DeleteExpiredTaskMetaTask
  4. from celery.serialization import pickle
  5. def some_func(i):
  6. return i * i
  7. class TestPingTask(unittest.TestCase):
  8. def test_ping(self):
  9. self.assertEqual(PingTask.apply().get(), 'pong')
  10. class TestRemoteExecuteTask(unittest.TestCase):
  11. def test_execute_remote(self):
  12. self.assertEqual(ExecuteRemoteTask.apply(
  13. args=[pickle.dumps(some_func), [10], {}]).get(),
  14. 100)
  15. class TestDeleteExpiredTaskMetaTask(unittest.TestCase):
  16. def test_run(self):
  17. DeleteExpiredTaskMetaTask.apply()