# batches.py
  1. from itertools import count
  2. from collections import deque, defaultdict
  3. from celery.task.base import Task
  4. class Batches(Task):
  5. abstract = True
  6. flush_every = 10
  7. def __init__(self):
  8. self._buffer = deque()
  9. self._count = count().next
  10. def execute(self, wrapper, pool, loglevel, logfile):
  11. self._buffer.append((wrapper, pool, loglevel, logfile))
  12. if not self._count() % self.flush_every:
  13. self.flush(self._buffer)
  14. self._buffer.clear()
  15. def flush(self, tasks):
  16. for wrapper, pool, loglevel, logfile in tasks:
  17. wrapper.execute_using_pool(pool, loglevel, logfile)
  18. class Counter(Task):
  19. abstract = True
  20. flush_every = 10
  21. def __init__(self):
  22. self._buffer = deque()
  23. self._count = count().next
  24. def execute(self, wrapper, pool, loglevel, logfile):
  25. self._buffer.append((wrapper.args, wrapper.kwargs))
  26. if not self._count() % self.flush_every:
  27. self.flush(self._buffer)
  28. self._buffer.clear()
  29. def flush(self, buffer):
  30. raise NotImplementedError("Counters must implement 'flush'")
  31. class ClickCounter(Task):
  32. flush_every = 1000
  33. def flush(self, buffer):
  34. urlcount = defaultdict(lambda: 0)
  35. for args, kwargs in buffer:
  36. urlcount[kwargs["url"]] += 1
  37. for url, count in urlcount.items():
  38. print(">>> Clicks: %s -> %s" % (url, count))
  39. # increment_in_db(url, n=count)