worker.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.apps.worker
  4. ~~~~~~~~~~~~~~~~~~
  5. This module is the 'program-version' of :mod:`celery.worker`.
  6. It does everything necessary to run that module
  7. as an actual application, like installing signal handlers,
  8. platform tweaks, and so on.
  9. """
  10. from __future__ import absolute_import
  11. import logging
  12. import os
  13. import socket
  14. import sys
  15. import warnings
  16. from functools import partial
  17. from billiard import cpu_count, current_process
  18. from celery import VERSION_BANNER, platforms, signals
  19. from celery.app import app_or_default
  20. from celery.app.abstract import configurated, from_config
  21. from celery.exceptions import ImproperlyConfigured, SystemTerminate
  22. from celery.loaders.app import AppLoader
  23. from celery.utils import cry, isatty, worker_direct
  24. from celery.utils.imports import qualname
  25. from celery.utils.log import get_logger, mlevel, set_in_sighandler
  26. from celery.utils.text import pluralize
  27. from celery.worker import WorkController
  28. try:
  29. from greenlet import GreenletExit
  30. IGNORE_ERRORS = (GreenletExit, )
  31. except ImportError: # pragma: no cover
  32. IGNORE_ERRORS = ()
# Module-level logger, obtained through celery's get_logger helper.
logger = get_logger(__name__)
  34. def active_thread_count():
  35. from threading import enumerate
  36. # must use .getName on Python 2.5
  37. return sum(1 for t in enumerate()
  38. if not t.getName().startswith('Dummy-'))
  39. def safe_say(msg):
  40. sys.__stderr__.write('\n%s\n' % msg)
# ASCII-art logo shown beside the startup banner: Worker.startup_info()
# joins banner line i with ARTLINES[i] (banner lines past the end of the
# art are padded with 16 spaces instead).
ARTLINES = [
    ' --------------',
    '---- **** -----',
    '--- * *** * --',
    '-- * - **** ---',
    '- ** ----------',
    '- ** ----------',
    '- ** ----------',
    '- ** ----------',
    '- *** --- * ---',
    '-- ******* ----',
    '--- ***** -----',
    ' --------------',
]

# Startup banner template; %-formatted with a mapping by
# Worker.startup_info() and then split into lines.
BANNER = """\
celery@%(hostname)s v%(version)s
[Configuration]
. broker: %(conninfo)s
. app: %(app)s
. concurrency: %(concurrency)s
. events: %(events)s
[Queues]
%(queues)s
"""

# Extra banner section listing registered tasks; produced by
# Worker.extra_info() only when the log level is INFO or more verbose.
EXTRA_INFO_FMT = """
[Tasks]
%(tasks)s
"""

# Error text for Worker.init_queues(), formatted with
# (requested queue list, the KeyError raised by select_queues).
UNKNOWN_QUEUE = """\
Trying to select queue subset of %r, but queue %s is not
defined in the CELERY_QUEUES setting.
If you want to automatically declare unknown queues you can
enable the CELERY_CREATE_MISSING_QUEUES setting.
"""
  75. class Worker(configurated):
  76. WorkController = WorkController
  77. app = None
  78. inherit_confopts = (WorkController, )
  79. loglevel = from_config('log_level')
  80. redirect_stdouts = from_config()
  81. redirect_stdouts_level = from_config()
  82. def __init__(self, hostname=None, purge=False, beat=False,
  83. queues=None, include=None, app=None, pidfile=None,
  84. autoscale=None, autoreload=False, no_execv=False, **kwargs):
  85. self.app = app = app_or_default(app or self.app)
  86. self.hostname = hostname or socket.gethostname()
  87. # this signal can be used to set up configuration for
  88. # workers by name.
  89. signals.celeryd_init.send(sender=self.hostname, instance=self,
  90. conf=self.app.conf)
  91. self.setup_defaults(kwargs, namespace='celeryd')
  92. if not self.concurrency:
  93. try:
  94. self.concurrency = cpu_count()
  95. except NotImplementedError:
  96. self.concurrency = 2
  97. self.purge = purge
  98. self.beat = beat
  99. self.use_queues = [] if queues is None else queues
  100. self.queues = None
  101. self.include = include
  102. self.pidfile = pidfile
  103. self.autoscale = None
  104. self.autoreload = autoreload
  105. self.no_execv = no_execv
  106. if autoscale:
  107. max_c, _, min_c = autoscale.partition(',')
  108. self.autoscale = [int(max_c), min_c and int(min_c) or 0]
  109. self._isatty = isatty(sys.stdout)
  110. self.colored = app.log.colored(self.logfile)
  111. if isinstance(self.use_queues, basestring):
  112. self.use_queues = self.use_queues.split(',')
  113. if self.include:
  114. if isinstance(self.include, basestring):
  115. self.include = self.include.split(',')
  116. app.conf.CELERY_INCLUDE = (
  117. tuple(app.conf.CELERY_INCLUDE) + tuple(self.include))
  118. self.loglevel = mlevel(self.loglevel)
  119. def run(self):
  120. self.init_queues()
  121. self.app.loader.init_worker()
  122. # this signal can be used to e.g. change queues after
  123. # the -Q option has been applied.
  124. signals.celeryd_after_setup.send(sender=self.hostname, instance=self,
  125. conf=self.app.conf)
  126. if getattr(os, 'getuid', None) and os.getuid() == 0:
  127. warnings.warn(RuntimeWarning(
  128. 'Running celeryd with superuser privileges is discouraged!'))
  129. if self.purge:
  130. self.purge_messages()
  131. # Dump configuration to screen so we have some basic information
  132. # for when users sends bug reports.
  133. print(str(self.colored.cyan(' \n', self.startup_info())) +
  134. str(self.colored.reset(self.extra_info() or '')))
  135. self.set_process_status('-active-')
  136. self.redirect_stdouts_to_logger()
  137. try:
  138. self.run_worker()
  139. except IGNORE_ERRORS:
  140. pass
  141. def on_consumer_ready(self, consumer):
  142. signals.worker_ready.send(sender=consumer)
  143. print('celery@%s has started.' % self.hostname)
  144. def init_queues(self):
  145. try:
  146. self.app.select_queues(self.use_queues)
  147. except KeyError, exc:
  148. raise ImproperlyConfigured(UNKNOWN_QUEUE % (self.use_queues, exc))
  149. if self.app.conf.CELERY_WORKER_DIRECT:
  150. self.app.amqp.queues.select_add(worker_direct(self.hostname))
  151. def redirect_stdouts_to_logger(self):
  152. self.app.log.setup(self.loglevel, self.logfile,
  153. self.redirect_stdouts, self.redirect_stdouts_level)
  154. def purge_messages(self):
  155. count = self.app.control.purge()
  156. print('purge: Erased %d %s from the queue.\n' % (
  157. count, pluralize(count, 'message')))
  158. def tasklist(self, include_builtins=True):
  159. tasks = self.app.tasks.keys()
  160. if not include_builtins:
  161. tasks = filter(lambda s: not s.startswith('celery.'), tasks)
  162. return '\n'.join(' . %s' % task for task in sorted(tasks))
  163. def extra_info(self):
  164. if self.loglevel <= logging.INFO:
  165. include_builtins = self.loglevel <= logging.DEBUG
  166. tasklist = self.tasklist(include_builtins=include_builtins)
  167. return EXTRA_INFO_FMT % {'tasks': tasklist}
  168. def startup_info(self):
  169. app = self.app
  170. concurrency = unicode(self.concurrency)
  171. appr = '%s:0x%x' % (app.main or '__main__', id(app))
  172. if not isinstance(app.loader, AppLoader):
  173. loader = qualname(app.loader)
  174. if loader.startswith('celery.loaders'):
  175. loader = loader[14:]
  176. appr += ' (%s)' % loader
  177. if self.autoscale:
  178. max, min = self.autoscale
  179. concurrency = '{min=%s, max=%s}' % (min, max)
  180. pool = self.pool_cls
  181. if not isinstance(pool, basestring):
  182. pool = pool.__module__
  183. concurrency += ' (%s)' % pool.split('.')[-1]
  184. events = 'ON'
  185. if not self.send_events:
  186. events = 'OFF (enable -E to monitor this worker)'
  187. banner = (BANNER % {
  188. 'app': appr,
  189. 'hostname': self.hostname,
  190. 'version': VERSION_BANNER,
  191. 'conninfo': self.app.connection().as_uri(),
  192. 'concurrency': concurrency,
  193. 'events': events,
  194. 'queues': app.amqp.queues.format(indent=0, indent_first=False),
  195. }).splitlines()
  196. # integrate the ASCII art.
  197. for i, x in enumerate(banner):
  198. try:
  199. banner[i] = ' '.join([ARTLINES[i], banner[i]])
  200. except IndexError:
  201. banner[i] = ' ' * 16 + banner[i]
  202. return '\n'.join(banner) + '\n'
  203. def run_worker(self):
  204. worker = self.WorkController(app=self.app,
  205. hostname=self.hostname,
  206. ready_callback=self.on_consumer_ready, beat=self.beat,
  207. autoscale=self.autoscale, autoreload=self.autoreload,
  208. no_execv=self.no_execv,
  209. pidfile=self.pidfile,
  210. **self.confopts_as_dict())
  211. self.install_platform_tweaks(worker)
  212. signals.worker_init.send(sender=worker)
  213. worker.start()
  214. def install_platform_tweaks(self, worker):
  215. """Install platform specific tweaks and workarounds."""
  216. if self.app.IS_OSX:
  217. self.osx_proxy_detection_workaround()
  218. # Install signal handler so SIGHUP restarts the worker.
  219. if not self._isatty:
  220. # only install HUP handler if detached from terminal,
  221. # so closing the terminal window doesn't restart celeryd
  222. # into the background.
  223. if self.app.IS_OSX:
  224. # OS X can't exec from a process using threads.
  225. # See http://github.com/celery/celery/issues#issue/152
  226. install_HUP_not_supported_handler(worker)
  227. else:
  228. install_worker_restart_handler(worker)
  229. install_worker_term_handler(worker)
  230. install_worker_term_hard_handler(worker)
  231. install_worker_int_handler(worker)
  232. install_cry_handler()
  233. install_rdb_handler()
  234. def osx_proxy_detection_workaround(self):
  235. """See http://github.com/celery/celery/issues#issue/161"""
  236. os.environ.setdefault('celery_dummy_proxy', 'set_by_celeryd')
  237. def set_process_status(self, info):
  238. return platforms.set_mp_process_title('celeryd',
  239. info='%s (%s)' % (info, platforms.strargv(sys.argv)),
  240. hostname=self.hostname)
  241. def _shutdown_handler(worker, sig='TERM', how='Warm', exc=SystemExit,
  242. callback=None):
  243. def _handle_request(signum, frame):
  244. set_in_sighandler(True)
  245. try:
  246. from celery.worker import state
  247. if current_process()._name == 'MainProcess':
  248. if callback:
  249. callback(worker)
  250. safe_say('celeryd: %s shutdown (MainProcess)' % how)
  251. if active_thread_count() > 1:
  252. setattr(state, {'Warm': 'should_stop',
  253. 'Cold': 'should_terminate'}[how], True)
  254. else:
  255. raise exc()
  256. finally:
  257. set_in_sighandler(False)
  258. _handle_request.__name__ = 'worker_' + how
  259. platforms.signals[sig] = _handle_request
  260. install_worker_term_handler = partial(
  261. _shutdown_handler, sig='SIGTERM', how='Warm', exc=SystemExit,
  262. )
  263. install_worker_term_hard_handler = partial(
  264. _shutdown_handler, sig='SIGQUIT', how='Cold', exc=SystemTerminate,
  265. )
  266. def on_SIGINT(worker):
  267. safe_say('celeryd: Hitting Ctrl+C again will terminate all running tasks!')
  268. install_worker_term_hard_handler(worker, sig='SIGINT')
  269. install_worker_int_handler = partial(
  270. _shutdown_handler, sig='SIGINT', callback=on_SIGINT
  271. )
  272. def install_worker_restart_handler(worker, sig='SIGHUP'):
  273. def restart_worker_sig_handler(signum, frame):
  274. """Signal handler restarting the current python program."""
  275. set_in_sighandler(True)
  276. safe_say('Restarting celeryd (%s)' % (' '.join(sys.argv), ))
  277. pid = os.fork()
  278. if pid == 0:
  279. os.execv(sys.executable, [sys.executable] + sys.argv)
  280. from celery.worker import state
  281. state.should_stop = True
  282. platforms.signals[sig] = restart_worker_sig_handler
  283. def install_cry_handler():
  284. # Jython/PyPy does not have sys._current_frames
  285. is_jython = sys.platform.startswith('java')
  286. is_pypy = hasattr(sys, 'pypy_version_info')
  287. if is_jython or is_pypy: # pragma: no cover
  288. return
  289. def cry_handler(signum, frame):
  290. """Signal handler logging the stacktrace of all active threads."""
  291. set_in_sighandler(True)
  292. try:
  293. safe_say(cry())
  294. finally:
  295. set_in_sighandler(False)
  296. platforms.signals['SIGUSR1'] = cry_handler
  297. def install_rdb_handler(envvar='CELERY_RDBSIG',
  298. sig='SIGUSR2'): # pragma: no cover
  299. def rdb_handler(signum, frame):
  300. """Signal handler setting a rdb breakpoint at the current frame."""
  301. set_in_sighandler(True)
  302. try:
  303. from celery.contrib import rdb
  304. rdb.set_trace(frame)
  305. finally:
  306. set_in_sighandler(False)
  307. if os.environ.get(envvar):
  308. platforms.signals[sig] = rdb_handler
  309. def install_HUP_not_supported_handler(worker, sig='SIGHUP'):
  310. def warn_on_HUP_handler(signum, frame):
  311. set_in_sighandler(True)
  312. try:
  313. safe_say('%(sig)s not supported: Restarting with %(sig)s is '
  314. 'unstable on this platform!' % {'sig': sig})
  315. finally:
  316. set_in_sighandler(False)
  317. platforms.signals[sig] = warn_on_HUP_handler