celerybeat.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. #!/usr/bin/env python
"""celerybeat

.. program:: celerybeat

.. cmdoption:: -s, --schedule

    Path to the schedule database. Defaults to celerybeat-schedule.
    The extension ".db" will be appended to the filename.

.. cmdoption:: -f, --logfile

    Path to log file. If no logfile is specified, ``stderr`` is used.

.. cmdoption:: -l, --loglevel

    Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
    ``ERROR``, ``CRITICAL``, or ``FATAL``.

.. cmdoption:: -p, --pidfile

    Path to pidfile.

.. cmdoption:: -d, --detach, --daemon

    Run in the background as a daemon.

.. cmdoption:: -u, --uid

    User-id to run ``celerybeat`` as when in daemon mode.

.. cmdoption:: -g, --gid

    Group-id to run ``celerybeat`` as when in daemon mode.

.. cmdoption:: --umask

    umask of the process when in daemon mode.

.. cmdoption:: --workdir

    Directory to change to when in daemon mode.

.. cmdoption:: --chroot

    Change root directory to this path when in daemon mode.

"""
  27. import os
  28. import sys
  29. import traceback
  30. import optparse
  31. from celery import conf
  32. from celery import platform
  33. from celery import __version__
  34. from celery.log import emergency_error
  35. from celery.beat import ClockService
  36. from celery.utils import noop
  37. from celery.loaders import current_loader, settings
  38. from celery.messaging import get_connection_info
# Banner printed by run_clockservice() at startup.  It is %-formatted with a
# mapping that supplies the keys ``conninfo``, ``schedule``, ``logfile``,
# ``loglevel`` and ``pidfile``; surrounding blank lines are stripped.
STARTUP_INFO_FMT = """
Configuration ->
. broker -> %(conninfo)s
. schedule -> %(schedule)s
. sys -> %(logfile)s@%(loglevel)s %(pidfile)s
""".strip()
  45. OPTION_LIST = (
  46. optparse.make_option('-s', '--schedule',
  47. default=conf.CELERYBEAT_SCHEDULE_FILENAME,
  48. action="store", dest="schedule",
  49. help="Path to the schedule database. The extension \
  50. '.db' will be appended to the filename. Default: %s" % (
  51. conf.CELERYBEAT_SCHEDULE_FILENAME)),
  52. optparse.make_option('-f', '--logfile', default=conf.CELERYBEAT_LOG_FILE,
  53. action="store", dest="logfile",
  54. help="Path to log file."),
  55. optparse.make_option('-l', '--loglevel',
  56. default=conf.CELERYBEAT_LOG_LEVEL,
  57. action="store", dest="loglevel",
  58. help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
  59. optparse.make_option('-p', '--pidfile',
  60. default=conf.CELERYBEAT_PID_FILE,
  61. action="store", dest="pidfile",
  62. help="Path to pidfile."),
  63. optparse.make_option('-d', '--detach', '--daemon', default=False,
  64. action="store_true", dest="detach",
  65. help="Run in the background as a daemon."),
  66. optparse.make_option('-u', '--uid', default=None,
  67. action="store", dest="uid",
  68. help="User-id to run celerybeat as when in daemon mode."),
  69. optparse.make_option('-g', '--gid', default=None,
  70. action="store", dest="gid",
  71. help="Group-id to run celerybeat as when in daemon mode."),
  72. optparse.make_option('--umask', default=0,
  73. action="store", type="int", dest="umask",
  74. help="umask of the process when in daemon mode."),
  75. optparse.make_option('--workdir', default=None,
  76. action="store", dest="working_directory",
  77. help="Directory to change to when in daemon mode."),
  78. optparse.make_option('--chroot', default=None,
  79. action="store", dest="chroot",
  80. help="Change root directory to this path when in daemon mode."),
  81. )
  82. def run_clockservice(detach=False, loglevel=conf.CELERYBEAT_LOG_LEVEL,
  83. logfile=conf.CELERYBEAT_LOG_FILE, pidfile=conf.CELERYBEAT_PID_FILE,
  84. umask=0, uid=None, gid=None, working_directory=None, chroot=None,
  85. schedule=conf.CELERYBEAT_SCHEDULE_FILENAME, **kwargs):
  86. """Starts the celerybeat clock server."""
  87. print("celerybeat %s is starting." % __version__)
  88. # Setup logging
  89. if not isinstance(loglevel, int):
  90. loglevel = conf.LOG_LEVELS[loglevel.upper()]
  91. if not detach:
  92. logfile = None # log to stderr when not running in the background.
  93. # Dump configuration to screen so we have some basic information
  94. # when users sends e-mails.
  95. from celery.messaging import format_routing_table
  96. print(STARTUP_INFO_FMT % {
  97. "conninfo": get_connection_info(),
  98. "logfile": logfile or "@stderr",
  99. "loglevel": conf.LOG_LEVELS[loglevel],
  100. "pidfile": detach and pidfile or "",
  101. "schedule": schedule,
  102. })
  103. print("celerybeat has started.")
  104. arg_start = "manage" in sys.argv[0] and 2 or 1
  105. platform.set_process_title("celerybeat",
  106. info=" ".join(sys.argv[arg_start:]))
  107. from celery.log import setup_logger, redirect_stdouts_to_logger
  108. on_stop = noop
  109. if detach:
  110. context, on_stop = platform.create_daemon_context(logfile, pidfile,
  111. chroot_directory=chroot,
  112. working_directory=working_directory,
  113. umask=umask)
  114. context.open()
  115. logger = setup_logger(loglevel, logfile)
  116. redirect_stdouts_to_logger(logger, loglevel)
  117. platform.set_effective_user(uid, gid)
  118. # Run the worker init handler.
  119. # (Usually imports task modules and such.)
  120. current_loader.on_worker_init()
  121. def _run_clock():
  122. logger = setup_logger(loglevel, logfile)
  123. clockservice = ClockService(logger=logger, is_detached=detach,
  124. schedule_filename=schedule)
  125. try:
  126. clockservice.start()
  127. except Exception, e:
  128. emergency_error(logfile,
  129. "celerybeat raised exception %s: %s\n%s" % (
  130. e.__class__, e, traceback.format_exc()))
  131. try:
  132. _run_clock()
  133. except:
  134. on_stop()
  135. raise
  136. def parse_options(arguments):
  137. """Parse the available options to ``celeryd``."""
  138. parser = optparse.OptionParser(option_list=OPTION_LIST)
  139. options, values = parser.parse_args(arguments)
  140. return options
  141. if __name__ == "__main__":
  142. options = parse_options(sys.argv[1:])
  143. run_clockservice(**vars(options))