Browse Source

Merge branch '3.0'

Conflicts:
	Changelog
	celery/datastructures.py
	celery/worker/state.py
	requirements/default.txt
	setup.cfg
Ask Solem 12 years ago
parent
commit
e08fbe474f

+ 5 - 2
celery/app/amqp.py

@@ -316,6 +316,9 @@ class AMQP(object):
     #: compat alias to Connection
     BrokerConnection = Connection
 
+    producer_cls = TaskProducer
+    consumer_cls = TaskConsumer
+
     #: Cached and prepared routing table.
     _rtable = None
 
@@ -353,7 +356,7 @@ class AMQP(object):
     def TaskConsumer(self):
         """Return consumer configured to consume from the queues
         we are configured for (``app.amqp.queues.consume_from``)."""
-        return self.app.subclass_with_self(TaskConsumer,
+        return self.app.subclass_with_self(self.consumer_cls,
                                            reverse='amqp.TaskConsumer')
     get_task_consumer = TaskConsumer  # XXX compat
 
@@ -366,7 +369,7 @@ class AMQP(object):
         """
         conf = self.app.conf
         return self.app.subclass_with_self(
-            TaskProducer,
+            self.producer_cls,
             reverse='amqp.TaskProducer',
             exchange=self.default_exchange,
             routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,

+ 0 - 13
celery/app/task.py

@@ -769,19 +769,6 @@ class Task(object):
                 not getattr(self, 'disable_error_emails', None):
             self.ErrorMail(self, **kwargs).send(context, exc)
 
-    def execute(self, request, pool, loglevel, logfile, **kwargs):
-        """The method the worker calls to execute the task.
-
-        :param request: A :class:`~celery.worker.job.Request`.
-        :param pool: A task pool.
-        :param loglevel: Current loglevel.
-        :param logfile: Name of the currently used logfile.
-
-        :keyword consumer: The :class:`~celery.worker.consumer.Consumer`.
-
-        """
-        request.execute_using_pool(pool, loglevel, logfile)
-
     def push_request(self, *args, **kwargs):
         self.request_stack.push(Context(*args, **kwargs))
 

+ 49 - 20
celery/datastructures.py

@@ -12,6 +12,7 @@ import sys
 import time
 
 from collections import defaultdict, MutableMapping
+from heapq import heapify, heappush, heappop
 from functools import partial
 from itertools import chain
 from operator import itemgetter
@@ -526,47 +527,75 @@ class LimitedSet(object):
     :keyword expires: Time in seconds, before a membership expires.
 
     """
-    __slots__ = ('maxlen', 'expires', '_data', '__len__')
+    __slots__ = ('maxlen', 'expires', '_data', '__len__', '_heap')
 
-    def __init__(self, maxlen=None, expires=None, data=None):
+    def __init__(self, maxlen=None, expires=None, data=None, heap=None):
         self.maxlen = maxlen
         self.expires = expires
         self._data = {} if data is None else data
+        self._heap = [] if heap is None else heap
         self.__len__ = self._data.__len__
 
     def add(self, value):
         """Add a new member."""
-        self._expire_item()
-        self._data[value] = time.time()
+        self.purge(1)
+        now = time.time()
+        self._data[value] = now
+        heappush(self._heap, (now, value))
+
+    def __reduce__(self):
+        return self.__class__, (
+            self.maxlen, self.expires, self._data, self._heap,
+        )
 
     def clear(self):
         """Remove all members"""
         self._data.clear()
+        self._heap[:] = []
 
     def pop_value(self, value):
         """Remove membership by finding value."""
+        try:
+            itime = self._data[value]
+        except KeyError:
+            return
+        try:
+            self._heap.remove((value, itime))
+        except ValueError:
+            pass
         self._data.pop(value, None)
 
     def _expire_item(self):
         """Hunt down and remove an expired item."""
-        while 1:
-            if self.maxlen and len(self) >= self.maxlen:
-                value, when = self.first
-                if not self.expires or time.time() > when + self.expires:
-                    try:
-                        self.pop_value(value)
-                    except TypeError:  # pragma: no cover
-                        continue
-            break
+        self.purge(1)
 
     def __contains__(self, value):
         return value in self._data
 
-    def update(self, other):
-        if isinstance(other, dict):
-            self._data.update(other)
-        elif isinstance(other, self.__class__):
+    def purge(self, limit=None):
+        H, maxlen = self._heap, self.maxlen
+        if not maxlen:
+            return
+        i = 0
+        while len(self) >= maxlen:
+            if limit and i > limit:
+                break
+            try:
+                item = heappop(H)
+            except IndexError:
+                break
+            if self.expires:
+                if time.time() < item[0] + self.expires:
+                    heappush(H, item)
+                    break
+            self._data.pop(item[1])
+            i += 1
+
+    def update(self, other, heappush=heappush):
+        if isinstance(other, self.__class__):
             self._data.update(other._data)
+            self._heap.extend(other._heap)
+            heapify(self._heap)
         else:
             for obj in other:
                 self.add(obj)
@@ -578,13 +607,13 @@ class LimitedSet(object):
         return iter(self._data)
 
     def __repr__(self):
-        return 'LimitedSet({0!r})'.format(list(self._data))
+        return 'LimitedSet(%s)' % (repr(list(self._data))[:100], )
 
     @property
     def chronologically(self):
-        return sorted(self._data.items(), key=itemgetter(1))
+        return [value for _, value in self._heap]
 
     @property
     def first(self):
         """Get the oldest member."""
-        return self.chronologically[0]
+        return self._heap[0][1]

+ 1 - 1
celery/result.py

@@ -222,7 +222,7 @@ class AsyncResult(ResultBase):
         return self.__class__, self.__reduce_args__()
 
     def __reduce_args__(self):
-        return self.id, self.backend, self.task_name, self.parent
+        return self.id, self.backend, self.task_name, None, self.parent
 
     @cached_property
     def graph(self):

+ 9 - 4
celery/worker/state.py

@@ -154,15 +154,20 @@ class Persistent(object):
         self.close()
 
     def merge(self, d):
-        revoked.update(d.get('revoked') or {})
+        saved = d.get('revoked') or LimitedSet()
+        if isinstance(saved, LimitedSet):
+            revoked.update(saved)
+        else:
+            # (pre 3.0.18) used to be stored as dict
+            for item in saved:
+                revoked.add(item)
         if self.clock:
             d['clock'] = self.clock.adjust(d.get('clock') or 0)
         return d
 
     def sync(self, d):
-        prev = d.get('revoked') or {}
-        prev.update(revoked.as_dict())
-        d['revoked'] = prev
+        revoked.purge()
+        d['revoked'] = revoked
         if self.clock:
             d['clock'] = self.clock.forward()
         return d

+ 29 - 0
docs/history/changelog-3.0.rst

@@ -9,6 +9,35 @@
 
 If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 
+.. _version-3.0.18:
+
+3.0.18
+======
+:release-date: 2013-04-09 04:00:00 P.M BST
+
+- Now depends on :mod:`billiard` 2.7.3.25.
+
+- Now depends on :mod:`kombu` 2.5.9.
+
+- Worker/statedb: Now uses pickle protocol 2 (Py2.5+)
+
+- :class:`~celery.app.utils.ConfigurationView` is now a ``MutableMapping``.
+
+    Contributed by Aaron Harnly.
+
+- Fixed memory leak in LRU cache implementation.
+
+    Fix contributed by Romuald Brunet.
+
+- ``celery.contrib.rdb``: Now works when sockets are in non-blocking mode.
+
+    Fix contributed by Theo Spears.
+
+- Canvas list operations now takes application instance from the first
+  task in the list, instead of depending on the ``current_app`` (Issue #1249).
+
+
+
 .. _version-3.0.17:
 
 3.0.17

+ 2 - 2
requirements/default.txt

@@ -1,3 +1,3 @@
 pytz
-billiard>=2.7.3.23
-kombu>=2.5.8
+billiard>=2.7.3.25
+kombu>=2.5.9

+ 2 - 2
setup.cfg

@@ -15,5 +15,5 @@ upload-dir = docs/.build/html
 
 [bdist_rpm]
 requires = pytz
-           billiard >= 2.7.3.23
-           kombu >= 2.5.8
+           billiard >= 2.7.3.25
+           kombu >= 2.5.9