__init__.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker
  4. ~~~~~~~~~~~~~
  5. :class:`WorkController` can be used to instantiate in-process workers.
  6. The worker consists of several components, all managed by boot-steps
  7. (mod:`celery.worker.bootsteps`).
  8. """
  9. from __future__ import absolute_import
  10. import socket
  11. import sys
  12. import traceback
  13. from billiard import cpu_count
  14. from kombu.syn import detect_environment
  15. from kombu.utils.finalize import Finalize
  16. from celery import concurrency as _concurrency
  17. from celery import platforms
  18. from celery import signals
  19. from celery.app import app_or_default
  20. from celery.app.abstract import configurated, from_config
  21. from celery.exceptions import (
  22. ImproperlyConfigured, SystemTerminate, TaskRevokedError,
  23. )
  24. from celery.utils import worker_direct
  25. from celery.utils.imports import reload_from_cwd
  26. from celery.utils.log import mlevel, worker_logger as logger
  27. from . import bootsteps
  28. from . import state
  29. UNKNOWN_QUEUE = """\
  30. Trying to select queue subset of {0!r}, but queue {1} is not
  31. defined in the CELERY_QUEUES setting.
  32. If you want to automatically declare unknown queues you can
  33. enable the CELERY_CREATE_MISSING_QUEUES setting.
  34. """
  35. class Namespace(bootsteps.Namespace):
  36. """This is the boot-step namespace of the :class:`WorkController`.
  37. It loads modules from :setting:`CELERYD_BOOT_STEPS`, and its
  38. own set of built-in boot-step modules.
  39. """
  40. name = 'worker'
  41. builtin_boot_steps = ('celery.worker.components',
  42. 'celery.worker.autoscale',
  43. 'celery.worker.autoreload',
  44. 'celery.worker.consumer',
  45. 'celery.worker.mediator')
  46. def modules(self):
  47. return self.builtin_boot_steps + self.app.conf.CELERYD_BOOT_STEPS
  48. class WorkController(configurated):
  49. """Unmanaged worker instance."""
  50. app = None
  51. concurrency = from_config()
  52. loglevel = from_config('log_level')
  53. logfile = from_config('log_file')
  54. send_events = from_config()
  55. pool_cls = from_config('pool')
  56. consumer_cls = from_config('consumer')
  57. mediator_cls = from_config('mediator')
  58. timer_cls = from_config('timer')
  59. timer_precision = from_config('timer_precision')
  60. autoscaler_cls = from_config('autoscaler')
  61. autoreloader_cls = from_config('autoreloader')
  62. schedule_filename = from_config()
  63. scheduler_cls = from_config('celerybeat_scheduler')
  64. task_time_limit = from_config()
  65. task_soft_time_limit = from_config()
  66. max_tasks_per_child = from_config()
  67. pool_putlocks = from_config()
  68. pool_restarts = from_config()
  69. force_execv = from_config()
  70. prefetch_multiplier = from_config()
  71. state_db = from_config()
  72. disable_rate_limits = from_config()
  73. worker_lost_wait = from_config()
  74. pidlock = None
  75. def __init__(self, app=None, hostname=None, **kwargs):
  76. self.app = app_or_default(app or self.app)
  77. self.hostname = hostname or socket.gethostname()
  78. self.app.loader.init_worker()
  79. self.on_before_init(**kwargs)
  80. self._finalize = Finalize(self, self.stop, exitpriority=1)
  81. self.setup_instance(**self.prepare_args(**kwargs))
  82. def setup_instance(self, queues=None, ready_callback=None,
  83. pidfile=None, include=None, **kwargs):
  84. self.pidfile = pidfile
  85. self.setup_defaults(kwargs, namespace='celeryd')
  86. self.setup_queues(queues)
  87. self.setup_includes(include)
  88. # Set default concurrency
  89. if not self.concurrency:
  90. try:
  91. self.concurrency = cpu_count()
  92. except NotImplementedError:
  93. self.concurrency = 2
  94. # Options
  95. self.loglevel = mlevel(self.loglevel)
  96. self.ready_callback = ready_callback or self.on_consumer_ready
  97. self.use_eventloop = self.should_use_eventloop()
  98. signals.worker_init.send(sender=self)
  99. # Initialize boot steps
  100. self.pool_cls = _concurrency.get_implementation(self.pool_cls)
  101. self.components = []
  102. self.on_init_namespace()
  103. self.namespace = Namespace(app=self.app,
  104. on_start=self.on_start,
  105. on_close=self.on_close,
  106. on_stopped=self.on_stopped)
  107. self.namespace.apply(self, **kwargs)
  108. def on_init_namespace(self):
  109. pass
  110. def on_before_init(self, **kwargs):
  111. pass
  112. def on_start(self):
  113. if self.pidfile:
  114. self.pidlock = platforms.create_pidlock(self.pidfile)
  115. def on_consumer_ready(self, consumer):
  116. pass
  117. def on_close(self):
  118. self.app.loader.shutdown_worker()
  119. def on_stopped(self):
  120. self.timer.stop()
  121. self.consumer.shutdown()
  122. if self.pidlock:
  123. self.pidlock.release()
  124. def setup_queues(self, queues):
  125. if isinstance(queues, basestring):
  126. queues = queues.split(',')
  127. self.queues = queues
  128. try:
  129. self.app.select_queues(queues)
  130. except KeyError as exc:
  131. raise ImproperlyConfigured(
  132. UNKNOWN_QUEUE.format(queues, exc))
  133. if self.app.conf.CELERY_WORKER_DIRECT:
  134. self.app.amqp.queues.select_add(worker_direct(self.hostname))
  135. def setup_includes(self, includes):
  136. # Update celery_include to have all known task modules, so that we
  137. # ensure all task modules are imported in case an execv happens.
  138. inc = self.app.conf.CELERY_INCLUDE
  139. if includes:
  140. if isinstance(includes, basestring):
  141. includes = includes.split(',')
  142. inc = self.app.conf.CELERY_INCLUDE = tuple(inc) + tuple(includes)
  143. self.include = includes
  144. task_modules = set(task.__class__.__module__
  145. for task in self.app.tasks.itervalues())
  146. self.app.conf.CELERY_INCLUDE = tuple(set(inc) | task_modules)
  147. def prepare_args(self, **kwargs):
  148. return kwargs
  149. def start(self):
  150. """Starts the workers main loop."""
  151. try:
  152. self.namespace.start(self)
  153. except SystemTerminate:
  154. self.terminate()
  155. except Exception as exc:
  156. logger.error('Unrecoverable error: %r', exc, exc_info=True)
  157. self.stop()
  158. except (KeyboardInterrupt, SystemExit):
  159. self.stop()
  160. def process_task_sem(self, req):
  161. return self._quick_acquire(self.process_task, req)
  162. def process_task(self, req):
  163. """Process task by sending it to the pool of workers."""
  164. try:
  165. req.execute_using_pool(self.pool)
  166. except TaskRevokedError:
  167. try:
  168. self._quick_release() # Issue 877
  169. except AttributeError:
  170. pass
  171. except Exception as exc:
  172. logger.critical('Internal error: %r\n%s',
  173. exc, traceback.format_exc(), exc_info=True)
  174. except SystemTerminate:
  175. self.terminate()
  176. raise
  177. except BaseException as exc:
  178. self.stop()
  179. raise exc
  180. def signal_consumer_close(self):
  181. try:
  182. self.consumer.close()
  183. except AttributeError:
  184. pass
  185. def should_use_eventloop(self):
  186. return (detect_environment() == 'default' and
  187. self.app.connection().is_evented and not self.app.IS_WINDOWS)
  188. def stop(self, in_sighandler=False):
  189. """Graceful shutdown of the worker server."""
  190. self.signal_consumer_close()
  191. if not in_sighandler or self.pool.signal_safe:
  192. self._shutdown(warm=True)
  193. def terminate(self, in_sighandler=False):
  194. """Not so graceful shutdown of the worker server."""
  195. self.signal_consumer_close()
  196. if not in_sighandler or self.pool.signal_safe:
  197. self._shutdown(warm=False)
  198. def _shutdown(self, warm=True):
  199. self.namespace.stop(self, terminate=not warm)
  200. self.namespace.join()
  201. def reload(self, modules=None, reload=False, reloader=None):
  202. modules = self.app.loader.task_modules if modules is None else modules
  203. imp = self.app.loader.import_from_cwd
  204. for module in set(modules or ()):
  205. if module not in sys.modules:
  206. logger.debug('importing module %s', module)
  207. imp(module)
  208. elif reload:
  209. logger.debug('reloading module %s', module)
  210. reload_from_cwd(sys.modules[module], reloader)
  211. self.pool.restart()
  212. @property
  213. def _state(self):
  214. return self.namespace.state
  215. @property
  216. def state(self):
  217. return state