Ask Solem há 11 anos atrás
pai
commit
e50f52e00e
3 ficheiros alterados com 37 adições e 35 exclusões
  1. 0 1
      celery/app/base.py
  2. 3 2
      celery/concurrency/processes.py
  3. 34 32
      celery/worker/hub.py

+ 0 - 1
celery/app/base.py

@@ -20,7 +20,6 @@ from operator import attrgetter
 from billiard.util import register_after_fork
 from kombu.clocks import LamportClock
 from kombu.common import oid_from
-from kombu.serialization import enable_insecure_serializers
 from kombu.utils import cached_property
 
 from celery import platforms

+ 3 - 2
celery/concurrency/processes.py

@@ -465,7 +465,7 @@ class AsynPool(_pool.Pool):
             assert len(self._queues) == before
 
     def destroy_queues(self, queues):
-        """Destroyes queues that can no longer be used and that will
+        """Destroy queues that can no longer be used, so that they
         be replaced by new sockets."""
         removed = 1
         try:
@@ -623,7 +623,8 @@ class TaskPool(BasePool):
         return {
             'total': total,
             'avg': per(total / len(self.write_stats) if total else 0, total),
-            'all': ', '.join(per(v, total) for v in vals)
+            'all': ', '.join(per(v, total) for v in vals),
+            'raw': ', '.join(map(str, vals)),
         }
 
     def on_poll_init(self, w, hub):

+ 34 - 32
celery/worker/hub.py

@@ -39,9 +39,7 @@ class BoundedSemaphore(object):
     """Asynchronous Bounded Semaphore.
 
     Bounded means that the value will stay within the specified
-    range even if it is released more times than it was acquired.
-
-    This type is *not thread safe*.
+    range even if released more times than it was acquired.
 
     Example:
 
@@ -70,8 +68,8 @@ class BoundedSemaphore(object):
         self._waiting = []
 
     def acquire(self, callback, *partial_args):
-        """Acquire semaphore, applying ``callback`` when
-        the semaphore is ready.
+        """Acquire semaphore, applying ``callback`` if
+        the resource is available.
 
         :param callback: The callback to apply.
         :param \*partial_args: partial arguments to callback.
@@ -88,8 +86,8 @@ class BoundedSemaphore(object):
     def release(self):
         """Release semaphore.
 
-        This will apply any waiting callbacks from previous
-        calls to :meth:`acquire` done when the semaphore was busy.
+        If there are any waiters this will apply the first waiter
+        that is waiting for the resource (FIFO order).
 
         """
         self.value = min(self.value + 1, self.initial_value)
@@ -98,18 +96,18 @@ class BoundedSemaphore(object):
             waiter(*args)
 
     def grow(self, n=1):
-        """Change the size of the semaphore to hold more values."""
+        """Change the size of the semaphore to accept more users."""
         self.initial_value += n
         self.value += n
         [self.release() for _ in range(n)]
 
     def shrink(self, n=1):
-        """Change the size of the semaphore to hold less values."""
+        """Change the size of the semaphore to accept less users."""
         self.initial_value = max(self.initial_value - n, 0)
         self.value = max(self.value - n, 0)
 
     def clear(self):
-        """Reset the sempahore, including wiping out any waiting callbacks."""
+        """Reset the sempahore, which also wipes out any waiting callbacks."""
         self._waiting[:] = []
         self.value = self.initial_value
 
@@ -149,6 +147,14 @@ class Hub(object):
         self.on_init = []
         self.on_close = []
         self.on_task = []
+
+        # The eventloop (in celery.worker.loops)
+        # will merge fds in this set and then instead of calling
+        # the callback for each ready fd it will call the
+        # :attr:`consolidate_callback` with the list of ready_fds
+        # as an argument.  This API is internal and is only
+        # used by the multiprocessing pool to find inqueues
+        # that are ready to write.
         self.consolidate = set()
         self.consolidate_callback = None
 
@@ -180,16 +186,6 @@ class Hub(object):
                     logger.error('Error in timer: %r', exc, exc_info=1)
         return min(max(delay or 0, min_delay), max_delay)
 
-    def _add(self, fd, cb, flags, consolidate=False):
-        #if flags & WRITE:
-        #    ex = self.writers.get(fd)
-        #    if ex and ex.__name__ == '_write_job':
-        #        assert not ex.gi_frame or ex.gi_frame == -1
-        self.poller.register(fd, flags)
-        (self.readers if flags & READ else self.writers)[fileno(fd)] = cb
-        if consolidate:
-            self.consolidate.add(fd)
-
     def add(self, fds, callback, flags, consolidate=False):
         for fd in maybe_list(fds, None):
             try:
@@ -220,12 +216,6 @@ class Hub(object):
         except (KeyError, OSError):
             pass
 
-    def _discard(self, fd):
-        fd = fileno(fd)
-        self.readers.pop(fd, None)
-        self.writers.pop(fd, None)
-        self.consolidate.discard(fd)
-
     def close(self, *args):
         [self._unregister(fd) for fd in self.readers]
         self.readers.clear()
@@ -234,13 +224,17 @@ class Hub(object):
         for callback in self.on_close:
             callback(self)
 
-    def _repr_readers(self):
-        return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(READ | ERR))
-                for fd, cb in items(self.readers)]
+    def _add(self, fd, cb, flags, consolidate=False):
+        self.poller.register(fd, flags)
+        (self.readers if flags & READ else self.writers)[fileno(fd)] = cb
+        if consolidate:
+            self.consolidate.add(fd)
 
-    def _repr_writers(self):
-        return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(WRITE))
-                for fd, cb in items(self.writers)]
+    def _discard(self, fd):
+        fd = fileno(fd)
+        self.readers.pop(fd, None)
+        self.writers.pop(fd, None)
+        self.consolidate.discard(fd)
 
     def repr_active(self):
         return ', '.join(self._repr_readers() + self._repr_writers())
@@ -254,6 +248,14 @@ class Hub(object):
             for fd, fl in events
         )
 
+    def _repr_readers(self):
+        return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(READ | ERR))
+                for fd, cb in items(self.readers)]
+
+    def _repr_writers(self):
+        return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(WRITE))
+                for fd, cb in items(self.writers)]
+
     def _callback_for(self, fd, flag, *default):
         try:
             if flag & READ: