# -*- celerybeat.py -*-
#!/usr/bin/env python
"""celerybeat

.. program:: celerybeat

.. cmdoption:: -f, --logfile

    Path to log file. If no logfile is specified, ``stderr`` is used.

.. cmdoption:: -l, --loglevel

    Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
    ``ERROR``, ``CRITICAL``, or ``FATAL``.

.. cmdoption:: -p, --pidfile

    Path to pidfile.

.. cmdoption:: -d, --detach, --daemon

    Run in the background as a daemon.

.. cmdoption:: -u, --uid

    User-id to run ``celerybeat`` as when in daemon mode.

.. cmdoption:: -g, --gid

    Group-id to run ``celerybeat`` as when in daemon mode.

.. cmdoption:: --umask

    umask of the process when in daemon mode.

.. cmdoption:: --workdir

    Directory to change to when in daemon mode.

.. cmdoption:: --chroot

    Change root directory to this path when in daemon mode.

"""
  24. import os
  25. import sys
  26. import traceback
  27. import optparse
  28. from celery import conf
  29. from celery import platform
  30. from celery import __version__
  31. from celery.log import emergency_error
  32. from celery.beat import ClockService
  33. from celery.loaders import current_loader, settings
  34. from celery.messaging import get_connection_info
# Startup banner template; %-formatted with a mapping of broker/queue
# details by run_clockservice().  Extra keys in the mapping are simply
# ignored by %-formatting, so callers may pass more than is shown here.
STARTUP_INFO_FMT = """
Configuration ->
* Broker -> %(conninfo)s
* Exchange -> %(exchange)s (%(exchange_type)s)
* Consumer -> Queue:%(consumer_queue)s Binding:%(consumer_rkey)s
""".strip()
# Command-line options understood by celerybeat.  Defaults come from the
# ``celery.conf`` settings so configuration files take effect unless the
# user overrides them on the command line.
OPTION_LIST = (
    optparse.make_option('-f', '--logfile', default=conf.CELERYBEAT_LOG_FILE,
            action="store", dest="logfile",
            help="Path to log file."),
    optparse.make_option('-l', '--loglevel',
            default=conf.CELERYBEAT_LOG_LEVEL,
            action="store", dest="loglevel",
            help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
    optparse.make_option('-p', '--pidfile',
            default=conf.CELERYBEAT_PID_FILE,
            action="store", dest="pidfile",
            help="Path to pidfile."),
    optparse.make_option('-d', '--detach', '--daemon', default=False,
            action="store_true", dest="detach",
            help="Run in the background as a daemon."),
    optparse.make_option('-u', '--uid', default=None,
            action="store", dest="uid",
            help="User-id to run celerybeat as when in daemon mode."),
    optparse.make_option('-g', '--gid', default=None,
            action="store", dest="gid",
            help="Group-id to run celerybeat as when in daemon mode."),
    optparse.make_option('--umask', default=0,
            action="store", type="int", dest="umask",
            help="umask of the process when in daemon mode."),
    optparse.make_option('--workdir', default=None,
            action="store", dest="working_directory",
            help="Directory to change to when in daemon mode."),
    optparse.make_option('--chroot', default=None,
            action="store", dest="chroot",
            help="Change root directory to this path when in daemon mode."),
)
def run_clockservice(detach=False, loglevel=conf.CELERYBEAT_LOG_LEVEL,
        logfile=conf.CELERYBEAT_LOG_FILE, pidfile=conf.CELERYBEAT_PID_FILE,
        umask=0, uid=None, gid=None, working_directory=None, chroot=None,
        **kwargs):
    """Starts the celerybeat clock server.

    :keyword detach: Fork and run in the background as a daemon.
    :keyword loglevel: Logging level, either a numeric level or a level
        name looked up in ``conf.LOG_LEVELS``.
    :keyword logfile: Path to the log file.  Only used when detached;
        otherwise logging goes to ``stderr``.
    :keyword pidfile: Path to the pidfile (used by the daemon context).
    :keyword umask: umask of the process when in daemon mode.
    :keyword uid: User-id to run as when in daemon mode.
    :keyword gid: Group-id to run as when in daemon mode.
    :keyword working_directory: Directory to change to in daemon mode.
    :keyword chroot: Change root directory to this path in daemon mode.

    Extra keyword arguments are accepted and ignored, so the parsed
    command-line options can be passed straight through via ``**vars()``.

    """
    print("celerybeat %s is starting." % __version__)

    # Setup logging
    if not isinstance(loglevel, int):
        # Map a level name like "INFO" to its numeric value.
        loglevel = conf.LOG_LEVELS[loglevel.upper()]
    if not detach:
        logfile = None # log to stderr when not running in the background.

    # Dump configuration to screen so we have some basic information
    # when users sends e-mails.
    # (Extra keys not referenced by STARTUP_INFO_FMT are ignored by
    # %-formatting.)
    print(STARTUP_INFO_FMT % {
            "conninfo": get_connection_info(),
            "exchange": conf.AMQP_EXCHANGE,
            "exchange_type": conf.AMQP_EXCHANGE_TYPE,
            "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
            "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
            "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
            "loglevel": loglevel,
            "pidfile": pidfile,
    })

    print("celerybeat has started.")
    # Imported here, after the banner is printed, rather than at module
    # level.  NOTE(review): presumably to delay logging setup until it is
    # actually needed -- confirm before moving to the top of the file.
    from celery.log import setup_logger, redirect_stdouts_to_logger
    if detach:
        # Fork into the background first, then set up the logger and
        # redirect stdout/stderr into it so daemon output is not lost.
        # Order matters: the logger must be created in the child process.
        context = platform.create_daemon_context(logfile, pidfile,
                                        chroot_directory=chroot,
                                        working_directory=working_directory,
                                        umask=umask,
                                        uid=uid,
                                        gid=gid)
        context.open()
        logger = setup_logger(loglevel, logfile)
        redirect_stdouts_to_logger(logger, loglevel)

    # Run the worker init handler.
    # (Usually imports task modules and such.)
    current_loader.on_worker_init()

    def _run_clock():
        # Runs the clock service loop; never raises on service errors,
        # instead reporting them through the emergency log.
        logger = setup_logger(loglevel, logfile)
        clockservice = ClockService(logger=logger, is_detached=detach)

        try:
            clockservice.start()
        except Exception, e:
            # Regular logging may be broken at this point, so fall back
            # to the emergency error log.
            emergency_error(logfile,
                    "celerybeat raised exception %s: %s\n%s" % (
                        e.__class__, e, traceback.format_exc()))

    try:
        _run_clock()
    except:
        # Deliberately bare: on ANY exit (including SystemExit and
        # KeyboardInterrupt) tear down the daemon context/pidfile,
        # then re-raise so the exit status is preserved.
        if detach:
            context.close()
        raise
  125. def parse_options(arguments):
  126. """Parse the available options to ``celeryd``."""
  127. parser = optparse.OptionParser(option_list=OPTION_LIST)
  128. options, values = parser.parse_args(arguments)
  129. return options
  130. if __name__ == "__main__":
  131. options = parse_options(sys.argv[1:])
  132. run_clockservice(**vars(options))