1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- from carrot.connection import DjangoAMQPConnection
- from crunchy.log import setup_logger
- from crunchy.registry import tasks
- from crunchy.messaging import TaskPublisher, TaskConsumer
- def delay_task(task_name, **kwargs):
- if task_name not in tasks:
- raise tasks.NotRegistered(
- "Task with name %s not registered in the task registry." % (
- task_name))
- publisher = TaskPublisher(connection=DjangoAMQPConnection)
- task_id = publisher.delay_task(task_name, **kwargs)
- publisher.close()
- return task_id
- def discard_all():
- consumer = TaskConsumer(connection=DjangoAMQPConnection)
- discarded_count = consumer.discard_all()
- consumer.close()
- return discarded_count
- class Task(object):
- name = None
- def __init__(self):
- if not self.name:
- raise NotImplementedError("Tasks must define a name attribute.")
- def __call__(self, **kwargs):
- return self.run(**kwargs)
- def run(self, **kwargs):
- raise NotImplementedError("Tasks must define a run method.")
- def after(self, task_id):
- """This method is called when the task is sucessfully executed."""
- pass
- def get_logger(self, **kwargs):
- """Get a process-aware logger object."""
- return setup_logger(**kwargs)
- def get_publisher(self):
- """Get a crunchy task message publisher."""
- return TaskPublisher(connection=DjangoAMQPConnection)
- def get_consumer(self):
- """Get a crunchy task message consumer."""
- return TaskConsumer(connection=DjangoAMQPConnection)
- class TaskExecutedTask(Task):
- name = "crunchy-task-executed"
- def run(self, task_id, task_name, **kwargs):
- logger = self.get_logger(**kwargs)
- logger.info("Task %s[%s] executed successfully." % (task_id, task_name))
- tasks.register(TaskExecutedTask)
-
- class TestTask(Task):
- name = "crunchy-test-task"
- def run(self, some_arg, **kwargs):
- logger = self.get_logger(**kwargs)
- logger.info("TestTask got some_arg=%s" % some_arg)
- def after(self, task_id):
- logger = self.get_logger(**kwargs)
- logger.info("TestTask with id %s was successfully executed." % task_id)
- tasks.register(TestTask)
|