|
@@ -3,38 +3,82 @@
|
|
|
celery.contrib.batches
|
|
|
======================
|
|
|
|
|
|
-Collect messages and processes them as a list.
|
|
|
+Experimental task class that buffers messages and processes them as a list.
|
|
|
|
|
|
-**Example**
|
|
|
+.. warning::
|
|
|
+
|
|
|
+ For this to work you have to set
|
|
|
+ :setting:`CELERYD_PREFETCH_MULTIPLIER` to zero, or some value where
|
|
|
+ the final multiplied value is higher than ``flush_every``.
|
|
|
+
|
|
|
+ In the future we hope to add the ability to direct batching tasks
|
|
|
+ to a channel with different QoS requirements than the task channel.
|
|
|
+
|
|
|
+**Simple Example**
|
|
|
|
|
|
A click counter that flushes the buffer every 100 messages, and every
|
|
|
-10 seconds.
|
|
|
+seconds. Does not do anything with the data, but can easily be modified
|
|
|
+to store it in a database.
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
- from celery import task
|
|
|
- from celery.contrib.batches import Batches
|
|
|
-
|
|
|
# Flush after 100 messages, or 10 seconds.
|
|
|
- @task(base=Batches, flush_every=100, flush_interval=10)
|
|
|
+ @app.task(base=Batches, flush_every=100, flush_interval=10)
|
|
|
def count_click(requests):
|
|
|
from collections import Counter
|
|
|
count = Counter(request.kwargs['url'] for request in requests)
|
|
|
for url, count in count.items():
|
|
|
print('>>> Clicks: %s -> %s' % (url, count))
|
|
|
|
|
|
-Registering the click is done as follows:
|
|
|
|
|
|
- >>> count_click.delay(url='http://example.com')
|
|
|
+Then you can ask for a click to be counted by doing::
|
|
|
|
|
|
-.. warning::
|
|
|
+ >>> count_click.delay('http://example.com')
|
|
|
|
|
|
- For this to work you have to set
|
|
|
- :setting:`CELERYD_PREFETCH_MULTIPLIER` to zero, or some value where
|
|
|
- the final multiplied value is higher than ``flush_every``.
|
|
|
+**Example returning results**
|
|
|
|
|
|
- In the future we hope to add the ability to direct batching tasks
|
|
|
- to a channel with different QoS requirements than the task channel.
|
|
|
+An interface to the Web of Trust API that flushes the buffer every 100
|
|
|
+messages, and every 10 seconds.
|
|
|
+
|
|
|
+.. code-block:: python
|
|
|
+
|
|
|
+ import requests
|
|
|
+ from urlparse import urlparse
|
|
|
+
|
|
|
+ from celery.contrib.batches import Batches
|
|
|
+
|
|
|
+ wot_api_target = "https://api.mywot.com/0.4/public_link_json"
|
|
|
+
|
|
|
+ @app.task(base=Batches, flush_every=100, flush_interval=10)
|
|
|
+ def wot_api(requests):
|
|
|
+ sig = lambda url: url
|
|
|
+ reponses = wot_api_real(
|
|
|
+ (sig(*request.args, **request.kwargs) for request in requests)
|
|
|
+ )
|
|
|
+ # use mark_as_done to manually return response data
|
|
|
+ for response, request in zip(reponses, requests):
|
|
|
+ app.backend.mark_as_done(request.id, response)
|
|
|
+
|
|
|
+
|
|
|
+ def wot_api_real(urls):
|
|
|
+ domains = [urlparse(url).netloc for url in urls]
|
|
|
+ response = requests.get(
|
|
|
+ wot_api_target,
|
|
|
+ params={"hosts": ('/').join(set(domains)) + '/'}
|
|
|
+ )
|
|
|
+ return [response.json[domain] for domain in domains]
|
|
|
+
|
|
|
+Using the API is done as follows::
|
|
|
+
|
|
|
+ >>> wot_api.delay('http://example.com')
|
|
|
+
|
|
|
+.. note::
|
|
|
+
|
|
|
+ If you don't have an ``app`` instance then use the current app proxy
|
|
|
+ instead::
|
|
|
+
|
|
|
+ from celery import current_app
|
|
|
+ app.backend.mark_as_done(request.id, response)
|
|
|
|
|
|
"""
|
|
|
from __future__ import absolute_import
|