views.py 1.0 KB

12345678910111213141516171819202122232425262728293031
  1. """celery.views"""
  2. from django.http import HttpResponse
  3. from celery.task import is_done, delay_task
  4. from celery.result import AsyncResult
  5. from carrot.serialization import serialize as JSON_dump
  6. def is_task_done(request, task_id):
  7. """Returns task execute status in JSON format."""
  8. response_data = {"task": {"id": task_id, "executed": is_done(task_id)}}
  9. return HttpResponse(JSON_dump(response_data), mimetype="application/json")
  10. def task_status(request, task_id):
  11. """Returns task status and result in JSON format."""
  12. async_result = AsyncResult(task_id)
  13. status = async_result.status
  14. if status == "FAILURE":
  15. response_data = {
  16. "id": task_id,
  17. "status": status,
  18. "result": async_result.result.args[0],
  19. }
  20. else:
  21. response_data = {
  22. "id": task_id,
  23. "status": status,
  24. "result": async_result.result,
  25. }
  26. return HttpResponse(JSON_dump({"task": response_data}),
  27. mimetype="application/json")