Explorar o código

Added support for message priorities, topic exchanges, custom routing key for tasks, mandatory and immediate flags.

Ask Solem %!s(int64=16) %!d(string=hai) anos
pai
achega
9359fcc1a1
Modificáronse 3 ficheiros con 95 adicións e 17 borrados
  1. 16 0
      celery/conf.py
  2. 17 11
      celery/messaging.py
  3. 62 6
      celery/task.py

+ 16 - 0
celery/conf.py

@@ -5,6 +5,7 @@ import logging
 DEFAULT_AMQP_EXCHANGE = "celery"
 DEFAULT_AMQP_ROUTING_KEY = "celery"
 DEFAULT_AMQP_CONSUMER_QUEUE = "celery"
+DEFAULT_AMQP_EXCHANGE_TYPE = "direct"
 DEFAULT_DAEMON_CONCURRENCY = 10
 DEFAULT_QUEUE_WAKEUP_AFTER = 0.3
 DEFAULT_EMPTY_MSG_EMIT_EVERY = 5
@@ -109,6 +110,21 @@ DAEMON_CONCURRENCY = getattr(settings, "CELERYD_CONCURRENCY",
 AMQP_EXCHANGE = getattr(settings, "CELERY_AMQP_EXCHANGE",
                         DEFAULT_AMQP_EXCHANGE)
 
+
+"""
+.. data:: AMQP_EXCHANGE_TYPE
+
+The type of exchange. If the exchange type is ``direct``, all messages
+receives all tasks. However, if the exchange type is ``topic``, you can 
+route e.g some tasks to one server, and others to the rest.
+See `Exchange types and the effect of bindings`_.
+
+.. _`Exchange types and the effect of bindings: http://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol#Exchange_types_and_the_effect_of_bindings
+
+"""
+AMQP_EXCHANGE_TYPE = getattr(settings, "CELERY_AMQP_EXCHANGE_TYPE",
+                        DEFAULT_AMQP_EXCHANGE_TYPE)
+
 """
 .. data:: AMQP_ROUTING_KEY
    

+ 17 - 11
celery/messaging.py

@@ -22,34 +22,39 @@ class TaskPublisher(Publisher):
     exchange = conf.AMQP_EXCHANGE
     routing_key = conf.AMQP_ROUTING_KEY
 
-    def delay_task(self, task_name, *task_args, **task_kwargs):
+    def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
         """Delay task for execution by the celery nodes."""
         return self._delay_task(task_name=task_name, args=task_args,
-                                kwargs=task_kwargs)
+                                kwargs=task_kwargs, **kwargs)
 
     def delay_task_in_set(self, task_name, taskset_id, task_args,
-            task_kwargs):
+            task_kwargs, **kwargs):
         """Delay a task which part of a task set."""
         return self._delay_task(task_name=task_name, part_of_set=taskset_id,
-                                args=task_args, kwargs=task_kwargs)
+                                args=task_args, kwargs=task_kwargs, **kwargs)
 
     def requeue_task(self, task_name, task_id, task_args, task_kwargs,
-            part_of_set=None):
+            part_of_set=None, **kwargs):
         """Requeue a failed task."""
         return self._delay_task(task_name=task_name, part_of_set=part_of_set,
                                 task_id=task_id, args=task_args,
-                                kwargs=task_kwargs)
+                                kwargs=task_kwargs, **kwargs)
 
     def _delay_task(self, task_name, task_id=None, part_of_set=None,
-            args=None, kwargs=None):
-        args = args or []
-        kwargs = kwargs or {}
+            task_args=None, task_kwargs=None, **kwargs):
+        priority = kwargs.get("priority")
+        immediate = kwargs.get("immediate")
+        mandatory = kwargs.get("mandatory")
+        routing_key = kwargs.get("routing_key")
+
+        task_args = task_args or []
+        task_kwargs = task_kwargs or {}
         task_id = task_id or str(uuid.uuid4())
         message_data = {
             "id": task_id,
             "task": task_name,
-            "args": args,
-            "kwargs": kwargs,
+            "args": task_args,
+            "kwargs": task_kwargs,
         }
         if part_of_set:
             message_data["taskset"] = part_of_set
@@ -62,3 +67,4 @@ class TaskConsumer(NoProcessConsumer):
     queue = conf.AMQP_CONSUMER_QUEUE
     exchange = conf.AMQP_EXCHANGE
     routing_key = conf.AMQP_ROUTING_KEY
+    exchange_type = conf.AMQP_EXCHANGE_TYPE

+ 62 - 6
celery/task.py

@@ -18,6 +18,47 @@ import uuid
 import pickle
 
 
+def apply_async(task, args, kwargs, routing_key=None, immediate=None,
+        mandatory=None, connect_timeout=None, priority=None):
+    """Run a task asynchronously by the celery daemon(s).
+
+    :param task: The task to run (a callable object, or a :class:`Task`
+        instance
+
+    :param args: The positional arguments to pass on to the task (a ``list``).
+
+    :param kwargs: The keyword arguments to pass on to the task (a ``dict``)
+
+
+    :keyword routing_key: The routing key used to route the task to a worker
+        server.
+
+    :keyword immediate: Request immediate delivery. Will raise an exception
+        if the task cannot be routed to a worker immediately.
+
+    :keyword mandatory: Mandatory routing. Raises an exception if there's
+        no running workers able to take on this task.
+
+    :keyword connect_timeout: The timeout in seconds, before we give up
+        on establishing a connection to the AMQP server.
+
+    :keyword priority: The task priority, a number between ``0`` and ``9``.
+
+    """
+    message_opts = {"routing_key": routing_key,
+                    "immediate": immediate,
+                    "mandatory": mandatory,
+                    "priority": priority}
+    for option_name, option_value in message_opts.items():
+        message_opts[option_name] = getattr(task, option_name, option_value)
+
+    amqp_connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
+    publisher = TaskPublsher(connection=amqp_connection)
+    task_id = publisher.delay_task(task.name, args, kwargs, **message_opts)
+    amqp_conection.close()
+    return AsyncResult(task_id)
+
+
 def delay_task(task_name, *args, **kwargs):
     """Delay a task for execution by the ``celery`` daemon.
 
@@ -45,11 +86,8 @@ def delay_task(task_name, *args, **kwargs):
         raise tasks.NotRegistered(
                 "Task with name %s not registered in the task registry." % (
                     task_name))
-    amqp_connection = DjangoAMQPConnection()
-    publisher = TaskPublisher(connection=amqp_connection)
-    task_id = publisher.delay_task(task_name, *args, **kwargs)
-    amqp_connection.close()
-    return AsyncResult(task_id)
+    task = tasks[task_name]
+    return apply_async(task, args, kwargs)
 
 
 def discard_all():
@@ -140,6 +178,9 @@ class Task(object):
     max_retries = 0 # unlimited
     retry_interval = timedelta(seconds=2)
     auto_retry = False
+    routing_key = None
+    immediate = False
+    mandatory = False
 
     def __init__(self):
         if not self.name:
@@ -218,7 +259,22 @@ class Task(object):
         See :func:`delay_task`.
 
         """
-        return delay_task(cls.name, *args, **kwargs)
+        return apply_async(cls, args, kwargs)
+
+    @classmethod
+    def apply_async(cls, args, kwargs, **options):
+        """Delay this task for execution by the ``celery`` daemon(s).
+
+        :param args: positional arguments passed on to the task.
+
+        :param kwargs: keyword arguments passed on to the task.
+
+        :rtype: :class:`celery.result.AsyncResult`
+
+        See :func:`apply_async`.
+        
+        """
+        return apply_async(cls, args, kwargs, **options)
 
 
 class TaskSet(object):