# worker.py (10 KB)
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. import logging
  4. import os
  5. import socket
  6. import sys
  7. import warnings
  8. from functools import partial
  9. from billiard import cpu_count, current_process
  10. from celery import __version__, platforms, signals
  11. from celery.app import app_or_default
  12. from celery.app.abstract import configurated, from_config
  13. from celery.exceptions import ImproperlyConfigured, SystemTerminate
  14. from celery.utils import cry, isatty
  15. from celery.utils.imports import qualname
  16. from celery.utils.log import LOG_LEVELS, get_logger, mlevel
  17. from celery.utils.text import pluralize
  18. from celery.worker import WorkController
  19. try:
  20. from greenlet import GreenletExit
  21. IGNORE_ERRORS = (GreenletExit, )
  22. except ImportError: # pragma: no cover
  23. IGNORE_ERRORS = ()
# Module-level logger; used by the SIGUSR1 and unsupported-SIGHUP handlers
# defined further down in this module.
logger = get_logger(__name__)
# Startup banner template.  Worker.startup_info() interpolates it with a
# mapping that provides the keys: hostname, version, conninfo, loader,
# logfile, loglevel, concurrency, events, celerybeat and queues.
# NOTE(review): the column alignment inside this banner looks like it was
# collapsed to single spaces -- confirm against the canonical file.
BANNER = """
-------------- celery@%(hostname)s v%(version)s
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: %(conninfo)s
- ** ---------- . loader: %(loader)s
- ** ---------- . logfile: %(logfile)s@%(loglevel)s
- ** ---------- . concurrency: %(concurrency)s
- ** ---------- . events: %(events)s
- *** --- * --- . beat: %(celerybeat)s
-- ******* ----
--- ***** ----- [Queues]
-------------- %(queues)s
"""
# Extra section printed after the banner.  Worker.extra_info() fills in
# ``tasks`` with the newline-joined task list, and only does so when the
# log level is INFO or lower.
EXTRA_INFO_FMT = """
[Tasks]
%(tasks)s
"""
# Error text for Worker.init_queues(): ``%r`` receives the requested queue
# selection and ``%s`` the KeyError raised by ``app.select_queues()``.
UNKNOWN_QUEUE = """\
Trying to select queue subset of %r, but queue %s is not
defined in the CELERY_QUEUES setting.
If you want to automatically declare unknown queues you can
enable the CELERY_CREATE_MISSING_QUEUES setting.
"""
  49. class Worker(configurated):
  50. WorkController = WorkController
  51. app = None
  52. inherit_confopts = (WorkController, )
  53. loglevel = from_config("log_level")
  54. redirect_stdouts = from_config()
  55. redirect_stdouts_level = from_config()
  56. def __init__(self, hostname=None, purge=False, beat=False,
  57. queues=None, include=None, app=None, pidfile=None,
  58. autoscale=None, autoreload=False, no_execv=False, **kwargs):
  59. self.app = app = app_or_default(app or self.app)
  60. self.hostname = hostname or socket.gethostname()
  61. # this signal can be used to set up configuration for
  62. # workers by name.
  63. signals.celeryd_init.send(sender=self.hostname, instance=self,
  64. conf=self.app.conf)
  65. self.setup_defaults(kwargs, namespace="celeryd")
  66. if not self.concurrency:
  67. try:
  68. self.concurrency = cpu_count()
  69. except NotImplementedError:
  70. self.concurrency = 2
  71. self.purge = purge
  72. self.beat = beat
  73. self.use_queues = [] if queues is None else queues
  74. self.queues = None
  75. self.include = include
  76. self.pidfile = pidfile
  77. self.autoscale = None
  78. self.autoreload = autoreload
  79. self.no_execv = no_execv
  80. if autoscale:
  81. max_c, _, min_c = autoscale.partition(",")
  82. self.autoscale = [int(max_c), min_c and int(min_c) or 0]
  83. self._isatty = isatty(sys.stdout)
  84. self.colored = app.log.colored(self.logfile)
  85. if isinstance(self.use_queues, basestring):
  86. self.use_queues = self.use_queues.split(",")
  87. if self.include:
  88. if isinstance(self.include, basestring):
  89. self.include = self.include.split(",")
  90. app.conf.CELERY_IMPORTS = tuple(
  91. self.include) + tuple(app.conf.CELERY_IMPORTS)
  92. self.loglevel = mlevel(self.loglevel)
  93. def run(self):
  94. self.init_queues()
  95. self.app.loader.init_worker()
  96. self.redirect_stdouts_to_logger()
  97. if getattr(os, "getuid", None) and os.getuid() == 0:
  98. warnings.warn(RuntimeWarning(
  99. "Running celeryd with superuser privileges is discouraged!"))
  100. if self.purge:
  101. self.purge_messages()
  102. # Dump configuration to screen so we have some basic information
  103. # for when users sends bug reports.
  104. print(str(self.colored.cyan(" \n", self.startup_info())) +
  105. str(self.colored.reset(self.extra_info() or "")))
  106. self.set_process_status("-active-")
  107. try:
  108. self.run_worker()
  109. except IGNORE_ERRORS:
  110. pass
  111. def on_consumer_ready(self, consumer):
  112. signals.worker_ready.send(sender=consumer)
  113. print("celery@%s has started." % self.hostname)
  114. def init_queues(self):
  115. try:
  116. self.app.select_queues(self.use_queues)
  117. except KeyError, exc:
  118. raise ImproperlyConfigured(UNKNOWN_QUEUE % (self.use_queues, exc))
  119. def redirect_stdouts_to_logger(self):
  120. self.app.log.setup(self.loglevel, self.logfile,
  121. self.redirect_stdouts, self.redirect_stdouts_level)
  122. def purge_messages(self):
  123. count = self.app.control.purge()
  124. print("purge: Erased %d %s from the queue.\n" % (
  125. count, pluralize(count, "message")))
  126. def tasklist(self, include_builtins=True):
  127. tasks = self.app.tasks.keys()
  128. if not include_builtins:
  129. tasks = filter(lambda s: not s.startswith("celery."), tasks)
  130. return "\n".join(" . %s" % task for task in sorted(tasks))
  131. def extra_info(self):
  132. if self.loglevel <= logging.INFO:
  133. include_builtins = self.loglevel <= logging.DEBUG
  134. tasklist = self.tasklist(include_builtins=include_builtins)
  135. return EXTRA_INFO_FMT % {"tasks": tasklist}
  136. def startup_info(self):
  137. app = self.app
  138. concurrency = self.concurrency
  139. if self.autoscale:
  140. concurrency = "{min=%s, max=%s}" % self.autoscale
  141. return BANNER % {
  142. "hostname": self.hostname,
  143. "version": __version__,
  144. "conninfo": self.app.broker_connection().as_uri(),
  145. "concurrency": concurrency,
  146. "loglevel": LOG_LEVELS[self.loglevel],
  147. "logfile": self.logfile or "[stderr]",
  148. "celerybeat": "ON" if self.beat else "OFF",
  149. "events": "ON" if self.send_events else "OFF",
  150. "loader": qualname(self.app.loader),
  151. "queues": app.amqp.queues.format(indent=18, indent_first=False),
  152. }
  153. def run_worker(self):
  154. if self.pidfile:
  155. platforms.create_pidlock(self.pidfile)
  156. worker = self.WorkController(app=self.app,
  157. hostname=self.hostname,
  158. ready_callback=self.on_consumer_ready, beat=self.beat,
  159. autoscale=self.autoscale, autoreload=self.autoreload,
  160. no_execv=self.no_execv,
  161. **self.confopts_as_dict())
  162. self.install_platform_tweaks(worker)
  163. signals.worker_init.send(sender=worker)
  164. worker.start()
  165. def install_platform_tweaks(self, worker):
  166. """Install platform specific tweaks and workarounds."""
  167. if self.app.IS_OSX:
  168. self.osx_proxy_detection_workaround()
  169. # Install signal handler so SIGHUP restarts the worker.
  170. if not self._isatty:
  171. # only install HUP handler if detached from terminal,
  172. # so closing the terminal window doesn't restart celeryd
  173. # into the background.
  174. if self.app.IS_OSX:
  175. # OS X can't exec from a process using threads.
  176. # See http://github.com/ask/celery/issues#issue/152
  177. install_HUP_not_supported_handler(worker)
  178. else:
  179. install_worker_restart_handler(worker)
  180. install_worker_term_handler(worker)
  181. install_worker_term_hard_handler(worker)
  182. install_worker_int_handler(worker)
  183. install_cry_handler()
  184. install_rdb_handler()
  185. def osx_proxy_detection_workaround(self):
  186. """See http://github.com/ask/celery/issues#issue/161"""
  187. os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")
  188. def set_process_status(self, info):
  189. return platforms.set_mp_process_title("celeryd",
  190. info="%s (%s)" % (info, platforms.strargv(sys.argv)),
  191. hostname=self.hostname)
  192. def _shutdown_handler(worker, sig="TERM", how="stop", exc=SystemExit,
  193. callback=None, types={"terminate": "Cold", "stop": "Warm"}):
  194. def _handle_request(signum, frame):
  195. if current_process()._name == "MainProcess":
  196. if callback:
  197. callback(worker)
  198. print("celeryd: %s shutdown (MainProcess)" % types[how])
  199. getattr(worker, how)(in_sighandler=True)
  200. raise exc()
  201. _handle_request.__name__ = "worker_" + how
  202. platforms.signals[sig] = _handle_request
  203. install_worker_term_handler = partial(
  204. _shutdown_handler, sig="SIGTERM", how="stop", exc=SystemExit,
  205. )
  206. install_worker_term_hard_handler = partial(
  207. _shutdown_handler, sig="SIGQUIT", how="terminate", exc=SystemTerminate,
  208. )
  209. def on_SIGINT(worker):
  210. print("celeryd: Hitting Ctrl+C again will terminate all running tasks!")
  211. install_worker_term_hard_handler(worker, sig="SIGINT")
  212. install_worker_int_handler = partial(
  213. _shutdown_handler, sig="SIGINT", callback=on_SIGINT
  214. )
  215. def install_worker_restart_handler(worker, sig="SIGHUP"):
  216. def restart_worker_sig_handler(signum, frame):
  217. """Signal handler restarting the current python program."""
  218. print("Restarting celeryd (%s)" % (" ".join(sys.argv), ))
  219. worker.stop(in_sighandler=True)
  220. os.execv(sys.executable, [sys.executable] + sys.argv)
  221. platforms.signals[sig] = restart_worker_sig_handler
  222. def install_cry_handler():
  223. # Jython/PyPy does not have sys._current_frames
  224. is_jython = sys.platform.startswith("java")
  225. is_pypy = hasattr(sys, "pypy_version_info")
  226. if is_jython or is_pypy: # pragma: no cover
  227. return
  228. def cry_handler(signum, frame):
  229. """Signal handler logging the stacktrace of all active threads."""
  230. logger.error("\n" + cry())
  231. platforms.signals["SIGUSR1"] = cry_handler
  232. def install_rdb_handler(envvar="CELERY_RDBSIG",
  233. sig="SIGUSR2"): # pragma: no cover
  234. def rdb_handler(signum, frame):
  235. """Signal handler setting a rdb breakpoint at the current frame."""
  236. from celery.contrib import rdb
  237. rdb.set_trace(frame)
  238. if os.environ.get(envvar):
  239. platforms.signals[sig] = rdb_handler
  240. def install_HUP_not_supported_handler(worker, sig="SIGHUP"):
  241. def warn_on_HUP_handler(signum, frame):
  242. logger.error("%(sig)s not supported: Restarting with %(sig)s is "
  243. "unstable on this platform!" % {"sig": sig})
  244. platforms.signals[sig] = warn_on_HUP_handler