  1. """
  2. celery.contrib.batches
  3. ======================
  4. Collect messages and processes them as a list.
  5. **Example**
  6. A click counter that flushes the buffer every 100 messages, and every
  7. 10 seconds.
  8. .. code-block:: python
  9. from celery.task import task
  10. from celery.contrib.batches import Batches
  11. # Flush after 100 messages, or 10 seconds.
  12. @task(base=Batches, flush_every=100, flush_interval=10)
  13. def count_click(requests):
  14. from collections import Counter
  15. count = Counter(request.kwargs["url"] for request in requests)
  16. for url, count in count.items():
  17. print(">>> Clicks: %s -> %s" % (url, count))
  18. Registering the click is done as follows:
  19. >>> count_click.delay(url="http://example.com")
  20. .. warning::
  21. For this to work you have to set
  22. :setting:`CELERYD_PREFETCH_MULTIPLIER` to zero, or some value where
  23. the final multiplied value is higher than ``flush_every``.
  24. In the future we hope to add the ability to direct batching tasks
  25. to a channel with different QoS requirements than the task channel.
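
A minimal configuration sketch for the setting above (``celeryconfig.py``
is only the conventional name for the configuration module):

.. code-block:: python

    # celeryconfig.py
    # 0 disables the prefetch limit, so the worker can reserve enough
    # messages to fill a batch.
    CELERYD_PREFETCH_MULTIPLIER = 0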

:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

from itertools import count
from Queue import Empty, Queue

from celery.task import Task
from celery.utils import cached_property, timer2
from celery.worker import state


def consume_queue(queue):
    """Iterator yielding all immediately available items in a
    :class:`Queue.Queue`.

    The iterator stops as soon as the queue raises :exc:`Queue.Empty`.

    *Examples*

        >>> q = Queue()
        >>> map(q.put, range(4))
        >>> list(consume_queue(q))
        [0, 1, 2, 3]
        >>> list(consume_queue(q))
        []

    """
    get = queue.get_nowait
    while 1:
        try:
            yield get()
        except Empty:
            break


def apply_batches_task(task, args, loglevel, logfile):
    """Called in the pool process: apply the batch task to the buffered
    requests, logging any exception and always clearing the request
    context afterwards."""
    task.request.update({"loglevel": loglevel, "logfile": logfile})
    try:
        result = task(*args)
    except Exception as exp:
        result = None
        task.logger.error("There was an Exception: %s", exp, exc_info=True)
    finally:
        task.request.clear()
    return result


class SimpleRequest(object):
    """Pickleable request."""

    #: task id
    id = None

    #: task name
    name = None

    #: positional arguments
    args = ()

    #: keyword arguments
    kwargs = {}

    #: message delivery information.
    delivery_info = None

    #: worker node name
    hostname = None

    def __init__(self, id, name, args, kwargs, delivery_info, hostname):
        self.id = id
        self.name = name
        self.args = args
        self.kwargs = kwargs
        self.delivery_info = delivery_info
        self.hostname = hostname

    @classmethod
    def from_request(cls, request):
        return cls(request.task_id, request.task_name, request.args,
                   request.kwargs, request.delivery_info, request.hostname)
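

# Illustration only (not part of the module): ``Batches.flush`` below wraps
# each buffered worker request in a ``SimpleRequest`` before calling the
# task body, so a batch task reads plain attributes from each item:
#
#     @task(base=Batches, flush_every=10, flush_interval=5)
#     def process(requests):
#         for request in requests:          # each is a SimpleRequest
#             print(request.id, request.kwargs)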


class Batches(Task):
    """Task base class that buffers incoming messages and processes
    them as a list (see the module docstring for an example)."""

    abstract = True

    #: Maximum number of messages in the buffer.
    flush_every = 10

    #: Timeout in seconds before the buffer is flushed anyway.
    flush_interval = 30

    def __init__(self):
        self._buffer = Queue()
        self._count = count(1).next
        self._tref = None
        self._pool = None
        self._logging = None

    def run(self, requests):
        raise NotImplementedError("%r must implement run(requests)" % (self, ))

    def flush(self, requests):
        return self.apply_buffer(requests, ([SimpleRequest.from_request(r)
                                                for r in requests], ))

    def execute(self, request, pool, loglevel, logfile):
        if not self._pool:         # just take pool from first task.
            self._pool = pool
        if not self._logging:
            self._logging = loglevel, logfile

        state.task_ready(request)  # immediately remove from worker state.

        self._buffer.put(request)

        if self._tref is None:     # first request starts flush timer.
            self._tref = timer2.apply_interval(self.flush_interval * 1000,
                                               self._do_flush)

        if not self._count() % self.flush_every:
            self._do_flush()

    def _do_flush(self):
        self.debug("Wake-up to flush buffer...")
        requests = None
        if self._buffer.qsize():
            requests = list(consume_queue(self._buffer))
            if requests:
                self.debug("Buffer complete: %s" % (len(requests), ))
                self.flush(requests)
        if not requests:
            self.debug("Cancelling timer: Nothing in buffer.")
            self._tref.cancel()  # cancel timer.
            self._tref = None

    def apply_buffer(self, requests, args=(), kwargs={}):
        # Partition the requests by their ``acks_late`` setting:
        # ``acks_late[False]`` is acknowledged as soon as the pool accepts
        # the batch, ``acks_late[True]`` only when the batch returns.
        acks_late = [], []
        [acks_late[r.task.acks_late].append(r) for r in requests]
        assert requests and (acks_late[True] or acks_late[False])

        def on_accepted(pid, time_accepted):
            [req.acknowledge() for req in acks_late[False]]

        def on_return(result):
            [req.acknowledge() for req in acks_late[True]]

        loglevel, logfile = self._logging
        return self._pool.apply_async(apply_batches_task,
                                      (self, args, loglevel, logfile),
                                      accept_callback=on_accepted,
                                      callback=acks_late[True] and on_return or None)

    def debug(self, msg):
        self.logger.debug("%s: %s", self.name, msg)

    @cached_property
    def logger(self):
        return self.app.log.get_default_logger()
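

# Batches tasks acknowledge and run whole buffers, so per-request results are
# not stored automatically.  A sketch of storing them by hand from inside the
# batch body -- this assumes a result backend is configured; the task name
# ``squares`` is hypothetical:
#
#     @task(base=Batches, flush_every=100, flush_interval=10)
#     def squares(requests):
#         for request in requests:
#             value = request.args[0] ** 2
#             squares.backend.mark_as_done(request.id, value)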