worker.py

from carrot.connection import DjangoAMQPConnection
from celery.messaging import TaskConsumer
from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
from celery.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
from celery.log import setup_logger
from celery.registry import tasks
from celery.process import ProcessQueue
from celery.models import PeriodicTaskMeta
import multiprocessing
import simplejson
import traceback
import logging
import time


class EmptyQueue(Exception):
    """The message queue is currently empty."""


class UnknownTask(Exception):
    """Got an unknown task in the queue. The message is requeued and
    ignored."""


class TaskWrapper(object):
    """Wraps the task name, id, function and call arguments received in
    a queue message."""

    def __init__(self, task_name, task_id, task_func, args, kwargs):
        self.task_name = task_name
        self.task_id = task_id
        self.task_func = task_func
        self.args = args
        self.kwargs = kwargs

    @classmethod
    def from_message(cls, message):
        message_data = simplejson.loads(message.body)
        task_name = message_data.pop("task")
        task_id = message_data.pop("id")
        args = message_data.pop("args")
        kwargs = message_data.pop("kwargs")
        if task_name not in tasks:
            message.reject()
            raise UnknownTask(task_name)
        task_func = tasks[task_name]
        return cls(task_name, task_id, task_func, args, kwargs)

    def extend_kwargs_with_logging(self, loglevel, logfile):
        task_func_kwargs = {"logfile": logfile,
                            "loglevel": loglevel}
        task_func_kwargs.update(self.kwargs)
        return task_func_kwargs

    def execute(self, loglevel, logfile):
        task_func_kwargs = self.extend_kwargs_with_logging(loglevel, logfile)
        return self.task_func(*self.args, **task_func_kwargs)

    def execute_using_pool(self, pool, loglevel, logfile):
        task_func_kwargs = self.extend_kwargs_with_logging(loglevel, logfile)
        return pool.apply_async(self.task_func, self.args, task_func_kwargs)
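

# A minimal sketch of how a task message flows through TaskWrapper, assuming
# a task registered under the hypothetical name "add". The message body, task
# id and arguments below are illustrative, not taken from this module:
#
#     message.body = '{"task": "add", "id": "d9a02b7a", "args": [2, 2], "kwargs": {}}'
#     task = TaskWrapper.from_message(message)
#     task.execute(loglevel=logging.INFO, logfile="worker.log")
#     # equivalent to:
#     #     tasks["add"](2, 2, logfile="worker.log", loglevel=logging.INFO)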


class TaskDaemon(object):
    """Executes tasks waiting in the task queue.

    ``concurrency`` is the number of simultaneous processes.
    """
    loglevel = logging.ERROR
    concurrency = DAEMON_CONCURRENCY
    logfile = DAEMON_LOG_FILE
    queue_wakeup_after = QUEUE_WAKEUP_AFTER
    empty_msg_emit_every = EMPTY_MSG_EMIT_EVERY

    def __init__(self, concurrency=None, logfile=None, loglevel=None,
                 queue_wakeup_after=None):
        self.loglevel = loglevel or self.loglevel
        self.concurrency = concurrency or self.concurrency
        self.logfile = logfile or self.logfile
        self.queue_wakeup_after = queue_wakeup_after or \
                self.queue_wakeup_after
        # Use the resolved instance attributes so the class defaults apply
        # when no explicit loglevel/logfile was passed in.
        self.logger = setup_logger(self.loglevel, self.logfile)
        self.pool = multiprocessing.Pool(self.concurrency)
        self.task_consumer = TaskConsumer(connection=DjangoAMQPConnection)

    def fetch_next_task(self):
        message = self.task_consumer.fetch()
        if message is None:  # No messages waiting.
            raise EmptyQueue()
        task = TaskWrapper.from_message(message)
        self.logger.info("Got task from broker: %s[%s]" % (
            task.task_name, task.task_id))
        return task, message

    def execute_next_task(self):
        task, message = self.fetch_next_task()
        try:
            result = task.execute_using_pool(self.pool, self.loglevel,
                                             self.logfile)
        except Exception, error:
            self.logger.critical("Worker got exception %s: %s\n%s" % (
                error.__class__, error, traceback.format_exc()))
            return
        message.ack()
        return result, task.task_name, task.task_id
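
    # For illustration (the names here are hypothetical): execute_next_task
    # returns None when the task could not be dispatched, otherwise a
    # (result, task_name, task_id) tuple, where result is the AsyncResult
    # returned by pool.apply_async:
    #
    #     outcome = daemon.execute_next_task()
    #     if outcome is not None:
    #         async_result, name, task_id = outcome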

    def run_periodic_tasks(self):
        """Schedule all waiting periodic tasks for execution.

        Returns a list of :class:`celery.models.PeriodicTaskMeta` objects.
        """
        waiting_tasks = PeriodicTaskMeta.objects.get_waiting_tasks()
        for waiting_task in waiting_tasks:
            waiting_task.delay()
        return waiting_tasks

    def run(self):
        """Run the worker server."""
        results = ProcessQueue(self.concurrency, logger=self.logger,
                done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
        last_empty_emit = None
        while True:
            self.run_periodic_tasks()
            try:
                result, task_name, task_id = self.execute_next_task()
            except (TypeError, ValueError):
                # execute_next_task didn't return a result/name/id tuple
                # (unpacking its None return value raises TypeError),
                # probably because the task got an exception.
                continue
            except EmptyQueue:
                emit_every = self.empty_msg_emit_every
                if emit_every:
                    if not last_empty_emit or \
                            time.time() > last_empty_emit + emit_every:
                        self.logger.info("Waiting for queue.")
                        last_empty_emit = time.time()
                time.sleep(self.queue_wakeup_after)
                continue
            except UnknownTask, e:
                self.logger.info("Unknown task ignored: %s" % (e, ))
                continue
            except Exception, e:
                self.logger.critical("Message queue raised %s: %s\n%s" % (
                    e.__class__, e, traceback.format_exc()))
                continue
            results.add(result, task_name, task_id)
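

# A minimal entry-point sketch, not part of the original module: start a
# worker with illustrative settings. This assumes the Django settings
# (database and AMQP broker) needed by the celery/carrot imports above are
# already configured.
if __name__ == "__main__":
    TaskDaemon(concurrency=2, loglevel=logging.INFO).run()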