Ask Solem преди 16 години
родител
ревизия
9c15a2c84c
променени са 3 файла, в които са добавени 8 реда и са изтрити 3 реда
  1. 2 1
      celery/messaging.py
  2. 5 1
      celery/worker.py
  3. 1 1
      setup.py

+ 2 - 1
celery/messaging.py

@@ -70,5 +70,6 @@ class TaskConsumer(Consumer):
     exchange = conf.AMQP_EXCHANGE
     routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
     exchange_type = conf.AMQP_EXCHANGE_TYPE
-    auto_ack = True
     decoder = pickle.loads
+    auto_ack = False
+    no_ack = False

+ 5 - 1
celery/worker.py

@@ -131,6 +131,10 @@ class TaskWrapper(object):
 
         Mapping of keyword arguments to apply to the task.
 
+    .. attribute:: message
+
+        The original message sent. Used for acknowledging the message.
+
     """
     success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
     fail_msg = """
@@ -359,6 +363,7 @@ class WorkController(object):
         self.close_connection()
         self.amqp_connection = DjangoAMQPConnection()
         self.task_consumer = TaskConsumer(connection=self.amqp_connection)
+        self.task_consumer.register_callback(self._message_callback)
         return self.task_consumer
 
     def connection_diagnostics(self):
@@ -414,7 +419,6 @@ class WorkController(object):
     def run(self):
         """Starts the workers main loop."""
         task_consumer = self.reset_connection()
-        task_consumer.register_callback(self._message_callback)
         it = task_consumer.iterconsume(limit=None)
 
         self.pool.run()

+ 1 - 1
setup.py

@@ -66,7 +66,7 @@ setup(
     scripts=["bin/celeryd"],
     zip_safe=False,
     install_requires=[
-        'carrot>=0.4.1',
+        'carrot>=0.4.5',
         'python-daemon',
     ],
     cmdclass = {"test": RunTests},