task.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. from carrot.connection import DjangoAMQPConnection
  2. from crunchy.log import setup_logger
  3. from crunchy.registry import tasks
  4. from crunchy.messaging import TaskPublisher, TaskConsumer
  5. def delay_task(task_name, **kwargs):
  6. if task_name not in tasks:
  7. raise tasks.NotRegistered(
  8. "Task with name %s not registered in the task registry." % (
  9. task_name))
  10. publisher = TaskPublisher(connection=DjangoAMQPConnection)
  11. task_id = publisher.delay_task(task_name, **kwargs)
  12. publisher.close()
  13. return task_id
  14. def discard_all():
  15. consumer = TaskConsumer(connection=DjangoAMQPConnection)
  16. discarded_count = consumer.discard_all()
  17. consumer.close()
  18. return discarded_count
  19. class Task(object):
  20. name = None
  21. def __init__(self):
  22. if not self.name:
  23. raise NotImplementedError("Tasks must define a name attribute.")
  24. def __call__(self, **kwargs):
  25. return self.run(**kwargs)
  26. def run(self, **kwargs):
  27. raise NotImplementedError("Tasks must define a run method.")
  28. def after(self, task_id):
  29. """This method is called when the task is sucessfully executed."""
  30. pass
  31. def get_logger(self, **kwargs):
  32. """Get a process-aware logger object."""
  33. return setup_logger(**kwargs)
  34. def get_publisher(self):
  35. """Get a crunchy task message publisher."""
  36. return TaskPublisher(connection=DjangoAMQPConnection)
  37. def get_consumer(self):
  38. """Get a crunchy task message consumer."""
  39. return TaskConsumer(connection=DjangoAMQPConnection)
  40. class TaskExecutedTask(Task):
  41. name = "crunchy-task-executed"
  42. def run(self, task_id, task_name, **kwargs):
  43. logger = self.get_logger(**kwargs)
  44. logger.info("Task %s[%s] executed successfully." % (task_id, task_name))
  45. tasks.register(TaskExecutedTask)
  46. class TestTask(Task):
  47. name = "crunchy-test-task"
  48. def run(self, some_arg, **kwargs):
  49. logger = self.get_logger(**kwargs)
  50. logger.info("TestTask got some_arg=%s" % some_arg)
  51. def after(self, task_id):
  52. logger = self.get_logger(**kwargs)
  53. logger.info("TestTask with id %s was successfully executed." % task_id)
  54. tasks.register(TestTask)