worker.py 13 KB


  1. import atexit
  2. import logging
  3. import multiprocessing
  4. import os
  5. import socket
  6. import sys
  7. import warnings
  8. from kombu.utils import partition
  9. from celery import __version__
  10. from celery import platforms
  11. from celery import signals
  12. from celery.app import app_or_default
  13. from celery.exceptions import ImproperlyConfigured, SystemTerminate
  14. from celery.utils import get_full_cls_name, LOG_LEVELS, cry
  15. from celery.worker import WorkController
# Startup banner template. Worker.startup_info() renders it with a mapping
# that supplies the named %-placeholders: hostname, version, conninfo,
# loader, logfile, loglevel, concurrency, events, celerybeat and queues.
BANNER = """
-------------- celery@%(hostname)s v%(version)s
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: %(conninfo)s
- ** ---------- . loader: %(loader)s
- ** ---------- . logfile: %(logfile)s@%(loglevel)s
- ** ---------- . concurrency: %(concurrency)s
- ** ---------- . events: %(events)s
- *** --- * --- . beat: %(celerybeat)s
-- ******* ----
--- ***** ----- [Queues]
-------------- %(queues)s
"""
# Task-list section template. Worker.extra_info() renders it with a single
# "tasks" key holding the already formatted, newline-separated task names.
EXTRA_INFO_FMT = """
[Tasks]
%(tasks)s
"""
  34. class Worker(object):
  35. WorkController = WorkController
  36. def __init__(self, concurrency=None, loglevel=None, logfile=None,
  37. hostname=None, discard=False, run_clockservice=False,
  38. schedule=None, task_time_limit=None, task_soft_time_limit=None,
  39. max_tasks_per_child=None, queues=None, events=False, db=None,
  40. include=None, app=None, pidfile=None,
  41. redirect_stdouts=None, redirect_stdouts_level=None,
  42. autoscale=None, scheduler_cls=None, pool=None, **kwargs):
  43. self.app = app = app_or_default(app)
  44. self.concurrency = (concurrency or
  45. app.conf.CELERYD_CONCURRENCY or
  46. multiprocessing.cpu_count())
  47. self.loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
  48. self.logfile = logfile or app.conf.CELERYD_LOG_FILE
  49. self.hostname = hostname or socket.gethostname()
  50. self.discard = discard
  51. self.run_clockservice = run_clockservice
  52. if self.app.IS_WINDOWS and self.run_clockservice:
  53. self.die("-B option does not work on Windows. "
  54. "Please run celerybeat as a separate service.")
  55. self.schedule = schedule or app.conf.CELERYBEAT_SCHEDULE_FILENAME
  56. self.scheduler_cls = scheduler_cls or app.conf.CELERYBEAT_SCHEDULER
  57. self.events = events
  58. self.task_time_limit = (task_time_limit or
  59. app.conf.CELERYD_TASK_TIME_LIMIT)
  60. self.task_soft_time_limit = (task_soft_time_limit or
  61. app.conf.CELERYD_TASK_SOFT_TIME_LIMIT)
  62. self.max_tasks_per_child = (max_tasks_per_child or
  63. app.conf.CELERYD_MAX_TASKS_PER_CHILD)
  64. self.redirect_stdouts = (redirect_stdouts or
  65. app.conf.CELERY_REDIRECT_STDOUTS)
  66. self.redirect_stdouts_level = (redirect_stdouts_level or
  67. app.conf.CELERY_REDIRECT_STDOUTS_LEVEL)
  68. self.pool = (pool or app.conf.CELERYD_POOL)
  69. self.db = db
  70. self.use_queues = queues or []
  71. self.queues = None
  72. self.include = include or []
  73. self.pidfile = pidfile
  74. self.autoscale = None
  75. if autoscale:
  76. max_c, _, min_c = partition(autoscale, ",")
  77. self.autoscale = [int(max_c), min_c and int(min_c) or 0]
  78. self._isatty = sys.stdout.isatty()
  79. self.colored = app.log.colored(self.logfile)
  80. if isinstance(self.use_queues, basestring):
  81. self.use_queues = self.use_queues.split(",")
  82. if isinstance(self.include, basestring):
  83. self.include = self.include.split(",")
  84. if not isinstance(self.loglevel, int):
  85. try:
  86. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  87. except KeyError:
  88. self.die("Unknown level %r. Please use one of %s." % (
  89. self.loglevel,
  90. "|".join(l for l in LOG_LEVELS.keys()
  91. if isinstance(l, basestring))))
  92. def run(self):
  93. self.init_loader()
  94. self.init_queues()
  95. self.worker_init()
  96. self.redirect_stdouts_to_logger()
  97. if getattr(os, "geteuid", None) and os.geteuid() == 0:
  98. warnings.warn(
  99. "Running celeryd with superuser privileges is not encouraged!")
  100. if self.discard:
  101. self.purge_messages()
  102. # Dump configuration to screen so we have some basic information
  103. # for when users sends bug reports.
  104. print(str(self.colored.cyan(" \n", self.startup_info())) +
  105. str(self.colored.reset(self.extra_info())))
  106. self.set_process_status("-active-")
  107. self.run_worker()
  108. def on_consumer_ready(self, consumer):
  109. signals.worker_ready.send(sender=consumer)
  110. print("celery@%s has started." % self.hostname)
  111. def init_queues(self):
  112. if self.use_queues:
  113. create_missing = self.app.conf.CELERY_CREATE_MISSING_QUEUES
  114. try:
  115. self.app.amqp.queues.select_subset(self.use_queues,
  116. create_missing)
  117. except KeyError, exc:
  118. raise ImproperlyConfigured(
  119. "Trying to select queue subset of %r, but queue %s"
  120. "is not defined in CELERY_QUEUES. If you want to "
  121. "automatically declare unknown queues you have to "
  122. "enable CELERY_CREATE_MISSING_QUEUES" % (
  123. self.use_queues, exc))
  124. self.queues = self.app.amqp.queues
  125. def init_loader(self):
  126. self.loader = self.app.loader
  127. self.settings = self.app.conf
  128. for module in self.include:
  129. self.loader.import_module(module)
  130. def redirect_stdouts_to_logger(self):
  131. handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
  132. logfile=self.logfile)
  133. if not handled:
  134. logger = self.app.log.get_default_logger()
  135. if self.redirect_stdouts:
  136. self.app.log.redirect_stdouts_to_logger(logger,
  137. loglevel=self.redirect_stdouts_level)
  138. def purge_messages(self):
  139. count = self.app.control.discard_all()
  140. what = (not count or count > 1) and "messages" or "message"
  141. print("discard: Erased %d %s from the queue.\n" % (count, what))
  142. def worker_init(self):
  143. # Run the worker init handler.
  144. # (Usually imports task modules and such.)
  145. self.loader.init_worker()
  146. def tasklist(self, include_builtins=True):
  147. from celery.registry import tasks
  148. tasklist = tasks.keys()
  149. if not include_builtins:
  150. tasklist = filter(lambda s: not s.startswith("celery."),
  151. tasklist)
  152. return "\n".join(" . %s" % task for task in sorted(tasklist))
  153. def extra_info(self):
  154. if self.loglevel <= logging.INFO:
  155. include_builtins = self.loglevel <= logging.DEBUG
  156. tasklist = self.tasklist(include_builtins=include_builtins)
  157. return EXTRA_INFO_FMT % {"tasks": tasklist}
  158. return ""
  159. def startup_info(self):
  160. concurrency = self.concurrency
  161. if self.autoscale:
  162. cmax, cmin = self.autoscale
  163. concurrency = "{min=%s, max=%s}" % (cmin, cmax)
  164. return BANNER % {
  165. "hostname": self.hostname,
  166. "version": __version__,
  167. "conninfo": self.app.broker_connection().as_uri(),
  168. "concurrency": concurrency,
  169. "loglevel": LOG_LEVELS[self.loglevel],
  170. "logfile": self.logfile or "[stderr]",
  171. "celerybeat": self.run_clockservice and "ON" or "OFF",
  172. "events": self.events and "ON" or "OFF",
  173. "loader": get_full_cls_name(self.loader.__class__),
  174. "queues": self.queues.format(indent=18, indent_first=False),
  175. }
  176. def run_worker(self):
  177. if self.pidfile:
  178. pidlock = platforms.create_pidlock(self.pidfile).acquire()
  179. atexit.register(pidlock.release)
  180. worker = self.WorkController(app=self.app,
  181. concurrency=self.concurrency,
  182. loglevel=self.loglevel,
  183. logfile=self.logfile,
  184. hostname=self.hostname,
  185. ready_callback=self.on_consumer_ready,
  186. embed_clockservice=self.run_clockservice,
  187. schedule_filename=self.schedule,
  188. scheduler_cls=self.scheduler_cls,
  189. send_events=self.events,
  190. db=self.db,
  191. queues=self.queues,
  192. max_tasks_per_child=self.max_tasks_per_child,
  193. task_time_limit=self.task_time_limit,
  194. task_soft_time_limit=self.task_soft_time_limit,
  195. autoscale=self.autoscale,
  196. pool_cls=self.pool)
  197. self.install_platform_tweaks(worker)
  198. worker.start()
  199. def install_platform_tweaks(self, worker):
  200. """Install platform specific tweaks and workarounds."""
  201. if self.app.IS_OSX:
  202. self.osx_proxy_detection_workaround()
  203. # Install signal handler so SIGHUP restarts the worker.
  204. if not self._isatty:
  205. # only install HUP handler if detached from terminal,
  206. # so closing the terminal window doesn't restart celeryd
  207. # into the background.
  208. if self.app.IS_OSX:
  209. # OS X can't exec from a process using threads.
  210. # See http://github.com/ask/celery/issues#issue/152
  211. install_HUP_not_supported_handler(worker)
  212. else:
  213. install_worker_restart_handler(worker)
  214. install_worker_term_handler(worker)
  215. install_worker_int_handler(worker)
  216. install_cry_handler(worker.logger)
  217. signals.worker_init.send(sender=worker)
  218. def osx_proxy_detection_workaround(self):
  219. """See http://github.com/ask/celery/issues#issue/161"""
  220. os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")
  221. def set_process_status(self, info):
  222. info = "%s (%s)" % (info, platforms.strargv(sys.argv))
  223. return platforms.set_mp_process_title("celeryd",
  224. info=info,
  225. hostname=self.hostname)
  226. def die(self, msg, exitcode=1):
  227. sys.stderr.write("Error: %s\n" % (msg, ))
  228. sys.exit(exitcode)
  229. def install_worker_int_handler(worker):
  230. def _stop(signum, frame):
  231. process_name = multiprocessing.current_process().name
  232. if process_name == "MainProcess":
  233. worker.logger.warn(
  234. "celeryd: Hitting Ctrl+C again will terminate "
  235. "all running tasks!")
  236. install_worker_int_again_handler(worker)
  237. worker.logger.warn("celeryd: Warm shutdown (%s)" % (
  238. process_name))
  239. worker.stop(in_sighandler=True)
  240. raise SystemExit()
  241. platforms.install_signal_handler("SIGINT", _stop)
  242. def install_worker_int_again_handler(worker):
  243. def _stop(signum, frame):
  244. process_name = multiprocessing.current_process().name
  245. if process_name == "MainProcess":
  246. worker.logger.warn("celeryd: Cold shutdown (%s)" % (
  247. process_name))
  248. worker.terminate(in_sighandler=True)
  249. raise SystemTerminate()
  250. platforms.install_signal_handler("SIGINT", _stop)
  251. def install_worker_term_handler(worker):
  252. def _stop(signum, frame):
  253. process_name = multiprocessing.current_process().name
  254. if process_name == "MainProcess":
  255. worker.logger.warn("celeryd: Warm shutdown (%s)" % (
  256. process_name))
  257. worker.stop(in_sighandler=True)
  258. raise SystemExit()
  259. platforms.install_signal_handler("SIGTERM", _stop)
  260. def install_worker_restart_handler(worker):
  261. def restart_worker_sig_handler(signum, frame):
  262. """Signal handler restarting the current python program."""
  263. worker.logger.warn("Restarting celeryd (%s)" % (
  264. " ".join(sys.argv)))
  265. worker.stop(in_sighandler=True)
  266. os.execv(sys.executable, [sys.executable] + sys.argv)
  267. platforms.install_signal_handler("SIGHUP", restart_worker_sig_handler)
  268. def install_cry_handler(logger):
  269. # 2.4 does not have sys._current_frames
  270. if sys.version_info > (2, 5):
  271. def cry_handler(signum, frame):
  272. """Signal handler logging the stacktrace of all active threads."""
  273. logger.error("\n" + cry())
  274. platforms.install_signal_handler("SIGUSR1", cry_handler)
  275. def install_HUP_not_supported_handler(worker):
  276. def warn_on_HUP_handler(signum, frame):
  277. worker.logger.error("SIGHUP not supported: "
  278. "Restarting with HUP is unstable on this platform!")
  279. platforms.install_signal_handler("SIGHUP", warn_on_HUP_handler)