worker.py 12 KB


  1. # -*- coding: utf-8 -*-
  2. """Worker command-line program.
  3. This module is the 'program-version' of :mod:`celery.worker`.
  4. It does everything necessary to run that module
  5. as an actual application, like installing signal handlers,
  6. platform tweaks, and so on.
  7. """
  8. from __future__ import absolute_import, print_function, unicode_literals
  9. import logging
  10. import os
  11. import platform as _platform
  12. import sys
  13. from datetime import datetime
  14. from functools import partial
  15. from billiard.process import current_process
  16. from kombu.utils.encoding import safe_str
  17. from celery import VERSION_BANNER
  18. from celery import platforms
  19. from celery import signals
  20. from celery.app import trace
  21. from celery.exceptions import WorkerShutdown, WorkerTerminate
  22. from celery.five import string, string_t
  23. from celery.loaders.app import AppLoader
  24. from celery.platforms import EX_FAILURE, EX_OK, check_privileges, isatty
  25. from celery.utils import static
  26. from celery.utils import term
  27. from celery.utils.debug import cry
  28. from celery.utils.imports import qualname
  29. from celery.utils.log import get_logger, in_sighandler, set_in_sighandler
  30. from celery.utils.text import pluralize
  31. from celery.worker import WorkController
# Public API of this module.
__all__ = ['Worker']

# Module-level logger named after this module.
logger = get_logger(__name__)

# Interpreter detection flags, consulted below when deciding which
# signal handlers can be installed.
is_jython = sys.platform.startswith('java')
is_pypy = hasattr(sys, 'pypy_version_info')

# ASCII-art logo; Worker.startup_info() joins line ``i`` of this list
# with line ``i`` of the rendered banner.
ARTLINES = [
    ' --------------',
    '---- **** -----',
    '--- * *** * --',
    '-- * - **** ---',
    '- ** ----------',
    '- ** ----------',
    '- ** ----------',
    '- ** ----------',
    '- *** --- * ---',
    '-- ******* ----',
    '--- ***** -----',
    ' --------------',
]

# Startup banner template; every placeholder is filled in by
# Worker.startup_info() through ``str.format``.
BANNER = """\
{hostname} v{version}
{platform} {timestamp}
[config]
.> app: {app}
.> transport: {conninfo}
.> results: {results}
.> concurrency: {concurrency}
.> task events: {events}
[queues]
{queues}
"""

# Task-list section appended to the banner by Worker.extra_info().
EXTRA_INFO_FMT = """
[tasks]
{tasks}
"""
  66. def active_thread_count():
  67. from threading import enumerate
  68. return sum(1 for t in enumerate()
  69. if not t.name.startswith('Dummy-'))
  70. def safe_say(msg):
  71. print('\n{0}'.format(msg), file=sys.__stderr__)
  72. class Worker(WorkController):
  73. """Worker as a program."""
  74. def on_before_init(self, quiet=False, **kwargs):
  75. self.quiet = quiet
  76. trace.setup_worker_optimizations(self.app, self.hostname)
  77. # this signal can be used to set up configuration for
  78. # workers by name.
  79. signals.celeryd_init.send(
  80. sender=self.hostname, instance=self,
  81. conf=self.app.conf, options=kwargs,
  82. )
  83. check_privileges(self.app.conf.accept_content)
  84. def on_after_init(self, purge=False, no_color=None,
  85. redirect_stdouts=None, redirect_stdouts_level=None,
  86. **kwargs):
  87. self.redirect_stdouts = self.app.either(
  88. 'worker_redirect_stdouts', redirect_stdouts)
  89. self.redirect_stdouts_level = self.app.either(
  90. 'worker_redirect_stdouts_level', redirect_stdouts_level)
  91. super(Worker, self).setup_defaults(**kwargs)
  92. self.purge = purge
  93. self.no_color = no_color
  94. self._isatty = isatty(sys.stdout)
  95. self.colored = self.app.log.colored(
  96. self.logfile,
  97. enabled=not no_color if no_color is not None else no_color
  98. )
  99. def on_init_blueprint(self):
  100. self._custom_logging = self.setup_logging()
  101. # apply task execution optimizations
  102. # -- This will finalize the app!
  103. trace.setup_worker_optimizations(self.app, self.hostname)
  104. def on_start(self):
  105. app = self.app
  106. WorkController.on_start(self)
  107. # this signal can be used to, for example, change queues after
  108. # the -Q option has been applied.
  109. signals.celeryd_after_setup.send(
  110. sender=self.hostname, instance=self, conf=app.conf,
  111. )
  112. if self.purge:
  113. self.purge_messages()
  114. if not self.quiet:
  115. self.emit_banner()
  116. self.set_process_status('-active-')
  117. self.install_platform_tweaks(self)
  118. if not self._custom_logging and self.redirect_stdouts:
  119. app.log.redirect_stdouts(self.redirect_stdouts_level)
  120. def emit_banner(self):
  121. # Dump configuration to screen so we have some basic information
  122. # for when users sends bug reports.
  123. use_image = term.supports_images()
  124. if use_image:
  125. print(term.imgcat(static.logo()))
  126. print(safe_str(''.join([
  127. string(self.colored.cyan(
  128. ' \n', self.startup_info(artlines=not use_image))),
  129. string(self.colored.reset(self.extra_info() or '')),
  130. ])), file=sys.__stdout__)
  131. def on_consumer_ready(self, consumer):
  132. signals.worker_ready.send(sender=consumer)
  133. logger.info('%s ready.', safe_str(self.hostname))
  134. def setup_logging(self, colorize=None):
  135. if colorize is None and self.no_color is not None:
  136. colorize = not self.no_color
  137. return self.app.log.setup(
  138. self.loglevel, self.logfile,
  139. redirect_stdouts=False, colorize=colorize, hostname=self.hostname,
  140. )
  141. def purge_messages(self):
  142. with self.app.connection_for_write() as connection:
  143. count = self.app.control.purge(connection=connection)
  144. if count: # pragma: no cover
  145. print('purge: Erased {0} {1} from the queue.\n'.format(
  146. count, pluralize(count, 'message')))
  147. def tasklist(self, include_builtins=True, sep='\n', int_='celery.'):
  148. return sep.join(
  149. ' . {0}'.format(task) for task in sorted(self.app.tasks)
  150. if (not task.startswith(int_) if not include_builtins else task)
  151. )
  152. def extra_info(self):
  153. if self.loglevel is None:
  154. return
  155. if self.loglevel <= logging.INFO:
  156. include_builtins = self.loglevel <= logging.DEBUG
  157. tasklist = self.tasklist(include_builtins=include_builtins)
  158. return EXTRA_INFO_FMT.format(tasks=tasklist)
  159. def startup_info(self, artlines=True):
  160. app = self.app
  161. concurrency = string(self.concurrency)
  162. appr = '{0}:{1:#x}'.format(app.main or '__main__', id(app))
  163. if not isinstance(app.loader, AppLoader):
  164. loader = qualname(app.loader)
  165. if loader.startswith('celery.loaders'): # pragma: no cover
  166. loader = loader[14:]
  167. appr += ' ({0})'.format(loader)
  168. if self.autoscale:
  169. max, min = self.autoscale
  170. concurrency = '{{min={0}, max={1}}}'.format(min, max)
  171. pool = self.pool_cls
  172. if not isinstance(pool, string_t):
  173. pool = pool.__module__
  174. concurrency += ' ({0})'.format(pool.split('.')[-1])
  175. events = 'ON'
  176. if not self.task_events:
  177. events = 'OFF (enable -E to monitor tasks in this worker)'
  178. banner = BANNER.format(
  179. app=appr,
  180. hostname=safe_str(self.hostname),
  181. timestamp=datetime.now().replace(microsecond=0),
  182. version=VERSION_BANNER,
  183. conninfo=self.app.connection().as_uri(),
  184. results=self.app.backend.as_uri(),
  185. concurrency=concurrency,
  186. platform=safe_str(_platform.platform()),
  187. events=events,
  188. queues=app.amqp.queues.format(indent=0, indent_first=False),
  189. ).splitlines()
  190. # integrate the ASCII art.
  191. if artlines:
  192. for i, _ in enumerate(banner):
  193. try:
  194. banner[i] = ' '.join([ARTLINES[i], banner[i]])
  195. except IndexError:
  196. banner[i] = ' ' * 16 + banner[i]
  197. return '\n'.join(banner) + '\n'
  198. def install_platform_tweaks(self, worker):
  199. """Install platform specific tweaks and workarounds."""
  200. if self.app.IS_macOS:
  201. self.macOS_proxy_detection_workaround()
  202. # Install signal handler so SIGHUP restarts the worker.
  203. if not self._isatty:
  204. # only install HUP handler if detached from terminal,
  205. # so closing the terminal window doesn't restart the worker
  206. # into the background.
  207. if self.app.IS_macOS:
  208. # macOS can't exec from a process using threads.
  209. # See https://github.com/celery/celery/issues#issue/152
  210. install_HUP_not_supported_handler(worker)
  211. else:
  212. install_worker_restart_handler(worker)
  213. install_worker_term_handler(worker)
  214. install_worker_term_hard_handler(worker)
  215. install_worker_int_handler(worker)
  216. install_cry_handler()
  217. install_rdb_handler()
  218. def macOS_proxy_detection_workaround(self):
  219. """See https://github.com/celery/celery/issues#issue/161."""
  220. os.environ.setdefault('celery_dummy_proxy', 'set_by_celeryd')
  221. def set_process_status(self, info):
  222. return platforms.set_mp_process_title(
  223. 'celeryd',
  224. info='{0} ({1})'.format(info, platforms.strargv(sys.argv)),
  225. hostname=self.hostname,
  226. )
  227. def _shutdown_handler(worker, sig='TERM', how='Warm',
  228. exc=WorkerShutdown, callback=None, exitcode=EX_OK):
  229. def _handle_request(*args):
  230. with in_sighandler():
  231. from celery.worker import state
  232. if current_process()._name == 'MainProcess':
  233. if callback:
  234. callback(worker)
  235. safe_say('worker: {0} shutdown (MainProcess)'.format(how))
  236. if active_thread_count() > 1:
  237. setattr(state, {'Warm': 'should_stop',
  238. 'Cold': 'should_terminate'}[how], exitcode)
  239. else:
  240. raise exc(exitcode)
  241. _handle_request.__name__ = str('worker_{0}'.format(how))
  242. platforms.signals[sig] = _handle_request
  243. install_worker_term_handler = partial(
  244. _shutdown_handler, sig='SIGTERM', how='Warm', exc=WorkerShutdown,
  245. )
  246. if not is_jython: # pragma: no cover
  247. install_worker_term_hard_handler = partial(
  248. _shutdown_handler, sig='SIGQUIT', how='Cold', exc=WorkerTerminate,
  249. exitcode=EX_FAILURE,
  250. )
  251. else: # pragma: no cover
  252. install_worker_term_handler = \
  253. install_worker_term_hard_handler = lambda *a, **kw: None
  254. def on_SIGINT(worker):
  255. safe_say('worker: Hitting Ctrl+C again will terminate all running tasks!')
  256. install_worker_term_hard_handler(worker, sig='SIGINT')
  257. if not is_jython: # pragma: no cover
  258. install_worker_int_handler = partial(
  259. _shutdown_handler, sig='SIGINT', callback=on_SIGINT,
  260. exitcode=EX_FAILURE,
  261. )
  262. else: # pragma: no cover
  263. def install_worker_int_handler(*args, **kwargs):
  264. pass
  265. def _reload_current_worker():
  266. platforms.close_open_fds([
  267. sys.__stdin__, sys.__stdout__, sys.__stderr__,
  268. ])
  269. os.execv(sys.executable, [sys.executable] + sys.argv)
  270. def install_worker_restart_handler(worker, sig='SIGHUP'):
  271. def restart_worker_sig_handler(*args):
  272. """Signal handler restarting the current python program."""
  273. set_in_sighandler(True)
  274. safe_say('Restarting celery worker ({0})'.format(' '.join(sys.argv)))
  275. import atexit
  276. atexit.register(_reload_current_worker)
  277. from celery.worker import state
  278. state.should_stop = EX_OK
  279. platforms.signals[sig] = restart_worker_sig_handler
  280. def install_cry_handler(sig='SIGUSR1'):
  281. # Jython/PyPy does not have sys._current_frames
  282. if is_jython or is_pypy: # pragma: no cover
  283. return
  284. def cry_handler(*args):
  285. """Signal handler logging the stack-trace of all active threads."""
  286. with in_sighandler():
  287. safe_say(cry())
  288. platforms.signals[sig] = cry_handler
  289. def install_rdb_handler(envvar='CELERY_RDBSIG',
  290. sig='SIGUSR2'): # pragma: no cover
  291. def rdb_handler(*args):
  292. """Signal handler setting a rdb breakpoint at the current frame."""
  293. with in_sighandler():
  294. from celery.contrib.rdb import set_trace, _frame
  295. # gevent does not pass standard signal handler args
  296. frame = args[1] if args else _frame().f_back
  297. set_trace(frame)
  298. if os.environ.get(envvar):
  299. platforms.signals[sig] = rdb_handler
  300. def install_HUP_not_supported_handler(worker, sig='SIGHUP'):
  301. def warn_on_HUP_handler(signum, frame):
  302. with in_sighandler():
  303. safe_say('{sig} not supported: Restarting with {sig} is '
  304. 'unstable on this platform!'.format(sig=sig))
  305. platforms.signals[sig] = warn_on_HUP_handler