worker.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. from carrot.connection import DjangoAMQPConnection
  2. from crunchy.messaging import TaskConsumer
  3. from crunchy.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
  4. from crunchy.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
  5. from crunchy.log import setup_logger
  6. from crunchy.registry import tasks
  7. from crunchy.process import ProcessQueue
  8. from crunchy.models import PeriodicTaskMeta
  9. import multiprocessing
  10. import simplejson
  11. import traceback
  12. import logging
  13. import time
  14. class EmptyQueue(Exception):
  15. """The message queue is currently empty."""
  16. class UnknownTask(Exception):
  17. """Got an unknown task in the queue. The message is requeued and
  18. ignored."""
  19. class TaskDaemon(object):
  20. """Executes tasks waiting in the task queue.
  21. ``concurrency`` is the number of simultaneous processes.
  22. """
  23. loglevel = logging.ERROR
  24. concurrency = DAEMON_CONCURRENCY
  25. logfile = DAEMON_LOG_FILE
  26. queue_wakeup_after = QUEUE_WAKEUP_AFTER
  27. def __init__(self, concurrency=None, logfile=None, loglevel=None,
  28. queue_wakeup_after=None):
  29. self.loglevel = loglevel or self.loglevel
  30. self.concurrency = concurrency or self.concurrency
  31. self.logfile = logfile or self.logfile
  32. self.queue_wakeup_after = queue_wakeup_after or \
  33. self.queue_wakeup_after
  34. self.logger = setup_logger(loglevel, logfile)
  35. self.pool = multiprocessing.Pool(self.concurrency)
  36. self.task_consumer = TaskConsumer(connection=DjangoAMQPConnection)
  37. self.task_registry = tasks
  38. def fetch_next_task(self):
  39. message = self.task_consumer.fetch()
  40. if message is None: # No messages waiting.
  41. raise EmptyQueue()
  42. message_data = simplejson.loads(message.body)
  43. task_name = message_data.pop("crunchTASK")
  44. task_id = message_data.pop("crunchID")
  45. self.logger.info("Got task from broker: %s[%s]" % (
  46. task_name, task_id))
  47. if task_name not in self.task_registry:
  48. message.reject()
  49. raise UnknownTask(task_name)
  50. task_func = self.task_registry[task_name]
  51. task_func_params = {"logfile": self.logfile,
  52. "loglevel": self.loglevel}
  53. task_func_params.update(message_data)
  54. #try:
  55. result = self.pool.apply_async(task_func, [], task_func_params)
  56. #except:
  57. # message.reject()
  58. # raise
  59. message.ack()
  60. return result, task_name, task_id
  61. def run_periodic_tasks(self):
  62. for task in PeriodicTaskMeta.objects.get_waiting_tasks():
  63. task.delay()
  64. def run(self):
  65. results = ProcessQueue(self.concurrency, logger=self.logger,
  66. done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
  67. last_empty_emit = None
  68. while True:
  69. self.run_periodic_tasks()
  70. try:
  71. result, task_name, task_id = self.fetch_next_task()
  72. except EmptyQueue:
  73. if not last_empty_emit or \
  74. time.time() > last_empty_emit + EMPTY_MSG_EMIT_EVERY:
  75. self.logger.info("Waiting for queue.")
  76. last_empty_emit = time.time()
  77. time.sleep(self.queue_wakeup_after)
  78. continue
  79. except UnknownTask, e:
  80. self.logger.info("Unknown task requeued and ignored: %s" % (
  81. e))
  82. continue
  83. #except Exception, e:
  84. # self.logger.critical("Raised %s: %s\n%s" % (
  85. # e.__class__, e, traceback.format_exc()))
  86. # continue
  87. results.add(result, task_name, task_id)