# -*- coding: utf-8 -*-
"""
celery.worker
~~~~~~~~~~~~~

:class:`WorkController` can be used to instantiate in-process workers.

The worker consists of several components, all managed by boot-steps
(:mod:`celery.worker.bootsteps`).

"""
from __future__ import absolute_import

import atexit
import logging
import socket
import sys
import time
import traceback

from functools import partial

from billiard.exceptions import WorkerLostError
from kombu.syn import detect_environment
from kombu.utils.finalize import Finalize

from celery import concurrency as _concurrency
from celery import platforms
from celery.app import app_or_default, set_default_app
from celery.app.abstract import configurated, from_config
from celery.exceptions import SystemTerminate, TaskRevokedError
from celery.task import trace
from celery.utils.functional import noop
from celery.utils.imports import qualname, reload_from_cwd
from celery.utils.log import get_logger
from celery.utils.threads import Event
from celery.utils.timer2 import Schedule

from . import bootsteps
from . import state
from .buckets import TaskBucket, FastQueue
from .hub import Hub, BoundedSemaphore
  35. #: Worker states
  36. RUN = 0x1
  37. CLOSE = 0x2
  38. TERMINATE = 0x3
  39. #: Default socket timeout at shutdown.
  40. SHUTDOWN_SOCKET_TIMEOUT = 5.0
  41. logger = get_logger(__name__)
  42. class Namespace(bootsteps.Namespace):
  43. """This is the boot-step namespace of the :class:`WorkController`.
  44. It loads modules from :setting:`CELERYD_BOOT_STEPS`, and its
  45. own set of built-in boot-step modules.
  46. """
  47. name = 'worker'
  48. builtin_boot_steps = ('celery.worker.autoscale',
  49. 'celery.worker.autoreload',
  50. 'celery.worker.consumer',
  51. 'celery.worker.mediator')
  52. def modules(self):
  53. return (self.builtin_boot_steps
  54. + self.app.conf.CELERYD_BOOT_STEPS)
  55. class Pool(bootsteps.StartStopComponent):
  56. """The pool component.
  57. Describes how to initialize the worker pool, and starts and stops
  58. the pool during worker startup/shutdown.
  59. Adds attributes:
  60. * autoscale
  61. * pool
  62. * max_concurrency
  63. * min_concurrency
  64. """
  65. name = 'worker.pool'
  66. requires = ('queues', 'beat', )
  67. def __init__(self, w, autoscale=None, autoreload=False,
  68. no_execv=False, **kwargs):
  69. w.autoscale = autoscale
  70. w.pool = None
  71. w.max_concurrency = None
  72. w.min_concurrency = w.concurrency
  73. w.no_execv = no_execv
  74. if w.autoscale:
  75. w.max_concurrency, w.min_concurrency = w.autoscale
  76. self.autoreload_enabled = autoreload
  77. def on_poll_init(self, pool, hub):
  78. apply_after = hub.timer.apply_after
  79. apply_at = hub.timer.apply_at
  80. on_soft_timeout = pool.on_soft_timeout
  81. on_hard_timeout = pool.on_hard_timeout
  82. maintain_pool = pool.maintain_pool
  83. add_reader = hub.add_reader
  84. remove = hub.remove
  85. now = time.time
  86. if not pool.did_start_ok():
  87. raise WorkerLostError('Could not start worker processes')
  88. # need to handle pool results before every task
  89. # since multiple tasks can be received in a single poll()
  90. hub.on_task.append(pool.maybe_handle_result)
  91. hub.update_readers(pool.readers)
  92. for handler, interval in pool.timers.iteritems():
  93. hub.timer.apply_interval(interval * 1000.0, handler)
  94. def on_timeout_set(R, soft, hard):
  95. def _on_soft_timeout():
  96. if hard:
  97. R._tref = apply_at(now() + (hard - soft),
  98. on_hard_timeout, (R, ))
  99. on_soft_timeout(R)
  100. if soft:
  101. R._tref = apply_after(soft * 1000.0, _on_soft_timeout)
  102. elif hard:
  103. R._tref = apply_after(hard * 1000.0,
  104. on_hard_timeout, (R, ))
  105. def on_timeout_cancel(result):
  106. try:
  107. result._tref.cancel()
  108. delattr(result, '_tref')
  109. except AttributeError:
  110. pass
  111. pool.init_callbacks(
  112. on_process_up=lambda w: add_reader(w.sentinel, maintain_pool),
  113. on_process_down=lambda w: remove(w.sentinel),
  114. on_timeout_set=on_timeout_set,
  115. on_timeout_cancel=on_timeout_cancel,
  116. )
  117. def create(self, w, semaphore=None, max_restarts=None):
  118. threaded = not w.use_eventloop
  119. procs = w.min_concurrency
  120. forking_enable = not threaded or (w.no_execv or not w.force_execv)
  121. if not threaded:
  122. semaphore = w.semaphore = BoundedSemaphore(procs)
  123. max_restarts = 100
  124. allow_restart = self.autoreload_enabled or w.pool_restarts
  125. pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
  126. initargs=(w.app, w.hostname),
  127. maxtasksperchild=w.max_tasks_per_child,
  128. timeout=w.task_time_limit,
  129. soft_timeout=w.task_soft_time_limit,
  130. putlocks=w.pool_putlocks and threaded,
  131. lost_worker_timeout=w.worker_lost_wait,
  132. threads=threaded,
  133. max_restarts=max_restarts,
  134. allow_restart=allow_restart,
  135. forking_enable=forking_enable,
  136. semaphore=semaphore)
  137. if w.hub:
  138. w.hub.on_init.append(partial(self.on_poll_init, pool))
  139. return pool
  140. class Beat(bootsteps.StartStopComponent):
  141. """Component used to embed a celerybeat process.
  142. This will only be enabled if the ``beat``
  143. argument is set.
  144. """
  145. name = 'worker.beat'
  146. def __init__(self, w, beat=False, **kwargs):
  147. self.enabled = w.beat = beat
  148. w.beat = None
  149. def create(self, w):
  150. from celery.beat import EmbeddedService
  151. b = w.beat = EmbeddedService(app=w.app,
  152. schedule_filename=w.schedule_filename,
  153. scheduler_cls=w.scheduler_cls)
  154. return b
  155. class Queues(bootsteps.Component):
  156. """This component initializes the internal queues
  157. used by the worker."""
  158. name = 'worker.queues'
  159. requires = ('ev', )
  160. def create(self, w):
  161. w.start_mediator = True
  162. if not w.pool_cls.rlimit_safe:
  163. w.disable_rate_limits = True
  164. if w.disable_rate_limits:
  165. w.ready_queue = FastQueue()
  166. if w.use_eventloop:
  167. w.start_mediator = False
  168. if w.pool_putlocks and w.pool_cls.uses_semaphore:
  169. w.ready_queue.put = w.process_task_sem
  170. else:
  171. w.ready_queue.put = w.process_task
  172. elif not w.pool_cls.requires_mediator:
  173. # just send task directly to pool, skip the mediator.
  174. w.ready_queue.put = w.process_task
  175. w.start_mediator = False
  176. else:
  177. w.ready_queue = TaskBucket(task_registry=w.app.tasks)
  178. class EvLoop(bootsteps.StartStopComponent):
  179. name = 'worker.ev'
  180. def __init__(self, w, **kwargs):
  181. w.hub = None
  182. def include_if(self, w):
  183. return w.use_eventloop
  184. def create(self, w):
  185. w.timer = Schedule(max_interval=10)
  186. hub = w.hub = Hub(w.timer)
  187. return hub
  188. class Timers(bootsteps.Component):
  189. """This component initializes the internal timers used by the worker."""
  190. name = 'worker.timers'
  191. requires = ('pool', )
  192. def include_if(self, w):
  193. return not w.use_eventloop
  194. def create(self, w):
  195. if not w.timer_cls:
  196. # Default Timer is set by the pool, as e.g. eventlet
  197. # needs a custom implementation.
  198. w.timer_cls = w.pool.Timer
  199. w.timer = self.instantiate(w.pool.Timer,
  200. max_interval=w.timer_precision,
  201. on_timer_error=self.on_timer_error,
  202. on_timer_tick=self.on_timer_tick)
  203. def on_timer_error(self, exc):
  204. logger.error('Timer error: %r', exc, exc_info=True)
  205. def on_timer_tick(self, delay):
  206. logger.debug('Timer wake-up! Next eta %s secs.', delay)
  207. class StateDB(bootsteps.Component):
  208. """This component sets up the workers state db if enabled."""
  209. name = 'worker.state-db'
  210. def __init__(self, w, **kwargs):
  211. self.enabled = w.state_db
  212. w._persistence = None
  213. def create(self, w):
  214. w._persistence = state.Persistent(w.state_db)
  215. atexit.register(w._persistence.save)
  216. class WorkController(configurated):
  217. """Unmanaged worker instance."""
  218. RUN = RUN
  219. CLOSE = CLOSE
  220. TERMINATE = TERMINATE
  221. app = None
  222. concurrency = from_config()
  223. loglevel = logging.ERROR
  224. logfile = from_config('log_file')
  225. send_events = from_config()
  226. pool_cls = from_config('pool')
  227. consumer_cls = from_config('consumer')
  228. mediator_cls = from_config('mediator')
  229. timer_cls = from_config('timer')
  230. timer_precision = from_config('timer_precision')
  231. autoscaler_cls = from_config('autoscaler')
  232. autoreloader_cls = from_config('autoreloader')
  233. schedule_filename = from_config()
  234. scheduler_cls = from_config('celerybeat_scheduler')
  235. task_time_limit = from_config()
  236. task_soft_time_limit = from_config()
  237. max_tasks_per_child = from_config()
  238. pool_putlocks = from_config()
  239. pool_restarts = from_config()
  240. force_execv = from_config()
  241. prefetch_multiplier = from_config()
  242. state_db = from_config()
  243. disable_rate_limits = from_config()
  244. worker_lost_wait = from_config()
  245. _state = None
  246. _running = 0
  247. def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
  248. queues=None, app=None, pidfile=None, **kwargs):
  249. self.app = app_or_default(app or self.app)
  250. # all new threads start without a current app, so if an app is not
  251. # passed on to the thread it will fall back to the "default app",
  252. # which then could be the wrong app. So for the worker
  253. # we set this to always return our app. This is a hack,
  254. # and means that only a single app can be used for workers
  255. # running in the same process.
  256. set_default_app(self.app)
  257. self.app.finalize()
  258. trace._tasks = self.app._tasks
  259. self._shutdown_complete = Event()
  260. self.setup_defaults(kwargs, namespace='celeryd')
  261. self.app.select_queues(queues) # select queues subset.
  262. # Options
  263. self.loglevel = loglevel or self.loglevel
  264. self.hostname = hostname or socket.gethostname()
  265. self.ready_callback = ready_callback
  266. self._finalize = Finalize(self, self.stop, exitpriority=1)
  267. self.pidfile = pidfile
  268. self.pidlock = None
  269. self.use_eventloop = self.should_use_eventloop()
  270. # Update celery_include to have all known task modules, so that we
  271. # ensure all task modules are imported in case an execv happens.
  272. task_modules = set(task.__class__.__module__
  273. for task in self.app.tasks.itervalues())
  274. self.app.conf.CELERY_INCLUDE = tuple(
  275. set(self.app.conf.CELERY_INCLUDE) | task_modules,
  276. )
  277. # Initialize boot steps
  278. self.pool_cls = _concurrency.get_implementation(self.pool_cls)
  279. self.components = []
  280. self.namespace = Namespace(app=self.app).apply(self, **kwargs)
  281. def start(self):
  282. """Starts the workers main loop."""
  283. self._state = self.RUN
  284. if self.pidfile:
  285. self.pidlock = platforms.create_pidlock(self.pidfile)
  286. try:
  287. for i, component in enumerate(self.components):
  288. logger.debug('Starting %s...', qualname(component))
  289. self._running = i + 1
  290. if component:
  291. component.start()
  292. logger.debug('%s OK!', qualname(component))
  293. except SystemTerminate:
  294. self.terminate()
  295. except Exception, exc:
  296. logger.error('Unrecoverable error: %r', exc,
  297. exc_info=True)
  298. self.stop()
  299. except (KeyboardInterrupt, SystemExit):
  300. self.stop()
  301. # Will only get here if running green,
  302. # makes sure all greenthreads have exited.
  303. self._shutdown_complete.wait()
  304. def process_task_sem(self, req):
  305. return self.semaphore.acquire(self.process_task, req)
  306. def process_task(self, req):
  307. """Process task by sending it to the pool of workers."""
  308. try:
  309. req.execute_using_pool(self.pool)
  310. except TaskRevokedError:
  311. try:
  312. self._quick_release() # Issue 877
  313. except AttributeError:
  314. pass
  315. except Exception, exc:
  316. logger.critical('Internal error: %r\n%s',
  317. exc, traceback.format_exc(), exc_info=True)
  318. except SystemTerminate:
  319. self.terminate()
  320. raise
  321. except BaseException, exc:
  322. self.stop()
  323. raise exc
  324. def signal_consumer_close(self):
  325. try:
  326. self.consumer.close()
  327. except AttributeError:
  328. pass
  329. def should_use_eventloop(self):
  330. return (detect_environment() == 'default' and
  331. self.app.connection().is_evented and not self.app.IS_WINDOWS)
  332. def stop(self, in_sighandler=False):
  333. """Graceful shutdown of the worker server."""
  334. self.signal_consumer_close()
  335. if not in_sighandler or self.pool.signal_safe:
  336. self._shutdown(warm=True)
  337. def terminate(self, in_sighandler=False):
  338. """Not so graceful shutdown of the worker server."""
  339. self.signal_consumer_close()
  340. if not in_sighandler or self.pool.signal_safe:
  341. self._shutdown(warm=False)
  342. def _shutdown(self, warm=True):
  343. what = 'Stopping' if warm else 'Terminating'
  344. socket_timeout = socket.getdefaulttimeout()
  345. socket.setdefaulttimeout(SHUTDOWN_SOCKET_TIMEOUT) # Issue 975
  346. if self._state in (self.CLOSE, self.TERMINATE):
  347. return
  348. self.app.loader.shutdown_worker()
  349. if self.pool:
  350. self.pool.close()
  351. if self._state != self.RUN or self._running != len(self.components):
  352. # Not fully started, can safely exit.
  353. self._state = self.TERMINATE
  354. self._shutdown_complete.set()
  355. return
  356. self._state = self.CLOSE
  357. for component in reversed(self.components):
  358. logger.debug('%s %s...', what, qualname(component))
  359. if component:
  360. stop = component.stop
  361. if not warm:
  362. stop = getattr(component, 'terminate', None) or stop
  363. stop()
  364. self.timer.stop()
  365. self.consumer.close_connection()
  366. if self.pidlock:
  367. self.pidlock.release()
  368. self._state = self.TERMINATE
  369. socket.setdefaulttimeout(socket_timeout)
  370. self._shutdown_complete.set()
  371. def reload(self, modules=None, reload=False, reloader=None):
  372. modules = self.app.loader.task_modules if modules is None else modules
  373. imp = self.app.loader.import_from_cwd
  374. for module in set(modules or ()):
  375. if module not in sys.modules:
  376. logger.debug('importing module %s', module)
  377. imp(module)
  378. elif reload:
  379. logger.debug('reloading module %s', module)
  380. reload_from_cwd(sys.modules[module], reloader)
  381. self.pool.restart()
  382. @property
  383. def state(self):
  384. return state