Browse Source

celery.utils.dispatch.Signal: Replaces django.dispatch.Signal

Ask Solem 15 years ago
parent
commit
7203c97062

+ 1 - 1
celery/signals.py

@@ -1,4 +1,4 @@
-from django.dispatch import Signal
+from celery.utils.dispatch import Signal
 
 task_sent = Signal(providing_args=["task_id", "task",
                                    "args", "kwargs",

+ 1 - 0
celery/utils/dispatch/__init__.py

@@ -0,0 +1 @@
+from celery.utils.dispatch.signal import Signal

+ 36 - 0
celery/utils/dispatch/license.txt

@@ -0,0 +1,36 @@
+django.dispatch was originally forked from PyDispatcher.
+
+PyDispatcher License:
+
+    Copyright (c) 2001-2003, Patrick K. O'Brien and Contributors
+    All rights reserved.
+    
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions
+    are met:
+    
+        Redistributions of source code must retain the above copyright
+        notice, this list of conditions and the following disclaimer.
+    
+        Redistributions in binary form must reproduce the above
+        copyright notice, this list of conditions and the following
+        disclaimer in the documentation and/or other materials
+        provided with the distribution.
+    
+        The name of Patrick K. O'Brien, or the name of any Contributor,
+        may not be used to endorse or promote products derived from this 
+        software without specific prior written permission.
+    
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+    ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+    COPYRIGHT HOLDERS AND CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+    INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+    (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+    HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+    STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+    ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+    OF THE POSSIBILITY OF SUCH DAMAGE. 
+

+ 275 - 0
celery/utils/dispatch/saferef.py

@@ -0,0 +1,275 @@
+"""
+"Safe weakrefs", originally from pyDispatcher.
+
+Provides a way to safely weakref any function, including bound methods (which
+aren't handled by the core weakref module).
+"""
+
+import weakref
+import traceback
+
+
+def safe_ref(target, on_delete=None):
+    """Return a *safe* weak reference to a callable target
+
+    :param target: the object to be weakly referenced, if it's a
+        bound method reference, will create a :class:`BoundMethodWeakref`,
+        otherwise creates a simple :class:`weakref.ref`.
+
+    :keyword on_delete: if provided, will have a hard reference stored
+        to the callable to be called after the safe reference
+        goes out of scope with the reference object, (either a
+        :class:`weakref.ref` or a :class:`BoundMethodWeakref`) as argument.
+    """
+    if getattr(target, "im_self", None) is not None:
+        # Turn a bound method into a BoundMethodWeakref instance.
+        # Keep track of these instances for lookup by disconnect().
+        assert hasattr(target, 'im_func'), \
+            """safe_ref target %r has im_self, but no im_func, " \
+            "don't know how to create reference""" % (target, )
+        return get_bound_method_weakref(target=target,
+                                        on_delete=on_delete)
+    if callable(on_delete):
+        return weakref.ref(target, on_delete)
+    else:
+        return weakref.ref(target)
+
+
+class BoundMethodWeakref(object):
+    """'Safe' and reusable weak references to instance methods.
+
+    BoundMethodWeakref objects provide a mechanism for
+    referencing a bound method without requiring that the
+    method object itself (which is normally a transient
+    object) is kept alive.  Instead, the BoundMethodWeakref
+    object keeps weak references to both the object and the
+    function which together define the instance method.
+
+    .. attribute:: key
+        the identity key for the reference, calculated
+        by the class's :meth:`calculate_key` method applied to the
+        target instance method
+
+    .. attribute:: deletion_methods
+
+        sequence of callable objects taking
+        single argument, a reference to this object which
+        will be called when *either* the target object or
+        target function is garbage collected (i.e. when
+        this object becomes invalid).  These are specified
+        as the on_delete parameters of :func:`safe_ref` calls.
+
+    .. attribute:: weak_self
+        weak reference to the target object
+
+    .. attribute:: weak_func
+        weak reference to the target function
+
+    .. attribute:: _all_instances
+        class attribute pointing to all live
+        BoundMethodWeakref objects indexed by the class's
+        :meth:`calculate_key(target)` method applied to the target
+        objects. This weak value dictionary is used to
+        short-circuit creation so that multiple references
+        to the same (object, function) pair produce the
+        same BoundMethodWeakref instance.
+
+    """
+
+    _all_instances = weakref.WeakValueDictionary()
+
+    def __new__(cls, target, on_delete=None, *arguments, **named):
+        """Create new instance or return current instance
+
+        Basically this method of construction allows us to
+        short-circuit creation of references to already-
+        referenced instance methods.  The key corresponding
+        to the target is calculated, and if there is already
+        an existing reference, that is returned, with its
+        deletionMethods attribute updated.  Otherwise the
+        new instance is created and registered in the table
+        of already-referenced methods.
+
+        """
+        key = cls.calculate_key(target)
+        current = cls._all_instances.get(key)
+        if current is not None:
+            current.deletion_methods.append(on_delete)
+            return current
+        else:
+            base = super(BoundMethodWeakref, cls).__new__(cls)
+            cls._all_instances[key] = base
+            base.__init__(target, on_delete, *arguments, **named)
+            return base
+
+    def __init__(self, target, on_delete=None):
+        """Return a weak-reference-like instance for a bound method
+
+        :param target: the instance-method target for the weak
+            reference, must have ``im_self`` and ``im_func`` attributes
+            and be reconstructable via::
+
+                target.im_func.__get__(target.im_self)
+
+            which is true of built-in instance methods.
+
+        :keyword on_delete: optional callback which will be called
+            when this weak reference ceases to be valid
+            (i.e. either the object or the function is garbage
+            collected).  Should take a single argument,
+            which will be passed a pointer to this object.
+
+        """
+        def remove(weak, self=self):
+            """Set self.is_dead to true when method or instance is destroyed"""
+            methods = self.deletion_methods[:]
+            del(self.deletion_methods[:])
+            try:
+                del(self.__class__._all_instances[self.key])
+            except KeyError:
+                pass
+            for function in methods:
+                try:
+                    if callable(function):
+                        function(self)
+                except Exception, exc:
+                    try:
+                        traceback.print_exc()
+                    except AttributeError, err:
+                        print("Exception during saferef %s cleanup function "
+                              "%s: %s" % (self, function, exc))
+
+        self.deletion_methods = [on_delete]
+        self.key = self.calculate_key(target)
+        self.weak_self = weakref.ref(target.im_self, remove)
+        self.weak_func = weakref.ref(target.im_func, remove)
+        self.self_name = str(target.im_self)
+        self.func_name = str(target.im_func.__name__)
+
+    def calculate_key(cls, target):
+        """Calculate the reference key for this reference
+
+        Currently this is a two-tuple of the ``id()``'s of the
+        target object and the target function respectively.
+        """
+        return id(target.im_self), id(target.im_func)
+    calculate_key = classmethod(calculate_key)
+
+    def __str__(self):
+        """Give a friendly representation of the object"""
+        return """%s( %s.%s )""" % (
+            self.__class__.__name__,
+            self.self_name,
+            self.func_name,
+        )
+
+    __repr__ = __str__
+
+    def __nonzero__(self):
+        """Whether we are still a valid reference"""
+        return self() is not None
+
+    def __cmp__(self, other):
+        """Compare with another reference"""
+        if not isinstance(other, self.__class__):
+            return cmp(self.__class__, type(other))
+        return cmp(self.key, other.key)
+
+    def __call__(self):
+        """Return a strong reference to the bound method
+
+        If the target cannot be retrieved, then will
+        return None, otherwise returns a bound instance
+        method for our object and function.
+
+        Note:
+            You may call this method any number of times,
+            as it does not invalidate the reference.
+        """
+        target = self.weak_self()
+        if target is not None:
+            function = self.weak_func()
+            if function is not None:
+                return function.__get__(target)
+        return None
+
+class BoundNonDescriptorMethodWeakref(BoundMethodWeakref):
+    """A specialized :class:`BoundMethodWeakref`, for platforms where
+    instance methods are not descriptors.
+
+    It assumes that the function name and the target attribute name are the
+    same, instead of assuming that the function is a descriptor. This approach
+    is equally fast, but not 100% reliable because functions can be stored on an
+    attribute named differenty than the function's name such as in::
+
+        >>> class A(object):
+        ...     pass
+
+        >>> def foo(self):
+        ...     return "foo"
+        >>> A.bar = foo
+
+    But this shouldn't be a common use case. So, on platforms where methods
+    aren't descriptors (such as Jython) this implementation has the advantage
+    of working in the most cases.
+
+    """
+    def __init__(self, target, on_delete=None):
+        """Return a weak-reference-like instance for a bound method
+
+        :param target: the instance-method target for the weak
+            reference, must have ``im_self`` and ``im_func`` attributes
+            and be reconstructable via::
+
+                target.im_func.__get__(target.im_self)
+
+            which is true of built-in instance methods.
+
+        :keyword on_delete: optional callback which will be called
+            when this weak reference ceases to be valid
+            (i.e. either the object or the function is garbage
+            collected). Should take a single argument,
+            which will be passed a pointer to this object.
+
+        """
+        assert getattr(target.im_self, target.__name__) == target, \
+               "method %s isn't available as the attribute %s of %s" % (
+                    target, target.__name__, target.im_self)
+        super(BoundNonDescriptorMethodWeakref, self).__init__(target,
+                                                              on_delete)
+
+    def __call__(self):
+        """Return a strong reference to the bound method
+
+        If the target cannot be retrieved, then will
+        return None, otherwise returns a bound instance
+        method for our object and function.
+
+        Note:
+            You may call this method any number of times,
+            as it does not invalidate the reference.
+
+        """
+        target = self.weak_self()
+        if target is not None:
+            function = self.weak_func()
+            if function is not None:
+                # Using curry() would be another option, but it erases the
+                # "signature" of the function. That is, after a function is
+                # curried, the inspect module can't be used to determine how
+                # many arguments the function expects, nor what keyword
+                # arguments it supports, and pydispatcher needs this
+                # information.
+                return getattr(target, function.__name__)
+        return None
+
+
+def get_bound_method_weakref(target, on_delete):
+    """Instantiates the appropiate :class:`BoundMethodWeakRef`, depending
+    on the details of the underlying class method implementation."""
+    if hasattr(target, '__get__'):
+        # target method is a descriptor, so the default implementation works:
+        return BoundMethodWeakref(target=target, on_delete=on_delete)
+    else:
+        # no luck, use the alternative implementation:
+        return BoundNonDescriptorMethodWeakref(target=target, on_delete=on_delete)

+ 209 - 0
celery/utils/dispatch/signal.py

@@ -0,0 +1,209 @@
+"""Signal class."""
+
+import weakref
+try:
+    set
+except NameError:
+    from sets import Set as set # Python 2.3 fallback
+
+from celery.utils.dispatch import saferef
+
+WEAKREF_TYPES = (weakref.ReferenceType, saferef.BoundMethodWeakref)
+
+
+def _make_id(target):
+    if hasattr(target, 'im_func'):
+        return (id(target.im_self), id(target.im_func))
+    return id(target)
+
+
+class Signal(object):
+    """Base class for all signals
+
+
+    .. attribute:: receivers
+        Internal attribute, holds a dictionary of
+        ``{receriverkey (id): weakref(receiver)}`` mappings.
+
+    """
+
+    def __init__(self, providing_args=None):
+        """Create a new signal.
+
+        :param providing_args: A list of the arguments this signal can pass
+            along in a :meth:`send` call.
+
+        """
+        self.receivers = []
+        if providing_args is None:
+            providing_args = []
+        self.providing_args = set(providing_args)
+
+    def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):
+        """Connect receiver to sender for signal.
+
+        :param receiver: A function or an instance method which is to
+            receive signals. Receivers must be hashable objects.
+
+            if weak is ``True``, then receiver must be weak-referencable (more
+            precisely :func:`saferef.safe_ref()` must be able to create a reference
+            to the receiver).
+
+            Receivers must be able to accept keyword arguments.
+
+            If receivers have a ``dispatch_uid`` attribute, the receiver will
+            not be added if another receiver already exists with that
+            ``dispatch_uid``.
+
+        :keyword sender: The sender to which the receiver should respond.
+            Must either be of type :class:`Signal`, or ``None`` to receive
+            events from any sender.
+
+        :keyword weak: Whether to use weak references to the receiver.
+            By default, the module will attempt to use weak references to the
+            receiver objects. If this parameter is false, then strong
+            references will be used.
+
+        :keyword dispatch_uid: An identifier used to uniquely identify a
+            particular instance of a receiver. This will usually be a
+            string, though it may be anything hashable.
+
+        """
+        if dispatch_uid:
+            lookup_key = (dispatch_uid, _make_id(sender))
+        else:
+            lookup_key = (_make_id(receiver), _make_id(sender))
+
+        if weak:
+            receiver = saferef.safe_ref(receiver, on_delete=self._remove_receiver)
+
+        for r_key, _ in self.receivers:
+            if r_key == lookup_key:
+                break
+        else:
+            self.receivers.append((lookup_key, receiver))
+
+    def disconnect(self, receiver=None, sender=None, weak=True, dispatch_uid=None):
+        """Disconnect receiver from sender for signal.
+
+        If weak references are used, disconnect need not be called. The receiver
+        will be remove from dispatch automatically.
+
+        :keyword receiver: The registered receiver to disconnect. May be
+            none if ``dispatch_uid`` is specified.
+
+        :keyword sender: The registered sender to disconnect.
+
+        :keyword weak: The weakref state to disconnect.
+
+        :keyword dispatch_uid: the unique identifier of the receiver
+            to disconnect
+
+        """
+        if dispatch_uid:
+            lookup_key = (dispatch_uid, _make_id(sender))
+        else:
+            lookup_key = (_make_id(receiver), _make_id(sender))
+
+        for index in xrange(len(self.receivers)):
+            (r_key, _) = self.receivers[index]
+            if r_key == lookup_key:
+                del self.receivers[index]
+                break
+
+    def send(self, sender, **named):
+        """Send signal from sender to all connected receivers.
+
+        If any receiver raises an error, the error propagates back through send,
+        terminating the dispatch loop, so it is quite possible to not have all
+        receivers called if a raises an error.
+
+        :param sender: The sender of the signal. Either a specific
+            object or ``None``.
+
+        :keyword \*\*named: Named arguments which will be passed to receivers.
+
+        :returns: a list of tuple pairs: ``[(receiver, response), ... ]``.
+
+        """
+        responses = []
+        if not self.receivers:
+            return responses
+
+        for receiver in self._live_receivers(_make_id(sender)):
+            response = receiver(signal=self, sender=sender, **named)
+            responses.append((receiver, response))
+        return responses
+
+    def send_robust(self, sender, **named):
+        """Send signal from sender to all connected receivers catching errors.
+
+        :param sender: The sender of the signal. Can be any python object
+            (normally one registered with a connect if you actually want
+            something to occur).
+
+        :keyword \*\*named: Named arguments which will be passed to receivers.
+            These arguments must be a subset of the argument names defined in
+            :attr:`providing_args`.
+
+        :returns: a list of tuple pairs: ``[(receiver, response), ... ]``.
+
+        :raises DispatcherKeyError:
+
+        if any receiver raises an error (specifically any subclass of
+        :exc:`Exception`), the error instance is returned as the result for that
+        receiver.
+
+        """
+        responses = []
+        if not self.receivers:
+            return responses
+
+        # Call each receiver with whatever arguments it can accept.
+        # Return a list of tuple pairs [(receiver, response), ... ].
+        for receiver in self._live_receivers(_make_id(sender)):
+            try:
+                response = receiver(signal=self, sender=sender, **named)
+            except Exception, err:
+                responses.append((receiver, err))
+            else:
+                responses.append((receiver, response))
+        return responses
+
+    def _live_receivers(self, senderkey):
+        """Filter sequence of receivers to get resolved, live receivers.
+
+        This checks for weak references and resolves them, then returning only
+        live receivers.
+
+        """
+        none_senderkey = _make_id(None)
+        receivers = []
+
+        for (receiverkey, r_senderkey), receiver in self.receivers:
+            if r_senderkey == none_senderkey or r_senderkey == senderkey:
+                if isinstance(receiver, WEAKREF_TYPES):
+                    # Dereference the weak reference.
+                    receiver = receiver()
+                    if receiver is not None:
+                        receivers.append(receiver)
+                else:
+                    receivers.append(receiver)
+        return receivers
+
+    def _remove_receiver(self, receiver):
+        """Remove dead receivers from connections."""
+
+        to_remove = []
+        for key, connected_receiver in self.receivers:
+            if connected_receiver == receiver:
+                to_remove.append(key)
+        for key in to_remove:
+            for idx, (r_key, _) in enumerate(self.receivers):
+                if r_key == key:
+                    del self.receivers[idx]
+
+    def __repr__(self):
+        return '<Signal: %s>' % (self.__class__.__name__, )
+
+    __str__ = __repr__