Procházet zdrojové kódy

* celeryd: Added option --discard: Discard (delete!) all waiting messages in the queue.
* celeryd: The --wakeup-after option was not handled as a float. Fixed.

Ask Solem před 16 roky
rodič
revize
328c04e223
2 změnil soubory, kde provedl 24 přidání a 2 odebrání
  1. 23 2
      celery/bin/celeryd.py
  2. 1 0
      celery/management/commands/celeryd.py

+ 23 - 2
celery/bin/celeryd.py

@@ -30,6 +30,12 @@
 
     Run in the background as a daemon.
 
+.. cmdoption:: --discard
+
+    Discard all waiting tasks before the daemon is started.
+    **WARNING**: This is unrecoverable, and the tasks will be
+    deleted from the messaging server.
+
 """
 import os
 import sys
@@ -45,6 +51,7 @@ from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
 from celery.conf import QUEUE_WAKEUP_AFTER
 from celery import discovery
+from celery.task import discard_all
 from celery.worker import WorkController
 import traceback
 import optparse
@@ -52,7 +59,7 @@ import atexit
 
 
 def main(concurrency=DAEMON_CONCURRENCY, daemon=False,
-        loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE,
+        loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
         pidfile=DAEMON_PID_FILE, queue_wakeup_after=QUEUE_WAKEUP_AFTER):
     """Run the celery daemon."""
     if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
@@ -61,6 +68,14 @@ def main(concurrency=DAEMON_CONCURRENCY, daemon=False,
                 "concurrency. We'll be using a single process only.",
                 UserWarning)
         concurrency = 1
+
+    if discard:
+        discarded_count = discard_all()
+        what = "message"
+        if discarded_count > 1:
+            what = "messages"
+        sys.stderr.write("Discard: Erased %d %s from the queue.\n" % (
+            discarded_count, what))
     if daemon:
         sys.stderr.write("Launching celeryd in the background...\n")
         pidfile_handler = PIDFile(pidfile)
@@ -87,6 +102,11 @@ OPTION_LIST = (
     optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
             action="store", dest="concurrency", type="int",
             help="Number of child processes processing the queue."),
+    optparse.make_option('--discard', default=False,
+            action="store_true", dest="discard",
+            help="Discard all waiting tasks before the daemon is started. "
+                 "WARNING: This is unrecoverable, and the tasks will be "
+                 "deleted from the messaging server."),
     optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
             action="store", dest="logfile",
             help="Path to log file."),
@@ -97,7 +117,7 @@ OPTION_LIST = (
             action="store", dest="pidfile",
             help="Path to pidfile."),
     optparse.make_option('-w', '--wakeup-after', default=QUEUE_WAKEUP_AFTER,
-            action="store", dest="queue_wakeup_after",
+            action="store", type="float", dest="queue_wakeup_after",
             help="If the queue is empty, this is the time *in seconds* the "
                  "daemon sleeps until it wakes up to check if there's any "
                  "new messages on the queue."),
@@ -122,4 +142,5 @@ if __name__ == "__main__":
          logfile=options.logfile,
          loglevel=options.loglevel,
          pidfile=options.pidfile,
+         discard=options.discard,
          queue_wakeup_after=options.queue_wakeup_after)

+ 1 - 0
celery/management/commands/celeryd.py

@@ -20,6 +20,7 @@ class Command(BaseCommand):
         main(concurrency=options.get('concurrency'),
              daemon=options.get('daemon'),
              logfile=options.get('logfile'),
+             discard=options.get('discard'),
              loglevel=options.get('loglevel'),
              pidfile=options.get('pidfile'),
              queue_wakeup_after=options.get('queue_wakeup_after'))