celeryd.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. #!/usr/bin/env python
  2. """celeryd
  3. .. program:: celeryd
  4. .. cmdoption:: -c, --concurrency
  5. Number of child processes processing the queue. The default
  6. is the number of CPUs available on your system.
  7. .. cmdoption:: -f, --logfile
  8. Path to log file. If no logfile is specified, ``stderr`` is used.
  9. .. cmdoption:: -l, --loglevel
  10. Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
  11. ``ERROR``, ``CRITICAL``, or ``FATAL``.
  12. .. cmdoption:: -p, --pidfile
  13. Path to pidfile.
  14. .. cmdoption:: -s, --statistics
  15. Turn on reporting of statistics (remember to flush the statistics message
  16. queue from time to time).
  17. .. cmdoption:: -d, --detach, --daemon
  18. Run in the background as a daemon.
  19. .. cmdoption:: -S, --supervised
  20. Restart the worker server if it dies.
  21. .. cmdoption:: --discard
  22. Discard all waiting tasks before the daemon is started.
  23. **WARNING**: This is unrecoverable, and the tasks will be
  24. deleted from the messaging server.
  25. .. cmdoption:: -u, --uid
  26. User-id to run ``celeryd`` as when in daemon mode.
  27. .. cmdoption:: -g, --gid
  28. Group-id to run ``celeryd`` as when in daemon mode.
  29. .. cmdoption:: --umask
  30. umask of the process when in daemon mode.
  31. .. cmdoption:: --workdir
  32. Directory to change to when in daemon mode.
  33. .. cmdoption:: --chroot
  34. Change root directory to this path when in daemon mode.
  35. """
  36. import os
  37. import sys
  38. CAN_DETACH = True
  39. try:
  40. import resource
  41. except ImportError:
  42. CAN_DETACH = False
  43. sys.path.append(os.getcwd())
  44. django_project_dir = os.environ.get("DJANGO_PROJECT_DIR")
  45. if django_project_dir:
  46. sys.path.append(django_project_dir)
  47. from django.conf import settings
  48. from celery import __version__
  49. from celery.supervisor import OFASupervisor
  50. from celery.log import emergency_error
  51. from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
  52. from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
  53. from celery.messaging import TaskConsumer
  54. from celery import conf
  55. from celery import discovery
  56. from celery.task import discard_all
  57. from celery.worker import WorkController
  58. from carrot.connection import DjangoAMQPConnection
  59. from celery.messaging import TaskConsumer, StatsConsumer
  60. import multiprocessing
  61. import traceback
  62. import optparse
  63. import atexit
  64. USE_STATISTICS = getattr(settings, "CELERY_STATISTICS", False)
  65. # Make sure the setting exists.
  66. settings.CELERY_STATISTICS = USE_STATISTICS
  67. STARTUP_INFO_FMT = """
  68. Configuration ->
  69. * Broker -> amqp://%(vhost)s@%(host)s:%(port)s
  70. * Exchange -> %(exchange)s (%(exchange_type)s)
  71. * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
  72. * Concurrency -> %(concurrency)s
  73. * Statistics -> %(statistics)s
  74. """.strip()
  75. OPTION_LIST = (
  76. optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
  77. action="store", dest="concurrency", type="int",
  78. help="Number of child processes processing the queue."),
  79. optparse.make_option('--discard', default=False,
  80. action="store_true", dest="discard",
  81. help="Discard all waiting tasks before the server is started. "
  82. "WARNING: This is unrecoverable, and the tasks will be "
  83. "deleted from the messaging server."),
  84. optparse.make_option('-s', '--statistics', default=USE_STATISTICS,
  85. action="store_true", dest="statistics",
  86. help="Collect statistics."),
  87. optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
  88. action="store", dest="logfile",
  89. help="Path to log file."),
  90. optparse.make_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
  91. action="store", dest="loglevel",
  92. help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
  93. optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
  94. action="store", dest="pidfile",
  95. help="Path to pidfile."),
  96. optparse.make_option('-d', '--detach', '--daemon', default=False,
  97. action="store_true", dest="detach",
  98. help="Run in the background as a daemon."),
  99. optparse.make_option('-S', '--supervised', default=False,
  100. action="store_true", dest="supervised",
  101. help="Restart the worker server if it dies."),
  102. optparse.make_option('-u', '--uid', default=None,
  103. action="store", dest="uid",
  104. help="User-id to run celeryd as when in daemon mode."),
  105. optparse.make_option('-g', '--gid', default=None,
  106. action="store", dest="gid",
  107. help="Group-id to run celeryd as when in daemon mode."),
  108. optparse.make_option('--umask', default=0,
  109. action="store", type="int", dest="umask",
  110. help="umask of the process when in daemon mode."),
  111. optparse.make_option('--workdir', default=None,
  112. action="store", dest="working_directory",
  113. help="Directory to change to when in daemon mode."),
  114. optparse.make_option('--chroot', default=None,
  115. action="store", dest="chroot",
  116. help="Change root directory to this path when in daemon mode."),
  117. )
  118. def acquire_pidlock(pidfile):
  119. """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
  120. ``pidfile``.
  121. If the ``pidfile`` already exists, but the process is not running the
  122. ``pidfile`` will be removed, a ``"stale pidfile"`` message is emitted
  123. and execution continues as normally. However, if the process is still
  124. running the program will exit complaning that the program is already
  125. running in the background somewhere.
  126. """
  127. from daemon.pidlockfile import PIDLockFile
  128. import errno
  129. pidlock = PIDLockFile(pidfile)
  130. if not pidlock.is_locked():
  131. return pidlock
  132. pid = pidlock.read_pid()
  133. try:
  134. os.kill(pid, 0)
  135. except os.error, exc:
  136. if exc.errno == errno.ESRCH:
  137. sys.stderr.write("Stale pidfile exists. Removing it.\n")
  138. pidlock.release()
  139. return PIDLockFile(pidfile)
  140. else:
  141. raise SystemExit(
  142. "ERROR: Pidfile (%s) already exists.\n"
  143. "Seems celeryd is already running? (PID: %d)" % (
  144. pidfile, pid))
  145. return pidlock
  146. def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
  147. loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
  148. pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
  149. supervised=False, working_directory=None, chroot=None,
  150. statistics=None, **kwargs):
  151. """Starts the celery worker server."""
  152. print("Celery %s is starting." % __version__)
  153. if statistics:
  154. settings.CELERY_STATISTICS = statistics
  155. if not concurrency:
  156. concurrency = multiprocessing.cpu_count()
  157. if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
  158. import warnings
  159. warnings.warn("The sqlite3 database engine doesn't support "
  160. "concurrency. We'll be using a single process only.",
  161. UserWarning)
  162. concurrency = 1
  163. # Setup logging
  164. if not isinstance(loglevel, int):
  165. loglevel = LOG_LEVELS[loglevel.upper()]
  166. if not detach:
  167. logfile = None # log to stderr when not running in the background.
  168. if discard:
  169. discarded_count = discard_all()
  170. what = discarded_count > 1 and "messages" or "message"
  171. print("discard: Erased %d %s from the queue.\n" % (
  172. discarded_count, what))
  173. # Dump configuration to screen so we have some basic information
  174. # when users sends e-mails.
  175. print(STARTUP_INFO_FMT % {
  176. "vhost": settings.AMQP_VHOST,
  177. "host": settings.AMQP_SERVER,
  178. "port": settings.AMQP_PORT,
  179. "exchange": conf.AMQP_EXCHANGE,
  180. "exchange_type": conf.AMQP_EXCHANGE_TYPE,
  181. "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
  182. "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
  183. "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
  184. "concurrency": concurrency,
  185. "loglevel": loglevel,
  186. "pidfile": pidfile,
  187. "statistics": settings.CELERY_STATISTICS and "ON" or "OFF",
  188. })
  189. print("Celery has started.")
  190. if detach:
  191. if not CAN_DETACH:
  192. raise RuntimeError(
  193. "This operating system doesn't support detach. ")
  194. from daemon import DaemonContext
  195. # Since without stderr any errors will be silently suppressed,
  196. # we need to know that we have access to the logfile
  197. if logfile:
  198. open(logfile, "a").close()
  199. pidlock = acquire_pidlock(pidfile)
  200. if not umask:
  201. umask = 0
  202. uid = uid and int(uid) or os.geteuid()
  203. gid = gid and int(gid) or os.getegid()
  204. working_directory = working_directory or os.getcwd()
  205. context = DaemonContext(chroot_directory=chroot,
  206. working_directory=working_directory,
  207. umask=umask,
  208. pidfile=pidlock,
  209. uid=uid,
  210. gid=gid)
  211. context.open()
  212. discovery.autodiscover()
  213. def run_worker():
  214. worker = WorkController(concurrency=concurrency,
  215. loglevel=loglevel,
  216. logfile=logfile,
  217. is_detached=detach)
  218. try:
  219. worker.run()
  220. except Exception, e:
  221. emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
  222. e.__class__, e, traceback.format_exc()))
  223. try:
  224. if supervised:
  225. OFASupervisor(target=run_worker).start()
  226. else:
  227. run_worker()
  228. except:
  229. if detach:
  230. context.close()
  231. raise
  232. def parse_options(arguments):
  233. """Parse the available options to ``celeryd``."""
  234. parser = optparse.OptionParser(option_list=OPTION_LIST)
  235. options, values = parser.parse_args(arguments)
  236. return options
  237. if __name__ == "__main__":
  238. options = parse_options(sys.argv[1:])
  239. run_worker(**options)