__init__.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. """
  2. The Multiprocessing Worker Server
  3. Documentation for this module is in ``docs/reference/celery.worker.rst``.
  4. """
  5. from carrot.connection import DjangoAMQPConnection
  6. from celery.worker.controllers import Mediator, PeriodicWorkController
  7. from celery.worker.job import TaskWrapper
  8. from celery.registry import NotRegistered
  9. from celery.messaging import TaskConsumer
  10. from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
  11. from celery.log import setup_logger
  12. from celery.pool import TaskPool
  13. from Queue import Queue
  14. import traceback
  15. import logging
  16. class AMQPListener(object):
  17. """Listen for messages received from the AMQP broker and
  18. move them the the bucket queue for task processing.
  19. :param bucket_queue: See :attr:`bucket_queue`.
  20. :param hold_queue: See :attr:`hold_queue`.
  21. .. attribute:: bucket_queue
  22. The queue that holds tasks ready for processing immediately.
  23. .. attribute:: hold_queue
  24. The queue that holds paused tasks. Reasons for being paused include
  25. a countdown/eta or that it's waiting for retry.
  26. .. attribute:: logger
  27. The logger used.
  28. """
  29. def __init__(self, bucket_queue, hold_queue, logger):
  30. self.amqp_connection = None
  31. self.task_consumer = None
  32. self.bucket_queue = bucket_queue
  33. self.hold_queue = hold_queue
  34. self.logger = logger
  35. def start(self):
  36. """Start processing AMQP messages."""
  37. task_consumer = self.reset_connection()
  38. it = task_consumer.iterconsume(limit=None)
  39. while True:
  40. it.next()
  41. def stop(self):
  42. """Stop processing AMQP messages and close the connection
  43. to the broker."""
  44. self.close_connection()
  45. def receive_message(self, message_data, message):
  46. """The callback called when a new message is received.
  47. If the message has an ``eta`` we move it to the hold queue,
  48. otherwise we move it the bucket queue for immediate processing.
  49. """
  50. try:
  51. task = TaskWrapper.from_message(message, message_data,
  52. logger=self.logger)
  53. except NotRegistered, exc:
  54. self.logger.info("Unknown task ignored: %s" % (exc))
  55. return
  56. eta = message_data.get("eta")
  57. if eta:
  58. self.hold_queue.put((task, eta))
  59. else:
  60. self.bucket_queue.put(task)
  61. def close_connection(self):
  62. """Close the AMQP connection."""
  63. if self.task_consumer:
  64. self.task_consumer.close()
  65. self.task_consumer = None
  66. if self.amqp_connection:
  67. self.amqp_connection.close()
  68. self.amqp_connection = None
  69. def reset_connection(self):
  70. """Reset the AMQP connection, and reinitialize the
  71. :class:`celery.messaging.TaskConsumer` instance.
  72. Resets the task consumer in :attr:`task_consumer`.
  73. """
  74. self.close_connection()
  75. self.amqp_connection = DjangoAMQPConnection()
  76. self.task_consumer = TaskConsumer(connection=self.amqp_connection)
  77. self.task_consumer.register_callback(self.receive_message)
  78. return self.task_consumer
  79. class WorkController(object):
  80. """Executes tasks waiting in the task queue.
  81. :param concurrency: see :attr:`concurrency`.
  82. :param logfile: see :attr:`logfile`.
  83. :param loglevel: see :attr:`loglevel`.
  84. .. attribute:: concurrency
  85. The number of simultaneous processes doing work (default:
  86. :const:`celery.conf.DAEMON_CONCURRENCY`)
  87. .. attribute:: loglevel
  88. The loglevel used (default: :const:`logging.INFO`)
  89. .. attribute:: logfile
  90. The logfile used, if no logfile is specified it uses ``stderr``
  91. (default: :const:`celery.conf.DAEMON_LOG_FILE`).
  92. .. attribute:: logger
  93. The :class:`logging.Logger` instance used for logging.
  94. .. attribute:: is_detached
  95. Flag describing if the worker is running as a daemon or not.
  96. .. attribute:: pool
  97. The :class:`multiprocessing.Pool` instance used.
  98. .. attribute:: bucket_queue
  99. The :class:`Queue.Queue` that holds tasks ready for immediate
  100. processing.
  101. .. attribute:: hold_queue
  102. The :class:`Queue.Queue` that holds paused tasks. Reasons for holding
  103. back the task include waiting for ``eta`` to pass or the task is being
  104. retried.
  105. .. attribute:: periodic_work_controller
  106. Instance of :class:`celery.worker.controllers.PeriodicWorkController`.
  107. .. attribute:: mediator
  108. Instance of :class:`celery.worker.controllers.Mediator`.
  109. .. attribute:: amqp_listener
  110. Instance of :class:`AMQPListener`.
  111. """
  112. loglevel = logging.ERROR
  113. concurrency = DAEMON_CONCURRENCY
  114. logfile = DAEMON_LOG_FILE
  115. _state = None
  116. def __init__(self, concurrency=None, logfile=None, loglevel=None,
  117. is_detached=False):
  118. # Options
  119. self.loglevel = loglevel or self.loglevel
  120. self.concurrency = concurrency or self.concurrency
  121. self.logfile = logfile or self.logfile
  122. self.is_detached = is_detached
  123. self.logger = setup_logger(loglevel, logfile)
  124. # Queues
  125. self.bucket_queue = Queue()
  126. self.hold_queue = Queue()
  127. # Threads+Pool
  128. self.periodic_work_controller = PeriodicWorkController(
  129. self.bucket_queue,
  130. self.hold_queue)
  131. self.pool = TaskPool(self.concurrency, logger=self.logger)
  132. self.amqp_listener = AMQPListener(self.bucket_queue, self.hold_queue,
  133. logger=self.logger)
  134. self.mediator = Mediator(self.bucket_queue, self.safe_process_task)
  135. # The order is important here;
  136. # the first in the list is the first to start,
  137. # and they must be stopped in reverse order.
  138. self.components = [self.pool,
  139. self.mediator,
  140. self.periodic_work_controller,
  141. self.amqp_listener]
  142. def start(self):
  143. """Starts the workers main loop."""
  144. self._state = "RUN"
  145. try:
  146. [component.start() for component in self.components]
  147. finally:
  148. self.stop()
  149. def safe_process_task(self, task):
  150. """Same as :meth:`process_task`, but catches all exceptions
  151. the task raises and log them as errors, to make sure the
  152. worker doesn't die."""
  153. try:
  154. try:
  155. self.process_task(task)
  156. except Exception, exc:
  157. self.logger.critical("Internal error %s: %s\n%s" % (
  158. exc.__class__, exc, traceback.format_exc()))
  159. except (SystemExit, KeyboardInterrupt):
  160. self.stop()
  161. def process_task(self, task):
  162. """Process task by sending it to the pool of workers."""
  163. self.logger.info("Got task from broker: %s[%s]" % (
  164. task.task_name, task.task_id))
  165. task.execute_using_pool(self.pool, self.loglevel, self.logfile)
  166. def stop(self):
  167. """Gracefully shutdown the worker server."""
  168. # shut down the periodic work controller thread
  169. if self._state != "RUN":
  170. return
  171. [component.stop() for component in reversed(self.components)]