worker.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. from carrot.connection import DjangoAMQPConnection
  2. from crunchy.messaging import TaskConsumer
  3. from crunchy.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
  4. from crunchy.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
  5. from crunchy.log import setup_logger
  6. from crunchy.registry import tasks
  7. from crunchy.process import ProcessQueue
  8. import multiprocessing
  9. import simplejson
  10. import traceback
  11. import logging
  12. import time
  13. class EmptyQueue(Exception):
  14. """The message queue is currently empty."""
  15. class UnknownTask(Exception):
  16. """Got an unknown task in the queue. The message is requeued and
  17. ignored."""
  18. class TaskDaemon(object):
  19. """Refreshes feed_urls in the queue using a process pool.
  20. ``concurrency`` is the number of simultaneous processes.
  21. """
  22. loglevel = logging.ERROR
  23. concurrency = DAEMON_CONCURRENCY
  24. logfile = DAEMON_LOG_FILE
  25. queue_wakeup_after = QUEUE_WAKEUP_AFTER
  26. def __init__(self, concurrency=None, logfile=None, loglevel=None,
  27. queue_wakeup_after=None):
  28. self.loglevel = loglevel or self.loglevel
  29. self.concurrency = concurrency or self.concurrency
  30. self.logfile = logfile or self.logfile
  31. self.queue_wakeup_after = queue_wakeup_after or \
  32. self.queue_wakeup_after
  33. self.logger = setup_logger(loglevel, logfile)
  34. self.pool = multiprocessing.Pool(self.concurrency)
  35. self.task_consumer = TaskConsumer(connection=DjangoAMQPConnection)
  36. self.task_registry = tasks
  37. def fetch_next_task(self):
  38. message = self.task_consumer.fetch()
  39. if message is None: # No messages waiting.
  40. raise EmptyQueue()
  41. message_data = simplejson.loads(message.body)
  42. task_name = message_data.pop("crunchTASK")
  43. task_id = message_data.pop("crunchID")
  44. self.logger.info("Got task from broker: %s[%s]" % (
  45. task_name, task_id))
  46. if task_name not in self.task_registry:
  47. message.reject()
  48. raise UnknownTask(task_name)
  49. task_func = self.task_registry[task_name]
  50. task_func_params = {"loglevel": self.loglevel,
  51. "logfile": self.logfile}
  52. task_func_params.update(message_data)
  53. #try:
  54. result = self.pool.apply_async(task_func, [], task_func_params)
  55. #except:
  56. # message.reject()
  57. # raise
  58. message.ack()
  59. return result, task_name, task_id
  60. def run(self):
  61. results = ProcessQueue(self.concurrency, logger=self.logger,
  62. done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
  63. last_empty_emit = None
  64. while True:
  65. try:
  66. result, task_name, task_id = self.fetch_next_task()
  67. except EmptyQueue:
  68. if not last_empty_emit or \
  69. time.time() > last_empty_emit + EMPTY_MSG_EMIT_EVERY:
  70. self.logger.info("Waiting for queue.")
  71. last_empty_emit = time.time()
  72. time.sleep(self.queue_wakeup_after)
  73. continue
  74. except UnknownTask, e:
  75. self.logger.info("Unknown task %s requeued and ignored." % (
  76. task_name))
  77. continue
  78. #except Exception, e:
  79. # self.logger.critical("Raised %s: %s\n%s" % (
  80. # e.__class__, e, traceback.format_exc()))
  81. # continue
  82. results.add(result, task_name, task_id)