Browse Source

Added CELERY_SEND_TASK_SENT_EVENT option. If enabled an event will be sent out
with every task, so monitors can track tasks before the workers receive them.

This means there is 2 messages for every task.

Ask Solem 14 years ago
parent
commit
07b6e297da

+ 17 - 4
celery/app/amqp.py

@@ -102,7 +102,8 @@ class TaskPublisher(messaging.Publisher):
 
     def delay_task(self, task_name, task_args=None, task_kwargs=None,
             countdown=None, eta=None, task_id=None, taskset_id=None,
-            expires=None, exchange=None, exchange_type=None, **kwargs):
+            expires=None, exchange=None, exchange_type=None,
+            event_dispatcher=None, **kwargs):
         """Delay task for execution by the celery nodes."""
 
         task_id = task_id or gen_unique_id()
@@ -122,14 +123,18 @@ class TaskPublisher(messaging.Publisher):
             now = now or datetime.now()
             expires = now + timedelta(seconds=expires)
 
+        retries = kwargs.get("retries", 0)
+        eta = eta and eta.isoformat()
+        expires = expires and expires.isoformat()
+
         message_data = {
             "task": task_name,
             "id": task_id,
             "args": task_args or [],
             "kwargs": task_kwargs or {},
-            "retries": kwargs.get("retries", 0),
-            "eta": eta and eta.isoformat(),
-            "expires": expires and expires.isoformat(),
+            "retries": retries,
+            "eta": eta,
+            "expires": expires,
         }
 
         if taskset_id:
@@ -146,6 +151,14 @@ class TaskPublisher(messaging.Publisher):
                   **extract_msg_options(kwargs))
         signals.task_sent.send(sender=task_name, **message_data)
 
+        if event_dispatcher:
+            event_dispatcher.send("task-sent", uuid=task_id,
+                                               name=task_name,
+                                               args=repr(task_args),
+                                               kwargs=repr(task_kwargs),
+                                               retries=retries,
+                                               eta=eta,
+                                               expires=expires)
         return task_id
 
 

+ 8 - 0
celery/app/base.py

@@ -37,6 +37,7 @@ class BaseApp(object):
         self._control = None
         self._loader = None
         self._log = None
+        self._events = None
         self.set_as_current = set_as_current
         self.on_init()
 
@@ -314,3 +315,10 @@ class BaseApp(object):
             from celery.log import Logging
             self._log = Logging(app=self)
         return self._log
+
+    @property
+    def events(self):
+        if self._events is None:
+            from celery.events import Events
+            self._events = Events(app=self)
+        return self._events

+ 1 - 0
celery/app/defaults.py

@@ -83,6 +83,7 @@ NAMESPACES = {
         "RESULT_PERSISTENT": Option(False, type="bool"),
         "SEND_EVENTS": Option(False, type="bool"),
         "SEND_TASK_ERROR_EMAILS": Option(False, type="bool"),
+        "SEND_TASK_SENT_EVENT": Option(False, type="bool"),
         "STORE_ERRORS_EVEN_IF_IGNORED": Option(False, type="bool"),
         "TASK_RESULT_EXPIRES": Option(timedelta(days=1), type="int"),
         "AMQP_TASK_RESULT_EXPIRES": Option(type="int"),

+ 44 - 5
celery/events/__init__.py

@@ -39,15 +39,25 @@ class EventDispatcher(object):
     :keyword enabled: Set to :const:`False` to not actually publish any events,
         making :meth:`send` a noop operation.
 
+    :keyword channel: Can be used instead of `connection` to specify
+        an exact channel to use when sending events.
+
+    :keyword buffer_while_offline: If enabled events will be buffered
+       while the connection is down. :meth:`flush` must be called
+       as soon as the connection is re-established.
+
     You need to :meth:`close` this after use.
 
     """
 
-    def __init__(self, connection, hostname=None, enabled=True, app=None):
+    def __init__(self, connection=None, hostname=None, enabled=True,
+            channel=None, buffer_while_offline=True, app=None):
         self.app = app_or_default(app)
         self.connection = connection
+        self.channel = channel
         self.hostname = hostname or socket.gethostname()
         self.enabled = enabled
+        self.buffer_while_offline = buffer_while_offline
         self._lock = threading.Lock()
         self.publisher = None
         self._outbound_buffer = deque()
@@ -58,14 +68,16 @@ class EventDispatcher(object):
     def enable(self):
         conf = self.app.conf
         self.enabled = True
-        self.publisher = Producer(self.connection.channel(),
-                                 exchange=event_exchange,
-                                 serializer=conf.CELERY_EVENT_SERIALIZER)
+        channel = self.channel or self.connection.channel()
+        self.publisher = Producer(channel,
+                                  exchange=event_exchange,
+                                  serializer=conf.CELERY_EVENT_SERIALIZER)
 
     def disable(self):
         self.enabled = False
         if self.publisher is not None:
-            self.publisher.channel.close()
+            if not self.channel:  # close auto channel.
+                self.publisher.channel.close()
             self.publisher = None
 
     def send(self, type, **fields):
@@ -85,6 +97,8 @@ class EventDispatcher(object):
                 self.publisher.publish(event,
                                        routing_key=type.replace("-", "."))
             except Exception, exc:
+                if not self.buffer_while_offline:
+                    raise
                 self._outbound_buffer.append((event, exc))
         finally:
             self._lock.release()
@@ -176,3 +190,28 @@ class EventReceiver(object):
     def _receive(self, message_data, message):
         type = message_data.pop("type").lower()
         self.process(type, create_event(type, message_data))
+
+
+
+class Events(object):
+
+    def __init__(self, app):
+        self.app = app
+
+    def Receiver(self, connection, handlers=None, routing_key="#"):
+        return EventReceiver(connection,
+                             handlers=handlers,
+                             routing_key=routing_key,
+                             app=self.app)
+
+    def Dispatcher(self, connection=None, hostname=None, enabled=True,
+            channel=None, buffer_while_offline=True):
+        return EventDispatcher(connection,
+                               hostname=hostname,
+                               enabled=enabled,
+                               channel=channel,
+                               app=self.app)
+
+    def State(self):
+        from celery.events.state import State as _State
+        return _State()

+ 2 - 4
celery/events/cursesmon.py

@@ -10,8 +10,6 @@ from textwrap import wrap
 
 from celery import states
 from celery.app import app_or_default
-from celery.events import EventReceiver
-from celery.events.state import State
 from celery.utils import abbr, abbrtask
 
 
@@ -387,13 +385,13 @@ class DisplayThread(threading.Thread):
 def evtop(app=None):
     sys.stderr.write("-> evtop: starting capture...\n")
     app = app_or_default(app)
-    state = State()
+    state = app.events.State()
     display = CursesMonitor(state, app=app)
     display.init_screen()
     refresher = DisplayThread(display)
     refresher.start()
     conn = app.broker_connection()
-    recv = EventReceiver(conn, app=app, handlers={"*": state.event})
+    recv = app.events.Receiver(conn, handlers={"*": state.event})
     try:
         recv.capture(limit=None)
     except Exception:

+ 1 - 2
celery/events/dumper.py

@@ -4,7 +4,6 @@ from datetime import datetime
 
 from celery.app import app_or_default
 from celery.datastructures import LocalCache
-from celery.events import EventReceiver
 
 
 TASK_NAMES = LocalCache(0xFFF)
@@ -57,7 +56,7 @@ def evdump(app=None):
     app = app_or_default(app)
     dumper = Dumper()
     conn = app.broker_connection()
-    recv = EventReceiver(conn, app=app, handlers={"*": dumper.on_event})
+    recv = app.events.Receiver(conn, handlers={"*": dumper.on_event})
     try:
         recv.capture()
     except (KeyboardInterrupt, SystemExit):

+ 2 - 4
celery/events/snapshot.py

@@ -2,8 +2,6 @@ from celery.utils import timer2
 
 from celery.app import app_or_default
 from celery.datastructures import TokenBucket
-from celery.events import EventReceiver
-from celery.events.state import State
 from celery.utils import instantiate, LOG_LEVELS
 from celery.utils.dispatch import Signal
 from celery.utils.timeutils import rate
@@ -82,12 +80,12 @@ def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
     logger.info(
         "-> evcam: Taking snapshots with %s (every %s secs.)\n" % (
             camera, freq))
-    state = State()
+    state = app.events.State()
     cam = instantiate(camera, state, app=app,
                       freq=freq, maxrate=maxrate, logger=logger)
     cam.install()
     conn = app.broker_connection()
-    recv = EventReceiver(conn, app=app, handlers={"*": state.event})
+    recv = app.events.Receiver(conn, handlers={"*": state.event})
     try:
         try:
             recv.capture(limit=None)

+ 5 - 0
celery/events/state.py

@@ -66,6 +66,7 @@ class Task(Element):
                      name=None,
                      state=states.PENDING,
                      received=False,
+                     sent=False,
                      started=False,
                      succeeded=False,
                      failed=False,
@@ -111,6 +112,10 @@ class Task(Element):
             fields = dict((key, fields[key]) for key in keep)
             super(Task, self).update(fields)
 
+    def on_sent(self, timestamp=None, **fields):
+        self.sent = timestamp
+        self.update(states.PENDING, timestamp, fields)
+
     def on_received(self, timestamp=None, **fields):
         self.received = timestamp
         self.update(states.RECEIVED, timestamp, fields)

+ 9 - 2
celery/task/base.py

@@ -432,12 +432,13 @@ class BaseTask(object):
 
         """
         router = self.app.amqp.Router(queues)
+        conf = self.app.conf
 
-        if self.app.conf.CELERY_ALWAYS_EAGER:
+        if conf.CELERY_ALWAYS_EAGER:
             return self.apply(args, kwargs, task_id=task_id)
 
         options.setdefault("compression",
-                           self.app.conf.CELERY_MESSAGE_COMPRESSION)
+                           conf.CELERY_MESSAGE_COMPRESSION)
         options = dict(extract_exec_options(self), **options)
         options = router.route(options, self.name, args, kwargs)
         exchange = options.get("exchange")
@@ -447,11 +448,17 @@ class BaseTask(object):
         publish = publisher or self.get_publisher(connection,
                                                   exchange=exchange,
                                                   exchange_type=exchange_type)
+        evd = None
+        if conf.CELERY_SEND_TASK_SENT_EVENT:
+            evd = self.app.events.Dispatcher(channel=publish.channel,
+                                             buffer_while_offline=False)
+
         try:
             task_id = publish.delay_task(self.name, args, kwargs,
                                          task_id=task_id,
                                          countdown=countdown,
                                          eta=eta, expires=expires,
+                                         event_dispatcher=evd,
                                          **options)
         finally:
             if not publisher:

+ 1 - 3
celery/worker/consumer.py

@@ -76,7 +76,6 @@ import warnings
 
 from celery.app import app_or_default
 from celery.datastructures import AttributeDict, SharedCounter
-from celery.events import EventDispatcher
 from celery.exceptions import NotRegistered
 from celery.utils import noop
 from celery.utils.timer2 import to_timestamp
@@ -413,8 +412,7 @@ class Consumer(object):
         # Flush events sent while connection was down.
         if self.event_dispatcher:
             self.event_dispatcher.flush()
-        self.event_dispatcher = EventDispatcher(self.connection,
-                                                app=self.app,
+        self.event_dispatcher = self.app.events.Dispatcher(self.connection,
                                                 hostname=self.hostname,
                                                 enabled=self.send_events)
         self.restart_heartbeat()