123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301 |
- """Async I/O backend support utilities."""
- from __future__ import absolute_import, unicode_literals
- import socket
- import threading
- from collections import deque
- from time import sleep
- from weakref import WeakKeyDictionary
- from kombu.utils.compat import detect_environment
- from kombu.utils.objects import cached_property
- from celery import states
- from celery.exceptions import TimeoutError
- from celery.five import Empty, monotonic
- from celery.utils.threads import THREAD_TIMEOUT_MAX
- __all__ = [
- 'AsyncBackendMixin', 'BaseResultConsumer', 'Drainer',
- 'register_drainer',
- ]
#: Registry mapping an environment name to the drainer class handling it.
drainers = {}


def register_drainer(name):
    """Return a class decorator that files the class under ``name``.

    The decorated class is stored in :data:`drainers` and handed back
    unchanged, so the decorator can be stacked on a normal class statement.
    """
    def _register(drainer_cls):
        drainers[name] = drainer_cls
        return drainer_cls

    return _register
@register_drainer('default')
class Drainer(object):
    """Result draining service.

    Drains events in the caller's own flow of control: every step of
    :meth:`drain_events_until` performs one blocking ``wait`` call.
    """

    def __init__(self, result_consumer):
        self.result_consumer = result_consumer

    def start(self):
        """Start the drainer (nothing to do for the inline drainer)."""

    def stop(self):
        """Stop the drainer (nothing to do for the inline drainer)."""

    def drain_events_until(self, p, timeout=None, on_interval=None,
                           monotonic=monotonic, wait=None, interval=1):
        """Drain events until ``p`` is ready, yielding after every wait.

        Arguments:
            p: object whose truthy ``ready`` attribute ends the loop.
            timeout (float): overall deadline in seconds.  A falsy value
                (``None`` or ``0``) disables the deadline.
            on_interval (Callable): called with no arguments after
                every wait, whether or not the wait timed out.
            monotonic (Callable): clock used to measure elapsed time.
            wait (Callable): called as ``wait(timeout=...)`` to block for
                events; defaults to ``result_consumer.drain_events``.
            interval (float): timeout handed to each individual wait.

        Raises:
            socket.timeout: when ``timeout`` seconds have elapsed before
                ``p`` became ready.
        """
        wait = wait or self.result_consumer.drain_events
        time_start = monotonic()

        while True:
            # The deadline is only checked between waits, so the total
            # time spent may exceed ``timeout`` by up to one wait.
            if timeout and monotonic() - time_start >= timeout:
                raise socket.timeout()
            try:
                yield self.wait_for(p, wait, timeout=interval)
            except socket.timeout:
                # A single wait timing out is expected: loop and re-check.
                pass
            if on_interval:
                on_interval()
            if p.ready:  # got event on the wanted channel.
                break

    def wait_for(self, p, wait, timeout=None):
        """Block in ``wait`` for at most ``timeout`` seconds."""
        wait(timeout=timeout)
class greenletDrainer(Drainer):
    """Drainer that runs the drain loop in a spawned background task.

    Subclasses supply :attr:`spawn`, which is called with :meth:`run`.
    """

    spawn = None
    _g = None

    def __init__(self, *args, **kwargs):
        super(greenletDrainer, self).__init__(*args, **kwargs)
        self._started = threading.Event()
        self._stopped = threading.Event()
        self._shutdown = threading.Event()

    def run(self):
        """Drain events in one-second slices until :meth:`stop` is called."""
        self._started.set()
        try:
            while not self._stopped.is_set():
                try:
                    self.result_consumer.drain_events(timeout=1)
                except socket.timeout:
                    pass
        finally:
            # Always signal shutdown, even when drain_events raised
            # something unexpected, so stop() is never left waiting on a
            # loop that has already died.
            self._shutdown.set()

    def start(self):
        """Spawn the drain loop once and wait until it is running."""
        if not self._started.is_set():
            self._g = self.spawn(self.run)
            self._started.wait()

    def stop(self):
        """Ask the drain loop to exit and wait for it to finish."""
        self._stopped.set()
        # Nothing will ever set _shutdown if the loop never started, so
        # only wait when there is a loop to wait for.
        if self._started.is_set():
            self._shutdown.wait(THREAD_TIMEOUT_MAX)

    def wait_for(self, p, wait, timeout=None):
        """Make sure the loop is running, then yield control once.

        ``wait`` and ``timeout`` are unused here: the background loop does
        the draining.
        """
        self.start()
        if not p.ready:
            sleep(0)
@register_drainer('eventlet')
class eventletDrainer(greenletDrainer):
    """Background drainer whose loop is spawned through eventlet."""

    @cached_property
    def spawn(self):
        """Return eventlet's ``spawn``; eventlet is imported lazily."""
        import eventlet
        return eventlet.spawn
@register_drainer('gevent')
class geventDrainer(greenletDrainer):
    """Background drainer whose loop is spawned through gevent."""

    @cached_property
    def spawn(self):
        """Return gevent's ``spawn``; gevent is imported lazily."""
        import gevent
        return gevent.spawn
class AsyncBackendMixin(object):
    """Mixin for backends that enables the async API."""

    def _collect_into(self, result, bucket):
        """Ask the result consumer to deliver ``result`` into ``bucket``."""
        self.result_consumer.buckets[result] = bucket

    def iter_native(self, result, no_ack=True, **kwargs):
        """Yield ``(id, cached meta)`` for each child of ``result``.

        Children that already have a cached value are yielded first; the
        rest are yielded as the result consumer drops them into the
        shared bucket while waiting on ``result``.
        """
        self._ensure_not_eager()

        results = result.results
        if not results:
            # A bare return ends the generator.  Raising StopIteration
            # here would surface as RuntimeError under PEP 479.
            return

        # we tell the result consumer to put consumed results
        # into these buckets.
        bucket = deque()
        for node in results:
            if node._cache:
                bucket.append(node)
            else:
                self._collect_into(node, bucket)

        for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs):
            while bucket:
                node = bucket.popleft()
                yield node.id, node._cache
        # Flush whatever arrived during the final wait step.
        while bucket:
            node = bucket.popleft()
            yield node.id, node._cache

    def add_pending_result(self, result, weak=False, start_drainer=True):
        """Track ``result`` until its outcome arrives, and return it.

        If a message for the result is already buffered it is applied
        right away; otherwise the result is registered as pending.
        """
        if start_drainer:
            self.result_consumer.drainer.start()
        try:
            self._maybe_resolve_from_buffer(result)
        except Empty:
            self._add_pending_result(result.id, result, weak=weak)
        return result

    def _maybe_resolve_from_buffer(self, result):
        """Apply a buffered message for ``result``, if one can be taken."""
        result._maybe_set_cache(self._pending_messages.take(result.id))

    def _add_pending_result(self, task_id, result, weak=False):
        """Register ``result`` under ``task_id`` unless already tracked.

        ``weak`` selects the weak map instead of the concrete one.
        """
        pending = self._pending_results
        # NOTE(review): presumably the pair exposes ``weak``/``concrete``
        # as named fields -- confirm against the backend that builds it.
        # When it does not, keep this method's historical positional
        # order (weak first, concrete second).
        try:
            weak_map, concrete = pending.weak, pending.concrete
        except AttributeError:
            weak_map, concrete = pending
        if task_id not in weak_map and result.id not in concrete:
            # Use distinct names so the ``weak`` flag is not clobbered by
            # the map itself.
            (weak_map if weak else concrete)[task_id] = result
            self.result_consumer.consume_from(task_id)

    def add_pending_results(self, results, weak=False):
        """Track several results, starting the drainer only once."""
        self.result_consumer.drainer.start()
        return [self.add_pending_result(result, weak=weak, start_drainer=False)
                for result in results]

    def remove_pending_result(self, result):
        """Stop tracking ``result`` and cancel its consumer; return it."""
        self._remove_pending_result(result.id)
        self.on_result_fulfilled(result)
        return result

    def _remove_pending_result(self, task_id):
        """Drop ``task_id`` from every pending-results map."""
        for mapping in self._pending_results:
            mapping.pop(task_id, None)

    def on_result_fulfilled(self, result):
        """Cancel consumption for a result that no longer needs it."""
        self.result_consumer.cancel_for(result.id)

    def wait_for_pending(self, result,
                         callback=None, propagate=True, **kwargs):
        """Block until ``result`` is ready, then return its outcome.

        The return value is whatever ``result.maybe_throw`` returns.
        """
        self._ensure_not_eager()
        for _ in self._wait_for_pending(result, **kwargs):
            pass
        return result.maybe_throw(callback=callback, propagate=propagate)

    def _wait_for_pending(self, result,
                          timeout=None, on_interval=None, on_message=None,
                          **kwargs):
        """Delegate waiting to the result consumer.

        Extra keyword arguments are accepted but not forwarded.
        """
        return self.result_consumer._wait_for_pending(
            result, timeout=timeout,
            on_interval=on_interval, on_message=on_message,
        )

    @property
    def is_async(self):
        return True
class BaseResultConsumer(object):
    """Manager responsible for consuming result messages."""

    def __init__(self, backend, app, accept,
                 pending_results, pending_messages):
        self.backend = backend
        self.app = app
        self.accept = accept
        self._pending_results = pending_results
        self._pending_messages = pending_messages
        self.on_message = None
        self.buckets = WeakKeyDictionary()
        # Pick the drainer class registered for the detected environment.
        drainer_cls = drainers[detect_environment()]
        self.drainer = drainer_cls(self)

    def start(self):
        """Start consuming; concrete consumers must implement this."""
        raise NotImplementedError()

    def stop(self):
        """Stop consuming (nothing to do in the base class)."""

    def drain_events(self, timeout=None):
        """Wait for events; concrete consumers must implement this."""
        raise NotImplementedError()

    def consume_from(self, task_id):
        """Begin consuming for ``task_id``; must be implemented."""
        raise NotImplementedError()

    def cancel_for(self, task_id):
        """Stop consuming for ``task_id``; must be implemented."""
        raise NotImplementedError()

    def _after_fork(self):
        """Reset the buckets and message hook, then run the fork hook."""
        self.buckets.clear()
        self.buckets = WeakKeyDictionary()
        self.on_message = None
        self.on_after_fork()

    def on_after_fork(self):
        """Hook called at the end of :meth:`_after_fork`."""

    def drain_events_until(self, p, timeout=None, on_interval=None):
        """Hand the wait for ``p`` over to the configured drainer."""
        drainer = self.drainer
        return drainer.drain_events_until(
            p, timeout=timeout, on_interval=on_interval,
        )

    def _wait_for_pending(self, result,
                          timeout=None, on_interval=None, on_message=None,
                          **kwargs):
        """Yield once per drain step until ``result`` is ready.

        ``on_message`` is installed for the duration of the wait and the
        previous callback is restored afterwards.  A ``socket.timeout``
        from the drainer is re-raised as :exc:`TimeoutError`.
        """
        self.on_wait_for_pending(result, timeout=timeout, **kwargs)
        previous = self.on_message
        self.on_message = on_message
        try:
            steps = self.drain_events_until(
                result.on_ready, timeout=timeout, on_interval=on_interval,
            )
            for _ in steps:
                yield
                sleep(0)
        except socket.timeout:
            raise TimeoutError('The operation timed out.')
        finally:
            self.on_message = previous

    def on_wait_for_pending(self, result, timeout=None, **kwargs):
        """Hook called before waiting for ``result`` begins."""

    def on_out_of_band_result(self, message):
        """Treat an out-of-band message as a regular state change."""
        self.on_state_change(message.payload, message)

    def _get_pending_result(self, task_id):
        """Return the pending result for ``task_id``.

        Raises:
            KeyError: if no pending-results map holds ``task_id``.
        """
        for mapping in self._pending_results:
            try:
                found = mapping[task_id]
            except KeyError:
                continue
            return found
        raise KeyError(task_id)

    def on_state_change(self, meta, message):
        """Dispatch a state update to the callback, cache and bucket."""
        if self.on_message:
            self.on_message(meta)
        if meta['status'] in states.READY_STATES:
            task_id = meta['task_id']
            try:
                result = self._get_pending_result(task_id)
            except KeyError:
                # Not registered yet: buffer the message so it can be
                # picked up when the result is added later.
                self._pending_messages.put(task_id, meta)
            else:
                result._maybe_set_cache(meta)
                # The result is fulfilled, so its bucket entry is removed;
                # the waiter (if any) receives it through that bucket.
                bucket = self.buckets.pop(result, None)
                if bucket is not None:
                    bucket.append(result)
        sleep(0)
|