worker.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. import atexit
  2. import logging
  3. import multiprocessing
  4. import os
  5. import socket
  6. import sys
  7. from celery import __version__
  8. from celery import platforms
  9. from celery import signals
  10. from celery.app import app_or_default
  11. from celery.exceptions import ImproperlyConfigured
  12. from celery.utils import get_full_cls_name, LOG_LEVELS
  13. from celery.worker import WorkController
  14. STARTUP_INFO_FMT = """
  15. Configuration ->
  16. . broker -> %(conninfo)s
  17. . queues ->
  18. %(queues)s
  19. . concurrency -> %(concurrency)s
  20. . loader -> %(loader)s
  21. . logfile -> %(logfile)s@%(loglevel)s
  22. . events -> %(events)s
  23. . beat -> %(celerybeat)s
  24. %(tasks)s
  25. """.strip()
  26. TASK_LIST_FMT = """ . tasks ->\n%s"""
  27. class Worker(object):
  28. WorkController = WorkController
  29. def __init__(self, concurrency=None, loglevel=None, logfile=None,
  30. hostname=None, discard=False, run_clockservice=False,
  31. schedule=None, task_time_limit=None, task_soft_time_limit=None,
  32. max_tasks_per_child=None, queues=None, events=False, db=None,
  33. include=None, app=None, pidfile=None, **kwargs):
  34. self.app = app = app_or_default(app)
  35. self.concurrency = (concurrency or
  36. app.conf.CELERYD_CONCURRENCY or
  37. multiprocessing.cpu_count())
  38. self.loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
  39. self.logfile = logfile or app.conf.CELERYD_LOG_FILE
  40. self.hostname = hostname or socket.gethostname()
  41. self.discard = discard
  42. self.run_clockservice = run_clockservice
  43. self.schedule = schedule or app.conf.CELERYBEAT_SCHEDULE_FILENAME
  44. self.events = events
  45. self.task_time_limit = (task_time_limit or
  46. app.conf.CELERYD_TASK_TIME_LIMIT)
  47. self.task_soft_time_limit = (task_soft_time_limit or
  48. app.conf.CELERYD_TASK_SOFT_TIME_LIMIT)
  49. self.max_tasks_per_child = (max_tasks_per_child or
  50. app.conf.CELERYD_MAX_TASKS_PER_CHILD)
  51. self.db = db
  52. self.use_queues = queues or []
  53. self.queues = None
  54. self.include = include or []
  55. self.pidfile = pidfile
  56. self._isatty = sys.stdout.isatty()
  57. if isinstance(self.use_queues, basestring):
  58. self.use_queues = self.use_queues.split(",")
  59. if isinstance(self.include, basestring):
  60. self.include = self.include.split(",")
  61. if not isinstance(self.loglevel, int):
  62. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  63. def run(self):
  64. self.init_loader()
  65. self.init_queues()
  66. self.worker_init()
  67. self.redirect_stdouts_to_logger()
  68. print("celery@%s v%s is starting." % (self.hostname, __version__))
  69. if getattr(os, "geteuid", None) and os.geteuid() == 0:
  70. warnings.warn(
  71. "Running celeryd with superuser privileges is not encouraged!")
  72. if self.discard:
  73. self.purge_messages()
  74. # Dump configuration to screen so we have some basic information
  75. # for when users sends bug reports.
  76. print(self.startup_info())
  77. self.set_process_status("Running...")
  78. self.run_worker()
  79. def on_listener_ready(self, listener):
  80. signals.worker_ready.send(sender=listener)
  81. print("celery@%s has started." % self.hostname)
  82. def init_queues(self):
  83. if self.use_queues:
  84. create_missing = self.app.conf.CELERY_CREATE_MISSING_QUEUES
  85. try:
  86. self.app.amqp.queues.select_subset(self.use_queues,
  87. create_missing)
  88. except KeyError, exc:
  89. raise ImproperlyConfigured(
  90. "Trying to select queue subset of %r, but queue %s"
  91. "is not defined in CELERY_QUEUES. If you want to "
  92. "automatically declare unknown queues you have to "
  93. "enable CELERY_CREATE_MISSING_QUEUES" % (
  94. self.use_queues, exc))
  95. self.queues = self.app.amqp.queues
  96. def init_loader(self):
  97. self.loader = self.app.loader
  98. self.settings = self.app.conf
  99. map(self.loader.import_module, self.include)
  100. def redirect_stdouts_to_logger(self):
  101. handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
  102. logfile=self.logfile)
  103. if not handled:
  104. logger = self.app.log.get_default_logger()
  105. self.app.log.redirect_stdouts_to_logger(logger,
  106. loglevel=logging.WARNING)
  107. def purge_messages(self):
  108. count = self.app.control.discard_all()
  109. what = (not count or count > 1) and "messages" or "message"
  110. print("discard: Erased %d %s from the queue.\n" % (count, what))
  111. def worker_init(self):
  112. # Run the worker init handler.
  113. # (Usually imports task modules and such.)
  114. self.loader.init_worker()
  115. def tasklist(self, include_builtins=True):
  116. from celery.registry import tasks
  117. tasklist = tasks.keys()
  118. if not include_builtins:
  119. tasklist = filter(lambda s: not s.startswith("celery."),
  120. tasklist)
  121. return TASK_LIST_FMT % "\n".join("\t. %s" % task
  122. for task in sorted(tasklist))
  123. def startup_info(self):
  124. tasklist = ""
  125. if self.loglevel <= logging.INFO:
  126. include_builtins = self.loglevel <= logging.DEBUG
  127. tasklist = self.tasklist(include_builtins=include_builtins)
  128. return STARTUP_INFO_FMT % {
  129. "conninfo": self.app.amqp.format_broker_info(),
  130. "queues": self.queues.format(indent=8),
  131. "concurrency": self.concurrency,
  132. "loglevel": LOG_LEVELS[self.loglevel],
  133. "logfile": self.logfile or "[stderr]",
  134. "celerybeat": self.run_clockservice and "ON" or "OFF",
  135. "events": self.events and "ON" or "OFF",
  136. "tasks": tasklist,
  137. "loader": get_full_cls_name(self.loader.__class__),
  138. }
  139. def run_worker(self):
  140. if self.pidfile:
  141. pidlock = platforms.create_pidlock(self.pidfile).acquire()
  142. atexit.register(pidlock.release)
  143. worker = self.WorkController(app=self.app,
  144. concurrency=self.concurrency,
  145. loglevel=self.loglevel,
  146. logfile=self.logfile,
  147. hostname=self.hostname,
  148. ready_callback=self.on_listener_ready,
  149. embed_clockservice=self.run_clockservice,
  150. schedule_filename=self.schedule,
  151. send_events=self.events,
  152. db=self.db,
  153. queues=self.queues,
  154. max_tasks_per_child=self.max_tasks_per_child,
  155. task_time_limit=self.task_time_limit,
  156. task_soft_time_limit=self.task_soft_time_limit)
  157. self.install_platform_tweaks(worker)
  158. worker.start()
  159. def install_platform_tweaks(self, worker):
  160. """Install platform specific tweaks and workarounds."""
  161. if self.app.IS_OSX:
  162. self.osx_proxy_detection_workaround()
  163. # Install signal handler so SIGHUP restarts the worker.
  164. if not self._isatty:
  165. # only install HUP handler if detached from terminal,
  166. # so closing the terminal window doesn't restart celeryd
  167. # into the background.
  168. if self.app.IS_OSX:
  169. # OS X can't exec from a process using threads.
  170. # See http://github.com/ask/celery/issues#issue/152
  171. install_HUP_not_supported_handler(worker)
  172. else:
  173. install_worker_restart_handler(worker)
  174. install_worker_term_handler(worker)
  175. install_worker_int_handler(worker)
  176. signals.worker_init.send(sender=worker)
  177. def osx_proxy_detection_workaround(self):
  178. """See http://github.com/ask/celery/issues#issue/161"""
  179. os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")
  180. def set_process_status(self, info):
  181. info = "%s (%s)" % (info, platforms.strargv(sys.argv))
  182. return platforms.set_mp_process_title("celeryd",
  183. info=info,
  184. hostname=self.hostname)
  185. def install_worker_int_handler(worker):
  186. def _stop(signum, frame):
  187. process_name = multiprocessing.current_process().name
  188. if process_name == "MainProcess":
  189. worker.logger.warn(
  190. "celeryd: Hitting Ctrl+C again will terminate "
  191. "all running tasks!")
  192. install_worker_int_again_handler(worker)
  193. worker.logger.warn("celeryd: Warm shutdown (%s)" % (
  194. process_name))
  195. worker.stop()
  196. raise SystemExit()
  197. platforms.install_signal_handler("SIGINT", _stop)
  198. def install_worker_int_again_handler(worker):
  199. def _stop(signum, frame):
  200. process_name = multiprocessing.current_process().name
  201. if process_name == "MainProcess":
  202. worker.logger.warn("celeryd: Cold shutdown (%s)" % (
  203. process_name))
  204. worker.terminate()
  205. raise SystemExit()
  206. platforms.install_signal_handler("SIGINT", _stop)
  207. def install_worker_term_handler(worker):
  208. def _stop(signum, frame):
  209. process_name = multiprocessing.current_process().name
  210. if process_name == "MainProcess":
  211. worker.logger.warn("celeryd: Warm shutdown (%s)" % (
  212. process_name))
  213. worker.stop()
  214. raise SystemExit()
  215. platforms.install_signal_handler("SIGTERM", _stop)
  216. def install_worker_restart_handler(worker):
  217. def restart_worker_sig_handler(signum, frame):
  218. """Signal handler restarting the current python program."""
  219. worker.logger.warn("Restarting celeryd (%s)" % (
  220. " ".join(sys.argv)))
  221. worker.stop()
  222. os.execv(sys.executable, [sys.executable] + sys.argv)
  223. platforms.install_signal_handler("SIGHUP", restart_worker_sig_handler)
  224. def install_HUP_not_supported_handler(worker):
  225. def warn_on_HUP_handler(signum, frame):
  226. worker.logger.error("SIGHUP not supported: "
  227. "Restarting with HUP is unstable on this platform!")
  228. platforms.install_signal_handler("SIGHUP", warn_on_HUP_handler)
  229. def run_worker(*args, **kwargs):
  230. return Worker(*args, **kwargs).run()