# -*- coding: utf-8 -*-
"""
celery.worker
~~~~~~~~~~~~~

:class:`WorkController` can be used to instantiate in-process workers.

The worker consists of several components, all managed by boot-steps
(:mod:`celery.abstract`).

:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

import atexit
import logging
import socket
import sys
import traceback

from billiard import forking_enable
from kombu.utils.finalize import Finalize

from celery import concurrency as _concurrency
from celery.app import app_or_default, set_default_app
from celery.app.abstract import configurated, from_config
from celery.exceptions import SystemTerminate
from celery.utils.functional import noop
from celery.utils.imports import qualname, reload_from_cwd
from celery.utils.log import get_logger
from celery.utils.threads import Event

from . import abstract
from . import state
from .buckets import TaskBucket, FastQueue

RUN = 0x1
CLOSE = 0x2
TERMINATE = 0x3

logger = get_logger(__name__)


class Namespace(abstract.Namespace):
    """This is the boot-step namespace of the :class:`WorkController`.

    It loads modules from :setting:`CELERYD_BOOT_STEPS`, and its
    own set of built-in boot-step modules.

    """
    name = "worker"
    builtin_boot_steps = ("celery.worker.autoscale",
                          "celery.worker.autoreload",
                          "celery.worker.consumer",
                          "celery.worker.mediator")

    def modules(self):
        return (self.builtin_boot_steps
              + self.app.conf.CELERYD_BOOT_STEPS)
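

# Illustrative note (not part of the original source): user-defined boot-step
# modules can be added through the CELERYD_BOOT_STEPS setting, which
# Namespace.modules() appends to the built-in list above.  The module name
# here is hypothetical:
#
#     CELERYD_BOOT_STEPS = ("myproj.bootsteps", )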


class Pool(abstract.StartStopComponent):
    """The pool component.

    Describes how to initialize the worker pool, and starts and stops
    the pool during worker startup/shutdown.

    Adds attributes:

        * autoscale
        * pool
        * max_concurrency
        * min_concurrency

    """
    name = "worker.pool"
    requires = ("queues", )

    def __init__(self, w, autoscale=None, no_execv=False, **kwargs):
        w.autoscale = autoscale
        w.pool = None
        w.max_concurrency = None
        w.min_concurrency = w.concurrency
        w.no_execv = no_execv
        if w.autoscale:
            w.max_concurrency, w.min_concurrency = w.autoscale

    def create(self, w):
        forking_enable(w.no_execv or not w.force_execv)
        pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
                            initargs=(w.app, w.hostname),
                            maxtasksperchild=w.max_tasks_per_child,
                            timeout=w.task_time_limit,
                            soft_timeout=w.task_soft_time_limit,
                            putlocks=w.pool_putlocks,
                            lost_worker_timeout=w.worker_lost_wait)
        return pool
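

# Illustrative note (not part of the original source): ``autoscale`` is a
# (max, min) pair, so e.g. constructing the worker with autoscale=(10, 3)
# results in max_concurrency=10 and min_concurrency=3 above; without it,
# min_concurrency simply follows the configured concurrency.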


class Beat(abstract.StartStopComponent):
    """Component used to embed a celerybeat process.

    This will only be enabled if the ``beat``
    argument is set.

    """
    name = "worker.beat"

    def __init__(self, w, beat=False, **kwargs):
        self.enabled = w.beat = beat
        w.beat = None

    def create(self, w):
        from celery.beat import EmbeddedService
        b = w.beat = EmbeddedService(app=w.app,
                                     schedule_filename=w.schedule_filename,
                                     scheduler_cls=w.scheduler_cls)
        return b
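

# Illustrative sketch (not part of the original source): the embedded beat
# service is enabled by passing ``beat=True`` when constructing the worker,
# assuming ``app`` is an already-configured Celery app:
#
#     worker = WorkController(app=app, beat=True)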


class Queues(abstract.Component):
    """This component initializes the internal queues
    used by the worker."""
    name = "worker.queues"

    def create(self, w):
        if not w.pool_cls.rlimit_safe:
            w.disable_rate_limits = True
        if w.disable_rate_limits:
            w.ready_queue = FastQueue()
            if not w.pool_cls.requires_mediator:
                # just send task directly to pool, skip the mediator.
                w.ready_queue.put = w.process_task
        else:
            w.ready_queue = TaskBucket(task_registry=w.app.tasks)
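

# Illustrative note (not part of the original source): with rate limits
# disabled a plain FastQueue is used, and for pool implementations that do
# not need the mediator thread, ``ready_queue.put`` is rebound so received
# tasks go straight to WorkController.process_task (and thus to the pool);
# otherwise a TaskBucket is used so per-task rate limits can be enforced.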


class Timers(abstract.Component):
    """This component initializes the internal timers used by the worker."""
    name = "worker.timers"
    requires = ("pool", )

    def create(self, w):
        w.priority_timer = self.instantiate(w.pool.Timer)
        if not w.eta_scheduler_cls:
            # Default Timer is set by the pool, as e.g. eventlet
            # needs a custom implementation.
            w.eta_scheduler_cls = w.pool.Timer
        w.scheduler = self.instantiate(w.eta_scheduler_cls,
                                precision=w.eta_scheduler_precision,
                                on_error=self.on_timer_error,
                                on_tick=self.on_timer_tick)

    def on_timer_error(self, exc):
        logger.error("Timer error: %r", exc, exc_info=True)

    def on_timer_tick(self, delay):
        logger.debug("Scheduler wake-up! Next eta %s secs.", delay)
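

# Illustrative note (not part of the original source): the ETA scheduler
# class defaults to the pool's Timer because green pools (e.g. eventlet)
# need a timer that cooperates with their event loop; a configured
# eta_scheduler_cls overrides that default.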


class StateDB(abstract.Component):
    """This component sets up the worker's state db if enabled."""
    name = "worker.state-db"

    def __init__(self, w, **kwargs):
        self.enabled = w.state_db
        w._persistence = None

    def create(self, w):
        w._persistence = state.Persistent(w.state_db)
        atexit.register(w._persistence.save)
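

# Illustrative sketch (not part of the original source): this component is
# only enabled when a state db path is given, in which case worker state
# (e.g. the set of revoked task ids) is persisted across restarts.  The path
# below is hypothetical:
#
#     worker = WorkController(app=app, state_db="/var/run/celery/worker.db")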


class WorkController(configurated):
    """Unmanaged worker instance."""
    RUN = RUN
    CLOSE = CLOSE
    TERMINATE = TERMINATE

    app = None

    concurrency = from_config()
    loglevel = logging.ERROR
    logfile = from_config("log_file")
    send_events = from_config()
    pool_cls = from_config("pool")
    consumer_cls = from_config("consumer")
    mediator_cls = from_config("mediator")
    eta_scheduler_cls = from_config("eta_scheduler")
    eta_scheduler_precision = from_config()
    autoscaler_cls = from_config("autoscaler")
    autoreloader_cls = from_config("autoreloader")
    schedule_filename = from_config()
    scheduler_cls = from_config("celerybeat_scheduler")
    task_time_limit = from_config()
    task_soft_time_limit = from_config()
    max_tasks_per_child = from_config()
    pool_putlocks = from_config()
    force_execv = from_config()
    prefetch_multiplier = from_config()
    state_db = from_config()
    disable_rate_limits = from_config()
    worker_lost_wait = from_config()

    _state = None
    _running = 0

    def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
            queues=None, app=None, **kwargs):
        self.app = app_or_default(app or self.app)
        # all new threads start without a current app, so if an app is not
        # passed on to the thread it will fall back to the "default app",
        # which then could be the wrong app. So for the worker
        # we set this to always return our app. This is a hack,
        # and means that only a single app can be used for workers
        # running in the same process.
        set_default_app(self.app)

        self._shutdown_complete = Event()
        self.setup_defaults(kwargs, namespace="celeryd")
        self.app.select_queues(queues)  # select queues subset.

        # Options
        self.loglevel = loglevel or self.loglevel
        self.hostname = hostname or socket.gethostname()
        self.ready_callback = ready_callback
        self._finalize = Finalize(self, self.stop, exitpriority=1)

        # Initialize boot steps
        self.pool_cls = _concurrency.get_implementation(self.pool_cls)
        self.components = []
        self.namespace = Namespace(app=self.app).apply(self, **kwargs)

    def start(self):
        """Starts the worker's main loop."""
        self._state = self.RUN

        try:
            for i, component in enumerate(self.components):
                logger.debug("Starting %s...", qualname(component))
                self._running = i + 1
                component.start()
                logger.debug("%s OK!", qualname(component))
        except SystemTerminate:
            self.terminate()
        except Exception, exc:
            logger.error("Unrecoverable error: %r", exc,
                         exc_info=True)
            self.stop()
        except (KeyboardInterrupt, SystemExit):
            self.stop()

        # Will only get here if running green,
        # makes sure all greenthreads have exited.
        self._shutdown_complete.wait()

    def process_task(self, req):
        """Process task by sending it to the pool of workers."""
        try:
            req.task.execute(req, self.pool, self.loglevel, self.logfile)
        except Exception, exc:
            logger.critical("Internal error: %r\n%s",
                            exc, traceback.format_exc(), exc_info=True)
        except SystemTerminate:
            self.terminate()
            raise
        except BaseException, exc:
            self.stop()
            raise exc

    def stop(self, in_sighandler=False):
        """Graceful shutdown of the worker server."""
        if not in_sighandler or self.pool.signal_safe:
            self._shutdown(warm=True)

    def terminate(self, in_sighandler=False):
        """Not so graceful shutdown of the worker server."""
        if not in_sighandler or self.pool.signal_safe:
            self._shutdown(warm=False)

    def _shutdown(self, warm=True):
        what = "Stopping" if warm else "Terminating"

        if self._state in (self.CLOSE, self.TERMINATE):
            return
        if self._state != self.RUN or self._running != len(self.components):
            # Not fully started, can safely exit.
            self._state = self.TERMINATE
            self._shutdown_complete.set()
            return

        self._state = self.CLOSE

        for component in reversed(self.components):
            logger.debug("%s %s...", what, qualname(component))
            stop = component.stop
            if not warm:
                stop = getattr(component, "terminate", None) or stop
            stop()

        self.priority_timer.stop()
        self.consumer.close_connection()

        self._state = self.TERMINATE
        self._shutdown_complete.set()

    def reload(self, modules=None, reload=False, reloader=None):
        """Import (or optionally reload) the given task modules,
        then restart the worker pool."""
        modules = self.app.loader.task_modules if modules is None else modules
        imp = self.app.loader.import_from_cwd
        for module in set(modules or ()):
            if module not in sys.modules:
                logger.debug("importing module %s", module)
                imp(module)
            elif reload:
                logger.debug("reloading module %s", module)
                reload_from_cwd(sys.modules[module], reloader)
        self.pool.restart()

    @property
    def state(self):
        return state
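

# Illustrative usage sketch (not part of the original module).  An in-process
# worker could be created and run roughly like this, assuming ``app`` is an
# already-configured Celery app instance:
#
#     worker = WorkController(app=app, loglevel=logging.INFO,
#                             hostname="worker1.example.com")
#     worker.start()   # blocks until shutdown
#
# A graceful shutdown can then be requested from another thread or a signal
# handler with worker.stop(), or forced with worker.terminate().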