worker.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. import atexit
  2. import logging
  3. import multiprocessing
  4. import os
  5. import socket
  6. import sys
  7. import warnings
  8. from kombu.utils import partition
  9. from celery import __version__
  10. from celery import platforms
  11. from celery import signals
  12. from celery.app import app_or_default
  13. from celery.exceptions import ImproperlyConfigured, SystemTerminate
  14. from celery.utils import get_full_cls_name, LOG_LEVELS, cry
  15. from celery.worker import WorkController
# Startup banner template.  Worker.startup_info() fills it in with
# %-style named substitution (hostname, version, conninfo, loader,
# logfile, loglevel, concurrency, events, celerybeat, queues).
# NOTE(review): the column alignment inside the banner looks collapsed in
# this copy of the file -- confirm the spacing against the canonical source.
BANNER = """
-------------- celery@%(hostname)s v%(version)s
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: %(conninfo)s
- ** ---------- . loader: %(loader)s
- ** ---------- . logfile: %(logfile)s@%(loglevel)s
- ** ---------- . concurrency: %(concurrency)s
- ** ---------- . events: %(events)s
- *** --- * --- . beat: %(celerybeat)s
-- ******* ----
--- ***** ----- [Queues]
-------------- %(queues)s
"""

# Task listing template.  Worker.extra_info() fills in ``tasks`` and only
# returns it when the log level is INFO or more verbose.
EXTRA_INFO_FMT = """
[Tasks]
%(tasks)s
"""
  34. class Worker(object):
  35. WorkController = WorkController
  36. def __init__(self, concurrency=None, loglevel=None, logfile=None,
  37. hostname=None, discard=False, run_clockservice=False,
  38. schedule=None, task_time_limit=None, task_soft_time_limit=None,
  39. max_tasks_per_child=None, queues=None, events=False, db=None,
  40. include=None, app=None, pidfile=None,
  41. redirect_stdouts=None, redirect_stdouts_level=None,
  42. autoscale=None, scheduler_cls=None, pool=None, **kwargs):
  43. self.app = app = app_or_default(app)
  44. self.concurrency = (concurrency or
  45. app.conf.CELERYD_CONCURRENCY or
  46. multiprocessing.cpu_count())
  47. self.loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
  48. self.logfile = logfile or app.conf.CELERYD_LOG_FILE
  49. self.hostname = hostname or socket.gethostname()
  50. self.discard = discard
  51. self.run_clockservice = run_clockservice
  52. if self.app.IS_WINDOWS and self.run_clockservice:
  53. self.die("-B option does not work on Windows. "
  54. "Please run celerybeat as a separate service.")
  55. self.schedule = schedule or app.conf.CELERYBEAT_SCHEDULE_FILENAME
  56. self.scheduler_cls = scheduler_cls or app.conf.CELERYBEAT_SCHEDULER
  57. self.events = events
  58. self.task_time_limit = (task_time_limit or
  59. app.conf.CELERYD_TASK_TIME_LIMIT)
  60. self.task_soft_time_limit = (task_soft_time_limit or
  61. app.conf.CELERYD_TASK_SOFT_TIME_LIMIT)
  62. self.max_tasks_per_child = (max_tasks_per_child or
  63. app.conf.CELERYD_MAX_TASKS_PER_CHILD)
  64. self.redirect_stdouts = (redirect_stdouts or
  65. app.conf.CELERY_REDIRECT_STDOUTS)
  66. self.redirect_stdouts_level = (redirect_stdouts_level or
  67. app.conf.CELERY_REDIRECT_STDOUTS_LEVEL)
  68. self.pool = (pool or app.conf.CELERYD_POOL)
  69. self.db = db
  70. self.use_queues = queues or []
  71. self.queues = None
  72. self.include = include or []
  73. self.pidfile = pidfile
  74. self.autoscale = None
  75. if autoscale:
  76. max_c, _, min_c = partition(autoscale, ",")
  77. self.autoscale = [int(max_c), min_c and int(min_c) or 0]
  78. self._isatty = sys.stdout.isatty()
  79. self.colored = app.log.colored(self.logfile)
  80. if isinstance(self.use_queues, basestring):
  81. self.use_queues = self.use_queues.split(",")
  82. if isinstance(self.include, basestring):
  83. self.include = self.include.split(",")
  84. if not isinstance(self.loglevel, int):
  85. try:
  86. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  87. except KeyError:
  88. self.die("Unknown level %r. Please use one of %s." % (
  89. self.loglevel,
  90. "|".join(l for l in LOG_LEVELS.keys()
  91. if isinstance(l, basestring))))
  92. def run(self):
  93. self.init_loader()
  94. self.init_queues()
  95. self.worker_init()
  96. self.redirect_stdouts_to_logger()
  97. if getattr(os, "geteuid", None) and os.geteuid() == 0:
  98. warnings.warn(
  99. "Running celeryd with superuser privileges is not encouraged!")
  100. if self.discard:
  101. self.purge_messages()
  102. # Dump configuration to screen so we have some basic information
  103. # for when users sends bug reports.
  104. print(str(self.colored.cyan(" \n", self.startup_info())) +
  105. str(self.colored.reset(self.extra_info())))
  106. self.set_process_status("-active-")
  107. self.run_worker()
  108. def on_consumer_ready(self, consumer):
  109. signals.worker_ready.send(sender=consumer)
  110. print("celery@%s has started." % self.hostname)
  111. def init_queues(self):
  112. if self.use_queues:
  113. create_missing = self.app.conf.CELERY_CREATE_MISSING_QUEUES
  114. try:
  115. self.app.amqp.queues.select_subset(self.use_queues,
  116. create_missing)
  117. except KeyError, exc:
  118. raise ImproperlyConfigured(
  119. "Trying to select queue subset of %r, but queue %s"
  120. "is not defined in CELERY_QUEUES. If you want to "
  121. "automatically declare unknown queues you have to "
  122. "enable CELERY_CREATE_MISSING_QUEUES" % (
  123. self.use_queues, exc))
  124. self.queues = self.app.amqp.queues
  125. def init_loader(self):
  126. self.loader = self.app.loader
  127. self.settings = self.app.conf
  128. for module in self.include:
  129. self.loader.import_module(module)
  130. def redirect_stdouts_to_logger(self):
  131. handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
  132. logfile=self.logfile)
  133. if not handled:
  134. logger = self.app.log.get_default_logger()
  135. if self.redirect_stdouts:
  136. self.app.log.redirect_stdouts_to_logger(logger,
  137. loglevel=self.redirect_stdouts_level)
  138. def purge_messages(self):
  139. count = self.app.control.discard_all()
  140. what = (not count or count > 1) and "messages" or "message"
  141. print("discard: Erased %d %s from the queue.\n" % (count, what))
  142. def worker_init(self):
  143. # Run the worker init handler.
  144. # (Usually imports task modules and such.)
  145. self.loader.init_worker()
  146. def tasklist(self, include_builtins=True):
  147. from celery.registry import tasks
  148. tasklist = tasks.keys()
  149. if not include_builtins:
  150. tasklist = filter(lambda s: not s.startswith("celery."),
  151. tasklist)
  152. return "\n".join(" . %s" % task for task in sorted(tasklist))
  153. def extra_info(self):
  154. if self.loglevel <= logging.INFO:
  155. include_builtins = self.loglevel <= logging.DEBUG
  156. tasklist = self.tasklist(include_builtins=include_builtins)
  157. return EXTRA_INFO_FMT % {"tasks": tasklist}
  158. return ""
  159. def startup_info(self):
  160. concurrency = self.concurrency
  161. if self.autoscale:
  162. cmax, cmin = self.autoscale
  163. concurrency = "{min=%s, max=%s}" % (cmin, cmax)
  164. return BANNER % {
  165. "hostname": self.hostname,
  166. "version": __version__,
  167. "conninfo": self.app.broker_connection().as_uri(),
  168. "concurrency": concurrency,
  169. "loglevel": LOG_LEVELS[self.loglevel],
  170. "logfile": self.logfile or "[stderr]",
  171. "celerybeat": self.run_clockservice and "ON" or "OFF",
  172. "events": self.events and "ON" or "OFF",
  173. "loader": get_full_cls_name(self.loader.__class__),
  174. "queues": self.queues.format(indent=18, indent_first=False),
  175. }
  176. def run_worker(self):
  177. if self.pidfile:
  178. pidlock = platforms.create_pidlock(self.pidfile).acquire()
  179. atexit.register(pidlock.release)
  180. worker = self.WorkController(app=self.app,
  181. concurrency=self.concurrency,
  182. loglevel=self.loglevel,
  183. logfile=self.logfile,
  184. hostname=self.hostname,
  185. ready_callback=self.on_consumer_ready,
  186. embed_clockservice=self.run_clockservice,
  187. schedule_filename=self.schedule,
  188. scheduler_cls=self.scheduler_cls,
  189. send_events=self.events,
  190. db=self.db,
  191. queues=self.queues,
  192. max_tasks_per_child=self.max_tasks_per_child,
  193. task_time_limit=self.task_time_limit,
  194. task_soft_time_limit=self.task_soft_time_limit,
  195. autoscale=self.autoscale,
  196. pool_cls=self.pool)
  197. self.install_platform_tweaks(worker)
  198. worker.start()
  199. def install_platform_tweaks(self, worker):
  200. """Install platform specific tweaks and workarounds."""
  201. if self.app.IS_OSX:
  202. self.osx_proxy_detection_workaround()
  203. # Install signal handler so SIGHUP restarts the worker.
  204. if not self._isatty:
  205. # only install HUP handler if detached from terminal,
  206. # so closing the terminal window doesn't restart celeryd
  207. # into the background.
  208. if self.app.IS_OSX:
  209. # OS X can't exec from a process using threads.
  210. # See http://github.com/ask/celery/issues#issue/152
  211. install_HUP_not_supported_handler(worker)
  212. else:
  213. install_worker_restart_handler(worker)
  214. install_worker_term_handler(worker)
  215. install_worker_int_handler(worker)
  216. install_cry_handler(worker.logger)
  217. signals.worker_init.send(sender=worker)
  218. def osx_proxy_detection_workaround(self):
  219. """See http://github.com/ask/celery/issues#issue/161"""
  220. os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")
  221. def set_process_status(self, info):
  222. info = "%s (%s)" % (info, platforms.strargv(sys.argv))
  223. return platforms.set_mp_process_title("celeryd",
  224. info=info,
  225. hostname=self.hostname)
  226. def die(self, msg, exitcode=1):
  227. sys.stderr.write("Error: %s\n" % (msg, ))
  228. sys.exit(exitcode)
  229. def install_worker_int_handler(worker):
  230. def _stop(signum, frame):
  231. process_name = multiprocessing.current_process().name
  232. if process_name == "MainProcess":
  233. worker.logger.warn(
  234. "celeryd: Hitting Ctrl+C again will terminate "
  235. "all running tasks!")
  236. install_worker_int_again_handler(worker)
  237. worker.logger.warn("celeryd: Warm shutdown (%s)" % (
  238. process_name))
  239. worker.stop(in_sighandler=True)
  240. raise SystemExit()
  241. platforms.install_signal_handler("SIGINT", _stop)
  242. def install_worker_int_again_handler(worker):
  243. def _stop(signum, frame):
  244. process_name = multiprocessing.current_process().name
  245. if process_name == "MainProcess":
  246. worker.logger.warn("celeryd: Cold shutdown (%s)" % (
  247. process_name))
  248. worker.terminate(in_sighandler=True)
  249. raise SystemTerminate()
  250. platforms.install_signal_handler("SIGINT", _stop)
  251. def install_worker_term_handler(worker):
  252. def _stop(signum, frame):
  253. process_name = multiprocessing.current_process().name
  254. if process_name == "MainProcess":
  255. worker.logger.warn("celeryd: Warm shutdown (%s)" % (
  256. process_name))
  257. worker.stop(in_sighandler=True)
  258. raise SystemExit()
  259. platforms.install_signal_handler("SIGTERM", _stop)
  260. def install_worker_restart_handler(worker):
  261. def restart_worker_sig_handler(signum, frame):
  262. """Signal handler restarting the current python program."""
  263. worker.logger.warn("Restarting celeryd (%s)" % (
  264. " ".join(sys.argv)))
  265. worker.stop(in_sighandler=True)
  266. os.execv(sys.executable, [sys.executable] + sys.argv)
  267. platforms.install_signal_handler("SIGHUP", restart_worker_sig_handler)
  268. def install_cry_handler(logger):
  269. # 2.4 does not have sys._current_frames
  270. if sys.version_info > (2, 5):
  271. def cry_handler(signum, frame):
  272. """Signal handler logging the stacktrace of all active threads."""
  273. logger.error("\n" + cry())
  274. platforms.install_signal_handler("SIGUSR1", cry_handler)
  275. def install_HUP_not_supported_handler(worker):
  276. def warn_on_HUP_handler(signum, frame):
  277. worker.logger.error("SIGHUP not supported: "
  278. "Restarting with HUP is unstable on this platform!")
  279. platforms.install_signal_handler("SIGHUP", warn_on_HUP_handler)