celeryd.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. #!/usr/bin/env python
  2. """celeryd
  3. .. program:: celeryd
  4. .. cmdoption:: -c, --concurrency
  5. Number of child processes processing the queue. The default
  6. is the number of CPUs available on your system.
  7. .. cmdoption:: -f, --logfile
  8. Path to log file. If no logfile is specified, ``stderr`` is used.
  9. .. cmdoption:: -l, --loglevel
  10. Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
  11. ``ERROR``, ``CRITICAL``, or ``FATAL``.
  12. .. cmdoption:: -p, --pidfile
  13. Path to pidfile.
  14. .. cmdoption:: -s, --statistics
  15. Turn on reporting of statistics (remember to flush the statistics message
  16. queue from time to time).
  17. .. cmdoption:: -d, --detach, --daemon
  18. Run in the background as a daemon.
  19. .. cmdoption:: -S, --supervised
  20. Restart the worker server if it dies.
  21. .. cmdoption:: --discard
  22. Discard all waiting tasks before the daemon is started.
  23. **WARNING**: This is unrecoverable, and the tasks will be
  24. deleted from the messaging server.
  25. .. cmdoption:: -u, --uid
  26. User-id to run ``celeryd`` as when in daemon mode.
  27. .. cmdoption:: -g, --gid
  28. Group-id to run ``celeryd`` as when in daemon mode.
  29. .. cmdoption:: --umask
  30. umask of the process when in daemon mode.
  31. .. cmdoption:: --workdir
  32. Directory to change to when in daemon mode.
  33. .. cmdoption:: --chroot
  34. Change root directory to this path when in daemon mode.
  35. """
  36. import os
  37. import sys
  38. import warnings
  39. from carrot.connection import DjangoBrokerConnection
  40. from celery.loaders import current_loader
  41. from celery.loaders import settings
  42. from celery import __version__
  43. from celery.supervisor import OFASupervisor
  44. from celery.log import emergency_error
  45. from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
  46. from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
  47. from celery import conf
  48. from celery.task import discard_all
  49. from celery.worker import WorkController
  50. from celery import platform
  51. import multiprocessing
  52. import traceback
  53. import optparse
  54. USE_STATISTICS = getattr(settings, "CELERY_STATISTICS", False)
  55. # Make sure the setting exists.
  56. settings.CELERY_STATISTICS = USE_STATISTICS
  57. STARTUP_INFO_FMT = """
  58. Configuration ->
  59. * Broker -> %(carrot_backend)s://%(vhost)s@%(host)s:%(port)s
  60. * Exchange -> %(exchange)s (%(exchange_type)s)
  61. * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
  62. * Concurrency -> %(concurrency)s
  63. * Statistics -> %(statistics)s
  64. """.strip()
  65. OPTION_LIST = (
  66. optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
  67. action="store", dest="concurrency", type="int",
  68. help="Number of child processes processing the queue."),
  69. optparse.make_option('--discard', default=False,
  70. action="store_true", dest="discard",
  71. help="Discard all waiting tasks before the server is started. "
  72. "WARNING: This is unrecoverable, and the tasks will be "
  73. "deleted from the messaging server."),
  74. optparse.make_option('-s', '--statistics', default=USE_STATISTICS,
  75. action="store_true", dest="statistics",
  76. help="Collect statistics."),
  77. optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
  78. action="store", dest="logfile",
  79. help="Path to log file."),
  80. optparse.make_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
  81. action="store", dest="loglevel",
  82. help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
  83. optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
  84. action="store", dest="pidfile",
  85. help="Path to pidfile."),
  86. optparse.make_option('-d', '--detach', '--daemon', default=False,
  87. action="store_true", dest="detach",
  88. help="Run in the background as a daemon."),
  89. optparse.make_option('-S', '--supervised', default=False,
  90. action="store_true", dest="supervised",
  91. help="Restart the worker server if it dies."),
  92. optparse.make_option('-u', '--uid', default=None,
  93. action="store", dest="uid",
  94. help="User-id to run celeryd as when in daemon mode."),
  95. optparse.make_option('-g', '--gid', default=None,
  96. action="store", dest="gid",
  97. help="Group-id to run celeryd as when in daemon mode."),
  98. optparse.make_option('--umask', default=0,
  99. action="store", type="int", dest="umask",
  100. help="umask of the process when in daemon mode."),
  101. optparse.make_option('--workdir', default=None,
  102. action="store", dest="working_directory",
  103. help="Directory to change to when in daemon mode."),
  104. optparse.make_option('--chroot', default=None,
  105. action="store", dest="chroot",
  106. help="Change root directory to this path when in daemon mode."),
  107. )
  108. def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
  109. loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
  110. pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
  111. supervised=False, working_directory=None, chroot=None,
  112. statistics=None, **kwargs):
  113. """Starts the celery worker server."""
  114. if detach:
  115. warnings.warn("""
  116. WARNING: celery's support for --detach is severely broken!
  117. Please use start-stop-daemon, supervisord or similar
  118. daemonization services instead.
  119. For more information and instructions, please read
  120. http://ask.github.com/celery/cookbook/daemonizing.html
  121. """, UserWarning)
  122. print("Celery %s is starting." % __version__)
  123. # set SIGCLD back to the default SIG_DFL (before python-daemon overrode
  124. # it) lets the parent wait() for the terminated child process and stops
  125. # the 'OSError: [Errno 10] No child processes' problem.
  126. platform.reset_signal("SIGCLD")
  127. if statistics is not None:
  128. settings.CELERY_STATISTICS = statistics
  129. if not concurrency:
  130. concurrency = multiprocessing.cpu_count()
  131. if conf.CELERY_BACKEND == "database" \
  132. and settings.DATABASE_ENGINE == "sqlite3" and \
  133. concurrency > 1:
  134. warnings.warn("The sqlite3 database engine doesn't support "
  135. "concurrency. We'll be using a single process only.",
  136. UserWarning)
  137. concurrency = 1
  138. # Setup logging
  139. if not isinstance(loglevel, int):
  140. loglevel = LOG_LEVELS[loglevel.upper()]
  141. if not detach:
  142. logfile = None # log to stderr when not running in the background.
  143. if discard:
  144. discarded_count = discard_all()
  145. what = discarded_count > 1 and "messages" or "message"
  146. print("discard: Erased %d %s from the queue.\n" % (
  147. discarded_count, what))
  148. # Dump configuration to screen so we have some basic information
  149. # when users sends e-mails.
  150. broker_connection = DjangoBrokerConnection()
  151. carrot_backend = broker_connection.backend_cls
  152. if carrot_backend and not isinstance(carrot_backend, str):
  153. carrot_backend = carrot_backend.__name__
  154. print(STARTUP_INFO_FMT % {
  155. "carrot_backend": carrot_backend or "amqp",
  156. "vhost": broker_connection.virtual_host or "(default)",
  157. "host": broker_connection.hostname or "(default)",
  158. "port": broker_connection.port or "(port)",
  159. "exchange": conf.AMQP_EXCHANGE,
  160. "exchange_type": conf.AMQP_EXCHANGE_TYPE,
  161. "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
  162. "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
  163. "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
  164. "concurrency": concurrency,
  165. "loglevel": loglevel,
  166. "pidfile": pidfile,
  167. "statistics": settings.CELERY_STATISTICS and "ON" or "OFF",
  168. })
  169. del(broker_connection)
  170. print("Celery has started.")
  171. if detach:
  172. from celery.log import setup_logger, redirect_stdouts_to_logger
  173. context = platform.create_daemon_context(logfile, pidfile,
  174. chroot_directory=chroot,
  175. working_directory=working_directory,
  176. umask=umask,
  177. uid=uid,
  178. gid=gid)
  179. context.open()
  180. logger = setup_logger(loglevel, logfile)
  181. redirect_stdouts_to_logger(logger, loglevel)
  182. # Run the worker init handler.
  183. # (Usually imports task modules and such.)
  184. current_loader.on_worker_init()
  185. def run_worker():
  186. worker = WorkController(concurrency=concurrency,
  187. loglevel=loglevel,
  188. logfile=logfile,
  189. is_detached=detach)
  190. # Install signal handler that restarts celeryd on SIGHUP,
  191. # (only on POSIX systems)
  192. install_worker_restart_handler(worker)
  193. try:
  194. worker.start()
  195. except Exception, e:
  196. emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
  197. e.__class__, e, traceback.format_exc()))
  198. try:
  199. if supervised:
  200. OFASupervisor(target=run_worker).start()
  201. else:
  202. run_worker()
  203. except:
  204. if detach:
  205. context.close()
  206. raise
  207. def install_worker_restart_handler(worker):
  208. def restart_worker_sig_handler(signum, frame):
  209. """Signal handler restarting the current python program."""
  210. worker.logger.info("Restarting celeryd (%s)" % (
  211. " ".join(sys.argv)))
  212. if worker.is_detached:
  213. pid = os.fork()
  214. if pid:
  215. worker.stop()
  216. sys.exit(0)
  217. else:
  218. worker.stop()
  219. os.execv(sys.executable, [sys.executable] + sys.argv)
  220. platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)
  221. def parse_options(arguments):
  222. """Parse the available options to ``celeryd``."""
  223. parser = optparse.OptionParser(option_list=OPTION_LIST)
  224. options, values = parser.parse_args(arguments)
  225. return options
  226. def main():
  227. options = parse_options(sys.argv[1:])
  228. run_worker(**vars(options))
  229. if __name__ == "__main__":
  230. main()