base.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. # -*- coding: utf-8 -*-
  2. """Base Execution Pool."""
  3. from __future__ import absolute_import, unicode_literals
  4. import logging
  5. import os
  6. import sys
  7. from billiard.einfo import ExceptionInfo
  8. from billiard.exceptions import WorkerLostError
  9. from kombu.utils.encoding import safe_repr
  10. from celery.exceptions import WorkerShutdown, WorkerTerminate
  11. from celery.five import monotonic, reraise
  12. from celery.utils import timer2
  13. from celery.utils.log import get_logger
  14. from celery.utils.text import truncate
  15. __all__ = ('BasePool', 'apply_target')
  16. logger = get_logger('celery.pool')
  17. def apply_target(target, args=(), kwargs={}, callback=None,
  18. accept_callback=None, pid=None, getpid=os.getpid,
  19. propagate=(), monotonic=monotonic, **_):
  20. """Apply function within pool context."""
  21. if accept_callback:
  22. accept_callback(pid or getpid(), monotonic())
  23. try:
  24. ret = target(*args, **kwargs)
  25. except propagate:
  26. raise
  27. except Exception:
  28. raise
  29. except (WorkerShutdown, WorkerTerminate):
  30. raise
  31. except BaseException as exc:
  32. try:
  33. reraise(WorkerLostError, WorkerLostError(repr(exc)),
  34. sys.exc_info()[2])
  35. except WorkerLostError:
  36. callback(ExceptionInfo())
  37. else:
  38. callback(ret)
  39. class BasePool(object):
  40. """Task pool."""
  41. RUN = 0x1
  42. CLOSE = 0x2
  43. TERMINATE = 0x3
  44. Timer = timer2.Timer
  45. #: set to true if the pool can be shutdown from within
  46. #: a signal handler.
  47. signal_safe = True
  48. #: set to true if pool uses greenlets.
  49. is_green = False
  50. _state = None
  51. _pool = None
  52. _does_debug = True
  53. #: only used by multiprocessing pool
  54. uses_semaphore = False
  55. task_join_will_block = True
  56. body_can_be_buffer = False
  57. def __init__(self, limit=None, putlocks=True, forking_enable=True,
  58. callbacks_propagate=(), app=None, **options):
  59. self.limit = limit
  60. self.putlocks = putlocks
  61. self.options = options
  62. self.forking_enable = forking_enable
  63. self.callbacks_propagate = callbacks_propagate
  64. self.app = app
  65. def on_start(self):
  66. pass
  67. def did_start_ok(self):
  68. return True
  69. def flush(self):
  70. pass
  71. def on_stop(self):
  72. pass
  73. def register_with_event_loop(self, loop):
  74. pass
  75. def on_apply(self, *args, **kwargs):
  76. pass
  77. def on_terminate(self):
  78. pass
  79. def on_soft_timeout(self, job):
  80. pass
  81. def on_hard_timeout(self, job):
  82. pass
  83. def maintain_pool(self, *args, **kwargs):
  84. pass
  85. def terminate_job(self, pid, signal=None):
  86. raise NotImplementedError(
  87. '{0} does not implement kill_job'.format(type(self)))
  88. def restart(self):
  89. raise NotImplementedError(
  90. '{0} does not implement restart'.format(type(self)))
  91. def stop(self):
  92. self.on_stop()
  93. self._state = self.TERMINATE
  94. def terminate(self):
  95. self._state = self.TERMINATE
  96. self.on_terminate()
  97. def start(self):
  98. self._does_debug = logger.isEnabledFor(logging.DEBUG)
  99. self.on_start()
  100. self._state = self.RUN
  101. def close(self):
  102. self._state = self.CLOSE
  103. self.on_close()
  104. def on_close(self):
  105. pass
  106. def apply_async(self, target, args=[], kwargs={}, **options):
  107. """Equivalent of the :func:`apply` built-in function.
  108. Callbacks should optimally return as soon as possible since
  109. otherwise the thread which handles the result will get blocked.
  110. """
  111. if self._does_debug:
  112. logger.debug('TaskPool: Apply %s (args:%s kwargs:%s)',
  113. target, truncate(safe_repr(args), 1024),
  114. truncate(safe_repr(kwargs), 1024))
  115. return self.on_apply(target, args, kwargs,
  116. waitforslot=self.putlocks,
  117. callbacks_propagate=self.callbacks_propagate,
  118. **options)
  119. def _get_info(self):
  120. return {
  121. 'max-concurrency': self.limit,
  122. }
  123. @property
  124. def info(self):
  125. return self._get_info()
  126. @property
  127. def active(self):
  128. return self._state == self.RUN
  129. @property
  130. def num_processes(self):
  131. return self.limit