worker.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. import atexit
  4. import logging
  5. try:
  6. import multiprocessing
  7. except ImportError:
  8. multiprocessing = None # noqa
  9. import os
  10. import socket
  11. import sys
  12. import warnings
  13. from .. import __version__, platforms, signals
  14. from ..app import app_or_default
  15. from ..app.abstract import configurated, from_config
  16. from ..exceptions import ImproperlyConfigured, SystemTerminate
  17. from ..utils import cry, isatty, LOG_LEVELS, pluralize, qualname
  18. from ..worker import WorkController
  19. try:
  20. from greenlet import GreenletExit
  21. IGNORE_ERRORS = (GreenletExit, )
  22. except ImportError:
  23. IGNORE_ERRORS = ()
  24. BANNER = """
  25. -------------- celery@%(hostname)s v%(version)s
  26. ---- **** -----
  27. --- * *** * -- [Configuration]
  28. -- * - **** --- . broker: %(conninfo)s
  29. - ** ---------- . loader: %(loader)s
  30. - ** ---------- . logfile: %(logfile)s@%(loglevel)s
  31. - ** ---------- . concurrency: %(concurrency)s
  32. - ** ---------- . events: %(events)s
  33. - *** --- * --- . beat: %(celerybeat)s
  34. -- ******* ----
  35. --- ***** ----- [Queues]
  36. -------------- %(queues)s
  37. """
  38. EXTRA_INFO_FMT = """
  39. [Tasks]
  40. %(tasks)s
  41. """
  42. UNKNOWN_QUEUE_ERROR = """\
  43. Trying to select queue subset of %r, but queue %s is not
  44. defined in the CELERY_QUEUES setting.
  45. If you want to automatically declare unknown queues you can
  46. enable the CELERY_CREATE_MISSING_QUEUES setting.
  47. """
  48. def cpu_count():
  49. if multiprocessing is not None:
  50. try:
  51. return multiprocessing.cpu_count()
  52. except NotImplementedError:
  53. pass
  54. return 2
  55. def get_process_name():
  56. if multiprocessing is not None:
  57. return multiprocessing.current_process().name
  58. class Worker(configurated):
  59. WorkController = WorkController
  60. inherit_confopts = (WorkController, )
  61. loglevel = from_config("log_level")
  62. redirect_stdouts = from_config()
  63. redirect_stdouts_level = from_config()
  64. def __init__(self, hostname=None, discard=False, embed_clockservice=False,
  65. queues=None, include=None, app=None, pidfile=None,
  66. autoscale=None, autoreload=False, **kwargs):
  67. self.app = app = app_or_default(app)
  68. self.hostname = hostname or socket.gethostname()
  69. # this signal can be used to set up configuration for
  70. # workers by name.
  71. signals.celeryd_init.send(sender=self.hostname, instance=self,
  72. conf=self.app.conf)
  73. self.setup_defaults(kwargs, namespace="celeryd")
  74. if not self.concurrency:
  75. self.concurrency = cpu_count()
  76. self.discard = discard
  77. self.embed_clockservice = embed_clockservice
  78. if self.app.IS_WINDOWS and self.embed_clockservice:
  79. self.die("-B option does not work on Windows. "
  80. "Please run celerybeat as a separate service.")
  81. self.use_queues = [] if queues is None else queues
  82. self.queues = None
  83. self.include = [] if include is None else include
  84. self.pidfile = pidfile
  85. self.autoscale = None
  86. self.autoreload = autoreload
  87. if autoscale:
  88. max_c, _, min_c = autoscale.partition(",")
  89. self.autoscale = [int(max_c), min_c and int(min_c) or 0]
  90. self._isatty = isatty(sys.stdout)
  91. self.colored = app.log.colored(self.logfile)
  92. if isinstance(self.use_queues, basestring):
  93. self.use_queues = self.use_queues.split(",")
  94. if isinstance(self.include, basestring):
  95. self.include = self.include.split(",")
  96. if not isinstance(self.loglevel, int):
  97. try:
  98. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  99. except KeyError:
  100. self.die("Unknown level %r. Please use one of %s." % (
  101. self.loglevel,
  102. "|".join(l for l in LOG_LEVELS.keys()
  103. if isinstance(l, basestring))))
  104. def run(self):
  105. self.init_loader()
  106. self.init_queues()
  107. self.worker_init()
  108. self.redirect_stdouts_to_logger()
  109. if getattr(os, "getuid", None) and os.getuid() == 0:
  110. warnings.warn(RuntimeWarning(
  111. "Running celeryd with superuser privileges is discouraged!"))
  112. if self.discard:
  113. self.purge_messages()
  114. # Dump configuration to screen so we have some basic information
  115. # for when users sends bug reports.
  116. print(str(self.colored.cyan(" \n", self.startup_info())) +
  117. str(self.colored.reset(self.extra_info())))
  118. self.set_process_status("-active-")
  119. try:
  120. self.run_worker()
  121. except IGNORE_ERRORS:
  122. pass
  123. def on_consumer_ready(self, consumer):
  124. signals.worker_ready.send(sender=consumer)
  125. print("celery@%s has started." % self.hostname)
  126. def init_queues(self):
  127. try:
  128. self.app.select_queues(self.use_queues)
  129. except KeyError, exc:
  130. raise ImproperlyConfigured(
  131. UNKNOWN_QUEUE_ERROR % (self.use_queues, exc))
  132. def init_loader(self):
  133. self.loader = self.app.loader
  134. self.settings = self.app.conf
  135. for module in self.include:
  136. self.loader.import_task_module(module)
  137. def redirect_stdouts_to_logger(self):
  138. self.app.log.setup(self.loglevel, self.logfile,
  139. self.redirect_stdouts,
  140. self.redirect_stdouts_level)
  141. def purge_messages(self):
  142. count = self.app.control.discard_all()
  143. print("discard: Erased %d %s from the queue.\n" % (
  144. count, pluralize(count, "message")))
  145. def worker_init(self):
  146. # Run the worker init handler.
  147. # (Usually imports task modules and such.)
  148. self.loader.init_worker()
  149. def tasklist(self, include_builtins=True):
  150. from ..registry import tasks
  151. tasklist = tasks.keys()
  152. if not include_builtins:
  153. tasklist = filter(lambda s: not s.startswith("celery."),
  154. tasklist)
  155. return "\n".join(" . %s" % task for task in sorted(tasklist))
  156. def extra_info(self):
  157. if self.loglevel <= logging.INFO:
  158. include_builtins = self.loglevel <= logging.DEBUG
  159. tasklist = self.tasklist(include_builtins=include_builtins)
  160. return EXTRA_INFO_FMT % {"tasks": tasklist}
  161. return ""
  162. def startup_info(self):
  163. app = self.app
  164. concurrency = self.concurrency
  165. if self.autoscale:
  166. cmax, cmin = self.autoscale
  167. concurrency = "{min=%s, max=%s}" % (cmin, cmax)
  168. return BANNER % {
  169. "hostname": self.hostname,
  170. "version": __version__,
  171. "conninfo": self.app.broker_connection().as_uri(),
  172. "concurrency": concurrency,
  173. "loglevel": LOG_LEVELS[self.loglevel],
  174. "logfile": self.logfile or "[stderr]",
  175. "celerybeat": "ON" if self.embed_clockservice else "OFF",
  176. "events": "ON" if self.send_events else "OFF",
  177. "loader": qualname(self.loader),
  178. "queues": app.amqp.queues.format(indent=18, indent_first=False),
  179. }
  180. def run_worker(self):
  181. if self.pidfile:
  182. pidlock = platforms.create_pidlock(self.pidfile).acquire()
  183. atexit.register(pidlock.release)
  184. worker = self.WorkController(app=self.app,
  185. hostname=self.hostname,
  186. ready_callback=self.on_consumer_ready,
  187. embed_clockservice=self.embed_clockservice,
  188. autoscale=self.autoscale,
  189. autoreload=self.autoreload,
  190. **self.confopts_as_dict())
  191. self.install_platform_tweaks(worker)
  192. signals.worker_init.send(sender=worker)
  193. worker.start()
  194. def install_platform_tweaks(self, worker):
  195. """Install platform specific tweaks and workarounds."""
  196. if self.app.IS_OSX:
  197. self.osx_proxy_detection_workaround()
  198. # Install signal handler so SIGHUP restarts the worker.
  199. if not self._isatty:
  200. # only install HUP handler if detached from terminal,
  201. # so closing the terminal window doesn't restart celeryd
  202. # into the background.
  203. if self.app.IS_OSX:
  204. # OS X can't exec from a process using threads.
  205. # See http://github.com/celery/celery/issues#issue/152
  206. install_HUP_not_supported_handler(worker)
  207. else:
  208. install_worker_restart_handler(worker)
  209. install_worker_term_handler(worker)
  210. install_worker_term_hard_handler(worker)
  211. install_worker_int_handler(worker)
  212. install_cry_handler(worker.logger)
  213. install_rdb_handler()
  214. def osx_proxy_detection_workaround(self):
  215. """See http://github.com/celery/celery/issues#issue/161"""
  216. os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")
  217. def set_process_status(self, info):
  218. info = "%s (%s)" % (info, platforms.strargv(sys.argv))
  219. return platforms.set_mp_process_title("celeryd",
  220. info=info,
  221. hostname=self.hostname)
  222. def die(self, msg, exitcode=1):
  223. sys.stderr.write("Error: %s\n" % (msg, ))
  224. sys.exit(exitcode)
  225. def install_worker_int_handler(worker):
  226. def _stop(signum, frame):
  227. process_name = get_process_name()
  228. if not process_name or process_name == "MainProcess":
  229. print("celeryd: Hitting Ctrl+C again will terminate "
  230. "all running tasks!")
  231. install_worker_int_again_handler(worker)
  232. print("celeryd: Warm shutdown (%s)" % (process_name, ))
  233. worker.stop(in_sighandler=True)
  234. raise SystemExit()
  235. platforms.signals["SIGINT"] = _stop
  236. def install_worker_int_again_handler(worker):
  237. def _stop(signum, frame):
  238. process_name = get_process_name()
  239. if not process_name or process_name == "MainProcess":
  240. print("celeryd: Cold shutdown (%s)" % (process_name, ))
  241. worker.terminate(in_sighandler=True)
  242. raise SystemTerminate()
  243. platforms.signals["SIGINT"] = _stop
  244. def install_worker_term_handler(worker):
  245. def _stop(signum, frame):
  246. process_name = get_process_name()
  247. if not process_name or process_name == "MainProcess":
  248. print("celeryd: Warm shutdown (%s)" % (process_name, ))
  249. worker.stop(in_sighandler=True)
  250. raise SystemExit()
  251. platforms.signals["SIGTERM"] = _stop
  252. def install_worker_term_hard_handler(worker):
  253. def _stop(signum, frame):
  254. process_name = get_process_name()
  255. if not process_name or process_name == "MainProcess":
  256. print("celeryd: Cold shutdown (%s)" % (process_name, ))
  257. worker.terminate(in_sighandler=True)
  258. raise SystemTerminate()
  259. platforms.signals["SIGQUIT"] = _stop
  260. def install_worker_restart_handler(worker):
  261. def restart_worker_sig_handler(signum, frame):
  262. """Signal handler restarting the current python program."""
  263. print("Restarting celeryd (%s)" % (" ".join(sys.argv), ))
  264. worker.stop(in_sighandler=True)
  265. os.execv(sys.executable, [sys.executable] + sys.argv)
  266. platforms.signals["SIGHUP"] = restart_worker_sig_handler
  267. def install_cry_handler(logger):
  268. # Jython/PyPy does not have sys._current_frames
  269. is_jython = sys.platform.startswith("java")
  270. is_pypy = hasattr(sys, "pypy_version_info")
  271. if not (is_jython or is_pypy):
  272. def cry_handler(signum, frame):
  273. """Signal handler logging the stacktrace of all active threads."""
  274. logger.error("\n" + cry())
  275. platforms.signals["SIGUSR1"] = cry_handler
  276. def install_rdb_handler(envvar="CELERY_RDBSIG"): # pragma: no cover
  277. def rdb_handler(signum, frame):
  278. """Signal handler setting a rdb breakpoint at the current frame."""
  279. from ..contrib import rdb
  280. rdb.set_trace(frame)
  281. if os.environ.get(envvar):
  282. platforms.signals["SIGUSR2"] = rdb_handler
  283. def install_HUP_not_supported_handler(worker):
  284. def warn_on_HUP_handler(signum, frame):
  285. worker.logger.error("SIGHUP not supported: "
  286. "Restarting with HUP is unstable on this platform!")
  287. platforms.signals["SIGHUP"] = warn_on_HUP_handler