celeryd.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. #!/usr/bin/env python
  2. """celeryd
  3. .. program:: celeryd
  4. .. cmdoption:: -c, --concurrency
  5. Number of child processes processing the queue.
  6. .. cmdoption:: -f, --logfile
  7. Path to log file. If no logfile is specified, ``stderr`` is used.
  8. .. cmdoption:: -l, --loglevel
  9. Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
  10. ``ERROR``, ``CRITICAL``, or ``FATAL``.
  11. .. cmdoption:: -p, --pidfile
  12. Path to pidfile.
  13. .. cmdoption:: -d, --detach, --daemon
  14. Run in the background as a daemon.
  15. .. cmdoption:: --discard
  16. Discard all waiting tasks before the daemon is started.
  17. **WARNING**: This is unrecoverable, and the tasks will be
  18. deleted from the messaging server.
  19. .. cmdoption:: -u, --uid
  20. User-id to run ``celeryd`` as when in daemon mode.
  21. .. cmdoption:: -g, --gid
  22. Group-id to run ``celeryd`` as when in daemon mode.
  23. .. cmdoption:: --umask
  24. umask of the process when in daemon mode.
  25. .. cmdoption:: --workdir
  26. Directory to change to when in daemon mode.
  27. .. cmdoption:: --chroot
  28. Change root directory to this path when in daemon mode.
  29. """
  30. import os
  31. import sys
  32. sys.path.append(os.getcwd())
  33. django_project_dir = os.environ.get("DJANGO_PROJECT_DIR")
  34. if django_project_dir:
  35. sys.path.append(django_project_dir)
  36. from django.conf import settings
  37. from celery.log import emergency_error
  38. from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
  39. from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
  40. from celery import conf
  41. from celery import discovery
  42. from celery.task import discard_all
  43. from celery.worker import WorkController
  44. import traceback
  45. import optparse
  46. import atexit
  47. from daemon import DaemonContext
  48. from daemon.pidlockfile import PIDLockFile
  49. import errno
# Startup banner template. ``run_worker`` fills it in with ``%``-formatting
# from a dict of broker/exchange/consumer settings and writes the result to
# stderr. Surrounding blank lines are removed by the trailing ``.strip()``.
STARTUP_INFO_FMT = """
* Celery loading with the following configuration
* Broker -> amqp://%(vhost)s@%(host)s:%(port)s
* Exchange -> %(exchange)s (%(exchange_type)s)
* Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
* Concurrency:%(concurrency)s
""".strip()
  57. def acquire_pidlock(pidfile):
  58. """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
  59. ``pidfile``.
  60. If the ``pidfile`` already exists, but the process is not running the
  61. ``pidfile`` will be removed, a ``"stale pidfile"`` message is emitted
  62. and execution continues as normally. However, if the process is still
  63. running the program will exit complaning that the program is already
  64. running in the background somewhere.
  65. """
  66. pidlock = PIDLockFile(pidfile)
  67. if not pidlock.is_locked():
  68. return pidlock
  69. pid = pidlock.read_pid()
  70. try:
  71. os.kill(pid, 0)
  72. except os.error, exc:
  73. if exc.errno == errno.ESRCH:
  74. sys.stderr.write("Stale pidfile exists. Removing it.\n")
  75. pidlock.release()
  76. return
  77. else:
  78. raise SystemExit(
  79. "ERROR: Pidfile (%s) already exists.\n"
  80. "Seems celeryd is already running? (PID: %d)" % (
  81. pidfile, pid))
  82. return pidlock
  83. def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
  84. loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
  85. pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
  86. working_directory=None, chroot=None, **kwargs):
  87. """Run the celery daemon."""
  88. if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
  89. import warnings
  90. warnings.warn("The sqlite3 database engine doesn't support "
  91. "concurrency. We'll be using a single process only.",
  92. UserWarning)
  93. concurrency = 1
  94. if not isinstance(loglevel, int):
  95. loglevel = LOG_LEVELS[loglevel.upper()]
  96. if discard:
  97. discarded_count = discard_all()
  98. what = discard_count > 1 and "messages" or "message"
  99. sys.stderr.write("Discard: Erased %d %s from the queue.\n" % (
  100. discarded_count, what))
  101. startup_info = STARTUP_INFO_FMT % {
  102. "vhost": settings.AMQP_VHOST,
  103. "host": settings.AMQP_SERVER,
  104. "port": settings.AMQP_PORT,
  105. "exchange": conf.AMQP_EXCHANGE,
  106. "exchange_type": conf.AMQP_EXCHANGE_TYPE,
  107. "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
  108. "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
  109. "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
  110. "concurrency": concurrency,
  111. "loglevel": loglevel,
  112. "pidfile": pidfile,
  113. }
  114. sys.stderr.write(startup_info + "\n")
  115. if daemon:
  116. # Since without stderr any errors will be silently suppressed,
  117. # we need to know that we have access to the logfile
  118. pidlock = acquire_pidlock(pidfile)
  119. if not umask:
  120. umask = 0
  121. if logfile:
  122. open(logfile, "a").close()
  123. uid = uid and int(uid) or os.geteuid()
  124. gid = gid and int(gid) or os.getegid()
  125. working_directory = working_directory or os.getcwd()
  126. sys.stderr.write("* Launching celeryd in the background...\n")
  127. context = DaemonContext(chroot_directory=chroot,
  128. working_directory=working_directory,
  129. umask=umask,
  130. pidfile=pidlock,
  131. uid=uid,
  132. gid=gid)
  133. context.open()
  134. else:
  135. logfile = None # log to stderr when not running as daemon.
  136. discovery.autodiscover()
  137. celeryd = WorkController(concurrency=concurrency,
  138. loglevel=loglevel,
  139. logfile=logfile,
  140. is_detached=daemon)
  141. try:
  142. celeryd.run()
  143. except Exception, e:
  144. emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
  145. e.__class__, e, traceback.format_exc()))
  146. except:
  147. if daemon:
  148. context.close()
  149. raise
  150. OPTION_LIST = (
  151. optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
  152. action="store", dest="concurrency", type="int",
  153. help="Number of child processes processing the queue."),
  154. optparse.make_option('--discard', default=False,
  155. action="store_true", dest="discard",
  156. help="Discard all waiting tasks before the daemon is started. "
  157. "WARNING: This is unrecoverable, and the tasks will be "
  158. "deleted from the messaging server."),
  159. optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
  160. action="store", dest="logfile",
  161. help="Path to log file."),
  162. optparse.make_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
  163. action="store", dest="loglevel",
  164. help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
  165. optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
  166. action="store", dest="pidfile",
  167. help="Path to pidfile."),
  168. optparse.make_option('-d', '--detach', '--daemon', default=False,
  169. action="store_true", dest="daemon",
  170. help="Run in the background as a daemon."),
  171. optparse.make_option('-u', '--uid', default=None,
  172. action="store", dest="uid",
  173. help="User-id to run celeryd as when in daemon mode."),
  174. optparse.make_option('-g', '--gid', default=None,
  175. action="store", dest="gid",
  176. help="Group-id to run celeryd as when in daemon mode."),
  177. optparse.make_option('--umask', default=0,
  178. action="store", type="int", dest="umask",
  179. help="umask of the process when in daemon mode."),
  180. optparse.make_option('--workdir', default=None,
  181. action="store", dest="working_directory",
  182. help="Directory to change to when in daemon mode."),
  183. optparse.make_option('--chroot', default=None,
  184. action="store", dest="chroot",
  185. help="Change root directory to this path when in daemon mode."),
  186. )
  187. def parse_options(arguments):
  188. """Parse the available options to ``celeryd``."""
  189. parser = optparse.OptionParser(option_list=OPTION_LIST)
  190. options, values = parser.parse_args(arguments)
  191. return options
  192. if __name__ == "__main__":
  193. options = parse_options(sys.argv[1:])
  194. run_worker(**options)