"""celery.worker

Worker controller (:class:`WorkController`) and the
:func:`process_initializer` helper used to bootstrap child
worker processes.

"""
import socket
import logging
import traceback

from multiprocessing.util import Finalize

from celery import beat
from celery import concurrency as _concurrency
from celery import registry
from celery import platforms
from celery import signals
from celery.app import app_or_default
from celery.exceptions import SystemTerminate
from celery.log import SilenceRepeated
from celery.utils import noop, instantiate
from celery.worker import state
from celery.worker.buckets import TaskBucket, FastQueue

#: Internal worker state flags (mirrored as class attributes on
#: :class:`WorkController`).
RUN = 0x1
CLOSE = 0x2
TERMINATE = 0x3

#: List of signals to reset when a child process starts.
WORKER_SIGRESET = frozenset(["SIGTERM",
                             "SIGHUP",
                             "SIGTTIN",
                             "SIGTTOU"])

#: List of signals to ignore when a child process starts.
WORKER_SIGIGNORE = frozenset(["SIGINT"])
  26. def process_initializer(app, hostname):
  27. """Initializes the process so it can be used to process tasks.
  28. Used for multiprocessing environments.
  29. """
  30. app = app_or_default(app)
  31. app.set_current()
  32. [platforms.reset_signal(signal) for signal in WORKER_SIGRESET]
  33. [platforms.ignore_signal(signal) for signal in WORKER_SIGIGNORE]
  34. platforms.set_mp_process_title("celeryd", hostname=hostname)
  35. # This is for windows and other platforms not supporting
  36. # fork(). Note that init_worker makes sure it's only
  37. # run once per process.
  38. app.loader.init_worker()
  39. signals.worker_process_init.send(sender=None)
  40. class WorkController(object):
  41. """Unmanaged worker instance."""
  42. RUN = RUN
  43. CLOSE = CLOSE
  44. TERMINATE = TERMINATE
  45. #: The number of simultaneous processes doing work (default:
  46. #: :setting:`CELERYD_CONCURRENCY`)
  47. concurrency = None
  48. #: The loglevel used (default: :const:`logging.INFO`)
  49. loglevel = logging.ERROR
  50. #: The logfile used, if no logfile is specified it uses `stderr`
  51. #: (default: :setting:`CELERYD_LOG_FILE`).
  52. logfile = None
  53. #: If :const:`True`, celerybeat is embedded, running in the main worker
  54. #: process as a thread.
  55. embed_clockservice = None
  56. #: Enable the sending of monitoring events, these events can be captured
  57. #: by monitors (celerymon).
  58. send_events = False
  59. #: The :class:`logging.Logger` instance used for logging.
  60. logger = None
  61. #: The pool instance used.
  62. pool = None
  63. #: The internal queue object that holds tasks ready for immediate
  64. #: processing.
  65. ready_queue = None
  66. #: Instance of :class:`celery.worker.controllers.Mediator`.
  67. mediator = None
  68. #: Consumer instance.
  69. consumer = None
  70. _state = None
  71. _running = 0
  72. def __init__(self, concurrency=None, logfile=None, loglevel=None,
  73. send_events=None, hostname=None, ready_callback=noop,
  74. embed_clockservice=False, pool_cls=None, consumer_cls=None,
  75. mediator_cls=None, eta_scheduler_cls=None,
  76. schedule_filename=None, task_time_limit=None,
  77. task_soft_time_limit=None, max_tasks_per_child=None,
  78. pool_putlocks=None, db=None, prefetch_multiplier=None,
  79. eta_scheduler_precision=None, queues=None,
  80. disable_rate_limits=None, autoscale=None,
  81. autoscaler_cls=None, scheduler_cls=None, app=None):
  82. self.app = app_or_default(app)
  83. conf = self.app.conf
  84. # Options
  85. self.loglevel = loglevel or self.loglevel
  86. self.concurrency = concurrency or conf.CELERYD_CONCURRENCY
  87. self.logfile = logfile or conf.CELERYD_LOG_FILE
  88. self.logger = self.app.log.get_default_logger()
  89. if send_events is None:
  90. send_events = conf.CELERY_SEND_EVENTS
  91. self.send_events = send_events
  92. self.pool_cls = _concurrency.get_implementation(
  93. pool_cls or conf.CELERYD_POOL)
  94. self.consumer_cls = consumer_cls or conf.CELERYD_CONSUMER
  95. self.mediator_cls = mediator_cls or conf.CELERYD_MEDIATOR
  96. self.eta_scheduler_cls = eta_scheduler_cls or \
  97. conf.CELERYD_ETA_SCHEDULER
  98. self.autoscaler_cls = autoscaler_cls or \
  99. conf.CELERYD_AUTOSCALER
  100. self.schedule_filename = schedule_filename or \
  101. conf.CELERYBEAT_SCHEDULE_FILENAME
  102. self.scheduler_cls = scheduler_cls or conf.CELERYBEAT_SCHEDULER
  103. self.hostname = hostname or socket.gethostname()
  104. self.embed_clockservice = embed_clockservice
  105. self.ready_callback = ready_callback
  106. self.task_time_limit = task_time_limit or \
  107. conf.CELERYD_TASK_TIME_LIMIT
  108. self.task_soft_time_limit = task_soft_time_limit or \
  109. conf.CELERYD_TASK_SOFT_TIME_LIMIT
  110. self.max_tasks_per_child = max_tasks_per_child or \
  111. conf.CELERYD_MAX_TASKS_PER_CHILD
  112. self.pool_putlocks = pool_putlocks or \
  113. conf.CELERYD_POOL_PUTLOCKS
  114. self.eta_scheduler_precision = eta_scheduler_precision or \
  115. conf.CELERYD_ETA_SCHEDULER_PRECISION
  116. self.prefetch_multiplier = prefetch_multiplier or \
  117. conf.CELERYD_PREFETCH_MULTIPLIER
  118. self.timer_debug = SilenceRepeated(self.logger.debug,
  119. max_iterations=10)
  120. self.db = db or conf.CELERYD_STATE_DB
  121. self.disable_rate_limits = disable_rate_limits or \
  122. conf.CELERY_DISABLE_RATE_LIMITS
  123. self.queues = queues
  124. self._finalize = Finalize(self, self.stop, exitpriority=1)
  125. if self.db:
  126. persistence = state.Persistent(self.db)
  127. Finalize(persistence, persistence.save, exitpriority=5)
  128. # Queues
  129. if self.disable_rate_limits:
  130. self.ready_queue = FastQueue()
  131. self.ready_queue.put = self.process_task
  132. else:
  133. self.ready_queue = TaskBucket(task_registry=registry.tasks)
  134. self.logger.debug("Instantiating thread components...")
  135. # Threads + Pool + Consumer
  136. self.autoscaler = None
  137. max_concurrency = None
  138. min_concurrency = concurrency
  139. if autoscale:
  140. max_concurrency, min_concurrency = autoscale
  141. self.pool = instantiate(self.pool_cls, min_concurrency,
  142. logger=self.logger,
  143. initializer=process_initializer,
  144. initargs=(self.app, self.hostname),
  145. maxtasksperchild=self.max_tasks_per_child,
  146. timeout=self.task_time_limit,
  147. soft_timeout=self.task_soft_time_limit,
  148. putlocks=self.pool_putlocks)
  149. if not self.eta_scheduler_cls:
  150. # Default Timer is set by the pool, as e.g. eventlet
  151. # needs a custom implementation.
  152. self.eta_scheduler_cls = self.pool.Timer
  153. if autoscale:
  154. self.autoscaler = instantiate(self.autoscaler_cls, self.pool,
  155. max_concurrency=max_concurrency,
  156. min_concurrency=min_concurrency,
  157. logger=self.logger)
  158. self.mediator = None
  159. if not self.disable_rate_limits:
  160. self.mediator = instantiate(self.mediator_cls, self.ready_queue,
  161. app=self.app,
  162. callback=self.process_task,
  163. logger=self.logger)
  164. self.scheduler = instantiate(self.eta_scheduler_cls,
  165. precision=eta_scheduler_precision,
  166. on_error=self.on_timer_error,
  167. on_tick=self.on_timer_tick)
  168. self.beat = None
  169. if self.embed_clockservice:
  170. self.beat = beat.EmbeddedService(app=self.app,
  171. logger=self.logger,
  172. schedule_filename=self.schedule_filename,
  173. scheduler_cls=self.scheduler_cls)
  174. prefetch_count = self.concurrency * self.prefetch_multiplier
  175. self.consumer = instantiate(self.consumer_cls,
  176. self.ready_queue,
  177. self.scheduler,
  178. logger=self.logger,
  179. hostname=self.hostname,
  180. send_events=self.send_events,
  181. init_callback=self.ready_callback,
  182. initial_prefetch_count=prefetch_count,
  183. pool=self.pool,
  184. queues=self.queues,
  185. app=self.app)
  186. # The order is important here;
  187. # the first in the list is the first to start,
  188. # and they must be stopped in reverse order.
  189. self.components = filter(None, (self.pool,
  190. self.mediator,
  191. self.scheduler,
  192. self.beat,
  193. self.autoscaler,
  194. self.consumer))
  195. def start(self):
  196. """Starts the workers main loop."""
  197. self._state = self.RUN
  198. try:
  199. for i, component in enumerate(self.components):
  200. self.logger.debug("Starting thread %s..." % (
  201. component.__class__.__name__))
  202. self._running = i + 1
  203. self.pool.blocking(component.start)
  204. except SystemTerminate:
  205. self.terminate()
  206. raise SystemExit()
  207. except (SystemExit, KeyboardInterrupt), exc:
  208. self.stop()
  209. raise exc
  210. def process_task(self, wrapper):
  211. """Process task by sending it to the pool of workers."""
  212. try:
  213. try:
  214. wrapper.task.execute(wrapper, self.pool,
  215. self.loglevel, self.logfile)
  216. except Exception, exc:
  217. self.logger.critical("Internal error %s: %s\n%s" % (
  218. exc.__class__, exc, traceback.format_exc()))
  219. except SystemTerminate:
  220. self.terminate()
  221. raise SystemExit()
  222. except (SystemExit, KeyboardInterrupt), exc:
  223. self.stop()
  224. raise exc
  225. def stop(self, in_sighandler=False):
  226. """Graceful shutdown of the worker server."""
  227. if in_sighandler and not self.pool.signal_safe:
  228. return
  229. self.pool.blocking(self._shutdown, warm=True)
  230. def terminate(self, in_sighandler=False):
  231. """Not so graceful shutdown of the worker server."""
  232. if in_sighandler and not self.pool.signal_safe:
  233. return
  234. self.pool.blocking(self._shutdown, warm=False)
  235. def _shutdown(self, warm=True):
  236. what = (warm and "stopping" or "terminating").capitalize()
  237. if self._state != self.RUN or self._running != len(self.components):
  238. # Not fully started, can safely exit.
  239. return
  240. self._state = self.CLOSE
  241. signals.worker_shutdown.send(sender=self)
  242. for component in reversed(self.components):
  243. self.logger.debug("%s thread %s..." % (
  244. what, component.__class__.__name__))
  245. stop = component.stop
  246. if not warm:
  247. stop = getattr(component, "terminate", stop)
  248. stop()
  249. self.consumer.close_connection()
  250. self._state = self.TERMINATE
  251. def on_timer_error(self, exc_info):
  252. _, exc, _ = exc_info
  253. self.logger.error("Timer error: %r" % (exc, ))
  254. def on_timer_tick(self, delay):
  255. self.timer_debug("Scheduler wake-up! Next eta %s secs." % delay)