|
@@ -557,6 +557,11 @@ arguments::
|
|
|
Custom Camera
|
|
|
~~~~~~~~~~~~~
|
|
|
|
|
|
+Cameras can be useful if you need to capture events and do something
|
|
|
+with those events at an interval. For real-time event processing
|
|
|
+you should use :class:`@events.Receiver` directly, like in
|
|
|
+:ref:`event-real-time-example`.
|
|
|
+
|
|
|
Here is an example camera, dumping the snapshot to screen:
|
|
|
|
|
|
.. code-block:: python
|
|
@@ -565,7 +570,6 @@ Here is an example camera, dumping the snapshot to screen:
|
|
|
|
|
|
from celery.events.snapshot import Polaroid
|
|
|
|
|
|
-
|
|
|
class DumpCam(Polaroid):
|
|
|
|
|
|
def on_shutter(self, state):
|
|
@@ -581,26 +585,121 @@ See the API reference for :mod:`celery.events.state` to read more
|
|
|
about state objects.
|
|
|
|
|
|
Now you can use this cam with :program:`celery events` by specifying
|
|
|
-it with the `-c` option::
|
|
|
+it with the `-c` option:
|
|
|
+
|
|
|
+.. code-block:: bash
|
|
|
|
|
|
$ celery events -c myapp.DumpCam --frequency=2.0
|
|
|
|
|
|
-Or you can use it programmatically like this::
|
|
|
+Or you can use it programmatically like this:
|
|
|
|
|
|
- from celery.events import EventReceiver
|
|
|
- from celery.messaging import establish_connection
|
|
|
- from celery.events.state import State
|
|
|
+.. code-block:: python
|
|
|
+
|
|
|
+ from celery import Celery
|
|
|
from myapp import DumpCam
|
|
|
|
|
|
- def main():
|
|
|
- state = State()
|
|
|
- with establish_connection() as connection:
|
|
|
- recv = EventReceiver(connection, handlers={'*': state.event})
|
|
|
- with DumpCam(state, freq=1.0):
|
|
|
+ def main(app, freq=1.0):
|
|
|
+ state = app.events.State()
|
|
|
+ with app.connection() as connection:
|
|
|
+ recv = app.events.Receiver(connection, handlers={'*': state.event})
|
|
|
+ with DumpCam(state, freq=freq):
|
|
|
recv.capture(limit=None, timeout=None)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
- main()
|
|
|
+ celery = Celery(broker='amqp://guest@localhost//')
|
|
|
+ main(celery)
|
|
|
+
|
|
|
+.. _event-real-time-example:
|
|
|
+
|
|
|
+Real-time processing
|
|
|
+--------------------
|
|
|
+
|
|
|
+To process events in real-time we need the following
|
|
|
+
|
|
|
+- An event consumer (this is the ``Receiver``)
|
|
|
+
|
|
|
+- A set of handlers called when events come in.
|
|
|
+
|
|
|
+ You can have different handlers for each event type,
|
|
|
+ or a catch-all handler can be used ('*')
|
|
|
+
|
|
|
+- State (optional)
|
|
|
+
|
|
|
+ :class:`@events.State` is a convenient in-memory representation
|
|
|
+ of tasks and workers in the cluster that is updated as events come in.
|
|
|
+
|
|
|
+ It encapsulates solutions for many common things, like checking if a
|
|
|
+ worker is still alive (by verifying heartbeats), merging event fields
|
|
|
+ together as events come in, making sure timestamps are in sync, and so on.
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+Combining these we can easily process events in real-time:
|
|
|
+
|
|
|
+
|
|
|
+.. code-block:: python
|
|
|
+
|
|
|
+
|
|
|
+ from celery import Celery
|
|
|
+
|
|
|
+
|
|
|
+ def monitor_events(app):
|
|
|
+ state = app.events.State()
|
|
|
+
|
|
|
+ def on_event(event):
|
|
|
+ state.event(event) # <-- updates in-memory cluster state
|
|
|
+
|
|
|
+ print('Workers online: %r' % ', '.join(
|
|
|
+ worker for worker in state.workers if worker.alive
|
|
|
+ )
|
|
|
+
|
|
|
+ with app.connection() as connection:
|
|
|
+ recv = app.events.Receiver(connection, handlers={'*': on_event})
|
|
|
+ recv.capture(limit=None, timeout=None, wakeup=True)
|
|
|
+
|
|
|
+
|
|
|
+.. note::
|
|
|
+
|
|
|
+ The wakeup argument to ``capture`` sends a signal to all workers
|
|
|
+ to force them to send a heartbeat. This way we can immediately see
|
|
|
+ workers when the monitor starts.
|
|
|
+
|
|
|
+
|
|
|
+We can listen to specific events by specifying the handlers:
|
|
|
+
|
|
|
+.. code-block:: python
|
|
|
+
|
|
|
+ from celery import Celery
|
|
|
+
|
|
|
+ def my_monitor(app):
|
|
|
+ state = app.events.State()
|
|
|
+
|
|
|
+ def announce_failed_tasks(event):
|
|
|
+ state.event(event)
|
|
|
+ task_id = event['uuid']
|
|
|
+
|
|
|
+ print('TASK FAILED: %s[%s] %s' % (
|
|
|
+ event['name'], task_id, state[task_id].info(), ))
|
|
|
+
|
|
|
+ def announce_dead_workers(event):
|
|
|
+ state.event(event)
|
|
|
+ hostname = event['hostname']
|
|
|
+
|
|
|
+ if not state.workers[hostname].alive:
|
|
|
+ print('Worker %s missed heartbeats' % (hostname, ))
|
|
|
+
|
|
|
+
|
|
|
+ with app.connection() as connection:
|
|
|
+ recv = app.events.Receiver(connection, handlers={
|
|
|
+ 'task-failed': announce_failed_tasks,
|
|
|
+ 'worker-heartbeat': announce_dead_workers,
|
|
|
+ })
|
|
|
+ recv.capture(limit=None, timeout=None, wakeup=True)
|
|
|
+
|
|
|
+ if __name__ == '__main__':
|
|
|
+ celery = Celery(broker='amqp://guest@localhost//')
|
|
|
+ my_monitor(celery)
|
|
|
+
|
|
|
|
|
|
|
|
|
.. _event-reference:
|