celeryd.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. #!/usr/bin/env python
  2. """celeryd
.. program:: celeryd

.. cmdoption:: -c, --concurrency

    Number of child processes processing the queue. The default
    is the number of CPUs available on your system.

.. cmdoption:: -f, --logfile

    Path to log file. If no logfile is specified, ``stderr`` is used.

.. cmdoption:: -l, --loglevel

    Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
    ``ERROR``, ``CRITICAL``, or ``FATAL``.

.. cmdoption:: -p, --pidfile

    Path to pidfile.

.. cmdoption:: -s, --statistics

    Turn on reporting of statistics (remember to flush the statistics message
    queue from time to time).

.. cmdoption:: -d, --detach, --daemon

    Run in the background as a daemon.

.. cmdoption:: -S, --supervised

    Restart the worker server if it dies.

.. cmdoption:: --discard

    Discard all waiting tasks before the daemon is started.
    **WARNING**: This is unrecoverable, and the tasks will be
    deleted from the messaging server.

.. cmdoption:: -u, --uid

    User-id to run ``celeryd`` as when in daemon mode.

.. cmdoption:: -g, --gid

    Group-id to run ``celeryd`` as when in daemon mode.

.. cmdoption:: --umask

    umask of the process when in daemon mode.

.. cmdoption:: --workdir

    Directory to change to when in daemon mode.

.. cmdoption:: --chroot

    Change root directory to this path when in daemon mode.

  35. """
  36. import os
  37. import sys
  38. CAN_DETACH = True
  39. try:
  40. import resource
  41. except ImportError:
  42. CAN_DETACH = False
  43. from celery.loaders import current_loader
  44. from celery.loaders import settings
  45. from celery import __version__
  46. from celery.supervisor import OFASupervisor
  47. from celery.log import emergency_error
  48. from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
  49. from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
  50. from celery import conf
  51. from celery import discovery
  52. from celery.task import discard_all
  53. from celery.worker import WorkController
  54. from signal import signal, SIGHUP
  55. import multiprocessing
  56. import traceback
  57. import optparse
  58. import atexit
# Default for the ``--statistics`` option: the value of the
# ``CELERY_STATISTICS`` setting if it is defined, otherwise ``False``.
USE_STATISTICS = getattr(settings, "CELERY_STATISTICS", False)
# Make sure the setting exists, so later code can read
# ``settings.CELERY_STATISTICS`` without a fallback.
settings.CELERY_STATISTICS = USE_STATISTICS
# Configuration banner printed by ``run_worker`` at startup. It is filled in
# with the ``%`` operator and a dict of named values (broker location,
# exchange, consumer queue/routing key, concurrency and statistics state).
# Leading/trailing newlines of the literal are removed by ``.strip()``.
# NOTE(review): the bullet lines are flush left in this listing; the listing
# has lost all indentation, so confirm whether they were indented originally.
STARTUP_INFO_FMT = """
Configuration ->
* Broker -> amqp://%(vhost)s@%(host)s:%(port)s
* Exchange -> %(exchange)s (%(exchange_type)s)
* Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
* Concurrency -> %(concurrency)s
* Statistics -> %(statistics)s
""".strip()
  70. OPTION_LIST = (
  71. optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
  72. action="store", dest="concurrency", type="int",
  73. help="Number of child processes processing the queue."),
  74. optparse.make_option('--discard', default=False,
  75. action="store_true", dest="discard",
  76. help="Discard all waiting tasks before the server is started. "
  77. "WARNING: This is unrecoverable, and the tasks will be "
  78. "deleted from the messaging server."),
  79. optparse.make_option('-s', '--statistics', default=USE_STATISTICS,
  80. action="store_true", dest="statistics",
  81. help="Collect statistics."),
  82. optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
  83. action="store", dest="logfile",
  84. help="Path to log file."),
  85. optparse.make_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
  86. action="store", dest="loglevel",
  87. help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
  88. optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
  89. action="store", dest="pidfile",
  90. help="Path to pidfile."),
  91. optparse.make_option('-d', '--detach', '--daemon', default=False,
  92. action="store_true", dest="detach",
  93. help="Run in the background as a daemon."),
  94. optparse.make_option('-S', '--supervised', default=False,
  95. action="store_true", dest="supervised",
  96. help="Restart the worker server if it dies."),
  97. optparse.make_option('-u', '--uid', default=None,
  98. action="store", dest="uid",
  99. help="User-id to run celeryd as when in daemon mode."),
  100. optparse.make_option('-g', '--gid', default=None,
  101. action="store", dest="gid",
  102. help="Group-id to run celeryd as when in daemon mode."),
  103. optparse.make_option('--umask', default=0,
  104. action="store", type="int", dest="umask",
  105. help="umask of the process when in daemon mode."),
  106. optparse.make_option('--workdir', default=None,
  107. action="store", dest="working_directory",
  108. help="Directory to change to when in daemon mode."),
  109. optparse.make_option('--chroot', default=None,
  110. action="store", dest="chroot",
  111. help="Change root directory to this path when in daemon mode."),
  112. )
  113. def acquire_pidlock(pidfile):
  114. """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
  115. ``pidfile``.
  116. If the ``pidfile`` already exists, but the process is not running the
  117. ``pidfile`` will be removed, a ``"stale pidfile"`` message is emitted
  118. and execution continues as normally. However, if the process is still
  119. running the program will exit complaning that the program is already
  120. running in the background somewhere.
  121. """
  122. from daemon.pidlockfile import PIDLockFile
  123. import errno
  124. pidlock = PIDLockFile(pidfile)
  125. if not pidlock.is_locked():
  126. return pidlock
  127. pid = pidlock.read_pid()
  128. try:
  129. os.kill(pid, 0)
  130. except os.error, exc:
  131. if exc.errno == errno.ESRCH:
  132. sys.stderr.write("Stale pidfile exists. Removing it.\n")
  133. os.unlink(pidfile)
  134. return PIDLockFile(pidfile)
  135. else:
  136. raise SystemExit(
  137. "ERROR: Pidfile (%s) already exists.\n"
  138. "Seems celeryd is already running? (PID: %d)" % (
  139. pidfile, pid))
  140. return pidlock
  141. def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
  142. loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
  143. pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
  144. supervised=False, working_directory=None, chroot=None,
  145. statistics=None, **kwargs):
  146. """Starts the celery worker server."""
  147. print("Celery %s is starting." % __version__)
  148. if statistics:
  149. settings.CELERY_STATISTICS = statistics
  150. if not concurrency:
  151. concurrency = multiprocessing.cpu_count()
  152. if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
  153. import warnings
  154. warnings.warn("The sqlite3 database engine doesn't support "
  155. "concurrency. We'll be using a single process only.",
  156. UserWarning)
  157. concurrency = 1
  158. # Setup logging
  159. if not isinstance(loglevel, int):
  160. loglevel = LOG_LEVELS[loglevel.upper()]
  161. if not detach:
  162. logfile = None # log to stderr when not running in the background.
  163. if discard:
  164. discarded_count = discard_all()
  165. what = discarded_count > 1 and "messages" or "message"
  166. print("discard: Erased %d %s from the queue.\n" % (
  167. discarded_count, what))
  168. # Dump configuration to screen so we have some basic information
  169. # when users sends e-mails.
  170. print(STARTUP_INFO_FMT % {
  171. "vhost": getattr(settings, "AMQP_VHOST", "(default)"),
  172. "host": getattr(settings, "AMQP_SERVER", "(default)"),
  173. "port": getattr(settings, "AMQP_PORT", "(default)"),
  174. "exchange": conf.AMQP_EXCHANGE,
  175. "exchange_type": conf.AMQP_EXCHANGE_TYPE,
  176. "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
  177. "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
  178. "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
  179. "concurrency": concurrency,
  180. "loglevel": loglevel,
  181. "pidfile": pidfile,
  182. "statistics": settings.CELERY_STATISTICS and "ON" or "OFF",
  183. })
  184. print("Celery has started.")
  185. if detach:
  186. if not CAN_DETACH:
  187. raise RuntimeError(
  188. "This operating system doesn't support detach. ")
  189. from daemon import DaemonContext
  190. # Since without stderr any errors will be silently suppressed,
  191. # we need to know that we have access to the logfile
  192. if logfile:
  193. open(logfile, "a").close()
  194. pidlock = acquire_pidlock(pidfile)
  195. if not umask:
  196. umask = 0
  197. uid = uid and int(uid) or os.geteuid()
  198. gid = gid and int(gid) or os.getegid()
  199. working_directory = working_directory or os.getcwd()
  200. context = DaemonContext(chroot_directory=chroot,
  201. working_directory=working_directory,
  202. umask=umask,
  203. pidfile=pidlock,
  204. uid=uid,
  205. gid=gid)
  206. context.open()
  207. # Run the worker init handler.
  208. # (Usually imports task modules and such.)
  209. current_loader.on_worker_init()
  210. def run_worker():
  211. worker = WorkController(concurrency=concurrency,
  212. loglevel=loglevel,
  213. logfile=logfile,
  214. is_detached=detach)
  215. # Install signal handler that restarts celeryd on SIGHUP
  216. install_restart_signal_handler(worker)
  217. try:
  218. worker.start()
  219. except Exception, e:
  220. emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
  221. e.__class__, e, traceback.format_exc()))
  222. try:
  223. if supervised:
  224. OFASupervisor(target=run_worker).start()
  225. else:
  226. run_worker()
  227. except:
  228. if detach:
  229. context.close()
  230. raise
def install_restart_signal_handler(worker):
    """Installs a signal handler that restarts the current program
    when it receives the ``SIGHUP`` signal.

    :param worker: The worker to restart. The handler uses its ``logger``
        and ``is_detached`` attributes and calls its ``stop()`` method.

    """

    def restart_self(signum, frame):
        """Signal handler restarting the current python program."""
        worker.logger.info("Restarting celeryd (%s)" % (
            " ".join(sys.argv)))
        # NOTE(review): block nesting was reconstructed from a listing that
        # lost its indentation; the ``else`` is assumed to pair with
        # ``if worker.is_detached`` (not ``if pid``) -- confirm.
        if worker.is_detached:
            # Fork first: the parent stops the worker and exits, while the
            # child (pid == 0) falls through to the exec below.
            pid = os.fork()
            if pid:
                worker.stop()
                sys.exit(0)
        else:
            worker.stop()
        # Replace the current process with a fresh interpreter running the
        # same command line.
        os.execv(sys.executable, [sys.executable] + sys.argv)

    signal(SIGHUP, restart_self)
  248. def parse_options(arguments):
  249. """Parse the available options to ``celeryd``."""
  250. parser = optparse.OptionParser(option_list=OPTION_LIST)
  251. options, values = parser.parse_args(arguments)
  252. return options
  253. if __name__ == "__main__":
  254. options = parse_options(sys.argv[1:])
  255. run_worker(**vars(options))