worker.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. # -*- coding: utf-8 -*-
  2. """This module is the 'program-version' of :mod:`celery.worker`.
  3. It does everything necessary to run that module
  4. as an actual application, like installing signal handlers,
  5. platform tweaks, and so on.
  6. """
  7. import logging
  8. import os
  9. import platform as _platform
  10. import sys
  11. from datetime import datetime
  12. from functools import partial
  13. from billiard.process import current_process
  14. from kombu.utils.encoding import safe_str
  15. from celery import VERSION_BANNER, platforms, signals
  16. from celery.app import trace
  17. from celery.exceptions import WorkerShutdown, WorkerTerminate
  18. from celery.loaders.app import AppLoader
  19. from celery.platforms import EX_FAILURE, EX_OK, check_privileges, isatty
  20. from celery.utils.debug import cry
  21. from celery.utils.imports import qualname
  22. from celery.utils.log import get_logger, in_sighandler, set_in_sighandler
  23. from celery.utils.text import pluralize
  24. from celery.worker import WorkController
# Names exported by ``from <this module> import *``.
__all__ = ['Worker']

# Module-level logger named after this module.
logger = get_logger(__name__)

# Interpreter detection flags; consulted further down when deciding which
# signal handlers get installed.
is_jython = sys.platform.startswith('java')
is_pypy = hasattr(sys, 'pypy_version_info')
  29. def active_thread_count():
  30. from threading import enumerate
  31. return sum(1 for t in enumerate()
  32. if not t.name.startswith('Dummy-'))
  33. def safe_say(msg):
  34. print('\n{0}'.format(msg), file=sys.__stderr__)
  35. ARTLINES = [
  36. ' --------------',
  37. '---- **** -----',
  38. '--- * *** * --',
  39. '-- * - **** ---',
  40. '- ** ----------',
  41. '- ** ----------',
  42. '- ** ----------',
  43. '- ** ----------',
  44. '- *** --- * ---',
  45. '-- ******* ----',
  46. '--- ***** -----',
  47. ' --------------',
  48. ]
  49. BANNER = """\
  50. {hostname} v{version}
  51. {platform} {timestamp}
  52. [config]
  53. .> app: {app}
  54. .> transport: {conninfo}
  55. .> results: {results}
  56. .> concurrency: {concurrency}
  57. [queues]
  58. {queues}
  59. """
  60. EXTRA_INFO_FMT = """
  61. [tasks]
  62. {tasks}
  63. """
  64. class Worker(WorkController):
  65. def on_before_init(self, **kwargs):
  66. trace.setup_worker_optimizations(self.app, self.hostname)
  67. # this signal can be used to set up configuration for
  68. # workers by name.
  69. signals.celeryd_init.send(
  70. sender=self.hostname, instance=self,
  71. conf=self.app.conf, options=kwargs,
  72. )
  73. check_privileges(self.app.conf.accept_content)
  74. def on_after_init(self, purge=False, no_color=None,
  75. redirect_stdouts=None, redirect_stdouts_level=None,
  76. **kwargs):
  77. self.redirect_stdouts = self.app.either(
  78. 'worker_redirect_stdouts', redirect_stdouts)
  79. self.redirect_stdouts_level = self.app.either(
  80. 'worker_redirect_stdouts_level', redirect_stdouts_level)
  81. super(Worker, self).setup_defaults(**kwargs)
  82. self.purge = purge
  83. self.no_color = no_color
  84. self._isatty = isatty(sys.stdout)
  85. self.colored = self.app.log.colored(
  86. self.logfile,
  87. enabled=not no_color if no_color is not None else no_color
  88. )
  89. def on_init_blueprint(self):
  90. self._custom_logging = self.setup_logging()
  91. # apply task execution optimizations
  92. # -- This will finalize the app!
  93. trace.setup_worker_optimizations(self.app, self.hostname)
  94. def on_start(self):
  95. app = self.app
  96. if not self._custom_logging and self.redirect_stdouts:
  97. app.log.redirect_stdouts(self.redirect_stdouts_level)
  98. WorkController.on_start(self)
  99. # this signal can be used to e.g. change queues after
  100. # the -Q option has been applied.
  101. signals.celeryd_after_setup.send(
  102. sender=self.hostname, instance=self, conf=app.conf,
  103. )
  104. if self.purge:
  105. self.purge_messages()
  106. # Dump configuration to screen so we have some basic information
  107. # for when users sends bug reports.
  108. print(safe_str(''.join([
  109. str(self.colored.cyan(' \n', self.startup_info())),
  110. str(self.colored.reset(self.extra_info() or '')),
  111. ])), file=sys.__stdout__)
  112. self.set_process_status('-active-')
  113. self.install_platform_tweaks(self)
  114. def on_consumer_ready(self, consumer):
  115. signals.worker_ready.send(sender=consumer)
  116. print('{0} ready.'.format(safe_str(self.hostname),))
  117. def setup_logging(self, colorize=None):
  118. if colorize is None and self.no_color is not None:
  119. colorize = not self.no_color
  120. return self.app.log.setup(
  121. self.loglevel, self.logfile,
  122. redirect_stdouts=False, colorize=colorize, hostname=self.hostname,
  123. )
  124. def purge_messages(self):
  125. with self.app.connection_for_write() as connection:
  126. count = self.app.control.purge(connection=connection)
  127. if count: # pragma: no cover
  128. print('purge: Erased {0} {1} from the queue.\n'.format(
  129. count, pluralize(count, 'message')))
  130. def tasklist(self, include_builtins=True, sep='\n', int_='celery.'):
  131. return sep.join(
  132. ' . {0}'.format(task) for task in sorted(self.app.tasks)
  133. if (not task.startswith(int_) if not include_builtins else task)
  134. )
  135. def extra_info(self):
  136. if self.loglevel <= logging.INFO:
  137. include_builtins = self.loglevel <= logging.DEBUG
  138. tasklist = self.tasklist(include_builtins=include_builtins)
  139. return EXTRA_INFO_FMT.format(tasks=tasklist)
  140. def startup_info(self):
  141. app = self.app
  142. concurrency = str(self.concurrency)
  143. appr = '{0}:{1:#x}'.format(app.main or '__main__', id(app))
  144. if not isinstance(app.loader, AppLoader):
  145. loader = qualname(app.loader)
  146. if loader.startswith('celery.loaders'): # pragma: no cover
  147. loader = loader[14:]
  148. appr += ' ({0})'.format(loader)
  149. pool = self.pool_cls
  150. if not isinstance(pool, str):
  151. pool = pool.__module__
  152. concurrency += ' ({0})'.format(pool.split('.')[-1])
  153. events = 'ON'
  154. if not self.send_events:
  155. events = 'OFF (enable -E to monitor this worker)'
  156. banner = BANNER.format(
  157. app=appr,
  158. hostname=safe_str(self.hostname),
  159. timestamp=datetime.now().replace(microsecond=0),
  160. version=VERSION_BANNER,
  161. conninfo=self.app.connection().as_uri(),
  162. results=self.app.backend.as_uri(),
  163. concurrency=concurrency,
  164. platform=safe_str(_platform.platform()),
  165. events=events,
  166. queues=app.amqp.queues.format(indent=0, indent_first=False),
  167. ).splitlines()
  168. # integrate the ASCII art.
  169. for i, x in enumerate(banner):
  170. try:
  171. banner[i] = ' '.join([ARTLINES[i], banner[i]])
  172. except IndexError:
  173. banner[i] = ' ' * 16 + banner[i]
  174. return '\n'.join(banner) + '\n'
  175. def install_platform_tweaks(self, worker):
  176. """Install platform specific tweaks and workarounds."""
  177. if self.app.IS_macOS:
  178. self.macOS_proxy_detection_workaround()
  179. # Install signal handler so SIGHUP restarts the worker.
  180. if not self._isatty:
  181. # only install HUP handler if detached from terminal,
  182. # so closing the terminal window doesn't restart the worker
  183. # into the background.
  184. if self.app.IS_macOS:
  185. # macOS can't exec from a process using threads.
  186. # See https://github.com/celery/celery/issues#issue/152
  187. install_HUP_not_supported_handler(worker)
  188. else:
  189. install_worker_restart_handler(worker)
  190. install_worker_term_handler(worker)
  191. install_worker_term_hard_handler(worker)
  192. install_worker_int_handler(worker)
  193. install_cry_handler()
  194. install_rdb_handler()
  195. def macOS_proxy_detection_workaround(self):
  196. """See https://github.com/celery/celery/issues#issue/161"""
  197. os.environ.setdefault('celery_dummy_proxy', 'set_by_celeryd')
  198. def set_process_status(self, info):
  199. return platforms.set_mp_process_title(
  200. 'celeryd',
  201. info='{0} ({1})'.format(info, platforms.strargv(sys.argv)),
  202. hostname=self.hostname,
  203. )
  204. def _shutdown_handler(worker, sig='TERM', how='Warm',
  205. exc=WorkerShutdown, callback=None, exitcode=EX_OK):
  206. def _handle_request(*args):
  207. with in_sighandler():
  208. from celery.worker import state
  209. if current_process()._name == 'MainProcess':
  210. if callback:
  211. callback(worker)
  212. safe_say('worker: {0} shutdown (MainProcess)'.format(how))
  213. if active_thread_count() > 1:
  214. setattr(state, {'Warm': 'should_stop',
  215. 'Cold': 'should_terminate'}[how], exitcode)
  216. else:
  217. raise exc(exitcode)
  218. _handle_request.__name__ = str('worker_{0}'.format(how))
  219. platforms.signals[sig] = _handle_request
  220. install_worker_term_handler = partial(
  221. _shutdown_handler, sig='SIGTERM', how='Warm', exc=WorkerShutdown,
  222. )
  223. if not is_jython: # pragma: no cover
  224. install_worker_term_hard_handler = partial(
  225. _shutdown_handler, sig='SIGQUIT', how='Cold', exc=WorkerTerminate,
  226. exitcode=EX_FAILURE,
  227. )
  228. else: # pragma: no cover
  229. install_worker_term_handler = \
  230. install_worker_term_hard_handler = lambda *a, **kw: None
  231. def on_SIGINT(worker):
  232. safe_say('worker: Hitting Ctrl+C again will terminate all running tasks!')
  233. install_worker_term_hard_handler(worker, sig='SIGINT')
  234. if not is_jython: # pragma: no cover
  235. install_worker_int_handler = partial(
  236. _shutdown_handler, sig='SIGINT', callback=on_SIGINT,
  237. exitcode=EX_FAILURE,
  238. )
  239. else: # pragma: no cover
  240. def install_worker_int_handler(*args, **kwargs):
  241. pass
  242. def _reload_current_worker():
  243. platforms.close_open_fds([
  244. sys.__stdin__, sys.__stdout__, sys.__stderr__,
  245. ])
  246. os.execv(sys.executable, [sys.executable] + sys.argv)
  247. def install_worker_restart_handler(worker, sig='SIGHUP'):
  248. def restart_worker_sig_handler(*args):
  249. """Signal handler restarting the current python program."""
  250. set_in_sighandler(True)
  251. safe_say('Restarting celery worker ({0})'.format(' '.join(sys.argv)))
  252. import atexit
  253. atexit.register(_reload_current_worker)
  254. from celery.worker import state
  255. state.should_stop = EX_OK
  256. platforms.signals[sig] = restart_worker_sig_handler
  257. def install_cry_handler(sig='SIGUSR1'):
  258. # Jython/PyPy does not have sys._current_frames
  259. if is_jython or is_pypy: # pragma: no cover
  260. return
  261. def cry_handler(*args):
  262. """Signal handler logging the stack-trace of all active threads."""
  263. with in_sighandler():
  264. safe_say(cry())
  265. platforms.signals[sig] = cry_handler
  266. def install_rdb_handler(envvar='CELERY_RDBSIG',
  267. sig='SIGUSR2'): # pragma: no cover
  268. def rdb_handler(*args):
  269. """Signal handler setting a rdb breakpoint at the current frame."""
  270. with in_sighandler():
  271. from celery.contrib.rdb import set_trace, _frame
  272. # gevent does not pass standard signal handler args
  273. frame = args[1] if args else _frame().f_back
  274. set_trace(frame)
  275. if os.environ.get(envvar):
  276. platforms.signals[sig] = rdb_handler
  277. def install_HUP_not_supported_handler(worker, sig='SIGHUP'):
  278. def warn_on_HUP_handler(signum, frame):
  279. with in_sighandler():
  280. safe_say('{sig} not supported: Restarting with {sig} is '
  281. 'unstable on this platform!'.format(sig=sig))
  282. platforms.signals[sig] = warn_on_HUP_handler