threads.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. from UserDict import UserDict
  4. from .base import apply_target, BasePool
  class NullDict(UserDict):
      """Dictionary-like object that silently discards item assignments.

      Only ``__setitem__`` is overridden; every other operation is
      inherited unchanged from ``UserDict``.
      """

      def __setitem__(self, key, value):
          """Ignore the assignment: neither ``key`` nor ``value`` is stored."""
  class TaskPool(BasePool):
      """Task pool that hands work to threads of the ``threadpool`` module."""

      def __init__(self, *args, **kwargs):
          """Import ``threadpool`` lazily and remember the classes we need.

          All positional and keyword arguments are passed through to the
          base class initializer.

          Raises:
              ImportError: if the ``threadpool`` module cannot be imported.
          """
          try:
              import threadpool as tp
          except ImportError:
              message = "The threaded pool requires the threadpool module."
              raise ImportError(message)
          # Keep references on the instance so the other hooks do not have
          # to import the module again.
          self.WorkRequest = tp.WorkRequest
          self.ThreadPool = tp.ThreadPool
          super(TaskPool, self).__init__(*args, **kwargs)

      def on_start(self):
          """Create the thread pool and cache its hot-path callables."""
          # NOTE(review): ``self.limit`` is not set in this class; it is
          # presumably provided by ``BasePool`` -- confirm.
          self._pool = pool = self.ThreadPool(self.limit)
          # threadpool keeps every work request in this dict until it has
          # been processed.  We never read it and it uses far too much
          # memory, so swap in a dict that throws assignments away.
          pool.workRequests = NullDict()
          # Bound methods cached once so ``on_apply`` avoids repeated
          # attribute lookups.
          self._quick_put = pool.putRequest
          self._quick_clear = pool._results_queue.queue.clear

      def on_stop(self):
          """Dismiss ``self.limit`` workers, joining them (``do_join=True``)."""
          dismiss = self._pool.dismissWorkers
          dismiss(self.limit, do_join=True)

      def on_apply(self, target, args=None, kwargs=None, callback=None,
                   accept_callback=None, **_):
          """Queue ``target`` for execution and return the work request.

          ``target``, ``args``, ``kwargs``, ``callback`` and
          ``accept_callback`` are forwarded unchanged, in that order, as the
          arguments ``apply_target`` is called with.  Any additional keyword
          arguments are accepted and ignored.

          Returns:
              The ``WorkRequest`` instance that was put on the pool.
          """
          call_spec = (target, args, kwargs, callback, accept_callback)
          request = self.WorkRequest(apply_target, call_spec)
          self._quick_put(request)
          # threadpool has its own callback support, but for some reason
          # those callbacks are not triggered until the results have been
          # collected.  Drop any queued results so the queue cannot grow
          # too large.
          self._quick_clear()
          return request