
incomplete consumer boot steps

Ask Solem, 12 years ago
commit c05af8cfcb
2 changed files with 10 additions and 129 deletions
  1. celery/worker/bootsteps.py (+9 -7)
  2. celery/worker/consumer.py (+1 -122)

+ 9 - 7
celery/worker/bootsteps.py

@@ -72,7 +72,7 @@ class Namespace(object):
             if component:
                 logger.debug('Starting %s...', qualname(component))
                 self.started = i + 1
-                component.start()
+                component.start(parent)
                 logger.debug('%s OK!', qualname(component))

     def close(self, parent):
@@ -84,7 +84,8 @@ class Namespace(object):
             except AttributeError:
                 pass
             else:
-                close()
+                close(parent)
+        self.state = CLOSE

     def stop(self, parent, terminate=False):
         what = 'Terminating' if terminate else 'Stopping'
@@ -94,20 +95,21 @@ class Namespace(object):
         if self.state in (CLOSE, TERMINATE):
             return

+        self.close()
+
         if self.state != RUN or self.started != len(parent.components):
             # Not fully started, can safely exit.
             self.state = TERMINATE
             self.shutdown_complete.set()
             return

-        self.state = CLOSE
         for component in reversed(parent.components):
             if component:
                 logger.debug('%s %s...', what, qualname(component))
                 stop = component.stop
                 if terminate:
                     stop = getattr(component, 'terminate', None) or stop
-                stop()
+                stop(parent)

         if self.on_stopped:
             self.on_stopped()
@@ -277,13 +279,13 @@ class StartStopComponent(Component):
     abstract = True
     terminable = False

-    def start(self):
+    def start(self, parent):
         return self.obj.start()

-    def stop(self):
+    def stop(self, parent):
         return self.obj.stop()

-    def close(self):
+    def close(self, parent):
         pass

     def terminate(self):

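With this change the Namespace hands its parent (the worker or consumer object that owns the components) into every boot step's start/stop/close call, instead of each step capturing it at construction time. Below is a minimal sketch of a step written against the new calling convention; the ExampleStep and Worker classes and the print output are illustrative, only the (self, parent) signatures and the call order come from the diff above.

    class ExampleStep(object):
        """Illustrative boot step following the parent-aware lifecycle."""
        name = 'worker.example'

        def start(self, parent):
            # `parent` is now passed in by the namespace on every call,
            # so the step can reach the owning worker without storing it.
            print('starting %s for %r' % (self.name, parent))

        def close(self, parent):
            # Namespace.stop() now closes every step before stopping it.
            print('closing %s' % self.name)

        def stop(self, parent):
            print('stopping %s' % self.name)


    class Worker(object):
        """Stand-in for the object that owns the namespace."""
        components = [ExampleStep()]


    w = Worker()
    for step in w.components:              # what Namespace.start() now does
        step.start(w)
    for step in w.components:              # Namespace.stop() closes first ...
        step.close(w)
    for step in reversed(w.components):    # ... then stops in reverse order
        step.stop(w)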
+ 1 - 122
celery/worker/consumer.py

@@ -7,68 +7,6 @@ This module contains the component responsible for consuming messages
 from the broker, processing the messages and keeping the broker connections
 up and running.

-
-* :meth:`~Consumer.start` is an infinite loop, which only iterates
-  again if the connection is lost. For each iteration (at start, or if the
-  connection is lost) it calls :meth:`~Consumer.reset_connection`,
-  and starts the consumer by calling :meth:`~Consumer.consume_messages`.
-
-* :meth:`~Consumer.reset_connection`, clears the internal queues,
-  establishes a new connection to the broker, sets up the task
-  consumer (+ QoS), and the broadcast remote control command consumer.
-
-  Also if events are enabled it configures the event dispatcher and starts
-  up the heartbeat thread.
-
-* Finally it can consume messages. :meth:`~Consumer.consume_messages`
-  is simply an infinite loop waiting for events on the AMQP channels.
-
-  Both the task consumer and the broadcast consumer uses the same
-  callback: :meth:`~Consumer.receive_message`.
-
-* So for each message received the :meth:`~Consumer.receive_message`
-  method is called, this checks the payload of the message for either
-  a `task` key or a `control` key.
-
-  If the message is a task, it verifies the validity of the message
-  converts it to a :class:`celery.worker.job.Request`, and sends
-  it to :meth:`~Consumer.on_task`.
-
-  If the message is a control command the message is passed to
-  :meth:`~Consumer.on_control`, which in turn dispatches
-  the control command using the control dispatcher.
-
-  It also tries to handle malformed or invalid messages properly,
-  so the worker doesn't choke on them and die. Any invalid messages
-  are acknowledged immediately and logged, so the message is not resent
-  again, and again.
-
-* If the task has an ETA/countdown, the task is moved to the `timer`
-  so the :class:`timer2.Timer` can schedule it at its
-  deadline. Tasks without an eta are moved immediately to the `ready_queue`,
-  so they can be picked up by the :class:`~celery.worker.mediator.Mediator`
-  to be sent to the pool.
-
-* When a task with an ETA is received the QoS prefetch count is also
-  incremented, so another message can be reserved. When the ETA is met
-  the prefetch count is decremented again, though this cannot happen
-  immediately because amqplib doesn't support doing broker requests
-  across threads. Instead the current prefetch count is kept as a
-  shared counter, so as soon as  :meth:`~Consumer.consume_messages`
-  detects that the value has changed it will send out the actual
-  QoS event to the broker.
-
-* Notice that when the connection is lost all internal queues are cleared
-  because we can no longer ack the messages reserved in memory.
-  However, this is not dangerous as the broker will resend them
-  to another worker when the channel is closed.
-
-* **WARNING**: :meth:`~Consumer.stop` does not close the connection!
-  This is because some pre-acked messages may be in processing,
-  and they need to be finished before the channel is closed.
-  For celeryd this means the pool must finish the tasks it has acked
-  early, *then* close the connection.
-
 """
 """
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
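The overview removed by this hunk described, among other things, how Consumer.receive_message routes each payload: a message carrying a `task` key is validated, turned into a Request and handed to on_task (via the timer if it has an ETA), a message carrying a `control` key goes to the control dispatcher, and anything unrecognised is acknowledged and logged so the broker does not keep redelivering it. A rough sketch of that dispatch shape, purely to preserve the idea the removed text captured; the function and its parameters are stand-ins, not Celery's actual signatures.

    def receive_message(body, message, on_task, on_control, logger):
        """Sketch of the routing described by the removed overview."""
        if 'task' in body:
            # Task messages are verified, wrapped and handed on; ETA tasks
            # go to the timer, the rest straight to the ready queue.
            on_task(body, message)
        elif 'control' in body:
            # Remote control commands go to the control dispatcher.
            on_control(body, message)
        else:
            # Malformed/unknown messages are acked immediately and logged
            # so they are not redelivered again and again.
            message.ack()
            logger.warning('Received and deleted unknown message: %r', body)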
@@ -79,6 +17,7 @@ import threading
 from time import sleep
 from Queue import Empty

+from kombu.common import QoS
 from kombu.syn import _detect_environment
 from kombu.utils.encoding import safe_repr
 from kombu.utils.eventio import READ, WRITE, ERR
@@ -101,9 +40,6 @@ from .heartbeat import Heart
 #: Heartbeat check is called every heartbeat_seconds' / rate'.
 AMQHEARTBEAT_RATE = 2.0

-#: Prefetch count can't exceed short.
-PREFETCH_COUNT_MAX = 0xFFFF
-
 UNKNOWN_FORMAT = """\
 Received and deleted unknown message. Wrong destination?!?

@@ -198,63 +134,6 @@ class Component(StartStopComponent):
         return c


-class QoS(object):
-    """Thread safe increment/decrement of a channels prefetch_count.
-
-    :param consumer: A :class:`kombu.messaging.Consumer` instance.
-    :param initial_value: Initial prefetch count value.
-
-    """
-    prev = None
-
-    def __init__(self, consumer, initial_value):
-        self.consumer = consumer
-        self._mutex = threading.RLock()
-        self.value = initial_value or 0
-
-    def increment_eventually(self, n=1):
-        """Increment the value, but do not update the channels QoS.
-
-        The MainThread will be responsible for calling :meth:`update`
-        when necessary.
-
-        """
-        with self._mutex:
-            if self.value:
-                self.value = self.value + max(n, 0)
-        return self.value
-
-    def decrement_eventually(self, n=1):
-        """Decrement the value, but do not update the channels QoS.
-
-        The MainThread will be responsible for calling :meth:`update`
-        when necessary.
-
-        """
-        with self._mutex:
-            if self.value:
-                self.value -= n
-        return self.value
-
-    def set(self, pcount):
-        """Set channel prefetch_count setting."""
-        if pcount != self.prev:
-            new_value = pcount
-            if pcount > PREFETCH_COUNT_MAX:
-                warn('QoS: Disabled: prefetch_count exceeds %r',
-                     PREFETCH_COUNT_MAX)
-                new_value = 0
-            debug('basic.qos: prefetch_count->%s', new_value)
-            self.consumer.qos(prefetch_count=new_value)
-            self.prev = pcount
-        return pcount
-
-    def update(self):
-        """Update prefetch count with current value."""
-        with self._mutex:
-            return self.set(self.value)
-
-
 class Consumer(object):
     """Listen for messages received from the broker and
     move them to the ready queue for task processing.
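
The commit drops the worker's local QoS helper in favour of the equivalent class now imported from kombu.common (see the added import above). A minimal usage sketch, assuming kombu.common.QoS keeps the same interface as the removed class, i.e. a (callback, initial_value) constructor where the callback is invoked as callback(prefetch_count=n), plus increment_eventually/decrement_eventually/update; the exact constructor arguments may differ between kombu releases.

    from kombu.common import QoS

    def apply_prefetch(prefetch_count=0):
        # Stand-in for the real callback; in the worker this would be the
        # task consumer's qos method.
        print('basic.qos: prefetch_count -> %s' % prefetch_count)

    qos = QoS(apply_prefetch, 10)   # assumed (callback, initial_value) signature
    qos.update()                    # applies the initial prefetch count

    qos.increment_eventually()      # e.g. while an ETA task is held back;
                                    # thread safe, does not touch the channel yet
    qos.update()                    # the consuming loop flushes the change (11)

    qos.decrement_eventually()      # when the held-back task is finally sent out
    qos.update()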