Ask Solem 16 年之前
父节点
当前提交
2f4e8fb12b

+ 4 - 4
celery/backends/base.py

@@ -213,10 +213,10 @@ class KeyValueStoreBackend(BaseBackend):
 
 
     def get(self, key):
     def get(self, key):
         raise NotImplementedError("Must implement the get method.")
         raise NotImplementedError("Must implement the get method.")
-    
+
     def set(self, key, value):
     def set(self, key, value):
         raise NotImplementedError("Must implement the set method.")
         raise NotImplementedError("Must implement the set method.")
-    
+
     def store_result(self, task_id, result, status):
     def store_result(self, task_id, result, status):
         """Store task result and status."""
         """Store task result and status."""
         if status == "DONE":
         if status == "DONE":
@@ -229,7 +229,7 @@ class KeyValueStoreBackend(BaseBackend):
     def get_status(self, task_id):
     def get_status(self, task_id):
         """Get the status of a task."""
         """Get the status of a task."""
         return self._get_task_meta_for(task_id)["status"]
         return self._get_task_meta_for(task_id)["status"]
-    
+
     def get_result(self, task_id):
     def get_result(self, task_id):
         """Get the result of a task."""
         """Get the result of a task."""
         meta = self._get_task_meta_for(task_id)
         meta = self._get_task_meta_for(task_id)
@@ -237,7 +237,7 @@ class KeyValueStoreBackend(BaseBackend):
             return self.exception_to_python(meta["result"])
             return self.exception_to_python(meta["result"])
         else:
         else:
             return meta["result"]
             return meta["result"]
-    
+
     def is_done(self, task_id):
     def is_done(self, task_id):
         """Returns ``True`` if the task executed successfully."""
         """Returns ``True`` if the task executed successfully."""
         return self.get_status(task_id) == "DONE"
         return self.get_status(task_id) == "DONE"

+ 0 - 1
celery/backends/cache.py

@@ -11,4 +11,3 @@ class Backend(KeyValueStoreBackend):
 
 
     def set(self, key, value):
     def set(self, key, value):
         cache.set(key, value)
         cache.set(key, value)
-

+ 2 - 3
celery/backends/tyrant.py

@@ -49,10 +49,10 @@ class Backend(KeyValueStoreBackend):
     def open(self):
     def open(self):
         """Get :class:`pytyrant.PyTyrant`` instance with the current
         """Get :class:`pytyrant.PyTyrant`` instance with the current
         server configuration.
         server configuration.
-        
+
         The connection is then cached until you do an
         The connection is then cached until you do an
         explicit :meth:`close`.
         explicit :meth:`close`.
-        
+
         """
         """
         # connection overrides bool()
         # connection overrides bool()
         if self._connection is None:
         if self._connection is None:
@@ -75,4 +75,3 @@ class Backend(KeyValueStoreBackend):
 
 
     def set(self, key, value):
     def set(self, key, value):
         self.open()[key] = value
         self.open()[key] = value
-

+ 1 - 3
celery/messaging.py

@@ -43,6 +43,7 @@ class TaskPublisher(Publisher):
 
 
     def retry_task(self, task_name, task_id, delivery_info, **kwargs):
     def retry_task(self, task_name, task_id, delivery_info, **kwargs):
         kwargs["routing_key"] = delivery_info.get("routing_key")
         kwargs["routing_key"] = delivery_info.get("routing_key")
+        kwargs["retries"] = kwargs.get("retries", 0) + 1
         self._delay_task(task_name, task_id, **kwargs)
         self._delay_task(task_name, task_id, **kwargs)
 
 
     def _delay_task(self, task_name, task_id=None, part_of_set=None,
     def _delay_task(self, task_name, task_id=None, part_of_set=None,
@@ -90,6 +91,3 @@ class StatsConsumer(Consumer):
     exchange_type = "direct"
     exchange_type = "direct"
     decoder = pickle.loads
     decoder = pickle.loads
     no_ack=True
     no_ack=True
-
-    def receive(self, message_data, message):
-        pass

+ 1 - 1
celery/pool.py

@@ -17,7 +17,7 @@ from functools import partial as curry
 class DynamicPool(Pool):
 class DynamicPool(Pool):
     """Version of :class:`multiprocessing.Pool` that can dynamically grow
     """Version of :class:`multiprocessing.Pool` that can dynamically grow
     in size."""
     in size."""
-    
+
     def __init__(self, processes=None, initializer=None, initargs=()):
     def __init__(self, processes=None, initializer=None, initargs=()):
         super(DynamicPool, self).__init__(processes=processes,
         super(DynamicPool, self).__init__(processes=processes,
                                           initializer=initializer,
                                           initializer=initializer,

+ 1 - 1
celery/result.py

@@ -329,6 +329,6 @@ class EagerResult(BaseAsyncResult):
     def status(self):
     def status(self):
         """The tasks status"""
         """The tasks status"""
         return self._status
         return self._status
-    
+
     def __repr__(self):
     def __repr__(self):
         return "<EagerResult: %s>" % self.task_id
         return "<EagerResult: %s>" % self.task_id

+ 1 - 0
celery/task/base.py

@@ -218,6 +218,7 @@ class Task(object):
         """
         """
         return apply(cls, args, kwargs, **options)
         return apply(cls, args, kwargs, **options)
 
 
+
 class TaskSet(object):
 class TaskSet(object):
     """A task containing several subtasks, making it possible
     """A task containing several subtasks, making it possible
     to track how many, or when all of the tasks has been completed.
     to track how many, or when all of the tasks has been completed.

+ 1 - 1
celery/tests/test_backends/test_cache.py

@@ -33,7 +33,7 @@ class TestCacheBackend(unittest.TestCase):
 
 
     def test_is_pickled(self):
     def test_is_pickled(self):
         cb = CacheBackend()
         cb = CacheBackend()
-    
+
         tid2 = gen_unique_id()
         tid2 = gen_unique_id()
         result = {"foo": "baz", "bar": SomeClass(12345)}
         result = {"foo": "baz", "bar": SomeClass(12345)}
         cb.mark_as_done(tid2, result)
         cb.mark_as_done(tid2, result)

+ 2 - 2
celery/tests/test_backends/test_tyrant.py

@@ -39,7 +39,7 @@ class TestTyrantBackend(unittest.TestCase):
     def test_cached_connection(self):
     def test_cached_connection(self):
         tb = get_tyrant_or_None()
         tb = get_tyrant_or_None()
         if not tb:
         if not tb:
-            return # Skip test 
+            return # Skip test
 
 
         self.assertTrue(tb._connection is not None)
         self.assertTrue(tb._connection is not None)
         tb.close()
         tb.close()
@@ -69,7 +69,7 @@ class TestTyrantBackend(unittest.TestCase):
         tb = get_tyrant_or_None()
         tb = get_tyrant_or_None()
         if not tb:
         if not tb:
             return
             return
-    
+
         tid2 = gen_unique_id()
         tid2 = gen_unique_id()
         result = {"foo": "baz", "bar": SomeClass(12345)}
         result = {"foo": "baz", "bar": SomeClass(12345)}
         tb.mark_as_done(tid2, result)
         tb.mark_as_done(tid2, result)

+ 11 - 11
celery/tests/test_result.py

@@ -39,14 +39,14 @@ class TestAsyncResult(unittest.TestCase):
 
 
         self.assertTrue(ok_res.is_done())
         self.assertTrue(ok_res.is_done())
         self.assertFalse(nok_res.is_done())
         self.assertFalse(nok_res.is_done())
-    
+
     def test_sucessful(self):
     def test_sucessful(self):
         ok_res = AsyncResult(self.task1["id"])
         ok_res = AsyncResult(self.task1["id"])
         nok_res = AsyncResult(self.task3["id"])
         nok_res = AsyncResult(self.task3["id"])
 
 
         self.assertTrue(ok_res.successful())
         self.assertTrue(ok_res.successful())
         self.assertFalse(nok_res.successful())
         self.assertFalse(nok_res.successful())
-       
+
     def test_str(self):
     def test_str(self):
         ok_res = AsyncResult(self.task1["id"])
         ok_res = AsyncResult(self.task1["id"])
         ok2_res = AsyncResult(self.task2["id"])
         ok2_res = AsyncResult(self.task2["id"])
@@ -54,7 +54,7 @@ class TestAsyncResult(unittest.TestCase):
         self.assertEquals(str(ok_res), self.task1["id"])
         self.assertEquals(str(ok_res), self.task1["id"])
         self.assertEquals(str(ok2_res), self.task2["id"])
         self.assertEquals(str(ok2_res), self.task2["id"])
         self.assertEquals(str(nok_res), self.task3["id"])
         self.assertEquals(str(nok_res), self.task3["id"])
-    
+
     def test_repr(self):
     def test_repr(self):
         ok_res = AsyncResult(self.task1["id"])
         ok_res = AsyncResult(self.task1["id"])
         ok2_res = AsyncResult(self.task2["id"])
         ok2_res = AsyncResult(self.task2["id"])
@@ -97,7 +97,7 @@ class TestTaskSetResult(unittest.TestCase):
 
 
         for i, t in enumerate(it):
         for i, t in enumerate(it):
             self.assertEquals(t.get(), i)
             self.assertEquals(t.get(), i)
-    
+
     def test___iter__(self):
     def test___iter__(self):
 
 
         it = iter(self.ts)
         it = iter(self.ts)
@@ -111,16 +111,16 @@ class TestTaskSetResult(unittest.TestCase):
 
 
     def test_successful(self):
     def test_successful(self):
         self.assertTrue(self.ts.successful())
         self.assertTrue(self.ts.successful())
-    
+
     def test_failed(self):
     def test_failed(self):
         self.assertFalse(self.ts.failed())
         self.assertFalse(self.ts.failed())
-    
+
     def test_waiting(self):
     def test_waiting(self):
         self.assertFalse(self.ts.waiting())
         self.assertFalse(self.ts.waiting())
 
 
     def test_ready(self):
     def test_ready(self):
         self.assertTrue(self.ts.ready())
         self.assertTrue(self.ts.ready())
-    
+
     def test_completed_count(self):
     def test_completed_count(self):
         self.assertEquals(self.ts.completed_count(), self.ts.total)
         self.assertEquals(self.ts.completed_count(), self.ts.total)
 
 
@@ -143,7 +143,7 @@ class TestFailedTaskSetResult(TestTaskSetResult):
         save_result(failed)
         save_result(failed)
         failed_res = AsyncResult(failed["id"])
         failed_res = AsyncResult(failed["id"])
         self.ts = TaskSetResult(gen_unique_id(), subtasks + [failed_res])
         self.ts = TaskSetResult(gen_unique_id(), subtasks + [failed_res])
-    
+
     def test_itersubtasks(self):
     def test_itersubtasks(self):
 
 
         it = self.ts.itersubtasks()
         it = self.ts.itersubtasks()
@@ -166,10 +166,10 @@ class TestFailedTaskSetResult(TestTaskSetResult):
 
 
     def test_join(self):
     def test_join(self):
         self.assertRaises(KeyError, self.ts.join)
         self.assertRaises(KeyError, self.ts.join)
-    
+
     def test_successful(self):
     def test_successful(self):
         self.assertFalse(self.ts.successful())
         self.assertFalse(self.ts.successful())
-    
+
     def test_failed(self):
     def test_failed(self):
         self.assertTrue(self.ts.failed())
         self.assertTrue(self.ts.failed())
 
 
@@ -186,7 +186,7 @@ class TestTaskSetPending(unittest.TestCase):
 
 
     def test_ready(self):
     def test_ready(self):
         self.assertFalse(self.ts.ready())
         self.assertFalse(self.ts.ready())
-    
+
     def test_waiting(self):
     def test_waiting(self):
         self.assertTrue(self.ts.waiting())
         self.assertTrue(self.ts.waiting())
 
 

+ 2 - 2
celery/tests/test_task.py

@@ -154,10 +154,10 @@ class TestTaskApply(unittest.TestCase):
         e = IncrementCounterTask.apply()
         e = IncrementCounterTask.apply()
         self.assertTrue(isinstance(e, EagerResult))
         self.assertTrue(isinstance(e, EagerResult))
         self.assertEquals(e.get(), 1)
         self.assertEquals(e.get(), 1)
-        
+
         e = IncrementCounterTask.apply(args=[1])
         e = IncrementCounterTask.apply(args=[1])
         self.assertEquals(e.get(), 2)
         self.assertEquals(e.get(), 2)
-        
+
         e = IncrementCounterTask.apply(kwargs={"increment_by": 4})
         e = IncrementCounterTask.apply(kwargs={"increment_by": 4})
         self.assertEquals(e.get(), 6)
         self.assertEquals(e.get(), 6)
 
 

+ 1 - 1
celery/utils.py

@@ -34,7 +34,7 @@ def chunks(it, n):
 def gen_unique_id():
 def gen_unique_id():
     """Generate a unique id, having - hopefully - a very small chance of
     """Generate a unique id, having - hopefully - a very small chance of
     collission.
     collission.
-    
+
     For now this is provided by :func:`uuid.uuid4`.
     For now this is provided by :func:`uuid.uuid4`.
     """
     """
     return str(uuid.uuid4())
     return str(uuid.uuid4())

+ 0 - 1
celery/views.py

@@ -5,7 +5,6 @@ from celery.result import AsyncResult
 from anyjson import serialize as JSON_dump
 from anyjson import serialize as JSON_dump
 
 
 
 
-
 def apply(request, task_name, *args):
 def apply(request, task_name, *args):
     """View applying a task.
     """View applying a task.