__init__.py 9.7 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker
  4. ~~~~~~~~~~~~~
  5. :class:`WorkController` can be used to instantiate in-process workers.
  6. The worker consists of several components, all managed by boot-steps
  7. (mod:`celery.abstract`).
  8. :copyright: (c) 2009 - 2012 by Ask Solem.
  9. :license: BSD, see LICENSE for more details.
  10. """
  11. from __future__ import absolute_import
  12. import atexit
  13. import logging
  14. import socket
  15. import sys
  16. import threading
  17. import traceback
  18. from kombu.utils.finalize import Finalize
  19. from .. import abstract
  20. from .. import concurrency as _concurrency
  21. from .. import registry
  22. from ..app import app_or_default
  23. from ..app.abstract import configurated, from_config
  24. from ..exceptions import SystemTerminate
  25. from ..log import SilenceRepeated
  26. from ..utils import noop, qualname
  27. from . import state
  28. from .buckets import TaskBucket, FastQueue
  29. RUN = 0x1
  30. CLOSE = 0x2
  31. TERMINATE = 0x3
  32. class Namespace(abstract.Namespace):
  33. """This is the boot-step namespace of the :class:`WorkController`.
  34. It loads modules from :setting:`CELERYD_BOOT_STEPS`, and its
  35. own set of built-in boot-step modules.
  36. """
  37. name = "worker"
  38. builtin_boot_steps = ("celery.worker.autoscale",
  39. "celery.worker.autoreload",
  40. "celery.worker.consumer",
  41. "celery.worker.mediator")
  42. def modules(self):
  43. return (self.builtin_boot_steps
  44. + self.app.conf.CELERYD_BOOT_STEPS)
  45. class Pool(abstract.StartStopComponent):
  46. """The pool component.
  47. Describes how to initialize the worker pool, and starts and stops
  48. the pool during worker startup/shutdown.
  49. Adds attributes:
  50. * autoscale
  51. * pool
  52. * max_concurrency
  53. * min_concurrency
  54. """
  55. name = "worker.pool"
  56. requires = ("queues", )
  57. def __init__(self, w, autoscale=None, **kwargs):
  58. w.autoscale = autoscale
  59. w.pool = None
  60. w.max_concurrency = None
  61. w.min_concurrency = w.concurrency
  62. if w.autoscale:
  63. w.max_concurrency, w.min_concurrency = w.autoscale
  64. def create(self, w):
  65. pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
  66. logger=w.logger,
  67. initargs=(w.app, w.hostname),
  68. maxtasksperchild=w.max_tasks_per_child,
  69. timeout=w.task_time_limit,
  70. soft_timeout=w.task_soft_time_limit,
  71. putlocks=w.pool_putlocks)
  72. return pool
  73. class Beat(abstract.StartStopComponent):
  74. """Component used to embed a celerybeat process.
  75. This will only be enabled if the ``embed_clockservice``
  76. argument is set.
  77. """
  78. name = "worker.beat"
  79. def __init__(self, w, embed_clockservice=False, **kwargs):
  80. self.enabled = w.embed_clockservice = embed_clockservice
  81. w.beat = None
  82. def create(self, w):
  83. from ..beat import EmbeddedService
  84. b = w.beat = EmbeddedService(app=w.app,
  85. logger=w.logger,
  86. schedule_filename=w.schedule_filename,
  87. scheduler_cls=w.scheduler_cls)
  88. return b
  89. class Queues(abstract.Component):
  90. """This component initializes the internal queues
  91. used by the worker."""
  92. name = "worker.queues"
  93. def create(self, w):
  94. if not w.pool_cls.rlimit_safe:
  95. w.disable_rate_limits = True
  96. if w.disable_rate_limits:
  97. w.ready_queue = FastQueue()
  98. if not w.pool_cls.requires_mediator:
  99. # just send task directly to pool, skip the mediator.
  100. w.ready_queue.put = w.process_task
  101. else:
  102. w.ready_queue = TaskBucket(task_registry=registry.tasks)
  103. class Timers(abstract.Component):
  104. """This component initializes the internal timers used by the worker."""
  105. name = "worker.timers"
  106. requires = ("pool", )
  107. def create(self, w):
  108. w.priority_timer = self.instantiate(w.pool.Timer)
  109. if not w.eta_scheduler_cls:
  110. # Default Timer is set by the pool, as e.g. eventlet
  111. # needs a custom implementation.
  112. w.eta_scheduler_cls = w.pool.Timer
  113. w.scheduler = self.instantiate(w.eta_scheduler_cls,
  114. precision=w.eta_scheduler_precision,
  115. on_error=w.on_timer_error,
  116. on_tick=w.on_timer_tick)
  117. class StateDB(abstract.Component):
  118. """This component sets up the workers state db if enabled."""
  119. name = "worker.state-db"
  120. def __init__(self, w, **kwargs):
  121. self.enabled = w.state_db
  122. w._persistence = None
  123. def create(self, w):
  124. w._persistence = state.Persistent(w.state_db)
  125. atexit.register(w._persistence.save)
  126. class WorkController(configurated):
  127. """Unmanaged worker instance."""
  128. RUN = RUN
  129. CLOSE = CLOSE
  130. TERMINATE = TERMINATE
  131. concurrency = from_config()
  132. loglevel = logging.ERROR
  133. logfile = from_config("log_file")
  134. send_events = from_config()
  135. pool_cls = from_config("pool")
  136. consumer_cls = from_config("consumer")
  137. mediator_cls = from_config("mediator")
  138. eta_scheduler_cls = from_config("eta_scheduler")
  139. eta_scheduler_precision = from_config()
  140. autoscaler_cls = from_config("autoscaler")
  141. autoreloader_cls = from_config("autoreloader")
  142. schedule_filename = from_config()
  143. scheduler_cls = from_config("celerybeat_scheduler")
  144. task_time_limit = from_config()
  145. task_soft_time_limit = from_config()
  146. max_tasks_per_child = from_config()
  147. pool_putlocks = from_config()
  148. prefetch_multiplier = from_config()
  149. state_db = from_config()
  150. disable_rate_limits = from_config()
  151. _state = None
  152. _running = 0
  153. def __init__(self, loglevel=None, hostname=None, logger=None,
  154. ready_callback=noop,
  155. queues=None, app=None, **kwargs):
  156. self.app = app_or_default(app)
  157. self._shutdown_complete = threading.Event()
  158. self.setup_defaults(kwargs, namespace="celeryd")
  159. self.app.select_queues(queues) # select queues subset.
  160. # Options
  161. self.loglevel = loglevel or self.loglevel
  162. self.logger = self.app.log.get_default_logger()
  163. self.hostname = hostname or socket.gethostname()
  164. self.ready_callback = ready_callback
  165. self.timer_debug = SilenceRepeated(self.logger.debug,
  166. max_iterations=10)
  167. self._finalize = Finalize(self, self.stop, exitpriority=1)
  168. self._finalize_db = None
  169. # Initialize boot steps
  170. self.pool_cls = _concurrency.get_implementation(self.pool_cls)
  171. self.components = []
  172. self.namespace = Namespace(app=self.app,
  173. logger=self.logger).apply(self, **kwargs)
  174. def start(self):
  175. """Starts the workers main loop."""
  176. self._state = self.RUN
  177. try:
  178. for i, component in enumerate(self.components):
  179. self.logger.debug("Starting %s...", qualname(component))
  180. self._running = i + 1
  181. component.start()
  182. self.logger.debug("%s OK!", qualname(component))
  183. except SystemTerminate:
  184. self.terminate()
  185. except Exception, exc:
  186. self.logger.error("Unrecoverable error: %r" % (exc, ),
  187. exc_info=sys.exc_info())
  188. self.stop()
  189. except (KeyboardInterrupt, SystemExit):
  190. self.stop()
  191. # Will only get here if running green,
  192. # makes sure all greenthreads have exited.
  193. self._shutdown_complete.wait()
  194. def process_task(self, request):
  195. """Process task by sending it to the pool of workers."""
  196. try:
  197. request.task.execute(request, self.pool,
  198. self.loglevel, self.logfile)
  199. except Exception, exc:
  200. self.logger.critical("Internal error %s: %s\n%s",
  201. exc.__class__, exc, traceback.format_exc(),
  202. exc_info=True)
  203. except SystemTerminate:
  204. self.terminate()
  205. raise
  206. except BaseException, exc:
  207. self.stop()
  208. raise exc
  209. def stop(self, in_sighandler=False):
  210. """Graceful shutdown of the worker server."""
  211. if not in_sighandler or self.pool.signal_safe:
  212. self._shutdown(warm=True)
  213. def terminate(self, in_sighandler=False):
  214. """Not so graceful shutdown of the worker server."""
  215. if not in_sighandler or self.pool.signal_safe:
  216. self._shutdown(warm=False)
  217. def _shutdown(self, warm=True):
  218. what = (warm and "stopping" or "terminating").capitalize()
  219. if self._state in (self.CLOSE, self.TERMINATE):
  220. return
  221. if self._state != self.RUN or self._running != len(self.components):
  222. # Not fully started, can safely exit.
  223. self._state = self.TERMINATE
  224. self._shutdown_complete.set()
  225. return
  226. self._state = self.CLOSE
  227. for component in reversed(self.components):
  228. self.logger.debug("%s %s...", what, qualname(component))
  229. stop = component.stop
  230. if not warm:
  231. stop = getattr(component, "terminate", None) or stop
  232. stop()
  233. self.priority_timer.stop()
  234. self.consumer.close_connection()
  235. self._state = self.TERMINATE
  236. self._shutdown_complete.set()
  237. def on_timer_error(self, einfo):
  238. self.logger.error("Timer error: %r", einfo[1], exc_info=einfo)
  239. def on_timer_tick(self, delay):
  240. self.timer_debug("Scheduler wake-up! Next eta %s secs." % delay)
  241. @property
  242. def state(self):
  243. return state