celeryd.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. #!/usr/bin/env python
  2. """celeryd
  3. .. program:: celeryd
  4. .. cmdoption:: -c, --concurrency
  5. Number of child processes processing the queue. The default
  6. is the number of CPUs available on your system.
  7. .. cmdoption:: -f, --logfile
  8. Path to log file. If no logfile is specified, ``stderr`` is used.
  9. .. cmdoption:: -l, --loglevel
  10. Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
  11. ``ERROR``, ``CRITICAL``, or ``FATAL``.
  12. .. cmdoption:: -n, --hostname
  13. Set custom hostname.
  14. .. cmdoption:: -B, --beat
  15. Also run the ``celerybeat`` periodic task scheduler. Please note that
  16. there must only be one instance of this service.
  17. .. cmdoption:: -s, --schedule
  18. Path to the schedule database if running with the ``-B`` option.
  19. Defaults to ``celerybeat-schedule``. The extension ".db" will be
  20. appended to the filename.
  21. .. cmdoption:: -E, --events
  22. Send events that can be captured by monitors like ``celerymon``.
  23. .. cmdoption:: --discard
  24. Discard all waiting tasks before the daemon is started.
  25. **WARNING**: This is unrecoverable, and the tasks will be
  26. deleted from the messaging server.
  27. """
  28. import os
  29. import sys
  30. import socket
  31. import logging
  32. import optparse
  33. import traceback
  34. import multiprocessing
  35. import celery
  36. from celery import conf
  37. from celery import signals
  38. from celery import platform
  39. from celery.log import emergency_error
  40. from celery.task import discard_all
  41. from celery.utils import info
  42. from celery.utils import get_full_cls_name
  43. from celery.worker import WorkController
# Template for the configuration summary printed at startup.
# ``Worker.startup_info`` fills it with a dict providing the keys
# ``conninfo``, ``queues``, ``concurrency``, ``loader``, ``logfile``,
# ``loglevel``, ``events``, ``celerybeat`` and ``tasks``.
# ``.strip()`` drops the leading newline that follows the opening quotes
# (and any trailing whitespace).
# NOTE(review): the indentation inside this literal looks collapsed by the
# extraction that produced this copy -- confirm against the upstream file.
STARTUP_INFO_FMT = """
Configuration ->
. broker -> %(conninfo)s
. queues ->
%(queues)s
. concurrency -> %(concurrency)s
. loader -> %(loader)s
. logfile -> %(logfile)s@%(loglevel)s
. events -> %(events)s
. beat -> %(celerybeat)s
%(tasks)s
""".strip()

# Template for the "tasks" section of the summary; ``Worker.tasklist``
# substitutes the newline-joined task names for ``%s``.
TASK_LIST_FMT = """ . tasks ->\n%s"""
  57. OPTION_LIST = (
  58. optparse.make_option('-c', '--concurrency',
  59. default=conf.CELERYD_CONCURRENCY,
  60. action="store", dest="concurrency", type="int",
  61. help="Number of child processes processing the queue."),
  62. optparse.make_option('--discard', default=False,
  63. action="store_true", dest="discard",
  64. help="Discard all waiting tasks before the server is started. "
  65. "WARNING: This is unrecoverable, and the tasks will be "
  66. "deleted from the messaging server."),
  67. optparse.make_option('-f', '--logfile', default=conf.CELERYD_LOG_FILE,
  68. action="store", dest="logfile",
  69. help="Path to log file."),
  70. optparse.make_option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL,
  71. action="store", dest="loglevel",
  72. help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
  73. optparse.make_option('-n', '--hostname', default=None,
  74. action="store", dest="hostname",
  75. help="Set custom host name. E.g. 'foo.example.com'."),
  76. optparse.make_option('-B', '--beat', default=False,
  77. action="store_true", dest="run_clockservice",
  78. help="Also run the celerybeat periodic task scheduler. \
  79. Please note that only one instance must be running."),
  80. optparse.make_option('-s', '--schedule',
  81. default=conf.CELERYBEAT_SCHEDULE_FILENAME,
  82. action="store", dest="schedule",
  83. help="Path to the schedule database if running with the -B \
  84. option. The extension '.db' will be appended to the \
  85. filename. Default: %s" % (
  86. conf.CELERYBEAT_SCHEDULE_FILENAME)),
  87. optparse.make_option('-E', '--events', default=conf.SEND_EVENTS,
  88. action="store_true", dest="events",
  89. help="Send events so celery can be monitored by e.g. celerymon."),
  90. )
  91. class Worker(object):
  92. def __init__(self, concurrency=conf.CELERYD_CONCURRENCY,
  93. loglevel=conf.CELERYD_LOG_LEVEL, logfile=conf.CELERYD_LOG_FILE,
  94. hostname=None, discard=False, run_clockservice=False,
  95. schedule=conf.CELERYBEAT_SCHEDULE_FILENAME,
  96. events=False, **kwargs):
  97. self.concurrency = concurrency or multiprocessing.cpu_count()
  98. self.loglevel = loglevel
  99. self.logfile = logfile
  100. self.hostname = hostname or socket.gethostname()
  101. self.discard = discard
  102. self.run_clockservice = run_clockservice
  103. self.schedule = schedule
  104. self.events = events
  105. if not isinstance(self.loglevel, int):
  106. self.loglevel = conf.LOG_LEVELS[self.loglevel.upper()]
  107. def run(self):
  108. print("celery@%s v%s is starting." % (self.hostname,
  109. celery.__version__))
  110. self.init_loader()
  111. if conf.CELERY_BACKEND == "database" \
  112. and self.settings.DATABASE_ENGINE == "sqlite3" and \
  113. self.concurrency > 1:
  114. import warnings
  115. warnings.warn("The sqlite3 database engine doesn't handle "
  116. "concurrency well. Will use a single process only.",
  117. UserWarning)
  118. self.concurrency = 1
  119. if self.discard:
  120. self.purge_messages()
  121. self.worker_init()
  122. # Dump configuration to screen so we have some basic information
  123. # for when users sends bug reports.
  124. print(self.startup_info())
  125. set_process_status("Running...")
  126. self.run_worker()
  127. def on_listener_ready(self, listener):
  128. signals.worker_ready.send(sender=listener)
  129. print("celery@%s has started." % self.hostname)
  130. def init_loader(self):
  131. from celery.loaders import current_loader, load_settings
  132. self.loader = current_loader()
  133. self.settings = load_settings()
  134. def purge_messages(self):
  135. discarded_count = discard_all()
  136. what = discarded_count > 1 and "messages" or "message"
  137. print("discard: Erased %d %s from the queue.\n" % (
  138. discarded_count, what))
  139. def worker_init(self):
  140. # Run the worker init handler.
  141. # (Usually imports task modules and such.)
  142. self.loader.on_worker_init()
  143. def tasklist(self, include_builtins=True):
  144. from celery.registry import tasks
  145. tasklist = tasks.keys()
  146. if not include_builtins:
  147. tasklist = filter(lambda s: not s.startswith("celery."),
  148. tasklist)
  149. return TASK_LIST_FMT % "\n".join("\t. %s" % task
  150. for task in sorted(tasklist))
  151. def startup_info(self):
  152. tasklist = ""
  153. if self.loglevel <= logging.INFO:
  154. include_builtins = self.loglevel <= logging.DEBUG
  155. tasklist = self.tasklist(include_builtins=include_builtins)
  156. return STARTUP_INFO_FMT % {
  157. "conninfo": info.format_broker_info(),
  158. "queues": info.format_routing_table(indent=8),
  159. "concurrency": self.concurrency,
  160. "loglevel": conf.LOG_LEVELS[self.loglevel],
  161. "logfile": self.logfile or "[stderr]",
  162. "celerybeat": self.run_clockservice and "ON" or "OFF",
  163. "events": self.events and "ON" or "OFF",
  164. "tasks": tasklist,
  165. "loader": get_full_cls_name(self.loader.__class__),
  166. }
  167. def run_worker(self):
  168. worker = WorkController(concurrency=self.concurrency,
  169. loglevel=self.loglevel,
  170. logfile=self.logfile,
  171. hostname=self.hostname,
  172. ready_callback=self.on_listener_ready,
  173. embed_clockservice=self.run_clockservice,
  174. schedule_filename=self.schedule,
  175. send_events=self.events)
  176. # Install signal handler so SIGHUP restarts the worker.
  177. install_worker_restart_handler(worker)
  178. install_worker_term_handler(worker)
  179. signals.worker_init.send(sender=worker)
  180. try:
  181. worker.start()
  182. except Exception, exc:
  183. emergency_error(self.logfile,
  184. "celeryd raised exception %s: %s\n%s" % (
  185. exc.__class__, exc, traceback.format_exc()))
  186. def install_worker_term_handler(worker):
  187. def _stop(signum, frame):
  188. worker.stop()
  189. raise SystemExit()
  190. platform.install_signal_handler("SIGTERM", _stop)
  191. def install_worker_restart_handler(worker):
  192. def restart_worker_sig_handler(signum, frame):
  193. """Signal handler restarting the current python program."""
  194. worker.logger.warn("Restarting celeryd (%s)" % (
  195. " ".join(sys.argv)))
  196. worker.stop()
  197. os.execv(sys.executable, [sys.executable] + sys.argv)
  198. platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)
  199. def parse_options(arguments):
  200. """Parse the available options to ``celeryd``."""
  201. parser = optparse.OptionParser(option_list=OPTION_LIST)
  202. options, values = parser.parse_args(arguments)
  203. return options
  204. def set_process_status(info):
  205. arg_start = "manage" in sys.argv[0] and 2 or 1
  206. if sys.argv[arg_start:]:
  207. info = "%s (%s)" % (info, " ".join(sys.argv[arg_start:]))
  208. platform.set_mp_process_title("celeryd", info=info)
  209. def run_worker(**options):
  210. return Worker(**options).run()
  211. def main():
  212. options = parse_options(sys.argv[1:])
  213. return run_worker(**vars(options))
# Start the worker only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()