base.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. import logging
  4. import os
  5. import sys
  6. import time
  7. import traceback
  8. from functools import partial
  9. from .. import log
  10. from ..datastructures import ExceptionInfo
  11. from ..utils import timer2
  12. from ..utils.encoding import safe_repr
  13. def apply_target(target, args=(), kwargs={}, callback=None,
  14. accept_callback=None, pid=None, **_):
  15. if accept_callback:
  16. accept_callback(pid or os.getpid(), time.time())
  17. callback(target(*args, **kwargs))
  18. class BasePool(object):
  19. RUN = 0x1
  20. CLOSE = 0x2
  21. TERMINATE = 0x3
  22. Timer = timer2.Timer
  23. signal_safe = True
  24. rlimit_safe = True
  25. is_green = False
  26. _state = None
  27. _pool = None
  28. def __init__(self, limit=None, putlocks=True, logger=None, **options):
  29. self.limit = limit
  30. self.putlocks = putlocks
  31. self.logger = logger or log.get_default_logger()
  32. self.options = options
  33. self._does_debug = self.logger.isEnabledFor(logging.DEBUG)
  34. def on_start(self):
  35. pass
  36. def on_stop(self):
  37. pass
  38. def on_apply(self, *args, **kwargs):
  39. pass
  40. def on_terminate(self):
  41. pass
  42. def terminate_job(self, pid):
  43. raise NotImplementedError(
  44. "%s does not implement kill_job" % (self.__class__, ))
  45. def restart(self):
  46. raise NotImplementedError(
  47. "%s does not implement restart" % (self.__class__, ))
  48. def stop(self):
  49. self._state = self.CLOSE
  50. self.on_stop()
  51. self._state = self.TERMINATE
  52. def terminate(self):
  53. self._state = self.TERMINATE
  54. self.on_terminate()
  55. def start(self):
  56. self.on_start()
  57. self._state = self.RUN
  58. def apply_async(self, target, args=None, kwargs=None, callback=None,
  59. errback=None, accept_callback=None, timeout_callback=None,
  60. soft_timeout=None, timeout=None, **compat):
  61. """Equivalent of the :func:`apply` built-in function.
  62. Callbacks should optimally return as soon as possible since
  63. otherwise the thread which handles the result will get blocked.
  64. """
  65. args = args or []
  66. kwargs = kwargs or {}
  67. on_ready = partial(self.on_ready, callback, errback)
  68. on_worker_error = partial(self.on_worker_error, errback)
  69. if self._does_debug:
  70. self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)",
  71. target, safe_repr(args), safe_repr(kwargs))
  72. return self.on_apply(target, args, kwargs,
  73. callback=on_ready,
  74. accept_callback=accept_callback,
  75. timeout_callback=timeout_callback,
  76. error_callback=on_worker_error,
  77. waitforslot=self.putlocks,
  78. soft_timeout=soft_timeout,
  79. timeout=timeout)
  80. def on_ready(self, callback, errback, ret_value):
  81. """What to do when a worker task is ready and its return value has
  82. been collected."""
  83. if isinstance(ret_value, ExceptionInfo):
  84. if isinstance(ret_value.exception, (
  85. SystemExit, KeyboardInterrupt)):
  86. raise ret_value.exception
  87. self.safe_apply_callback(errback, ret_value)
  88. else:
  89. self.safe_apply_callback(callback, ret_value)
  90. def on_worker_error(self, errback, exc_info):
  91. errback(exc_info)
  92. def safe_apply_callback(self, fun, *args):
  93. if fun:
  94. try:
  95. fun(*args)
  96. except BaseException:
  97. self.logger.error("Pool callback raised exception: %s",
  98. traceback.format_exc(),
  99. exc_info=sys.exc_info())
  100. def _get_info(self):
  101. return {}
  102. @property
  103. def info(self):
  104. return self._get_info()
  105. @property
  106. def active(self):
  107. return self._state == self.RUN
  108. @property
  109. def num_processes(self):
  110. return self.limit