Ver Fonte

Cosmetic changes

Ask Solem há 15 anos atrás
pai
commit
c6ce1aa6f8
4 ficheiros alterados com 37 adições e 40 exclusões
  1. 1 1
      celery/backends/base.py
  2. 28 28
      celery/decorators.py
  3. 7 10
      celery/result.py
  4. 1 1
      celery/tests/test_registry.py

+ 1 - 1
celery/backends/base.py

@@ -19,9 +19,9 @@ class BaseBackend(object):
     UNREADY_STATES = UNREADY_STATES
     EXCEPTION_STATES = EXCEPTION_STATES
 
+    TimeoutError = TimeoutError
 
     capabilities = []
-    TimeoutError = TimeoutError
 
     def encode_result(self, result, status):
         if status == "SUCCESS":

+ 28 - 28
celery/decorators.py

@@ -1,32 +1,39 @@
 from inspect import getargspec
 
+from billiard.utils.functional import wraps
+
 from celery.task.base import Task, PeriodicTask
 
 
 def task(**options):
-    """Make a task out of any callable.
+    """Decorator to create a task class out of any callable.
+
+    Examples:
 
-        Examples:
+    .. code-block:: python
 
-            >>> @task()
-            ... def refresh_feed(url):
-            ...     return Feed.objects.get(url=url).refresh()
+        @task()
+        def refresh_feed(url):
+            return Feed.objects.get(url=url).refresh()
 
 
-            >>> refresh_feed("http://example.com/rss") # Regular
-            <Feed: http://example.com/rss>
-            >>> refresh_feed.delay("http://example.com/rss") # Async
-            <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
+    With setting extra options and using retry.
+
+    .. code-block:: python
 
-            # With setting extra options and using retry.
+        @task(exchange="feeds")
+        def refresh_feed(url, **kwargs):
+            try:
+                return Feed.objects.get(url=url).refresh()
+            except socket.error, exc:
+                refresh_feed.retry(args=[url], kwargs=kwargs, exc=exc)
 
-            >>> @task(exchange="feeds")
-            ... def refresh_feed(url, **kwargs):
-            ...     try:
-            ...         return Feed.objects.get(url=url).refresh()
-            ...     except socket.error, exc:
-            ...         refresh_feed.retry(args=[url], kwargs=kwargs,
-            ...                            exc=exc)
+    Calling the resulting task.
+
+        >>> refresh_feed("http://example.com/rss") # Regular
+        <Feed: http://example.com/rss>
+        >>> refresh_feed.delay("http://example.com/rss") # Async
+        <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
 
 
     """
@@ -34,20 +41,13 @@ def task(**options):
     def _create_task_cls(fun):
         base = options.pop("base", Task)
 
-        cls_name = fun.__name__
-
+        @wraps(fun)
         def run(self, *args, **kwargs):
             return fun(*args, **kwargs)
-        run.__name__ = fun.__name__
         run.argspec = getargspec(fun)
 
-        cls_dict = dict(options)
-        cls_dict["run"] = run
-        cls_dict["__module__"] = fun.__module__
-
-        task = type(cls_name, (base, ), cls_dict)()
-
-        return task
+        cls_dict = dict(options, run=run, __module__=fun.__module__)
+        return type(fun.__name__, (base, ), cls_dict)()
 
     return _create_task_cls
 
@@ -69,5 +69,5 @@ def periodic_task(**options):
             logger.warn("Task running...")
 
     """
-    options["base"] = PeriodicTask
+    options.setdefault("base", PeriodicTask)
     return task(**options)

+ 7 - 10
celery/result.py

@@ -13,14 +13,11 @@ from celery.datastructures import PositionQueue
 
 
 class BaseAsyncResult(object):
-    """Base class for pending result, supports custom
-    task meta :attr:`backend`
+    """Base class for pending result, supports custom task result backend.
 
     :param task_id: see :attr:`task_id`.
-
     :param backend: see :attr:`backend`.
 
-
     .. attribute:: task_id
 
         The unique identifier for this task.
@@ -37,14 +34,14 @@ class BaseAsyncResult(object):
         self.task_id = task_id
         self.backend = backend
 
-    def get(self):
+    def get(self, timeout=None):
         """Alias to :meth:`wait`."""
-        return self.wait()
+        return self.wait(timeout=timeout)
 
     def wait(self, timeout=None):
         """Wait for task, and return the result when it arrives.
 
-        :keyword timeout: How long to wait in seconds, before the
+        :keyword timeout: How long to wait, in seconds, before the
             operation times out.
 
         :raises celery.exceptions.TimeoutError: if ``timeout`` is not ``None``
@@ -65,7 +62,7 @@ class BaseAsyncResult(object):
 
         """
         status = self.backend.get_status(self.task_id)
-        return status not in ["PENDING", "RETRY"]
+        return status not in self.backend.UNREADY_STATES
 
     def successful(self):
         """Returns ``True`` if the task executed successfully.
@@ -89,7 +86,7 @@ class BaseAsyncResult(object):
         If the task raised an exception, this will be the exception instance.
 
         """
-        if self.status == "SUCCESS" or self.status == "FAILURE":
+        if self.status in self.backend.READY_STATES:
             return self.backend.get_result(self.task_id)
         return None
 
@@ -246,7 +243,7 @@ class TaskSetResult(object):
         while results:
             for task_id, pending_result in results.items():
                 if pending_result.status == "SUCCESS":
-                    del(results[task_id])
+                    results.pop(task_id, None)
                     yield pending_result.result
                 elif pending_result.status == "FAILURE":
                     raise pending_result.result

+ 1 - 1
celery/tests/test_registry.py

@@ -39,7 +39,7 @@ class TestTaskRegistry(unittest.TestCase):
         self.assertRegisterUnregisterCls(r, TestTask)
         self.assertRegisterUnregisterCls(r, TestPeriodicTask)
 
-        tasks = r.all()
+        tasks = dict(r)
         self.assertTrue(isinstance(tasks.get(TestTask.name), TestTask))
         self.assertTrue(isinstance(tasks.get(TestPeriodicTask.name),
                                    TestPeriodicTask))