base.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import os
  2. import sys
  3. import time
  4. import traceback
  5. from functools import partial
  6. from celery import log
  7. from celery.datastructures import ExceptionInfo
  8. from celery.utils import timer2
  9. def apply_target(target, args=(), kwargs={}, callback=None,
  10. accept_callback=None, pid=None):
  11. if accept_callback:
  12. accept_callback(pid or os.getpid(), time.time())
  13. callback(target(*args, **kwargs))
  14. class BasePool(object):
  15. RUN = 0x1
  16. CLOSE = 0x2
  17. TERMINATE = 0x3
  18. Timer = timer2.Timer
  19. signal_safe = True
  20. is_green = False
  21. _state = None
  22. _pool = None
  23. def __init__(self, limit=None, putlocks=True, logger=None, **options):
  24. self.limit = limit
  25. self.putlocks = putlocks
  26. self.logger = logger or log.get_default_logger()
  27. self.options = options
  28. def on_start(self):
  29. pass
  30. def on_stop(self):
  31. pass
  32. def on_apply(self, *args, **kwargs):
  33. pass
  34. def on_terminate(self):
  35. pass
  36. def terminate_job(self, pid):
  37. raise NotImplementedError(
  38. "%s does not implement kill_job" % (self.__class__, ))
  39. def stop(self):
  40. self._state = self.CLOSE
  41. self.on_stop()
  42. self._state = self.TERMINATE
  43. def terminate(self):
  44. self._state = self.TERMINATE
  45. self.on_terminate()
  46. def start(self):
  47. self.on_start()
  48. self._state = self.RUN
  49. def apply_async(self, target, args=None, kwargs=None, callback=None,
  50. errback=None, accept_callback=None, timeout_callback=None,
  51. soft_timeout=None, timeout=None, **compat):
  52. """Equivalent of the :func:`apply` built-in function.
  53. Callbacks should optimally return as soon as possible ince
  54. otherwise the thread which handles the result will get blocked.
  55. """
  56. args = args or []
  57. kwargs = kwargs or {}
  58. on_ready = partial(self.on_ready, callback, errback)
  59. on_worker_error = partial(self.on_worker_error, errback)
  60. self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)",
  61. target, args, kwargs)
  62. return self.on_apply(target, args, kwargs,
  63. callback=on_ready,
  64. accept_callback=accept_callback,
  65. timeout_callback=timeout_callback,
  66. error_callback=on_worker_error,
  67. waitforslot=self.putlocks,
  68. soft_timeout=soft_timeout,
  69. timeout=timeout)
  70. def on_ready(self, callback, errback, ret_value):
  71. """What to do when a worker task is ready and its return value has
  72. been collected."""
  73. if isinstance(ret_value, ExceptionInfo):
  74. if isinstance(ret_value.exception, (
  75. SystemExit, KeyboardInterrupt)):
  76. raise ret_value.exception
  77. self.safe_apply_callback(errback, ret_value)
  78. else:
  79. self.safe_apply_callback(callback, ret_value)
  80. def on_worker_error(self, errback, exc):
  81. errback(ExceptionInfo((exc.__class__, exc, None)))
  82. def safe_apply_callback(self, fun, *args):
  83. if fun:
  84. try:
  85. fun(*args)
  86. except BaseException:
  87. self.logger.error("Pool callback raised exception: %s",
  88. traceback.format_exc(),
  89. exc_info=sys.exc_info())
  90. def _get_info(self):
  91. return {}
  92. @property
  93. def info(self):
  94. return self._get_info()
  95. @property
  96. def active(self):
  97. return self._state == self.RUN
  98. @property
  99. def num_processes(self):
  100. return self.limit