celeryd.py

#!/usr/bin/env python
"""celeryd

.. program:: celeryd

.. cmdoption:: -c, --concurrency

    Number of child processes processing the queue. The default
    is the number of CPUs available on your system.

.. cmdoption:: -f, --logfile

    Path to log file. If no logfile is specified, ``stderr`` is used.

.. cmdoption:: -l, --loglevel

    Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
    ``ERROR``, ``CRITICAL``, or ``FATAL``.

.. cmdoption:: -n, --hostname

    Set custom hostname.

.. cmdoption:: -B, --beat

    Also run the ``celerybeat`` periodic task scheduler. Please note that
    there must only be one instance of this service.

.. cmdoption:: -s, --schedule

    Path to the schedule database if running with the ``-B`` option.
    Defaults to ``celerybeat-schedule``. The extension ".db" will be
    appended to the filename.

.. cmdoption:: -E, --events

    Send events that can be captured by monitors like ``celerymon``.

.. cmdoption:: --discard

    Discard all waiting tasks before the daemon is started.
    **WARNING**: This is unrecoverable, and the tasks will be
    deleted from the messaging server.

"""
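
# Example invocation (an illustrative sketch; it assumes the script is
# installed on $PATH as ``celeryd`` and uses only the options documented
# above):
#
#     $ celeryd --concurrency=4 --loglevel=INFO -B --events
#
# This starts four worker processes, logs at INFO level to stderr, embeds
# the celerybeat scheduler, and sends monitoring events for tools such as
# celerymon.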

import os
import sys
import socket
import logging
import optparse
import warnings
import traceback
import multiprocessing

import celery
from celery import conf
from celery import signals
from celery import platform
from celery.log import emergency_error
from celery.task import discard_all
from celery.utils import info
from celery.utils import get_full_cls_name
from celery.worker import WorkController

STARTUP_INFO_FMT = """
Configuration ->
    . broker -> %(conninfo)s
    . queues ->
%(queues)s
    . concurrency -> %(concurrency)s
    . loader -> %(loader)s
    . logfile -> %(logfile)s@%(loglevel)s
    . events -> %(events)s
    . beat -> %(celerybeat)s
%(tasks)s
""".strip()

TASK_LIST_FMT = """    . tasks ->\n%s"""

OPTION_LIST = (
    optparse.make_option('-c', '--concurrency',
        default=conf.CELERYD_CONCURRENCY,
        action="store", dest="concurrency", type="int",
        help="Number of child processes processing the queue."),
    optparse.make_option('--discard', default=False,
        action="store_true", dest="discard",
        help="Discard all waiting tasks before the server is started. "
             "WARNING: This is unrecoverable, and the tasks will be "
             "deleted from the messaging server."),
    optparse.make_option('-f', '--logfile', default=conf.CELERYD_LOG_FILE,
        action="store", dest="logfile",
        help="Path to log file."),
    optparse.make_option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL,
        action="store", dest="loglevel",
        help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
    optparse.make_option('-n', '--hostname', default=None,
        action="store", dest="hostname",
        help="Set custom host name. E.g. 'foo.example.com'."),
    optparse.make_option('-B', '--beat', default=False,
        action="store_true", dest="run_clockservice",
        help="Also run the celerybeat periodic task scheduler. "
             "Please note that only one instance must be running."),
    optparse.make_option('-s', '--schedule',
        default=conf.CELERYBEAT_SCHEDULE_FILENAME,
        action="store", dest="schedule",
        help="Path to the schedule database if running with the -B "
             "option. The extension '.db' will be appended to the "
             "filename. Default: %s" % (
                 conf.CELERYBEAT_SCHEDULE_FILENAME)),
    optparse.make_option('-E', '--events', default=conf.SEND_EVENTS,
        action="store_true", dest="events",
        help="Send events so celery can be monitored by e.g. celerymon."),
)


class Worker(object):

    def __init__(self, concurrency=conf.CELERYD_CONCURRENCY,
            loglevel=conf.CELERYD_LOG_LEVEL, logfile=conf.CELERYD_LOG_FILE,
            hostname=None, discard=False, run_clockservice=False,
            schedule=conf.CELERYBEAT_SCHEDULE_FILENAME,
            events=False, **kwargs):
        self.concurrency = concurrency or multiprocessing.cpu_count()
        self.loglevel = loglevel
        self.logfile = logfile
        self.hostname = hostname or socket.gethostname()
        self.discard = discard
        self.run_clockservice = run_clockservice
        self.schedule = schedule
        self.events = events
        if not isinstance(self.loglevel, int):
            # Accept symbolic log level names (e.g. "INFO") as well as ints.
            self.loglevel = conf.LOG_LEVELS[self.loglevel.upper()]

    def run(self):
        print("celery@%s v%s is starting." % (self.hostname,
                                              celery.__version__))
        self.init_loader()

        if conf.RESULT_BACKEND == "database" \
                and self.settings.DATABASE_ENGINE == "sqlite3" and \
                self.concurrency > 1:
            warnings.warn("The sqlite3 database engine doesn't handle "
                          "concurrency well. Will use a single process only.",
                          UserWarning)
            self.concurrency = 1

        if getattr(self.settings, "DEBUG", False):
            warnings.warn("Using settings.DEBUG leads to a memory leak, "
                          "never use this setting in a production environment!")

        if self.discard:
            self.purge_messages()
        self.worker_init()

        # Dump configuration to screen so we have some basic information
        # for when users send bug reports.
        print(self.startup_info())
        set_process_status("Running...")

        self.run_worker()

    def on_listener_ready(self, listener):
        signals.worker_ready.send(sender=listener)
        print("celery@%s has started." % self.hostname)

    def init_loader(self):
        from celery.loaders import current_loader, load_settings
        self.loader = current_loader()
        self.settings = load_settings()

    def purge_messages(self):
        discarded_count = discard_all()
        what = discarded_count > 1 and "messages" or "message"
        print("discard: Erased %d %s from the queue.\n" % (
            discarded_count, what))

    def worker_init(self):
        # Run the worker init handler.
        # (Usually imports task modules and such.)
        self.loader.init_worker()

    def tasklist(self, include_builtins=True):
        from celery.registry import tasks
        tasklist = tasks.keys()
        if not include_builtins:
            tasklist = filter(lambda s: not s.startswith("celery."),
                              tasklist)
        return TASK_LIST_FMT % "\n".join("\t. %s" % task
                                         for task in sorted(tasklist))

    def startup_info(self):
        tasklist = ""
        if self.loglevel <= logging.INFO:
            include_builtins = self.loglevel <= logging.DEBUG
            tasklist = self.tasklist(include_builtins=include_builtins)

        return STARTUP_INFO_FMT % {
            "conninfo": info.format_broker_info(),
            "queues": info.format_routing_table(indent=8),
            "concurrency": self.concurrency,
            "loglevel": conf.LOG_LEVELS[self.loglevel],
            "logfile": self.logfile or "[stderr]",
            "celerybeat": self.run_clockservice and "ON" or "OFF",
            "events": self.events and "ON" or "OFF",
            "tasks": tasklist,
            "loader": get_full_cls_name(self.loader.__class__),
        }

    def run_worker(self):
        worker = WorkController(concurrency=self.concurrency,
                                loglevel=self.loglevel,
                                logfile=self.logfile,
                                hostname=self.hostname,
                                ready_callback=self.on_listener_ready,
                                embed_clockservice=self.run_clockservice,
                                schedule_filename=self.schedule,
                                send_events=self.events)

        # Install signal handlers: SIGHUP restarts the worker, SIGTERM
        # performs a warm shutdown and SIGINT a cold shutdown.
        install_worker_restart_handler(worker)
        install_worker_term_handler(worker)
        install_worker_int_handler(worker)

        signals.worker_init.send(sender=worker)
        try:
            worker.start()
        except Exception, exc:
            emergency_error(self.logfile,
                    "celeryd raised exception %s: %s\n%s" % (
                        exc.__class__, exc, traceback.format_exc()))


def install_worker_int_handler(worker):

    def _stop(signum, frame):
        process_name = multiprocessing.current_process().name
        if process_name == "MainProcess":
            worker.logger.warn("celeryd: Cold shutdown (%s)" % (
                process_name))
            worker.terminate()
        raise SystemExit()

    platform.install_signal_handler("SIGINT", _stop)


def install_worker_term_handler(worker):

    def _stop(signum, frame):
        process_name = multiprocessing.current_process().name
        if process_name == "MainProcess":
            worker.logger.warn("celeryd: Warm shutdown (%s)" % (
                process_name))
            worker.stop()
        raise SystemExit()

    platform.install_signal_handler("SIGTERM", _stop)


def install_worker_restart_handler(worker):

    def restart_worker_sig_handler(signum, frame):
        """Signal handler restarting the current python program."""
        worker.logger.warn("Restarting celeryd (%s)" % (
            " ".join(sys.argv)))
        worker.stop()
        os.execv(sys.executable, [sys.executable] + sys.argv)

    platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)


def parse_options(arguments):
    """Parse the available options to ``celeryd``."""
    parser = optparse.OptionParser(option_list=OPTION_LIST)
    options, values = parser.parse_args(arguments)
    return options


def set_process_status(info):
    arg_start = "manage" in sys.argv[0] and 2 or 1
    if sys.argv[arg_start:]:
        info = "%s (%s)" % (info, " ".join(sys.argv[arg_start:]))
    platform.set_mp_process_title("celeryd", info=info)


def run_worker(**options):
    return Worker(**options).run()
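
# Example (an illustrative sketch, assuming this module is importable as
# ``celeryd``): the worker can also be started from Python instead of the
# command line:
#
#     from celeryd import run_worker
#     run_worker(concurrency=2, loglevel="INFO", events=True)
#
# which is roughly equivalent to ``celeryd -c 2 -l INFO -E``.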


def main():
    options = parse_options(sys.argv[1:])
    return run_worker(**vars(options))


if __name__ == "__main__":
    main()