base.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. from __future__ import absolute_import
  2. import logging
  3. import os
  4. import sys
  5. import time
  6. import traceback
  7. from functools import partial
  8. from .. import log
  9. from ..datastructures import ExceptionInfo
  10. from ..utils import timer2
  11. from ..utils.encoding import safe_repr
  12. def apply_target(target, args=(), kwargs={}, callback=None,
  13. accept_callback=None, pid=None):
  14. if accept_callback:
  15. accept_callback(pid or os.getpid(), time.time())
  16. callback(target(*args, **kwargs))
  17. class BasePool(object):
  18. RUN = 0x1
  19. CLOSE = 0x2
  20. TERMINATE = 0x3
  21. Timer = timer2.Timer
  22. signal_safe = True
  23. is_green = False
  24. _state = None
  25. _pool = None
  26. def __init__(self, limit=None, putlocks=True, logger=None, **options):
  27. self.limit = limit
  28. self.putlocks = putlocks
  29. self.logger = logger or log.get_default_logger()
  30. self.options = options
  31. self.does_debug = self.logger.isEnabledFor(logging.DEBUG)
  32. def on_start(self):
  33. pass
  34. def on_stop(self):
  35. pass
  36. def on_apply(self, *args, **kwargs):
  37. pass
  38. def on_terminate(self):
  39. pass
  40. def terminate_job(self, pid):
  41. raise NotImplementedError(
  42. "%s does not implement kill_job" % (self.__class__, ))
  43. def stop(self):
  44. self._state = self.CLOSE
  45. self.on_stop()
  46. self._state = self.TERMINATE
  47. def terminate(self):
  48. self._state = self.TERMINATE
  49. self.on_terminate()
  50. def start(self):
  51. self.on_start()
  52. self._state = self.RUN
  53. def apply_async(self, target, args=None, kwargs=None, callback=None,
  54. errback=None, accept_callback=None, timeout_callback=None,
  55. soft_timeout=None, timeout=None, **compat):
  56. """Equivalent of the :func:`apply` built-in function.
  57. Callbacks should optimally return as soon as possible ince
  58. otherwise the thread which handles the result will get blocked.
  59. """
  60. args = args or []
  61. kwargs = kwargs or {}
  62. on_ready = partial(self.on_ready, callback, errback)
  63. on_worker_error = partial(self.on_worker_error, errback)
  64. if self.does_debug:
  65. self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)",
  66. target, safe_repr(args), safe_repr(kwargs))
  67. return self.on_apply(target, args, kwargs,
  68. callback=on_ready,
  69. accept_callback=accept_callback,
  70. timeout_callback=timeout_callback,
  71. error_callback=on_worker_error,
  72. waitforslot=self.putlocks,
  73. soft_timeout=soft_timeout,
  74. timeout=timeout)
  75. def on_ready(self, callback, errback, ret_value):
  76. """What to do when a worker task is ready and its return value has
  77. been collected."""
  78. if isinstance(ret_value, ExceptionInfo):
  79. if isinstance(ret_value.exception, (
  80. SystemExit, KeyboardInterrupt)):
  81. raise ret_value.exception
  82. self.safe_apply_callback(errback, ret_value)
  83. else:
  84. self.safe_apply_callback(callback, ret_value)
  85. def on_worker_error(self, errback, exc_info):
  86. errback(exc_info)
  87. def safe_apply_callback(self, fun, *args):
  88. if fun:
  89. try:
  90. fun(*args)
  91. except BaseException:
  92. self.logger.error("Pool callback raised exception: %s",
  93. traceback.format_exc(),
  94. exc_info=sys.exc_info())
  95. def _get_info(self):
  96. return {}
  97. @property
  98. def info(self):
  99. return self._get_info()
  100. @property
  101. def active(self):
  102. return self._state == self.RUN
  103. @property
  104. def num_processes(self):
  105. return self.limit