base.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.concurrency.base
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. TaskPool interface.
  6. """
  7. from __future__ import absolute_import
  8. import logging
  9. import os
  10. import time
  11. from kombu.utils.encoding import safe_repr
  12. from celery.utils import timer2
  13. from celery.utils.log import get_logger
#: Module-level logger named ``celery.concurrency``; BasePool uses it to
#: check whether debug logging is enabled and to emit debug messages.
logger = get_logger('celery.concurrency')
  15. def apply_target(target, args=(), kwargs={}, callback=None,
  16. accept_callback=None, pid=None, **_):
  17. if accept_callback:
  18. accept_callback(pid or os.getpid(), time.time())
  19. callback(target(*args, **kwargs))
  20. class BasePool(object):
  21. RUN = 0x1
  22. CLOSE = 0x2
  23. TERMINATE = 0x3
  24. Timer = timer2.Timer
  25. #: set to true if the pool can be shutdown from within
  26. #: a signal handler.
  27. signal_safe = True
  28. #: set to true if pool supports rate limits.
  29. #: (this is here for gevent, which currently does not implement
  30. #: the necessary timers).
  31. rlimit_safe = True
  32. #: set to true if pool requires the use of a mediator
  33. #: thread (e.g. if applying new items can block the current thread).
  34. requires_mediator = False
  35. #: set to true if pool uses greenlets.
  36. is_green = False
  37. _state = None
  38. _pool = None
  39. #: only used by multiprocessing pool
  40. uses_semaphore = False
  41. def __init__(self, limit=None, putlocks=True,
  42. forking_enable=True, callbacks_propagate=(), **options):
  43. self.limit = limit
  44. self.putlocks = putlocks
  45. self.options = options
  46. self.forking_enable = forking_enable
  47. self.callbacks_propagate = callbacks_propagate
  48. self._does_debug = logger.isEnabledFor(logging.DEBUG)
  49. def on_start(self):
  50. pass
  51. def did_start_ok(self):
  52. return True
  53. def on_stop(self):
  54. pass
  55. def on_apply(self, *args, **kwargs):
  56. pass
  57. def on_terminate(self):
  58. pass
  59. def on_soft_timeout(self, job):
  60. pass
  61. def on_hard_timeout(self, job):
  62. pass
  63. def maybe_handle_result(self, *args):
  64. pass
  65. def maintain_pool(self, *args, **kwargs):
  66. pass
  67. def terminate_job(self, pid):
  68. raise NotImplementedError(
  69. '%s does not implement kill_job' % (self.__class__, ))
  70. def restart(self):
  71. raise NotImplementedError(
  72. '%s does not implement restart' % (self.__class__, ))
  73. def stop(self):
  74. self.on_stop()
  75. self._state = self.TERMINATE
  76. def terminate(self):
  77. self._state = self.TERMINATE
  78. self.on_terminate()
  79. def start(self):
  80. self.on_start()
  81. self._state = self.RUN
  82. def close(self):
  83. self._state = self.CLOSE
  84. self.on_close()
  85. def on_close(self):
  86. pass
  87. def init_callbacks(self, **kwargs):
  88. pass
  89. def apply_async(self, target, args=[], kwargs={}, **options):
  90. """Equivalent of the :func:`apply` built-in function.
  91. Callbacks should optimally return as soon as possible since
  92. otherwise the thread which handles the result will get blocked.
  93. """
  94. if self._does_debug:
  95. logger.debug('TaskPool: Apply %s (args:%s kwargs:%s)',
  96. target, safe_repr(args), safe_repr(kwargs))
  97. return self.on_apply(target, args, kwargs,
  98. waitforslot=self.putlocks,
  99. callbacks_propagate=self.callbacks_propagate,
  100. **options)
  101. def _get_info(self):
  102. return {}
  103. @property
  104. def info(self):
  105. return self._get_info()
  106. @property
  107. def active(self):
  108. return self._state == self.RUN
  109. @property
  110. def num_processes(self):
  111. return self.limit
  112. @property
  113. def readers(self):
  114. return {}
  115. @property
  116. def writers(self):
  117. return {}
  118. @property
  119. def timers(self):
  120. return {}