worker.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.apps.worker
  4. ~~~~~~~~~~~~~~~~~~
  5. This module is the 'program-version' of :mod:`celery.worker`.
  6. It does everything necessary to run that module
  7. as an actual application, like installing signal handlers,
  8. platform tweaks, and so on.
  9. """
  10. from __future__ import absolute_import, print_function, unicode_literals
  11. import logging
  12. import os
  13. import platform as _platform
  14. import sys
  15. from datetime import datetime
  16. from functools import partial
  17. from billiard.process import current_process
  18. from kombu.utils.encoding import safe_str
  19. from celery import VERSION_BANNER, platforms, signals
  20. from celery.app import trace
  21. from celery.exceptions import WorkerShutdown, WorkerTerminate
  22. from celery.five import string, string_t
  23. from celery.loaders.app import AppLoader
  24. from celery.platforms import EX_FAILURE, EX_OK, check_privileges
  25. from celery.utils import cry, isatty
  26. from celery.utils.imports import qualname
  27. from celery.utils.log import get_logger, in_sighandler, set_in_sighandler
  28. from celery.utils.text import pluralize
  29. from celery.worker import WorkController
# Names exported by ``from celery.apps.worker import *``.
__all__ = ['Worker']

# Module-level logger, named after this module.
logger = get_logger(__name__)

# Interpreter detection flags, consulted below when deciding which
# signal handlers can be installed.
is_jython = sys.platform.startswith('java')
is_pypy = hasattr(sys, 'pypy_version_info')
  34. def active_thread_count():
  35. from threading import enumerate
  36. return sum(1 for t in enumerate()
  37. if not t.name.startswith('Dummy-'))
  38. def safe_say(msg):
  39. print('\n{0}'.format(msg), file=sys.__stderr__)
  40. ARTLINES = [
  41. ' --------------',
  42. '---- **** -----',
  43. '--- * *** * --',
  44. '-- * - **** ---',
  45. '- ** ----------',
  46. '- ** ----------',
  47. '- ** ----------',
  48. '- ** ----------',
  49. '- *** --- * ---',
  50. '-- ******* ----',
  51. '--- ***** -----',
  52. ' --------------',
  53. ]
# Template for the startup banner; filled in with ``str.format`` by
# ``Worker.startup_info`` and then split into lines so each one can be
# paired with a row of ``ARTLINES``.
# NOTE(review): blank separator lines and column-alignment spaces inside
# this literal may have been lost when the file was pasted -- confirm
# against the canonical source before relying on the exact layout.
BANNER = """\
{hostname} v{version}
{platform} {timestamp}
[config]
.> app: {app}
.> transport: {conninfo}
.> results: {results}
.> concurrency: {concurrency}
[queues]
{queues}
"""
# Template for the optional task listing printed after the banner;
# ``Worker.extra_info`` fills ``{tasks}`` with the output of
# ``Worker.tasklist``.
# NOTE(review): exact whitespace inside this literal may have been altered
# by the paste -- confirm against the canonical source.
EXTRA_INFO_FMT = """
[tasks]
{tasks}
"""
  69. class Worker(WorkController):
  70. def on_before_init(self, **kwargs):
  71. trace.setup_worker_optimizations(self.app, self.hostname)
  72. # this signal can be used to set up configuration for
  73. # workers by name.
  74. signals.celeryd_init.send(
  75. sender=self.hostname, instance=self,
  76. conf=self.app.conf, options=kwargs,
  77. )
  78. check_privileges(self.app.conf.accept_content)
  79. def on_after_init(self, purge=False, no_color=None,
  80. redirect_stdouts=None, redirect_stdouts_level=None,
  81. **kwargs):
  82. self.redirect_stdouts = self.app.either(
  83. 'worker_redirect_stdouts', redirect_stdouts,
  84. )
  85. self.redirect_stdouts_level = self.app.either(
  86. 'worker_redirect_stdouts_level', redirect_stdouts_level,
  87. )
  88. super(Worker, self).setup_defaults(**kwargs)
  89. self.purge = purge
  90. self.no_color = no_color
  91. self._isatty = isatty(sys.stdout)
  92. self.colored = self.app.log.colored(
  93. self.logfile,
  94. enabled=not no_color if no_color is not None else no_color
  95. )
  96. def on_init_blueprint(self):
  97. self._custom_logging = self.setup_logging()
  98. # apply task execution optimizations
  99. # -- This will finalize the app!
  100. trace.setup_worker_optimizations(self.app, self.hostname)
  101. def on_start(self):
  102. app = self.app
  103. if not self._custom_logging and self.redirect_stdouts:
  104. app.log.redirect_stdouts(self.redirect_stdouts_level)
  105. WorkController.on_start(self)
  106. # this signal can be used to e.g. change queues after
  107. # the -Q option has been applied.
  108. signals.celeryd_after_setup.send(
  109. sender=self.hostname, instance=self, conf=app.conf,
  110. )
  111. if self.purge:
  112. self.purge_messages()
  113. # Dump configuration to screen so we have some basic information
  114. # for when users sends bug reports.
  115. print(safe_str(''.join([
  116. string(self.colored.cyan(' \n', self.startup_info())),
  117. string(self.colored.reset(self.extra_info() or '')),
  118. ])), file=sys.__stdout__)
  119. self.set_process_status('-active-')
  120. self.install_platform_tweaks(self)
  121. def on_consumer_ready(self, consumer):
  122. signals.worker_ready.send(sender=consumer)
  123. print('{0} ready.'.format(safe_str(self.hostname),))
  124. def setup_logging(self, colorize=None):
  125. if colorize is None and self.no_color is not None:
  126. colorize = not self.no_color
  127. return self.app.log.setup(
  128. self.loglevel, self.logfile,
  129. redirect_stdouts=False, colorize=colorize, hostname=self.hostname,
  130. )
  131. def purge_messages(self):
  132. with self.app.connection_for_write() as connection:
  133. count = self.app.control.purge(connection=connection)
  134. if count: # pragma: no cover
  135. print('purge: Erased {0} {1} from the queue.\n'.format(
  136. count, pluralize(count, 'message')))
  137. def tasklist(self, include_builtins=True, sep='\n', int_='celery.'):
  138. return sep.join(
  139. ' . {0}'.format(task) for task in sorted(self.app.tasks)
  140. if (not task.startswith(int_) if not include_builtins else task)
  141. )
  142. def extra_info(self):
  143. if self.loglevel <= logging.INFO:
  144. include_builtins = self.loglevel <= logging.DEBUG
  145. tasklist = self.tasklist(include_builtins=include_builtins)
  146. return EXTRA_INFO_FMT.format(tasks=tasklist)
  147. def startup_info(self):
  148. app = self.app
  149. concurrency = string(self.concurrency)
  150. appr = '{0}:{1:#x}'.format(app.main or '__main__', id(app))
  151. if not isinstance(app.loader, AppLoader):
  152. loader = qualname(app.loader)
  153. if loader.startswith('celery.loaders'): # pragma: no cover
  154. loader = loader[14:]
  155. appr += ' ({0})'.format(loader)
  156. if self.autoscale:
  157. max, min = self.autoscale
  158. concurrency = '{{min={0}, max={1}}}'.format(min, max)
  159. pool = self.pool_cls
  160. if not isinstance(pool, string_t):
  161. pool = pool.__module__
  162. concurrency += ' ({0})'.format(pool.split('.')[-1])
  163. events = 'ON'
  164. if not self.send_events:
  165. events = 'OFF (enable -E to monitor this worker)'
  166. banner = BANNER.format(
  167. app=appr,
  168. hostname=safe_str(self.hostname),
  169. timestamp=datetime.now().replace(microsecond=0),
  170. version=VERSION_BANNER,
  171. conninfo=self.app.connection().as_uri(),
  172. results=self.app.backend.as_uri(),
  173. concurrency=concurrency,
  174. platform=safe_str(_platform.platform()),
  175. events=events,
  176. queues=app.amqp.queues.format(indent=0, indent_first=False),
  177. ).splitlines()
  178. # integrate the ASCII art.
  179. for i, x in enumerate(banner):
  180. try:
  181. banner[i] = ' '.join([ARTLINES[i], banner[i]])
  182. except IndexError:
  183. banner[i] = ' ' * 16 + banner[i]
  184. return '\n'.join(banner) + '\n'
  185. def install_platform_tweaks(self, worker):
  186. """Install platform specific tweaks and workarounds."""
  187. if self.app.IS_macOS:
  188. self.macOS_proxy_detection_workaround()
  189. # Install signal handler so SIGHUP restarts the worker.
  190. if not self._isatty:
  191. # only install HUP handler if detached from terminal,
  192. # so closing the terminal window doesn't restart the worker
  193. # into the background.
  194. if self.app.IS_macOS:
  195. # macOS can't exec from a process using threads.
  196. # See https://github.com/celery/celery/issues#issue/152
  197. install_HUP_not_supported_handler(worker)
  198. else:
  199. install_worker_restart_handler(worker)
  200. install_worker_term_handler(worker)
  201. install_worker_term_hard_handler(worker)
  202. install_worker_int_handler(worker)
  203. install_cry_handler()
  204. install_rdb_handler()
  205. def macOS_proxy_detection_workaround(self):
  206. """See https://github.com/celery/celery/issues#issue/161"""
  207. os.environ.setdefault('celery_dummy_proxy', 'set_by_celeryd')
  208. def set_process_status(self, info):
  209. return platforms.set_mp_process_title(
  210. 'celeryd',
  211. info='{0} ({1})'.format(info, platforms.strargv(sys.argv)),
  212. hostname=self.hostname,
  213. )
  214. def _shutdown_handler(worker, sig='TERM', how='Warm',
  215. exc=WorkerShutdown, callback=None, exitcode=EX_OK):
  216. def _handle_request(*args):
  217. with in_sighandler():
  218. from celery.worker import state
  219. if current_process()._name == 'MainProcess':
  220. if callback:
  221. callback(worker)
  222. safe_say('worker: {0} shutdown (MainProcess)'.format(how))
  223. if active_thread_count() > 1:
  224. setattr(state, {'Warm': 'should_stop',
  225. 'Cold': 'should_terminate'}[how], exitcode)
  226. else:
  227. raise exc(exitcode)
  228. _handle_request.__name__ = str('worker_{0}'.format(how))
  229. platforms.signals[sig] = _handle_request
  230. install_worker_term_handler = partial(
  231. _shutdown_handler, sig='SIGTERM', how='Warm', exc=WorkerShutdown,
  232. )
  233. if not is_jython: # pragma: no cover
  234. install_worker_term_hard_handler = partial(
  235. _shutdown_handler, sig='SIGQUIT', how='Cold', exc=WorkerTerminate,
  236. exitcode=EX_FAILURE,
  237. )
  238. else: # pragma: no cover
  239. install_worker_term_handler = \
  240. install_worker_term_hard_handler = lambda *a, **kw: None
  241. def on_SIGINT(worker):
  242. safe_say('worker: Hitting Ctrl+C again will terminate all running tasks!')
  243. install_worker_term_hard_handler(worker, sig='SIGINT')
  244. if not is_jython: # pragma: no cover
  245. install_worker_int_handler = partial(
  246. _shutdown_handler, sig='SIGINT', callback=on_SIGINT,
  247. exitcode=EX_FAILURE,
  248. )
  249. else: # pragma: no cover
  250. def install_worker_int_handler(*args, **kwargs):
  251. pass
  252. def _reload_current_worker():
  253. platforms.close_open_fds([
  254. sys.__stdin__, sys.__stdout__, sys.__stderr__,
  255. ])
  256. os.execv(sys.executable, [sys.executable] + sys.argv)
  257. def install_worker_restart_handler(worker, sig='SIGHUP'):
  258. def restart_worker_sig_handler(*args):
  259. """Signal handler restarting the current python program."""
  260. set_in_sighandler(True)
  261. safe_say('Restarting celery worker ({0})'.format(' '.join(sys.argv)))
  262. import atexit
  263. atexit.register(_reload_current_worker)
  264. from celery.worker import state
  265. state.should_stop = EX_OK
  266. platforms.signals[sig] = restart_worker_sig_handler
  267. def install_cry_handler(sig='SIGUSR1'):
  268. # Jython/PyPy does not have sys._current_frames
  269. if is_jython or is_pypy: # pragma: no cover
  270. return
  271. def cry_handler(*args):
  272. """Signal handler logging the stack-trace of all active threads."""
  273. with in_sighandler():
  274. safe_say(cry())
  275. platforms.signals[sig] = cry_handler
  276. def install_rdb_handler(envvar='CELERY_RDBSIG',
  277. sig='SIGUSR2'): # pragma: no cover
  278. def rdb_handler(*args):
  279. """Signal handler setting a rdb breakpoint at the current frame."""
  280. with in_sighandler():
  281. from celery.contrib.rdb import set_trace, _frame
  282. # gevent does not pass standard signal handler args
  283. frame = args[1] if args else _frame().f_back
  284. set_trace(frame)
  285. if os.environ.get(envvar):
  286. platforms.signals[sig] = rdb_handler
  287. def install_HUP_not_supported_handler(worker, sig='SIGHUP'):
  288. def warn_on_HUP_handler(signum, frame):
  289. with in_sighandler():
  290. safe_say('{sig} not supported: Restarting with {sig} is '
  291. 'unstable on this platform!'.format(sig=sig))
  292. platforms.signals[sig] = warn_on_HUP_handler