# worker.py
import atexit
import logging
import multiprocessing
import os
import socket
import sys
import warnings

from celery import __version__
from celery import platforms
from celery import signals
from celery.app import app_or_default
from celery.exceptions import ImproperlyConfigured
from celery.utils import get_full_cls_name, LOG_LEVELS
from celery.utils import term
from celery.worker import WorkController
  15. STARTUP_INFO_FMT = """
  16. Configuration ->
  17. . broker -> %(conninfo)s
  18. . queues ->
  19. %(queues)s
  20. . concurrency -> %(concurrency)s
  21. . loader -> %(loader)s
  22. . logfile -> %(logfile)s@%(loglevel)s
  23. . events -> %(events)s
  24. . beat -> %(celerybeat)s
  25. %(tasks)s
  26. """.strip()
  27. TASK_LIST_FMT = """ . tasks ->\n%s"""
  28. class Worker(object):
  29. WorkController = WorkController
  30. def __init__(self, concurrency=None, loglevel=None, logfile=None,
  31. hostname=None, discard=False, run_clockservice=False,
  32. schedule=None, task_time_limit=None, task_soft_time_limit=None,
  33. max_tasks_per_child=None, queues=None, events=False, db=None,
  34. include=None, app=None, pidfile=None,
  35. redirect_stdouts=None, redirect_stdouts_level=None, **kwargs):
  36. self.app = app = app_or_default(app)
  37. self.concurrency = (concurrency or
  38. app.conf.CELERYD_CONCURRENCY or
  39. multiprocessing.cpu_count())
  40. self.loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
  41. self.logfile = logfile or app.conf.CELERYD_LOG_FILE
  42. self.hostname = hostname or socket.gethostname()
  43. self.discard = discard
  44. self.run_clockservice = run_clockservice
  45. self.schedule = schedule or app.conf.CELERYBEAT_SCHEDULE_FILENAME
  46. self.events = events
  47. self.task_time_limit = (task_time_limit or
  48. app.conf.CELERYD_TASK_TIME_LIMIT)
  49. self.task_soft_time_limit = (task_soft_time_limit or
  50. app.conf.CELERYD_TASK_SOFT_TIME_LIMIT)
  51. self.max_tasks_per_child = (max_tasks_per_child or
  52. app.conf.CELERYD_MAX_TASKS_PER_CHILD)
  53. self.redirect_stdouts = (redirect_stdouts or
  54. app.conf.CELERY_REDIRECT_STDOUTS)
  55. self.redirect_stdouts_level = (redirect_stdouts_level or
  56. app.conf.CELERY_REDIRECT_STDOUTS_LEVEL)
  57. self.db = db
  58. self.use_queues = queues or []
  59. self.queues = None
  60. self.include = include or []
  61. self.pidfile = pidfile
  62. self._isatty = sys.stdout.isatty()
  63. self.colored = term.colored(enabled=defaults.CELERYD_LOG_COLOR)
  64. if isinstance(self.use_queues, basestring):
  65. self.use_queues = self.use_queues.split(",")
  66. if isinstance(self.include, basestring):
  67. self.include = self.include.split(",")
  68. if not isinstance(self.loglevel, int):
  69. try:
  70. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  71. except KeyError:
  72. self.die("Unknown level %r. Please use one of %s." % (
  73. self.loglevel,
  74. "|".join(l for l in LOG_LEVELS.keys()
  75. if isinstance(l, basestring))))
  76. def run(self):
  77. self.init_loader()
  78. self.init_queues()
  79. self.worker_init()
  80. self.redirect_stdouts_to_logger()
  81. print(str(self.colored.cyan(
  82. "celery@%s v%s is starting." % (self.hostname, __version__))))
  83. if getattr(os, "geteuid", None) and os.geteuid() == 0:
  84. warnings.warn(
  85. "Running celeryd with superuser privileges is not encouraged!")
  86. if self.discard:
  87. self.purge_messages()
  88. # Dump configuration to screen so we have some basic information
  89. # for when users sends bug reports.
  90. print(str(self.colored.reset(" \n", self.startup_info())))
  91. self.set_process_status("Running...")
  92. self.run_worker()
  93. def on_listener_ready(self, listener):
  94. signals.worker_ready.send(sender=listener)
  95. print("celery@%s has started." % self.hostname)
  96. def init_queues(self):
  97. if self.use_queues:
  98. create_missing = self.app.conf.CELERY_CREATE_MISSING_QUEUES
  99. try:
  100. self.app.amqp.queues.select_subset(self.use_queues,
  101. create_missing)
  102. except KeyError, exc:
  103. raise ImproperlyConfigured(
  104. "Trying to select queue subset of %r, but queue %s"
  105. "is not defined in CELERY_QUEUES. If you want to "
  106. "automatically declare unknown queues you have to "
  107. "enable CELERY_CREATE_MISSING_QUEUES" % (
  108. self.use_queues, exc))
  109. self.queues = self.app.amqp.queues
  110. def init_loader(self):
  111. self.loader = self.app.loader
  112. self.settings = self.app.conf
  113. map(self.loader.import_module, self.include)
  114. def redirect_stdouts_to_logger(self):
  115. handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
  116. logfile=self.logfile)
  117. if not handled:
  118. logger = self.app.log.get_default_logger()
  119. if self.redirect_stdouts:
  120. self.app.log.redirect_stdouts_to_logger(logger,
  121. loglevel=self.redirect_stdouts_level)
  122. def purge_messages(self):
  123. count = self.app.control.discard_all()
  124. what = (not count or count > 1) and "messages" or "message"
  125. print("discard: Erased %d %s from the queue.\n" % (count, what))
  126. def worker_init(self):
  127. # Run the worker init handler.
  128. # (Usually imports task modules and such.)
  129. self.loader.init_worker()
  130. def tasklist(self, include_builtins=True):
  131. from celery.registry import tasks
  132. tasklist = tasks.keys()
  133. if not include_builtins:
  134. tasklist = filter(lambda s: not s.startswith("celery."),
  135. tasklist)
  136. return TASK_LIST_FMT % "\n".join("\t. %s" % task
  137. for task in sorted(tasklist))
  138. def startup_info(self):
  139. tasklist = ""
  140. if self.loglevel <= logging.INFO:
  141. include_builtins = self.loglevel <= logging.DEBUG
  142. tasklist = self.tasklist(include_builtins=include_builtins)
  143. return STARTUP_INFO_FMT % {
  144. "conninfo": self.app.amqp.format_broker_info(),
  145. "queues": self.queues.format(indent=8),
  146. "concurrency": self.concurrency,
  147. "loglevel": LOG_LEVELS[self.loglevel],
  148. "logfile": self.logfile or "[stderr]",
  149. "celerybeat": self.run_clockservice and "ON" or "OFF",
  150. "events": self.events and "ON" or "OFF",
  151. "tasks": tasklist,
  152. "loader": get_full_cls_name(self.loader.__class__),
  153. }
  154. def run_worker(self):
  155. if self.pidfile:
  156. pidlock = platforms.create_pidlock(self.pidfile).acquire()
  157. atexit.register(pidlock.release)
  158. worker = self.WorkController(app=self.app,
  159. concurrency=self.concurrency,
  160. loglevel=self.loglevel,
  161. logfile=self.logfile,
  162. hostname=self.hostname,
  163. ready_callback=self.on_listener_ready,
  164. embed_clockservice=self.run_clockservice,
  165. schedule_filename=self.schedule,
  166. send_events=self.events,
  167. db=self.db,
  168. queues=self.queues,
  169. max_tasks_per_child=self.max_tasks_per_child,
  170. task_time_limit=self.task_time_limit,
  171. task_soft_time_limit=self.task_soft_time_limit)
  172. self.install_platform_tweaks(worker)
  173. worker.start()
  174. def install_platform_tweaks(self, worker):
  175. """Install platform specific tweaks and workarounds."""
  176. if self.app.IS_OSX:
  177. self.osx_proxy_detection_workaround()
  178. # Install signal handler so SIGHUP restarts the worker.
  179. if not self._isatty:
  180. # only install HUP handler if detached from terminal,
  181. # so closing the terminal window doesn't restart celeryd
  182. # into the background.
  183. if self.app.IS_OSX:
  184. # OS X can't exec from a process using threads.
  185. # See http://github.com/ask/celery/issues#issue/152
  186. install_HUP_not_supported_handler(worker)
  187. else:
  188. install_worker_restart_handler(worker)
  189. install_worker_term_handler(worker)
  190. install_worker_int_handler(worker)
  191. signals.worker_init.send(sender=worker)
  192. def osx_proxy_detection_workaround(self):
  193. """See http://github.com/ask/celery/issues#issue/161"""
  194. os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")
  195. def set_process_status(self, info):
  196. info = "%s (%s)" % (info, platforms.strargv(sys.argv))
  197. return platforms.set_mp_process_title("celeryd",
  198. info=info,
  199. hostname=self.hostname)
  200. def die(self, msg, exitcode=1):
  201. sys.stderr.write("Error: %s\n" % (msg, ))
  202. sys.exit(exitcode)
  203. def install_worker_int_handler(worker):
  204. def _stop(signum, frame):
  205. process_name = multiprocessing.current_process().name
  206. if process_name == "MainProcess":
  207. worker.logger.warn(
  208. "celeryd: Hitting Ctrl+C again will terminate "
  209. "all running tasks!")
  210. install_worker_int_again_handler(worker)
  211. worker.logger.warn("celeryd: Warm shutdown (%s)" % (
  212. process_name))
  213. worker.stop()
  214. raise SystemExit()
  215. platforms.install_signal_handler("SIGINT", _stop)
  216. def install_worker_int_again_handler(worker):
  217. def _stop(signum, frame):
  218. process_name = multiprocessing.current_process().name
  219. if process_name == "MainProcess":
  220. worker.logger.warn("celeryd: Cold shutdown (%s)" % (
  221. process_name))
  222. worker.terminate()
  223. raise SystemExit()
  224. platforms.install_signal_handler("SIGINT", _stop)
  225. def install_worker_term_handler(worker):
  226. def _stop(signum, frame):
  227. process_name = multiprocessing.current_process().name
  228. if process_name == "MainProcess":
  229. worker.logger.warn("celeryd: Warm shutdown (%s)" % (
  230. process_name))
  231. worker.stop()
  232. raise SystemExit()
  233. platforms.install_signal_handler("SIGTERM", _stop)
  234. def install_worker_restart_handler(worker):
  235. def restart_worker_sig_handler(signum, frame):
  236. """Signal handler restarting the current python program."""
  237. worker.logger.warn("Restarting celeryd (%s)" % (
  238. " ".join(sys.argv)))
  239. worker.stop()
  240. os.execv(sys.executable, [sys.executable] + sys.argv)
  241. platforms.install_signal_handler("SIGHUP", restart_worker_sig_handler)
  242. def install_HUP_not_supported_handler(worker):
  243. def warn_on_HUP_handler(signum, frame):
  244. worker.logger.error("SIGHUP not supported: "
  245. "Restarting with HUP is unstable on this platform!")
  246. platforms.install_signal_handler("SIGHUP", warn_on_HUP_handler)
  247. def run_worker(*args, **kwargs):
  248. return Worker(*args, **kwargs).run()