|
@@ -546,24 +546,31 @@ Combining these you can easily process events in real-time:
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
-
|
|
|
from celery import Celery
|
|
|
|
|
|
|
|
|
- def monitor_events(app):
|
|
|
+ def my_monitor(app):
|
|
|
state = app.events.State()
|
|
|
|
|
|
- def on_event(event):
|
|
|
- state.event(event) # <-- updates in-memory cluster state
|
|
|
+ def announce_failed_tasks(event):
|
|
|
+ state.event(event)
|
|
|
+ # task name is sent only with -received event, and state
|
|
|
+ # will keep track of this for us.
|
|
|
+ task = state.tasks.get(event['uuid'])
|
|
|
|
|
|
- print('Workers online: %r' % ', '.join(
|
|
|
- worker for worker in state.workers if worker.alive
|
|
|
- ))
|
|
|
+ print('TASK FAILED: %s[%s] %s' % (
|
|
|
+ task.name, task.uuid, task.info(), ))
|
|
|
|
|
|
with app.connection() as connection:
|
|
|
- recv = app.events.Receiver(connection, handlers={'*': on_event})
|
|
|
+ recv = app.events.Receiver(connection, handlers={
|
|
|
+ 'task-failed': announce_failed_tasks,
|
|
|
+ '*': state.event,
|
|
|
+ })
|
|
|
recv.capture(limit=None, timeout=None, wakeup=True)
|
|
|
|
|
|
+ if __name__ == '__main__':
|
|
|
+ celery = Celery(broker='amqp://guest@localhost//')
|
|
|
+ my_monitor(celery)
|
|
|
|
|
|
.. note::
|
|
|
|