Browse Source

App cached class attributes are now pickleable

Ask Solem 13 years ago
parent
commit
6e362f976b
2 changed files with 28 additions and 17 deletions
  1. 16 4
      celery/app/base.py
  2. 12 13
      celery/result.py

+ 16 - 4
celery/app/base.py

@@ -35,10 +35,14 @@ from .annotations import (
 )
 from .builtins import load_builtin_tasks
 from .defaults import DEFAULTS, find_deprecated_settings
-from .state import _tls
+from .state import _tls, get_current_app
 from .utils import AppPickler, Settings, bugreport, _unpickle_app
 
 
+def _unpickle_appattr(reverse, args):
+    return getattr(get_current_app(), reverse)(*args)
+
+
 class Celery(object):
     """Celery Application.
 
@@ -112,7 +116,8 @@ class Celery(object):
         return self.subclass_with_self("celery.app.task:BaseTask", name="Task",
                                        attribute="_app", abstract=True)
 
-    def subclass_with_self(self, Class, name=None, attribute="app", **kw):
+    def subclass_with_self(self, Class, name=None, attribute="app",
+            reverse=None, **kw):
         """Subclass an app-compatible class by setting its app attribute
         to be this app instance.
 
@@ -127,8 +132,15 @@ class Celery(object):
 
         """
         Class = symbol_by_name(Class)
-        return type(name or Class.__name__, (Class, ), dict({attribute: self,
-            "__module__": Class.__module__, "__doc__": Class.__doc__}, **kw))
+        reverse = reverse if reverse else Class.__name__
+
+        def __reduce__(self):
+            return _unpickle_appattr, (reverse, self.__reduce_args__())
+
+        attrs = dict({attribute: self}, __module__=Class.__module__,
+                     __doc__=Class.__doc__, __reduce__=__reduce__, **kw)
+
+        return type(name or Class.__name__, (Class, ), attrs)
 
     @cached_property
     def Worker(self):

+ 12 - 13
celery/result.py

@@ -21,17 +21,12 @@ from itertools import imap
 from . import current_app
 from . import states
 from .app import app_or_default
-from .app.registry import _unpickle_task
 from .datastructures import DependencyGraph
 from .exceptions import IncompleteStream, TimeoutError
 from .utils import cached_property
 from .utils.compat import OrderedDict
 
 
-def _unpickle_result(task_id, task_name):
-    return _unpickle_task(task_name).AsyncResult(task_id)
-
-
 def from_serializable(r):
     # earlier backends may just pickle, so check if
     # result is already prepared.
@@ -211,11 +206,10 @@ class AsyncResult(ResultBase):
         return self.__class__(self.id, backend=self.backend)
 
     def __reduce__(self):
-        if self.task_name:
-            return (_unpickle_result, (self.id, self.task_name))
-        else:
-            return (AsyncResult, (self.id, self.backend,
-                                  None, self.app))
+        return self.__class__, self.__reduce_args__()
+
+    def __reduce_args__(self):
+        return self.id, self.task_name, self.backend
 
     def build_graph(self, intermediate=False):
         graph = DependencyGraph()
@@ -611,7 +605,10 @@ class TaskSetResult(ResultSet):
         return iter(self.results)
 
     def __reduce__(self):
-        return (TaskSetResult, (self.id, self.results))
+        return self.__class__, self.__reduce_args__()
+
+    def __reduce_args__(self):
+        return self.id, self.results
 
     def __eq__(self, other):
         if isinstance(other, TaskSetResult):
@@ -648,8 +645,10 @@ class EagerResult(AsyncResult):
         self._traceback = traceback
 
     def __reduce__(self):
-        return (EagerResult, (self.id, self._result,
-                              self._state, self._traceback))
+        return self.__class__, self.__reduce_args__()
+
+    def __reduce_args__(self):
+        return (self.id, self._result, self._state, self._traceback)
 
     def __copy__(self):
         cls, args = self.__reduce__()