celeryd.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. #!/usr/bin/env python
  2. """celeryd
  3. .. program:: celeryd
  4. .. cmdoption:: -c, --concurrency
  5. Number of child processes processing the queue.
  6. .. cmdoption:: -f, --logfile
  7. Path to log file. If no logfile is specified, ``stderr`` is used.
  8. .. cmdoption:: -l, --loglevel
  9. Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
  10. ``ERROR``, ``CRITICAL``, or ``FATAL``.
  11. .. cmdoption:: -p, --pidfile
  12. Path to pidfile.
  13. .. cmdoption:: -d, --detach, --daemon
  14. Run in the background as a daemon.
  15. .. cmdoption:: --discard
  16. Discard all waiting tasks before the daemon is started.
  17. **WARNING**: This is unrecoverable, and the tasks will be
  18. deleted from the messaging server.
  19. .. cmdoption:: -u, --uid
  20. User-id to run ``celeryd`` as when in daemon mode.
  21. .. cmdoption:: -g, --gid
  22. Group-id to run ``celeryd`` as when in daemon mode.
  23. .. cmdoption:: --umask
  24. umask of the process when in daemon mode.
  25. .. cmdoption:: --workdir
  26. Directory to change to when in daemon mode.
  27. .. cmdoption:: --chroot
  28. Change root directory to this path when in daemon mode.
  29. """
  30. import os
  31. import sys
  32. sys.path.append(os.getcwd())
  33. django_project_dir = os.environ.get("DJANGO_PROJECT_DIR")
  34. if django_project_dir:
  35. sys.path.append(django_project_dir)
  36. from django.conf import settings
  37. from celery.log import emergency_error
  38. from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
  39. from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
  40. from celery import conf
  41. from celery import discovery
  42. from celery.task import discard_all
  43. from celery.worker import WorkController
  44. import multiprocessing
  45. import traceback
  46. import optparse
  47. import atexit
  48. from daemon import DaemonContext
  49. from daemon.pidlockfile import PIDLockFile
  50. import errno
  51. STARTUP_INFO_FMT = """
  52. * Celery loading with the following configuration
  53. * Broker -> amqp://%(vhost)s@%(host)s:%(port)s
  54. * Exchange -> %(exchange)s (%(exchange_type)s)
  55. * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
  56. * Concurrency:%(concurrency)s
  57. """.strip()
  58. def acquire_pidlock(pidfile):
  59. """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
  60. ``pidfile``.
  61. If the ``pidfile`` already exists, but the process is not running the
  62. ``pidfile`` will be removed, a ``"stale pidfile"`` message is emitted
  63. and execution continues as normally. However, if the process is still
  64. running the program will exit complaning that the program is already
  65. running in the background somewhere.
  66. """
  67. pidlock = PIDLockFile(pidfile)
  68. if not pidlock.is_locked():
  69. return pidlock
  70. pid = pidlock.read_pid()
  71. try:
  72. os.kill(pid, 0)
  73. except os.error, exc:
  74. if exc.errno == errno.ESRCH:
  75. sys.stderr.write("Stale pidfile exists. Removing it.\n")
  76. pidlock.release()
  77. return
  78. else:
  79. raise SystemExit(
  80. "ERROR: Pidfile (%s) already exists.\n"
  81. "Seems celeryd is already running? (PID: %d)" % (
  82. pidfile, pid))
  83. return pidlock
  84. def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
  85. loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
  86. pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
  87. working_directory=None, chroot=None, **kwargs):
  88. """Run the celery daemon."""
  89. if not concurrency:
  90. concurrency = multiprocessing.cpu_count()
  91. if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
  92. import warnings
  93. warnings.warn("The sqlite3 database engine doesn't support "
  94. "concurrency. We'll be using a single process only.",
  95. UserWarning)
  96. concurrency = 1
  97. if not isinstance(loglevel, int):
  98. loglevel = LOG_LEVELS[loglevel.upper()]
  99. if discard:
  100. discarded_count = discard_all()
  101. what = discard_count > 1 and "messages" or "message"
  102. sys.stderr.write("Discard: Erased %d %s from the queue.\n" % (
  103. discarded_count, what))
  104. startup_info = STARTUP_INFO_FMT % {
  105. "vhost": settings.AMQP_VHOST,
  106. "host": settings.AMQP_SERVER,
  107. "port": settings.AMQP_PORT,
  108. "exchange": conf.AMQP_EXCHANGE,
  109. "exchange_type": conf.AMQP_EXCHANGE_TYPE,
  110. "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
  111. "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
  112. "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
  113. "concurrency": concurrency,
  114. "loglevel": loglevel,
  115. "pidfile": pidfile,
  116. }
  117. sys.stderr.write(startup_info + "\n")
  118. if daemon:
  119. # Since without stderr any errors will be silently suppressed,
  120. # we need to know that we have access to the logfile
  121. pidlock = acquire_pidlock(pidfile)
  122. if not umask:
  123. umask = 0
  124. if logfile:
  125. open(logfile, "a").close()
  126. uid = uid and int(uid) or os.geteuid()
  127. gid = gid and int(gid) or os.getegid()
  128. working_directory = working_directory or os.getcwd()
  129. sys.stderr.write("* Launching celeryd in the background...\n")
  130. context = DaemonContext(chroot_directory=chroot,
  131. working_directory=working_directory,
  132. umask=umask,
  133. pidfile=pidlock,
  134. uid=uid,
  135. gid=gid)
  136. context.open()
  137. else:
  138. logfile = None # log to stderr when not running as daemon.
  139. discovery.autodiscover()
  140. celeryd = WorkController(concurrency=concurrency,
  141. loglevel=loglevel,
  142. logfile=logfile,
  143. is_detached=daemon)
  144. try:
  145. celeryd.run()
  146. except Exception, e:
  147. emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
  148. e.__class__, e, traceback.format_exc()))
  149. except:
  150. if daemon:
  151. context.close()
  152. raise
  153. OPTION_LIST = (
  154. optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
  155. action="store", dest="concurrency", type="int",
  156. help="Number of child processes processing the queue."),
  157. optparse.make_option('--discard', default=False,
  158. action="store_true", dest="discard",
  159. help="Discard all waiting tasks before the daemon is started. "
  160. "WARNING: This is unrecoverable, and the tasks will be "
  161. "deleted from the messaging server."),
  162. optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
  163. action="store", dest="logfile",
  164. help="Path to log file."),
  165. optparse.make_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
  166. action="store", dest="loglevel",
  167. help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
  168. optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
  169. action="store", dest="pidfile",
  170. help="Path to pidfile."),
  171. optparse.make_option('-d', '--detach', '--daemon', default=False,
  172. action="store_true", dest="daemon",
  173. help="Run in the background as a daemon."),
  174. optparse.make_option('-u', '--uid', default=None,
  175. action="store", dest="uid",
  176. help="User-id to run celeryd as when in daemon mode."),
  177. optparse.make_option('-g', '--gid', default=None,
  178. action="store", dest="gid",
  179. help="Group-id to run celeryd as when in daemon mode."),
  180. optparse.make_option('--umask', default=0,
  181. action="store", type="int", dest="umask",
  182. help="umask of the process when in daemon mode."),
  183. optparse.make_option('--workdir', default=None,
  184. action="store", dest="working_directory",
  185. help="Directory to change to when in daemon mode."),
  186. optparse.make_option('--chroot', default=None,
  187. action="store", dest="chroot",
  188. help="Change root directory to this path when in daemon mode."),
  189. )
  190. def parse_options(arguments):
  191. """Parse the available options to ``celeryd``."""
  192. parser = optparse.OptionParser(option_list=OPTION_LIST)
  193. options, values = parser.parse_args(arguments)
  194. return options
  195. if __name__ == "__main__":
  196. options = parse_options(sys.argv[1:])
  197. run_worker(**options)