base.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. import os
  2. import sys
  3. import time
  4. import traceback
  5. from celery import log
  6. from celery.datastructures import ExceptionInfo
  7. from celery.utils.functional import partial
  8. from celery.utils import timer2
  9. def apply_target(target, args=(), kwargs={}, callback=None,
  10. accept_callback=None, pid=None):
  11. if accept_callback:
  12. accept_callback(pid or os.getpid(), time.time())
  13. callback(target(*args, **kwargs))
  14. class BasePool(object):
  15. RUN = 0x1
  16. CLOSE = 0x2
  17. TERMINATE = 0x3
  18. Timer = timer2.Timer
  19. signal_safe = True
  20. _state = None
  21. _pool = None
  22. def __init__(self, limit=None, putlocks=True, logger=None, **options):
  23. self.limit = limit
  24. self.putlocks = putlocks
  25. self.logger = logger or log.get_default_logger()
  26. self.options = options
  27. def on_start(self):
  28. pass
  29. def on_stop(self):
  30. pass
  31. def on_apply(self, *args, **kwargs):
  32. pass
  33. def terminate_job(self, pid):
  34. raise NotImplementedError(
  35. "%s does not implement kill_job" % (self.__class__, ))
  36. def stop(self):
  37. self._state = self.CLOSE
  38. self.on_stop()
  39. self._state = self.TERMINATE
  40. def terminate(self):
  41. self._state = self.TERMINATE
  42. self.on_terminate()
  43. def start(self):
  44. self.on_start()
  45. self._state = self.RUN
  46. def apply_async(self, target, args=None, kwargs=None, callbacks=None,
  47. errbacks=None, accept_callback=None, timeout_callback=None,
  48. **compat):
  49. """Equivalent of the :func:`apply` built-in function.
  50. All `callbacks` and `errbacks` should complete immediately since
  51. otherwise the thread which handles the result will get blocked.
  52. """
  53. args = args or []
  54. kwargs = kwargs or {}
  55. callbacks = callbacks or []
  56. errbacks = errbacks or []
  57. on_ready = partial(self.on_ready, callbacks, errbacks)
  58. on_worker_error = partial(self.on_worker_error, errbacks)
  59. self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
  60. target, args, kwargs))
  61. return self.on_apply(target, args, kwargs,
  62. callback=on_ready,
  63. accept_callback=accept_callback,
  64. timeout_callback=timeout_callback,
  65. error_callback=on_worker_error,
  66. waitforslot=self.putlocks)
  67. def on_ready(self, callbacks, errbacks, ret_value):
  68. """What to do when a worker task is ready and its return value has
  69. been collected."""
  70. if isinstance(ret_value, ExceptionInfo):
  71. if isinstance(ret_value.exception, (
  72. SystemExit, KeyboardInterrupt)):
  73. raise ret_value.exception
  74. [self.safe_apply_callback(errback, ret_value)
  75. for errback in errbacks]
  76. else:
  77. [self.safe_apply_callback(callback, ret_value)
  78. for callback in callbacks]
  79. def on_worker_error(self, errbacks, exc):
  80. einfo = ExceptionInfo((exc.__class__, exc, None))
  81. [errback(einfo) for errback in errbacks]
  82. def safe_apply_callback(self, fun, *args):
  83. try:
  84. fun(*args)
  85. except:
  86. self.logger.error("Pool callback raised exception: %s" % (
  87. traceback.format_exc(), ),
  88. exc_info=sys.exc_info())
  89. def _get_info(self):
  90. return {}
  91. @property
  92. def info(self):
  93. return self._get_info()
  94. @property
  95. def active(self):
  96. return self._state == self.RUN