base.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.concurrency.base
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. TaskPool interface.
  6. """
  7. from __future__ import absolute_import
  8. import logging
  9. import os
  10. import time
  11. from kombu.utils.encoding import safe_repr
  12. from celery.utils import timer2
  13. from celery.utils.log import get_logger
  14. __all__ = ['BasePool', 'apply_target']
  15. logger = get_logger('celery.pool')
  16. def apply_target(target, args=(), kwargs={}, callback=None,
  17. accept_callback=None, pid=None, **_):
  18. if accept_callback:
  19. accept_callback(pid or os.getpid(), time.time())
  20. callback(target(*args, **kwargs))
  21. class BasePool(object):
  22. RUN = 0x1
  23. CLOSE = 0x2
  24. TERMINATE = 0x3
  25. Timer = timer2.Timer
  26. #: set to true if the pool can be shutdown from within
  27. #: a signal handler.
  28. signal_safe = True
  29. #: set to true if pool uses greenlets.
  30. is_green = False
  31. _state = None
  32. _pool = None
  33. #: only used by multiprocessing pool
  34. uses_semaphore = False
  35. def __init__(self, limit=None, putlocks=True,
  36. forking_enable=True, callbacks_propagate=(), **options):
  37. self.limit = limit
  38. self.putlocks = putlocks
  39. self.options = options
  40. self.forking_enable = forking_enable
  41. self.callbacks_propagate = callbacks_propagate
  42. self._does_debug = logger.isEnabledFor(logging.DEBUG)
  43. def on_start(self):
  44. pass
  45. def did_start_ok(self):
  46. return True
  47. def flush(self):
  48. pass
  49. def on_stop(self):
  50. pass
  51. def register_with_event_loop(self, loop):
  52. pass
  53. def on_apply(self, *args, **kwargs):
  54. pass
  55. def on_terminate(self):
  56. pass
  57. def on_soft_timeout(self, job):
  58. pass
  59. def on_hard_timeout(self, job):
  60. pass
  61. def maybe_handle_result(self, *args):
  62. pass
  63. def maintain_pool(self, *args, **kwargs):
  64. pass
  65. def terminate_job(self, pid):
  66. raise NotImplementedError(
  67. '{0} does not implement kill_job'.format(type(self)))
  68. def restart(self):
  69. raise NotImplementedError(
  70. '{0} does not implement restart'.format(type(self)))
  71. def stop(self):
  72. self.on_stop()
  73. self._state = self.TERMINATE
  74. def terminate(self):
  75. self._state = self.TERMINATE
  76. self.on_terminate()
  77. def start(self):
  78. self.on_start()
  79. self._state = self.RUN
  80. def close(self):
  81. self._state = self.CLOSE
  82. self.on_close()
  83. def on_close(self):
  84. pass
  85. def apply_async(self, target, args=[], kwargs={}, **options):
  86. """Equivalent of the :func:`apply` built-in function.
  87. Callbacks should optimally return as soon as possible since
  88. otherwise the thread which handles the result will get blocked.
  89. """
  90. if self._does_debug:
  91. logger.debug('TaskPool: Apply %s (args:%s kwargs:%s)',
  92. target, safe_repr(args), safe_repr(kwargs))
  93. return self.on_apply(target, args, kwargs,
  94. waitforslot=self.putlocks,
  95. callbacks_propagate=self.callbacks_propagate,
  96. **options)
  97. def _get_info(self):
  98. return {}
  99. @property
  100. def info(self):
  101. return self._get_info()
  102. @property
  103. def active(self):
  104. return self._state == self.RUN
  105. @property
  106. def num_processes(self):
  107. return self.limit