Parcourir la source

TaskWrapper must only execute once.

Ask Solem il y a 15 ans
Parent
commit
6fdd254f82
1 fichiers modifiés avec 29 ajouts et 2 suppressions
  1. 29 2
      celery/worker/job.py

+ 29 - 2
celery/worker/job.py

@@ -12,6 +12,7 @@ import multiprocessing
 import socket
 import sys
 
+
 # pep8.py borks on a inline signature separator and
 # says "trailing whitespace" ;)
 EMAIL_SIGNATURE_SEP = "-- "
@@ -19,7 +20,8 @@ TASK_FAIL_EMAIL_BODY = """
 Task %%(name)s with id %%(id)s raised exception: %%(exc)s
 
 
-Task was called with args:%%(args)s kwargs:%%(kwargs)s.
+Task was called with args: %%(args)s kwargs: %%(kwargs)s.
+
 The contents of the full traceback was:
 
 %%(traceback)s
@@ -30,8 +32,14 @@ celeryd at %%(hostname)s.
 """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
 
 
+class AlreadyExecutedError(Exception):
+    """Tasks can only be executed once, as they might change
+    world-wide state."""
+
+
 class TaskWrapper(object):
-    """Class wrapping a task to be run.
+    """Class wrapping a task to be passed around and finally
+    executed inside of the worker.
 
     :param task_name: see :attr:`task_name`.
 
@@ -67,6 +75,11 @@ class TaskWrapper(object):
 
         The original message sent. Used for acknowledging the message.
 
+    .. attribute executed
+
+    Set if the task has been executed. A task should only be executed
+    once.
+
     """
     success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
     fail_msg = """
@@ -87,6 +100,7 @@ class TaskWrapper(object):
         self.kwargs = kwargs
         self.logger = kwargs.get("logger")
         self.on_ack = on_ack
+        self.executed = False
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
                 "fail_email_body"):
             setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
@@ -154,6 +168,13 @@ class TaskWrapper(object):
         return ExecuteWrapper(self.task_func, self.task_id, self.task_name,
                               self.args, task_func_kwargs)
 
+    def _set_executed_bit(self):
+        if self.executed:
+            raise AlreadyExecutedError(
+                   "Task %s[%s] has already been executed" % (
+                       self.task_name, self.task_id))
+        self.executed = True
+
     def execute(self, loglevel=None, logfile=None):
         """Execute the task in a :class:`celery.execute.ExecuteWrapper`.
 
@@ -162,6 +183,9 @@ class TaskWrapper(object):
         :keyword logfile: The logfile used by the task.
 
         """
+        # Make sure task has not already been executed.
+        self._set_executed_bit()
+
         # acknowledge task as being processed.
         self.on_ack()
 
@@ -179,6 +203,9 @@ class TaskWrapper(object):
         :returns :class:`multiprocessing.AsyncResult` instance.
 
         """
+        # Make sure task has not already been executed.
+        self._set_executed_bit()
+
         wrapper = self._executeable(loglevel, logfile)
         return pool.apply_async(wrapper,
                 callbacks=[self.on_success], errbacks=[self.on_failure],