task.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. from carrot.connection import DjangoAMQPConnection
  2. from crunchy.log import setup_logger
  3. from crunchy.registry import tasks
  4. from crunchy.messaging import TaskPublisher, TaskConsumer
  5. def delay_task(task_name, **kwargs):
  6. if task_name not in tasks:
  7. raise tasks.NotRegistered(
  8. "Task with name %s not registered in the task registry." % (
  9. task_name))
  10. publisher = TaskPublisher(connection=DjangoAMQPConnection)
  11. task_id = publisher.delay_task(task_name, **kwargs)
  12. publisher.close()
  13. return task_id
  14. def discard_all():
  15. consumer = TaskConsumer(connection=DjangoAMQPConnection)
  16. discarded_count = consumer.discard_all()
  17. consumer.close()
  18. return discarded_count
  19. class Task(object):
  20. name = None
  21. type = "regular"
  22. def __init__(self):
  23. if not self.name:
  24. raise NotImplementedError("Tasks must define a name attribute.")
  25. def __call__(self, **kwargs):
  26. return self.run(**kwargs)
  27. def run(self, **kwargs):
  28. raise NotImplementedError("Tasks must define a run method.")
  29. def get_logger(self, **kwargs):
  30. """Get a process-aware logger object."""
  31. return setup_logger(**kwargs)
  32. def get_publisher(self):
  33. """Get a crunchy task message publisher."""
  34. return TaskPublisher(connection=DjangoAMQPConnection)
  35. def get_consumer(self):
  36. """Get a crunchy task message consumer."""
  37. return TaskConsumer(connection=DjangoAMQPConnection)
  38. @classmethod
  39. def delay(cls, **kwargs):
  40. return delay_task(cls.name, **kwargs)
  41. class PeriodicTask(Task):
  42. run_every = 86400
  43. type = "periodic"
  44. def __init__(self):
  45. if not self.run_every:
  46. raise NotImplementedError(
  47. "Periodic tasks must have a run_every attribute")
  48. super(PeriodicTask, self).__init__()
  49. class TestTask(Task):
  50. name = "crunchy-test-task"
  51. def run(self, some_arg, **kwargs):
  52. logger = self.get_logger(**kwargs)
  53. logger.info("TestTask got some_arg=%s" % some_arg)
  54. def after(self, task_id):
  55. logger = self.get_logger(**kwargs)
  56. logger.info("TestTask with id %s was successfully executed." % task_id)
# Register TestTask with the task registry when this module is imported.
tasks.register(TestTask)