Przeglądaj źródła

Merges branches master(app) and kombufy.

Ask Solem 14 lat temu
rodzic
commit
a90a3a32b4

+ 16 - 51
celery/app/amqp.py

@@ -1,21 +1,24 @@
+import os
 
 
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
 from UserDict import UserDict
 from UserDict import UserDict
 
 
-from carrot.connection import BrokerConnection
-from carrot import messaging
 
 
 from celery import routes
 from celery import routes
 from celery import signals
 from celery import signals
 from celery.utils import gen_unique_id, mitemgetter, textindent
 from celery.utils import gen_unique_id, mitemgetter, textindent
 
 
+from kombu.connection import BrokerConnection
+from kombu import compat as messaging
+
 MSG_OPTIONS = ("mandatory", "priority", "immediate",
 MSG_OPTIONS = ("mandatory", "priority", "immediate",
-               "routing_key", "serializer", "delivery_mode")
+               "routing_key", "serializer", "delivery_mode",
+               "compression")
 QUEUE_FORMAT = """
 QUEUE_FORMAT = """
 . %(name)s -> exchange:%(exchange)s (%(exchange_type)s) \
 . %(name)s -> exchange:%(exchange)s (%(exchange_type)s) \
 binding:%(binding_key)s
 binding:%(binding_key)s
 """
 """
-BROKER_FORMAT = "%(carrot_backend)s://%(userid)s@%(host)s%(port)s%(vhost)s"
+BROKER_FORMAT = "%(transport)s://%(userid)s@%(host)s%(port)s%(vhost)s"
 
 
 get_msg_options = mitemgetter(*MSG_OPTIONS)
 get_msg_options = mitemgetter(*MSG_OPTIONS)
 extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
 extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
@@ -136,40 +139,11 @@ class TaskPublisher(messaging.Publisher):
         return task_id
         return task_id
 
 
 
 
-class ConsumerSet(messaging.ConsumerSet):
-    """ConsumerSet with an optional decode error callback.
-
-    For more information see :class:`carrot.messaging.ConsumerSet`.
-
-    .. attribute:: on_decode_error
-
-        Callback called if a message had decoding errors.
-        The callback is called with the signature::
-
-            callback(message, exception)
-
-    """
-    on_decode_error = None
-
-    def _receive_callback(self, raw_message):
-        message = self.backend.message_to_python(raw_message)
-        if self.auto_ack and not message.acknowledged:
-            message.ack()
-        try:
-            decoded = message.decode()
-        except Exception, exc:
-            if self.on_decode_error:
-                return self.on_decode_error(message, exc)
-            else:
-                raise
-        self.receive(decoded, message)
-
-
 class AMQP(object):
 class AMQP(object):
     BrokerConnection = BrokerConnection
     BrokerConnection = BrokerConnection
     Publisher = messaging.Publisher
     Publisher = messaging.Publisher
     Consumer = messaging.Consumer
     Consumer = messaging.Consumer
-    ConsumerSet = ConsumerSet
+    ConsumerSet = messaging.ConsumerSet
     _queues = None
     _queues = None
 
 
     def __init__(self, app):
     def __init__(self, app):
@@ -212,18 +186,9 @@ class AMQP(object):
 
 
         return publisher
         return publisher
 
 
-    def get_task_consumer(self, connection, queues=None, **options):
-        queues = queues or self.queues
-
-        cset = self.ConsumerSet(connection)
-        for queue_name, queue_options in queues.items():
-            queue_options = dict(queue_options)
-            queue_options["routing_key"] = queue_options.pop("binding_key",
-                                                             None)
-            consumer = self.Consumer(connection, queue=queue_name,
-                                     backend=cset.backend, **queue_options)
-            cset.consumers.append(consumer)
-        return cset
+    def get_task_consumer(self, connection, queues=None, **kwargs):
+        return self.ConsumerSet(connection, from_dict=queues or self.queues,
+                                **kwargs)
 
 
     def get_default_queue(self):
     def get_default_queue(self):
         q = self.app.conf.CELERY_DEFAULT_QUEUE
         q = self.app.conf.CELERY_DEFAULT_QUEUE
@@ -232,10 +197,10 @@ class AMQP(object):
     def get_broker_info(self, broker_connection=None):
     def get_broker_info(self, broker_connection=None):
         if broker_connection is None:
         if broker_connection is None:
             broker_connection = self.app.broker_connection()
             broker_connection = self.app.broker_connection()
-        carrot_backend = broker_connection.backend_cls
-        if carrot_backend and not isinstance(carrot_backend, str):
-            carrot_backend = carrot_backend.__name__
-        carrot_backend = carrot_backend or "amqp"
+        transport = broker_connection.transport_cls
+        if transport and not isinstance(transport, basestring):
+            transport = transport.__name__
+        transport = transport or "amqp"
 
 
         port = broker_connection.port or \
         port = broker_connection.port or \
                     broker_connection.get_backend_cls().default_port
                     broker_connection.get_backend_cls().default_port
@@ -245,7 +210,7 @@ class AMQP(object):
         if not vhost.startswith("/"):
         if not vhost.startswith("/"):
             vhost = "/" + vhost
             vhost = "/" + vhost
 
 
-        return {"carrot_backend": carrot_backend,
+        return {"transport": transport,
                 "userid": broker_connection.userid,
                 "userid": broker_connection.userid,
                 "host": broker_connection.hostname,
                 "host": broker_connection.hostname,
                 "port": port,
                 "port": port,

+ 2 - 2
celery/app/base.py

@@ -187,7 +187,7 @@ class BaseApp(object):
 
 
     def broker_connection(self, hostname=None, userid=None,
     def broker_connection(self, hostname=None, userid=None,
             password=None, virtual_host=None, port=None, ssl=None,
             password=None, virtual_host=None, port=None, ssl=None,
-            insist=None, connect_timeout=None, backend_cls=None):
+            insist=None, connect_timeout=None, transport=None, **kwargs):
         """Establish a connection to the message broker.
         """Establish a connection to the message broker.
 
 
         :keyword hostname: defaults to the ``BROKER_HOST`` setting.
         :keyword hostname: defaults to the ``BROKER_HOST`` setting.
@@ -210,7 +210,7 @@ class BaseApp(object):
                     password or self.conf.BROKER_PASSWORD,
                     password or self.conf.BROKER_PASSWORD,
                     virtual_host or self.conf.BROKER_VHOST,
                     virtual_host or self.conf.BROKER_VHOST,
                     port or self.conf.BROKER_PORT,
                     port or self.conf.BROKER_PORT,
-                    backend_cls=backend_cls or self.conf.BROKER_BACKEND,
+                    transport=transport or self.conf.BROKER_BACKEND,
                     insist=self.either("BROKER_INSIST", insist),
                     insist=self.either("BROKER_INSIST", insist),
                     ssl=self.either("BROKER_USE_SSL", ssl),
                     ssl=self.either("BROKER_USE_SSL", ssl),
                     connect_timeout=self.either(
                     connect_timeout=self.either(

+ 1 - 1
celery/task/base.py

@@ -207,7 +207,7 @@ class BaseTask(object):
     .. attribute:: serializer
     .. attribute:: serializer
 
 
         The name of a serializer that has been registered with
         The name of a serializer that has been registered with
-        :mod:`carrot.serialization.registry`. Example: ``"json"``.
+        :mod:`kombu.serialization.registry`. Example: ``"json"``.
 
 
     .. attribute:: backend
     .. attribute:: backend
 
 

+ 3 - 1
celery/tests/test_task.py

@@ -413,10 +413,12 @@ class TestTaskSet(unittest.TestCase):
         self.assertEqual(ts.total, 9)
         self.assertEqual(ts.total, 9)
 
 
         consumer = IncrementCounterTask().get_consumer()
         consumer = IncrementCounterTask().get_consumer()
-        consumer.discard_all()
+        consumer.purge()
+        consumer.close()
         taskset_res = ts.apply_async()
         taskset_res = ts.apply_async()
         subtasks = taskset_res.subtasks
         subtasks = taskset_res.subtasks
         taskset_id = taskset_res.taskset_id
         taskset_id = taskset_res.taskset_id
+        consumer = IncrementCountertask().get_consumer()
         for subtask in subtasks:
         for subtask in subtasks:
             m = consumer.fetch().payload
             m = consumer.fetch().payload
             self.assertDictContainsSubset({"taskset": taskset_id,
             self.assertDictContainsSubset({"taskset": taskset_id,

+ 5 - 3
celery/tests/test_worker.py

@@ -4,8 +4,8 @@ import unittest2 as unittest
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
 from Queue import Empty
 from Queue import Empty
 
 
-from carrot.backends.base import BaseMessage
-from carrot.connection import BrokerConnection
+from kombu.backends.base import BaseMessage
+from kombu.connection import BrokerConnection
 from celery.utils.timer2 import Timer
 from celery.utils.timer2 import Timer
 
 
 from celery.app import app_or_default
 from celery.app import app_or_default
@@ -93,7 +93,7 @@ class MockLogger(object):
 class MockBackend(object):
 class MockBackend(object):
     _acked = False
     _acked = False
 
 
-    def ack(self, delivery_tag):
+    def basic_ack(self, delivery_tag):
         self._acked = True
         self._acked = True
 
 
 
 
@@ -470,6 +470,7 @@ class test_CarrotListener(unittest.TestCase):
         l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
         l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
                       send_events=False, init_callback=init_callback)
                       send_events=False, init_callback=init_callback)
         l.qos = _QoS()
         l.qos = _QoS()
+        l.connection = BrokerConnection()
 
 
         def raises_KeyError(limit=None):
         def raises_KeyError(limit=None):
             yield True
             yield True
@@ -485,6 +486,7 @@ class test_CarrotListener(unittest.TestCase):
         l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
         l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
                       send_events=False, init_callback=init_callback)
                       send_events=False, init_callback=init_callback)
         l.qos = _QoS()
         l.qos = _QoS()
+        l.connection = BrokerConnection()
 
 
         def raises_socket_error(limit=None):
         def raises_socket_error(limit=None):
             yield True
             yield True

+ 3 - 3
celery/tests/test_worker_job.py

@@ -6,7 +6,7 @@ import unittest2 as unittest
 
 
 from StringIO import StringIO
 from StringIO import StringIO
 
 
-from carrot.backends.base import BaseMessage
+from kombu.backends.base import BaseMessage
 
 
 from celery import states
 from celery import states
 from celery.app import app_or_default
 from celery.app import app_or_default
@@ -338,7 +338,7 @@ class test_TaskRequest(unittest.TestCase):
     def test_from_message(self):
     def test_from_message(self):
         body = {"task": mytask.name, "id": gen_unique_id(),
         body = {"task": mytask.name, "id": gen_unique_id(),
                 "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
                 "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
-        m = BaseMessage(body=simplejson.dumps(body), backend="foo",
+        m = BaseMessage(None, body=simplejson.dumps(body), backend="foo",
                         content_type="application/json",
                         content_type="application/json",
                         content_encoding="utf-8")
                         content_encoding="utf-8")
         tw = TaskRequest.from_message(m, m.decode())
         tw = TaskRequest.from_message(m, m.decode())
@@ -354,7 +354,7 @@ class test_TaskRequest(unittest.TestCase):
     def test_from_message_nonexistant_task(self):
     def test_from_message_nonexistant_task(self):
         body = {"task": "cu.mytask.doesnotexist", "id": gen_unique_id(),
         body = {"task": "cu.mytask.doesnotexist", "id": gen_unique_id(),
                 "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
                 "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
-        m = BaseMessage(body=simplejson.dumps(body), backend="foo",
+        m = BaseMessage(None, body=simplejson.dumps(body), backend="foo",
                         content_type="application/json",
                         content_type="application/json",
                         content_encoding="utf-8")
                         content_encoding="utf-8")
         self.assertRaises(NotRegistered, TaskRequest.from_message,
         self.assertRaises(NotRegistered, TaskRequest.from_message,

+ 1 - 53
celery/utils/__init__.py

@@ -15,7 +15,7 @@ from uuid import UUID, uuid4, _uuid_generate_random
 from inspect import getargspec
 from inspect import getargspec
 from itertools import islice
 from itertools import islice
 
 
-from carrot.utils import rpartition
+from kombu.utils import rpartition
 
 
 from celery.utils.functional import partial
 from celery.utils.functional import partial
 
 
@@ -221,58 +221,6 @@ def get_full_cls_name(cls):
                      cls.__name__])
                      cls.__name__])
 
 
 
 
-def repeatlast(it):
-    """Iterate over all elements in the iterator, and when its exhausted
-    yield the last value infinitely."""
-    for item in it:
-        yield item
-    while 1:            # pragma: no cover
-        yield item
-
-
-def retry_over_time(fun, catch, args=[], kwargs={}, errback=noop,
-        max_retries=None, interval_start=2, interval_step=2, interval_max=30):
-    """Retry the function over and over until max retries is exceeded.
-
-    For each retry we sleep a for a while before we try again, this interval
-    is increased for every retry until the max seconds is reached.
-
-    :param fun: The function to try
-    :param catch: Exceptions to catch, can be either tuple or a single
-        exception class.
-    :keyword args: Positional arguments passed on to the function.
-    :keyword kwargs: Keyword arguments passed on to the function.
-    :keyword errback: Callback for when an exception in ``catch`` is raised.
-        The callback must take two arguments: ``exc`` and ``interval``, where
-        ``exc`` is the exception instance, and ``interval`` is the time in
-        seconds to sleep next..
-    :keyword max_retries: Maximum number of retries before we give up.
-        If this is not set, we will retry forever.
-    :keyword interval_start: How long (in seconds) we start sleeping between
-        retries.
-    :keyword interval_step: By how much the interval is increased for each
-        retry.
-    :keyword interval_max: Maximum number of seconds to sleep between retries.
-
-    """
-    retries = 0
-    interval_range = xrange(interval_start,
-                            interval_max + interval_start,
-                            interval_step)
-
-    for interval in repeatlast(interval_range):
-        try:
-            retval = fun(*args, **kwargs)
-        except catch, exc:
-            if max_retries and retries > max_retries:
-                raise
-            errback(exc, interval)
-            retries += 1
-            time.sleep(interval)
-        else:
-            return retval
-
-
 def fun_takes_kwargs(fun, kwlist=[]):
 def fun_takes_kwargs(fun, kwlist=[]):
     """With a function, and a list of keyword arguments, returns arguments
     """With a function, and a list of keyword arguments, returns arguments
     in the list which the function takes.
     in the list which the function takes.

+ 1 - 1
celery/utils/timeutils.py

@@ -3,7 +3,7 @@ import math
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
 from dateutil.parser import parse as parse_iso8601
 from dateutil.parser import parse as parse_iso8601
 
 
-from carrot.utils import partition
+from kombu.utils import partition
 
 
 DAYNAMES = "sun", "mon", "tue", "wed", "thu", "fri", "sat"
 DAYNAMES = "sun", "mon", "tue", "wed", "thu", "fri", "sat"
 WEEKDAYS = dict((name, dow) for name, dow in zip(DAYNAMES, range(7)))
 WEEKDAYS = dict((name, dow) for name, dow in zip(DAYNAMES, range(7)))

+ 16 - 38
celery/worker/listener.py

@@ -22,11 +22,6 @@ up and running.
 
 
   Both the task consumer and the broadcast consumer uses the same
   Both the task consumer and the broadcast consumer uses the same
   callback: :meth:`~CarrotListener.receive_message`.
   callback: :meth:`~CarrotListener.receive_message`.
-  The reason is that some carrot backends doesn't support consuming
-  from several channels simultaneously, so we use a little nasty trick
-  (:meth:`~CarrotListener._detect_wait_method`) to select the best
-  possible channel distribution depending on the functionality supported
-  by the carrot backend.
 
 
 * So for each message received the :meth:`~CarrotListener.receive_message`
 * So for each message received the :meth:`~CarrotListener.receive_message`
   method is called, this checks the payload of the message for either
   method is called, this checks the payload of the message for either
@@ -78,14 +73,12 @@ from __future__ import generators
 import socket
 import socket
 import warnings
 import warnings
 
 
-from carrot.connection import AMQPConnectionException
-
 from celery.app import app_or_default
 from celery.app import app_or_default
 from celery.datastructures import SharedCounter
 from celery.datastructures import SharedCounter
 from celery.events import EventDispatcher
 from celery.events import EventDispatcher
 from celery.exceptions import NotRegistered
 from celery.exceptions import NotRegistered
 from celery.pidbox import BroadcastConsumer
 from celery.pidbox import BroadcastConsumer
-from celery.utils import noop, retry_over_time
+from celery.utils import noop
 from celery.utils.timer2 import to_timestamp
 from celery.utils.timer2 import to_timestamp
 from celery.worker.job import TaskRequest, InvalidTaskError
 from celery.worker.job import TaskRequest, InvalidTaskError
 from celery.worker.control import ControlDispatch
 from celery.worker.control import ControlDispatch
@@ -100,7 +93,7 @@ class QoS(object):
 
 
     For thread-safe increment/decrement of a channels prefetch count value.
     For thread-safe increment/decrement of a channels prefetch count value.
 
 
-    :param consumer: A :class:`carrot.messaging.Consumer` instance.
+    :param consumer: A :class:`kombu.messaging.Consumer` instance.
     :param initial_value: Initial prefetch count value.
     :param initial_value: Initial prefetch count value.
     :param logger: Logger used to log debug messages.
     :param logger: Logger used to log debug messages.
 
 
@@ -221,6 +214,8 @@ class CarrotListener(object):
                                                 logger=logger,
                                                 logger=logger,
                                                 hostname=self.hostname,
                                                 hostname=self.hostname,
                                                 listener=self)
                                                 listener=self)
+        self.connection_errors = \
+                self.app.broker_connection().connection_errors
         self.queues = queues
         self.queues = queues
 
 
     def start(self):
     def start(self):
@@ -237,14 +232,14 @@ class CarrotListener(object):
             self.reset_connection()
             self.reset_connection()
             try:
             try:
                 self.consume_messages()
                 self.consume_messages()
-            except (socket.error, AMQPConnectionException, IOError):
+            except self.connection_errors:
                 self.logger.error("CarrotListener: Connection to broker lost."
                 self.logger.error("CarrotListener: Connection to broker lost."
                                 + " Trying to re-establish connection...")
                                 + " Trying to re-establish connection...")
 
 
     def consume_messages(self):
     def consume_messages(self):
         """Consume messages forever (or until an exception is raised)."""
         """Consume messages forever (or until an exception is raised)."""
         self.logger.debug("CarrotListener: Starting message consumer...")
         self.logger.debug("CarrotListener: Starting message consumer...")
-        wait_for_message = self._detect_wait_method()(limit=None).next
+        wait_for_message = self._mainloop().next
         self.logger.debug("CarrotListener: Ready to accept tasks!")
         self.logger.debug("CarrotListener: Ready to accept tasks!")
 
 
         while 1:
         while 1:
@@ -420,45 +415,28 @@ class CarrotListener(object):
         self.heart = Heart(self.event_dispatcher)
         self.heart = Heart(self.event_dispatcher)
         self.heart.start()
         self.heart.start()
 
 
-    def _mainloop(self, **kwargs):
+    def _mainloop(self):
+        elf.broadcast_consumer.register_callback(self.receive_message)
+        self.task_consumer.consume()
+        self.broadcast_consumer.consume()
         while 1:
         while 1:
             yield self.connection.drain_events()
             yield self.connection.drain_events()
 
 
-    def _detect_wait_method(self):
-        if hasattr(self.connection.connection, "drain_events"):
-            self.broadcast_consumer.register_callback(self.receive_message)
-            self.task_consumer.iterconsume()
-            self.broadcast_consumer.iterconsume()
-            return self._mainloop
-        else:
-            self.task_consumer.add_consumer(self.broadcast_consumer)
-            return self.task_consumer.iterconsume
-
     def _open_connection(self):
     def _open_connection(self):
-        """Retries connecting to the AMQP broker over time.
-
-        See :func:`celery.utils.retry_over_time`.
-
-        """
+        """Open connection.  May retry opening the connection if configuration
+        allows that."""
 
 
         def _connection_error_handler(exc, interval):
         def _connection_error_handler(exc, interval):
             """Callback handler for connection errors."""
             """Callback handler for connection errors."""
             self.logger.error("CarrotListener: Connection Error: %s. " % exc
             self.logger.error("CarrotListener: Connection Error: %s. " % exc
                      + "Trying again in %d seconds..." % interval)
                      + "Trying again in %d seconds..." % interval)
 
 
-        def _establish_connection():
-            """Establish a connection to the broker."""
-            conn = self.app.broker_connection()
-            conn.connect()                      # evaluate connection
-            return conn
-
+        conn = self.app.broker_connection()
         if not self.app.conf.BROKER_CONNECTION_RETRY:
         if not self.app.conf.BROKER_CONNECTION_RETRY:
-            return _establish_connection()
+            return conn.connect()
 
 
-        conn = retry_over_time(_establish_connection, (socket.error, IOError),
-                    errback=_connection_error_handler,
-                    max_retries=self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
-        return conn
+        return conn.ensure_connection(_connection_error_handler,
+                    self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
 
 
     def stop(self):
     def stop(self):
         """Stop consuming.
         """Stop consuming.

+ 1 - 35
docs/tutorials/otherqueues.rst

@@ -4,13 +4,6 @@
  Using Celery with Redis/Database as the messaging queue.
  Using Celery with Redis/Database as the messaging queue.
 ==========================================================
 ==========================================================
 
 
-There's a plug-in for celery that enables the use of Redis or an SQL database
-as the messaging queue. This is not part of celery itself, but exists as
-an extension to `carrot`_.
-
-.. _`carrot`: http://pypi.python.org/pypi/carrot
-.. _`ghettoq`: http://pypi.python.org/pypi/ghettoq
-
 .. contents::
 .. contents::
     :local:
     :local:
 
 
@@ -19,10 +12,6 @@ an extension to `carrot`_.
 Installation
 Installation
 ============
 ============
 
 
-You need to install the `ghettoq`_ library::
-
-    $ pip install -U ghettoq
-
 .. _otherqueues-redis:
 .. _otherqueues-redis:
 
 
 Redis
 Redis
@@ -40,7 +29,7 @@ Configuration
 Configuration is easy, set the carrot backend, and configure the location of
 Configuration is easy, set the carrot backend, and configure the location of
 your Redis database::
 your Redis database::
 
 
-    CARROT_BACKEND = "ghettoq.taproot.Redis"
+    BROKER_BACKEND = "redis"
 
 
     BROKER_HOST = "localhost"  # Maps to redis host.
     BROKER_HOST = "localhost"  # Maps to redis host.
     BROKER_PORT = 6379         # Maps to redis port.
     BROKER_PORT = 6379         # Maps to redis port.
@@ -86,26 +75,3 @@ configuration values.
     When using Django::
     When using Django::
 
 
         $ python manage.py syncdb
         $ python manage.py syncdb
-
-.. _otherqueues-notes:
-
-Important notes
----------------
-
-These message queues does not have the concept of exchanges and routing keys,
-there's only the queue entity. As a result of this you need to set the
-name of the exchange to be the same as the queue::
-
-    CELERY_DEFAULT_EXCHANGE = "tasks"
-
-or in a custom queue-mapping:
-
-.. code-block:: python
-
-    CELERY_QUEUES = {
-        "tasks": {"exchange": "tasks"},
-        "feeds": {"exchange": "feeds"},
-    }
-
-This isn't a problem if you use the default queue setting, as the default is
-already using the same name for queue/exchange.