Quellcode durchsuchen

Pass DjangoAMQPConnection *instances*, instead of cls. Requires carrot version
0.3.8 (next release)

Ask Solem vor 16 Jahren
Ursprung
Commit
20a2f8e740
3 geänderte Dateien mit 7 neuen und 7 gelöschten Zeilen
  1. 5 5
      celery/task.py
  2. 1 1
      celery/worker.py
  3. 1 1
      setup.py

+ 5 - 5
celery/task.py

@@ -22,7 +22,7 @@ def delay_task(task_name, *args, **kwargs):
         raise tasks.NotRegistered(
                 "Task with name %s not registered in the task registry." % (
                     task_name))
-    publisher = TaskPublisher(connection=DjangoAMQPConnection)
+    publisher = TaskPublisher(connection=DjangoAMQPConnection())
     task_id = publisher.delay_task(task_name, *args, **kwargs)
     publisher.close()
     return task_id
@@ -37,7 +37,7 @@ def discard_all():
     Returns the number of tasks discarded.
 
     """
-    consumer = TaskConsumer(connection=DjangoAMQPConnection)
+    consumer = TaskConsumer(connection=DjangoAMQPConnection())
     discarded_count = consumer.discard_all()
     consumer.close()
     return discarded_count
@@ -141,11 +141,11 @@ class Task(object):
 
     def get_publisher(self):
         """Get a celery task message publisher."""
-        return TaskPublisher(connection=DjangoAMQPConnection)
+        return TaskPublisher(connection=DjangoAMQPConnection())
 
     def get_consumer(self):
         """Get a celery task message consumer."""
-        return TaskConsumer(connection=DjangoAMQPConnection)
+        return TaskConsumer(connection=DjangoAMQPConnection())
 
     @classmethod
     def delay(cls, *args, **kwargs):
@@ -204,7 +204,7 @@ class TaskSet(object):
             True
         """
         taskset_id = str(uuid.uuid4())
-        publisher = TaskPublisher(connection=DjangoAMQPConnection)
+        publisher = TaskPublisher(connection=DjangoAMQPConnection())
         subtask_ids = []
         for arg in self.arguments:
             subtask_id = publisher.delay_task_in_set(task_name=self.task_name,

+ 1 - 1
celery/worker.py

@@ -78,7 +78,7 @@ class TaskDaemon(object):
                                     self.queue_wakeup_after
         self.logger = setup_logger(loglevel, logfile)
         self.pool = multiprocessing.Pool(self.concurrency)
-        self.task_consumer = TaskConsumer(connection=DjangoAMQPConnection)
+        self.task_consumer = TaskConsumer(connection=DjangoAMQPConnection())
 
     def fetch_next_task(self):
         message = self.task_consumer.fetch()

+ 1 - 1
setup.py

@@ -61,7 +61,7 @@ setup(
     install_requires=[
         'django-unittest-depth',
         'simplejson',
-        'carrot',
+        'carrot>=0.3.8',
         'django',
     ],
     cmdclass = {"test": RunTests},