__init__.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker
  4. ~~~~~~~~~~~~~
  5. :class:`WorkController` can be used to instantiate in-process workers.
  6. The worker consists of several components, all managed by boot-steps
(:mod:`celery.abstract`).
  8. :copyright: (c) 2009 - 2012 by Ask Solem.
  9. :license: BSD, see LICENSE for more details.
  10. """
  11. from __future__ import absolute_import
  12. import atexit
  13. import logging
  14. import socket
  15. import sys
  16. import traceback
  17. from billiard import forking_enable
  18. from kombu.syn import detect_environment
  19. from kombu.utils.finalize import Finalize
  20. from celery import concurrency as _concurrency
  21. from celery import platforms
  22. from celery.app import app_or_default, set_default_app
  23. from celery.app.abstract import configurated, from_config
  24. from celery.exceptions import SystemTerminate
  25. from celery.utils.functional import noop
  26. from celery.utils.imports import qualname, reload_from_cwd
  27. from celery.utils.log import get_logger
  28. from celery.utils.threads import Event
  29. from celery.utils.timer2 import Schedule
  30. from . import abstract
  31. from . import state
  32. from .buckets import TaskBucket, FastQueue
  33. from .hub import BoundedSemaphore
#: Worker state constants (mirrored as class attributes on
#: :class:`WorkController` and stored in its ``_state`` attribute).
RUN = 0x1
CLOSE = 0x2
TERMINATE = 0x3

#: Module-level logger for the worker.
logger = get_logger(__name__)
  38. class Namespace(abstract.Namespace):
  39. """This is the boot-step namespace of the :class:`WorkController`.
  40. It loads modules from :setting:`CELERYD_BOOT_STEPS`, and its
  41. own set of built-in boot-step modules.
  42. """
  43. name = "worker"
  44. builtin_boot_steps = ("celery.worker.autoscale",
  45. "celery.worker.autoreload",
  46. "celery.worker.consumer",
  47. "celery.worker.mediator")
  48. def modules(self):
  49. return (self.builtin_boot_steps
  50. + self.app.conf.CELERYD_BOOT_STEPS)
  51. class Pool(abstract.StartStopComponent):
  52. """The pool component.
  53. Describes how to initialize the worker pool, and starts and stops
  54. the pool during worker startup/shutdown.
  55. Adds attributes:
  56. * autoscale
  57. * pool
  58. * max_concurrency
  59. * min_concurrency
  60. """
  61. name = "worker.pool"
  62. requires = ("queues", )
  63. def __init__(self, w, autoscale=None, no_execv=False, **kwargs):
  64. w.autoscale = autoscale
  65. w.pool = None
  66. w.max_concurrency = None
  67. w.min_concurrency = w.concurrency
  68. w.no_execv = no_execv
  69. if w.autoscale:
  70. w.max_concurrency, w.min_concurrency = w.autoscale
  71. def create(self, w, semaphore=None, max_restarts=None):
  72. threaded = not w.use_eventloop
  73. forking_enable(not threaded or (w.no_execv or not w.force_execv))
  74. procs = w.min_concurrency
  75. if not threaded:
  76. semaphore = w.semaphore = BoundedSemaphore(procs)
  77. max_restarts = 100
  78. pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
  79. initargs=(w.app, w.hostname),
  80. maxtasksperchild=w.max_tasks_per_child,
  81. timeout=w.task_time_limit,
  82. soft_timeout=w.task_soft_time_limit,
  83. putlocks=w.pool_putlocks and threaded,
  84. lost_worker_timeout=w.worker_lost_wait,
  85. with_task_thread=threaded,
  86. with_result_thread=threaded,
  87. with_supervisor_thread=threaded,
  88. max_restarts=max_restarts,
  89. semaphore=semaphore)
  90. return pool
  91. class Beat(abstract.StartStopComponent):
  92. """Component used to embed a celerybeat process.
  93. This will only be enabled if the ``beat``
  94. argument is set.
  95. """
  96. name = "worker.beat"
  97. def __init__(self, w, beat=False, **kwargs):
  98. self.enabled = w.beat = beat
  99. w.beat = None
  100. def create(self, w):
  101. from celery.beat import EmbeddedService
  102. b = w.beat = EmbeddedService(app=w.app,
  103. schedule_filename=w.schedule_filename,
  104. scheduler_cls=w.scheduler_cls)
  105. return b
  106. class Queues(abstract.Component):
  107. """This component initializes the internal queues
  108. used by the worker."""
  109. name = "worker.queues"
  110. def create(self, w):
  111. if not w.pool_cls.rlimit_safe:
  112. w.disable_rate_limits = True
  113. if w.disable_rate_limits:
  114. w.ready_queue = FastQueue()
  115. if w.use_eventloop:
  116. if w.pool_putlocks and w.pool_cls.uses_semaphore:
  117. w.ready_queue.put = w.process_task_sem
  118. else:
  119. w.ready_queue.put = w.process_task
  120. elif not w.pool_cls.requires_mediator:
  121. # just send task directly to pool, skip the mediator.
  122. w.ready_queue.put = w.process_task
  123. else:
  124. w.ready_queue = TaskBucket(task_registry=w.app.tasks)
  125. class Timers(abstract.Component):
  126. """This component initializes the internal timers used by the worker."""
  127. name = "worker.timers"
  128. requires = ("pool", )
  129. def create(self, w):
  130. options = {"on_error": self.on_timer_error,
  131. "on_tick": self.on_timer_tick}
  132. if w.use_eventloop:
  133. # the timers are fired by the hub, so don't use the Timer thread.
  134. w.timer = Schedule(max_interval=10, **options)
  135. else:
  136. if not w.timer_cls:
  137. # Default Timer is set by the pool, as e.g. eventlet
  138. # needs a custom implementation.
  139. w.timer_cls = w.pool.Timer
  140. w.timer = self.instantiate(w.pool.Timer,
  141. max_interval=w.timer_precision,
  142. **options)
  143. def on_timer_error(self, exc):
  144. logger.error("Timer error: %r", exc, exc_info=True)
  145. def on_timer_tick(self, delay):
  146. logger.debug("Timer wake-up! Next eta %s secs.", delay)
  147. class StateDB(abstract.Component):
  148. """This component sets up the workers state db if enabled."""
  149. name = "worker.state-db"
  150. def __init__(self, w, **kwargs):
  151. self.enabled = w.state_db
  152. w._persistence = None
  153. def create(self, w):
  154. w._persistence = state.Persistent(w.state_db)
  155. atexit.register(w._persistence.save)
class WorkController(configurated):
    """Unmanaged worker instance.

    Holds the worker's configuration (via ``from_config`` descriptors),
    builds its components from the boot-step :class:`Namespace`, and
    manages their lifecycle through :meth:`start`, :meth:`stop` and
    :meth:`terminate`.

    """
    RUN = RUN
    CLOSE = CLOSE
    TERMINATE = TERMINATE

    # The app instance; resolved via app_or_default() in __init__.
    app = None

    concurrency = from_config()
    loglevel = logging.ERROR
    logfile = from_config("log_file")
    send_events = from_config()
    pool_cls = from_config("pool")
    consumer_cls = from_config("consumer")
    mediator_cls = from_config("mediator")
    timer_cls = from_config("timer")
    timer_precision = from_config("timer_precision")
    autoscaler_cls = from_config("autoscaler")
    autoreloader_cls = from_config("autoreloader")
    schedule_filename = from_config()
    scheduler_cls = from_config("celerybeat_scheduler")
    task_time_limit = from_config()
    task_soft_time_limit = from_config()
    max_tasks_per_child = from_config()
    pool_putlocks = from_config()
    force_execv = from_config()
    prefetch_multiplier = from_config()
    state_db = from_config()
    disable_rate_limits = from_config()
    worker_lost_wait = from_config()

    # Current lifecycle state: None before start(), then RUN,
    # CLOSE during warm shutdown, TERMINATE when fully stopped.
    _state = None
    # Number of components started so far; _shutdown uses this to
    # detect a partial startup (in which case it can exit directly).
    _running = 0

    def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
            queues=None, app=None, pidfile=None, **kwargs):
        self.app = app_or_default(app or self.app)
        # all new threads start without a current app, so if an app is not
        # passed on to the thread it will fall back to the "default app",
        # which then could be the wrong app.  So for the worker
        # we set this to always return our app.  This is a hack,
        # and means that only a single app can be used for workers
        # running in the same process.
        set_default_app(self.app)

        # Signalled by _shutdown once all components have stopped.
        self._shutdown_complete = Event()
        self.setup_defaults(kwargs, namespace="celeryd")
        self.app.select_queues(queues)  # select queues subset.

        # Options
        self.loglevel = loglevel or self.loglevel
        self.hostname = hostname or socket.gethostname()
        self.ready_callback = ready_callback
        # Ensure stop() runs when this object is finalized/garbage collected.
        self._finalize = Finalize(self, self.stop, exitpriority=1)
        self.pidfile = pidfile
        self.pidlock = None
        # Use the async event loop only in the default (non-green)
        # environment with an evented broker transport.
        self.use_eventloop = (detect_environment() == "default" and
                              self.app.broker_connection().is_evented)

        # Initialize boot steps
        self.pool_cls = _concurrency.get_implementation(self.pool_cls)
        self.components = []
        # apply() runs the boot-steps, which populate self.components.
        self.namespace = Namespace(app=self.app).apply(self, **kwargs)

    def start(self):
        """Starts the workers main loop."""
        self._state = self.RUN

        if self.pidfile:
            self.pidlock = platforms.create_pidlock(self.pidfile)
        try:
            # Start components in boot order, tracking how many have
            # started so _shutdown knows whether startup completed.
            for i, component in enumerate(self.components):
                logger.debug("Starting %s...", qualname(component))
                self._running = i + 1
                component.start()
                logger.debug("%s OK!", qualname(component))
        except SystemTerminate:
            self.terminate()
        except Exception, exc:
            logger.error("Unrecoverable error: %r", exc,
                         exc_info=True)
            self.stop()
        except (KeyboardInterrupt, SystemExit):
            self.stop()

        # Will only get here if running green,
        # makes sure all greenthreads have exited.
        self._shutdown_complete.wait()

    def process_task_sem(self, req):
        # Variant of process_task that first acquires the bounded
        # semaphore (installed by the Queues component when the
        # event loop is used with pool putlocks).
        return self.semaphore.acquire(self.process_task, req)

    def process_task(self, req):
        """Process task by sending it to the pool of workers."""
        try:
            req.task.execute(req, self.pool, self.loglevel, self.logfile)
        except Exception, exc:
            # Errors here are internal worker errors, not task errors:
            # log them loudly but keep the worker running.
            logger.critical("Internal error: %r\n%s",
                            exc, traceback.format_exc(), exc_info=True)
        except SystemTerminate:
            self.terminate()
            raise
        except BaseException, exc:
            # e.g. SystemExit/KeyboardInterrupt: shut down, then re-raise.
            self.stop()
            raise exc

    def signal_consumer_close(self):
        # Close the consumer if it has been created; ignored otherwise.
        try:
            self.consumer.close()
        except AttributeError:
            pass

    def stop(self, in_sighandler=False):
        """Graceful shutdown of the worker server."""
        self.signal_consumer_close()
        # From a signal handler, only proceed if the pool is signal safe.
        if not in_sighandler or self.pool.signal_safe:
            self._shutdown(warm=True)

    def terminate(self, in_sighandler=False):
        """Not so graceful shutdown of the worker server."""
        self.signal_consumer_close()
        if not in_sighandler or self.pool.signal_safe:
            self._shutdown(warm=False)

    def _shutdown(self, warm=True):
        # Common shutdown path for stop() (warm) and terminate() (cold).
        what = "Stopping" if warm else "Terminating"

        # Already shutting down: do nothing.
        if self._state in (self.CLOSE, self.TERMINATE):
            return

        if self.pool:
            self.pool.close()

        if self._state != self.RUN or self._running != len(self.components):
            # Not fully started, can safely exit.
            self._state = self.TERMINATE
            self._shutdown_complete.set()
            return

        self._state = self.CLOSE

        # Stop components in reverse boot order; for a cold shutdown
        # prefer each component's terminate() over stop() when defined.
        for component in reversed(self.components):
            logger.debug("%s %s...", what, qualname(component))
            stop = component.stop
            if not warm:
                stop = getattr(component, "terminate", None) or stop
            stop()

        self.timer.stop()
        self.consumer.close_connection()

        if self.pidlock:
            self.pidlock.release()
        self._state = self.TERMINATE
        self._shutdown_complete.set()

    def reload(self, modules=None, reload=False, reloader=None):
        """Import (or optionally reload) task modules, then restart the pool.

        :param modules: modules to (re)import; defaults to the loader's
                        task modules.
        :param reload: also reload modules that are already imported.
        :param reloader: custom reloader passed on to ``reload_from_cwd``.

        """
        modules = self.app.loader.task_modules if modules is None else modules
        imp = self.app.loader.import_from_cwd
        for module in set(modules or ()):
            if module not in sys.modules:
                logger.debug("importing module %s", module)
                imp(module)
            elif reload:
                logger.debug("reloading module %s", module)
                reload_from_cwd(sys.modules[module], reloader)
        self.pool.restart()

    @property
    def state(self):
        # The worker state module (celery.worker.state).
        return state