worker.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. import atexit
  4. import logging
  5. try:
  6. import multiprocessing
  7. except ImportError:
  8. multiprocessing = None # noqa
  9. import os
  10. import socket
  11. import sys
  12. import warnings
  13. from .. import __version__, platforms, signals
  14. from ..app import app_or_default
  15. from ..app.abstract import configurated, from_config
  16. from ..exceptions import ImproperlyConfigured, SystemTerminate
  17. from ..utils import isatty, LOG_LEVELS, cry, qualname
  18. from ..utils.functional import maybe_list
  19. from ..worker import WorkController
  20. try:
  21. from greenlet import GreenletExit
  22. IGNORE_ERRORS = (GreenletExit, )
  23. except ImportError:
  24. IGNORE_ERRORS = ()
  25. BANNER = """
  26. -------------- celery@%(hostname)s v%(version)s
  27. ---- **** -----
  28. --- * *** * -- [Configuration]
  29. -- * - **** --- . broker: %(conninfo)s
  30. - ** ---------- . loader: %(loader)s
  31. - ** ---------- . logfile: %(logfile)s@%(loglevel)s
  32. - ** ---------- . concurrency: %(concurrency)s
  33. - ** ---------- . events: %(events)s
  34. - *** --- * --- . beat: %(celerybeat)s
  35. -- ******* ----
  36. --- ***** ----- [Queues]
  37. -------------- %(queues)s
  38. """
  39. EXTRA_INFO_FMT = """
  40. [Tasks]
  41. %(tasks)s
  42. """
  43. UNKNOWN_QUEUE_ERROR = """\
  44. Trying to select queue subset of %r, but queue %s is not
  45. defined in the CELERY_QUEUES setting.
  46. If you want to automatically declare unknown queues you can
  47. enable the CELERY_CREATE_MISSING_QUEUES setting.
  48. """
  49. def cpu_count():
  50. if multiprocessing is not None:
  51. try:
  52. return multiprocessing.cpu_count()
  53. except NotImplementedError:
  54. pass
  55. return 2
  56. def get_process_name():
  57. if multiprocessing is not None:
  58. return multiprocessing.current_process().name
  59. class Worker(configurated):
  60. WorkController = WorkController
  61. inherit_confopts = (WorkController, )
  62. loglevel = from_config("log_level")
  63. redirect_stdouts = from_config()
  64. redirect_stdouts_level = from_config()
  65. def __init__(self, hostname=None, discard=False, embed_clockservice=False,
  66. queues=None, include=None, app=None, pidfile=None,
  67. autoscale=None, autoreload=False, **kwargs):
  68. self.app = app = app_or_default(app)
  69. self.setup_defaults(kwargs, namespace="celeryd")
  70. if not self.concurrency:
  71. self.concurrency = cpu_count()
  72. self.hostname = hostname or socket.gethostname()
  73. self.discard = discard
  74. self.embed_clockservice = embed_clockservice
  75. if self.app.IS_WINDOWS and self.embed_clockservice:
  76. self.die("-B option does not work on Windows. "
  77. "Please run celerybeat as a separate service.")
  78. self.use_queues = [] if queues is None else queues
  79. self.queues = None
  80. self.include = [] if include is None else include
  81. self.pidfile = pidfile
  82. self.autoscale = None
  83. if autoscale:
  84. max_c, _, min_c = autoscale.partition(",")
  85. self.autoscale = [int(max_c), min_c and int(min_c) or 0]
  86. self._isatty = isatty(sys.stdout)
  87. self.colored = app.log.colored(self.logfile)
  88. if isinstance(self.use_queues, basestring):
  89. self.use_queues = self.use_queues.split(",")
  90. if isinstance(self.include, basestring):
  91. self.include = self.include.split(",")
  92. self.autoreload = autoreload
  93. if autoreload:
  94. imports = list(self.include)
  95. imports.extend(maybe_list(
  96. self.app.conf.get("CELERY_IMPORTS") or ()))
  97. self.autoreload = set(imports)
  98. if not isinstance(self.loglevel, int):
  99. try:
  100. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  101. except KeyError:
  102. self.die("Unknown level %r. Please use one of %s." % (
  103. self.loglevel,
  104. "|".join(l for l in LOG_LEVELS.keys()
  105. if isinstance(l, basestring))))
  106. def run(self):
  107. self.init_loader()
  108. self.init_queues()
  109. self.worker_init()
  110. self.redirect_stdouts_to_logger()
  111. if getattr(os, "getuid", None) and os.getuid() == 0:
  112. warnings.warn(RuntimeWarning(
  113. "Running celeryd with superuser privileges is discouraged!"))
  114. if self.discard:
  115. self.purge_messages()
  116. # Dump configuration to screen so we have some basic information
  117. # for when users sends bug reports.
  118. print(str(self.colored.cyan(" \n", self.startup_info())) +
  119. str(self.colored.reset(self.extra_info())))
  120. self.set_process_status("-active-")
  121. try:
  122. self.run_worker()
  123. except IGNORE_ERRORS:
  124. pass
  125. def on_consumer_ready(self, consumer):
  126. signals.worker_ready.send(sender=consumer)
  127. print("celery@%s has started." % self.hostname)
  128. def init_queues(self):
  129. try:
  130. self.app.select_queues(self.use_queues)
  131. except KeyError, exc:
  132. raise ImproperlyConfigured(
  133. UNKNOWN_QUEUE_ERROR % (self.use_queues, exc))
  134. def init_loader(self):
  135. self.loader = self.app.loader
  136. self.settings = self.app.conf
  137. for module in self.include:
  138. self.loader.import_from_cwd(module)
  139. def redirect_stdouts_to_logger(self):
  140. self.app.log.setup(self.loglevel, self.logfile,
  141. self.redirect_stdouts,
  142. self.redirect_stdouts_level)
  143. def purge_messages(self):
  144. count = self.app.control.discard_all()
  145. what = (not count or count > 1) and "messages" or "message"
  146. print("discard: Erased %d %s from the queue.\n" % (count, what))
  147. def worker_init(self):
  148. # Run the worker init handler.
  149. # (Usually imports task modules and such.)
  150. self.loader.init_worker()
  151. def tasklist(self, include_builtins=True):
  152. from ..registry import tasks
  153. tasklist = tasks.keys()
  154. if not include_builtins:
  155. tasklist = filter(lambda s: not s.startswith("celery."),
  156. tasklist)
  157. return "\n".join(" . %s" % task for task in sorted(tasklist))
  158. def extra_info(self):
  159. if self.loglevel <= logging.INFO:
  160. include_builtins = self.loglevel <= logging.DEBUG
  161. tasklist = self.tasklist(include_builtins=include_builtins)
  162. return EXTRA_INFO_FMT % {"tasks": tasklist}
  163. return ""
  164. def startup_info(self):
  165. app = self.app
  166. concurrency = self.concurrency
  167. if self.autoscale:
  168. cmax, cmin = self.autoscale
  169. concurrency = "{min=%s, max=%s}" % (cmin, cmax)
  170. return BANNER % {
  171. "hostname": self.hostname,
  172. "version": __version__,
  173. "conninfo": self.app.broker_connection().as_uri(),
  174. "concurrency": concurrency,
  175. "loglevel": LOG_LEVELS[self.loglevel],
  176. "logfile": self.logfile or "[stderr]",
  177. "celerybeat": "ON" if self.embed_clockservice else "OFF",
  178. "events": "ON" if self.send_events else "OFF",
  179. "loader": qualname(self.loader),
  180. "queues": app.amqp.queues.format(indent=18, indent_first=False),
  181. }
  182. def run_worker(self):
  183. if self.pidfile:
  184. pidlock = platforms.create_pidlock(self.pidfile).acquire()
  185. atexit.register(pidlock.release)
  186. worker = self.WorkController(app=self.app,
  187. hostname=self.hostname,
  188. ready_callback=self.on_consumer_ready,
  189. embed_clockservice=self.embed_clockservice,
  190. autoscale=self.autoscale,
  191. autoreload=self.autoreload,
  192. **self.confopts_as_dict())
  193. self.install_platform_tweaks(worker)
  194. worker.start()
  195. def install_platform_tweaks(self, worker):
  196. """Install platform specific tweaks and workarounds."""
  197. if self.app.IS_OSX:
  198. self.osx_proxy_detection_workaround()
  199. # Install signal handler so SIGHUP restarts the worker.
  200. if not self._isatty:
  201. # only install HUP handler if detached from terminal,
  202. # so closing the terminal window doesn't restart celeryd
  203. # into the background.
  204. if self.app.IS_OSX:
  205. # OS X can't exec from a process using threads.
  206. # See http://github.com/ask/celery/issues#issue/152
  207. install_HUP_not_supported_handler(worker)
  208. else:
  209. install_worker_restart_handler(worker)
  210. install_worker_term_handler(worker)
  211. install_worker_term_hard_handler(worker)
  212. install_worker_int_handler(worker)
  213. install_cry_handler(worker.logger)
  214. install_rdb_handler()
  215. signals.worker_init.send(sender=worker)
  216. def osx_proxy_detection_workaround(self):
  217. """See http://github.com/ask/celery/issues#issue/161"""
  218. os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")
  219. def set_process_status(self, info):
  220. info = "%s (%s)" % (info, platforms.strargv(sys.argv))
  221. return platforms.set_mp_process_title("celeryd",
  222. info=info,
  223. hostname=self.hostname)
  224. def die(self, msg, exitcode=1):
  225. sys.stderr.write("Error: %s\n" % (msg, ))
  226. sys.exit(exitcode)
  227. def install_worker_int_handler(worker):
  228. def _stop(signum, frame):
  229. process_name = get_process_name()
  230. if not process_name or process_name == "MainProcess":
  231. print("celeryd: Hitting Ctrl+C again will terminate "
  232. "all running tasks!")
  233. install_worker_int_again_handler(worker)
  234. print("celeryd: Warm shutdown (%s)" % (process_name, ))
  235. worker.stop(in_sighandler=True)
  236. raise SystemExit()
  237. platforms.signals["SIGINT"] = _stop
  238. def install_worker_int_again_handler(worker):
  239. def _stop(signum, frame):
  240. process_name = get_process_name()
  241. if not process_name or process_name == "MainProcess":
  242. print("celeryd: Cold shutdown (%s)" % (process_name, ))
  243. worker.terminate(in_sighandler=True)
  244. raise SystemTerminate()
  245. platforms.signals["SIGINT"] = _stop
  246. def install_worker_term_handler(worker):
  247. def _stop(signum, frame):
  248. process_name = get_process_name()
  249. if not process_name or process_name == "MainProcess":
  250. print("celeryd: Warm shutdown (%s)" % (process_name, ))
  251. worker.stop(in_sighandler=True)
  252. raise SystemExit()
  253. platforms.signals["SIGTERM"] = _stop
  254. def install_worker_term_hard_handler(worker):
  255. def _stop(signum, frame):
  256. process_name = get_process_name()
  257. if not process_name or process_name == "MainProcess":
  258. print("celeryd: Cold shutdown (%s)" % (process_name, ))
  259. worker.terminate(in_sighandler=True)
  260. raise SystemTerminate()
  261. platforms.signals["SIGQUIT"] = _stop
  262. def install_worker_restart_handler(worker):
  263. def restart_worker_sig_handler(signum, frame):
  264. """Signal handler restarting the current python program."""
  265. print("Restarting celeryd (%s)" % (" ".join(sys.argv), ))
  266. worker.stop(in_sighandler=True)
  267. os.execv(sys.executable, [sys.executable] + sys.argv)
  268. platforms.signals["SIGHUP"] = restart_worker_sig_handler
  269. def install_cry_handler(logger):
  270. # Jython/PyPy does not have sys._current_frames
  271. is_jython = sys.platform.startswith("java")
  272. is_pypy = hasattr(sys, "pypy_version_info")
  273. if not (is_jython or is_pypy):
  274. def cry_handler(signum, frame):
  275. """Signal handler logging the stacktrace of all active threads."""
  276. logger.error("\n" + cry())
  277. platforms.signals["SIGUSR1"] = cry_handler
  278. def install_rdb_handler(envvar="CELERY_RDBSIG"): # pragma: no cover
  279. def rdb_handler(signum, frame):
  280. """Signal handler setting a rdb breakpoint at the current frame."""
  281. from ..contrib import rdb
  282. rdb.set_trace(frame)
  283. if os.environ.get(envvar):
  284. platforms.signals["SIGUSR2"] = rdb_handler
  285. def install_HUP_not_supported_handler(worker):
  286. def warn_on_HUP_handler(signum, frame):
  287. worker.logger.error("SIGHUP not supported: "
  288. "Restarting with HUP is unstable on this platform!")
  289. platforms.signals["SIGHUP"] = warn_on_HUP_handler