platforms.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. from __future__ import absolute_import
  2. import os
  3. import sys
  4. import errno
  5. import signal as _signal
  6. from celery.local import try_import
  7. _setproctitle = try_import("setproctitle")
  8. resource = try_import("resource")
  9. pwd = try_import("pwd")
  10. grp = try_import("grp")
  11. DAEMON_UMASK = 0
  12. DAEMON_WORKDIR = "/"
  13. DAEMON_REDIRECT_TO = getattr(os, "devnull", "/dev/null")
  14. class LockFailed(Exception):
  15. pass
  16. def get_fdmax(default=None):
  17. fdmax = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
  18. if fdmax == resource.RLIM_INFINITY:
  19. return default
  20. return fdmax
  21. class PIDFile(object):
  22. def __init__(self, path):
  23. self.path = os.path.abspath(path)
  24. def write_pid(self):
  25. open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
  26. open_mode = (((os.R_OK | os.W_OK) << 6) |
  27. ((os.R_OK) << 3) |
  28. ((os.R_OK)))
  29. pidfile_fd = os.open(self.path, open_flags, open_mode)
  30. pidfile = os.fdopen(pidfile_fd, "w")
  31. pid = os.getpid()
  32. pidfile.write("%d\n" % (pid, ))
  33. pidfile.close()
  34. def acquire(self):
  35. try:
  36. self.write_pid()
  37. except OSError, exc:
  38. raise LockFailed(str(exc))
  39. return self
  40. __enter__ = acquire
  41. def is_locked(self):
  42. return os.path.exists(self.path)
  43. def release(self, *args):
  44. self.remove()
  45. __exit__ = release
  46. def read_pid(self):
  47. try:
  48. fh = open(self.path, "r")
  49. except IOError, exc:
  50. if exc.errno == errno.ENOENT:
  51. return
  52. raise
  53. line = fh.readline().strip()
  54. fh.close()
  55. try:
  56. return int(line)
  57. except ValueError:
  58. raise ValueError("PID file %r contents invalid." % self.path)
  59. def remove(self):
  60. try:
  61. os.unlink(self.path)
  62. except OSError, exc:
  63. if exc.errno in (errno.ENOENT, errno.EACCES):
  64. return
  65. raise
  66. def remove_if_stale(self):
  67. try:
  68. pid = self.read_pid()
  69. except ValueError, exc:
  70. sys.stderr.write("Broken pidfile found. Removing it.\n")
  71. self.remove()
  72. return True
  73. if not pid:
  74. self.remove()
  75. return True
  76. try:
  77. os.kill(pid, 0)
  78. except os.error, exc:
  79. if exc.errno == errno.ESRCH:
  80. sys.stderr.write("Stale pidfile exists. Removing it.\n")
  81. self.remove()
  82. return True
  83. return False
  84. def create_pidlock(pidfile):
  85. """Create and verify pidfile.
  86. If the pidfile already exists the program exits with an error message,
  87. however if the process it refers to is not running anymore, the pidfile
  88. is just deleted.
  89. """
  90. pidlock = PIDFile(pidfile)
  91. if pidlock.is_locked() and not pidlock.remove_if_stale():
  92. raise SystemExit(
  93. "ERROR: Pidfile (%s) already exists.\n"
  94. "Seems we're already running? (PID: %s)" % (
  95. pidfile, pidlock.read_pid()))
  96. return pidlock
  97. class DaemonContext(object):
  98. _is_open = False
  99. def __init__(self, pidfile=None, workdir=DAEMON_WORKDIR,
  100. umask=DAEMON_UMASK, **kwargs):
  101. self.workdir = workdir
  102. self.umask = umask
  103. def open(self):
  104. if not self._is_open:
  105. self._detach()
  106. os.chdir(self.workdir)
  107. os.umask(self.umask)
  108. for fd in reversed(range(get_fdmax(default=2048))):
  109. try:
  110. os.close(fd)
  111. except OSError, exc:
  112. if exc.errno != errno.EBADF:
  113. raise
  114. os.open(DAEMON_REDIRECT_TO, os.O_RDWR)
  115. os.dup2(0, 1)
  116. os.dup2(0, 2)
  117. self._is_open = True
  118. __enter__ = open
  119. def close(self, *args):
  120. if self._is_open:
  121. self._is_open = False
  122. __exit__ = close
  123. def _detach(self):
  124. if os.fork() == 0: # first child
  125. os.setsid() # create new session
  126. if os.fork() > 0: # second child
  127. os._exit(0)
  128. else:
  129. os._exit(0)
  130. return self
  131. def detached(logfile=None, pidfile=None, uid=None, gid=None, umask=0,
  132. workdir=None, **opts):
  133. if not resource:
  134. raise RuntimeError("This platform does not support detach.")
  135. workdir = os.getcwd() if workdir is None else workdir
  136. signals.reset("SIGCLD") # Make sure SIGCLD is using the default handler.
  137. set_effective_user(uid=uid, gid=gid)
  138. # Since without stderr any errors will be silently suppressed,
  139. # we need to know that we have access to the logfile.
  140. logfile and open(logfile, "a").close()
  141. # Doesn't actually create the pidfile, but makes sure it's not stale.
  142. pidfile and create_pidlock(pidfile)
  143. return DaemonContext(umask=umask, workdir=workdir)
  144. def parse_uid(uid):
  145. """Parse user id.
  146. uid can be an interger (uid) or a string (username), if a username
  147. the uid is taken from the password file.
  148. """
  149. try:
  150. return int(uid)
  151. except ValueError:
  152. if pwd:
  153. try:
  154. return pwd.getpwnam(uid).pw_uid
  155. except KeyError:
  156. raise KeyError("User does not exist: %r" % (uid, ))
  157. raise
  158. def parse_gid(gid):
  159. """Parse group id.
  160. gid can be an integer (gid) or a string (group name), if a group name
  161. the gid is taken from the password file.
  162. """
  163. try:
  164. return int(gid)
  165. except ValueError:
  166. if grp:
  167. try:
  168. return grp.getgrnam(gid).gr_gid
  169. except KeyError:
  170. raise KeyError("Group does not exist: %r" % (gid, ))
  171. raise
  172. def setegid(gid):
  173. """Set effective group id."""
  174. gid = parse_gid(gid)
  175. if gid != os.getgid():
  176. os.setegid(gid)
  177. def seteuid(uid):
  178. """Set effective user id."""
  179. uid = parse_uid(uid)
  180. if uid != os.getuid():
  181. os.seteuid(uid)
  182. def set_effective_user(uid=None, gid=None):
  183. """Change process privileges to new user/group.
  184. If uid and gid is set the effective user/group is set.
  185. If only uid is set, the effective uer is set, and the group is
  186. set to the users primary group.
  187. If only gid is set, the effective group is set.
  188. """
  189. uid = uid and parse_uid(uid)
  190. gid = gid and parse_gid(gid)
  191. if uid:
  192. # If gid isn't defined, get the primary gid of the uer.
  193. if not gid and pwd:
  194. gid = pwd.getpwuid(uid).pw_gid
  195. setegid(gid)
  196. seteuid(uid)
  197. else:
  198. gid and setegid(gid)
  199. class Signals(object):
  200. ignored = _signal.SIG_IGN
  201. default = _signal.SIG_DFL
  202. def supported(self, signal_name):
  203. """Returns true value if ``signal_name`` exists on this platform."""
  204. try:
  205. return self.signum(signal_name)
  206. except AttributeError:
  207. pass
  208. def signum(self, signal_name):
  209. """Get signal number from signal name."""
  210. if isinstance(signal_name, int):
  211. return signal_name
  212. if not isinstance(signal_name, basestring) or not signal_name.isupper():
  213. raise TypeError("signal name must be uppercase string.")
  214. if not signal_name.startswith("SIG"):
  215. signal_name = "SIG" + signal_name
  216. return getattr(_signal, signal_name)
  217. def reset(self, *signal_names):
  218. """Reset signals to the default signal handler.
  219. Does nothing if the platform doesn't support signals,
  220. or the specified signal in particular.
  221. """
  222. self.update((sig, self.default) for sig in signal_names)
  223. def ignore(self, *signal_names):
  224. """Ignore signal using :const:`SIG_IGN`.
  225. Does nothing if the platform doesn't support signals,
  226. or the specified signal in particular.
  227. """
  228. self.update((sig, self.ignored) for sig in signal_names)
  229. def __getitem__(self, signal_name):
  230. return _signal.getsignal(self.signum(signal_name))
  231. def __setitem__(self, signal_name, handler):
  232. """Install signal handler.
  233. Does nothing if the current platform doesn't support signals,
  234. or the specified signal in particular.
  235. """
  236. try:
  237. _signal.signal(self.signum(signal_name), handler)
  238. except (AttributeError, ValueError):
  239. pass
  240. def update(self, _d_=None, **sigmap):
  241. """Set signal handlers from a mapping."""
  242. for signal_name, handler in dict(_d_ or {}, **sigmap).iteritems():
  243. self[signal_name] = handler
  244. signals = Signals()
  245. get_signal = signals.signum # compat
  246. install_signal_handler = signals.__setitem__ # compat
  247. reset_signal = signals.reset # compat
  248. ignore_signal = signals.ignore # compat
  249. def strargv(argv):
  250. arg_start = 2 if "manage" in argv[0] else 1
  251. if len(argv) > arg_start:
  252. return " ".join(argv[arg_start:])
  253. return ""
  254. def set_process_title(progname, info=None):
  255. """Set the ps name for the currently running process.
  256. Only works if :mod:`setproctitle` is installed.
  257. """
  258. proctitle = "[%s]" % progname
  259. proctitle = "%s %s" % (proctitle, info) if info else proctitle
  260. if _setproctitle:
  261. _setproctitle.setproctitle(proctitle)
  262. return proctitle
  263. def set_mp_process_title(progname, info=None, hostname=None):
  264. """Set the ps name using the multiprocessing process name.
  265. Only works if :mod:`setproctitle` is installed.
  266. """
  267. if hostname:
  268. progname = "%s@%s" % (progname, hostname.split(".")[0])
  269. try:
  270. from multiprocessing.process import current_process
  271. except ImportError:
  272. return set_process_title(progname, info=info)
  273. else:
  274. return set_process_title("%s:%s" % (progname,
  275. current_process().name), info=info)