celerybeat.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. #!/usr/bin/env python
  2. """celerybeat
  3. .. program:: celerybeat
  4. .. cmdoption:: -f, --logfile
  5. Path to log file. If no logfile is specified, ``stderr`` is used.
  6. .. cmdoption:: -l, --loglevel
  7. Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
  8. ``ERROR``, ``CRITICAL``, or ``FATAL``.
  9. .. cmdoption:: -p, --pidfile
  10. Path to pidfile.
  11. .. cmdoption:: -d, --detach, --daemon
  12. Run in the background as a daemon.
  13. .. cmdoption:: -u, --uid
  14. User-id to run ``celerybeat`` as when in daemon mode.
  15. .. cmdoption:: -g, --gid
  16. Group-id to run ``celerybeat`` as when in daemon mode.
  17. .. cmdoption:: --umask
  18. umask of the process when in daemon mode.
  19. .. cmdoption:: --workdir
  20. Directory to change to when in daemon mode.
  21. .. cmdoption:: --chroot
  22. Change root directory to this path when in daemon mode.
  23. """
  24. import os
  25. import sys
  26. import traceback
  27. import optparse
  28. from celery import __version__
  29. from celery import conf
  30. from celery import platform
  31. from celery.log import emergency_error
  32. from celery.beat import ClockService
  33. from celery.loaders import current_loader, settings
  34. STARTUP_INFO_FMT = """
  35. Configuration ->
  36. * Broker -> amqp://%(vhost)s@%(host)s:%(port)s
  37. * Exchange -> %(exchange)s (%(exchange_type)s)
  38. * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
  39. """.strip()
  40. OPTION_LIST = (
  41. optparse.make_option('-f', '--logfile', default=conf.DAEMON_LOG_FILE,
  42. action="store", dest="logfile",
  43. help="Path to log file."),
  44. optparse.make_option('-l', '--loglevel', default=conf.DAEMON_LOG_LEVEL,
  45. action="store", dest="loglevel",
  46. help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
  47. optparse.make_option('-p', '--pidfile', default=conf.DAEMON_PID_FILE,
  48. action="store", dest="pidfile",
  49. help="Path to pidfile."),
  50. optparse.make_option('-d', '--detach', '--daemon', default=False,
  51. action="store_true", dest="detach",
  52. help="Run in the background as a daemon."),
  53. optparse.make_option('-u', '--uid', default=None,
  54. action="store", dest="uid",
  55. help="User-id to run celerybeat as when in daemon mode."),
  56. optparse.make_option('-g', '--gid', default=None,
  57. action="store", dest="gid",
  58. help="Group-id to run celerybeat as when in daemon mode."),
  59. optparse.make_option('--umask', default=0,
  60. action="store", type="int", dest="umask",
  61. help="umask of the process when in daemon mode."),
  62. optparse.make_option('--workdir', default=None,
  63. action="store", dest="working_directory",
  64. help="Directory to change to when in daemon mode."),
  65. optparse.make_option('--chroot', default=None,
  66. action="store", dest="chroot",
  67. help="Change root directory to this path when in daemon mode."),
  68. )
  69. def run_clockserver(detach=False, loglevel=conf.DAEMON_LOG_LEVEL,
  70. logfile=conf.DAEMON_LOG_FILE, pidfile=conf.DAEMON_PID_FILE,
  71. umask=0, uid=None, gid=None, working_directory=None, chroot=None,
  72. **kwargs):
  73. """Starts the celerybeat clock server."""
  74. print("Celery Beat %s is starting." % __version__)
  75. # Setup logging
  76. if not isinstance(loglevel, int):
  77. loglevel = conf.LOG_LEVELS[loglevel.upper()]
  78. if not detach:
  79. logfile = None # log to stderr when not running in the background.
  80. # Dump configuration to screen so we have some basic information
  81. # when users sends e-mails.
  82. print(STARTUP_INFO_FMT % {
  83. "vhost": getattr(settings, "AMQP_VHOST", "(default)"),
  84. "host": getattr(settings, "AMQP_SERVER", "(default)"),
  85. "port": getattr(settings, "AMQP_PORT", "(default)"),
  86. "exchange": conf.AMQP_EXCHANGE,
  87. "exchange_type": conf.AMQP_EXCHANGE_TYPE,
  88. "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
  89. "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
  90. "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
  91. "loglevel": loglevel,
  92. "pidfile": pidfile,
  93. })
  94. print("Celery Beat has started.")
  95. if detach:
  96. from celery.log import setup_logger, redirect_stdouts_to_logger
  97. context = platform.create_daemon_context(logfile, pidfile,
  98. chroot_directory=chroot,
  99. working_directory=working_directory,
  100. umask=umask,
  101. uid=uid,
  102. gid=gid)
  103. context.open()
  104. logger = setup_logger(loglevel, logfile)
  105. redirect_stdouts_to_logger(logger, loglevel)
  106. # Run the worker init handler.
  107. # (Usually imports task modules and such.)
  108. current_loader.on_worker_init()
  109. def _run_clock():
  110. clockservice = ClockService(loglevel=loglevel,
  111. logfile=logfile,
  112. is_detached=detach)
  113. try:
  114. clockservice.start()
  115. except Exception, e:
  116. emergency_error(logfile,
  117. "celerybeat raised exception %s: %s\n%s" % (
  118. e.__class__, e, traceback.format_exc()))
  119. try:
  120. _run_clock()
  121. except:
  122. if detach:
  123. context.close()
  124. raise
  125. def parse_options(arguments):
  126. """Parse the available options to ``celeryd``."""
  127. parser = optparse.OptionParser(option_list=OPTION_LIST)
  128. options, values = parser.parse_args(arguments)
  129. return options
  130. if __name__ == "__main__":
  131. options = parse_options(sys.argv[1:])
  132. run_clockserver(**vars(options))