__init__.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker
  4. ~~~~~~~~~~~~~
  5. :class:`WorkController` can be used to instantiate in-process workers.
  6. The worker consists of several components, all managed by boot-steps
  7. (:mod:`celery.worker.bootsteps`).
  8. """
  9. from __future__ import absolute_import
  10. import atexit
  11. import logging
  12. import socket
  13. import sys
  14. import time
  15. import traceback
  16. from functools import partial
  17. from billiard.exceptions import WorkerLostError
  18. from billiard.util import Finalize
  19. from kombu.syn import detect_environment
  20. from celery import concurrency as _concurrency
  21. from celery import platforms
  22. from celery.app import app_or_default
  23. from celery.app.abstract import configurated, from_config
  24. from celery.exceptions import SystemTerminate, TaskRevokedError
  25. from celery.utils.functional import noop
  26. from celery.utils.imports import qualname, reload_from_cwd
  27. from celery.utils.log import get_logger
  28. from celery.utils.threads import Event
  29. from celery.utils.timer2 import Schedule
  30. from . import bootsteps
  31. from . import state
  32. from .buckets import TaskBucket, AsyncTaskBucket, FastQueue
  33. from .hub import Hub, BoundedSemaphore
  34. #: Worker states
  35. RUN = 0x1
  36. CLOSE = 0x2
  37. TERMINATE = 0x3
  38. #: Default socket timeout at shutdown.
  39. SHUTDOWN_SOCKET_TIMEOUT = 5.0
  40. MAXTASKS_NO_BILLIARD = """\
  41. maxtasksperchild enabled but billiard C extension not installed!
  42. This may lead to a deadlock, please install the billiard C extension.
  43. """
#: Logger for this module, obtained from get_logger() with the module name.
logger = get_logger(__name__)
  45. class Namespace(bootsteps.Namespace):
  46. """This is the boot-step namespace of the :class:`WorkController`.
  47. It loads modules from :setting:`CELERYD_BOOT_STEPS`, and its
  48. own set of built-in boot-step modules.
  49. """
  50. name = 'worker'
  51. builtin_boot_steps = ('celery.worker.autoscale',
  52. 'celery.worker.autoreload',
  53. 'celery.worker.consumer',
  54. 'celery.worker.mediator')
  55. def modules(self):
  56. return self.builtin_boot_steps + self.app.conf.CELERYD_BOOT_STEPS
  57. class Pool(bootsteps.StartStopComponent):
  58. """The pool component.
  59. Describes how to initialize the worker pool, and starts and stops
  60. the pool during worker startup/shutdown.
  61. Adds attributes:
  62. * autoscale
  63. * pool
  64. * max_concurrency
  65. * min_concurrency
  66. """
  67. name = 'worker.pool'
  68. requires = ('queues', 'beat', )
  69. def __init__(self, w,
  70. autoscale=None, autoreload=False, no_execv=False, **kwargs):
  71. w.autoscale = autoscale
  72. w.pool = None
  73. w.max_concurrency = None
  74. w.min_concurrency = w.concurrency
  75. w.no_execv = no_execv
  76. if w.autoscale:
  77. w.max_concurrency, w.min_concurrency = w.autoscale
  78. self.autoreload_enabled = autoreload
  79. def on_poll_init(self, pool, w, hub):
  80. apply_after = hub.timer.apply_after
  81. apply_at = hub.timer.apply_at
  82. on_soft_timeout = pool.on_soft_timeout
  83. on_hard_timeout = pool.on_hard_timeout
  84. maintain_pool = pool.maintain_pool
  85. add_reader = hub.add_reader
  86. remove = hub.remove
  87. now = time.time
  88. # did_start_ok will verify that pool processes were able to start,
  89. # but this will only work the first time we start, as
  90. # maxtasksperchild will mess up metrics.
  91. if not w.consumer.restart_count and not pool.did_start_ok():
  92. raise WorkerLostError('Could not start worker processes')
  93. # need to handle pool results before every task
  94. # since multiple tasks can be received in a single poll()
  95. hub.on_task.append(pool.maybe_handle_result)
  96. hub.update_readers(pool.readers)
  97. for handler, interval in pool.timers.iteritems():
  98. hub.timer.apply_interval(interval * 1000.0, handler)
  99. def on_timeout_set(R, soft, hard):
  100. def _on_soft_timeout():
  101. if hard:
  102. R._tref = apply_at(now() + (hard - soft),
  103. on_hard_timeout, (R, ))
  104. on_soft_timeout(R)
  105. if soft:
  106. R._tref = apply_after(soft * 1000.0, _on_soft_timeout)
  107. elif hard:
  108. R._tref = apply_after(hard * 1000.0,
  109. on_hard_timeout, (R, ))
  110. def on_timeout_cancel(result):
  111. try:
  112. result._tref.cancel()
  113. delattr(result, '_tref')
  114. except AttributeError:
  115. pass
  116. # This makes sure the timers are fired even if writing
  117. # to the pool inqueue blocks.
  118. # XXX Ugly hack to fix #1306 and the worker will still be blocking
  119. # but at least it will recover if there are no worker processes
  120. # to write to.
  121. # This can be fixed properly when we are
  122. # able to use the pure-python version of multiprocessing
  123. # (celery 3.1+)
  124. try:
  125. import _billiard
  126. except ImportError:
  127. # billiard C extension not installed
  128. if w.max_tasks_per_child:
  129. logger.warning(MAXTASKS_NO_BILLIARD)
  130. _quick_put = pool._pool._quick_put
  131. def quick_put(obj):
  132. _quick_put(obj, hub.maintain_pool)
  133. pool._pool._quick_put = quick_put
  134. pool.init_callbacks(
  135. on_process_up=lambda w: add_reader(w.sentinel, maintain_pool),
  136. on_process_down=lambda w: remove(w.sentinel),
  137. on_timeout_set=on_timeout_set,
  138. on_timeout_cancel=on_timeout_cancel,
  139. )
  140. def create(self, w, semaphore=None, max_restarts=None):
  141. threaded = not w.use_eventloop
  142. procs = w.min_concurrency
  143. forking_enable = w.no_execv or not w.force_execv
  144. if not threaded:
  145. semaphore = w.semaphore = BoundedSemaphore(procs)
  146. w._quick_acquire = w.semaphore.acquire
  147. w._quick_release = w.semaphore.release
  148. max_restarts = 100
  149. allow_restart = self.autoreload_enabled or w.pool_restarts
  150. pool = w.pool = self.instantiate(
  151. w.pool_cls, w.min_concurrency,
  152. initargs=(w.app, w.hostname),
  153. maxtasksperchild=w.max_tasks_per_child,
  154. timeout=w.task_time_limit,
  155. soft_timeout=w.task_soft_time_limit,
  156. putlocks=w.pool_putlocks and threaded,
  157. lost_worker_timeout=w.worker_lost_wait,
  158. threads=threaded,
  159. max_restarts=max_restarts,
  160. allow_restart=allow_restart,
  161. forking_enable=forking_enable,
  162. semaphore=semaphore,
  163. callbacks_propagate=(
  164. w._conninfo.connection_errors + w._conninfo.channel_errors
  165. ),
  166. )
  167. if w.hub:
  168. w.hub.on_init.append(partial(self.on_poll_init, pool, w))
  169. return pool
  170. class Beat(bootsteps.StartStopComponent):
  171. """Component used to embed a celerybeat process.
  172. This will only be enabled if the ``beat``
  173. argument is set.
  174. """
  175. name = 'worker.beat'
  176. def __init__(self, w, beat=False, **kwargs):
  177. self.enabled = w.beat = beat
  178. w.beat = None
  179. def create(self, w):
  180. from celery.beat import EmbeddedService
  181. b = w.beat = EmbeddedService(app=w.app,
  182. schedule_filename=w.schedule_filename,
  183. scheduler_cls=w.scheduler_cls)
  184. return b
  185. class Queues(bootsteps.Component):
  186. """This component initializes the internal queues
  187. used by the worker."""
  188. name = 'worker.queues'
  189. requires = ('ev', )
  190. def create(self, w):
  191. BucketType = TaskBucket
  192. w.start_mediator = not w.disable_rate_limits
  193. if not w.pool_cls.rlimit_safe:
  194. w.start_mediator = False
  195. BucketType = AsyncTaskBucket
  196. process_task = w.process_task
  197. if w.use_eventloop:
  198. w.start_mediator = False
  199. BucketType = AsyncTaskBucket
  200. if w.pool_putlocks and w.pool_cls.uses_semaphore:
  201. process_task = w.process_task_sem
  202. if w.disable_rate_limits:
  203. w.ready_queue = FastQueue()
  204. w.ready_queue.put = process_task
  205. else:
  206. w.ready_queue = BucketType(
  207. task_registry=w.app.tasks, callback=process_task, worker=w,
  208. )
  209. class EvLoop(bootsteps.StartStopComponent):
  210. name = 'worker.ev'
  211. def __init__(self, w, **kwargs):
  212. w.hub = None
  213. def include_if(self, w):
  214. return w.use_eventloop
  215. def create(self, w):
  216. w.timer = Schedule(max_interval=10)
  217. hub = w.hub = Hub(w.timer)
  218. return hub
  219. class Timers(bootsteps.Component):
  220. """This component initializes the internal timers used by the worker."""
  221. name = 'worker.timers'
  222. requires = ('pool', )
  223. def include_if(self, w):
  224. return not w.use_eventloop
  225. def create(self, w):
  226. if not w.timer_cls:
  227. # Default Timer is set by the pool, as e.g. eventlet
  228. # needs a custom implementation.
  229. w.timer_cls = w.pool.Timer
  230. w.timer = self.instantiate(w.pool.Timer,
  231. max_interval=w.timer_precision,
  232. on_timer_error=self.on_timer_error,
  233. on_timer_tick=self.on_timer_tick)
  234. def on_timer_error(self, exc):
  235. logger.error('Timer error: %r', exc, exc_info=True)
  236. def on_timer_tick(self, delay):
  237. logger.debug('Timer wake-up! Next eta %s secs.', delay)
  238. class StateDB(bootsteps.Component):
  239. """This component sets up the workers state db if enabled."""
  240. name = 'worker.state-db'
  241. def __init__(self, w, **kwargs):
  242. self.enabled = w.state_db
  243. w._persistence = None
  244. def create(self, w):
  245. w._persistence = state.Persistent(w.state_db)
  246. atexit.register(w._persistence.save)
  247. class WorkController(configurated):
  248. """Unmanaged worker instance."""
  249. RUN = RUN
  250. CLOSE = CLOSE
  251. TERMINATE = TERMINATE
  252. app = None
  253. concurrency = from_config()
  254. loglevel = logging.ERROR
  255. logfile = from_config('log_file')
  256. send_events = from_config()
  257. pool_cls = from_config('pool')
  258. consumer_cls = from_config('consumer')
  259. mediator_cls = from_config('mediator')
  260. timer_cls = from_config('timer')
  261. timer_precision = from_config('timer_precision')
  262. autoscaler_cls = from_config('autoscaler')
  263. autoreloader_cls = from_config('autoreloader')
  264. schedule_filename = from_config()
  265. scheduler_cls = from_config('celerybeat_scheduler')
  266. task_time_limit = from_config()
  267. task_soft_time_limit = from_config()
  268. max_tasks_per_child = from_config()
  269. pool_putlocks = from_config()
  270. pool_restarts = from_config()
  271. force_execv = from_config()
  272. prefetch_multiplier = from_config()
  273. state_db = from_config()
  274. disable_rate_limits = from_config()
  275. worker_lost_wait = from_config()
  276. _state = None
  277. _running = 0
  278. def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
  279. queues=None, app=None, pidfile=None, use_eventloop=None,
  280. **kwargs):
  281. self.app = app_or_default(app or self.app)
  282. self._shutdown_complete = Event()
  283. self.setup_defaults(kwargs, namespace='celeryd')
  284. self.app.select_queues(queues) # select queues subset.
  285. # Options
  286. self.loglevel = loglevel or self.loglevel
  287. self.hostname = hostname or socket.gethostname()
  288. self.ready_callback = ready_callback
  289. self._finalize = Finalize(self, self.stop, exitpriority=1)
  290. self.pidfile = pidfile
  291. self.pidlock = None
  292. # this connection is not established, only used for params
  293. self._conninfo = self.app.connection()
  294. self.use_eventloop = (
  295. self.should_use_eventloop() if use_eventloop is None
  296. else use_eventloop
  297. )
  298. # Update celery_include to have all known task modules, so that we
  299. # ensure all task modules are imported in case an execv happens.
  300. task_modules = set(task.__class__.__module__
  301. for task in self.app.tasks.itervalues())
  302. self.app.conf.CELERY_INCLUDE = tuple(
  303. set(self.app.conf.CELERY_INCLUDE) | task_modules,
  304. )
  305. # Initialize boot steps
  306. self.pool_cls = _concurrency.get_implementation(self.pool_cls)
  307. self.components = []
  308. self.namespace = Namespace(app=self.app).apply(self, **kwargs)
  309. def start(self):
  310. """Starts the workers main loop."""
  311. self._state = self.RUN
  312. if self.pidfile:
  313. self.pidlock = platforms.create_pidlock(self.pidfile)
  314. try:
  315. for i, component in enumerate(self.components):
  316. logger.debug('Starting %s...', qualname(component))
  317. self._running = i + 1
  318. if component:
  319. component.start()
  320. logger.debug('%s OK!', qualname(component))
  321. except SystemTerminate:
  322. self.terminate()
  323. except Exception, exc:
  324. logger.error('Unrecoverable error: %r', exc,
  325. exc_info=True)
  326. self.stop()
  327. except (KeyboardInterrupt, SystemExit):
  328. self.stop()
  329. # Will only get here if running green,
  330. # makes sure all greenthreads have exited.
  331. self._shutdown_complete.wait()
  332. def process_task_sem(self, req):
  333. return self._quick_acquire(self.process_task, req)
  334. def process_task(self, req):
  335. """Process task by sending it to the pool of workers."""
  336. try:
  337. req.execute_using_pool(self.pool)
  338. except TaskRevokedError:
  339. try:
  340. self._quick_release() # Issue 877
  341. except AttributeError:
  342. pass
  343. except Exception, exc:
  344. logger.critical('Internal error: %r\n%s',
  345. exc, traceback.format_exc(), exc_info=True)
  346. except SystemTerminate:
  347. self.terminate()
  348. raise
  349. except BaseException, exc:
  350. self.stop()
  351. raise exc
  352. def signal_consumer_close(self):
  353. try:
  354. self.consumer.close()
  355. except AttributeError:
  356. pass
  357. def should_use_eventloop(self):
  358. return (detect_environment() == 'default' and
  359. self._conninfo.is_evented and not self.app.IS_WINDOWS)
  360. def stop(self, in_sighandler=False):
  361. """Graceful shutdown of the worker server."""
  362. self.signal_consumer_close()
  363. if not in_sighandler or self.pool.signal_safe:
  364. self._shutdown(warm=True)
  365. def terminate(self, in_sighandler=False):
  366. """Not so graceful shutdown of the worker server."""
  367. self.signal_consumer_close()
  368. if not in_sighandler or self.pool.signal_safe:
  369. self._shutdown(warm=False)
  370. def _shutdown(self, warm=True):
  371. what = 'Stopping' if warm else 'Terminating'
  372. socket_timeout = socket.getdefaulttimeout()
  373. socket.setdefaulttimeout(SHUTDOWN_SOCKET_TIMEOUT) # Issue 975
  374. if self._state in (self.CLOSE, self.TERMINATE):
  375. return
  376. self.app.loader.shutdown_worker()
  377. if self.pool:
  378. self.pool.close()
  379. if self._state != self.RUN or self._running != len(self.components):
  380. # Not fully started, can safely exit.
  381. self._state = self.TERMINATE
  382. self._shutdown_complete.set()
  383. return
  384. self._state = self.CLOSE
  385. for component in reversed(self.components):
  386. logger.debug('%s %s...', what, qualname(component))
  387. if component:
  388. stop = component.stop
  389. if not warm:
  390. stop = getattr(component, 'terminate', None) or stop
  391. stop()
  392. self.timer.stop()
  393. self.consumer.close_connection()
  394. if self.pidlock:
  395. self.pidlock.release()
  396. self._state = self.TERMINATE
  397. socket.setdefaulttimeout(socket_timeout)
  398. self._shutdown_complete.set()
  399. def reload(self, modules=None, reload=False, reloader=None):
  400. modules = self.app.loader.task_modules if modules is None else modules
  401. imp = self.app.loader.import_from_cwd
  402. for module in set(modules or ()):
  403. if module not in sys.modules:
  404. logger.debug('importing module %s', module)
  405. imp(module)
  406. elif reload:
  407. logger.debug('reloading module %s', module)
  408. reload_from_cwd(sys.modules[module], reloader)
  409. self.pool.restart()
  410. @property
  411. def state(self):
  412. return state