coroutine.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. from __future__ import absolute_import
  2. from functools import wraps
  3. from Queue import Queue
  4. from celery.utils import cached_property
  5. def coroutine(fun):
  6. """Decorator that turns a generator into a coroutine that is
  7. started automatically, and that can send values back to the caller.
  8. **Example coroutine that returns values to caller**::
  9. @coroutine
  10. def adder(self):
  11. while 1:
  12. x, y = (yield)
  13. self.give(x + y)
  14. >>> c = adder()
  15. # call sends value and returns the result.
  16. >>> c.call(4, 4)
  17. 8
  18. # or you can send the value and get the result later.
  19. >>> c.send(4, 4)
  20. >>> c.get()
  21. 8
  22. **Example sink (input-only coroutine)**::
  23. @coroutine
  24. def uniq():
  25. seen = set()
  26. while 1:
  27. line = (yield)
  28. if line not in seen:
  29. seen.add(line)
  30. print(line)
  31. >>> u = uniq()
  32. >>> [u.send(l) for l in [1, 2, 2, 3]]
  33. [1, 2, 3]
  34. **Example chaining coroutines**::
  35. @coroutine
  36. def uniq(callback):
  37. seen = set()
  38. while 1:
  39. line = (yield)
  40. if line not in seen:
  41. callback.send(line)
  42. seen.add(line)
  43. @coroutine
  44. def uppercaser(callback):
  45. while 1:
  46. line = (yield)
  47. callback.send(str(line).upper())
  48. @coroutine
  49. def printer():
  50. while 1:
  51. line = (yield)
  52. print(line)
  53. >>> pipe = uniq(uppercaser(printer()))
  54. >>> for line in file("AUTHORS").readlines():
  55. pipe.send(line)
  56. """
  57. @wraps(fun)
  58. def start(*args, **kwargs):
  59. return Coroutine.start_from(fun, *args, **kwargs)
  60. return start
  61. class Coroutine(object):
  62. _gen = None
  63. started = False
  64. def bind(self, generator):
  65. self._gen = generator
  66. def _next(self):
  67. return self._gen.next()
  68. next = __next__ = _next
  69. def start(self):
  70. if self.started:
  71. raise ValueError("coroutine already started")
  72. self.next()
  73. self.started = True
  74. return self
  75. def send1(self, value):
  76. return self._gen.send(value)
  77. def call1(self, value, timeout=None):
  78. self.send1(value)
  79. return self.get(timeout=timeout)
  80. def send(self, *args):
  81. return self._gen.send(args)
  82. def call(self, *args, **opts):
  83. self.send(*args)
  84. return self.get(**opts)
  85. @classmethod
  86. def start_from(cls, fun, *args, **kwargs):
  87. coro = cls()
  88. coro.bind(fun(coro, *args, **kwargs))
  89. return coro.start()
  90. @cached_property
  91. def __output__(self):
  92. return Queue()
  93. @property
  94. def give(self):
  95. return self.__output__.put_nowait
  96. @property
  97. def get(self):
  98. return self.__output__.get
  99. if __name__ == "__main__":
  100. @coroutine
  101. def adder(self):
  102. while 1:
  103. x, y = (yield)
  104. self.give(x + y)
  105. x = adder()
  106. for i in xrange(10):
  107. print(x.call(i, i))