# -*- coding: utf-8 -*-
"""
    celery.worker
    ~~~~~~~~~~~~~

    :class:`WorkController` can be used to instantiate in-process workers.

    The worker consists of several components, all managed by boot-steps
    (:mod:`celery.worker.bootsteps`).

"""
from __future__ import absolute_import

import atexit
import logging
import socket
import sys
import time
import traceback

from functools import partial

from billiard import forking_enable
from billiard.exceptions import WorkerLostError
from kombu.syn import detect_environment
from kombu.utils.finalize import Finalize

from celery import concurrency as _concurrency
from celery import platforms
from celery.app import app_or_default, set_default_app
from celery.app.abstract import configurated, from_config
from celery.exceptions import SystemTerminate, TaskRevokedError
from celery.task import trace
from celery.utils.functional import noop
from celery.utils.imports import qualname, reload_from_cwd
from celery.utils.log import get_logger
from celery.utils.threads import Event
from celery.utils.timer2 import Schedule

from . import bootsteps
from . import state
from .buckets import TaskBucket, FastQueue
from .hub import Hub, BoundedSemaphore

RUN = 0x1
CLOSE = 0x2
TERMINATE = 0x3

logger = get_logger(__name__)


class Namespace(bootsteps.Namespace):
    """This is the boot-step namespace of the :class:`WorkController`.

    It loads modules from :setting:`CELERYD_BOOT_STEPS`, and its
    own set of built-in boot-step modules.

    """
    name = 'worker'
    builtin_boot_steps = ('celery.worker.autoscale',
                          'celery.worker.autoreload',
                          'celery.worker.consumer',
                          'celery.worker.mediator')

    def modules(self):
        return (self.builtin_boot_steps
              + self.app.conf.CELERYD_BOOT_STEPS)


class Pool(bootsteps.StartStopComponent):
    """The pool component.

    Describes how to initialize the worker pool, and starts and stops
    the pool during worker startup/shutdown.

    Adds attributes:

        * autoscale
        * pool
        * max_concurrency
        * min_concurrency

    """
    name = 'worker.pool'
    requires = ('queues', 'beat', )

    def __init__(self, w, autoscale=None, autoreload=False,
            no_execv=False, **kwargs):
        w.autoscale = autoscale
        w.pool = None
        w.max_concurrency = None
        w.min_concurrency = w.concurrency
        w.no_execv = no_execv
        if w.autoscale:
            w.max_concurrency, w.min_concurrency = w.autoscale
        self.autoreload_enabled = autoreload

    def on_poll_init(self, pool, hub):
        apply_after = hub.timer.apply_after
        apply_at = hub.timer.apply_at
        on_soft_timeout = pool.on_soft_timeout
        on_hard_timeout = pool.on_hard_timeout
        maintain_pool = pool.maintain_pool
        add_reader = hub.add_reader
        remove = hub.remove
        now = time.time

        if not pool.did_start_ok():
            raise WorkerLostError('Could not start worker processes')

        # need to handle pool results before every task
        # since multiple tasks can be received in a single poll()
        hub.on_task.append(pool.maybe_handle_result)

        hub.update_readers(pool.readers)
        for handler, interval in pool.timers.iteritems():
            hub.timer.apply_interval(interval * 1000.0, handler)

        def on_timeout_set(R, soft, hard):

            def _on_soft_timeout():
                if hard:
                    R._tref = apply_at(now() + (hard - soft),
                                       on_hard_timeout, (R, ))
                on_soft_timeout(R)
            if soft:
                R._tref = apply_after(soft * 1000.0, _on_soft_timeout)
            elif hard:
                R._tref = apply_after(hard * 1000.0,
                                      on_hard_timeout, (R, ))

        def on_timeout_cancel(result):
            try:
                result._tref.cancel()
                delattr(result, '_tref')
            except AttributeError:
                pass

        pool.init_callbacks(
            on_process_up=lambda w: add_reader(w.sentinel, maintain_pool),
            on_process_down=lambda w: remove(w.sentinel),
            on_timeout_set=on_timeout_set,
            on_timeout_cancel=on_timeout_cancel,
        )

    def create(self, w, semaphore=None, max_restarts=None):
        threaded = not w.use_eventloop
        forking_enable(not threaded or (w.no_execv or not w.force_execv))
        procs = w.min_concurrency
        if not threaded:
            semaphore = w.semaphore = BoundedSemaphore(procs)
            max_restarts = 100
        allow_restart = self.autoreload_enabled or w.pool_restarts
        pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
                initargs=(w.app, w.hostname),
                maxtasksperchild=w.max_tasks_per_child,
                timeout=w.task_time_limit,
                soft_timeout=w.task_soft_time_limit,
                putlocks=w.pool_putlocks and threaded,
                lost_worker_timeout=w.worker_lost_wait,
                threads=threaded,
                max_restarts=max_restarts,
                allow_restart=allow_restart,
                semaphore=semaphore)
        if w.hub:
            w.hub.on_init.append(partial(self.on_poll_init, pool))
        return pool


class Beat(bootsteps.StartStopComponent):
    """Component used to embed a celerybeat process.

    This will only be enabled if the ``beat``
    argument is set.

    """
    name = 'worker.beat'

    def __init__(self, w, beat=False, **kwargs):
        self.enabled = w.beat = beat
        w.beat = None

    def create(self, w):
        from celery.beat import EmbeddedService
        b = w.beat = EmbeddedService(app=w.app,
                                     schedule_filename=w.schedule_filename,
                                     scheduler_cls=w.scheduler_cls)
        return b


class Queues(bootsteps.Component):
    """This component initializes the internal queues
    used by the worker."""
    name = 'worker.queues'
    requires = ('ev', )

    def create(self, w):
        w.start_mediator = True
        if not w.pool_cls.rlimit_safe:
            w.disable_rate_limits = True
        if w.disable_rate_limits:
            w.ready_queue = FastQueue()
            if w.use_eventloop:
                w.start_mediator = False
                if w.pool_putlocks and w.pool_cls.uses_semaphore:
                    w.ready_queue.put = w.process_task_sem
                else:
                    w.ready_queue.put = w.process_task
            elif not w.pool_cls.requires_mediator:
                # just send task directly to pool, skip the mediator.
                w.ready_queue.put = w.process_task
                w.start_mediator = False
        else:
            w.ready_queue = TaskBucket(task_registry=w.app.tasks)


class EvLoop(bootsteps.StartStopComponent):
    name = 'worker.ev'

    def __init__(self, w, **kwargs):
        w.hub = None

    def include_if(self, w):
        return w.use_eventloop

    def create(self, w):
        w.timer = Schedule(max_interval=10)
        hub = w.hub = Hub(w.timer)
        return hub


class Timers(bootsteps.Component):
    """This component initializes the internal timers used by the worker."""
    name = 'worker.timers'
    requires = ('pool', )

    def include_if(self, w):
        return not w.use_eventloop

    def create(self, w):
        if not w.timer_cls:
            # Default Timer is set by the pool, as e.g. eventlet
            # needs a custom implementation.
            w.timer_cls = w.pool.Timer
        w.timer = self.instantiate(w.pool.Timer,
                                   max_interval=w.timer_precision,
                                   on_timer_error=self.on_timer_error,
                                   on_timer_tick=self.on_timer_tick)

    def on_timer_error(self, exc):
        logger.error('Timer error: %r', exc, exc_info=True)

    def on_timer_tick(self, delay):
        logger.debug('Timer wake-up! Next eta %s secs.', delay)


class StateDB(bootsteps.Component):
    """This component sets up the worker's state db if enabled."""
    name = 'worker.state-db'

    def __init__(self, w, **kwargs):
        self.enabled = w.state_db
        w._persistence = None

    def create(self, w):
        w._persistence = state.Persistent(w.state_db)
        atexit.register(w._persistence.save)


class WorkController(configurated):
    """Unmanaged worker instance."""
    RUN = RUN
    CLOSE = CLOSE
    TERMINATE = TERMINATE

    app = None
    concurrency = from_config()
    loglevel = logging.ERROR
    logfile = from_config('log_file')
    send_events = from_config()
    pool_cls = from_config('pool')
    consumer_cls = from_config('consumer')
    mediator_cls = from_config('mediator')
    timer_cls = from_config('timer')
    timer_precision = from_config('timer_precision')
    autoscaler_cls = from_config('autoscaler')
    autoreloader_cls = from_config('autoreloader')
    schedule_filename = from_config()
    scheduler_cls = from_config('celerybeat_scheduler')
    task_time_limit = from_config()
    task_soft_time_limit = from_config()
    max_tasks_per_child = from_config()
    pool_putlocks = from_config()
    pool_restarts = from_config()
    force_execv = from_config()
    prefetch_multiplier = from_config()
    state_db = from_config()
    disable_rate_limits = from_config()
    worker_lost_wait = from_config()

    _state = None
    _running = 0
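
    # Illustrative note (hedged, not in the original source): via
    # `configurated`/`from_config`, each option above falls back to the
    # corresponding setting in the ``celeryd`` namespace when it is not
    # passed as a keyword argument to __init__, for example:
    #
    #     concurrency  <-  CELERYD_CONCURRENCY
    #     pool_cls     <-  CELERYD_POOL
    #     logfile      <-  CELERYD_LOG_FILE
    #
    # Keyword arguments given to __init__ override these defaults through
    # setup_defaults(kwargs, namespace='celeryd') below.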

    def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
            queues=None, app=None, pidfile=None, **kwargs):
        self.app = app_or_default(app or self.app)
        # all new threads start without a current app, so if an app is not
        # passed on to the thread it will fall back to the "default app",
        # which then could be the wrong app. So for the worker
        # we set this to always return our app. This is a hack,
        # and means that only a single app can be used for workers
        # running in the same process.
        set_default_app(self.app)
        self.app.finalize()
        trace._tasks = self.app._tasks

        self._shutdown_complete = Event()
        self.setup_defaults(kwargs, namespace='celeryd')
        self.app.select_queues(queues)  # select queues subset.

        # Options
        self.loglevel = loglevel or self.loglevel
        self.hostname = hostname or socket.gethostname()
        self.ready_callback = ready_callback
        self._finalize = Finalize(self, self.stop, exitpriority=1)
        self.pidfile = pidfile
        self.pidlock = None
        self.use_eventloop = self.should_use_eventloop()

        # Update celery_include to have all known task modules, so that we
        # ensure all task modules are imported in case an execv happens.
        task_modules = set(task.__class__.__module__
                           for task in self.app.tasks.itervalues())
        self.app.conf.CELERY_INCLUDE = tuple(
            set(self.app.conf.CELERY_INCLUDE) | task_modules,
        )

        # Initialize boot steps
        self.pool_cls = _concurrency.get_implementation(self.pool_cls)
        self.components = []
        self.namespace = Namespace(app=self.app).apply(self, **kwargs)

    def start(self):
        """Starts the worker's main loop."""
        self._state = self.RUN

        if self.pidfile:
            self.pidlock = platforms.create_pidlock(self.pidfile)
        try:
            for i, component in enumerate(self.components):
                logger.debug('Starting %s...', qualname(component))
                self._running = i + 1
                if component:
                    component.start()
                logger.debug('%s OK!', qualname(component))
        except SystemTerminate:
            self.terminate()
        except Exception, exc:
            logger.error('Unrecoverable error: %r', exc,
                         exc_info=True)
            self.stop()
        except (KeyboardInterrupt, SystemExit):
            self.stop()

        # Will only get here if running green,
        # makes sure all greenthreads have exited.
        self._shutdown_complete.wait()

    def process_task_sem(self, req):
        return self.semaphore.acquire(self.process_task, req)

    def process_task(self, req):
        """Process task by sending it to the pool of workers."""
        try:
            req.execute_using_pool(self.pool)
        except TaskRevokedError:
            if self.semaphore:  # (Issue #877)
                self.semaphore.release()
        except Exception, exc:
            logger.critical('Internal error: %r\n%s',
                            exc, traceback.format_exc(), exc_info=True)
        except SystemTerminate:
            self.terminate()
            raise
        except BaseException, exc:
            self.stop()
            raise exc

    def signal_consumer_close(self):
        try:
            self.consumer.close()
        except AttributeError:
            pass

    def should_use_eventloop(self):
        return (detect_environment() == 'default' and
                self.app.connection().is_evented and not self.app.IS_WINDOWS)

    def stop(self, in_sighandler=False):
        """Graceful shutdown of the worker server."""
        self.signal_consumer_close()
        if not in_sighandler or self.pool.signal_safe:
            self._shutdown(warm=True)

    def terminate(self, in_sighandler=False):
        """Not so graceful shutdown of the worker server."""
        self.signal_consumer_close()
        if not in_sighandler or self.pool.signal_safe:
            self._shutdown(warm=False)

    def _shutdown(self, warm=True):
        what = 'Stopping' if warm else 'Terminating'

        if self._state in (self.CLOSE, self.TERMINATE):
            return
        self.app.loader.shutdown_worker()

        if self.pool:
            self.pool.close()

        if self._state != self.RUN or self._running != len(self.components):
            # Not fully started, can safely exit.
            self._state = self.TERMINATE
            self._shutdown_complete.set()
            return

        self._state = self.CLOSE

        for component in reversed(self.components):
            logger.debug('%s %s...', what, qualname(component))
            if component:
                stop = component.stop
                if not warm:
                    stop = getattr(component, 'terminate', None) or stop
                stop()

        self.timer.stop()
        self.consumer.close_connection()

        if self.pidlock:
            self.pidlock.release()
        self._state = self.TERMINATE
        self._shutdown_complete.set()

    def reload(self, modules=None, reload=False, reloader=None):
        modules = self.app.loader.task_modules if modules is None else modules
        imp = self.app.loader.import_from_cwd

        for module in set(modules or ()):
            if module not in sys.modules:
                logger.debug('importing module %s', module)
                imp(module)
            elif reload:
                logger.debug('reloading module %s', module)
                reload_from_cwd(sys.modules[module], reloader)
        self.pool.restart()

    @property
    def state(self):
        return state