Browse source

Playing around with batches.

Ask Solem 15 years ago
parent
commit
d6f0915c9a
3 changed files with 63 additions and 3 deletions
  1. 57 0
      celery/contrib/batches.py
  2. 3 0
      celery/task/base.py
  3. 3 3
      celery/worker/__init__.py

+ 57 - 0
celery/contrib/batches.py

@@ -0,0 +1,57 @@
+from itertools import count
+from collections import deque, defaultdict
+
+from celery.task.base import Task
+
+
class Batches(Task):
    """Abstract task base that buffers incoming task requests and runs
    them on the pool in groups of ``flush_every``.

    Each call to :meth:`execute` appends the request to an internal
    buffer; every ``flush_every``-th call the whole buffer is handed to
    :meth:`flush` and then cleared.
    """
    abstract = True
    flush_every = 10

    def __init__(self):
        self._buffer = deque()
        # Start counting at 1: count() yields 0 first, and
        # ``0 % flush_every == 0`` would trigger a flush immediately on
        # the very first request instead of after ``flush_every`` of them.
        self._counter = count(1)
        # NOTE: next(it) instead of it.next() — works on Python 2.6+
        # and Python 3; the bound ``.next`` method is Python-2-only.

    def execute(self, wrapper, pool, loglevel, logfile):
        """Buffer the request; flush once ``flush_every`` have accumulated."""
        self._buffer.append((wrapper, pool, loglevel, logfile))

        if not next(self._counter) % self.flush_every:
            self.flush(self._buffer)
            self._buffer.clear()

    def flush(self, tasks):
        """Send every buffered request to the worker pool."""
        for wrapper, pool, loglevel, logfile in tasks:
            wrapper.execute_using_pool(pool, loglevel, logfile)
+
+
class Counter(Task):
    """Abstract task base that buffers the ``(args, kwargs)`` of incoming
    requests and hands them to :meth:`flush` in groups of ``flush_every``.

    Unlike :class:`Batches`, the buffered requests are never executed on
    the pool — subclasses aggregate their arguments in :meth:`flush`.
    """
    abstract = True
    flush_every = 10

    def __init__(self):
        self._buffer = deque()
        # Start counting at 1: count() yields 0 first, which would make
        # ``0 % flush_every == 0`` flush on the very first request
        # instead of after ``flush_every`` of them.
        self._counter = count(1)
        # NOTE: next(it) instead of it.next() — portable across
        # Python 2.6+ and Python 3.

    def execute(self, wrapper, pool, loglevel, logfile):
        """Buffer the request's positional and keyword arguments."""
        self._buffer.append((wrapper.args, wrapper.kwargs))

        if not next(self._counter) % self.flush_every:
            self.flush(self._buffer)
            self._buffer.clear()

    def flush(self, buffer):
        """Process the buffered ``(args, kwargs)`` pairs; must be overridden."""
        raise NotImplementedError("Counters must implement 'flush'")
+
+
+
class ClickCounter(Counter):
    """Example counter: tally clicks per URL, flushing every 1000 requests.

    Subclasses :class:`Counter` — not plain ``Task`` — because only
    ``Counter.execute`` buffers requests and invokes :meth:`flush`;
    under ``Task`` this ``flush`` would never be called.
    """
    flush_every = 1000

    def flush(self, buffer):
        """Aggregate buffered ``(args, kwargs)`` pairs into per-URL totals."""
        # defaultdict(int) is the idiomatic zero-default counter.
        urlcount = defaultdict(int)
        for args, kwargs in buffer:
            urlcount[kwargs["url"]] += 1

        # ``n`` rather than ``count`` — avoids shadowing itertools.count.
        for url, n in urlcount.items():
            print(">>> Clicks: %s -> %s" % (url, n))
            # increment_in_db(url, n=n)  # hook for persisting totals

+ 3 - 0
celery/task/base.py

@@ -401,6 +401,9 @@ class Task(object):
         """
         pass
 
+    def execute(self, wrapper, pool, loglevel, logfile):
+        """Execute the task request by sending it to the worker pool.
+
+        Default behaviour runs the request immediately via
+        ``wrapper.execute_using_pool``; subclasses may override this to
+        change how requests are run (e.g. buffering them first, as
+        ``celery.contrib.batches`` does in this commit).
+        """
+        wrapper.execute_using_pool(pool, loglevel, logfile)
+
 
 class ExecuteRemoteTask(Task):
     """Execute an arbitrary function or object.

+ 3 - 3
celery/worker/__init__.py

@@ -161,12 +161,12 @@ class WorkController(object):
         finally:
             self.stop()
 
-    def process_task(self, task):
+    def process_task(self, wrapper):
         """Process task by sending it to the pool of workers."""
         try:
             try:
-                task.execute_using_pool(self.pool, self.loglevel,
-                                        self.logfile)
+                wrapper.task.execute(wrapper, self.pool,
+                                     self.loglevel, self.logfile)
             except Exception, exc:
                 self.logger.critical("Internal error %s: %s\n%s" % (
                                 exc.__class__, exc, traceback.format_exc()))