# worker.py (13 KB)
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.apps.worker
  4. ~~~~~~~~~~~~~~~~~~
  5. This module is the 'program-version' of :mod:`celery.worker`.
  6. It does everything necessary to run that module
  7. as an actual application, like installing signal handlers,
  8. platform tweaks, and so on.
  9. """
  10. from __future__ import absolute_import
  11. import logging
  12. import os
  13. import socket
  14. import sys
  15. import warnings
  16. from functools import partial
  17. from billiard import cpu_count, current_process
  18. from kombu.utils.encoding import safe_str
  19. from celery import VERSION_BANNER, platforms, signals
  20. from celery.app import app_or_default
  21. from celery.app.abstract import configurated, from_config
  22. from celery.exceptions import ImproperlyConfigured, SystemTerminate
  23. from celery.loaders.app import AppLoader
  24. from celery.task import trace
  25. from celery.utils import cry, isatty, worker_direct
  26. from celery.utils.imports import qualname
  27. from celery.utils.log import get_logger, mlevel, set_in_sighandler
  28. from celery.utils.text import pluralize
  29. from celery.worker import WorkController
  30. try:
  31. from greenlet import GreenletExit
  32. IGNORE_ERRORS = (GreenletExit, )
  33. except ImportError: # pragma: no cover
  34. IGNORE_ERRORS = ()
  35. logger = get_logger(__name__)
  36. is_jython = sys.platform.startswith('java')
  37. is_pypy = hasattr(sys, 'pypy_version_info')
  38. def active_thread_count():
  39. from threading import enumerate
  40. # must use .getName on Python 2.5
  41. return sum(1 for t in enumerate()
  42. if not t.getName().startswith('Dummy-'))
  43. def safe_say(msg):
  44. sys.__stderr__.write('\n%s\n' % msg)
  45. ARTLINES = [
  46. ' --------------',
  47. '---- **** -----',
  48. '--- * *** * --',
  49. '-- * - **** ---',
  50. '- ** ----------',
  51. '- ** ----------',
  52. '- ** ----------',
  53. '- ** ----------',
  54. '- *** --- * ---',
  55. '-- ******* ----',
  56. '--- ***** -----',
  57. ' --------------',
  58. ]
  59. BANNER = """\
  60. celery@%(hostname)s v%(version)s
  61. [Configuration]
  62. . broker: %(conninfo)s
  63. . app: %(app)s
  64. . concurrency: %(concurrency)s
  65. . events: %(events)s
  66. [Queues]
  67. %(queues)s
  68. """
  69. EXTRA_INFO_FMT = """
  70. [Tasks]
  71. %(tasks)s
  72. """
  73. UNKNOWN_QUEUE = """\
  74. Trying to select queue subset of %r, but queue %s is not
  75. defined in the CELERY_QUEUES setting.
  76. If you want to automatically declare unknown queues you can
  77. enable the CELERY_CREATE_MISSING_QUEUES setting.
  78. """
  79. class Worker(configurated):
  80. WorkController = WorkController
  81. app = None
  82. inherit_confopts = (WorkController, )
  83. loglevel = from_config('log_level')
  84. redirect_stdouts = from_config()
  85. redirect_stdouts_level = from_config()
  86. def __init__(self, hostname=None, purge=False, beat=False,
  87. queues=None, include=None, app=None, pidfile=None,
  88. autoscale=None, autoreload=False, no_execv=False,
  89. no_color=None, **kwargs):
  90. self.app = app = app_or_default(app or self.app)
  91. self.hostname = hostname or socket.gethostname()
  92. # this signal can be used to set up configuration for
  93. # workers by name.
  94. signals.celeryd_init.send(sender=self.hostname, instance=self,
  95. conf=self.app.conf)
  96. self.setup_defaults(kwargs, namespace='celeryd')
  97. if not self.concurrency:
  98. try:
  99. self.concurrency = cpu_count()
  100. except NotImplementedError:
  101. self.concurrency = 2
  102. self.purge = purge
  103. self.beat = beat
  104. self.use_queues = [] if queues is None else queues
  105. self.queues = None
  106. self.include = include
  107. self.pidfile = pidfile
  108. self.autoscale = None
  109. self.autoreload = autoreload
  110. self.no_color = no_color
  111. self.no_execv = no_execv
  112. if autoscale:
  113. max_c, _, min_c = autoscale.partition(',')
  114. self.autoscale = [int(max_c), min_c and int(min_c) or 0]
  115. self._isatty = isatty(sys.stdout)
  116. self.colored = app.log.colored(
  117. self.logfile,
  118. enabled=not no_color if no_color is not None else no_color
  119. )
  120. if isinstance(self.use_queues, basestring):
  121. self.use_queues = self.use_queues.split(',')
  122. if self.include:
  123. if isinstance(self.include, basestring):
  124. self.include = self.include.split(',')
  125. app.conf.CELERY_INCLUDE = (
  126. tuple(app.conf.CELERY_INCLUDE) + tuple(self.include))
  127. self.loglevel = mlevel(self.loglevel)
  128. def run(self):
  129. self.init_queues()
  130. self.app.loader.init_worker()
  131. # this signal can be used to e.g. change queues after
  132. # the -Q option has been applied.
  133. signals.celeryd_after_setup.send(sender=self.hostname, instance=self,
  134. conf=self.app.conf)
  135. if getattr(os, 'getuid', None) and os.getuid() == 0:
  136. warnings.warn(RuntimeWarning(
  137. 'Running celeryd with superuser privileges is discouraged!'))
  138. if self.purge:
  139. self.purge_messages()
  140. # Dump configuration to screen so we have some basic information
  141. # for when users sends bug reports.
  142. print(str(self.colored.cyan(' \n', self.startup_info())) +
  143. str(self.colored.reset(self.extra_info() or '')))
  144. self.set_process_status('-active-')
  145. self.setup_logging()
  146. # apply task execution optimizations
  147. trace.setup_worker_optimizations(self.app)
  148. try:
  149. self.run_worker()
  150. except IGNORE_ERRORS:
  151. pass
  152. def on_consumer_ready(self, consumer):
  153. signals.worker_ready.send(sender=consumer)
  154. print('celery@%s ready.' % safe_str(self.hostname))
  155. def init_queues(self):
  156. try:
  157. self.app.select_queues(self.use_queues)
  158. except KeyError, exc:
  159. raise ImproperlyConfigured(UNKNOWN_QUEUE % (self.use_queues, exc))
  160. if self.app.conf.CELERY_WORKER_DIRECT:
  161. self.app.amqp.queues.select_add(worker_direct(self.hostname))
  162. def setup_logging(self, colorize=None):
  163. if colorize is None and self.no_color is not None:
  164. colorize = not self.no_color
  165. self.app.log.setup(self.loglevel, self.logfile,
  166. self.redirect_stdouts, self.redirect_stdouts_level,
  167. colorize=colorize)
  168. def purge_messages(self):
  169. count = self.app.control.purge()
  170. print('purge: Erased %d %s from the queue.\n' % (
  171. count, pluralize(count, 'message')))
  172. def tasklist(self, include_builtins=True):
  173. tasks = self.app.tasks
  174. if not include_builtins:
  175. tasks = filter(lambda s: not s.startswith('celery.'), tasks)
  176. return '\n'.join(' . %s' % task for task in sorted(tasks))
  177. def extra_info(self):
  178. if self.loglevel <= logging.INFO:
  179. include_builtins = self.loglevel <= logging.DEBUG
  180. tasklist = self.tasklist(include_builtins=include_builtins)
  181. return EXTRA_INFO_FMT % {'tasks': tasklist}
  182. def startup_info(self):
  183. app = self.app
  184. concurrency = unicode(self.concurrency)
  185. appr = '%s:0x%x' % (app.main or '__main__', id(app))
  186. if not isinstance(app.loader, AppLoader):
  187. loader = qualname(app.loader)
  188. if loader.startswith('celery.loaders'):
  189. loader = loader[14:]
  190. appr += ' (%s)' % loader
  191. if self.autoscale:
  192. max, min = self.autoscale
  193. concurrency = '{min=%s, max=%s}' % (min, max)
  194. pool = self.pool_cls
  195. if not isinstance(pool, basestring):
  196. pool = pool.__module__
  197. concurrency += ' (%s)' % pool.split('.')[-1]
  198. events = 'ON'
  199. if not self.send_events:
  200. events = 'OFF (enable -E to monitor this worker)'
  201. banner = (BANNER % {
  202. 'app': appr,
  203. 'hostname': self.hostname,
  204. 'version': VERSION_BANNER,
  205. 'conninfo': self.app.connection().as_uri(),
  206. 'concurrency': concurrency,
  207. 'events': events,
  208. 'queues': app.amqp.queues.format(indent=0, indent_first=False),
  209. }).splitlines()
  210. # integrate the ASCII art.
  211. for i, x in enumerate(banner):
  212. try:
  213. banner[i] = ' '.join([ARTLINES[i], banner[i]])
  214. except IndexError:
  215. banner[i] = ' ' * 16 + banner[i]
  216. return '\n'.join(banner) + '\n'
  217. def run_worker(self):
  218. worker = self.WorkController(
  219. app=self.app,
  220. hostname=self.hostname,
  221. ready_callback=self.on_consumer_ready, beat=self.beat,
  222. autoscale=self.autoscale, autoreload=self.autoreload,
  223. no_execv=self.no_execv,
  224. pidfile=self.pidfile,
  225. **self.confopts_as_dict()
  226. )
  227. self.install_platform_tweaks(worker)
  228. signals.worker_init.send(sender=worker)
  229. worker.start()
  230. def install_platform_tweaks(self, worker):
  231. """Install platform specific tweaks and workarounds."""
  232. if self.app.IS_OSX:
  233. self.osx_proxy_detection_workaround()
  234. # Install signal handler so SIGHUP restarts the worker.
  235. if not self._isatty:
  236. # only install HUP handler if detached from terminal,
  237. # so closing the terminal window doesn't restart celeryd
  238. # into the background.
  239. if self.app.IS_OSX:
  240. # OS X can't exec from a process using threads.
  241. # See http://github.com/celery/celery/issues#issue/152
  242. install_HUP_not_supported_handler(worker)
  243. else:
  244. install_worker_restart_handler(worker)
  245. install_worker_term_handler(worker)
  246. install_worker_term_hard_handler(worker)
  247. install_worker_int_handler(worker)
  248. install_cry_handler()
  249. install_rdb_handler()
  250. def osx_proxy_detection_workaround(self):
  251. """See http://github.com/celery/celery/issues#issue/161"""
  252. os.environ.setdefault('celery_dummy_proxy', 'set_by_celeryd')
  253. def set_process_status(self, info):
  254. return platforms.set_mp_process_title(
  255. 'celeryd',
  256. info='%s (%s)' % (info, platforms.strargv(sys.argv)),
  257. hostname=self.hostname,
  258. )
  259. def _shutdown_handler(worker, sig='TERM', how='Warm',
  260. exc=SystemExit, callback=None):
  261. def _handle_request(*args):
  262. set_in_sighandler(True)
  263. try:
  264. from celery.worker import state
  265. if current_process()._name == 'MainProcess':
  266. if callback:
  267. callback(worker)
  268. safe_say('celeryd: %s shutdown (MainProcess)' % how)
  269. if active_thread_count() > 1:
  270. setattr(state, {'Warm': 'should_stop',
  271. 'Cold': 'should_terminate'}[how], True)
  272. else:
  273. raise exc()
  274. finally:
  275. set_in_sighandler(False)
  276. _handle_request.__name__ = 'worker_' + how
  277. platforms.signals[sig] = _handle_request
  278. install_worker_term_handler = partial(
  279. _shutdown_handler, sig='SIGTERM', how='Warm', exc=SystemExit,
  280. )
  281. if not is_jython:
  282. install_worker_term_hard_handler = partial(
  283. _shutdown_handler, sig='SIGQUIT', how='Cold', exc=SystemTerminate,
  284. )
  285. else:
  286. install_worker_term_handler = lambda *a, **kw: None
  287. def on_SIGINT(worker):
  288. safe_say('celeryd: Hitting Ctrl+C again will terminate all running tasks!')
  289. install_worker_term_hard_handler(worker, sig='SIGINT')
  290. if not is_jython:
  291. install_worker_int_handler = partial(
  292. _shutdown_handler, sig='SIGINT', callback=on_SIGINT
  293. )
  294. else:
  295. install_worker_int_handler = lambda *a, **kw: None
  296. def _clone_current_worker():
  297. if os.fork() == 0:
  298. os.execv(sys.executable, [sys.executable] + sys.argv)
  299. def install_worker_restart_handler(worker, sig='SIGHUP'):
  300. def restart_worker_sig_handler(*args):
  301. """Signal handler restarting the current python program."""
  302. set_in_sighandler(True)
  303. safe_say('Restarting celeryd (%s)' % (' '.join(sys.argv), ))
  304. import atexit
  305. atexit.register(_clone_current_worker)
  306. from celery.worker import state
  307. state.should_stop = True
  308. platforms.signals[sig] = restart_worker_sig_handler
  309. def install_cry_handler():
  310. # Jython/PyPy does not have sys._current_frames
  311. if is_jython or is_pypy: # pragma: no cover
  312. return
  313. def cry_handler(*args):
  314. """Signal handler logging the stacktrace of all active threads."""
  315. set_in_sighandler(True)
  316. try:
  317. safe_say(cry())
  318. finally:
  319. set_in_sighandler(False)
  320. platforms.signals['SIGUSR1'] = cry_handler
  321. def install_rdb_handler(envvar='CELERY_RDBSIG',
  322. sig='SIGUSR2'): # pragma: no cover
  323. def rdb_handler(*args):
  324. """Signal handler setting a rdb breakpoint at the current frame."""
  325. set_in_sighandler(True)
  326. try:
  327. _, frame = args
  328. from celery.contrib import rdb
  329. rdb.set_trace(frame)
  330. finally:
  331. set_in_sighandler(False)
  332. if os.environ.get(envvar):
  333. platforms.signals[sig] = rdb_handler
  334. def install_HUP_not_supported_handler(worker, sig='SIGHUP'):
  335. def warn_on_HUP_handler(*args):
  336. set_in_sighandler(True)
  337. try:
  338. safe_say('%(sig)s not supported: Restarting with %(sig)s is '
  339. 'unstable on this platform!' % {'sig': sig})
  340. finally:
  341. set_in_sighandler(False)
  342. platforms.signals[sig] = warn_on_HUP_handler