celeryd.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. #!/usr/bin/env python
  2. """celeryd
  3. .. program:: celeryd
  4. .. cmdoption:: -c, --concurrency
  5. Number of child processes processing the queue. The default
  6. is the number of CPUs available on your system.
  7. .. cmdoption:: -f, --logfile
  8. Path to log file. If no logfile is specified, ``stderr`` is used.
  9. .. cmdoption:: -l, --loglevel
  10. Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
  11. ``ERROR``, ``CRITICAL``, or ``FATAL``.
  12. .. cmdoption:: -n, --hostname
  13. Set custom hostname.
  14. .. cmdoption:: -B, --beat
  15. Also run the ``celerybeat`` periodic task scheduler. Please note that
  16. there must only be one instance of this service.
  17. .. cmdoption:: -E, --events
  18. Send events that can be captured by monitors like ``celerymon``.
  19. .. cmdoption:: --discard
  20. Discard all waiting tasks before the daemon is started.
  21. **WARNING**: This is unrecoverable, and the tasks will be
  22. deleted from the messaging server.
  23. """
  24. import os
  25. import sys
  26. import socket
  27. import logging
  28. import optparse
  29. import traceback
  30. import multiprocessing
  31. import celery
  32. from celery import conf
  33. from celery import platform
  34. from celery.log import emergency_error
  35. from celery.task import discard_all
  36. from celery.utils import info
  37. from celery.worker import WorkController
# Startup banner printed by ``run_worker``.  It is filled with a mapping that
# supplies the keys ``conninfo``, ``queues``, ``concurrency``, ``loader``,
# ``logfile``, ``loglevel``, ``events``, ``celerybeat`` and ``tasks``.
# Surrounding blank lines of the literal are removed by ``.strip()``.
STARTUP_INFO_FMT = """
Configuration ->
. broker -> %(conninfo)s
. queues ->
%(queues)s
. concurrency -> %(concurrency)s
. loader -> %(loader)s
. logfile -> %(logfile)s@%(loglevel)s
. events -> %(events)s
. beat -> %(celerybeat)s
%(tasks)s
""".strip()

# Header for the task listing in the banner; ``%s`` receives the
# newline-joined, already formatted task names built in ``run_worker``.
TASK_LIST_FMT = """ . tasks ->\n%s"""
  51. OPTION_LIST = (
  52. optparse.make_option('-c', '--concurrency',
  53. default=conf.CELERYD_CONCURRENCY,
  54. action="store", dest="concurrency", type="int",
  55. help="Number of child processes processing the queue."),
  56. optparse.make_option('--discard', default=False,
  57. action="store_true", dest="discard",
  58. help="Discard all waiting tasks before the server is started. "
  59. "WARNING: This is unrecoverable, and the tasks will be "
  60. "deleted from the messaging server."),
  61. optparse.make_option('-f', '--logfile', default=conf.CELERYD_LOG_FILE,
  62. action="store", dest="logfile",
  63. help="Path to log file."),
  64. optparse.make_option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL,
  65. action="store", dest="loglevel",
  66. help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
  67. optparse.make_option('-n', '--hostname', default=None,
  68. action="store", dest="hostname",
  69. help="Set custom host name. E.g. 'foo.example.com'."),
  70. optparse.make_option('-B', '--beat', default=False,
  71. action="store_true", dest="run_clockservice",
  72. help="Also run the celerybeat periodic task scheduler. \
  73. Please note that only one instance must be running."),
  74. optparse.make_option('-E', '--events', default=conf.SEND_EVENTS,
  75. action="store_true", dest="events",
  76. help="Send events so celery can be monitored by e.g. celerymon."),
  77. )
  78. def run_worker(concurrency=conf.CELERYD_CONCURRENCY,
  79. loglevel=conf.CELERYD_LOG_LEVEL, logfile=conf.CELERYD_LOG_FILE,
  80. hostname=None,
  81. discard=False, run_clockservice=False, events=False, **kwargs):
  82. """Starts the celery worker server."""
  83. hostname = hostname or socket.gethostname()
  84. print("celery@%s v%s is starting." % (hostname, celery.__version__))
  85. from celery.loaders import current_loader, load_settings
  86. loader = current_loader()
  87. settings = load_settings()
  88. if not concurrency:
  89. concurrency = multiprocessing.cpu_count()
  90. if conf.CELERY_BACKEND == "database" \
  91. and settings.DATABASE_ENGINE == "sqlite3" and \
  92. concurrency > 1:
  93. import warnings
  94. warnings.warn("The sqlite3 database engine doesn't support "
  95. "concurrency. We'll be using a single process only.",
  96. UserWarning)
  97. concurrency = 1
  98. # Setup logging
  99. if not isinstance(loglevel, int):
  100. loglevel = conf.LOG_LEVELS[loglevel.upper()]
  101. if discard:
  102. discarded_count = discard_all()
  103. what = discarded_count > 1 and "messages" or "message"
  104. print("discard: Erased %d %s from the queue.\n" % (
  105. discarded_count, what))
  106. # Run the worker init handler.
  107. # (Usually imports task modules and such.)
  108. loader.on_worker_init()
  109. # Dump configuration to screen so we have some basic information
  110. # when users sends e-mails.
  111. tasklist = ""
  112. if loglevel <= logging.INFO:
  113. from celery.registry import tasks
  114. tasklist = tasks.keys()
  115. if not loglevel <= logging.DEBUG:
  116. tasklist = filter(lambda s: not s.startswith("celery."), tasklist)
  117. tasklist = TASK_LIST_FMT % "\n".join(" . %s" % task
  118. for task in sorted(tasklist))
  119. print(STARTUP_INFO_FMT % {
  120. "conninfo": info.format_broker_info(),
  121. "queues": info.format_routing_table(indent=8),
  122. "concurrency": concurrency,
  123. "loglevel": conf.LOG_LEVELS[loglevel],
  124. "logfile": logfile or "[stderr]",
  125. "celerybeat": run_clockservice and "ON" or "OFF",
  126. "events": events and "ON" or "OFF",
  127. "tasks": tasklist,
  128. "loader": loader.__class__.__module__,
  129. })
  130. print("Celery has started.")
  131. set_process_status("Running...")
  132. def run_worker():
  133. worker = WorkController(concurrency=concurrency,
  134. loglevel=loglevel,
  135. logfile=logfile,
  136. hostname=hostname,
  137. embed_clockservice=run_clockservice,
  138. send_events=events)
  139. # Install signal handler so SIGHUP restarts the worker.
  140. install_worker_restart_handler(worker)
  141. from celery import signals
  142. signals.worker_init.send(sender=worker)
  143. try:
  144. worker.start()
  145. except Exception, e:
  146. emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
  147. e.__class__, e, traceback.format_exc()))
  148. try:
  149. run_worker()
  150. except:
  151. set_process_status("Exiting...")
  152. raise
  153. def install_worker_restart_handler(worker):
  154. def restart_worker_sig_handler(signum, frame):
  155. """Signal handler restarting the current python program."""
  156. worker.logger.warn("Restarting celeryd (%s)" % (
  157. " ".join(sys.argv)))
  158. worker.stop()
  159. os.execv(sys.executable, [sys.executable] + sys.argv)
  160. platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)
  161. def parse_options(arguments):
  162. """Parse the available options to ``celeryd``."""
  163. parser = optparse.OptionParser(option_list=OPTION_LIST)
  164. options, values = parser.parse_args(arguments)
  165. return options
  166. def set_process_status(info):
  167. arg_start = "manage" in sys.argv[0] and 2 or 1
  168. if sys.argv[arg_start:]:
  169. info = "%s (%s)" % (info, " ".join(sys.argv[arg_start:]))
  170. platform.set_mp_process_title("celeryd", info=info)
  171. def main():
  172. options = parse_options(sys.argv[1:])
  173. run_worker(**vars(options))
  174. if __name__ == "__main__":
  175. main()