__init__.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker
  4. ~~~~~~~~~~~~~
  5. :class:`WorkController` can be used to instantiate in-process workers.
  6. The worker consists of several components, all managed by bootsteps
  7. (:mod:`celery.bootsteps`).
  8. """
  9. from __future__ import absolute_import
  10. import os
  11. import socket
  12. import sys
  13. import traceback
  14. from billiard import cpu_count
  15. from billiard.util import Finalize
  16. from kombu.syn import detect_environment
  17. from celery import bootsteps
  18. from celery import concurrency as _concurrency
  19. from celery import platforms
  20. from celery import signals
  21. from celery.app import app_or_default
  22. from celery.app.abstract import configurated, from_config
  23. from celery.exceptions import (
  24. ImproperlyConfigured, SystemTerminate, TaskRevokedError,
  25. )
  26. from celery.five import string_t, values
  27. from celery.utils import nodename, nodesplit, worker_direct
  28. from celery.utils.imports import reload_from_cwd
  29. from celery.utils.log import mlevel, worker_logger as logger
  30. from . import state
# Message template for the ImproperlyConfigured error raised by
# WorkController.setup_queues() when selecting queues fails with KeyError.
# Formatted positionally: {0!r} is the requested queue subset, {1} is the
# KeyError naming the queue that could not be found.
UNKNOWN_QUEUE = """\
Trying to select queue subset of {0!r}, but queue {1} is not
defined in the CELERY_QUEUES setting.
If you want to automatically declare unknown queues you can
enable the CELERY_CREATE_MISSING_QUEUES setting.
"""
  37. def default_nodename(hostname):
  38. name, host = nodesplit(hostname or '')
  39. return nodename(name or 'celery', host or socket.gethostname())
  40. class WorkController(configurated):
  41. """Unmanaged worker instance."""
  42. app = None
  43. concurrency = from_config()
  44. loglevel = from_config('log_level')
  45. logfile = from_config('log_file')
  46. send_events = from_config()
  47. pool_cls = from_config('pool')
  48. consumer_cls = from_config('consumer')
  49. mediator_cls = from_config('mediator')
  50. timer_cls = from_config('timer')
  51. timer_precision = from_config('timer_precision')
  52. autoscaler_cls = from_config('autoscaler')
  53. autoreloader_cls = from_config('autoreloader')
  54. schedule_filename = from_config()
  55. scheduler_cls = from_config('celerybeat_scheduler')
  56. task_time_limit = from_config()
  57. task_soft_time_limit = from_config()
  58. max_tasks_per_child = from_config()
  59. pool_putlocks = from_config()
  60. pool_restarts = from_config()
  61. force_execv = from_config()
  62. prefetch_multiplier = from_config()
  63. state_db = from_config()
  64. disable_rate_limits = from_config()
  65. worker_lost_wait = from_config()
  66. pidlock = None
  67. class Namespace(bootsteps.Namespace):
  68. """This is the bootstep namespace for the
  69. :class:`WorkController` class.
  70. It loads modules from :setting:`CELERYD_BOOTSTEPS`, and its
  71. own set of built-in bootsteps.
  72. """
  73. name = 'Worker'
  74. default_steps = set([
  75. 'celery.worker.components:Hub',
  76. 'celery.worker.components:Queues',
  77. 'celery.worker.components:Pool',
  78. 'celery.worker.components:Beat',
  79. 'celery.worker.components:Timer',
  80. 'celery.worker.components:StateDB',
  81. 'celery.worker.components:Consumer',
  82. 'celery.worker.autoscale:WorkerComponent',
  83. 'celery.worker.autoreload:WorkerComponent',
  84. 'celery.worker.mediator:WorkerComponent',
  85. ])
  86. def __init__(self, app=None, hostname=None, **kwargs):
  87. self.app = app_or_default(app or self.app)
  88. self.hostname = default_nodename(hostname)
  89. self.app.loader.init_worker()
  90. self.on_before_init(**kwargs)
  91. self._finalize = Finalize(self, self.stop, exitpriority=1)
  92. self.setup_instance(**self.prepare_args(**kwargs))
  93. def setup_instance(self, queues=None, ready_callback=None,
  94. pidfile=None, include=None, **kwargs):
  95. self.pidfile = pidfile
  96. self.setup_defaults(kwargs, namespace='celeryd')
  97. self.setup_queues(queues)
  98. self.setup_includes(include)
  99. # Set default concurrency
  100. if not self.concurrency:
  101. try:
  102. self.concurrency = cpu_count()
  103. except NotImplementedError:
  104. self.concurrency = 2
  105. # Options
  106. self.loglevel = mlevel(self.loglevel)
  107. self.ready_callback = ready_callback or self.on_consumer_ready
  108. self.use_eventloop = self.should_use_eventloop()
  109. self.options = kwargs
  110. signals.worker_init.send(sender=self)
  111. # Initialize bootsteps
  112. self.pool_cls = _concurrency.get_implementation(self.pool_cls)
  113. self.steps = []
  114. self.on_init_namespace()
  115. self.namespace = self.Namespace(app=self.app,
  116. on_start=self.on_start,
  117. on_close=self.on_close,
  118. on_stopped=self.on_stopped)
  119. self.namespace.apply(self, **kwargs)
  120. def on_init_namespace(self):
  121. pass
  122. def on_before_init(self, **kwargs):
  123. pass
  124. def on_start(self):
  125. if self.pidfile:
  126. self.pidlock = platforms.create_pidlock(self.pidfile)
  127. def on_consumer_ready(self, consumer):
  128. pass
  129. def on_close(self):
  130. self.app.loader.shutdown_worker()
  131. def on_stopped(self):
  132. self.timer.stop()
  133. self.consumer.shutdown()
  134. if self.pidlock:
  135. self.pidlock.release()
  136. def setup_queues(self, queues):
  137. if isinstance(queues, string_t):
  138. queues = queues.split(',')
  139. self.queues = queues
  140. try:
  141. self.app.select_queues(queues)
  142. except KeyError as exc:
  143. raise ImproperlyConfigured(
  144. UNKNOWN_QUEUE.format(queues, exc))
  145. if self.app.conf.CELERY_WORKER_DIRECT:
  146. self.app.amqp.queues.select_add(worker_direct(self.hostname))
  147. def setup_includes(self, includes):
  148. # Update celery_include to have all known task modules, so that we
  149. # ensure all task modules are imported in case an execv happens.
  150. inc = self.app.conf.CELERY_INCLUDE
  151. if includes:
  152. if isinstance(includes, string_t):
  153. includes = includes.split(',')
  154. inc = self.app.conf.CELERY_INCLUDE = tuple(inc) + tuple(includes)
  155. self.include = includes
  156. task_modules = set(task.__class__.__module__
  157. for task in values(self.app.tasks))
  158. self.app.conf.CELERY_INCLUDE = tuple(set(inc) | task_modules)
  159. def prepare_args(self, **kwargs):
  160. return kwargs
  161. def start(self):
  162. """Starts the workers main loop."""
  163. try:
  164. self.namespace.start(self)
  165. except SystemTerminate:
  166. self.terminate()
  167. except Exception as exc:
  168. logger.error('Unrecoverable error: %r', exc, exc_info=True)
  169. self.stop()
  170. except (KeyboardInterrupt, SystemExit):
  171. self.stop()
  172. def process_task_sem(self, req):
  173. return self._quick_acquire(self.process_task, req)
  174. def process_task(self, req):
  175. """Process task by sending it to the pool of workers."""
  176. try:
  177. req.execute_using_pool(self.pool)
  178. except TaskRevokedError:
  179. try:
  180. self._quick_release() # Issue 877
  181. except AttributeError:
  182. pass
  183. except Exception as exc:
  184. logger.critical('Internal error: %r\n%s',
  185. exc, traceback.format_exc(), exc_info=True)
  186. except SystemTerminate:
  187. self.terminate()
  188. raise
  189. except BaseException as exc:
  190. self.stop()
  191. raise exc
  192. def signal_consumer_close(self):
  193. try:
  194. self.consumer.close()
  195. except AttributeError:
  196. pass
  197. def should_use_eventloop(self):
  198. return (detect_environment() == 'default' and
  199. self.app.connection().is_evented and not self.app.IS_WINDOWS)
  200. def stop(self, in_sighandler=False):
  201. """Graceful shutdown of the worker server."""
  202. self.signal_consumer_close()
  203. if not in_sighandler or self.pool.signal_safe:
  204. self._shutdown(warm=True)
  205. def terminate(self, in_sighandler=False):
  206. """Not so graceful shutdown of the worker server."""
  207. self.signal_consumer_close()
  208. if not in_sighandler or self.pool.signal_safe:
  209. self._shutdown(warm=False)
  210. def _shutdown(self, warm=True):
  211. self.namespace.stop(self, terminate=not warm)
  212. self.namespace.join()
  213. def reload(self, modules=None, reload=False, reloader=None):
  214. modules = self.app.loader.task_modules if modules is None else modules
  215. imp = self.app.loader.import_from_cwd
  216. for module in set(modules or ()):
  217. if module not in sys.modules:
  218. logger.debug('importing module %s', module)
  219. imp(module)
  220. elif reload:
  221. logger.debug('reloading module %s', module)
  222. reload_from_cwd(sys.modules[module], reloader)
  223. self.pool.restart()
  224. def info(self):
  225. return {'total': self.state.total_count,
  226. 'pid': os.getpid(),
  227. 'clock': str(self.app.clock)}
  228. def stats(self):
  229. info = self.info()
  230. info.update(self.namespace.info(self))
  231. info.update(self.consumer.namespace.info(self.consumer))
  232. return info
  233. @property
  234. def _state(self):
  235. return self.namespace.state
  236. @property
  237. def state(self):
  238. return state