Forráskód Böngészése

Introducing Task classes. (after() not implemented yet, but that's going to be really awesome)
The arguments to tasks.register is now changed:

Instead of

task.register(task_name, task_func)

it's now

task.register(task_func, task_name)

or alternatively

task.register(TaskClass)

Ask Solem 16 éve
szülő
commit
462d47068d
5 módosított fájl, 69 hozzáadás és 9 törlés
  1. 1 1
      MANIFEST.in
  2. 3 3
      README.rst
  3. 1 1
      crunchy/__init__.py
  4. 12 3
      crunchy/registry.py
  5. 52 1
      crunchy/task.py

+ 1 - 1
MANIFEST.in

@@ -1,5 +1,5 @@
 include AUTHORS
-include README
+include README.rst
 include MANIFEST.in
 include LICENSE
 include Changelog

+ 3 - 3
README.rst

@@ -57,7 +57,7 @@ Defining tasks
     >>> def do_something(some_arg, **kwargs):
     ...     logger = setup_logger(**kwargs)
     ...     logger.info("Did something: %s" % some_arg)
-    >>> task.register("do_something", some_arg=do_something) 
+    >>> task.register(do_something, "do_something") 
 
 *Note* Task functions only supports keyword arguments.
 
@@ -65,7 +65,7 @@ Tell the crunch daemon to run a task
 -------------------------------------
 
     >>> from crunchy.task import delay_task
-    >>> delay_task("do_something", "foo bar baz")
+    >>> delay_task("do_something", some_arg="foo bar baz")
 
 
 Running the crunch daemon
@@ -111,7 +111,7 @@ Then you can add new tasks in your applications ``tasks.py`` module,
         clicks_for_url.save()
         logger.info("Incremented click count for %s (not at %d)" % (
                         for_url, clicks_for_url.clicks)
-    tasks.register("increment_click", increment_click)
+    tasks.register(increment_click, "increment_click")
 
 License
 =======

+ 1 - 1
crunchy/__init__.py

@@ -1,5 +1,5 @@
 """Distributed Task Queue for Django"""
-VERSION = (0, 1, 0)
+VERSION = (0, 1, 1)
 __version__ = ".".join(map(str, VERSION))
 __author__ = "Ask Solem"
 __contact__ = "askh@opera.com"

+ 12 - 3
crunchy/registry.py

@@ -22,14 +22,23 @@ class TaskRegistry(UserDict):
     def autodiscover(self):
         discovery.autodiscover()
 
-    def register(self, task_name, task_func):
+    def register(self, task, task_name=None):
+        is_class = False
+        if hasattr(task, "run"):
+            is_class = True
+            task_name = task.name
         if task_name in self.data:
             raise self.AlreadyRegistered(
                     "Task with name %s is already registered." % task_name)
-        
-        self.data[task_name] = task_func
+       
+        if is_class:
+            self.data[task_name] = task() # instantiate Task class
+        else:
+            self.data[task_name] = task
 
     def unregister(self, task_name):
+        if hasattr(task_name, "run"):
+            task_name = task_name.name
         if task_name not in self.data:
             raise self.NotRegistered(
                     "Task with name %s is not registered." % task_name)

+ 52 - 1
crunchy/task.py

@@ -1,6 +1,7 @@
 from carrot.connection import DjangoAMQPConnection
-from crunchy.messaging import TaskPublisher, TaskConsumer
+from crunchy.log import setup_logger
 from crunchy.registry import tasks
+from crunchy.messaging import TaskPublisher, TaskConsumer
 
 
 def delay_task(task_name, **kwargs):
@@ -18,3 +19,53 @@ def discard_all():
     discarded_count = consumer.discard_all()
     consumer.close()
     return discarded_count
+
+
+class Task(object):
+    name = None
+
+    def __init__(self):
+        if not self.name:
+            raise NotImplementedError("Tasks must define a name attribute.")
+
+    def __call__(self, **kwargs):
+        return self.run(**kwargs)
+
+    def run(self, **kwargs):
+        raise NotImplementedError("Tasks must define a run method.")
+
+    def after(self, task_id):
+        """This method is called when the task is sucessfully executed."""
+        pass
+
+    def get_logger(self, **kwargs):
+        """Get a process-aware logger object."""
+        return setup_logger(**kwargs)
+
+    def get_publisher(self):
+        """Get a crunchy task message publisher."""
+        return TaskPublisher(connection=DjangoAMQPConnection)
+
+    def get_consumer(self):
+        """Get a crunchy task message consumer."""
+        return TaskConsumer(connection=DjangoAMQPConnection)
+
+class TaskExecutedTask(Task):
+    name = "crunchy-task-executed"
+
+    def run(self, task_id, task_name, **kwargs):
+        logger = self.get_logger(**kwargs)
+        logger.info("Task %s[%s] executed successfully." % (task_id, task_name))
+tasks.register(TaskExecutedTask)
+        
+class TestTask(Task):
+    name = "crunchy-test-task"
+
+    def run(self, some_arg, **kwargs):
+        logger = self.get_logger(**kwargs)
+        logger.info("TestTask got some_arg=%s" % some_arg)
+
+    def after(self, task_id):
+        logger = self.get_logger(**kwargs)
+        logger.info("TestTask with id %s was successfully executed." % task_id)
+tasks.register(TestTask)