Kaynağa Gözat

Have a single entrypoint for revoking tasks. Also send task-revoked event.

Ask Solem 15 yıl önce
ebeveyn
işleme
6b7262b1c3

+ 82 - 44
celery/bin/celeryev.py

@@ -3,11 +3,13 @@ import time
 import curses
 import atexit
 import socket
+import optparse
 import threading
 
 from datetime import datetime
 from itertools import count
 
+from celery.task import control
 from celery.events import EventReceiver
 from celery.events.state import State
 from celery.messaging import establish_connection
@@ -17,6 +19,12 @@ TASK_NAMES = LocalCache(0xFFF)
 HUMAN_TYPES = {"worker-offline": "shutdown",
                "worker-online": "started",
                "worker-heartbeat": "heartbeat"}
+OPTION_LIST = (
+    optparse.make_option('-d', '--DUMP',
+        action="store_true", dest="dump",
+        help="Dump events to stdout."),
+)
+
 
 
 def humanize_type(type):
@@ -26,43 +34,35 @@ def humanize_type(type):
         return type.lower().replace("-", " ")
 
 
-def dump_event(event):
-    timestamp = datetime.fromtimestamp(event.pop("timestamp"))
-    type = event.pop("type").lower()
-    hostname = event.pop("hostname")
-    if type.startswith("task-"):
-        uuid = event.pop("uuid")
-        if type.startswith("task-received"):
-            task = TASK_NAMES[uuid] = "%s(%s) args=%s kwargs=%s" % (
-                    event.pop("name"), uuid,
-                    event.pop("args"),
-                    event.pop("kwargs"))
-        else:
-            task = TASK_NAMES.get(uuid, "")
-        return format_task_event(hostname, timestamp, type, task, event)
-    fields = ", ".join("%s=%s" % (key, event[key])
-                    for key in sorted(event.keys()))
-    sep = fields and ":" or ""
-    print("%s [%s] %s%s %s" % (hostname, timestamp,
-                                humanize_type(type), sep, fields))
-
-
-def format_task_event(hostname, timestamp, type, task, event):
-    fields = ", ".join("%s=%s" % (key, event[key])
-                    for key in sorted(event.keys()))
-    sep = fields and ":" or ""
-    print("%s [%s] %s%s %s %s" % (hostname, timestamp,
-                                humanize_type(type), sep, task, fields))
-
-def eventdump():
-    sys.stderr.write("-> celeryev: starting capture...\n")
-    conn = establish_connection()
-    recv = EventReceiver(conn, handlers={"*": dump_event})
-    try:
-        recv.capture()
-    except (KeyboardInterrupt, SystemExit):
-        conn and conn.close()
+class Dumper(object):
 
+    def on_event(self, event):
+        timestamp = datetime.fromtimestamp(event.pop("timestamp"))
+        type = event.pop("type").lower()
+        hostname = event.pop("hostname")
+        if type.startswith("task-"):
+            uuid = event.pop("uuid")
+            if type.startswith("task-received"):
+                task = TASK_NAMES[uuid] = "%s(%s) args=%s kwargs=%s" % (
+                        event.pop("name"), uuid,
+                        event.pop("args"),
+                        event.pop("kwargs"))
+            else:
+                task = TASK_NAMES.get(uuid, "")
+            return self.format_task_event(hostname, timestamp,
+                                          type, task, event)
+        fields = ", ".join("%s=%s" % (key, event[key])
+                        for key in sorted(event.keys()))
+        sep = fields and ":" or ""
+        print("%s [%s] %s%s %s" % (hostname, timestamp,
+                                    humanize_type(type), sep, fields))
+
+    def format_task_event(self, hostname, timestamp, type, task, event):
+        fields = ", ".join("%s=%s" % (key, event[key])
+                        for key in sorted(event.keys()))
+        sep = fields and ":" or ""
+        print("%s [%s] %s%s %s %s" % (hostname, timestamp,
+                                    humanize_type(type), sep, task, fields))
 
 
 def abbr(S, max, dots=True):
@@ -92,6 +92,8 @@ class CursesMonitor(object):
     foreground = curses.COLOR_BLACK
     background = curses.COLOR_WHITE
     online_str = "Workers online: "
+    help = ("Keys: j, k: Move selection up/down. "
+            "r: revoke selected task. q: quit")
 
     def __init__(self, state):
         self.state = state
@@ -133,9 +135,14 @@ class CursesMonitor(object):
             self.move_selection()
         elif key in ("KEY_UP", "K"):
             self.move_selection(reverse=True)
+        elif key in ("R", ):
+            self.revoke_selection()
         elif key in ("Q", ):
             raise KeyboardInterrupt
 
+    def revoke_selection(self):
+        control.revoke(self.selected_task)
+
     def draw(self):
         self.handle_keypress()
         win = self.win
@@ -159,7 +166,7 @@ class CursesMonitor(object):
                     task.visited = time.time()
 
         if self.selected_task:
-            win.addstr(my - 3, x, self.selected_str, curses.A_BOLD)
+            win.addstr(my - 4, x, self.selected_str, curses.A_BOLD)
             info = "Missing extended info"
             try:
                 selection = self.state.tasks[self.selected_task]
@@ -171,16 +178,17 @@ class CursesMonitor(object):
                     info["runtime"] = "%.2fs" % info["runtime"]
                 info = " ".join("%s=%s" % (key, value)
                             for key, value in info.items())
-            win.addstr(my - 3, x + len(self.selected_str), info)
-
+            win.addstr(my - 4, x + len(self.selected_str), info)
         else:
-            win.addstr(my - 3, x, "No task selected", curses.A_NORMAL)
+            win.addstr(my - 4, x, "No task selected", curses.A_NORMAL)
+
         if self.workers:
-            win.addstr(my - 2, x, self.online_str, curses.A_BOLD)
-            win.addstr(my - 2, x + len(self.online_str),
+            win.addstr(my - 3, x, self.online_str, curses.A_BOLD)
+            win.addstr(my - 3, x + len(self.online_str),
                     ", ".join(self.workers), curses.A_NORMAL)
         else:
-            win.addstr(my - 2, x, "No workers discovered.")
+            win.addstr(my - 3, x, "No workers discovered.")
+        win.addstr(my - 2, x, self.help)
         win.refresh()
 
     def setupscreen(self):
@@ -218,7 +226,8 @@ class DisplayThread(threading.Thread):
         while not self.shutdown:
             self.display.draw()
 
-def main():
+
+def eventtop():
     sys.stderr.write("-> celeryev: starting capture...\n")
     state = State()
     display = CursesMonitor(state)
@@ -247,6 +256,35 @@ def main():
         display.resetscreen()
 
 
+def eventdump():
+    sys.stderr.write("-> celeryev: starting capture...\n")
+    dumper = Dumper()
+    conn = establish_connection()
+    recv = EventReceiver(conn, handlers={"*": dumper.on_event})
+    try:
+        recv.capture()
+    except (KeyboardInterrupt, SystemExit):
+        conn and conn.close()
+
+
+def run_celeryev(dump=False):
+    if dump:
+        return eventdump()
+    return eventtop()
+
+
+def parse_options(arguments):
+    """Parse the available options to ``celeryev``."""
+    parser = optparse.OptionParser(option_list=OPTION_LIST)
+    options, values = parser.parse_args(arguments)
+    return options
+
+
+def main():
+    options = parse_options(sys.argv[1:])
+    return run_celeryev(**vars(options))
+
+
 
 
 

+ 1 - 1
celery/events/__init__.py

@@ -104,7 +104,7 @@ class EventReceiver(object):
         stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
 
         """
-        consumer = self.consume()
+        consumer = self.consumer()
         it = consumer.iterconsume(limit=limit)
         while True:
             it.next()

+ 3 - 0
celery/events/state.py

@@ -88,6 +88,9 @@ class Task(Thing):
         self.suceeded = timestamp
         self.update(fields, timestamp=timestamp)
 
+    def revoked(self, timestamp=None):
+        self.state = states.REVOKED
+
 
 class State(object):
 

+ 1 - 5
celery/worker/controllers.py

@@ -8,7 +8,6 @@ import threading
 from Queue import Empty as QueueEmpty
 
 from celery import log
-from celery.worker.revoke import revoked
 
 
 class BackgroundThread(threading.Thread):
@@ -90,10 +89,7 @@ class Mediator(BackgroundThread):
         except QueueEmpty:
             time.sleep(0.2)
         else:
-            if task.task_id in revoked: # task revoked
-                task.on_ack()
-                self.logger.warn("Mediator: Skipping revoked task: %s[%s]" % (
-                        task.task_name, task.task_id))
+            if task.revoked():
                 return
 
             self.logger.debug(

+ 18 - 0
celery/worker/job.py

@@ -14,6 +14,7 @@ from celery import platform
 from celery.log import get_default_logger
 from celery.utils import noop, fun_takes_kwargs
 from celery.utils.mail import mail_admins
+from celery.worker.revoke import revoked
 from celery.loaders import current_loader
 from celery.execute.trace import TaskTrace
 from celery.registry import tasks
@@ -198,6 +199,7 @@ class TaskWrapper(object):
         self.on_ack = on_ack
         self.delivery_info = delivery_info or {}
         self.task = tasks[self.task_name]
+        self._already_revoked = False
 
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
                 "fail_email_body", "logger", "eventer"):
@@ -211,6 +213,18 @@ class TaskWrapper(object):
                 self.task_name, self.task_id,
                 self.args, self.kwargs)
 
+    def revoked(self):
+        if self._already_revoked:
+            return True
+        if self.task_id in revoked:
+            self.logger.warn("Skipping revoked task: %s[%s]" % (
+                self.task_name, self.task_id))
+            self.send_event("task-revoked", uuid=self.task_id)
+            self.on_ack()
+            self._already_revoked = True
+            return True
+        return False
+
     @classmethod
     def from_message(cls, message, message_data, logger=None, eventer=None):
         """Create a :class:`TaskWrapper` from a task message sent by
@@ -290,6 +304,8 @@ class TaskWrapper(object):
         :keyword logfile: The logfile used by the task.
 
         """
+        if self.revoked():
+            return
         # Make sure task has not already been executed.
         self._set_executed_bit()
 
@@ -318,6 +334,8 @@ class TaskWrapper(object):
         :returns :class:`multiprocessing.AsyncResult` instance.
 
         """
+        if self.revoked():
+            return
         # Make sure task has not already been executed.
         self._set_executed_bit()
 

+ 2 - 5
celery/worker/listener.py

@@ -10,7 +10,6 @@ from carrot.connection import AMQPConnectionException
 from celery import conf
 from celery.utils import noop, retry_over_time
 from celery.worker.job import TaskWrapper, InvalidTaskError
-from celery.worker.revoke import revoked
 from celery.worker.control import ControlDispatch
 from celery.worker.heartbeat import Heart
 from celery.events import EventDispatcher
@@ -136,10 +135,8 @@ class CarrotListener(object):
 
         """
 
-        if task.task_id in revoked:
-            self.logger.warn("Got revoked task from broker: %s[%s]" % (
-                task.task_name, task.task_id))
-            return task.on_ack()
+        if task.revoked():
+            return
 
         self.event_dispatcher.send("task-received", uuid=task.task_id,
                 name=task.task_name, args=repr(task.args),

+ 3 - 9
celery/worker/scheduler.py

@@ -3,7 +3,6 @@ from __future__ import generators
 import time
 import heapq
 
-from celery.worker.revoke import revoked
 from celery import log
 
 DEFAULT_MAX_INTERVAL = 2
@@ -51,16 +50,11 @@ class Scheduler(object):
                 eta, priority, item, callback = verify = self._queue[0]
                 now = nowfun()
 
-                # FIXME: Need a generic hook for this
-                if item.task_id in revoked:
+                if item.revoked():
                     event = pop(self._queue)
-                    if event is verify:
-                        item.on_ack()
-                        self.logger.warn(
-                                "Mediator: Skipping revoked task: %s[%s]" % (
-                                    item.task_name, item.task_id))
-                    else:
+                    if event is not verify:
                         heapq.heappush(self._queue, event)
+                    continue
 
                 if now < eta:
                     yield min(eta - now, self.max_interval)