worker.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. import atexit
  2. import logging
  3. import multiprocessing
  4. import os
  5. import socket
  6. import sys
  7. import warnings
  8. from kombu.utils import partition
  9. from celery import __version__
  10. from celery import platforms
  11. from celery import signals
  12. from celery.app import app_or_default
  13. from celery.exceptions import ImproperlyConfigured
  14. from celery.utils import get_full_cls_name, LOG_LEVELS, isatty
  15. from celery.utils import term
  16. from celery.worker import WorkController
# Start-up banner template.  It is filled with %-style dict formatting by
# Worker.startup_info(), which must supply every %(name)s key used below.
# The surrounding blank lines are removed by the trailing .strip().
# NOTE(review): leading indentation inside the template lines may have been
# collapsed in this copy of the file -- confirm against the canonical source
# before relying on the exact banner layout.
STARTUP_INFO_FMT = """
Configuration ->
. broker -> %(conninfo)s
. queues ->
%(queues)s
. concurrency -> %(concurrency)s
. loader -> %(loader)s
. logfile -> %(logfile)s@%(loglevel)s
. events -> %(events)s
. beat -> %(celerybeat)s
%(tasks)s
""".strip()

# Template for the task listing produced by Worker.tasklist(); the single
# %s receives the newline-joined task names and the result is substituted
# into the %(tasks)s slot of STARTUP_INFO_FMT.
TASK_LIST_FMT = """ . tasks ->\n%s"""
  30. class Worker(object):
  31. WorkController = WorkController
  32. def __init__(self, concurrency=None, loglevel=None, logfile=None,
  33. hostname=None, discard=False, run_clockservice=False,
  34. schedule=None, task_time_limit=None, task_soft_time_limit=None,
  35. max_tasks_per_child=None, queues=None, events=False, db=None,
  36. include=None, app=None, pidfile=None,
  37. redirect_stdouts=None, redirect_stdouts_level=None,
  38. autoscale=None, scheduler_cls=None, **kwargs):
  39. self.app = app = app_or_default(app)
  40. self.concurrency = (concurrency or
  41. app.conf.CELERYD_CONCURRENCY or
  42. multiprocessing.cpu_count())
  43. self.loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
  44. self.logfile = logfile or app.conf.CELERYD_LOG_FILE
  45. app.conf.CELERYD_LOG_COLOR = not self.logfile and isatty(sys.stderr)
  46. self.hostname = hostname or socket.gethostname()
  47. self.discard = discard
  48. self.run_clockservice = run_clockservice
  49. self.schedule = schedule or app.conf.CELERYBEAT_SCHEDULE_FILENAME
  50. self.scheduler_cls = scheduler_cls or app.conf.CELERYBEAT_SCHEDULER
  51. self.events = events
  52. self.task_time_limit = (task_time_limit or
  53. app.conf.CELERYD_TASK_TIME_LIMIT)
  54. self.task_soft_time_limit = (task_soft_time_limit or
  55. app.conf.CELERYD_TASK_SOFT_TIME_LIMIT)
  56. self.max_tasks_per_child = (max_tasks_per_child or
  57. app.conf.CELERYD_MAX_TASKS_PER_CHILD)
  58. self.redirect_stdouts = (redirect_stdouts or
  59. app.conf.CELERY_REDIRECT_STDOUTS)
  60. self.redirect_stdouts_level = (redirect_stdouts_level or
  61. app.conf.CELERY_REDIRECT_STDOUTS_LEVEL)
  62. self.db = db
  63. self.use_queues = queues or []
  64. self.queues = None
  65. self.include = include or []
  66. self.pidfile = pidfile
  67. self.autoscale = None
  68. if autoscale:
  69. max_c, _, min_c = partition(autoscale, ",")
  70. self.autoscale = [int(max_c), min_c and int(min_c) or 0]
  71. self._isatty = sys.stdout.isatty()
  72. self.colored = term.colored(enabled=app.conf.CELERYD_LOG_COLOR)
  73. if isinstance(self.use_queues, basestring):
  74. self.use_queues = self.use_queues.split(",")
  75. if isinstance(self.include, basestring):
  76. self.include = self.include.split(",")
  77. if not isinstance(self.loglevel, int):
  78. try:
  79. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  80. except KeyError:
  81. self.die("Unknown level %r. Please use one of %s." % (
  82. self.loglevel,
  83. "|".join(l for l in LOG_LEVELS.keys()
  84. if isinstance(l, basestring))))
  85. def run(self):
  86. self.init_loader()
  87. self.init_queues()
  88. self.worker_init()
  89. self.redirect_stdouts_to_logger()
  90. print(str(self.colored.cyan(
  91. "celery@%s v%s is starting." % (self.hostname, __version__))))
  92. if getattr(os, "geteuid", None) and os.geteuid() == 0:
  93. warnings.warn(
  94. "Running celeryd with superuser privileges is not encouraged!")
  95. if self.discard:
  96. self.purge_messages()
  97. # Dump configuration to screen so we have some basic information
  98. # for when users sends bug reports.
  99. print(str(self.colored.reset(" \n", self.startup_info())))
  100. self.set_process_status("Running...")
  101. self.run_worker()
  102. def on_consumer_ready(self, consumer):
  103. signals.worker_ready.send(sender=consumer)
  104. print("celery@%s has started." % self.hostname)
  105. def init_queues(self):
  106. if self.use_queues:
  107. create_missing = self.app.conf.CELERY_CREATE_MISSING_QUEUES
  108. try:
  109. self.app.amqp.queues.select_subset(self.use_queues,
  110. create_missing)
  111. except KeyError, exc:
  112. raise ImproperlyConfigured(
  113. "Trying to select queue subset of %r, but queue %s"
  114. "is not defined in CELERY_QUEUES. If you want to "
  115. "automatically declare unknown queues you have to "
  116. "enable CELERY_CREATE_MISSING_QUEUES" % (
  117. self.use_queues, exc))
  118. self.queues = self.app.amqp.queues
  119. def init_loader(self):
  120. self.loader = self.app.loader
  121. self.settings = self.app.conf
  122. for module in self.include:
  123. self.loader.import_module(module)
  124. def redirect_stdouts_to_logger(self):
  125. handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
  126. logfile=self.logfile)
  127. if not handled:
  128. logger = self.app.log.get_default_logger()
  129. if self.redirect_stdouts:
  130. self.app.log.redirect_stdouts_to_logger(logger,
  131. loglevel=self.redirect_stdouts_level)
  132. def purge_messages(self):
  133. count = self.app.control.discard_all()
  134. what = (not count or count > 1) and "messages" or "message"
  135. print("discard: Erased %d %s from the queue.\n" % (count, what))
  136. def worker_init(self):
  137. # Run the worker init handler.
  138. # (Usually imports task modules and such.)
  139. self.loader.init_worker()
  140. def tasklist(self, include_builtins=True):
  141. from celery.registry import tasks
  142. tasklist = tasks.keys()
  143. if not include_builtins:
  144. tasklist = filter(lambda s: not s.startswith("celery."),
  145. tasklist)
  146. return TASK_LIST_FMT % "\n".join("\t. %s" % task
  147. for task in sorted(tasklist))
  148. def startup_info(self):
  149. tasklist = ""
  150. if self.loglevel <= logging.INFO:
  151. include_builtins = self.loglevel <= logging.DEBUG
  152. tasklist = self.tasklist(include_builtins=include_builtins)
  153. return STARTUP_INFO_FMT % {
  154. "conninfo": self.app.amqp.format_broker_info(),
  155. "queues": self.queues.format(indent=8),
  156. "concurrency": self.concurrency,
  157. "loglevel": LOG_LEVELS[self.loglevel],
  158. "logfile": self.logfile or "[stderr]",
  159. "celerybeat": self.run_clockservice and "ON" or "OFF",
  160. "events": self.events and "ON" or "OFF",
  161. "tasks": tasklist,
  162. "loader": get_full_cls_name(self.loader.__class__),
  163. }
  164. def run_worker(self):
  165. if self.pidfile:
  166. pidlock = platforms.create_pidlock(self.pidfile).acquire()
  167. atexit.register(pidlock.release)
  168. worker = self.WorkController(app=self.app,
  169. concurrency=self.concurrency,
  170. loglevel=self.loglevel,
  171. logfile=self.logfile,
  172. hostname=self.hostname,
  173. ready_callback=self.on_consumer_ready,
  174. embed_clockservice=self.run_clockservice,
  175. schedule_filename=self.schedule,
  176. scheduler_cls=self.scheduler_cls,
  177. send_events=self.events,
  178. db=self.db,
  179. queues=self.queues,
  180. max_tasks_per_child=self.max_tasks_per_child,
  181. task_time_limit=self.task_time_limit,
  182. task_soft_time_limit=self.task_soft_time_limit,
  183. autoscale=self.autoscale)
  184. self.install_platform_tweaks(worker)
  185. worker.start()
  186. def install_platform_tweaks(self, worker):
  187. """Install platform specific tweaks and workarounds."""
  188. if self.app.IS_OSX:
  189. self.osx_proxy_detection_workaround()
  190. # Install signal handler so SIGHUP restarts the worker.
  191. if not self._isatty:
  192. # only install HUP handler if detached from terminal,
  193. # so closing the terminal window doesn't restart celeryd
  194. # into the background.
  195. if self.app.IS_OSX:
  196. # OS X can't exec from a process using threads.
  197. # See http://github.com/ask/celery/issues#issue/152
  198. install_HUP_not_supported_handler(worker)
  199. else:
  200. install_worker_restart_handler(worker)
  201. install_worker_term_handler(worker)
  202. install_worker_int_handler(worker)
  203. signals.worker_init.send(sender=worker)
  204. def osx_proxy_detection_workaround(self):
  205. """See http://github.com/ask/celery/issues#issue/161"""
  206. os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")
  207. def set_process_status(self, info):
  208. info = "%s (%s)" % (info, platforms.strargv(sys.argv))
  209. return platforms.set_mp_process_title("celeryd",
  210. info=info,
  211. hostname=self.hostname)
  212. def die(self, msg, exitcode=1):
  213. sys.stderr.write("Error: %s\n" % (msg, ))
  214. sys.exit(exitcode)
  215. def install_worker_int_handler(worker):
  216. def _stop(signum, frame):
  217. process_name = multiprocessing.current_process().name
  218. if process_name == "MainProcess":
  219. worker.logger.warn(
  220. "celeryd: Hitting Ctrl+C again will terminate "
  221. "all running tasks!")
  222. install_worker_int_again_handler(worker)
  223. worker.logger.warn("celeryd: Warm shutdown (%s)" % (
  224. process_name))
  225. worker.stop()
  226. raise SystemExit()
  227. platforms.install_signal_handler("SIGINT", _stop)
  228. def install_worker_int_again_handler(worker):
  229. def _stop(signum, frame):
  230. process_name = multiprocessing.current_process().name
  231. if process_name == "MainProcess":
  232. worker.logger.warn("celeryd: Cold shutdown (%s)" % (
  233. process_name))
  234. worker.terminate()
  235. raise SystemExit()
  236. platforms.install_signal_handler("SIGINT", _stop)
  237. def install_worker_term_handler(worker):
  238. def _stop(signum, frame):
  239. process_name = multiprocessing.current_process().name
  240. if process_name == "MainProcess":
  241. worker.logger.warn("celeryd: Warm shutdown (%s)" % (
  242. process_name))
  243. worker.stop()
  244. raise SystemExit()
  245. platforms.install_signal_handler("SIGTERM", _stop)
  246. def install_worker_restart_handler(worker):
  247. def restart_worker_sig_handler(signum, frame):
  248. """Signal handler restarting the current python program."""
  249. worker.logger.warn("Restarting celeryd (%s)" % (
  250. " ".join(sys.argv)))
  251. worker.stop()
  252. os.execv(sys.executable, [sys.executable] + sys.argv)
  253. platforms.install_signal_handler("SIGHUP", restart_worker_sig_handler)
  254. def install_HUP_not_supported_handler(worker):
  255. def warn_on_HUP_handler(signum, frame):
  256. worker.logger.error("SIGHUP not supported: "
  257. "Restarting with HUP is unstable on this platform!")
  258. platforms.install_signal_handler("SIGHUP", warn_on_HUP_handler)
  259. def run_worker(*args, **kwargs):
  260. return Worker(*args, **kwargs).run()