builtins.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. from datetime import datetime
  2. from celery import conf
  3. from celery.backends import default_backend
  4. from celery.registry import tasks
  5. from celery.utils import timeutils
  6. from celery.worker import state
  7. from celery.worker.revoke import revoked
  8. from celery.worker.control.registry import Panel
  # Task attributes that ``dump_tasks`` includes in its per-task summary
  # (only those whose value is not None are shown).
  TASK_INFO_FIELDS = (
      "exchange",
      "routing_key",
      "rate_limit",
  )
  @Panel.register
  def revoke(panel, task_id, task_name=None, **kwargs):
      """Revoke task by task id.

      The id is added to the ``revoked`` set and marked as revoked in a
      result backend: the backend of the task registered as ``task_name``
      when that name is given and found in the registry, otherwise
      ``default_backend``.

      :param task_id: Id of the task to revoke.
      :param task_name: Optional task type, used to find a custom backend.

      :returns: ``{"ok": "task <task_id> revoked"}``.

      """
      revoked.add(task_id)

      result_backend = default_backend
      if task_name:
          # Use custom task backend (if any); unknown names fall back
          # to the default backend.
          try:
              result_backend = tasks[task_name].backend
          except KeyError:
              pass
      result_backend.mark_as_revoked(task_id)

      suffix = "%s revoked" % (task_id, )
      panel.logger.warn("Task " + suffix)
      return {"ok": "task " + suffix}
  @Panel.register
  def enable_events(panel):
      """Enable the listener's event dispatcher unless it is already enabled.

      When the dispatcher gets enabled a ``worker-online`` event is sent
      through it and a warning is logged.

      :returns: a dict whose ``"ok"`` value tells whether events were
          enabled now or were already enabled.

      """
      event_dispatcher = panel.listener.event_dispatcher
      if event_dispatcher.enabled:
          return {"ok": "events already enabled"}

      event_dispatcher.enable()
      event_dispatcher.send("worker-online")
      panel.logger.warn("Events enabled by remote.")
      return {"ok": "events enabled"}
  @Panel.register
  def disable_events(panel):
      """Disable the listener's event dispatcher unless it is already disabled.

      A ``worker-offline`` event is sent *before* the dispatcher is
      disabled, then a warning is logged.

      :returns: a dict whose ``"ok"`` value tells whether events were
          disabled now or were already disabled.

      """
      event_dispatcher = panel.listener.event_dispatcher
      if not event_dispatcher.enabled:
          return {"ok": "events already disabled"}

      event_dispatcher.send("worker-offline")
      event_dispatcher.disable()
      panel.logger.warn("Events disabled by remote.")
      return {"ok": "events disabled"}
  @Panel.register
  def rate_limit(panel, task_name, rate_limit, **kwargs):
      """Set new rate limit for a task type.

      See :attr:`celery.task.base.Task.rate_limit`.

      :param task_name: Type of task.
      :param rate_limit: New rate limit.

      :returns: a dict with an ``"ok"`` key on success, or an ``"error"``
          key when the rate limit string is invalid, the task is not
          registered, or rate limits are disabled.

      """
      # Validation only: the parsed value is discarded, the raw value is
      # what gets stored on the task.
      try:
          timeutils.rate(rate_limit)
      except ValueError as exc:
          return {"error": "Invalid rate limit string: %s" % exc}

      # NOTE: the attribute is assigned before the DISABLE_RATE_LIMITS
      # check below, so it is updated even when an error is returned.
      try:
          tasks[task_name].rate_limit = rate_limit
      except KeyError:
          panel.logger.error("Rate limit attempt for unknown task %s" % (
              task_name, ))
          return {"error": "unknown task"}

      if conf.DISABLE_RATE_LIMITS:
          panel.logger.error("Rate limit attempt, but rate limits disabled.")
          return {"error": "rate limits disabled"}

      panel.listener.ready_queue.refresh()

      if not rate_limit:
          panel.logger.warn("Disabled rate limits for tasks of type %s" % (
              task_name, ))
          return {"ok": "rate limit disabled successfully"}

      panel.logger.warn("New rate limit for tasks of type %s: %s." % (
          task_name, rate_limit))
      return {"ok": "new rate limit set successfully"}
  @Panel.register
  def dump_schedule(panel, **kwargs):
      """Log and return a description of the listener's ETA schedule.

      :returns: a list with one formatted string per entry yielded by
          ``schedule.info()`` (index, ETA as a local datetime, priority
          and the ``repr`` of the item), or an empty list when the
          schedule queue is empty.

      """
      schedule = panel.listener.eta_schedule
      if not schedule.queue:
          panel.logger.info("--Empty schedule--")
          return []

      # A list comprehension instead of map() + a tuple-unpacking lambda:
      # it is valid on Python 3 as well and always yields a real list,
      # which is needed because the result is both joined and returned.
      info = ["%s. %s pri%s %r" % (i,
                                   datetime.fromtimestamp(item["eta"]),
                                   item["priority"],
                                   item["item"])
              for i, item in enumerate(schedule.info())]
      panel.logger.info("* Dump of current schedule:\n%s" % (
          "\n".join(info), ))
      return info
  @Panel.register
  def dump_reserved(panel, **kwargs):
      """Log and return the ``repr`` of every item in the ready queue.

      :returns: a list of strings, one per entry in
          ``panel.listener.ready_queue.items``; an empty list when there
          are none.

      """
      reserved = panel.listener.ready_queue.items
      if not reserved:
          panel.logger.info("--Empty queue--")
          return []

      # Build a real list: the result is joined for the log message and
      # then returned, which a one-shot map() iterator would not survive.
      info = [repr(request) for request in reserved]
      panel.logger.info("* Dump of currently reserved tasks:\n%s" % (
          "\n".join(info), ))
      return info
  @Panel.register
  def dump_active(panel, **kwargs):
      """Return the ``active`` object from :mod:`celery.worker.state`."""
      return state.active
  @Panel.register
  def stats(panel, **kwargs):
      """Return a dict of worker statistics.

      :returns: a dict with the worker state's ``active`` and ``total``
          values and the listener pool's ``info`` under the keys
          ``"active"``, ``"total"`` and ``"pool"``.

      """
      report = {}
      report["active"] = state.active
      report["total"] = state.total
      report["pool"] = panel.listener.pool.info
      return report
  @Panel.register
  def dump_tasks(panel, **kwargs):
      """Log and return a summary line for every registered task.

      Tasks are listed sorted by registry key.  Each line is the task name,
      followed by ``[field=value ...]`` for every field in
      ``TASK_INFO_FIELDS`` whose value on the task is not None.

      :returns: a list of summary strings.

      """

      def _extract_info(task):
          # Walk the fields in TASK_INFO_FIELDS order so the output does
          # not depend on dict ordering.
          pairs = []
          for field in TASK_INFO_FIELDS:
              value = getattr(task, field, None)
              if value is not None:
                  pairs.append("=".join((field, str(value))))
          if not pairs:
              return task.name
          return "%s [%s]" % (task.name, " ".join(pairs))

      # A real list is required: the result is joined for the log message
      # and then returned to the caller.
      info = [_extract_info(tasks[name]) for name in sorted(tasks.keys())]
      panel.logger.warn("* Dump of currently registered tasks:\n%s" % (
          "\n".join(info), ))
      return info
  @Panel.register
  def ping(panel, **kwargs):
      """Reply to a ping request with the string ``"pong"``."""
      reply = "pong"
      return reply
  @Panel.register
  def shutdown(panel, **kwargs):
      """Log a critical message and raise :exc:`SystemExit`.

      :raises SystemExit: always.

      """
      panel.logger.critical("Got shutdown from remote.")
      raise SystemExit()