Przeglądaj źródła

Last commit forgot changes

Ask Solem 9 lat temu
rodzic
commit
3fff58c174

+ 5 - 8
celery/app/trace.py

@@ -187,10 +187,9 @@ class TraceInfo(object):
             einfo = ExceptionInfo()
             einfo.exception = get_pickleable_exception(einfo.exception)
             einfo.type = get_pickleable_etype(einfo.type)
-            if store_errors:
-                task.backend.mark_as_failure(
-                    req.id, exc, einfo.traceback, request=req,
-                )
+            task.backend.mark_as_failure(
+                req.id, exc, einfo.traceback, req, store_errors,
+            )
             task.on_failure(exc, req.id, req.args, req.kwargs, einfo)
             signals.task_failure.send(sender=task, task_id=req.id,
                                       exception=exc, args=req.args,
@@ -282,6 +281,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
         task_after_return = task.after_return
 
     store_result = backend.store_result
+    mark_as_done = backend.mark_as_done
     backend_cleanup = backend.process_cleanup
 
     pid = os.getpid()
@@ -394,10 +394,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                                     group(sigs).apply_async((retval,))
                             else:
                                 signature(callbacks[0], app=app).delay(retval)
-                        if publish_result:
-                            store_result(
-                                uuid, retval, SUCCESS, request=task_request,
-                            )
+                        mark_as_done(uuid, retval, task_request, publish_result)
                     except EncodeError as exc:
                         I, R, state, retval = on_error(task_request, exc, uuid)
                     else:

+ 25 - 16
celery/backends/base.py

@@ -112,20 +112,40 @@ class BaseBackend(object):
         """Mark a task as started"""
         return self.store_result(task_id, meta, status=states.STARTED)
 
-    def mark_as_done(self, task_id, result, request=None, state=states.SUCCESS):
+    def mark_as_done(self, task_id, result,
+                     request=None, store_result=True, state=states.SUCCESS):
         """Mark task as successfully executed."""
-        self.store_result(task_id, result, status=state, request=request)
+        if store_result:
+            self.store_result(task_id, result, status=state, request=request)
         if request and request.chord:
             self.on_chord_part_return(request, state)
 
     def mark_as_failure(self, task_id, exc,
-                        traceback=None, request=None, state=states.FAILURE):
+                        traceback=None, request=None, store_result=True,
+                        state=states.FAILURE):
         """Mark task as executed with failure. Stores the exception."""
-        self.store_result(task_id, exc, status=state,
-                          traceback=traceback, request=request)
+        if store_result:
+            self.store_result(task_id, exc, status=state,
+                              traceback=traceback, request=request)
         if request and request.chord:
             self.on_chord_part_return(request, state, exc)
 
+    def mark_as_revoked(self, task_id, reason='',
+                        request=None, store_result=True, state=states.REVOKED):
+        exc = TaskRevokedError(reason)
+        if store_result:
+            self.store_result(task_id, exc,
+                              status=state, traceback=None, request=request)
+        if request and request.chord:
+            self.on_chord_part_return(request, state, exc)
+
+    def mark_as_retry(self, task_id, exc, traceback=None,
+                      request=None, store_result=True, state=states.RETRY):
+        """Mark task as being retries. Stores the current
+        exception (if any)."""
+        return self.store_result(task_id, exc, status=state,
+                                 traceback=traceback, request=request)
+
     def chord_error_from_stack(self, callback, exc=None):
         from celery import group
         app = self.app
@@ -151,17 +171,6 @@ class BaseBackend(object):
         finally:
             del(tb)
 
-    def mark_as_retry(self, task_id, exc, traceback=None, request=None):
-        """Mark task as being retries. Stores the current
-        exception (if any)."""
-        return self.store_result(task_id, exc, status=states.RETRY,
-                                 traceback=traceback, request=request)
-
-    def mark_as_revoked(self, task_id, reason='', request=None):
-        return self.store_result(task_id, TaskRevokedError(reason),
-                                 status=states.REVOKED, traceback=None,
-                                 request=request)
-
     def prepare_exception(self, exc, serializer=None):
         """Prepare exception for serialization."""
         serializer = self.serializer if serializer is None else serializer

+ 6 - 11
celery/tests/tasks/test_trace.py

@@ -103,19 +103,14 @@ class test_trace(TraceCase):
             return x + y
         add.backend = Mock()
 
-        class TestRequest(object):
-
-            def __init__(self, request):
-                self.request = request
-
-            def __eq__(self, other):
-                return self.request['chord'] == other['chord']
-
         request = {'chord': uuid()}
         self.trace(add, (2, 2), {}, request=request)
-        add.backend.on_chord_part_return.assert_called_with(
-            TestRequest(request), 'SUCCESS', 4,
-        )
+        self.assertTrue(add.backend.mark_as_done.called)
+        args, kwargs = add.backend.mark_as_done.call_args
+        self.assertEqual(args[0], 'id-1')
+        self.assertEqual(args[1], 4)
+        self.assertEqual(args[2].chord, request['chord'])
+        self.assertFalse(args[3])
 
     def test_when_backend_cleanup_raises(self):
 

+ 1 - 1
celery/tests/worker/test_request.py

@@ -146,7 +146,7 @@ class test_trace_task(AppCase):
         tid = uuid()
         ret = jail(self.app, tid, self.mytask.name, [2], {})
         self.assertEqual(ret, 4)
-        self.assertTrue(self.mytask.backend.store_result.called)
+        self.assertTrue(self.mytask.backend.mark_as_done.called)
         self.assertIn('Process cleanup failed', _logger.error.call_args[0][0])
 
     def test_process_cleanup_BaseException(self):

+ 16 - 12
celery/worker/request.py

@@ -27,6 +27,7 @@ from celery.exceptions import (
 )
 from celery.five import string
 from celery.platforms import signals as _signals
+from celery.utils import cached_property
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
 from celery.utils.timeutils import maybe_iso8601, timezone, maybe_make_aware
@@ -245,8 +246,9 @@ class Request(object):
         task_ready(self)
         self.send_event('task-revoked',
                         terminated=terminated, signum=signum, expired=expired)
-        if self.store_errors:
-            self.task.backend.mark_as_revoked(self.id, reason, request=self)
+        self.task.backend.mark_as_revoked(
+            self.id, reason, request=self, store_result=self.store_errors,
+        )
         self.acknowledge()
         self._already_revoked = True
         send_revoked(self.task, request=self,
@@ -296,8 +298,9 @@ class Request(object):
                   timeout, self.name, self.id)
             exc = TimeLimitExceeded(timeout)
 
-        if self.store_errors:
-            self.task.backend.mark_as_failure(self.id, exc, request=self)
+        self.task.backend.mark_as_failure(
+            self.id, exc, request=self, store_result=self.store_errors,
+        )
 
         if self.task.acks_late:
             self.acknowledge()
@@ -342,13 +345,14 @@ class Request(object):
 
         # These are special cases where the process would not have had
         # time to write the result.
-        if self.store_errors:
-            if isinstance(exc, Terminated):
-                self._announce_revoked(
-                    'terminated', True, string(exc), False)
-                send_failed_event = False  # already sent revoked event
-            elif isinstance(exc, WorkerLostError) or not return_ok:
-                self.task.backend.mark_as_failure(self.id, exc, request=self)
+        if isinstance(exc, Terminated):
+            self._announce_revoked(
+                'terminated', True, string(exc), False)
+            send_failed_event = False  # already sent revoked event
+        elif isinstance(exc, WorkerLostError) or not return_ok:
+            self.task.backend.mark_as_failure(
+                self.id, exc, request=self, store_result=self.store_errors,
+            )
         # (acks_late) acknowledge after result stored.
         if self.task.acks_late:
             reject_and_requeue = (
@@ -460,7 +464,7 @@ class Request(object):
         # used by backend.on_chord_part_return when failures reported
         # by parent process
         _, _, embed = self._payload
-        return embed['chord']
+        return embed.get('chord')
 
     @cached_property
     def group(self):