celeryd

#!/usr/bin/env python
import os
import sys

# Make the current working directory (and, if set, the Django project
# directory) importable so the project's settings and task modules load.
sys.path.append(os.getcwd())
django_project_dir = os.environ.get("DJANGO_PROJECT_DIR")
if django_project_dir:
    sys.path.append(django_project_dir)

from django.conf import settings
from celery.platform import PIDFile, daemonize, remove_pidfile
from celery.log import setup_logger, emergency_error
from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
from celery.conf import QUEUE_WAKEUP_AFTER
from celery import discovery
from celery.worker import TaskDaemon

import traceback
import optparse
import atexit

def main(concurrency=DAEMON_CONCURRENCY, daemon=False,
         loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE,
         pidfile=DAEMON_PID_FILE, queue_wakeup_after=QUEUE_WAKEUP_AFTER):
    # sqlite3 can't handle concurrent writers, so fall back to a single process.
    if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
        import warnings
        warnings.warn("The sqlite3 database engine doesn't support "
                      "concurrency. We'll be using a single process only.",
                      UserWarning)
        concurrency = 1

    if daemon:
        # Detach into the background, writing and cleaning up a PID file.
        sys.stderr.write("Launching celeryd in the background...\n")
        pidfile_handler = PIDFile(pidfile)
        pidfile_handler.check()
        daemonize(pidfile=pidfile_handler)
        atexit.register(remove_pidfile, pidfile)
    else:
        logfile = None  # log to stderr when not running as daemon.

    # Find task modules in the installed Django applications.
    discovery.autodiscover()

    celeryd = TaskDaemon(concurrency=concurrency,
                         loglevel=loglevel,
                         logfile=logfile,
                         queue_wakeup_after=queue_wakeup_after)
    try:
        celeryd.run()
    except Exception, e:
        emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
            e.__class__, e, traceback.format_exc()))

def parse_options(arguments):
    parser = optparse.OptionParser()
    parser.add_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
                      action="store", dest="concurrency", type="int",
                      help="Number of child processes processing the queue.")
    parser.add_option('-f', '--logfile', default=DAEMON_LOG_FILE,
                      action="store", dest="logfile",
                      help="Path to log file.")
    parser.add_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
                      action="store", dest="loglevel",
                      help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL.")
    parser.add_option('-p', '--pidfile', default=DAEMON_PID_FILE,
                      action="store", dest="pidfile",
                      help="Path to PID file.")
    parser.add_option('-w', '--wakeup-after', default=QUEUE_WAKEUP_AFTER,
                      action="store", dest="queue_wakeup_after",
                      help="If the queue is empty, this is the time *in seconds* "
                           "the daemon sleeps until it wakes up to check if "
                           "there are any new messages on the queue.")
    parser.add_option('-d', '--daemon', default=False,
                      action="store_true", dest="daemon",
                      help="Run in the background as a daemon.")
    options, values = parser.parse_args(arguments)
    # Accept symbolic level names (e.g. "INFO") and map them to numeric levels.
    if not isinstance(options.loglevel, int):
        options.loglevel = LOG_LEVELS[options.loglevel.upper()]
    return options

if __name__ == "__main__":
    options = parse_options(sys.argv[1:])
    main(concurrency=options.concurrency,
         daemon=options.daemon,
         logfile=options.logfile,
         loglevel=options.loglevel,
         pidfile=options.pidfile,
         queue_wakeup_after=options.queue_wakeup_after)
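
# Example invocation (illustrative only; the paths and option values below are
# assumptions, not taken from the script itself):
#
#   DJANGO_PROJECT_DIR=/path/to/project ./celeryd --concurrency=4 \
#       --loglevel=INFO --logfile=/var/log/celeryd.log --daemon
#
# Without --daemon the worker stays in the foreground and, as the code above
# shows, logs to stderr instead of a file.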