Sfoglia il codice sorgente

Merged from upstream.

Branko Čibej 14 anni fa
parent
commit
74e5b8512e

+ 1 - 0
AUTHORS

@@ -63,3 +63,4 @@ Ordered by date of first contribution:
   Roberto Gaiser <gaiser@geekbunker.org>
   Balachandran C <balachandran.c@gramvaani.org>
   Kevin Tran <hekevintran@gmail.com>
+  Branko Čibej <brane@apache.org>

+ 11 - 2
celery/backends/amqp.py

@@ -4,6 +4,7 @@ import socket
 import time
 
 from datetime import timedelta
+from itertools import count
 
 from kombu.entity import Exchange, Queue
 from kombu.messaging import Consumer, Producer
@@ -14,6 +15,10 @@ from celery.exceptions import TimeoutError
 from celery.utils import timeutils
 
 
+class BacklogLimitExceeded(Exception):
+    """Too much state history to fast-forward."""
+
+
 def repair_uuid(s):
     # Historically the dashes in UUIDS are removed from AMQ entity names,
     # but there is no known reason to.  Hopefully we'll be able to fix
@@ -28,6 +33,8 @@ class AMQPBackend(BaseDictBackend):
     Consumer = Consumer
     Producer = Producer
 
+    BacklogLimitExceeded = BacklogLimitExceeded
+
     _pool = None
     _pool_owner_pid = None
 
@@ -139,17 +146,19 @@ class AMQPBackend(BaseDictBackend):
         else:
             return self.wait_for(task_id, timeout, cache)
 
-    def poll(self, task_id):
+    def poll(self, task_id, backlog_limit=100):
         conn = self.pool.acquire(block=True)
         channel = conn.channel()
         try:
             binding = self._create_binding(task_id)(channel)
             binding.declare()
             latest, acc = None, None
-            while 1:  # fetch the last state
+            for i in count():  # fast-forward
                 latest, acc = acc, binding.get(no_ack=True)
                 if not acc:
                     break
+                if i > backlog_limit:
+                    raise self.BacklogLimitExceeded(task_id)
             if latest:
                 payload = self._cache[task_id] = latest.payload
                 return payload

+ 2 - 2
celery/result.py

@@ -447,8 +447,8 @@ class EagerResult(BaseAsyncResult):
                                  self._state, self._traceback))
 
     def __copy__(self):
-        cls, attrs = self.__reduce__()
-        return cls(*attrs)
+        cls, args = self.__reduce__()
+        return cls(*args)
 
     def successful(self):
         """Returns :const:`True` if the task executed without failure."""

+ 2 - 1
celery/task/base.py

@@ -34,7 +34,8 @@ _default_context = {"logfile": None,
                     "kwargs": None,
                     "retries": 0,
                     "is_eager": False,
-                    "delivery_info": None}
+                    "delivery_info": None,
+                    "taskset": None}
 
 
 class Context(threading.local):

+ 1 - 1
celery/tests/test_backends/test_amqp.py

@@ -135,7 +135,7 @@ class test_AMQPBackend(unittest.TestCase):
             def declare(self):
                 pass
 
-            def get(self):
+            def get(self, no_ack=False):
                 if self.get_returns[0]:
                     class Object(object):
                         payload = {"status": "STARTED",

+ 1 - 2
docs/userguide/tasks.rst

@@ -49,8 +49,7 @@ attributes:
 
 :id: The unique id of the executing task.
 
-:taskset: The unique id of the taskset that owns the executing task.
-          This key exists only if the task is a member of a taskset.
+:taskset: The unique id of the taskset this task is a member of (if any).
 
 :args: Positional arguments.