celeryd.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
#!/usr/bin/env python
"""celeryd

.. program:: celeryd

.. cmdoption:: -c, --concurrency

    Number of child processes processing the queue. The default
    is the number of CPUs available on your system.

.. cmdoption:: -f, --logfile

    Path to log file. If no logfile is specified, ``stderr`` is used.

.. cmdoption:: -l, --loglevel

    Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
    ``ERROR``, ``CRITICAL``, or ``FATAL``.

.. cmdoption:: -n, --hostname

    Set custom hostname.

.. cmdoption:: -B, --beat

    Also run the ``celerybeat`` periodic task scheduler. Please note that
    there must only be one instance of this service.

.. cmdoption:: -Q, --queues

    List of queues to enable for this worker, separated by commas.
    By default all configured queues are enabled.
    Example: ``-Q video,image``

.. cmdoption:: -s, --schedule

    Path to the schedule database if running with the ``-B`` option.
    Defaults to ``celerybeat-schedule``. The extension ".db" will be
    appended to the filename.

.. cmdoption:: -E, --events

    Send events that can be captured by monitors like ``celerymon``.

.. cmdoption:: --discard

    Discard all waiting tasks before the daemon is started.
    **WARNING**: This is unrecoverable, and the tasks will be
    deleted from the messaging server.

.. cmdoption:: --time-limit

    Enables a hard time limit (in seconds) for tasks.

.. cmdoption:: --soft-time-limit

    Enables a soft time limit (in seconds) for tasks.

.. cmdoption:: --maxtasksperchild

    Maximum number of tasks a pool worker can execute before it's
    terminated and replaced by a new worker.

"""
  39. import os
  40. import sys
  41. import socket
  42. import logging
  43. import optparse
  44. import warnings
  45. import traceback
  46. import multiprocessing
  47. import celery
  48. from celery import conf
  49. from celery import signals
  50. from celery import platform
  51. from celery.log import emergency_error
  52. from celery.task import discard_all
  53. from celery.utils import info
  54. from celery.utils import get_full_cls_name
  55. from celery.worker import WorkController
  56. STARTUP_INFO_FMT = """
  57. Configuration ->
  58. . broker -> %(conninfo)s
  59. . queues ->
  60. %(queues)s
  61. . concurrency -> %(concurrency)s
  62. . loader -> %(loader)s
  63. . logfile -> %(logfile)s@%(loglevel)s
  64. . events -> %(events)s
  65. . beat -> %(celerybeat)s
  66. %(tasks)s
  67. """.strip()
  68. TASK_LIST_FMT = """ . tasks ->\n%s"""
  69. OPTION_LIST = (
  70. optparse.make_option('-c', '--concurrency',
  71. default=conf.CELERYD_CONCURRENCY,
  72. action="store", dest="concurrency", type="int",
  73. help="Number of child processes processing the queue."),
  74. optparse.make_option('--discard', default=False,
  75. action="store_true", dest="discard",
  76. help="Discard all waiting tasks before the server is started. "
  77. "WARNING: This is unrecoverable, and the tasks will be "
  78. "deleted from the messaging server."),
  79. optparse.make_option('-f', '--logfile', default=conf.CELERYD_LOG_FILE,
  80. action="store", dest="logfile",
  81. help="Path to log file."),
  82. optparse.make_option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL,
  83. action="store", dest="loglevel",
  84. help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
  85. optparse.make_option('-n', '--hostname', default=None,
  86. action="store", dest="hostname",
  87. help="Set custom host name. E.g. 'foo.example.com'."),
  88. optparse.make_option('-B', '--beat', default=False,
  89. action="store_true", dest="run_clockservice",
  90. help="Also run the celerybeat periodic task scheduler. \
  91. Please note that only one instance must be running."),
  92. optparse.make_option('-s', '--schedule',
  93. default=conf.CELERYBEAT_SCHEDULE_FILENAME,
  94. action="store", dest="schedule",
  95. help="Path to the schedule database if running with the -B \
  96. option. The extension '.db' will be appended to the \
  97. filename. Default: %s" % (
  98. conf.CELERYBEAT_SCHEDULE_FILENAME)),
  99. optparse.make_option('-E', '--events', default=conf.SEND_EVENTS,
  100. action="store_true", dest="events",
  101. help="Send events so celery can be monitored by e.g. celerymon."),
  102. optparse.make_option('--time-limit',
  103. default=conf.CELERYD_TASK_TIME_LIMIT,
  104. action="store", type="int", dest="task_time_limit",
  105. help="Enables a hard time limit (in seconds) for tasks."),
  106. optparse.make_option('--soft-time-limit',
  107. default=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
  108. action="store", type="int", dest="task_soft_time_limit",
  109. help="Enables a soft time limit (in seconds) for tasks."),
  110. optparse.make_option('--maxtasksperchild',
  111. default=conf.CELERYD_MAX_TASKS_PER_CHILD,
  112. action="store", type="int", dest="max_tasks_per_child",
  113. help="Maximum number of tasks a pool worker can execute"
  114. "before it's terminated and replaced by a new worker."),
  115. optparse.make_option('--queues', '-Q', default=[],
  116. action="store", dest="queues",
  117. help="Comma separated list of queues to enable for this worker. "
  118. "By default all configured queues are enabled. "
  119. "Example: -Q video,image"),
  120. )
  121. class Worker(object):
  122. def __init__(self, concurrency=conf.CELERYD_CONCURRENCY,
  123. loglevel=conf.CELERYD_LOG_LEVEL, logfile=conf.CELERYD_LOG_FILE,
  124. hostname=None, discard=False, run_clockservice=False,
  125. schedule=conf.CELERYBEAT_SCHEDULE_FILENAME,
  126. task_time_limit=conf.CELERYD_TASK_TIME_LIMIT,
  127. task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
  128. max_tasks_per_child=conf.CELERYD_MAX_TASKS_PER_CHILD,
  129. queues=None, events=False, **kwargs):
  130. self.concurrency = concurrency or multiprocessing.cpu_count()
  131. self.loglevel = loglevel
  132. self.logfile = logfile
  133. self.hostname = hostname or socket.gethostname()
  134. self.discard = discard
  135. self.run_clockservice = run_clockservice
  136. self.schedule = schedule
  137. self.events = events
  138. self.task_time_limit = task_time_limit
  139. self.task_soft_time_limit = task_soft_time_limit
  140. self.max_tasks_per_child = max_tasks_per_child
  141. self.queues = queues or []
  142. if isinstance(self.queues, basestring):
  143. self.queues = self.queues.split(",")
  144. if not isinstance(self.loglevel, int):
  145. self.loglevel = conf.LOG_LEVELS[self.loglevel.upper()]
  146. def run(self):
  147. print("celery@%s v%s is starting." % (self.hostname,
  148. celery.__version__))
  149. self.init_loader()
  150. self.init_queues()
  151. if conf.RESULT_BACKEND == "database" \
  152. and self.settings.DATABASE_ENGINE == "sqlite3" and \
  153. self.concurrency > 1:
  154. warnings.warn("The sqlite3 database engine doesn't handle "
  155. "concurrency well. Will use a single process only.",
  156. UserWarning)
  157. self.concurrency = 1
  158. if getattr(self.settings, "DEBUG", False):
  159. warnings.warn("Using settings.DEBUG leads to a memory leak, "
  160. "never use this setting in a production environment!")
  161. if self.discard:
  162. self.purge_messages()
  163. self.worker_init()
  164. # Dump configuration to screen so we have some basic information
  165. # for when users sends bug reports.
  166. print(self.startup_info())
  167. set_process_status("Running...")
  168. self.run_worker()
  169. def on_listener_ready(self, listener):
  170. signals.worker_ready.send(sender=listener)
  171. print("celery@%s has started." % self.hostname)
  172. def init_queues(self):
  173. conf.QUEUES = dict((queue, options)
  174. for queue, options in conf.QUEUES.items()
  175. if queue in self.queues)
  176. def init_loader(self):
  177. from celery.loaders import current_loader, load_settings
  178. self.loader = current_loader()
  179. self.settings = load_settings()
  180. def purge_messages(self):
  181. discarded_count = discard_all()
  182. what = discarded_count > 1 and "messages" or "message"
  183. print("discard: Erased %d %s from the queue.\n" % (
  184. discarded_count, what))
  185. def worker_init(self):
  186. # Run the worker init handler.
  187. # (Usually imports task modules and such.)
  188. self.loader.init_worker()
  189. def tasklist(self, include_builtins=True):
  190. from celery.registry import tasks
  191. tasklist = tasks.keys()
  192. if not include_builtins:
  193. tasklist = filter(lambda s: not s.startswith("celery."),
  194. tasklist)
  195. return TASK_LIST_FMT % "\n".join("\t. %s" % task
  196. for task in sorted(tasklist))
  197. def startup_info(self):
  198. tasklist = ""
  199. if self.loglevel <= logging.INFO:
  200. include_builtins = self.loglevel <= logging.DEBUG
  201. tasklist = self.tasklist(include_builtins=include_builtins)
  202. return STARTUP_INFO_FMT % {
  203. "conninfo": info.format_broker_info(),
  204. "queues": info.format_routing_table(indent=8),
  205. "concurrency": self.concurrency,
  206. "loglevel": conf.LOG_LEVELS[self.loglevel],
  207. "logfile": self.logfile or "[stderr]",
  208. "celerybeat": self.run_clockservice and "ON" or "OFF",
  209. "events": self.events and "ON" or "OFF",
  210. "tasks": tasklist,
  211. "loader": get_full_cls_name(self.loader.__class__),
  212. }
  213. def run_worker(self):
  214. worker = WorkController(concurrency=self.concurrency,
  215. loglevel=self.loglevel,
  216. logfile=self.logfile,
  217. hostname=self.hostname,
  218. ready_callback=self.on_listener_ready,
  219. embed_clockservice=self.run_clockservice,
  220. schedule_filename=self.schedule,
  221. send_events=self.events,
  222. max_tasks_per_child=self.max_tasks_per_child,
  223. task_time_limit=self.task_time_limit,
  224. task_soft_time_limit=self.task_soft_time_limit)
  225. # Install signal handler so SIGHUP restarts the worker.
  226. install_worker_restart_handler(worker)
  227. install_worker_term_handler(worker)
  228. install_worker_int_handler(worker)
  229. signals.worker_init.send(sender=worker)
  230. try:
  231. worker.start()
  232. except Exception, exc:
  233. emergency_error(self.logfile,
  234. "celeryd raised exception %s: %s\n%s" % (
  235. exc.__class__, exc, traceback.format_exc()))
  236. def install_worker_int_handler(worker):
  237. def _stop(signum, frame):
  238. process_name = multiprocessing.current_process().name
  239. if process_name == "MainProcess":
  240. worker.logger.warn("celeryd: Cold shutdown (%s)" % (
  241. process_name))
  242. worker.terminate()
  243. raise SystemExit()
  244. platform.install_signal_handler("SIGINT", _stop)
  245. def install_worker_term_handler(worker):
  246. def _stop(signum, frame):
  247. process_name = multiprocessing.current_process().name
  248. if process_name == "MainProcess":
  249. worker.logger.warn("celeryd: Warm shutdown (%s)" % (
  250. process_name))
  251. worker.stop()
  252. raise SystemExit()
  253. platform.install_signal_handler("SIGTERM", _stop)
  254. def install_worker_restart_handler(worker):
  255. def restart_worker_sig_handler(signum, frame):
  256. """Signal handler restarting the current python program."""
  257. worker.logger.warn("Restarting celeryd (%s)" % (
  258. " ".join(sys.argv)))
  259. worker.stop()
  260. os.execv(sys.executable, [sys.executable] + sys.argv)
  261. platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)
  262. def parse_options(arguments):
  263. """Parse the available options to ``celeryd``."""
  264. parser = optparse.OptionParser(option_list=OPTION_LIST)
  265. options, values = parser.parse_args(arguments)
  266. return options
  267. def set_process_status(info):
  268. arg_start = "manage" in sys.argv[0] and 2 or 1
  269. if sys.argv[arg_start:]:
  270. info = "%s (%s)" % (info, " ".join(sys.argv[arg_start:]))
  271. platform.set_mp_process_title("celeryd", info=info)
  272. def run_worker(**options):
  273. return Worker(**options).run()
  274. def main():
  275. options = parse_options(sys.argv[1:])
  276. return run_worker(**vars(options))
  277. if __name__ == "__main__":
  278. main()