celeryd.py

#!/usr/bin/env python
"""celeryd

.. program:: celeryd

.. cmdoption:: -c, --concurrency

    Number of child processes processing the queue. The default
    is the number of CPUs available on your system.

.. cmdoption:: -f, --logfile

    Path to log file. If no logfile is specified, ``stderr`` is used.

.. cmdoption:: -l, --loglevel

    Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
    ``ERROR``, ``CRITICAL``, or ``FATAL``.

.. cmdoption:: -p, --pidfile

    Path to pidfile.

.. cmdoption:: -B, --beat

    Also run the ``celerybeat`` periodic task scheduler. Please note that
    there must only be one instance of this service.

.. cmdoption:: -E, --events

    Send events that can be captured by monitors like ``celerymon``.

.. cmdoption:: -d, --detach, --daemon

    Run in the background as a daemon.

.. cmdoption:: --discard

    Discard all waiting tasks before the daemon is started.
    **WARNING**: This is unrecoverable, and the tasks will be
    deleted from the messaging server.

.. cmdoption:: -u, --uid

    User-id to run ``celeryd`` as when in daemon mode.

.. cmdoption:: -g, --gid

    Group-id to run ``celeryd`` as when in daemon mode.

.. cmdoption:: --umask

    umask of the process when in daemon mode.

.. cmdoption:: --workdir

    Directory to change to when in daemon mode.

.. cmdoption:: --chroot

    Change root directory to this path when in daemon mode.

"""

import os
import sys
import logging
import optparse
import traceback
import multiprocessing

from celery import conf
from celery import platform
from celery import __version__
from celery.log import emergency_error
from celery.task import discard_all
from celery.utils import noop
from celery.utils import info
from celery.worker import WorkController

STARTUP_INFO_FMT = """
Configuration ->
    . broker -> %(conninfo)s
    . queues ->
%(queues)s
    . concurrency -> %(concurrency)s
    . loader -> %(loader)s
    . sys -> logfile:%(logfile)s@%(loglevel)s %(pidfile)s
    . events -> %(events)s
    . beat -> %(celerybeat)s
%(tasks)s
""".strip()

TASK_LIST_FMT = """    . tasks ->\n%s"""

OPTION_LIST = (
    optparse.make_option('-c', '--concurrency',
            default=conf.CELERYD_CONCURRENCY,
            action="store", dest="concurrency", type="int",
            help="Number of child processes processing the queue."),
    optparse.make_option('--discard', default=False,
            action="store_true", dest="discard",
            help="Discard all waiting tasks before the server is started. "
                 "WARNING: This is unrecoverable, and the tasks will be "
                 "deleted from the messaging server."),
    optparse.make_option('-f', '--logfile', default=conf.CELERYD_LOG_FILE,
            action="store", dest="logfile",
            help="Path to log file."),
    optparse.make_option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL,
            action="store", dest="loglevel",
            help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
    optparse.make_option('-p', '--pidfile', default=conf.CELERYD_PID_FILE,
            action="store", dest="pidfile",
            help="Path to pidfile."),
    optparse.make_option('-B', '--beat', default=False,
            action="store_true", dest="run_clockservice",
            help="Also run the celerybeat periodic task scheduler. "
                 "Please note that only one instance must be running."),
    optparse.make_option('-E', '--events', default=conf.SEND_EVENTS,
            action="store_true", dest="events",
            help="Send events so celery can be monitored by e.g. celerymon."),
    optparse.make_option('-d', '--detach', '--daemon', default=False,
            action="store_true", dest="detach",
            help="Run in the background as a daemon."),
    optparse.make_option('-u', '--uid', default=None,
            action="store", dest="uid",
            help="User-id to run celeryd as when in daemon mode."),
    optparse.make_option('-g', '--gid', default=None,
            action="store", dest="gid",
            help="Group-id to run celeryd as when in daemon mode."),
    optparse.make_option('--umask', default=0,
            action="store", type="int", dest="umask",
            help="umask of the process when in daemon mode."),
    optparse.make_option('--workdir', default=None,
            action="store", dest="working_directory",
            help="Directory to change to when in daemon mode."),
    optparse.make_option('--chroot', default=None,
            action="store", dest="chroot",
            help="Change root directory to this path when in daemon mode."),
)


def run_worker(concurrency=conf.CELERYD_CONCURRENCY, detach=False,
        loglevel=conf.CELERYD_LOG_LEVEL, logfile=conf.CELERYD_LOG_FILE,
        discard=False, pidfile=conf.CELERYD_PID_FILE, umask=0,
        uid=None, gid=None, working_directory=None,
        chroot=None, run_clockservice=False, events=False, **kwargs):
    """Starts the celery worker server."""
    print("Celery %s is starting." % __version__)

    from celery.loaders import Loader, current_loader, settings

    if not concurrency:
        concurrency = multiprocessing.cpu_count()

    if conf.CELERY_BACKEND == "database" \
            and settings.DATABASE_ENGINE == "sqlite3" and \
            concurrency > 1:
        import warnings
        warnings.warn("The sqlite3 database engine doesn't support "
                      "concurrency. We'll be using a single process only.",
                      UserWarning)
        concurrency = 1

    # Setup logging
    if not isinstance(loglevel, int):
        loglevel = conf.LOG_LEVELS[loglevel.upper()]

    if discard:
        discarded_count = discard_all()
        what = discarded_count != 1 and "messages" or "message"
        print("discard: Erased %d %s from the queue.\n" % (
            discarded_count, what))

    # Run the worker init handler.
    # (Usually imports task modules and such.)
    current_loader.on_worker_init()

    # Dump configuration to screen so we have some basic information
    # for when users send e-mails.
    tasklist = ""
    if loglevel <= logging.INFO:
        from celery.registry import tasks
        tasklist = tasks.keys()
        if not loglevel <= logging.DEBUG:
            tasklist = filter(lambda s: not s.startswith("celery."),
                              tasklist)
        tasklist = TASK_LIST_FMT % "\n".join("        . %s" % task
                                                for task in sorted(tasklist))

    print(STARTUP_INFO_FMT % {
        "conninfo": info.format_broker_info(),
        "queues": info.format_routing_table(indent=8),
        "concurrency": concurrency,
        "loglevel": conf.LOG_LEVELS[loglevel],
        "logfile": logfile or "[stderr]",
        "pidfile": detach and "pidfile:%s" % pidfile or "",
        "celerybeat": run_clockservice and "ON" or "OFF",
        "events": events and "ON" or "OFF",
        "tasks": tasklist,
        "loader": Loader.__module__,
    })

    print("Celery has started.")
    set_process_status("Running...")

    on_stop = noop
    if detach:
        from celery.log import setup_logger, redirect_stdouts_to_logger
        context, on_stop = platform.create_daemon_context(logfile, pidfile,
                                        chroot_directory=chroot,
                                        working_directory=working_directory,
                                        umask=umask)
        context.open()
        logger = setup_logger(loglevel, logfile)
        redirect_stdouts_to_logger(logger, loglevel)
        platform.set_effective_user(uid, gid)

    def run_worker():
        worker = WorkController(concurrency=concurrency,
                                loglevel=loglevel,
                                logfile=logfile,
                                embed_clockservice=run_clockservice,
                                send_events=events,
                                is_detached=detach)

        from celery import signals
        signals.worker_init.send(sender=worker)

        # Install signal handler that restarts celeryd on SIGHUP
        # (only on POSIX systems).
        install_worker_restart_handler(worker)

        try:
            worker.start()
        except Exception, e:
            emergency_error(logfile,
                    "celeryd raised exception %s: %s\n%s" % (
                        e.__class__, e, traceback.format_exc()))

    try:
        run_worker()
    except:
        set_process_status("Exiting...")
        on_stop()
        raise


def install_worker_restart_handler(worker):

    def restart_worker_sig_handler(signum, frame):
        """Signal handler restarting the current python program."""
        worker.logger.info("Restarting celeryd (%s)" % (
            " ".join(sys.argv)))
        if worker.is_detached:
            # In daemon mode we fork first: the parent stops the old
            # worker and exits, while the child re-executes celeryd below.
            pid = os.fork()
            if pid:
                worker.stop()
                sys.exit(0)
        else:
            worker.stop()
        os.execv(sys.executable, [sys.executable] + sys.argv)

    platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)


def parse_options(arguments):
    """Parse the available options to ``celeryd``."""
    parser = optparse.OptionParser(option_list=OPTION_LIST)
    options, values = parser.parse_args(arguments)
    return options


def set_process_status(info):
    # When started via Django's manage.py, argv[0] contains "manage",
    # so the worker arguments start after the "celeryd" subcommand.
    arg_start = "manage" in sys.argv[0] and 2 or 1
    if sys.argv[arg_start:]:
        info = "%s (%s)" % (info, " ".join(sys.argv[arg_start:]))
    platform.set_mp_process_title("celeryd", info=info)


def main():
    options = parse_options(sys.argv[1:])
    run_worker(**vars(options))

if __name__ == "__main__":
    main()
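
# The worker can also be started programmatically; a minimal sketch,
# assuming this module is importable as ``celeryd`` (run_worker()
# accepts the same keyword arguments the command-line options map to):
#
#   from celeryd import run_worker
#   run_worker(concurrency=4, loglevel="INFO", events=True)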