Ask Solem 13 年 前
コミット
504510e46a

+ 0 - 2
celery/apps/beat.py

@@ -40,8 +40,6 @@ class Beat(configurated):
         self.app = app = app_or_default(app)
         self.setup_defaults(kwargs, namespace="celerybeat")
 
-        print("SCHEDULE: %r" % (self.schedule, ))
-
         self.max_interval = max_interval
         self.socket_timeout = socket_timeout
         self.colored = app.log.colored(self.logfile)

+ 0 - 1
celery/events/__init__.py

@@ -20,7 +20,6 @@ import threading
 
 from collections import deque
 from contextlib import contextmanager
-from itertools import count
 
 from kombu.common import eventloop
 from kombu.entity import Exchange, Queue

+ 1 - 0
celery/events/cursesmon.py

@@ -10,6 +10,7 @@
 
 """
 from __future__ import absolute_import
+from __future__ import with_statement
 
 import curses
 import sys

+ 25 - 40
celery/result.py

@@ -93,10 +93,7 @@ class AsyncResult(object):
         return self.backend.wait_for(self.task_id, timeout=timeout,
                                                    propagate=propagate,
                                                    interval=interval)
-
-    def wait(self, *args, **kwargs):
-        """Deprecated alias to :meth:`get`."""
-        return self.get(*args, **kwargs)
+    wait = get  # deprecated alias to :meth:`get`.
 
     def ready(self):
         """Returns :const:`True` if the task has been executed.
@@ -105,15 +102,15 @@ class AsyncResult(object):
         for retry then :const:`False` is returned.
 
         """
-        return self.status in self.backend.READY_STATES
+        return self.state in self.backend.READY_STATES
 
     def successful(self):
         """Returns :const:`True` if the task executed successfully."""
-        return self.status == states.SUCCESS
+        return self.state == states.SUCCESS
 
     def failed(self):
         """Returns :const:`True` if the task failed."""
-        return self.status == states.FAILURE
+        return self.state == states.FAILURE
 
     def __str__(self):
         """`str(self) -> self.task_id`"""
@@ -147,11 +144,7 @@ class AsyncResult(object):
         If the task raised an exception, this will be the exception
         instance."""
         return self.backend.get_result(self.task_id)
-
-    @property
-    def info(self):
-        """Get state metadata.  Alias to :meth:`result`."""
-        return self.result
+    info = result
 
     @property
     def traceback(self):
@@ -189,11 +182,7 @@ class AsyncResult(object):
 
         """
         return self.backend.get_status(self.task_id)
-
-    @property
-    def status(self):
-        """Deprecated alias of :attr:`state`."""
-        return self.state
+    status = state
 BaseAsyncResult = AsyncResult  # for backwards compatibility.
 
 
@@ -419,16 +408,19 @@ class ResultSet(object):
 
         """
         results = self.results
-        acc = [None for _ in xrange(self.total)]
+        acc = [None for _ in xrange(len(self))]
         for task_id, meta in self.iter_native(timeout=timeout,
                                               interval=interval):
             acc[results.index(task_id)] = meta["result"]
         return acc
 
+    def __len__(self):
+        return len(self.results)
+
     @property
     def total(self):
-        """Total number of tasks in the set."""
-        return len(self.results)
+        """Deprecated: Use ``len(r)``."""
+        return len(self)
 
     @property
     def subtasks(self):
@@ -482,11 +474,6 @@ class TaskSetResult(ResultSet):
         """Remove this result if it was previously saved."""
         (backend or self.app.backend).delete_taskset(self.taskset_id)
 
-    @classmethod
-    def restore(self, taskset_id, backend=None):
-        """Restore previously saved taskset result."""
-        return (backend or current_app.backend).restore_taskset(taskset_id)
-
     def itersubtasks(self):
         """Depreacted.   Use ``iter(self.results)`` instead."""
         return iter(self.results)
@@ -494,10 +481,14 @@ class TaskSetResult(ResultSet):
     def __reduce__(self):
         return (self.__class__, (self.taskset_id, self.results))
 
+    @classmethod
+    def restore(self, taskset_id, backend=None):
+        """Restore previously saved taskset result."""
+        return (backend or current_app.backend).restore_taskset(taskset_id)
 
-class EagerResult(BaseAsyncResult):
+
+class EagerResult(AsyncResult):
     """Result that we know has already been executed."""
-    TimeoutError = TimeoutError
 
     def __init__(self, task_id, ret_value, state, traceback=None):
         self.task_id = task_id
@@ -513,22 +504,20 @@ class EagerResult(BaseAsyncResult):
         cls, args = self.__reduce__()
         return cls(*args)
 
-    def successful(self):
-        """Returns :const:`True` if the task executed without failure."""
-        return self.state == states.SUCCESS
-
     def ready(self):
-        """Returns :const:`True` if the task has been executed."""
         return True
 
     def get(self, timeout=None, propagate=True, **kwargs):
-        """Wait until the task has been executed and return its result."""
-        if self.state == states.SUCCESS:
+        if self.successful():
             return self.result
-        elif self.state in states.PROPAGATE_STATES:
+        elif self._state in states.PROPAGATE_STATES:
             if propagate:
                 raise self.result
             return self.result
+    wait = get
+
+    def forget(self):
+        pass
 
     def revoke(self):
         self._state = states.REVOKED
@@ -545,13 +534,9 @@ class EagerResult(BaseAsyncResult):
     def state(self):
         """The tasks state."""
         return self._state
+    status = state
 
     @property
     def traceback(self):
         """The traceback if the task failed."""
         return self._traceback
-
-    @property
-    def status(self):
-        """The tasks status (alias to :attr:`state`)."""
-        return self._state

+ 11 - 10
celery/tests/test_task/test_result.py

@@ -13,16 +13,16 @@ from celery.tests.utils import AppCase
 from celery.tests.utils import skip_if_quick
 
 
-def mock_task(name, status, result):
-    return dict(id=uuid(), name=name, status=status, result=result)
+def mock_task(name, state, result):
+    return dict(id=uuid(), name=name, state=state, result=result)
 
 
 def save_result(task):
     app = app_or_default()
     traceback = "Some traceback"
-    if task["status"] == states.SUCCESS:
+    if task["state"] == states.SUCCESS:
         app.backend.mark_as_done(task["id"], task["result"])
-    elif task["status"] == states.RETRY:
+    elif task["state"] == states.RETRY:
         app.backend.mark_as_retry(task["id"], task["result"],
                 traceback=traceback)
     else:
@@ -127,7 +127,7 @@ class TestAsyncResult(AppCase):
         self.assertEqual(ok_res.info, "the")
 
     def test_get_timeout(self):
-        res = AsyncResult(self.task4["id"])             # has RETRY status
+        res = AsyncResult(self.task4["id"])             # has RETRY state
         with self.assertRaises(TimeoutError):
             res.get(timeout=0.1)
 
@@ -137,7 +137,7 @@ class TestAsyncResult(AppCase):
 
     @skip_if_quick
     def test_get_timeout_longer(self):
-        res = AsyncResult(self.task4["id"])             # has RETRY status
+        res = AsyncResult(self.task4["id"])             # has RETRY state
         with self.assertRaises(TimeoutError):
             res.get(timeout=1)
 
@@ -178,7 +178,7 @@ class MockAsyncResultFailure(AsyncResult):
         return KeyError("baz")
 
     @property
-    def status(self):
+    def state(self):
         return states.FAILURE
 
     def get(self, propagate=True, **kwargs):
@@ -198,7 +198,7 @@ class MockAsyncResultSuccess(AsyncResult):
         return 42
 
     @property
-    def status(self):
+    def state(self):
         return states.SUCCESS
 
     def get(self, **kwargs):
@@ -222,6 +222,7 @@ class TestTaskSetResult(AppCase):
         self.ts = TaskSetResult(uuid(), make_mock_taskset(self.size))
 
     def test_total(self):
+        self.assertEqual(len(self.ts), self.size)
         self.assertEqual(self.ts.total, self.size)
 
     def test_iterate_raises(self):
@@ -331,7 +332,7 @@ class TestTaskSetResult(AppCase):
         self.assertTrue(self.ts.ready())
 
     def test_completed_count(self):
-        self.assertEqual(self.ts.completed_count(), self.ts.total)
+        self.assertEqual(self.ts.completed_count(), len(self.ts))
 
 
 class TestPendingAsyncResult(AppCase):
@@ -365,7 +366,7 @@ class TestFailedTaskSetResult(TestTaskSetResult):
             t.get()
 
     def test_completed_count(self):
-        self.assertEqual(self.ts.completed_count(), self.ts.total - 1)
+        self.assertEqual(self.ts.completed_count(), len(self.ts) - 1)
 
     def test___iter__(self):
         it = iter(self.ts)