Browse Source

Allow Task to override the backend used to store results.

Ask Solem 15 years ago
parent
commit
2321ee7a27
2 changed files with 8 additions and 7 deletions
  1. 2 0
      celery/task/base.py
  2. 6 7
      celery/worker/job.py

+ 2 - 0
celery/task/base.py

@@ -6,6 +6,7 @@ from celery.result import TaskSetResult, EagerResult
 from celery.execute import apply_async, apply
 from celery.utils import gen_unique_id, get_full_cls_name
 from celery.registry import tasks
+from celery.backends import default_backend
 from celery.serialization import pickle
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
 from datetime import timedelta
@@ -187,6 +188,7 @@ class Task(object):
     default_retry_delay = 3 * 60
     serializer = conf.TASK_SERIALIZER
     rate_limit = conf.DEFAULT_RATE_LIMIT
+    backend = default_backend
 
     MaxRetriesExceededError = MaxRetriesExceededError
 

+ 6 - 7
celery/worker/job.py

@@ -11,7 +11,6 @@ from celery.log import get_default_logger
 from celery.monitoring import TaskTimerStats
 from django.core.mail import mail_admins
 from celery.loaders import current_loader
-from celery.backends import default_backend
 from celery.datastructures import ExceptionInfo
 import sys
 import socket
@@ -67,7 +66,6 @@ class WorkerTaskTrace(TaskTrace):
     """
 
     def __init__(self, *args, **kwargs):
-        self.backend = kwargs.pop("backend", default_backend)
         self.loader = kwargs.pop("loader", current_loader)
         super(WorkerTaskTrace, self).__init__(*args, **kwargs)
 
@@ -76,7 +74,7 @@ class WorkerTaskTrace(TaskTrace):
             return self.execute(*args, **kwargs)
         except Exception, exc:
             type_, value_, tb = sys.exc_info()
-            exc = self.backend.prepare_exception(exc)
+            exc = self.task.backend.prepare_exception(exc)
             warnings.warn("Exception happend outside of task body: %s: %s" % (
                 str(exc.__class__), str(exc)))
             return ExceptionInfo((type_, exc, tb))
@@ -86,7 +84,7 @@ class WorkerTaskTrace(TaskTrace):
         self.loader.on_task_init(self.task_id, self.task)
 
         # Backend process cleanup
-        self.backend.process_cleanup()
+        self.task.backend.process_cleanup()
 
         timer_stat = TaskTimerStats.start(self.task_id, self.task_name,
                                           self.args, self.kwargs)
@@ -103,13 +101,13 @@ class WorkerTaskTrace(TaskTrace):
 
         """
         if not self.task.ignore_result:
-            self.backend.mark_as_done(self.task_id, retval)
+            self.task.backend.mark_as_done(self.task_id, retval)
         return super(WorkerTaskTrace, self).handle_success(retval, *args)
 
     def handle_retry(self, exc, type_, tb, strtb):
         """Handle retry exception."""
         message, orig_exc = exc.args
-        self.backend.mark_as_retry(self.task_id, orig_exc, strtb)
+        self.task.backend.mark_as_retry(self.task_id, orig_exc, strtb)
         return super(WorkerTaskTrace, self).handle_retry(exc, type_,
                                                          tb, strtb)
 
@@ -117,7 +115,8 @@ class WorkerTaskTrace(TaskTrace):
         """Handle exception."""
         # mark_as_failure returns an exception that is guaranteed to
         # be pickleable.
-        stored_exc = self.backend.mark_as_failure(self.task_id, exc, strtb)
+        stored_exc = self.task.backend.mark_as_failure(self.task_id,
+                                                       exc, strtb)
         return super(WorkerTaskTrace, self).handle_failure(
                 stored_exc, type_, tb, strtb)