__init__.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker
  4. ~~~~~~~~~~~~~
  5. :class:`WorkController` can be used to instantiate in-process workers.
  6. The worker consists of several components, all managed by boot-steps
  7. (:mod:`celery.worker.bootsteps`).
  8. """
  9. from __future__ import absolute_import
  10. import socket
  11. import sys
  12. import traceback
  13. from threading import Event
  14. from billiard import cpu_count
  15. from kombu.syn import detect_environment
  16. from kombu.utils.finalize import Finalize
  17. from celery import concurrency as _concurrency
  18. from celery import platforms
  19. from celery import signals
  20. from celery.app import app_or_default, set_default_app
  21. from celery.app.abstract import configurated, from_config
  22. from celery.exceptions import (
  23. ImproperlyConfigured, SystemTerminate, TaskRevokedError,
  24. )
  25. from celery.task import trace
  26. from celery.utils import worker_direct
  27. from celery.utils.imports import qualname, reload_from_cwd
  28. from celery.utils.log import get_logger, mlevel
  29. from . import bootsteps
  30. from . import state
# Exceptions that ``WorkController.start`` swallows while it waits for the
# shutdown-complete event; empty when greenlet cannot be imported.
try:
    from greenlet import GreenletExit
    IGNORE_ERRORS = (GreenletExit, )
except ImportError:  # pragma: no cover
    IGNORE_ERRORS = ()

# Worker life-cycle states; re-exported as class attributes of
# ``WorkController`` and stored in its ``_state`` attribute.
RUN = 0x1
CLOSE = 0x2
TERMINATE = 0x3

# Message for the ImproperlyConfigured error raised by
# ``WorkController.setup_queues``: {0} is the requested queue selection,
# {1} is the KeyError raised while selecting the queues.
UNKNOWN_QUEUE = """\
Trying to select queue subset of {0!r}, but queue {1} is not
defined in the CELERY_QUEUES setting.
If you want to automatically declare unknown queues you can
enable the CELERY_CREATE_MISSING_QUEUES setting.
"""

# Module-level logger used by the worker for debug/error/critical output.
logger = get_logger(__name__)
  46. class Namespace(bootsteps.Namespace):
  47. """This is the boot-step namespace of the :class:`WorkController`.
  48. It loads modules from :setting:`CELERYD_BOOT_STEPS`, and its
  49. own set of built-in boot-step modules.
  50. """
  51. name = 'worker'
  52. builtin_boot_steps = ('celery.worker.components',
  53. 'celery.worker.autoscale',
  54. 'celery.worker.autoreload',
  55. 'celery.worker.consumer',
  56. 'celery.worker.mediator')
  57. def modules(self):
  58. return self.builtin_boot_steps + self.app.conf.CELERYD_BOOT_STEPS
  59. class WorkController(configurated):
  60. """Unmanaged worker instance."""
  61. RUN = RUN
  62. CLOSE = CLOSE
  63. TERMINATE = TERMINATE
  64. app = None
  65. concurrency = from_config()
  66. loglevel = from_config('log_level')
  67. logfile = from_config('log_file')
  68. send_events = from_config()
  69. pool_cls = from_config('pool')
  70. consumer_cls = from_config('consumer')
  71. mediator_cls = from_config('mediator')
  72. timer_cls = from_config('timer')
  73. timer_precision = from_config('timer_precision')
  74. autoscaler_cls = from_config('autoscaler')
  75. autoreloader_cls = from_config('autoreloader')
  76. schedule_filename = from_config()
  77. scheduler_cls = from_config('celerybeat_scheduler')
  78. task_time_limit = from_config()
  79. task_soft_time_limit = from_config()
  80. max_tasks_per_child = from_config()
  81. pool_putlocks = from_config()
  82. force_execv = from_config()
  83. prefetch_multiplier = from_config()
  84. state_db = from_config()
  85. disable_rate_limits = from_config()
  86. worker_lost_wait = from_config()
  87. _state = None
  88. _running = 0
  89. pidlock = None
  90. def __init__(self, app=None, hostname=None, **kwargs):
  91. self.app = app_or_default(app or self.app)
  92. # all new threads start without a current app, so if an app is not
  93. # passed on to the thread it will fall back to the "default app",
  94. # which then could be the wrong app. So for the worker
  95. # we set this to always return our app. This is a hack,
  96. # and means that only a single app can be used for workers
  97. # running in the same process.
  98. set_default_app(self.app)
  99. self.app.finalize()
  100. trace._tasks = self.app._tasks # optimization
  101. self.hostname = hostname or socket.gethostname()
  102. self.on_before_init(**kwargs)
  103. self._finalize = Finalize(self, self.stop, exitpriority=1)
  104. self._shutdown_complete = Event()
  105. self.setup_instance(**self.prepare_args(**kwargs))
  106. def on_before_init(self, **kwargs):
  107. pass
  108. def on_start(self):
  109. pass
  110. def on_consumer_ready(self, consumer):
  111. pass
  112. def setup_instance(self, queues=None, ready_callback=None,
  113. pidfile=None, include=None, **kwargs):
  114. self.pidfile = pidfile
  115. self.app.loader.init_worker()
  116. self.setup_defaults(kwargs, namespace='celeryd')
  117. self.setup_queues(queues)
  118. self.setup_includes(include)
  119. # Set default concurrency
  120. if not self.concurrency:
  121. try:
  122. self.concurrency = cpu_count()
  123. except NotImplementedError:
  124. self.concurrency = 2
  125. # Options
  126. self.loglevel = mlevel(self.loglevel)
  127. self.ready_callback = ready_callback or self.on_consumer_ready
  128. self.use_eventloop = self.should_use_eventloop()
  129. signals.worker_init.send(sender=self)
  130. # Initialize boot steps
  131. self.pool_cls = _concurrency.get_implementation(self.pool_cls)
  132. self.components = []
  133. self.namespace = Namespace(app=self.app).apply(self, **kwargs)
  134. def setup_queues(self, queues):
  135. if isinstance(queues, basestring):
  136. queues = queues.split(',')
  137. self.queues = queues
  138. try:
  139. self.app.select_queues(queues)
  140. except KeyError as exc:
  141. raise ImproperlyConfigured(
  142. UNKNOWN_QUEUE.format(queues, exc))
  143. if self.app.conf.CELERY_WORKER_DIRECT:
  144. self.app.amqp.queues.select_add(worker_direct(self.hostname))
  145. def setup_includes(self, includes):
  146. # Update celery_include to have all known task modules, so that we
  147. # ensure all task modules are imported in case an execv happens.
  148. inc = self.app.conf.CELERY_INCLUDE
  149. if includes:
  150. if isinstance(includes, basestring):
  151. includes = includes.split(',')
  152. inc = self.app.conf.CELERY_INCLUDE = tuple(inc) + tuple(includes)
  153. self.include = includes
  154. task_modules = set(task.__class__.__module__
  155. for task in self.app.tasks.itervalues())
  156. self.app.conf.CELERY_INCLUDE = tuple(set(inc) | task_modules)
  157. def prepare_args(self, **kwargs):
  158. return kwargs
  159. def start(self):
  160. """Starts the workers main loop."""
  161. self.on_start()
  162. self._state = self.RUN
  163. if self.pidfile:
  164. self.pidlock = platforms.create_pidlock(self.pidfile)
  165. try:
  166. for i, component in enumerate(self.components):
  167. logger.debug('Starting %s...', qualname(component))
  168. self._running = i + 1
  169. if component:
  170. component.start()
  171. logger.debug('%s OK!', qualname(component))
  172. except SystemTerminate:
  173. self.terminate()
  174. except Exception as exc:
  175. logger.error('Unrecoverable error: %r', exc,
  176. exc_info=True)
  177. self.stop()
  178. except (KeyboardInterrupt, SystemExit):
  179. self.stop()
  180. try:
  181. # Will only get here if running green,
  182. # makes sure all greenthreads have exited.
  183. self._shutdown_complete.wait()
  184. except IGNORE_ERRORS:
  185. pass
  186. run = start # XXX Compat
  187. def process_task_sem(self, req):
  188. return self._quick_acquire(self.process_task, req)
  189. def process_task(self, req):
  190. """Process task by sending it to the pool of workers."""
  191. try:
  192. req.execute_using_pool(self.pool)
  193. except TaskRevokedError:
  194. try:
  195. self._quick_release() # Issue 877
  196. except AttributeError:
  197. pass
  198. except Exception as exc:
  199. logger.critical('Internal error: %r\n%s',
  200. exc, traceback.format_exc(), exc_info=True)
  201. except SystemTerminate:
  202. self.terminate()
  203. raise
  204. except BaseException as exc:
  205. self.stop()
  206. raise exc
  207. def signal_consumer_close(self):
  208. try:
  209. self.consumer.close()
  210. except AttributeError:
  211. pass
  212. def should_use_eventloop(self):
  213. return (detect_environment() == 'default' and
  214. self.app.connection().is_evented and not self.app.IS_WINDOWS)
  215. def stop(self, in_sighandler=False):
  216. """Graceful shutdown of the worker server."""
  217. self.signal_consumer_close()
  218. if not in_sighandler or self.pool.signal_safe:
  219. self._shutdown(warm=True)
  220. def terminate(self, in_sighandler=False):
  221. """Not so graceful shutdown of the worker server."""
  222. self.signal_consumer_close()
  223. if not in_sighandler or self.pool.signal_safe:
  224. self._shutdown(warm=False)
  225. def _shutdown(self, warm=True):
  226. what = 'Stopping' if warm else 'Terminating'
  227. if self._state in (self.CLOSE, self.TERMINATE):
  228. return
  229. self.app.loader.shutdown_worker()
  230. if self.pool:
  231. self.pool.close()
  232. if self._state != self.RUN or self._running != len(self.components):
  233. # Not fully started, can safely exit.
  234. self._state = self.TERMINATE
  235. self._shutdown_complete.set()
  236. return
  237. self._state = self.CLOSE
  238. for component in reversed(self.components):
  239. logger.debug('%s %s...', what, qualname(component))
  240. if component:
  241. stop = component.stop
  242. if not warm:
  243. stop = getattr(component, 'terminate', None) or stop
  244. stop()
  245. self.timer.stop()
  246. self.consumer.close_connection()
  247. if self.pidlock:
  248. self.pidlock.release()
  249. self._state = self.TERMINATE
  250. self._shutdown_complete.set()
  251. def reload(self, modules=None, reload=False, reloader=None):
  252. modules = self.app.loader.task_modules if modules is None else modules
  253. imp = self.app.loader.import_from_cwd
  254. for module in set(modules or ()):
  255. if module not in sys.modules:
  256. logger.debug('importing module %s', module)
  257. imp(module)
  258. elif reload:
  259. logger.debug('reloading module %s', module)
  260. reload_from_cwd(sys.modules[module], reloader)
  261. self.pool.restart()
  262. @property
  263. def state(self):
  264. return state