worker.py 3.7 KB

from carrot.connection import DjangoAMQPConnection
from celery.messaging import TaskConsumer
from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
from celery.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
from celery.log import setup_logger
from celery.registry import tasks
from celery.process import ProcessQueue
from celery.models import PeriodicTaskMeta
import multiprocessing
import simplejson
import traceback
import logging
import time


class EmptyQueue(Exception):
    """The message queue is currently empty."""


class UnknownTask(Exception):
    """Got an unknown task in the queue. The message is requeued and
    ignored."""


class TaskDaemon(object):
    """Executes tasks waiting in the task queue.

    ``concurrency`` is the number of simultaneous processes.
    """
    loglevel = logging.ERROR
    concurrency = DAEMON_CONCURRENCY
    logfile = DAEMON_LOG_FILE
    queue_wakeup_after = QUEUE_WAKEUP_AFTER

    def __init__(self, concurrency=None, logfile=None, loglevel=None,
            queue_wakeup_after=None):
        self.loglevel = loglevel or self.loglevel
        self.concurrency = concurrency or self.concurrency
        self.logfile = logfile or self.logfile
        self.queue_wakeup_after = queue_wakeup_after or \
                                    self.queue_wakeup_after
        self.logger = setup_logger(self.loglevel, self.logfile)
        self.pool = multiprocessing.Pool(self.concurrency)
        self.task_consumer = TaskConsumer(connection=DjangoAMQPConnection)
        self.task_registry = tasks

    def fetch_next_task(self):
        message = self.task_consumer.fetch()
        if message is None: # No messages waiting.
            raise EmptyQueue()

        message_data = simplejson.loads(message.body)
        task_name = message_data.pop("celeryTASK")
        task_id = message_data.pop("celeryID")
        self.logger.info("Got task from broker: %s[%s]" % (
                task_name, task_id))

        if task_name not in self.task_registry:
            message.reject()
            raise UnknownTask(task_name)

        task_func = self.task_registry[task_name]
        task_func_params = {"logfile": self.logfile,
                            "loglevel": self.loglevel}
        task_func_params.update(message_data)

        # Hand the task off to the pool; only acknowledge the message if
        # it was accepted for execution.
        try:
            result = self.pool.apply_async(task_func, [], task_func_params)
        except Exception, error:
            self.logger.critical("Worker got exception %s: %s\n%s" % (
                error.__class__, error, traceback.format_exc()))
        else:
            message.ack()
            return result, task_name, task_id

    def run_periodic_tasks(self):
        for task in PeriodicTaskMeta.objects.get_waiting_tasks():
            task.delay()

    def run(self):
        results = ProcessQueue(self.concurrency, logger=self.logger,
                done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
        last_empty_emit = None

        while True:
            self.run_periodic_tasks()
            try:
                result, task_name, task_id = self.fetch_next_task()
            except EmptyQueue:
                if not last_empty_emit or \
                        time.time() > last_empty_emit + EMPTY_MSG_EMIT_EVERY:
                    self.logger.info("Waiting for queue.")
                    last_empty_emit = time.time()
                time.sleep(self.queue_wakeup_after)
                continue
            except UnknownTask, e:
                self.logger.info("Unknown task ignored: %s" % (e))
                continue
            except Exception, e:
                self.logger.critical("Message queue raised %s: %s\n%s" % (
                    e.__class__, e, traceback.format_exc()))
                continue

            results.add(result, task_name, task_id)
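

# Illustrative usage sketch (not part of the original file): running the
# module directly could start a worker with the defaults from celery.conf.
# This assumes Django settings are already configured so that
# DjangoAMQPConnection can reach the AMQP broker.
if __name__ == "__main__":
    daemon = TaskDaemon(concurrency=DAEMON_CONCURRENCY,
                        logfile=DAEMON_LOG_FILE,
                        loglevel=logging.INFO)
    daemon.run()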