Просмотр исходного кода

Smarter pickling of AsyncResult's

Ask Solem 15 лет назад
Родитель
Сommit
1d8fa2cc89
3 измененных файлов с 23 добавлено и 9 удалено
  1. 4 0
      celery/registry.py
  2. 16 3
      celery/result.py
  3. 3 6
      celery/task/base.py

+ 4 - 0
celery/registry.py

@@ -76,3 +76,7 @@ class TaskRegistry(UserDict):
 
 """
 tasks = TaskRegistry()
+
+
+def _unpickle_task(name):
+    return tasks[name]

+ 16 - 3
celery/result.py

@@ -10,9 +10,14 @@ from celery.backends import default_backend
 from celery.datastructures import PositionQueue
 from celery.exceptions import TimeoutError
 from celery.messaging import with_connection
+from celery.registry import _unpickle_task
 from celery.utils import any, all
 
 
+def _unpickle_result(task_id, task_name):
+    return _unpickle_task(task_name).AsyncResult(task_id)
+
+
 class BaseAsyncResult(object):
     """Base class for pending result, supports custom task result backend.
 
@@ -31,9 +36,16 @@ class BaseAsyncResult(object):
 
     TimeoutError = TimeoutError
 
-    def __init__(self, task_id, backend):
+    def __init__(self, task_id, backend, task_name=None):
         self.task_id = task_id
         self.backend = backend
+        self.task_name = task_name
+
+    def __reduce__(self):
+        if self.task_name:
+            return (_unpickle_result, (self.task_id, self.task_name))
+        else:
+            return (self.__class__, (self.task_id, self.backend))
 
     def forget(self):
         """Forget about (and possibly remove the result of) this task."""
@@ -183,8 +195,9 @@ class AsyncResult(BaseAsyncResult):
 
     """
 
-    def __init__(self, task_id, backend=None):
-        super(AsyncResult, self).__init__(task_id, backend or default_backend)
+    def __init__(self, task_id, backend=None, task_name=None):
+        super(AsyncResult, self).__init__(task_id, backend or default_backend,
+                                          task_name=task_name)
 
 
 class TaskSetResult(object):

+ 3 - 6
celery/task/base.py

@@ -8,7 +8,7 @@ from celery.execute import apply_async, apply
 from celery.log import setup_task_logger
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.messaging import establish_connection as _establish_connection
-from celery.registry import tasks
+from celery.registry import tasks, _unpickle_task
 from celery.result import BaseAsyncResult, EagerResult
 from celery.schedules import maybe_schedule
 from celery.utils.timeutils import timedelta_seconds
@@ -29,10 +29,6 @@ Please use the CELERYBEAT_SCHEDULE setting instead:
 """
 
 
-def _unpickle_task(name):
-    return tasks[name]
-
-
 class TaskType(type):
     """Metaclass for tasks.
 
@@ -465,7 +461,8 @@ class Task(object):
         :param task_id: Task id to get result for.
 
         """
-        return BaseAsyncResult(task_id, backend=self.backend)
+        return BaseAsyncResult(task_id, backend=self.backend,
+                                        task_name=self.name)
 
     def update_state(self, task_id, state, meta=None):
         """Update task state.