# batches.py

  1. from itertools import count
  2. from collections import deque
  3. from celery.task.base import Task
  4. from celery.utils.compat import defaultdict
  5. class Batches(Task):
  6. abstract = True
  7. flush_every = 10
  8. def __init__(self):
  9. self._buffer = deque()
  10. self._count = count().next
  11. def execute(self, wrapper, pool, loglevel, logfile):
  12. self._buffer.append((wrapper, pool, loglevel, logfile))
  13. if not self._count() % self.flush_every:
  14. self.flush(self._buffer)
  15. self._buffer.clear()
  16. def flush(self, tasks):
  17. for wrapper, pool, loglevel, logfile in tasks:
  18. wrapper.execute_using_pool(pool, loglevel, logfile)
  19. class Counter(Task):
  20. abstract = True
  21. flush_every = 10
  22. def __init__(self):
  23. self._buffer = deque()
  24. self._count = count().next
  25. def execute(self, wrapper, pool, loglevel, logfile):
  26. self._buffer.append((wrapper.args, wrapper.kwargs))
  27. if not self._count() % self.flush_every:
  28. self.flush(self._buffer)
  29. self._buffer.clear()
  30. def flush(self, buffer):
  31. raise NotImplementedError("Counters must implement 'flush'")
  32. class ClickCounter(Task):
  33. flush_every = 1000
  34. def flush(self, buffer):
  35. urlcount = defaultdict(lambda: 0)
  36. for args, kwargs in buffer:
  37. urlcount[kwargs["url"]] += 1
  38. for url, count in urlcount.items():
  39. print(">>> Clicks: %s -> %s" % (url, count))
  40. # increment_in_db(url, n=count)