Browse Source

Import task module when unpickling task. Closes #672

Ask Solem 12 years ago
parent
commit
e37e6e2044
3 changed files with 21 additions and 7 deletions
  1. 10 2
      celery/app/registry.py
  2. 11 4
      celery/app/task.py
  3. 0 1
      celery/tests/worker/test_worker.py

+ 10 - 2
celery/app/registry.py

@@ -10,6 +10,9 @@ from __future__ import absolute_import
 
 import inspect
 
+from importlib import import_module
+
+from celery._state import get_current_app
 from celery.exceptions import NotRegistered
 
 
@@ -56,5 +59,10 @@ class TaskRegistry(dict):
 
 
 def _unpickle_task(name):
-    from celery import current_app
-    return current_app.tasks[name]
+    return get_current_app().tasks[name]
+
+
+def _unpickle_task_v2(name, module=None):
+    if module:
+        import_module(module)
+    return get_current_app().tasks[name]

+ 11 - 4
celery/app/task.py

@@ -8,6 +8,8 @@
 """
 from __future__ import absolute_import
 
+import sys
+
 from celery import current_app
 from celery import states
 from celery.__compat__ import class_property
@@ -21,7 +23,7 @@ from celery.utils.imports import instantiate
 from celery.utils.mail import ErrorMail
 
 from .annotations import resolve_all as resolve_all_annotations
-from .registry import _unpickle_task
+from .registry import _unpickle_task_v2
 
 #: extracts attributes related to publishing a message from an object.
 extract_exec_options = mattrgetter(
@@ -316,10 +318,15 @@ class Task(object):
             self.pop_request()
             _task_stack.pop()
 
-    # - tasks are pickled into the name of the task only, and the reciever
-    # - simply grabs it from the local registry.
     def __reduce__(self):
-        return (_unpickle_task, (self.name, ), None)
+        # - tasks are pickled into the name of the task only, and the reciever
+        # - simply grabs it from the local registry.
+        # - in later versions the module of the task is also included,
+        # - and the receiving side tries to import that module so that
+        # - it will work even if the task has not been registered.
+        mod = type(self).__module__
+        mod = mod if mod and mod in sys.modules else None
+        return (_unpickle_task_v2, (self.name, mod), None)
 
     def run(self, *args, **kwargs):
         """The body of the task executed by workers."""

+ 0 - 1
celery/tests/worker/test_worker.py

@@ -1,7 +1,6 @@
 from __future__ import absolute_import
 
 import socket
-import sys
 
 from collections import deque
 from datetime import datetime, timedelta