123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- # -*- coding: utf-8 -*-
- """
- celery.concurrency.base
- ~~~~~~~~~~~~~~~~~~~~~~~
- TaskPool interface.
- """
- from __future__ import absolute_import
- import logging
- import os
- import time
- from kombu.utils.encoding import safe_repr
- from celery.utils import timer2
- from celery.utils.log import get_logger
- logger = get_logger('celery.concurrency')
def apply_target(target, args=(), kwargs={}, callback=None,
                 accept_callback=None, pid=None, **_):
    """Call ``target(*args, **kwargs)`` and hand the result to ``callback``.

    :param target: callable to execute.
    :param args: positional arguments for ``target``.
    :param kwargs: keyword arguments for ``target``.  The shared default
        dict is only unpacked with ``**``, never mutated.
    :param callback: optional callable invoked with the return value of
        ``target``.  Skipped when ``None``.
    :param accept_callback: optional callable invoked *before* ``target``
        runs, as ``accept_callback(pid, timestamp)``.
    :param pid: process id reported to ``accept_callback``; any falsy value
        falls back to :func:`os.getpid`.
    :param \*\*_: extra keyword arguments are accepted and ignored.

    Returns ``None``.  Exceptions raised by ``target`` or by either
    callback propagate to the caller.
    """
    if accept_callback:
        accept_callback(pid or os.getpid(), time.time())
    result = target(*args, **kwargs)
    # ``callback`` defaults to None; calling it unconditionally would raise
    # TypeError *after* the target had already been executed.
    if callback is not None:
        callback(result)
class BasePool(object):
    """Base class defining the task pool interface.

    The lifecycle methods (:meth:`start`, :meth:`stop`, :meth:`close`,
    :meth:`terminate`) update ``_state`` and delegate to the matching
    ``on_*`` hook.  Every hook is a no-op here and is the place where a
    concrete pool plugs in its behaviour.
    """

    # Values stored in ``_state`` by the lifecycle methods.
    RUN = 0x1
    CLOSE = 0x2
    TERMINATE = 0x3

    Timer = timer2.Timer

    #: set to true if the pool can be shutdown from within
    #: a signal handler.
    signal_safe = True

    #: set to true if pool supports rate limits.
    #: (this is here for gevent, which currently does not implement
    #: the necessary timers).
    rlimit_safe = True

    #: set to true if pool requires the use of a mediator
    #: thread (e.g. if applying new items can block the current thread).
    requires_mediator = False

    #: set to true if pool uses greenlets.
    is_green = False

    _state = None
    _pool = None

    #: only used by multiprocessing pool
    uses_semaphore = False

    def __init__(self, limit=None, putlocks=True, forking_enable=True,
                 **options):
        """Store the pool configuration.

        :param limit: stored as ``self.limit``; also the value returned by
            :attr:`num_processes`.
        :param putlocks: forwarded to :meth:`on_apply` as ``waitforslot``
            by :meth:`apply_async`.
        :param forking_enable: stored as ``self.forking_enable``.
        :param \*\*options: any other keyword arguments, kept verbatim in
            ``self.options``.
        """
        self.limit = limit
        self.putlocks = putlocks
        self.options = options
        self.forking_enable = forking_enable
        # Evaluated once here so apply_async() does not have to query the
        # logger level on every call.
        self._does_debug = logger.isEnabledFor(logging.DEBUG)

    def on_start(self):
        """Hook called by :meth:`start` before the state becomes ``RUN``."""
        pass

    def did_start_ok(self):
        """Return ``True``; the base class always reports a good start."""
        return True

    def on_stop(self):
        """Hook called by :meth:`stop` before the state changes."""
        pass

    def on_apply(self, *args, **kwargs):
        """Hook called by :meth:`apply_async` to execute the target.

        No-op in the base class, so :meth:`apply_async` returns ``None``
        unless this is overridden.
        """
        pass

    def on_terminate(self):
        """Hook called by :meth:`terminate` after the state changes."""
        pass

    def on_soft_timeout(self, job):
        """Soft timeout hook for ``job``; no-op in the base class."""
        pass

    def on_hard_timeout(self, job):
        """Hard timeout hook for ``job``; no-op in the base class."""
        pass

    def maybe_handle_result(self, *args):
        """Result handling hook; no-op in the base class."""
        pass

    def maintain_pool(self, *args, **kwargs):
        """Pool maintenance hook; no-op in the base class."""
        pass

    def terminate_job(self, pid):
        """Terminate the job identified by ``pid``.

        :raises NotImplementedError: always, in the base class.
        """
        # The message previously named a non-existent ``kill_job`` method.
        raise NotImplementedError(
            '{0} does not implement terminate_job'.format(type(self)))

    def restart(self):
        """Restart the pool.

        :raises NotImplementedError: always, in the base class.
        """
        raise NotImplementedError(
            '{0} does not implement restart'.format(type(self)))

    def stop(self):
        """Run :meth:`on_stop`, then mark the pool as ``TERMINATE``."""
        self.on_stop()
        self._state = self.TERMINATE

    def terminate(self):
        """Mark the pool as ``TERMINATE``, then run :meth:`on_terminate`."""
        self._state = self.TERMINATE
        self.on_terminate()

    def start(self):
        """Run :meth:`on_start`, then mark the pool as ``RUN``."""
        self.on_start()
        self._state = self.RUN

    def close(self):
        """Mark the pool as ``CLOSE``, then run :meth:`on_close`."""
        self._state = self.CLOSE
        self.on_close()

    def on_close(self):
        """Hook called by :meth:`close` after the state changes."""
        pass

    def init_callbacks(self, **kwargs):
        """Callback setup hook; no-op in the base class."""
        pass

    def apply_async(self, target, args=None, kwargs=None, **options):
        """Equivalent of the :func:`apply` built-in function.

        Callbacks should optimally return as soon as possible since
        otherwise the thread which handles the result will get blocked.

        :param target: callable to execute.
        :param args: positional arguments for ``target``; a fresh empty
            list when omitted.
        :param kwargs: keyword arguments for ``target``; a fresh empty
            dict when omitted.
        :param \*\*options: forwarded unchanged to :meth:`on_apply`.
        :returns: whatever :meth:`on_apply` returns.
        """
        # Build new containers per call: a shared mutable default would be
        # handed to on_apply(), and any mutation there would leak into
        # every later call that relies on the default.
        if args is None:
            args = []
        if kwargs is None:
            kwargs = {}
        if self._does_debug:
            logger.debug('TaskPool: Apply %s (args:%s kwargs:%s)',
                         target, safe_repr(args), safe_repr(kwargs))

        return self.on_apply(target, args, kwargs,
                             waitforslot=self.putlocks,
                             **options)

    def _get_info(self):
        """Return the mapping exposed by :attr:`info`; empty by default."""
        return {}

    @property
    def info(self):
        """Result of :meth:`_get_info`."""
        return self._get_info()

    @property
    def active(self):
        """``True`` while the pool state is ``RUN``."""
        return self._state == self.RUN

    @property
    def num_processes(self):
        """The ``limit`` value given to the constructor."""
        return self.limit

    @property
    def readers(self):
        """Empty dict in the base class."""
        return {}

    @property
    def writers(self):
        """Empty dict in the base class."""
        return {}

    @property
    def timers(self):
        """Empty dict in the base class."""
        return {}
|