builtins.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. from datetime import datetime
  2. from celery import conf
  3. from celery import log
  4. from celery.backends import default_backend
  5. from celery.registry import tasks
  6. from celery.utils import timeutils
  7. from celery.worker import state
  8. from celery.worker.state import revoked
  9. from celery.worker.control.registry import Panel
  10. TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
  11. @Panel.register
  12. def revoke(panel, task_id, task_name=None, **kwargs):
  13. """Revoke task by task id."""
  14. revoked.add(task_id)
  15. backend = default_backend
  16. if task_name: # Use custom task backend (if any)
  17. try:
  18. backend = tasks[task_name].backend
  19. except KeyError:
  20. pass
  21. backend.mark_as_revoked(task_id)
  22. panel.logger.warn("Task %s revoked" % (task_id, ))
  23. return {"ok": "task %s revoked" % (task_id, )}
  24. @Panel.register
  25. def enable_events(panel):
  26. dispatcher = panel.listener.event_dispatcher
  27. if not dispatcher.enabled:
  28. dispatcher.enable()
  29. dispatcher.send("worker-online")
  30. panel.logger.warn("Events enabled by remote.")
  31. return {"ok": "events enabled"}
  32. return {"ok": "events already enabled"}
  33. @Panel.register
  34. def disable_events(panel):
  35. dispatcher = panel.listener.event_dispatcher
  36. if dispatcher.enabled:
  37. dispatcher.send("worker-offline")
  38. dispatcher.disable()
  39. panel.logger.warn("Events disabled by remote.")
  40. return {"ok": "events disabled"}
  41. return {"ok": "events already disabled"}
  42. @Panel.register
  43. def set_loglevel(panel, loglevel=None):
  44. if loglevel is not None:
  45. if not isinstance(loglevel, int):
  46. loglevel = conf.LOG_LEVELS[loglevel.upper()]
  47. log.get_default_logger(loglevel=loglevel)
  48. return {"ok": loglevel}
  49. @Panel.register
  50. def rate_limit(panel, task_name, rate_limit, **kwargs):
  51. """Set new rate limit for a task type.
  52. See :attr:`celery.task.base.Task.rate_limit`.
  53. :param task_name: Type of task.
  54. :param rate_limit: New rate limit.
  55. """
  56. try:
  57. timeutils.rate(rate_limit)
  58. except ValueError, exc:
  59. return {"error": "Invalid rate limit string: %s" % exc}
  60. try:
  61. tasks[task_name].rate_limit = rate_limit
  62. except KeyError:
  63. panel.logger.error("Rate limit attempt for unknown task %s" % (
  64. task_name, ))
  65. return {"error": "unknown task"}
  66. if conf.DISABLE_RATE_LIMITS:
  67. panel.logger.error("Rate limit attempt, but rate limits disabled.")
  68. return {"error": "rate limits disabled"}
  69. panel.listener.ready_queue.refresh()
  70. if not rate_limit:
  71. panel.logger.warn("Disabled rate limits for tasks of type %s" % (
  72. task_name, ))
  73. return {"ok": "rate limit disabled successfully"}
  74. panel.logger.warn("New rate limit for tasks of type %s: %s." % (
  75. task_name, rate_limit))
  76. return {"ok": "new rate limit set successfully"}
  77. @Panel.register
  78. def dump_schedule(panel, safe=False, **kwargs):
  79. schedule = panel.listener.eta_schedule.schedule
  80. if not schedule.queue:
  81. panel.logger.info("--Empty schedule--")
  82. return []
  83. formatitem = lambda (i, item): "%s. %s pri%s %r" % (i,
  84. datetime.fromtimestamp(item["eta"]),
  85. item["priority"],
  86. item["item"])
  87. info = map(formatitem, enumerate(schedule.info()))
  88. panel.logger.info("* Dump of current schedule:\n%s" % (
  89. "\n".join(info, )))
  90. scheduled_tasks = []
  91. for item in schedule.info():
  92. scheduled_tasks.append({"eta": item["eta"],
  93. "priority": item["priority"],
  94. "request": item["item"].info(safe=safe)})
  95. return scheduled_tasks
  96. @Panel.register
  97. def dump_reserved(panel, safe=False, **kwargs):
  98. ready_queue = panel.listener.ready_queue
  99. reserved = ready_queue.items
  100. if not reserved:
  101. panel.logger.info("--Empty queue--")
  102. return []
  103. panel.logger.info("* Dump of currently reserved tasks:\n%s" % (
  104. "\n".join(map(repr, reserved), )))
  105. return [request.info(safe=safe)
  106. for request in reserved]
  107. @Panel.register
  108. def dump_active(panel, safe=False, **kwargs):
  109. return [request.info(safe=safe)
  110. for request in state.active_requests]
  111. @Panel.register
  112. def stats(panel, **kwargs):
  113. return {"total": state.total_count,
  114. "pool": panel.listener.pool.info}
  115. @Panel.register
  116. def dump_revoked(panel, **kwargs):
  117. return list(state.revoked)
  118. @Panel.register
  119. def dump_tasks(panel, **kwargs):
  120. def _extract_info(task):
  121. fields = dict((field, str(getattr(task, field, None)))
  122. for field in TASK_INFO_FIELDS
  123. if getattr(task, field, None) is not None)
  124. info = map("=".join, fields.items())
  125. if not info:
  126. return task.name
  127. return "%s [%s]" % (task.name, " ".join(info))
  128. info = map(_extract_info, (tasks[task]
  129. for task in sorted(tasks.keys())))
  130. panel.logger.warn("* Dump of currently registered tasks:\n%s" % (
  131. "\n".join(info)))
  132. return info
  133. @Panel.register
  134. def ping(panel, **kwargs):
  135. return "pong"
  136. @Panel.register
  137. def shutdown(panel, **kwargs):
  138. panel.logger.critical("Got shutdown from remote.")
  139. raise SystemExit("Got shutdown from remote")