gevent.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. # -*- coding: utf-8 -*-
  2. """Gevent execution pool."""
  3. from __future__ import absolute_import, unicode_literals
  4. try:
  5. from gevent import Timeout
  6. except ImportError: # pragma: no cover
  7. Timeout = None # noqa
  8. from kombu.async import timer as _timer
  9. from kombu.five import monotonic
  10. from . import base
  11. __all__ = ['TaskPool']
  12. # pylint: disable=redefined-outer-name
  13. # We cache globals and attribute lookups, so disable this warning.
  14. def apply_timeout(target, args=(), kwargs={}, callback=None,
  15. accept_callback=None, pid=None, timeout=None,
  16. timeout_callback=None, Timeout=Timeout,
  17. apply_target=base.apply_target, **rest):
  18. try:
  19. with Timeout(timeout):
  20. return apply_target(target, args, kwargs, callback,
  21. accept_callback, pid,
  22. propagate=(Timeout,), **rest)
  23. except Timeout:
  24. return timeout_callback(False, timeout)
  25. class Timer(_timer.Timer):
  26. def __init__(self, *args, **kwargs):
  27. from gevent.greenlet import Greenlet, GreenletExit
  28. class _Greenlet(Greenlet):
  29. cancel = Greenlet.kill
  30. self._Greenlet = _Greenlet
  31. self._GreenletExit = GreenletExit
  32. super(Timer, self).__init__(*args, **kwargs)
  33. self._queue = set()
  34. def _enter(self, eta, priority, entry, **kwargs):
  35. secs = max(eta - monotonic(), 0)
  36. g = self._Greenlet.spawn_later(secs, entry)
  37. self._queue.add(g)
  38. g.link(self._entry_exit)
  39. g.entry = entry
  40. g.eta = eta
  41. g.priority = priority
  42. g.canceled = False
  43. return g
  44. def _entry_exit(self, g):
  45. try:
  46. g.kill()
  47. finally:
  48. self._queue.discard(g)
  49. def clear(self):
  50. queue = self._queue
  51. while queue:
  52. try:
  53. queue.pop().kill()
  54. except KeyError:
  55. pass
  56. @property
  57. def queue(self):
  58. return self._queue
  59. class TaskPool(base.BasePool):
  60. """GEvent Pool."""
  61. Timer = Timer
  62. signal_safe = False
  63. is_green = True
  64. task_join_will_block = False
  65. _pool = None
  66. _quick_put = None
  67. def __init__(self, *args, **kwargs):
  68. from gevent import spawn_raw
  69. from gevent.pool import Pool
  70. self.Pool = Pool
  71. self.spawn_n = spawn_raw
  72. self.timeout = kwargs.get('timeout')
  73. super(TaskPool, self).__init__(*args, **kwargs)
  74. def on_start(self):
  75. self._pool = self.Pool(self.limit)
  76. self._quick_put = self._pool.spawn
  77. def on_stop(self):
  78. if self._pool is not None:
  79. self._pool.join()
  80. def on_apply(self, target, args=None, kwargs=None, callback=None,
  81. accept_callback=None, timeout=None,
  82. timeout_callback=None, apply_target=base.apply_target, **_):
  83. timeout = self.timeout if timeout is None else timeout
  84. return self._quick_put(apply_timeout if timeout else apply_target,
  85. target, args, kwargs, callback, accept_callback,
  86. timeout=timeout,
  87. timeout_callback=timeout_callback)
  88. def grow(self, n=1):
  89. self._pool._semaphore.counter += n
  90. self._pool.size += n
  91. def shrink(self, n=1):
  92. self._pool._semaphore.counter -= n
  93. self._pool.size -= n
  94. @property
  95. def num_processes(self):
  96. return len(self._pool)