builtins.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. from datetime import datetime
  2. from celery import conf
  3. from celery import log
  4. from celery.backends import default_backend
  5. from celery.registry import tasks
  6. from celery.utils import timeutils
  7. from celery.worker import state
  8. from celery.worker.state import revoked
  9. from celery.worker.control.registry import Panel
  10. TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
  11. @Panel.register
  12. def diagnose(panel, timeout=None, **kwargs):
  13. info = panel.listener.pool.diagnose(timeout=timeout)
  14. if info["waiting"]:
  15. panel.logger.error("Diagnose failed: %r" % (info, ))
  16. return {"error": info}
  17. panel.logger.info("Diagnose complete: %r" % (info, ))
  18. return {"ok": info}
  19. @Panel.register
  20. def revoke(panel, task_id, **kwargs):
  21. """Revoke task by task id."""
  22. revoked.add(task_id)
  23. panel.logger.warn("Task %s revoked" % (task_id, ))
  24. return {"ok": "task %s revoked" % (task_id, )}
  25. @Panel.register
  26. def enable_events(panel):
  27. dispatcher = panel.listener.event_dispatcher
  28. if not dispatcher.enabled:
  29. dispatcher.enable()
  30. dispatcher.send("worker-online")
  31. panel.logger.warn("Events enabled by remote.")
  32. return {"ok": "events enabled"}
  33. return {"ok": "events already enabled"}
  34. @Panel.register
  35. def disable_events(panel):
  36. dispatcher = panel.listener.event_dispatcher
  37. if dispatcher.enabled:
  38. dispatcher.send("worker-offline")
  39. dispatcher.disable()
  40. panel.logger.warn("Events disabled by remote.")
  41. return {"ok": "events disabled"}
  42. return {"ok": "events already disabled"}
  43. @Panel.register
  44. def set_loglevel(panel, loglevel=None):
  45. if loglevel is not None:
  46. if not isinstance(loglevel, int):
  47. loglevel = conf.LOG_LEVELS[loglevel.upper()]
  48. log.get_default_logger(loglevel=loglevel)
  49. return {"ok": loglevel}
  50. @Panel.register
  51. def rate_limit(panel, task_name, rate_limit, **kwargs):
  52. """Set new rate limit for a task type.
  53. See :attr:`celery.task.base.Task.rate_limit`.
  54. :param task_name: Type of task.
  55. :param rate_limit: New rate limit.
  56. """
  57. try:
  58. timeutils.rate(rate_limit)
  59. except ValueError, exc:
  60. return {"error": "Invalid rate limit string: %s" % exc}
  61. try:
  62. tasks[task_name].rate_limit = rate_limit
  63. except KeyError:
  64. panel.logger.error("Rate limit attempt for unknown task %s" % (
  65. task_name, ))
  66. return {"error": "unknown task"}
  67. if conf.DISABLE_RATE_LIMITS:
  68. panel.logger.error("Rate limit attempt, but rate limits disabled.")
  69. return {"error": "rate limits disabled"}
  70. panel.listener.ready_queue.refresh()
  71. if not rate_limit:
  72. panel.logger.warn("Disabled rate limits for tasks of type %s" % (
  73. task_name, ))
  74. return {"ok": "rate limit disabled successfully"}
  75. panel.logger.warn("New rate limit for tasks of type %s: %s." % (
  76. task_name, rate_limit))
  77. return {"ok": "new rate limit set successfully"}
  78. @Panel.register
  79. def dump_schedule(panel, safe=False, **kwargs):
  80. schedule = panel.listener.eta_schedule.schedule
  81. if not schedule.queue:
  82. panel.logger.info("--Empty schedule--")
  83. return []
  84. formatitem = lambda (i, item): "%s. %s pri%s %r" % (i,
  85. datetime.fromtimestamp(item["eta"]),
  86. item["priority"],
  87. item["item"])
  88. info = map(formatitem, enumerate(schedule.info()))
  89. panel.logger.info("* Dump of current schedule:\n%s" % (
  90. "\n".join(info, )))
  91. scheduled_tasks = []
  92. for item in schedule.info():
  93. scheduled_tasks.append({"eta": item["eta"],
  94. "priority": item["priority"],
  95. "request": item["item"].info(safe=safe)})
  96. return scheduled_tasks
  97. @Panel.register
  98. def dump_reserved(panel, safe=False, **kwargs):
  99. ready_queue = panel.listener.ready_queue
  100. reserved = ready_queue.items
  101. if not reserved:
  102. panel.logger.info("--Empty queue--")
  103. return []
  104. panel.logger.info("* Dump of currently reserved tasks:\n%s" % (
  105. "\n".join(map(repr, reserved), )))
  106. return [request.info(safe=safe)
  107. for request in reserved]
  108. @Panel.register
  109. def dump_active(panel, safe=False, **kwargs):
  110. return [request.info(safe=safe)
  111. for request in state.active_requests]
  112. @Panel.register
  113. def stats(panel, **kwargs):
  114. return {"total": state.total_count,
  115. "listener": panel.listener.info,
  116. "pool": panel.listener.pool.info}
  117. @Panel.register
  118. def dump_revoked(panel, **kwargs):
  119. return list(state.revoked)
  120. @Panel.register
  121. def dump_tasks(panel, **kwargs):
  122. def _extract_info(task):
  123. fields = dict((field, str(getattr(task, field, None)))
  124. for field in TASK_INFO_FIELDS
  125. if getattr(task, field, None) is not None)
  126. info = map("=".join, fields.items())
  127. if not info:
  128. return task.name
  129. return "%s [%s]" % (task.name, " ".join(info))
  130. info = map(_extract_info, (tasks[task]
  131. for task in sorted(tasks.keys())))
  132. panel.logger.warn("* Dump of currently registered tasks:\n%s" % (
  133. "\n".join(info)))
  134. return info
  135. @Panel.register
  136. def ping(panel, **kwargs):
  137. return "pong"
  138. @Panel.register
  139. def shutdown(panel, **kwargs):
  140. panel.logger.critical("Got shutdown from remote.")
  141. raise SystemExit("Got shutdown from remote")