Browse Source

AMQPConnection renamed to BrokerConnection for carrot 0.5.2

Ask Solem 15 years ago
parent
commit
d830190b62

+ 0 - 3
celery/bin/celeryd.py

@@ -79,13 +79,10 @@ from celery.supervisor import OFASupervisor
 from celery.log import emergency_error
 from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
-from celery.messaging import TaskConsumer
 from celery import conf
 from celery import discovery
 from celery.task import discard_all
 from celery.worker import WorkController
-from carrot.connection import DjangoAMQPConnection
-from celery.messaging import TaskConsumer, StatsConsumer
 import multiprocessing
 import traceback
 import optparse

+ 3 - 2
celery/execute.py

@@ -1,4 +1,4 @@
-from carrot.connection import DjangoAMQPConnection
+from carrot.connection import DjangoBrokerConnection
 from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.result import AsyncResult, EagerResult
 from celery.messaging import TaskPublisher
@@ -73,7 +73,8 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     need_to_close_connection = False
     if not publisher:
         if not connection:
-            connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
+            connection = DjangoBrokerConnection(
+                            connect_timeout=connect_timeout)
             need_to_close_connection = True
         publisher = TaskPublisher(connection=connection)
 

+ 3 - 3
celery/monitoring.py

@@ -3,7 +3,7 @@
     Publishing Statistics and Monitoring Celery.
 
 """
-from carrot.connection import DjangoAMQPConnection
+from carrot.connection import DjangoBrokerConnection
 from celery.messaging import StatsPublisher, StatsConsumer
 from django.conf import settings
 from django.core.cache import cache
@@ -44,7 +44,7 @@ class Statistics(object):
         """
         if not self.enabled:
             return
-        connection = DjangoAMQPConnection()
+        connection = DjangoBrokerConnection()
         publisher = StatsPublisher(connection=connection)
         publisher.send({"type": self.type, "data": data})
         publisher.close()
@@ -164,7 +164,7 @@ class StatsCollector(object):
     def collect(self):
         """Collect any new statistics available since the last time
         :meth:`collect` was executed."""
-        connection = DjangoAMQPConnection()
+        connection = DjangoBrokerConnection()
         consumer = StatsConsumer(connection=connection)
         it = consumer.iterqueue(infinite=False)
         for message in it:

+ 2 - 2
celery/task/__init__.py

@@ -3,7 +3,7 @@
 Working with tasks and task sets.
 
 """
-from carrot.connection import DjangoAMQPConnection
+from carrot.connection import DjangoBrokerConnection
 from celery.messaging import TaskConsumer
 from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.registry import tasks
@@ -27,7 +27,7 @@ def discard_all(connect_timeout=AMQP_CONNECTION_TIMEOUT):
     :rtype: int
 
     """
-    amqp_connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
+    amqp_connection = DjangoBrokerConnection(connect_timeout=connect_timeout)
     consumer = TaskConsumer(connection=amqp_connection)
     discarded_count = consumer.discard_all()
     amqp_connection.close()

+ 4 - 4
celery/task/base.py

@@ -1,4 +1,4 @@
-from carrot.connection import DjangoAMQPConnection
+from carrot.connection import DjangoBrokerConnection
 from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.log import setup_logger
@@ -155,7 +155,7 @@ class Task(object):
 
         """
 
-        connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
+        connection = DjangoBrokerConnection(connect_timeout=connect_timeout)
         return TaskPublisher(connection=connection,
                              exchange=self.exchange,
                              routing_key=self.routing_key)
@@ -173,7 +173,7 @@ class Task(object):
             >>> consumer.connection.close()
 
         """
-        connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
+        connection = DjangoBrokerConnection(connect_timeout=connect_timeout)
         return TaskConsumer(connection=connection, exchange=self.exchange,
                             routing_key=self.routing_key)
 
@@ -350,7 +350,7 @@ class TaskSet(object):
                             for args, kwargs in self.arguments]
             return TaskSetResult(taskset_id, subtasks)
 
-        conn = DjangoAMQPConnection(connect_timeout=connect_timeout)
+        conn = DjangoBrokerConnection(connect_timeout=connect_timeout)
         publisher = TaskPublisher(connection=conn,
                                   exchange=self.task.exchange)
         subtasks = [apply_async(self.task, args, kwargs,

+ 2 - 2
celery/task/strategy.py

@@ -1,4 +1,4 @@
-from carrot.connection import DjangoAMQPConnection
+from carrot.connection import DjangoBrokerConnection
 from celery.utils import chunks
 
 
@@ -37,7 +37,7 @@ def even_time_distribution(task, size, time_window, iterable, **apply_kwargs):
     bucketsize = size / time_window
     buckets = chunks(iterable, int(bucketsize))
 
-    connection = DjangoAMQPConnection()
+    connection = DjangoBrokerConnection()
     try:
         for bucket_count, bucket in enumerate(buckets):
             # Skew the countdown for items in this bucket by one.

+ 2 - 2
celery/tests/test_monitoring.py

@@ -2,7 +2,7 @@ from __future__ import with_statement
 import unittest
 import time
 from celery.monitoring import TaskTimerStats, Statistics, StatsCollector
-from carrot.connection import DjangoAMQPConnection
+from carrot.connection import DjangoBrokerConnection
 from celery.messaging import StatsConsumer
 from celery.tests.utils import OverrideStdout
 
@@ -57,7 +57,7 @@ class TestTaskTimerStats(unittest.TestCase):
 class TestStatsCollector(unittest.TestCase):
 
     def setUp(self):
-        conn = DjangoAMQPConnection()
+        conn = DjangoBrokerConnection()
         consumer = StatsConsumer(connection=conn)
         consumer.discard_all()
         conn.close()

+ 3 - 5
celery/tests/test_worker.py

@@ -1,6 +1,6 @@
 import unittest
 from Queue import Queue, Empty
-from carrot.connection import AMQPConnection
+from carrot.connection import BrokerConnection
 from celery.messaging import TaskConsumer
 from celery.worker.job import TaskWrapper
 from celery.worker import AMQPListener, WorkController
@@ -92,16 +92,14 @@ class TestAMQPListener(unittest.TestCase):
         l = AMQPListener(self.bucket_queue, self.hold_queue, self.logger)
 
         c = l.reset_connection()
-        self.assertTrue(c is l.task_consumer)
-        self.assertTrue(isinstance(l.amqp_connection, AMQPConnection))
+        self.assertTrue(isinstance(l.amqp_connection, BrokerConnection))
 
         l.close_connection()
         self.assertTrue(l.amqp_connection is None)
         self.assertTrue(l.task_consumer is None)
 
         c = l.reset_connection()
-        self.assertTrue(c is l.task_consumer)
-        self.assertTrue(isinstance(l.amqp_connection, AMQPConnection))
+        self.assertTrue(isinstance(l.amqp_connection, BrokerConnection))
 
         l.stop()
         self.assertTrue(l.amqp_connection is None)

+ 2 - 2
celery/worker/__init__.py

@@ -5,7 +5,7 @@ The Multiprocessing Worker Server
 Documentation for this module is in ``docs/reference/celery.worker.rst``.
 
 """
-from carrot.connection import DjangoAMQPConnection
+from carrot.connection import DjangoBrokerConnection
 from celery.worker.controllers import Mediator, PeriodicWorkController
 from celery.worker.job import TaskWrapper
 from celery.registry import NotRegistered
@@ -147,7 +147,7 @@ class AMQPListener(object):
 
         def _establish_connection():
             """Establish a connection to the AMQP broker."""
-            conn = DjangoAMQPConnection()
+            conn = DjangoBrokerConnection()
             connected = conn.connection # Connection is established lazily.
             return conn
 

+ 4 - 4
contrib/testconn.py

@@ -1,7 +1,7 @@
 import settings
 from django.core.management import setup_environ
 setup_environ(settings)
-from carrot.connection import DjangoAMQPConnection
+from carrot.connection import DjangoBrokerConnection
 from carrot.messaging import Messaging
 from amqplib import client_0_8 as amqp
 from celery.task import dmap
@@ -61,20 +61,20 @@ def _recv2():
 
 
 def send_a_message(msg):
-    conn = DjangoAMQPConnection()
+    conn = DjangoBrokerConnection()
     MyMessager(connection=conn).send({"message": msg})
     conn.close()
 
 
 def discard_all():
-    conn = DjangoAMQPConnection()
+    conn = DjangoBrokerConnection()
     MyMessager(connection=conn).consumer.discard_all()
     conn.close()
 
 
 def receive_a_message():
     logger = get_logger()
-    conn = DjangoAMQPConnection()
+    conn = DjangoBrokerConnection()
     m = MyMessager(connection=conn).fetch()
     if m:
         msg = simplejson.loads(m.body)

+ 3 - 3
docs/tutorials/clickcounter.rst

@@ -105,14 +105,14 @@ On to the code...
 
 .. code-block:: python
 
-    from carrot.connection import DjangoAMQPConnection
+    from carrot.connection import DjangoBrokerConnection
     from carrot.messaging import Publisher, Consumer
     from clickmuncher.models import Click
 
 
     def send_increment_clicks(for_url):
         """Send a message for incrementing the click count for an URL."""
-        connection = DjangoAMQPConnection()
+        connection = DjangoBrokerConnection()
         publisher = Publisher(connection=connection,
                               exchange="clicks",
                               routing_key="increment_click",
@@ -127,7 +127,7 @@ On to the code...
     def process_clicks():
         """Process all currently gathered clicks by saving them to the
         database."""
-        connection = DjangoAMQPConnection()
+        connection = DjangoBrokerConnection()
         consumer = Consumer(connection=connection,
                             queue="clicks",
                             exchange="clicks",