Browse Source

Tests passing

Ask Solem 12 years ago
parent
commit
5338f86341

+ 26 - 25
celery/app/amqp.py

@@ -180,15 +180,17 @@ class TaskProducer(Producer):
             expires=None, exchange=None, exchange_type=None,
             event_dispatcher=None, retry=None, retry_policy=None,
             queue=None, now=None, retries=0, chord=None, callbacks=None,
-            errbacks=None, mandatory=None, priority=None, immediate=None,
-            routing_key=None, serializer=None, delivery_mode=None,
-            compression=None, declare=None, **kwargs):
+            errbacks=None, routing_key=None, serializer=None,
+            delivery_mode=None, compression=None, declare=None, **kwargs):
         """Send task message."""
 
         declare = declare or []
+        qname = queue
         if queue is not None:
             if isinstance(queue, basestring):
-                queue = self.queues[queue]
+                qname, queue = queue, self.queues[queue]
+            else:
+                qname = queue.name
             exchange = exchange or queue.exchange.name
             routing_key = routing_key or queue.routing_key
 
@@ -212,28 +214,27 @@ class TaskProducer(Producer):
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
 
-        body = {'task': task_name,
-                'id': task_id,
-                'args': task_args,
-                'kwargs': task_kwargs,
-                'retries': retries or 0,
-                'eta': eta,
-                'expires': expires,
-                'utc': self.utc,
-                'callbacks': callbacks,
-                'errbacks': errbacks}
-        group_id = group_id or taskset_id
-        if group_id:
-            body['taskset'] = group_id
-        if chord:
-            body['chord'] = chord
-
-        self.publish(body, exchange=exchange, mandatory=mandatory,
-             immediate=immediate, routing_key=routing_key,
+        body = {
+            'task': task_name,
+            'id': task_id,
+            'args': task_args,
+            'kwargs': task_kwargs,
+            'retries': retries or 0,
+            'eta': eta,
+            'expires': expires,
+            'utc': self.utc,
+            'callbacks': callbacks,
+            'errbacks': errbacks,
+            'taskset': group_id or taskset_id,
+            'chord': chord,
+        }
+
+        self.publish(body,
+             exchange=exchange, routing_key=routing_key,
              serializer=serializer or self.serializer,
              compression=compression or self.compression,
-             retry=retry, retry_policy=_rp, delivery_mode=delivery_mode,
-             priority=priority, declare=declare,
+             retry=retry, retry_policy=_rp,
+             delivery_mode=delivery_mode, declare=declare,
              **kwargs)
 
         signals.task_sent.send(sender=task_name, **body)
@@ -248,7 +249,7 @@ class TaskProducer(Producer):
                                                retries=retries,
                                                eta=eta,
                                                expires=expires,
-                                               queue=queue,
+                                               queue=qname,
                                                exchange=exname,
                                                routing_key=routing_key)
         return task_id

+ 1 - 1
celery/app/builtins.py

@@ -81,7 +81,7 @@ def add_unlock_chord_task(app):
         if result.ready():
             subtask(callback).delay(j(propagate=propagate))
         else:
-            raise unlock_chord.retry(countdown=interval, max_retries=max_retries)
+            return unlock_chord.retry(countdown=interval, max_retries=max_retries)
     return unlock_chord
 
 

+ 5 - 2
celery/bin/camqadm.py

@@ -14,7 +14,10 @@ import pprint
 
 from itertools import count
 
-from amqplib import client_0_8 as amqp
+try:
+    import amqp
+except ImportError:
+    from amqplib import client_0_8 as amqp
 
 from celery.app import app_or_default
 from celery.utils.functional import padlist
@@ -216,7 +219,7 @@ class AMQShell(cmd.Cmd):
 
             >>> get_amqp_api_command('queue.delete', ['pobox', 'yes', 'no'])
             (<bound method Channel.queue_delete of
-             <amqplib.client_0_8.channel.Channel object at 0x...>>,
+             <amqp.channel.Channel object at 0x...>>,
              ('testfoo', True, False))
 
         """

+ 2 - 2
celery/task/base.py

@@ -43,8 +43,8 @@ class Task(BaseTask):
     exchange = None
     exchange_type = None
     delivery_mode = None
-    mandatory = False
-    immediate = False
+    mandatory = False  # XXX deprecated
+    immediate = False  # XXX deprecated
     priority = None
     type = 'regular'
     error_whitelist = ()

+ 3 - 3
celery/tests/app/test_app.py

@@ -384,13 +384,13 @@ class test_App(Case):
                                        'userid': 'guest',
                                        'password': 'guest',
                                        'virtual_host': '/'},
-                            self.app.connection('amqplib://').info())
+                            self.app.connection('amqp://').info())
         self.app.conf.BROKER_PORT = 1978
         self.app.conf.BROKER_VHOST = 'foo'
         self.assertDictContainsSubset({'port': 1978,
                                        'virtual_host': 'foo'},
-                    self.app.connection('amqplib://:1978/foo').info())
-        conn = self.app.connection('amqplib:////value')
+                    self.app.connection('amqp://:1978/foo').info())
+        conn = self.app.connection('amqp:////value')
         self.assertDictContainsSubset({'virtual_host': '/value'},
                                       conn.info())
 

+ 12 - 2
celery/tests/backends/test_amqp.py

@@ -1,6 +1,7 @@
 from __future__ import absolute_import
 from __future__ import with_statement
 
+import pickle
 import socket
 
 from datetime import timedelta
@@ -14,7 +15,7 @@ from celery.datastructures import ExceptionInfo
 from celery.exceptions import TimeoutError
 from celery.utils import uuid
 
-from celery.tests.utils import AppCase, sleepdeprived
+from celery.tests.utils import AppCase, sleepdeprived, Mock
 
 
 class SomeClass(object):
@@ -140,11 +141,14 @@ class test_AMQPBackend(AppCase):
             def __init__(self, **merge):
                 self.payload = dict({'status': states.STARTED,
                                      'result': None}, **merge)
+                self.body = pickle.dumps(self.payload)
+                self.content_type = 'application/x-python-serialize'
+                self.content_encoding = 'binary'
 
         class MockBinding(object):
 
             def __init__(self, *args, **kwargs):
-                pass
+                self.channel = Mock()
 
             def __call__(self, *args, **kwargs):
                 return self
@@ -158,10 +162,14 @@ class test_AMQPBackend(AppCase):
                 except Empty:
                     pass
 
+            def is_bound(self):
+                return True
+
         class MockBackend(AMQPBackend):
             Queue = MockBinding
 
         backend = MockBackend()
+        backend._republish = Mock()
 
         # FFWD's to the latest state.
         results.put(Message(status=states.RECEIVED, seq=1))
@@ -178,6 +186,8 @@ class test_AMQPBackend(AppCase):
         backend.get_task_meta(tid)
         self.assertIn(tid, backend._cache, 'Caches last known state')
 
+        self.assertTrue(backend._republish.called)
+
         # Returns cache if no new states.
         results.queue.clear()
         assert not results.qsize()

+ 1 - 1
celery/tests/functional/case.py

@@ -109,7 +109,7 @@ class WorkerCase(Case):
 
     @classmethod
     def setUpClass(cls):
-        logging.getLogger('amqplib').setLevel(logging.ERROR)
+        logging.getLogger('amqp').setLevel(logging.ERROR)
         cls.worker = Worker.managed(cls.hostname, caller=cls)
 
     @classmethod

+ 2 - 2
celery/worker/consumer.py

@@ -52,8 +52,8 @@ up and running.
 * When a task with an ETA is received the QoS prefetch count is also
   incremented, so another message can be reserved. When the ETA is met
   the prefetch count is decremented again, though this cannot happen
-  immediately because amqplib doesn't support doing broker requests
-  across threads. Instead the current prefetch count is kept as a
+  immediately because most broker clients don't support doing broker
+  requests across threads.  Instead the current prefetch count is kept as a
   shared counter, so as soon as  :meth:`~Consumer.consume_messages`
   detects that the value has changed it will send out the actual
   QoS event to the broker.

+ 4 - 3
docs/configuration.rst

@@ -765,9 +765,10 @@ Only the scheme part (``transport://``) is required, the rest
 is optional, and defaults to the specific transports default values.
 
 The transport part is the broker implementation to use, and the
-default is ``amqp``, but there are many other choices including
-``librabbitmq``, ``amqplib``, ``redis``, ``beanstalk``,
-``sqlalchemy``, ``django``, ``mongodb``, ``couchdb`` and ``pika``.
+default is ``amqp``, which uses ``librabbitmq`` by default or falls back to
+``pyamqp`` if that is not installed.  Also there are many other choices including
+``redis``, ``beanstalk``, ``sqlalchemy``, ``django``, ``mongodb``,
+``couchdb``.
 It can also be a fully qualified path to your own transport implementation.
 
 See the Kombu documentation for more information about broker URLs.

+ 3 - 3
docs/faq.rst

@@ -123,12 +123,12 @@ kombu
 
 Kombu depends on the following packages:
 
-- `amqplib`_
+- `amqp`_
 
 The underlying pure-Python amqp client implementation.  AMQP being the default
-broker it is a natural dependency.
+broker this is a natural dependency.
 
-.. _`amqplib`: http://pypi.python.org/pypi/amqplib
+.. _`amqp`: http://pypi.python.org/pypi/amqp
 
 - `anyjson`_
 

+ 0 - 14
docs/userguide/calling.rst

@@ -491,20 +491,6 @@ AMQP's full routing capabilities. Interested parties may read the
 
     Routing key used to determine.
 
-- mandatory
-
-    This sets the delivery to be mandatory.  An exception will be raised
-    if there are no running workers able to take on the task.
-
-    Not supported by :mod:`amqplib`.
-
-- immediate
-
-    Request immediate delivery. Will raise an exception
-    if the task cannot be routed to a worker immediately.
-
-    Not supported by :mod:`amqplib`.
-
 - priority
 
     A number between `0` and `9`, where `0` is the highest priority.

+ 1 - 1
docs/userguide/optimizing.rst

@@ -66,7 +66,7 @@ If you're using RabbitMQ (AMQP) as the broker then you can install the
 
 The 'amqp' transport will automatically use the librabbitmq module if it's
 installed, or you can also specify the transport you want directly by using
-the ``amqplib://`` or ``librabbitmq://`` prefixes.
+the ``pyamqp://`` or ``librabbitmq://`` prefixes.
 
 .. _optimizing-connection-pools:
 

+ 3 - 2
docs/whatsnew-3.0.rst

@@ -61,7 +61,7 @@ Highlights
 
         Celery will automatically use the :mod:`librabbitmq` module
         if installed, which is a very fast and memory-optimized
-        replacement for the amqplib module.
+        replacement for the py-amqp module.
 
     - Redis support is more reliable with improved ack emulation.
 
@@ -112,7 +112,8 @@ or Redis as a broker, resulting in:
 - Sub-millisecond timer precision.
 - Faster shutdown times.
 
-The transports supported are:  ``amqplib``, ``librabbitmq``, and ``redis``
+The transports supported are:  ``py-amqp`` ``librabbitmq``, ``redis``,
+and ``amqplib``.
 Hopefully this can be extended to include additional broker transports
 in the future.
 

+ 1 - 1
funtests/benchmarks/bench_worker.py

@@ -20,7 +20,7 @@ DEFAULT_ITS = 40000
 
 BROKER_TRANSPORT = os.environ.get('BROKER', 'librabbitmq')
 if hasattr(sys, 'pypy_version_info'):
-    BROKER_TRANSPORT = 'amqplib'
+    BROKER_TRANSPORT = 'pyamqp'
 
 celery = Celery(__name__)
 celery.conf.update(BROKER_TRANSPORT=BROKER_TRANSPORT,