Browse Source

Clear internal queues if connection reset.

Ask Solem 15 years ago
parent
commit
e37740255c
3 changed files with 19 additions and 0 deletions
  1. 7 0
      celery/worker/buckets.py
  2. 9 0
      celery/worker/listener.py
  3. 3 0
      celery/worker/scheduler.py

+ 7 - 0
celery/worker/buckets.py

@@ -174,6 +174,10 @@ class TaskBucket(object):
     def empty(self):
         return all(bucket.empty() for bucket in self.buckets.values())
 
+    def clear(self):
+        for bucket in self.buckets.values():
+            bucket.queue.clear()
+
 
 class TokenBucketQueue(object):
     """Queue with rate limited get operations.
@@ -270,6 +274,9 @@ class TokenBucketQueue(object):
     def empty(self):
         return self.queue.empty()
 
+    def clear(self):
+        return self.queue.queue.clear()
+
     def wait(self, block=False):
         """Wait until a token can be retrieved from the bucket and return
         the next item."""

+ 9 - 0
celery/worker/listener.py

@@ -187,6 +187,15 @@ class CarrotListener(object):
         self.logger.debug(
                 "CarrotListener: Re-establishing connection to the broker...")
         self.stop_consumers()
+
+        try:
+            # TaskBucket supports clear directly.
+            self.ready_queue.clear()
+        except AttributeError:
+            # Use the underlying deque of regular Queue
+            self.ready_queue.queue.clear()
+        self.eta_schedule.clear()
+
         self.connection = self._open_connection()
         self.logger.debug("CarrotListener: Connection Established.")
         self.task_consumer = get_consumer_set(connection=self.connection)

+ 3 - 0
celery/worker/scheduler.py

@@ -57,6 +57,9 @@ class Scheduler(object):
         """Is the schedule empty?"""
         return not self._queue
 
+    def clear(self):
+        self._queue = []
+
     @property
     def queue(self):
         events = list(self._queue)