gevent.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. import os
  # Apply gevent's monkey patches at import time, before the imports that
  # follow. Setting the GEVENT_NOPATCH environment variable to any non-empty
  # value skips the patching.
  if not os.environ.get("GEVENT_NOPATCH"):
      from gevent import monkey
      monkey.patch_all()
  7. from time import time
  8. from celery.utils import timer2
  9. from .base import apply_target, BasePool
  class Schedule(timer2.Schedule):
      """``timer2.Schedule`` variant that runs every entry in its own greenlet.

      Each entry is handed to ``Greenlet.spawn_later`` and the resulting
      greenlet is tracked in ``self._queue`` (a ``set``) until it is removed
      by ``_entry_exit`` or by ``clear``.
      """

      def __init__(self, *args, **kwargs):
          """Bind the gevent greenlet classes and create the tracking set.

          All positional and keyword arguments are forwarded unchanged to
          the base class initialiser.
          """
          from gevent.greenlet import Greenlet, GreenletExit

          class _Greenlet(Greenlet):
              """Greenlet that exposes ``cancel()`` as a synonym for ``kill()``."""

              def cancel(self):
                  self.kill()

          self._Greenlet = _Greenlet
          # Stored on the instance; nothing else in this class reads it.
          self._GreenletExit = GreenletExit
          super(Schedule, self).__init__(*args, **kwargs)
          # Assigned after the base initialiser, so it replaces any
          # ``_queue`` the base class may have set up.
          self._queue = set()

      def _enter(self, eta, priority, entry):
          """Schedule ``entry`` to be called in a greenlet at time ``eta``.

          :param eta: target time, on the same clock as ``time.time()``.
              A time already in the past is treated as a zero delay.
          :param priority: recorded on the greenlet; not used for ordering
              here.
          :param entry: the callable passed to ``spawn_later``.
          :returns: the spawned greenlet, annotated with ``entry``, ``eta``,
              ``priority`` and ``cancelled`` attributes.
          """
          delay = max(eta - time(), 0)
          greenlet = self._Greenlet.spawn_later(delay, entry)
          self._queue.add(greenlet)
          # ``_entry_exit`` is linked so the greenlet is dropped from the
          # tracking set once gevent reports it as finished.
          greenlet.link(self._entry_exit)
          greenlet.entry = entry
          greenlet.eta = eta
          greenlet.priority = priority
          # Initialised to False; nothing in this class ever sets it to True.
          greenlet.cancelled = False
          return greenlet

      def _entry_exit(self, g):
          """Link callback: kill ``g`` and stop tracking it.

          The greenlet is discarded from the set even if ``kill()`` raises.
          """
          try:
              g.kill()
          finally:
              self._queue.discard(g)

      def clear(self):
          """Kill every tracked greenlet, emptying the tracking set."""
          pending = self._queue
          while pending:
              try:
                  victim = pending.pop()
                  victim.kill()
              except KeyError:
                  # Tolerate the set having been emptied in the meantime.
                  continue

      @property
      def queue(self):
          """List of ``(eta, priority, entry)`` tuples for tracked greenlets."""
          snapshot = []
          for greenlet in self._queue:
              snapshot.append((greenlet.eta, greenlet.priority, greenlet.entry))
          return snapshot
  class Timer(timer2.Timer):
      """``timer2.Timer`` variant that uses the greenlet-based ``Schedule``.

      ``start`` and ``ensure_started`` are overridden as no-ops; ``stop``
      only clears the schedule.
      """

      # Schedule implementation for this timer (presumably instantiated by
      # the base class -- confirm against ``timer2.Timer``).
      Schedule = Schedule

      def ensure_started(self):
          """Do nothing; overrides the base implementation with a no-op."""

      def stop(self):
          """Clear ``self.schedule``, discarding all of its entries."""
          schedule = self.schedule
          schedule.clear()

      def start(self):
          """Do nothing; overrides the base implementation with a no-op."""
  class TaskPool(BasePool):
      """``BasePool`` implementation that runs tasks in a ``gevent.pool.Pool``."""

      # Timer class associated with this pool.
      Timer = Timer

      # Capability flags (presumably consulted by BasePool or its users --
      # confirm against the base class).
      signal_safe = False
      rlimit_safe = False
      is_green = True

      def __init__(self, *args, **kwargs):
          """Bind the gevent pool class and raw spawner, then defer to the base.

          All arguments are forwarded unchanged to ``BasePool.__init__``.
          """
          from gevent.pool import Pool
          from gevent import spawn_raw

          self.spawn_n = spawn_raw
          self.Pool = Pool
          super(TaskPool, self).__init__(*args, **kwargs)

      def on_start(self):
          """Create the gevent pool, sized by ``self.limit``."""
          # ``self.limit`` is not set in this class; presumably provided by
          # BasePool.
          self._pool = self.Pool(self.limit)

      def on_stop(self):
          """Wait for the pool's greenlets to finish, if a pool exists."""
          if self._pool is None:
              return
          self._pool.join()

      def on_apply(self, target, args=None, kwargs=None, callback=None,
                   accept_callback=None, **_):
          """Spawn ``apply_target`` in the pool for the given task.

          :param target: passed through to ``apply_target``.
          :param args: passed through to ``apply_target``.
          :param kwargs: passed through to ``apply_target``.
          :param callback: passed through to ``apply_target``.
          :param accept_callback: passed through to ``apply_target``.
          :param _: any further keyword options are accepted and ignored.
          :returns: whatever ``self._pool.spawn`` returns.
          """
          spawned = self._pool.spawn(apply_target, target, args, kwargs,
                                     callback, accept_callback)
          return spawned

      def grow(self, n=1):
          """Raise the pool's capacity by ``n``.

          Adjusts both the pool's private semaphore counter and its ``size``.
          """
          pool = self._pool
          pool._semaphore.counter += n
          pool.size += n

      def shrink(self, n=1):
          """Lower the pool's capacity by ``n``.

          Adjusts both the pool's private semaphore counter and its ``size``.
          """
          pool = self._pool
          pool._semaphore.counter -= n
          pool.size -= n

      @property
      def num_processes(self):
          """``len()`` of the underlying gevent pool."""
          pool = self._pool
          return len(pool)