__init__.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker
  4. ~~~~~~~~~~~~~
  5. :class:`WorkController` can be used to instantiate in-process workers.
  6. The worker consists of several components, all managed by boot-steps
  7. (mod:`celery.worker.bootsteps`).
  8. """
  9. from __future__ import absolute_import
  10. import atexit
  11. import logging
  12. import socket
  13. import sys
  14. import time
  15. import traceback
  16. from functools import partial
  17. from billiard.exceptions import WorkerLostError
  18. from billiard.util import Finalize
  19. from kombu.syn import detect_environment
  20. from celery import concurrency as _concurrency
  21. from celery import platforms
  22. from celery.app import app_or_default
  23. from celery.app.abstract import configurated, from_config
  24. from celery.exceptions import SystemTerminate, TaskRevokedError
  25. from celery.utils.functional import noop
  26. from celery.utils.imports import qualname, reload_from_cwd
  27. from celery.utils.log import get_logger
  28. from celery.utils.threads import Event
  29. from celery.utils.timer2 import Schedule
  30. from . import bootsteps
  31. from . import state
  32. from .buckets import TaskBucket, AsyncTaskBucket, FastQueue
  33. from .hub import Hub, BoundedSemaphore
#: Worker states
RUN = 0x1
CLOSE = 0x2
TERMINATE = 0x3

#: Default socket timeout at shutdown (applied process-wide while
#: the worker is shutting down; see Issue 975 in ``_shutdown``).
SHUTDOWN_SOCKET_TIMEOUT = 5.0

#: Module-level logger for the worker.
logger = get_logger(__name__)
  41. class Namespace(bootsteps.Namespace):
  42. """This is the boot-step namespace of the :class:`WorkController`.
  43. It loads modules from :setting:`CELERYD_BOOT_STEPS`, and its
  44. own set of built-in boot-step modules.
  45. """
  46. name = 'worker'
  47. builtin_boot_steps = ('celery.worker.autoscale',
  48. 'celery.worker.autoreload',
  49. 'celery.worker.consumer',
  50. 'celery.worker.mediator')
  51. def modules(self):
  52. return self.builtin_boot_steps + self.app.conf.CELERYD_BOOT_STEPS
class Pool(bootsteps.StartStopComponent):
    """The pool component.

    Describes how to initialize the worker pool, and starts and stops
    the pool during worker startup/shutdown.

    Adds attributes:

        * autoscale
        * pool
        * max_concurrency
        * min_concurrency

    """
    name = 'worker.pool'
    requires = ('queues', 'beat', )

    def __init__(self, w,
                 autoscale=None, autoreload=False, no_execv=False, **kwargs):
        # ``w`` is the WorkController; this step mutates it in place.
        w.autoscale = autoscale
        w.pool = None
        w.max_concurrency = None
        w.min_concurrency = w.concurrency
        w.no_execv = no_execv
        if w.autoscale:
            # autoscale is a (max, min) pair when enabled.
            w.max_concurrency, w.min_concurrency = w.autoscale
        self.autoreload_enabled = autoreload

    def on_poll_init(self, pool, w, hub):
        # Called when the event-loop hub initializes; wires the pool's
        # file descriptors, timers and timeout callbacks into the hub.
        # Frequently-used bound methods are hoisted to locals up front.
        apply_after = hub.timer.apply_after
        apply_at = hub.timer.apply_at
        on_soft_timeout = pool.on_soft_timeout
        on_hard_timeout = pool.on_hard_timeout
        maintain_pool = pool.maintain_pool
        add_reader = hub.add_reader
        remove = hub.remove
        now = time.time

        # did_start_ok will verify that pool processes were able to start,
        # but this will only work the first time we start, as
        # maxtasksperchild will mess up metrics.
        if not w.consumer.restart_count and not pool.did_start_ok():
            raise WorkerLostError('Could not start worker processes')

        # need to handle pool results before every task
        # since multiple tasks can be received in a single poll()
        hub.on_task.append(pool.maybe_handle_result)

        hub.update_readers(pool.readers)
        # pool.timers maps handler -> interval (seconds); the hub timer
        # expects milliseconds, hence the * 1000.0.
        for handler, interval in pool.timers.iteritems():
            hub.timer.apply_interval(interval * 1000.0, handler)

        def on_timeout_set(R, soft, hard):
            # Schedule soft/hard time-limit callbacks for result ``R``.
            # If both are set, the hard timeout is armed only after the
            # soft timeout has fired (at soft + (hard - soft)).

            def _on_soft_timeout():
                if hard:
                    R._tref = apply_at(now() + (hard - soft),
                                       on_hard_timeout, (R, ))
                on_soft_timeout(R)
            if soft:
                R._tref = apply_after(soft * 1000.0, _on_soft_timeout)
            elif hard:
                R._tref = apply_after(hard * 1000.0,
                                      on_hard_timeout, (R, ))

        def on_timeout_cancel(result):
            # Cancel any pending timeout for ``result``; AttributeError
            # simply means no timeout was ever set.
            try:
                result._tref.cancel()
                delattr(result, '_tref')
            except AttributeError:
                pass

        # NOTE: the lambda argument ``w`` below shadows the outer
        # WorkController ``w``; judging by the ``.sentinel`` attribute it
        # is the pool's worker-process object — confirm against the pool
        # implementation.
        pool.init_callbacks(
            on_process_up=lambda w: add_reader(w.sentinel, maintain_pool),
            on_process_down=lambda w: remove(w.sentinel),
            on_timeout_set=on_timeout_set,
            on_timeout_cancel=on_timeout_cancel,
        )

    def create(self, w, semaphore=None, max_restarts=None):
        # Instantiate the concurrency pool and attach it to the worker.
        threaded = not w.use_eventloop
        procs = w.min_concurrency
        forking_enable = w.no_execv or not w.force_execv
        if not threaded:
            # event-loop mode: throttle task dispatch with a semaphore
            # sized to the initial number of processes.
            semaphore = w.semaphore = BoundedSemaphore(procs)
            max_restarts = 100
        allow_restart = self.autoreload_enabled or w.pool_restarts
        pool = w.pool = self.instantiate(
            w.pool_cls, w.min_concurrency,
            initargs=(w.app, w.hostname),
            maxtasksperchild=w.max_tasks_per_child,
            timeout=w.task_time_limit,
            soft_timeout=w.task_soft_time_limit,
            putlocks=w.pool_putlocks and threaded,
            lost_worker_timeout=w.worker_lost_wait,
            threads=threaded,
            max_restarts=max_restarts,
            allow_restart=allow_restart,
            forking_enable=forking_enable,
            semaphore=semaphore,
            callbacks_propagate=(
                w._conninfo.connection_errors + w._conninfo.channel_errors
            ),
        )
        if w.hub:
            # defer fd/timer registration until the hub is initialized.
            w.hub.on_init.append(partial(self.on_poll_init, pool, w))
        return pool
  146. class Beat(bootsteps.StartStopComponent):
  147. """Component used to embed a celerybeat process.
  148. This will only be enabled if the ``beat``
  149. argument is set.
  150. """
  151. name = 'worker.beat'
  152. def __init__(self, w, beat=False, **kwargs):
  153. self.enabled = w.beat = beat
  154. w.beat = None
  155. def create(self, w):
  156. from celery.beat import EmbeddedService
  157. b = w.beat = EmbeddedService(app=w.app,
  158. schedule_filename=w.schedule_filename,
  159. scheduler_cls=w.scheduler_cls)
  160. return b
  161. class Queues(bootsteps.Component):
  162. """This component initializes the internal queues
  163. used by the worker."""
  164. name = 'worker.queues'
  165. requires = ('ev', )
  166. def create(self, w):
  167. BucketType = TaskBucket
  168. w.start_mediator = not w.disable_rate_limits
  169. if not w.pool_cls.rlimit_safe:
  170. w.start_mediator = False
  171. BucketType = AsyncTaskBucket
  172. process_task = w.process_task
  173. if w.use_eventloop:
  174. w.start_mediator = False
  175. BucketType = AsyncTaskBucket
  176. if w.pool_putlocks and w.pool_cls.uses_semaphore:
  177. process_task = w.process_task_sem
  178. if w.disable_rate_limits:
  179. w.ready_queue = FastQueue()
  180. w.ready_queue.put = process_task
  181. else:
  182. w.ready_queue = BucketType(
  183. task_registry=w.app.tasks, callback=process_task, worker=w,
  184. )
  185. class EvLoop(bootsteps.StartStopComponent):
  186. name = 'worker.ev'
  187. def __init__(self, w, **kwargs):
  188. w.hub = None
  189. def include_if(self, w):
  190. return w.use_eventloop
  191. def create(self, w):
  192. w.timer = Schedule(max_interval=10)
  193. hub = w.hub = Hub(w.timer)
  194. return hub
  195. class Timers(bootsteps.Component):
  196. """This component initializes the internal timers used by the worker."""
  197. name = 'worker.timers'
  198. requires = ('pool', )
  199. def include_if(self, w):
  200. return not w.use_eventloop
  201. def create(self, w):
  202. if not w.timer_cls:
  203. # Default Timer is set by the pool, as e.g. eventlet
  204. # needs a custom implementation.
  205. w.timer_cls = w.pool.Timer
  206. w.timer = self.instantiate(w.pool.Timer,
  207. max_interval=w.timer_precision,
  208. on_timer_error=self.on_timer_error,
  209. on_timer_tick=self.on_timer_tick)
  210. def on_timer_error(self, exc):
  211. logger.error('Timer error: %r', exc, exc_info=True)
  212. def on_timer_tick(self, delay):
  213. logger.debug('Timer wake-up! Next eta %s secs.', delay)
  214. class StateDB(bootsteps.Component):
  215. """This component sets up the workers state db if enabled."""
  216. name = 'worker.state-db'
  217. def __init__(self, w, **kwargs):
  218. self.enabled = w.state_db
  219. w._persistence = None
  220. def create(self, w):
  221. w._persistence = state.Persistent(w.state_db)
  222. atexit.register(w._persistence.save)
  223. class WorkController(configurated):
  224. """Unmanaged worker instance."""
  225. RUN = RUN
  226. CLOSE = CLOSE
  227. TERMINATE = TERMINATE
  228. app = None
  229. concurrency = from_config()
  230. loglevel = logging.ERROR
  231. logfile = from_config('log_file')
  232. send_events = from_config()
  233. pool_cls = from_config('pool')
  234. consumer_cls = from_config('consumer')
  235. mediator_cls = from_config('mediator')
  236. timer_cls = from_config('timer')
  237. timer_precision = from_config('timer_precision')
  238. autoscaler_cls = from_config('autoscaler')
  239. autoreloader_cls = from_config('autoreloader')
  240. schedule_filename = from_config()
  241. scheduler_cls = from_config('celerybeat_scheduler')
  242. task_time_limit = from_config()
  243. task_soft_time_limit = from_config()
  244. max_tasks_per_child = from_config()
  245. pool_putlocks = from_config()
  246. pool_restarts = from_config()
  247. force_execv = from_config()
  248. prefetch_multiplier = from_config()
  249. state_db = from_config()
  250. disable_rate_limits = from_config()
  251. worker_lost_wait = from_config()
  252. _state = None
  253. _running = 0
  254. def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
  255. queues=None, app=None, pidfile=None, use_eventloop=None,
  256. **kwargs):
  257. self.app = app_or_default(app or self.app)
  258. self._shutdown_complete = Event()
  259. self.setup_defaults(kwargs, namespace='celeryd')
  260. self.app.select_queues(queues) # select queues subset.
  261. # Options
  262. self.loglevel = loglevel or self.loglevel
  263. self.hostname = hostname or socket.gethostname()
  264. self.ready_callback = ready_callback
  265. self._finalize = Finalize(self, self.stop, exitpriority=1)
  266. self.pidfile = pidfile
  267. self.pidlock = None
  268. # this connection is not established, only used for params
  269. self._conninfo = self.app.connection()
  270. self.use_eventloop = (
  271. self.should_use_eventloop() if use_eventloop is None
  272. else use_eventloop
  273. )
  274. # Update celery_include to have all known task modules, so that we
  275. # ensure all task modules are imported in case an execv happens.
  276. task_modules = set(task.__class__.__module__
  277. for task in self.app.tasks.itervalues())
  278. self.app.conf.CELERY_INCLUDE = tuple(
  279. set(self.app.conf.CELERY_INCLUDE) | task_modules,
  280. )
  281. # Initialize boot steps
  282. self.pool_cls = _concurrency.get_implementation(self.pool_cls)
  283. self.components = []
  284. self.namespace = Namespace(app=self.app).apply(self, **kwargs)
  285. def start(self):
  286. """Starts the workers main loop."""
  287. self._state = self.RUN
  288. if self.pidfile:
  289. self.pidlock = platforms.create_pidlock(self.pidfile)
  290. try:
  291. for i, component in enumerate(self.components):
  292. logger.debug('Starting %s...', qualname(component))
  293. self._running = i + 1
  294. if component:
  295. component.start()
  296. logger.debug('%s OK!', qualname(component))
  297. except SystemTerminate:
  298. self.terminate()
  299. except Exception, exc:
  300. logger.error('Unrecoverable error: %r', exc,
  301. exc_info=True)
  302. self.stop()
  303. except (KeyboardInterrupt, SystemExit):
  304. self.stop()
  305. # Will only get here if running green,
  306. # makes sure all greenthreads have exited.
  307. self._shutdown_complete.wait()
  308. def process_task_sem(self, req):
  309. return self.semaphore.acquire(self.process_task, req)
  310. def process_task(self, req):
  311. """Process task by sending it to the pool of workers."""
  312. try:
  313. req.execute_using_pool(self.pool)
  314. except TaskRevokedError:
  315. try:
  316. self._quick_release() # Issue 877
  317. except AttributeError:
  318. pass
  319. except Exception, exc:
  320. logger.critical('Internal error: %r\n%s',
  321. exc, traceback.format_exc(), exc_info=True)
  322. except SystemTerminate:
  323. self.terminate()
  324. raise
  325. except BaseException, exc:
  326. self.stop()
  327. raise exc
  328. def signal_consumer_close(self):
  329. try:
  330. self.consumer.close()
  331. except AttributeError:
  332. pass
  333. def should_use_eventloop(self):
  334. return (detect_environment() == 'default' and
  335. self._conninfo.is_evented and not self.app.IS_WINDOWS)
  336. def stop(self, in_sighandler=False):
  337. """Graceful shutdown of the worker server."""
  338. self.signal_consumer_close()
  339. if not in_sighandler or self.pool.signal_safe:
  340. self._shutdown(warm=True)
  341. def terminate(self, in_sighandler=False):
  342. """Not so graceful shutdown of the worker server."""
  343. self.signal_consumer_close()
  344. if not in_sighandler or self.pool.signal_safe:
  345. self._shutdown(warm=False)
  346. def _shutdown(self, warm=True):
  347. what = 'Stopping' if warm else 'Terminating'
  348. socket_timeout = socket.getdefaulttimeout()
  349. socket.setdefaulttimeout(SHUTDOWN_SOCKET_TIMEOUT) # Issue 975
  350. if self._state in (self.CLOSE, self.TERMINATE):
  351. return
  352. self.app.loader.shutdown_worker()
  353. if self.pool:
  354. self.pool.close()
  355. if self._state != self.RUN or self._running != len(self.components):
  356. # Not fully started, can safely exit.
  357. self._state = self.TERMINATE
  358. self._shutdown_complete.set()
  359. return
  360. self._state = self.CLOSE
  361. for component in reversed(self.components):
  362. logger.debug('%s %s...', what, qualname(component))
  363. if component:
  364. stop = component.stop
  365. if not warm:
  366. stop = getattr(component, 'terminate', None) or stop
  367. stop()
  368. self.timer.stop()
  369. self.consumer.close_connection()
  370. if self.pidlock:
  371. self.pidlock.release()
  372. self._state = self.TERMINATE
  373. socket.setdefaulttimeout(socket_timeout)
  374. self._shutdown_complete.set()
  375. def reload(self, modules=None, reload=False, reloader=None):
  376. modules = self.app.loader.task_modules if modules is None else modules
  377. imp = self.app.loader.import_from_cwd
  378. for module in set(modules or ()):
  379. if module not in sys.modules:
  380. logger.debug('importing module %s', module)
  381. imp(module)
  382. elif reload:
  383. logger.debug('reloading module %s', module)
  384. reload_from_cwd(sys.modules[module], reloader)
  385. self.pool.restart()
  386. @property
  387. def state(self):
  388. return state