builtins.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. from datetime import datetime
  2. from celery import conf
  3. from celery import log
  4. from celery.backends import default_backend
  5. from celery.registry import tasks
  6. from celery.utils import timeutils
  7. from celery.worker import state
  8. from celery.worker.state import revoked
  9. from celery.worker.control.registry import Panel
  10. TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
  11. @Panel.register
  12. def diagnose(panel, timeout=None, **kwargs):
  13. info = panel.listener.pool.diagnose(timeout=timeout)
  14. if info["waiting"]:
  15. panel.logger.error("Diagnose failed: %r" % (info, ))
  16. return {"error": info}
  17. panel.logger.info("Diagnose complete: %r" % (info, ))
  18. return {"ok": info}
  19. @Panel.register
  20. def revoke(panel, task_id, task_name=None, **kwargs):
  21. """Revoke task by task id."""
  22. revoked.add(task_id)
  23. backend = default_backend
  24. if task_name: # use custom task backend (if any)
  25. try:
  26. backend = tasks[task_name].backend
  27. except KeyError:
  28. pass
  29. backend.mark_as_revoked(task_id)
  30. panel.logger.warn("Task %s revoked" % (task_id, ))
  31. return {"ok": "task %s revoked" % (task_id, )}
  32. @Panel.register
  33. def enable_events(panel):
  34. dispatcher = panel.listener.event_dispatcher
  35. if not dispatcher.enabled:
  36. dispatcher.enable()
  37. dispatcher.send("worker-online")
  38. panel.logger.warn("Events enabled by remote.")
  39. return {"ok": "events enabled"}
  40. return {"ok": "events already enabled"}
  41. @Panel.register
  42. def disable_events(panel):
  43. dispatcher = panel.listener.event_dispatcher
  44. if dispatcher.enabled:
  45. dispatcher.send("worker-offline")
  46. dispatcher.disable()
  47. panel.logger.warn("Events disabled by remote.")
  48. return {"ok": "events disabled"}
  49. return {"ok": "events already disabled"}
  50. @Panel.register
  51. def set_loglevel(panel, loglevel=None):
  52. if loglevel is not None:
  53. if not isinstance(loglevel, int):
  54. loglevel = conf.LOG_LEVELS[loglevel.upper()]
  55. log.get_default_logger(loglevel=loglevel)
  56. return {"ok": loglevel}
  57. @Panel.register
  58. def rate_limit(panel, task_name, rate_limit, **kwargs):
  59. """Set new rate limit for a task type.
  60. See :attr:`celery.task.base.Task.rate_limit`.
  61. :param task_name: Type of task.
  62. :param rate_limit: New rate limit.
  63. """
  64. try:
  65. timeutils.rate(rate_limit)
  66. except ValueError, exc:
  67. return {"error": "Invalid rate limit string: %s" % exc}
  68. try:
  69. tasks[task_name].rate_limit = rate_limit
  70. except KeyError:
  71. panel.logger.error("Rate limit attempt for unknown task %s" % (
  72. task_name, ))
  73. return {"error": "unknown task"}
  74. if conf.DISABLE_RATE_LIMITS:
  75. panel.logger.error("Rate limit attempt, but rate limits disabled.")
  76. return {"error": "rate limits disabled"}
  77. panel.listener.ready_queue.refresh()
  78. if not rate_limit:
  79. panel.logger.warn("Disabled rate limits for tasks of type %s" % (
  80. task_name, ))
  81. return {"ok": "rate limit disabled successfully"}
  82. panel.logger.warn("New rate limit for tasks of type %s: %s." % (
  83. task_name, rate_limit))
  84. return {"ok": "new rate limit set successfully"}
  85. @Panel.register
  86. def dump_schedule(panel, safe=False, **kwargs):
  87. schedule = panel.listener.eta_schedule.schedule
  88. if not schedule.queue:
  89. panel.logger.info("--Empty schedule--")
  90. return []
  91. formatitem = lambda (i, item): "%s. %s pri%s %r" % (i,
  92. datetime.fromtimestamp(item["eta"]),
  93. item["priority"],
  94. item["item"])
  95. info = map(formatitem, enumerate(schedule.info()))
  96. panel.logger.info("* Dump of current schedule:\n%s" % (
  97. "\n".join(info, )))
  98. scheduled_tasks = []
  99. for item in schedule.info():
  100. scheduled_tasks.append({"eta": item["eta"],
  101. "priority": item["priority"],
  102. "request": item["item"].info(safe=safe)})
  103. return scheduled_tasks
  104. @Panel.register
  105. def dump_reserved(panel, safe=False, **kwargs):
  106. ready_queue = panel.listener.ready_queue
  107. reserved = ready_queue.items
  108. if not reserved:
  109. panel.logger.info("--Empty queue--")
  110. return []
  111. panel.logger.info("* Dump of currently reserved tasks:\n%s" % (
  112. "\n".join(map(repr, reserved), )))
  113. return [request.info(safe=safe)
  114. for request in reserved]
  115. @Panel.register
  116. def dump_active(panel, safe=False, **kwargs):
  117. return [request.info(safe=safe)
  118. for request in state.active_requests]
  119. @Panel.register
  120. def stats(panel, **kwargs):
  121. return {"total": state.total_count,
  122. "listener": panel.listener.info,
  123. "pool": panel.listener.pool.info}
  124. @Panel.register
  125. def dump_revoked(panel, **kwargs):
  126. return list(state.revoked)
  127. @Panel.register
  128. def dump_tasks(panel, **kwargs):
  129. def _extract_info(task):
  130. fields = dict((field, str(getattr(task, field, None)))
  131. for field in TASK_INFO_FIELDS
  132. if getattr(task, field, None) is not None)
  133. info = map("=".join, fields.items())
  134. if not info:
  135. return task.name
  136. return "%s [%s]" % (task.name, " ".join(info))
  137. info = map(_extract_info, (tasks[task]
  138. for task in sorted(tasks.keys())))
  139. panel.logger.warn("* Dump of currently registered tasks:\n%s" % (
  140. "\n".join(info)))
  141. return info
  142. @Panel.register
  143. def ping(panel, **kwargs):
  144. return "pong"
  145. @Panel.register
  146. def shutdown(panel, **kwargs):
  147. panel.logger.critical("Got shutdown from remote.")
  148. raise SystemExit("Got shutdown from remote")