# celery.worker -- the worker controller (WorkController) and the
# child-process bootstrap used by the multiprocessing pool.
  1. import socket
  2. import logging
  3. import traceback
  4. from kombu.syn import blocking
  5. from kombu.utils.finalize import Finalize
  6. from celery import beat
  7. from celery import concurrency as _concurrency
  8. from celery import registry
  9. from celery import platforms
  10. from celery import signals
  11. from celery.app import app_or_default
  12. from celery.exceptions import SystemTerminate
  13. from celery.log import SilenceRepeated
  14. from celery.utils import noop, instantiate
  15. from celery.worker import state
  16. from celery.worker.buckets import TaskBucket, FastQueue
  17. RUN = 0x1
  18. CLOSE = 0x2
  19. TERMINATE = 0x3
  20. #: List of signals to reset when a child process starts.
  21. WORKER_SIGRESET = frozenset(["SIGTERM",
  22. "SIGHUP",
  23. "SIGTTIN",
  24. "SIGTTOU",
  25. "SIGUSR1"])
  26. #: List of signals to ignore when a child process starts.
  27. WORKER_SIGIGNORE = frozenset(["SIGINT"])
  28. def process_initializer(app, hostname):
  29. """Initializes the process so it can be used to process tasks.
  30. Used for multiprocessing environments.
  31. """
  32. app = app_or_default(app)
  33. app.set_current()
  34. [platforms.reset_signal(signal) for signal in WORKER_SIGRESET]
  35. [platforms.ignore_signal(signal) for signal in WORKER_SIGIGNORE]
  36. platforms.set_mp_process_title("celeryd", hostname=hostname)
  37. # This is for windows and other platforms not supporting
  38. # fork(). Note that init_worker makes sure it's only
  39. # run once per process.
  40. app.loader.init_worker()
  41. signals.worker_process_init.send(sender=None)
class WorkController(object):
    """Unmanaged worker instance.

    Wires together the pool, consumer, mediator, ETA scheduler and
    (optionally) an embedded celerybeat service, and starts/stops
    them as a unit.

    """
    # Lifecycle state aliases (mirror the module-level constants).
    RUN = RUN
    CLOSE = CLOSE
    TERMINATE = TERMINATE

    #: The number of simultaneous processes doing work (default:
    #: :setting:`CELERYD_CONCURRENCY`)
    concurrency = None

    #: The loglevel used (default: :const:`logging.ERROR`).
    loglevel = logging.ERROR

    #: The logfile used, if no logfile is specified it uses `stderr`
    #: (default: :setting:`CELERYD_LOG_FILE`).
    logfile = None

    #: If :const:`True`, celerybeat is embedded, running in the main worker
    #: process as a thread.
    embed_clockservice = None

    #: Enable the sending of monitoring events, these events can be captured
    #: by monitors (celerymon).
    send_events = False

    #: The :class:`logging.Logger` instance used for logging.
    logger = None

    #: The pool instance used.
    pool = None

    #: The internal queue object that holds tasks ready for immediate
    #: processing.
    ready_queue = None

    #: Instance of :class:`celery.worker.controllers.Mediator`.
    mediator = None

    #: Consumer instance.
    consumer = None

    # Current lifecycle state: None until start(), then RUN, CLOSE,
    # and finally TERMINATE.
    _state = None
    # Number of components successfully started; used by _shutdown()
    # to detect a partially-started worker.
    _running = 0

    def __init__(self, concurrency=None, logfile=None, loglevel=None,
            send_events=None, hostname=None, ready_callback=noop,
            embed_clockservice=False, pool_cls=None, consumer_cls=None,
            mediator_cls=None, eta_scheduler_cls=None,
            schedule_filename=None, task_time_limit=None,
            task_soft_time_limit=None, max_tasks_per_child=None,
            pool_putlocks=None, db=None, prefetch_multiplier=None,
            eta_scheduler_precision=None, disable_rate_limits=None,
            autoscale=None, autoscaler_cls=None, scheduler_cls=None,
            app=None):
        """Set up all worker components.

        Every option not explicitly given falls back to the
        corresponding setting in ``app.conf``.

        """
        self.app = app_or_default(app)
        conf = self.app.conf

        # Options
        self.loglevel = loglevel or self.loglevel
        self.concurrency = concurrency or conf.CELERYD_CONCURRENCY
        self.logfile = logfile or conf.CELERYD_LOG_FILE
        self.logger = self.app.log.get_default_logger()
        # send_events may legitimately be False, so only fall back to
        # the config value when it was not given at all.
        if send_events is None:
            send_events = conf.CELERY_SEND_EVENTS
        self.send_events = send_events
        self.pool_cls = _concurrency.get_implementation(
                            pool_cls or conf.CELERYD_POOL)
        self.consumer_cls = consumer_cls or conf.CELERYD_CONSUMER
        self.mediator_cls = mediator_cls or conf.CELERYD_MEDIATOR
        self.eta_scheduler_cls = eta_scheduler_cls or \
                                    conf.CELERYD_ETA_SCHEDULER
        self.autoscaler_cls = autoscaler_cls or \
                                    conf.CELERYD_AUTOSCALER
        self.schedule_filename = schedule_filename or \
                                    conf.CELERYBEAT_SCHEDULE_FILENAME
        self.scheduler_cls = scheduler_cls or conf.CELERYBEAT_SCHEDULER
        self.hostname = hostname or socket.gethostname()
        self.embed_clockservice = embed_clockservice
        self.ready_callback = ready_callback
        self.task_time_limit = task_time_limit or \
                                conf.CELERYD_TASK_TIME_LIMIT
        self.task_soft_time_limit = task_soft_time_limit or \
                                conf.CELERYD_TASK_SOFT_TIME_LIMIT
        self.max_tasks_per_child = max_tasks_per_child or \
                                conf.CELERYD_MAX_TASKS_PER_CHILD
        self.pool_putlocks = pool_putlocks or \
                                conf.CELERYD_POOL_PUTLOCKS
        self.eta_scheduler_precision = eta_scheduler_precision or \
                                conf.CELERYD_ETA_SCHEDULER_PRECISION
        self.prefetch_multiplier = prefetch_multiplier or \
                                conf.CELERYD_PREFETCH_MULTIPLIER
        # Rate-limited debug logging used by the timer tick callback.
        self.timer_debug = SilenceRepeated(self.logger.debug,
                                           max_iterations=10)
        self.db = db or conf.CELERYD_STATE_DB
        self.disable_rate_limits = disable_rate_limits or \
                                conf.CELERY_DISABLE_RATE_LIMITS

        # Ensure stop() runs at interpreter exit; the optional state db
        # is saved with a lower priority (runs later).
        self._finalize = Finalize(self, self.stop, exitpriority=1)
        self._finalize_db = None

        if self.db:
            persistence = state.Persistent(self.db)
            self._finalize_db = Finalize(persistence, persistence.save,
                                         exitpriority=5)

        # Queues
        if self.disable_rate_limits:
            # With rate limits disabled, tasks bypass the mediator and
            # go straight into the pool via process_task.
            self.ready_queue = FastQueue()
            self.ready_queue.put = self.process_task
        else:
            self.ready_queue = TaskBucket(task_registry=registry.tasks)

        self.logger.debug("Instantiating thread components...")

        # Threads + Pool + Consumer
        self.autoscaler = None
        max_concurrency = None
        min_concurrency = concurrency
        if autoscale:
            # autoscale is a (max, min) pair overriding the static
            # concurrency setting.
            max_concurrency, min_concurrency = autoscale

        self.pool = instantiate(self.pool_cls, min_concurrency,
                                logger=self.logger,
                                initializer=process_initializer,
                                initargs=(self.app, self.hostname),
                                maxtasksperchild=self.max_tasks_per_child,
                                timeout=self.task_time_limit,
                                soft_timeout=self.task_soft_time_limit,
                                putlocks=self.pool_putlocks)

        if not self.eta_scheduler_cls:
            # Default Timer is set by the pool, as e.g. eventlet
            # needs a custom implementation.
            self.eta_scheduler_cls = self.pool.Timer

        if autoscale:
            self.autoscaler = instantiate(self.autoscaler_cls, self.pool,
                                          max_concurrency=max_concurrency,
                                          min_concurrency=min_concurrency,
                                          logger=self.logger)

        self.mediator = None
        if not self.disable_rate_limits:
            self.mediator = instantiate(self.mediator_cls, self.ready_queue,
                                        app=self.app,
                                        callback=self.process_task,
                                        logger=self.logger)

        self.scheduler = instantiate(self.eta_scheduler_cls,
                                precision=eta_scheduler_precision,
                                on_error=self.on_timer_error,
                                on_tick=self.on_timer_tick)

        self.beat = None
        if self.embed_clockservice:
            self.beat = beat.EmbeddedService(app=self.app,
                                logger=self.logger,
                                schedule_filename=self.schedule_filename,
                                scheduler_cls=self.scheduler_cls)

        prefetch_count = self.concurrency * self.prefetch_multiplier
        self.consumer = instantiate(self.consumer_cls,
                                    self.ready_queue,
                                    self.scheduler,
                                    logger=self.logger,
                                    hostname=self.hostname,
                                    send_events=self.send_events,
                                    init_callback=self.ready_callback,
                                    initial_prefetch_count=prefetch_count,
                                    pool=self.pool,
                                    app=self.app)

        # The order is important here;
        #   the first in the list is the first to start,
        # and they must be stopped in reverse order.
        self.components = filter(None, (self.pool,
                                        self.mediator,
                                        self.scheduler,
                                        self.beat,
                                        self.autoscaler,
                                        self.consumer))

    def start(self):
        """Starts the workers main loop."""
        self._state = self.RUN

        try:
            for i, component in enumerate(self.components):
                self.logger.debug("Starting thread %s..." % (
                    component.__class__.__name__))
                # Track how many components are up so _shutdown() can
                # tell whether startup completed.
                self._running = i + 1
                blocking(component.start)
        except SystemTerminate:
            # Hard shutdown requested: terminate instead of stop.
            self.terminate()
            raise SystemExit()
        except (SystemExit, KeyboardInterrupt), exc:
            self.stop()
            raise exc

    def process_task(self, request):
        """Process task by sending it to the pool of workers."""
        try:
            request.task.execute(request, self.pool,
                                 self.loglevel, self.logfile)
        except SystemTerminate:
            self.terminate()
            raise SystemExit()
        except (SystemExit, KeyboardInterrupt), exc:
            self.stop()
            raise exc
        except Exception, exc:
            # Never let an unexpected task error take the worker down;
            # log it with a traceback and continue with the next task.
            self.logger.critical("Internal error %s: %s\n%s" % (
                            exc.__class__, exc, traceback.format_exc()))

    def stop(self, in_sighandler=False):
        """Graceful shutdown of the worker server.

        Does nothing when called from a signal handler and the pool
        is not signal safe.

        """
        if in_sighandler and not self.pool.signal_safe:
            return
        blocking(self._shutdown, warm=True)

    def terminate(self, in_sighandler=False):
        """Not so graceful shutdown of the worker server.

        Does nothing when called from a signal handler and the pool
        is not signal safe.

        """
        if in_sighandler and not self.pool.signal_safe:
            return
        blocking(self._shutdown, warm=False)

    def _shutdown(self, warm=True):
        """Stop (``warm=True``) or terminate (``warm=False``) all
        components, in the reverse of their start order."""
        what = (warm and "stopping" or "terminating").capitalize()

        if self._state != self.RUN or self._running != len(self.components):
            # Not fully started, can safely exit.
            return

        self._state = self.CLOSE
        signals.worker_shutdown.send(sender=self)

        for component in reversed(self.components):
            self.logger.debug("%s thread %s..." % (
                    what, component.__class__.__name__))
            stop = component.stop
            if not warm:
                # Cold shutdown prefers terminate(), falling back to
                # stop() for components that don't implement it.
                stop = getattr(component, "terminate", stop)
            stop()

        self.consumer.close_connection()
        self._state = self.TERMINATE

    def on_timer_error(self, exc_info):
        """Timer callback: log an error raised inside the ETA scheduler."""
        _, exc, _ = exc_info
        self.logger.error("Timer error: %r" % (exc, ))

    def on_timer_tick(self, delay):
        """Timer callback: debug-log each scheduler wake-up (through the
        SilenceRepeated wrapper, so repeats are suppressed)."""
        self.timer_debug("Scheduler wake-up! Next eta %s secs." % delay)