Browse Source

celery.worker.hub is now kombu.async

Ask Solem 11 years ago
parent
commit
1070fcd749

+ 1 - 1
celery/concurrency/processes.py

@@ -37,6 +37,7 @@ from billiard.pool import (
     RUN, CLOSE, TERMINATE, ACK, NACK, EX_RECYCLE, WorkersJoined, CoroStop,
 )
 from billiard.queues import _SimpleQueue
+from kombu.async import READ, WRITE, ERR
 from kombu.serialization import pickle as _pickle
 from kombu.utils import fxrange
 from kombu.utils.compat import get_errno
@@ -49,7 +50,6 @@ from celery.app import trace
 from celery.concurrency.base import BasePool
 from celery.five import Counter, items, values
 from celery.utils.log import get_logger
-from celery.worker.hub import READ, WRITE, ERR
 
 __all__ = ['TaskPool']
 

+ 4 - 23
celery/platforms.py

@@ -18,12 +18,14 @@ import signal as _signal
 import sys
 
 from billiard import current_process
+# fileno used to be in this module
+from kombu.utils import maybe_fileno
 from kombu.utils.compat import get_errno
 from kombu.utils.encoding import safe_str
 from contextlib import contextmanager
 
 from .local import try_import
-from .five import items, range, reraise, string_t, int_types
+from .five import items, range, reraise, string_t
 
 _setproctitle = try_import('setproctitle')
 resource = try_import('resource')
@@ -32,7 +34,7 @@ grp = try_import('grp')
 
 __all__ = ['EX_OK', 'EX_FAILURE', 'EX_UNAVAILABLE', 'EX_USAGE', 'SYSTEM',
            'IS_OSX', 'IS_WINDOWS', 'pyimplementation', 'LockFailed',
-           'get_fdmax', 'Pidfile', 'create_pidlock', 'fileno', 'maybe_fileno',
+           'get_fdmax', 'Pidfile', 'create_pidlock',
            'close_open_fds', 'DaemonContext', 'detached', 'parse_uid',
            'parse_gid', 'setgroups', 'initgroups', 'setgid', 'setuid',
            'maybe_drop_privileges', 'signals', 'set_process_title',
@@ -57,13 +59,6 @@ PIDFILE_MODE = ((os.R_OK | os.W_OK) << 6) | ((os.R_OK) << 3) | ((os.R_OK))
 PIDLOCKED = """ERROR: Pidfile ({0}) already exists.
 Seems we're already running? (pid: {1})"""
 
-try:
-    from io import UnsupportedOperation
-    FILENO_ERRORS = (AttributeError, ValueError, UnsupportedOperation)
-except ImportError:  # pragma: no cover
-    # Py2
-    FILENO_ERRORS = (AttributeError, ValueError)  # noqa
-
 
 def pyimplementation():
     """Returns string identifying the current Python implementation."""
@@ -268,20 +263,6 @@ def _create_pidlock(pidfile):
     return pidlock
 
 
-def fileno(f):
-    if isinstance(f, int_types):
-        return f
-    return f.fileno()
-
-
-def maybe_fileno(f):
-    """Get object fileno, or :const:`None` if not defined."""
-    try:
-        return fileno(f)
-    except FILENO_ERRORS:
-        pass
-
-
 def close_open_fds(keep=None):
     keep = [maybe_fileno(f) for f in keep if maybe_fileno(f)] if keep else []
     for fd in reversed(range(get_fdmax(default=2048))):

+ 8 - 9
celery/tests/worker/test_hub.py

@@ -1,13 +1,12 @@
 from __future__ import absolute_import
 
-from celery.worker.hub import (
-    DummyLock,
-    BoundedSemaphore,
+from kombu.async import (
     Hub,
     repr_flag,
     _rcb,
     READ, WRITE, ERR
 )
+from kombu.async.semaphore import DummyLock, LaxBoundedSemaphore
 
 from mock import Mock, call, patch
 
@@ -40,10 +39,10 @@ class test_DummyLock(Case):
             pass
 
 
-class test_BoundedSemaphore(Case):
+class test_LaxBoundedSemaphore(Case):
 
     def test_acquire_release(self):
-        x = BoundedSemaphore(2)
+        x = LaxBoundedSemaphore(2)
 
         c1 = Mock()
         x.acquire(c1, 1)
@@ -65,13 +64,13 @@ class test_BoundedSemaphore(Case):
         c3.assert_called_with(3)
 
     def test_bounded(self):
-        x = BoundedSemaphore(2)
+        x = LaxBoundedSemaphore(2)
         for i in range(100):
             x.release()
         self.assertEqual(x.value, 2)
 
     def test_grow_shrink(self):
-        x = BoundedSemaphore(1)
+        x = LaxBoundedSemaphore(1)
         self.assertEqual(x.initial_value, 1)
         cb1 = Mock()
         x.acquire(cb1, 1)
@@ -111,7 +110,7 @@ class test_BoundedSemaphore(Case):
         self.assertEqual(x.value, x.initial_value)
 
     def test_clear(self):
-        x = BoundedSemaphore(10)
+        x = LaxBoundedSemaphore(10)
         for i in range(11):
             x.acquire(Mock())
         self.assertTrue(x._waiting)
@@ -141,7 +140,7 @@ class test_Hub(Case):
         self.assertEqual(_rcb(f), f.__name__)
         self.assertEqual(_rcb('foo'), 'foo')
 
-    @patch('kombu.utils.eventio.poll')
+    @patch('kombu.async.hub.poll')
     def test_start_stop(self, poll):
         hub = Hub()
         hub.start()

+ 3 - 3
celery/tests/worker/test_worker.py

@@ -9,6 +9,7 @@ from threading import Event
 
 from billiard.exceptions import WorkerLostError
 from kombu import Connection
+from kombu.async import READ, ERR
 from kombu.common import QoS, ignore_errors
 from kombu.exceptions import StdChannelError
 from kombu.transport.base import Message
@@ -24,7 +25,6 @@ from celery.utils import uuid
 from celery.worker import components
 from celery.worker import consumer
 from celery.worker.consumer import Consumer as __Consumer
-from celery.worker.hub import READ, ERR
 from celery.worker.job import Request
 from celery.utils import worker_direct
 from celery.utils.serialization import pickle
@@ -1036,7 +1036,7 @@ class test_WorkController(AppCase):
         pool.create(w)
 
     def test_Pool_create(self):
-        from celery.worker.hub import BoundedSemaphore
+        from kombu.async.semaphore import LaxBoundedSemaphore
         w = Mock()
         w._conninfo.connection_errors = w._conninfo.channel_errors = ()
         w.hub = Mock()
@@ -1063,7 +1063,7 @@ class test_WorkController(AppCase):
         w.consumer.restart_count = -1
         pool = components.Pool(w)
         pool.create(w)
-        self.assertIsInstance(w.semaphore, BoundedSemaphore)
+        self.assertIsInstance(w.semaphore, LaxBoundedSemaphore)
         self.assertTrue(w.hub.on_init)
         P = w.pool
         P.start()

+ 2 - 12
celery/utils/functional.py

@@ -15,10 +15,10 @@ from functools import wraps
 from itertools import islice
 
 from kombu.utils import cached_property
-from kombu.utils.functional import lazy, maybe_evaluate
+from kombu.utils.functional import lazy, maybe_evaluate, is_list, maybe_list
 from kombu.utils.compat import OrderedDict
 
-from celery.five import UserDict, UserList, items, keys, string_t
+from celery.five import UserDict, UserList, items, keys
 
 __all__ = ['LRUCache', 'is_list', 'maybe_list', 'memoize', 'mlazy', 'noop',
            'first', 'firstmethod', 'chunks', 'padlist', 'mattrgetter', 'uniq',
@@ -121,16 +121,6 @@ class LRUCache(UserDict):
             return list(self._iterate_items())
 
 
-def is_list(l, scalars=(dict, string_t)):
-    """Returns true if object is list-like, but not a dict or string."""
-    return hasattr(l, '__iter__') and not isinstance(l, scalars or ())
-
-
-def maybe_list(l, scalars=(dict, string_t)):
-    """Returns list of one element if ``l`` is a scalar."""
-    return l if l is None or is_list(l, scalars) else [l]
-
-
 def memoize(maxsize=None, Cache=LRUCache):
 
     def _memoize(fun):

+ 2 - 1
celery/worker/autoscale.py

@@ -19,13 +19,14 @@ import threading
 from functools import partial
 from time import sleep, time
 
+from kombu.async.semaphore import DummyLock
+
 from celery import bootsteps
 from celery.utils.log import get_logger
 from celery.utils.threads import bgThread
 
 from . import state
 from .components import Pool
-from .hub import DummyLock
 
 __all__ = ['Autoscaler', 'WorkerComponent']
 

+ 9 - 6
celery/worker/components.py

@@ -12,14 +12,15 @@ import atexit
 
 from functools import partial
 
+from kombu.async import Hub as _Hub, get_event_loop, set_event_loop
+from kombu.async.semaphore import DummyLock, LaxBoundedSemaphore
+
 from celery import bootsteps
 from celery.exceptions import ImproperlyConfigured
 from celery.five import string_t
 from celery.utils.log import worker_logger as logger
 from celery.utils.timer2 import Schedule
 
-from . import hub
-
 __all__ = ['Timer', 'Hub', 'Queues', 'Pool', 'Beat', 'StateDB', 'Consumer']
 
 ERR_B_GREEN = """\
@@ -62,20 +63,22 @@ class Hub(bootsteps.StartStopStep):
         return w.use_eventloop
 
     def create(self, w):
-        w.hub = hub.Hub(w.timer)
+        w.hub = get_event_loop()
+        if w.hub is None:
+            w.hub = set_event_loop(_Hub(w.timer))
         self._patch_thread_primitives(w)
         return w.hub
 
     def _patch_thread_primitives(self, w):
         # make clock use dummy lock
-        w.app.clock.lock = hub.DummyLock()
+        w.app.clock.lock = DummyLock()
         # multiprocessing's ApplyResult uses this lock.
         try:
             from billiard import pool
         except ImportError:
             pass
         else:
-            pool.Lock = hub.DummyLock
+            pool.Lock = DummyLock
 
 
 class Queues(bootsteps.Step):
@@ -134,7 +137,7 @@ class Pool(bootsteps.StartStopStep):
         procs = w.min_concurrency
         forking_enable = not threaded or (w.no_execv or not w.force_execv)
         if not threaded:
-            semaphore = w.semaphore = hub.BoundedSemaphore(procs)
+            semaphore = w.semaphore = LaxBoundedSemaphore(procs)
             w._quick_acquire = w.semaphore.acquire
             w._quick_release = w.semaphore.release
             max_restarts = 100

+ 1 - 1
celery/worker/consumer.py

@@ -24,6 +24,7 @@ from time import sleep
 
 from billiard.common import restart_state
 from billiard.exceptions import RestartFreqExceeded
+from kombu.async.semaphore import DummyLock
 from kombu.common import QoS, ignore_errors
 from kombu.syn import _detect_environment
 from kombu.utils.compat import get_errno
@@ -41,7 +42,6 @@ from celery.utils.text import truncate
 from celery.utils.timeutils import humanize_seconds, rate
 
 from . import heartbeat, loops, pidbox
-from .hub import DummyLock
 from .state import task_reserved, maybe_shutdown, revoked, reserved_requests
 
 try:

+ 0 - 283
celery/worker/hub.py

@@ -1,283 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-    celery.worker.hub
-    ~~~~~~~~~~~~~~~~~
-
-    Event-loop implementation.
-
-"""
-from __future__ import absolute_import
-
-from kombu.utils import cached_property
-from kombu.utils import eventio
-from kombu.utils.eventio import READ, WRITE, ERR
-
-from celery.five import items, range
-from celery.platforms import fileno
-from celery.utils.functional import maybe_list
-from celery.utils.log import get_logger
-from celery.utils.timer2 import Schedule
-
-__all__ = ['Hub', 'BoundedSemaphore', 'DummyLock']
-logger = get_logger(__name__)
-
-
-def repr_flag(flag):
-    return '{0}{1}{2}'.format('R' if flag & READ else '',
-                              'W' if flag & WRITE else '',
-                              '!' if flag & ERR else '')
-
-
-def _rcb(obj):
-    if obj is None:
-        return '<missing>'
-    if isinstance(obj, str):
-        return obj
-    return obj.__name__
-
-
-class BoundedSemaphore(object):
-    """Asynchronous Bounded Semaphore.
-
-    Bounded means that the value will stay within the specified
-    range even if released more times than it was acquired.
-
-    Example:
-
-        >>> x = BoundedSemaphore(2)
-
-        >>> def callback(i):
-        ...     say('HELLO {0!r}'.format(i))
-
-        >>> x.acquire(callback, 1)
-        HELLO 1
-
-        >>> x.acquire(callback, 2)
-        HELLO 2
-
-        >>> x.acquire(callback, 3)
-        >>> x._waiters   # private, do not access directly
-        [(callback, 3)]
-
-        >>> x.release()
-        HELLO 3
-
-    """
-
-    def __init__(self, value):
-        self.initial_value = self.value = value
-        self._waiting = []
-
-    def acquire(self, callback, *partial_args):
-        """Acquire semaphore, applying ``callback`` if
-        the resource is available.
-
-        :param callback: The callback to apply.
-        :param \*partial_args: partial arguments to callback.
-
-        """
-        if self.value <= 0:
-            self._waiting.append((callback, partial_args))
-            return False
-        else:
-            self.value = max(self.value - 1, 0)
-            callback(*partial_args)
-            return True
-
-    def release(self):
-        """Release semaphore.
-
-        If there are any waiters this will apply the first waiter
-        that is waiting for the resource (FIFO order).
-
-        """
-        self.value = min(self.value + 1, self.initial_value)
-        if self._waiting:
-            waiter, args = self._waiting.pop()
-            waiter(*args)
-
-    def grow(self, n=1):
-        """Change the size of the semaphore to accept more users."""
-        self.initial_value += n
-        self.value += n
-        [self.release() for _ in range(n)]
-
-    def shrink(self, n=1):
-        """Change the size of the semaphore to accept less users."""
-        self.initial_value = max(self.initial_value - n, 0)
-        self.value = max(self.value - n, 0)
-
-    def clear(self):
-        """Reset the sempahore, which also wipes out any waiting callbacks."""
-        self._waiting[:] = []
-        self.value = self.initial_value
-
-
-class Hub(object):
-    """Event loop object.
-
-    :keyword timer: Specify custom :class:`~celery.utils.timer2.Schedule`.
-
-    """
-    #: Flag set if reading from an fd will not block.
-    READ = READ
-
-    #: Flag set if writing to an fd will not block.
-    WRITE = WRITE
-
-    #: Flag set on error, and the fd should be read from asap.
-    ERR = ERR
-
-    #: List of callbacks to be called when the loop is initialized,
-    #: applied with the hub instance as sole argument.
-    on_init = None
-
-    #: List of callbacks to be called when the loop is exiting,
-    #: applied with the hub instance as sole argument.
-    on_close = None
-
-    #: List of callbacks to be called when a task is received.
-    #: Takes no arguments.
-    on_task = None
-
-    def __init__(self, timer=None):
-        self.timer = Schedule() if timer is None else timer
-
-        self.readers = {}
-        self.writers = {}
-        self.on_init = []
-        self.on_close = []
-        self.on_task = []
-
-        # The eventloop (in celery.worker.loops)
-        # will merge fds in this set and then instead of calling
-        # the callback for each ready fd it will call the
-        # :attr:`consolidate_callback` with the list of ready_fds
-        # as an argument.  This API is internal and is only
-        # used by the multiprocessing pool to find inqueues
-        # that are ready to write.
-        self.consolidate = set()
-        self.consolidate_callback = None
-
-    def start(self):
-        """Called by Hub bootstep at worker startup."""
-        self.poller = eventio.poll()
-
-    def stop(self):
-        """Called by Hub bootstep at worker shutdown."""
-        self.poller.close()
-
-    def init(self):
-        for callback in self.on_init:
-            callback(self)
-
-    def fire_timers(self, min_delay=1, max_delay=10, max_timers=10,
-                    propagate=()):
-        delay = None
-        if self.timer._queue:
-            for i in range(max_timers):
-                delay, entry = next(self.scheduler)
-                if entry is None:
-                    break
-                try:
-                    entry()
-                except propagate:
-                    raise
-                except Exception as exc:
-                    logger.error('Error in timer: %r', exc, exc_info=1)
-        return min(max(delay or 0, min_delay), max_delay)
-
-    def add(self, fds, callback, flags, consolidate=False):
-        for fd in maybe_list(fds, None):
-            try:
-                self._add(fd, callback, flags, consolidate)
-            except ValueError:
-                self._discard(fd)
-
-    def remove(self, fd):
-        fd = fileno(fd)
-        self._unregister(fd)
-        self._discard(fd)
-
-    def add_reader(self, fds, callback):
-        return self.add(fds, callback, READ | ERR)
-
-    def add_writer(self, fds, callback):
-        return self.add(fds, callback, WRITE)
-
-    def update_readers(self, readers):
-        [self.add_reader(*x) for x in items(readers)]
-
-    def update_writers(self, writers):
-        [self.add_writer(*x) for x in items(writers)]
-
-    def _unregister(self, fd):
-        try:
-            self.poller.unregister(fd)
-        except (KeyError, OSError):
-            pass
-
-    def close(self, *args):
-        [self._unregister(fd) for fd in self.readers]
-        self.readers.clear()
-        [self._unregister(fd) for fd in self.writers]
-        self.writers.clear()
-        for callback in self.on_close:
-            callback(self)
-
-    def _add(self, fd, cb, flags, consolidate=False):
-        self.poller.register(fd, flags)
-        (self.readers if flags & READ else self.writers)[fileno(fd)] = cb
-        if consolidate:
-            self.consolidate.add(fd)
-
-    def _discard(self, fd):
-        fd = fileno(fd)
-        self.readers.pop(fd, None)
-        self.writers.pop(fd, None)
-        self.consolidate.discard(fd)
-
-    def repr_active(self):
-        return ', '.join(self._repr_readers() + self._repr_writers())
-
-    def repr_events(self, events):
-        return ', '.join(
-            '{0}->{1}'.format(
-                _rcb(self._callback_for(fd, fl, '{0!r}(GONE)'.format(fd))),
-                repr_flag(fl),
-            )
-            for fd, fl in events
-        )
-
-    def _repr_readers(self):
-        return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(READ | ERR))
-                for fd, cb in items(self.readers)]
-
-    def _repr_writers(self):
-        return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(WRITE))
-                for fd, cb in items(self.writers)]
-
-    def _callback_for(self, fd, flag, *default):
-        try:
-            if flag & READ:
-                return self.readers[fileno(fd)]
-            if flag & WRITE:
-                return self.writers[fileno(fd)]
-        except KeyError:
-            if default:
-                return default[0]
-            raise
-
-    @cached_property
-    def scheduler(self):
-        return iter(self.timer)
-
-
-class DummyLock(object):
-    """Pretending to be a lock."""
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *exc_info):
-        pass