celeryd.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. #!/usr/bin/env python
  2. """celeryd
  3. .. program:: celeryd
  4. .. cmdoption:: -c, --concurrency
  5. Number of child processes processing the queue. The default
  6. is the number of CPUs available on your system.
  7. .. cmdoption:: -f, --logfile
  8. Path to log file. If no logfile is specified, ``stderr`` is used.
  9. .. cmdoption:: -l, --loglevel
  10. Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
  11. ``ERROR``, ``CRITICAL``, or ``FATAL``.
  12. .. cmdoption:: -p, --pidfile
  13. Path to pidfile.
  14. .. cmdoption:: -s, --statistics
  15. Turn on reporting of statistics (remember to flush the statistics message
  16. queue from time to time).
  17. .. cmdoption:: -d, --detach, --daemon
  18. Run in the background as a daemon.
  19. .. cmdoption:: -S, --supervised
  20. Restart the worker server if it dies.
  21. .. cmdoption:: --discard
  22. Discard all waiting tasks before the daemon is started.
  23. **WARNING**: This is unrecoverable, and the tasks will be
  24. deleted from the messaging server.
  25. .. cmdoption:: -u, --uid
  26. User-id to run ``celeryd`` as when in daemon mode.
  27. .. cmdoption:: -g, --gid
  28. Group-id to run ``celeryd`` as when in daemon mode.
  29. .. cmdoption:: --umask
  30. umask of the process when in daemon mode.
  31. .. cmdoption:: --workdir
  32. Directory to change to when in daemon mode.
  33. .. cmdoption:: --chroot
  34. Change root directory to this path when in daemon mode.
  35. """
  36. import os
  37. import sys
  38. from celery.loaders import current_loader
  39. from celery.loaders import settings
  40. from celery import __version__
  41. from celery.supervisor import OFASupervisor
  42. from celery.log import emergency_error
  43. from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
  44. from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
  45. from celery import conf
  46. from celery.task import discard_all
  47. from celery.worker import WorkController
  48. from celery import platform
  49. import multiprocessing
  50. import traceback
  51. import optparse
  52. USE_STATISTICS = getattr(settings, "CELERY_STATISTICS", False)
  53. # Make sure the setting exists.
  54. settings.CELERY_STATISTICS = USE_STATISTICS
  55. STARTUP_INFO_FMT = """
  56. Configuration ->
  57. * Broker -> amqp://%(vhost)s@%(host)s:%(port)s
  58. * Exchange -> %(exchange)s (%(exchange_type)s)
  59. * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
  60. * Concurrency -> %(concurrency)s
  61. * Statistics -> %(statistics)s
  62. """.strip()
  63. OPTION_LIST = (
  64. optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
  65. action="store", dest="concurrency", type="int",
  66. help="Number of child processes processing the queue."),
  67. optparse.make_option('--discard', default=False,
  68. action="store_true", dest="discard",
  69. help="Discard all waiting tasks before the server is started. "
  70. "WARNING: This is unrecoverable, and the tasks will be "
  71. "deleted from the messaging server."),
  72. optparse.make_option('-s', '--statistics', default=USE_STATISTICS,
  73. action="store_true", dest="statistics",
  74. help="Collect statistics."),
  75. optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
  76. action="store", dest="logfile",
  77. help="Path to log file."),
  78. optparse.make_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
  79. action="store", dest="loglevel",
  80. help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
  81. optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
  82. action="store", dest="pidfile",
  83. help="Path to pidfile."),
  84. optparse.make_option('-d', '--detach', '--daemon', default=False,
  85. action="store_true", dest="detach",
  86. help="Run in the background as a daemon."),
  87. optparse.make_option('-S', '--supervised', default=False,
  88. action="store_true", dest="supervised",
  89. help="Restart the worker server if it dies."),
  90. optparse.make_option('-u', '--uid', default=None,
  91. action="store", dest="uid",
  92. help="User-id to run celeryd as when in daemon mode."),
  93. optparse.make_option('-g', '--gid', default=None,
  94. action="store", dest="gid",
  95. help="Group-id to run celeryd as when in daemon mode."),
  96. optparse.make_option('--umask', default=0,
  97. action="store", type="int", dest="umask",
  98. help="umask of the process when in daemon mode."),
  99. optparse.make_option('--workdir', default=None,
  100. action="store", dest="working_directory",
  101. help="Directory to change to when in daemon mode."),
  102. optparse.make_option('--chroot', default=None,
  103. action="store", dest="chroot",
  104. help="Change root directory to this path when in daemon mode."),
  105. )
  106. def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
  107. loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
  108. pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
  109. supervised=False, working_directory=None, chroot=None,
  110. statistics=None, **kwargs):
  111. """Starts the celery worker server."""
  112. print("Celery %s is starting." % __version__)
  113. # set SIGCLD back to the default SIG_DFL (before python-daemon overrode
  114. # it) lets the parent wait() for the terminated child process and stops
  115. # the 'OSError: [Errno 10] No child processes' problem.
  116. platform.reset_signal("SIGCLD")
  117. if statistics is not None:
  118. settings.CELERY_STATISTICS = statistics
  119. if not concurrency:
  120. concurrency = multiprocessing.cpu_count()
  121. if conf.CELERY_BACKEND == "database" \
  122. and settings.DATABASE_ENGINE == "sqlite3" and \
  123. concurrency > 1:
  124. import warnings
  125. warnings.warn("The sqlite3 database engine doesn't support "
  126. "concurrency. We'll be using a single process only.",
  127. UserWarning)
  128. concurrency = 1
  129. # Setup logging
  130. if not isinstance(loglevel, int):
  131. loglevel = LOG_LEVELS[loglevel.upper()]
  132. if not detach:
  133. logfile = None # log to stderr when not running in the background.
  134. if discard:
  135. discarded_count = discard_all()
  136. what = discarded_count > 1 and "messages" or "message"
  137. print("discard: Erased %d %s from the queue.\n" % (
  138. discarded_count, what))
  139. # Dump configuration to screen so we have some basic information
  140. # when users sends e-mails.
  141. print(STARTUP_INFO_FMT % {
  142. "vhost": getattr(settings, "AMQP_VHOST", "(default)"),
  143. "host": getattr(settings, "AMQP_SERVER", "(default)"),
  144. "port": getattr(settings, "AMQP_PORT", "(default)"),
  145. "exchange": conf.AMQP_EXCHANGE,
  146. "exchange_type": conf.AMQP_EXCHANGE_TYPE,
  147. "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
  148. "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
  149. "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
  150. "concurrency": concurrency,
  151. "loglevel": loglevel,
  152. "pidfile": pidfile,
  153. "statistics": settings.CELERY_STATISTICS and "ON" or "OFF",
  154. })
  155. print("Celery has started.")
  156. if detach:
  157. from celery.log import setup_logger, redirect_stdouts_to_logger
  158. context = platform.create_daemon_context(logfile, pidfile,
  159. chroot_directory=chroot,
  160. working_directory=working_directory,
  161. umask=umask,
  162. uid=uid,
  163. gid=gid)
  164. context.open()
  165. logger = setup_logger(loglevel, logfile)
  166. redirect_stdouts_to_logger(logger, loglevel)
  167. # Run the worker init handler.
  168. # (Usually imports task modules and such.)
  169. current_loader.on_worker_init()
  170. def run_worker():
  171. worker = WorkController(concurrency=concurrency,
  172. loglevel=loglevel,
  173. logfile=logfile,
  174. is_detached=detach)
  175. # Install signal handler that restarts celeryd on SIGHUP,
  176. # (only on POSIX systems)
  177. install_worker_restart_handler(worker)
  178. try:
  179. worker.start()
  180. except Exception, e:
  181. emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
  182. e.__class__, e, traceback.format_exc()))
  183. try:
  184. if supervised:
  185. OFASupervisor(target=run_worker).start()
  186. else:
  187. run_worker()
  188. except:
  189. if detach:
  190. context.close()
  191. raise
  192. def install_worker_restart_handler(worker):
  193. def restart_worker_sig_handler(signum, frame):
  194. """Signal handler restarting the current python program."""
  195. worker.logger.info("Restarting celeryd (%s)" % (
  196. " ".join(sys.argv)))
  197. if worker.is_detached:
  198. pid = os.fork()
  199. if pid:
  200. worker.stop()
  201. sys.exit(0)
  202. else:
  203. worker.stop()
  204. os.execv(sys.executable, [sys.executable] + sys.argv)
  205. platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)
  206. def parse_options(arguments):
  207. """Parse the available options to ``celeryd``."""
  208. parser = optparse.OptionParser(option_list=OPTION_LIST)
  209. options, values = parser.parse_args(arguments)
  210. return options
  211. if __name__ == "__main__":
  212. options = parse_options(sys.argv[1:])
  213. run_worker(**vars(options))