base.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. import sys
  2. import traceback
  3. from celery import log
  4. from celery.datastructures import ExceptionInfo
  5. from celery.utils.functional import partial
  6. def apply_target(target, args=(), kwargs={}, callback=None,
  7. accept_callback=None):
  8. if accept_callback:
  9. accept_callback()
  10. callback(target(*args, **kwargs))
  11. class BasePool(object):
  12. RUN = 0x1
  13. CLOSE = 0x2
  14. TERMINATE = 0x3
  15. signal_safe = True
  16. _state = None
  17. _pool = None
  18. def __init__(self, limit=None, putlocks=True, logger=None, **options):
  19. self.limit = limit
  20. self.putlocks = putlocks
  21. self.logger = logger or log.get_default_logger()
  22. self.options = options
  23. def on_start(self):
  24. pass
  25. def on_stop(self):
  26. pass
  27. def on_apply(self, *args, **kwargs):
  28. pass
  29. def stop(self):
  30. self._state = self.CLOSE
  31. self.on_stop()
  32. self._state = self.TERMINATE
  33. def terminate(self):
  34. self._state = self.TERMINATE
  35. self.on_terminate()
  36. def start(self):
  37. self.on_start()
  38. self._state = self.RUN
  39. def apply_async(self, target, args=None, kwargs=None, callbacks=None,
  40. errbacks=None, accept_callback=None, timeout_callback=None,
  41. **compat):
  42. """Equivalent of the :func:`apply` built-in function.
  43. All `callbacks` and `errbacks` should complete immediately since
  44. otherwise the thread which handles the result will get blocked.
  45. """
  46. args = args or []
  47. kwargs = kwargs or {}
  48. callbacks = callbacks or []
  49. errbacks = errbacks or []
  50. on_ready = partial(self.on_ready, callbacks, errbacks)
  51. on_worker_error = partial(self.on_worker_error, errbacks)
  52. self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
  53. target, args, kwargs))
  54. return self.on_apply(target, args, kwargs,
  55. callback=on_ready,
  56. accept_callback=accept_callback,
  57. timeout_callback=timeout_callback,
  58. error_callback=on_worker_error,
  59. waitforslot=self.putlocks)
  60. def on_ready(self, callbacks, errbacks, ret_value):
  61. """What to do when a worker task is ready and its return value has
  62. been collected."""
  63. if isinstance(ret_value, ExceptionInfo):
  64. if isinstance(ret_value.exception, (
  65. SystemExit, KeyboardInterrupt)):
  66. raise ret_value.exception
  67. [self.safe_apply_callback(errback, ret_value)
  68. for errback in errbacks]
  69. else:
  70. [self.safe_apply_callback(callback, ret_value)
  71. for callback in callbacks]
  72. def on_worker_error(self, errbacks, exc):
  73. einfo = ExceptionInfo((exc.__class__, exc, None))
  74. [errback(einfo) for errback in errbacks]
  75. def safe_apply_callback(self, fun, *args):
  76. try:
  77. fun(*args)
  78. except:
  79. self.logger.error("Pool callback raised exception: %s" % (
  80. traceback.format_exc(), ),
  81. exc_info=sys.exc_info())
  82. def blocking(self, fun, *args, **kwargs):
  83. return fun(*args, **kwargs)
  84. def _get_info(self):
  85. return {}
  86. @property
  87. def info(self):
  88. return self._get_info()
  89. @property
  90. def active(self):
  91. return self._state == self.RUN