Bladeren bron

Easier to extend how celery apps are pickled

Ask Solem 13 jaren geleden
bovenliggende
commit
66d60f54d3
2 gewijzigde bestanden met toevoegingen van 45 en 21 verwijderingen
  1. 44 20
      celery/app/__init__.py
  2. 1 1
      celery/app/base.py

+ 44 - 20
celery/app/__init__.py

@@ -27,6 +27,36 @@ _tls = threading.local()
 _tls.current_app = None
 
 
+class AppPickler(object):
+
+    def __call__(self, cls, *args):
+        kwargs = self.build_kwargs(*args)
+        app = self.construct(cls, **kwargs)
+        self.prepare(app, **kwargs)
+        return app
+
+    def prepare(self, app, **kwargs):
+        app.conf.update(kwargs["changes"])
+
+    def build_kwargs(self, *args):
+        return self.build_standard_kwargs(*args)
+
+    def build_standard_kwargs(self, main, changes, loader, backend, amqp,
+            events, log, control, accept_magic_kwargs):
+        return dict(main=main, loader=loader, backend=backend, amqp=amqp,
+                    changes=changes, events=events, log=log, control=control,
+                    set_as_current=False,
+                    accept_magic_kwargs=accept_magic_kwargs)
+
+
+    def construct(self, cls, **kwargs):
+        return cls(**kwargs)
+
+
+def _unpickle_app(cls, pickler, *args):
+    return pickler()(cls, *args)
+
+
 class App(base.BaseApp):
     """Celery Application.
 
@@ -43,6 +73,7 @@ class App(base.BaseApp):
     :keyword set_as_current:  Make this the global current app.
 
     """
+    Pickler = AppPickler
 
     def set_current(self):
         """Make this the current app for this thread."""
@@ -170,26 +201,19 @@ class App(base.BaseApp):
         # Reduce only pickles the configuration changes,
         # so the default configuration doesn't have to be passed
         # between processes.
-        return (_unpickle_app, (self.__class__,
-                                self.main,
-                                self.conf.changes,
-                                self.loader_cls,
-                                self.backend_cls,
-                                self.amqp_cls,
-                                self.events_cls,
-                                self.log_cls,
-                                self.control_cls,
-                                self.accept_magic_kwargs))
-
-
-def _unpickle_app(cls, main, changes, loader, backend, amqp,
-        events, log, control, accept_magic_kwargs):
-    app = cls(main, loader=loader, backend=backend, amqp=amqp,
-                    events=events, log=log, control=control,
-                    set_as_current=False,
-                    accept_magic_kwargs=accept_magic_kwargs)
-    app.conf.update(changes)
-    return app
+        return (_unpickle_app, (self.__class__, self.Pickler)
+                              + self.__reduce_args__())
+
+    def __reduce_args__(self):
+        return (self.main,
+                self.conf.changes,
+                self.loader_cls,
+                self.backend_cls,
+                self.amqp_cls,
+                self.events_cls,
+                self.log_cls,
+                self.control_cls,
+                self.accept_magic_kwargs)
 
 
 #: The "default" loader is the default loader used by old applications.

+ 1 - 1
celery/app/base.py

@@ -146,7 +146,7 @@ class BaseApp(object):
 
     def __init__(self, main=None, loader=None, backend=None,
             amqp=None, events=None, log=None, control=None,
-            set_as_current=True, accept_magic_kwargs=False):
+            set_as_current=True, accept_magic_kwargs=False, **kwargs):
         self.main = main
         self.amqp_cls = amqp or self.amqp_cls
         self.backend_cls = backend or self.backend_cls