views.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. from django.http import HttpResponse, Http404
  2. from anyjson import serialize as JSON_dump
  3. from billiard.utils.functional import wraps
  4. from celery.utils import get_full_cls_name
  5. from celery.result import AsyncResult
  6. from celery.registry import tasks
  7. from celery.backends import default_backend
  8. def task_view(task):
  9. """Decorator turning any task into a view that applies the task
  10. asynchronously.
  11. Returns a JSON dictionary containing the keys ``ok``, and
  12. ``task_id``.
  13. """
  14. def _applier(request, **options):
  15. kwargs = request.method == "POST" and \
  16. request.POST.copy() or request.GET.copy()
  17. kwargs = dict((key.encode("utf-8"), value)
  18. for key, value in kwargs.items())
  19. result = task.apply_async(kwargs=kwargs)
  20. response_data = {"ok": "true", "task_id": result.task_id}
  21. return HttpResponse(JSON_dump(response_data),
  22. mimetype="application/json")
  23. return _applier
  24. def apply(request, task_name):
  25. """View applying a task.
  26. **Note:** Please use this with caution. Preferably you shouldn't make this
  27. publicly accessible without ensuring your code is safe!
  28. """
  29. try:
  30. task = tasks[task_name]
  31. except KeyError:
  32. raise Http404("apply: no such task")
  33. return task_view(task)(request)
  34. def is_task_successful(request, task_id):
  35. """Returns task execute status in JSON format."""
  36. response_data = {"task": {"id": task_id,
  37. "executed": AsyncResult(task_id).successful()}}
  38. return HttpResponse(JSON_dump(response_data), mimetype="application/json")
  39. def task_status(request, task_id):
  40. """Returns task status and result in JSON format."""
  41. status = default_backend.get_status(task_id)
  42. res = default_backend.get_result(task_id)
  43. response_data = dict(id=task_id, status=status, result=res)
  44. if status in default_backend.EXCEPTION_STATES:
  45. traceback = default_backend.get_traceback(task_id)
  46. response_data.update({"result": str(res.args[0]),
  47. "exc": get_full_cls_name(res.__class__),
  48. "traceback": traceback})
  49. return HttpResponse(JSON_dump({"task": response_data}),
  50. mimetype="application/json")
  51. def task_webhook(fun):
  52. """Decorator turning a function into a task webhook.
  53. If an exception is raised within the function, the decorated
  54. function catches this and returns an error JSON response, otherwise
  55. it returns the result as a JSON response.
  56. Example:
  57. .. code-block:: python
  58. @task_webhook
  59. def add(request):
  60. x = int(request.GET["x"])
  61. y = int(request.GET["y"])
  62. return x + y
  63. >>> response = add(request)
  64. >>> response.content
  65. '{"status": "success", "retval": 100}'
  66. """
  67. @wraps(fun)
  68. def _inner(*args, **kwargs):
  69. try:
  70. retval = fun(*args, **kwargs)
  71. except Exception, exc:
  72. response = {"status": "failure", "reason": str(exc)}
  73. else:
  74. response = {"status": "success", "retval": retval}
  75. return HttpResponse(JSON_dump(response), mimetype="application/json")
  76. return _inner