celeryd.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. #!/usr/bin/env python
  2. """celeryd
  3. .. program:: celeryd
  4. .. cmdoption:: -c, --concurrency
  5. Number of child processes processing the queue. The default
  6. is the number of CPUs available on your system.
  7. .. cmdoption:: -f, --logfile
  8. Path to log file. If no logfile is specified, ``stderr`` is used.
  9. .. cmdoption:: -l, --loglevel
  10. Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
  11. ``ERROR``, ``CRITICAL``, or ``FATAL``.
  12. .. cmdoption:: -B, --beat
  13. Also run the ``celerybeat`` periodic task scheduler. Please note that
  14. there must only be one instance of this service.
  15. .. cmdoption:: -E, --events
  16. Send events that can be captured by monitors like ``celerymon``.
  17. .. cmdoption:: --discard
  18. Discard all waiting tasks before the daemon is started.
  19. **WARNING**: This is unrecoverable, and the tasks will be
  20. deleted from the messaging server.
  21. """
  22. import os
  23. import sys
  24. import logging
  25. import optparse
  26. import traceback
  27. import multiprocessing
  28. from celery import conf
  29. from celery import platform
  30. from celery import __version__
  31. from celery.log import emergency_error
  32. from celery.task import discard_all
  33. from celery.utils import noop
  34. from celery.utils import info
  35. from celery.worker import WorkController
  36. STARTUP_INFO_FMT = """
  37. Configuration ->
  38. . broker -> %(conninfo)s
  39. . queues ->
  40. %(queues)s
  41. . concurrency -> %(concurrency)s
  42. . loader -> %(loader)s
  43. . logfile -> %(logfile)s@%(loglevel)s
  44. . events -> %(events)s
  45. . beat -> %(celerybeat)s
  46. %(tasks)s
  47. """.strip()
  48. TASK_LIST_FMT = """ . tasks ->\n%s"""
  49. OPTION_LIST = (
  50. optparse.make_option('-c', '--concurrency',
  51. default=conf.CELERYD_CONCURRENCY,
  52. action="store", dest="concurrency", type="int",
  53. help="Number of child processes processing the queue."),
  54. optparse.make_option('--discard', default=False,
  55. action="store_true", dest="discard",
  56. help="Discard all waiting tasks before the server is started. "
  57. "WARNING: This is unrecoverable, and the tasks will be "
  58. "deleted from the messaging server."),
  59. optparse.make_option('-f', '--logfile', default=conf.CELERYD_LOG_FILE,
  60. action="store", dest="logfile",
  61. help="Path to log file."),
  62. optparse.make_option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL,
  63. action="store", dest="loglevel",
  64. help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
  65. optparse.make_option('-B', '--beat', default=False,
  66. action="store_true", dest="run_clockservice",
  67. help="Also run the celerybeat periodic task scheduler. \
  68. Please note that only one instance must be running."),
  69. optparse.make_option('-E', '--events', default=conf.SEND_EVENTS,
  70. action="store_true", dest="events",
  71. help="Send events so celery can be monitored by e.g. celerymon."),
  72. )
  73. def run_worker(concurrency=conf.CELERYD_CONCURRENCY,
  74. loglevel=conf.CELERYD_LOG_LEVEL, logfile=conf.CELERYD_LOG_FILE,
  75. discard=False, run_clockservice=False, events=False, **kwargs):
  76. """Starts the celery worker server."""
  77. print("Celery %s is starting." % __version__)
  78. from celery.loaders import Loader, current_loader, settings
  79. if not concurrency:
  80. concurrency = multiprocessing.cpu_count()
  81. if conf.CELERY_BACKEND == "database" \
  82. and settings.DATABASE_ENGINE == "sqlite3" and \
  83. concurrency > 1:
  84. import warnings
  85. warnings.warn("The sqlite3 database engine doesn't support "
  86. "concurrency. We'll be using a single process only.",
  87. UserWarning)
  88. concurrency = 1
  89. # Setup logging
  90. if not isinstance(loglevel, int):
  91. loglevel = conf.LOG_LEVELS[loglevel.upper()]
  92. if discard:
  93. discarded_count = discard_all()
  94. what = discarded_count > 1 and "messages" or "message"
  95. print("discard: Erased %d %s from the queue.\n" % (
  96. discarded_count, what))
  97. # Run the worker init handler.
  98. # (Usually imports task modules and such.)
  99. current_loader.on_worker_init()
  100. # Dump configuration to screen so we have some basic information
  101. # when users sends e-mails.
  102. tasklist = ""
  103. if loglevel <= logging.INFO:
  104. from celery.registry import tasks
  105. tasklist = tasks.keys()
  106. if not loglevel <= logging.DEBUG:
  107. tasklist = filter(lambda s: not s.startswith("celery."), tasklist)
  108. tasklist = TASK_LIST_FMT % "\n".join(" . %s" % task
  109. for task in sorted(tasklist))
  110. print(STARTUP_INFO_FMT % {
  111. "conninfo": info.format_broker_info(),
  112. "queues": info.format_routing_table(indent=8),
  113. "concurrency": concurrency,
  114. "loglevel": conf.LOG_LEVELS[loglevel],
  115. "logfile": logfile or "[stderr]",
  116. "celerybeat": run_clockservice and "ON" or "OFF",
  117. "events": events and "ON" or "OFF",
  118. "tasks": tasklist,
  119. "loader": Loader.__module__,
  120. })
  121. print("Celery has started.")
  122. set_process_status("Running...")
  123. def run_worker():
  124. worker = WorkController(concurrency=concurrency,
  125. loglevel=loglevel,
  126. logfile=logfile,
  127. embed_clockservice=run_clockservice,
  128. send_events=events)
  129. # Install signal handler so SIGHUP restarts the worker.
  130. install_worker_restart_handler(worker)
  131. from celery import signals
  132. signals.worker_init.send(sender=worker)
  133. try:
  134. worker.start()
  135. except Exception, e:
  136. emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
  137. e.__class__, e, traceback.format_exc()))
  138. try:
  139. run_worker()
  140. except:
  141. set_process_status("Exiting...")
  142. raise
  143. def install_worker_restart_handler(worker):
  144. def restart_worker_sig_handler(signum, frame):
  145. """Signal handler restarting the current python program."""
  146. worker.logger.warn("Restarting celeryd (%s)" % (
  147. " ".join(sys.argv)))
  148. worker.stop()
  149. os.execv(sys.executable, [sys.executable] + sys.argv)
  150. platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)
  151. def parse_options(arguments):
  152. """Parse the available options to ``celeryd``."""
  153. parser = optparse.OptionParser(option_list=OPTION_LIST)
  154. options, values = parser.parse_args(arguments)
  155. return options
  156. def set_process_status(info):
  157. arg_start = "manage" in sys.argv[0] and 2 or 1
  158. if sys.argv[arg_start:]:
  159. info = "%s (%s)" % (info, " ".join(sys.argv[arg_start:]))
  160. platform.set_mp_process_title("celeryd", info=info)
  161. def main():
  162. options = parse_options(sys.argv[1:])
  163. run_worker(**vars(options))
  164. if __name__ == "__main__":
  165. main()