api.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. from functools import wraps
  2. import simplejson
  3. from tornado.web import RequestHandler, Application
  4. from celery.task import revoke
  5. from celery.monitoring.state import monitor_state
  6. def JSON(fun):
  7. @wraps(fun)
  8. def _write_json(self, *args, **kwargs):
  9. content = fun(self, *args, **kwargs)
  10. self.write(simplejson.dumps(content))
  11. return _write_json
  12. class APIHandler(RequestHandler):
  13. def __init__(self, *args, **kwargs):
  14. super(APIHandler, self).__init__(*args, **kwargs)
  15. self.set_header("Content-Type", "application/javascript")
  16. def api_handler(fun):
  17. @JSON
  18. def get(self, *args, **kwargs):
  19. return fun(self, *args, **kwargs)
  20. return type(fun.__name__, (APIHandler, ), {"get": get})
  21. @api_handler
  22. def task_state(request, task_id):
  23. return monitor_state.get_task_info(task_id)
  24. @api_handler
  25. def list_tasks(request):
  26. return monitor_state.tasks_by_time()
  27. @api_handler
  28. def list_tasks_by_name(request, name):
  29. return monitor_state.tasks_by_type()[name]
  30. @api_handler
  31. def list_task_types(request):
  32. return monitor_state.tasks_by_type()
  33. class RevokeTaskHandler(APIHandler):
  34. SUPPORTED_METHODS = ["POST"]
  35. @JSON
  36. def post(self):
  37. task_id = self.get_argument("task_id")
  38. revoke(task_id)
  39. return {"ok": True}
  40. API = [
  41. (r"/task/name/$", list_task_types),
  42. (r"/task/name/(.+?)", list_tasks_by_name),
  43. (r"/task/$", list_tasks),
  44. (r"/revoke/task/", RevokeTaskHandler),
  45. (r"/task/(.+)", task_state),
  46. ]