Browse Source

Add task properties to AsyncResult, store in backend (#4490)

* Add task properties to AsyncResult and backends/base.py

* make unit test more exact, re-arrange comment line

* isort fix up, please work

* revert result meta for children attrib

* bump up the code coverage a bit, nothing major

* added result_extended option, removed str casts from result meta

* added result_extended config option to documentation

* Add PicklableMock, attempt to fix up unit test serialization issues

* fix tests that were failing on fallback to default app

* change option to default False

* remove unneeded comment

* Add docstring to Picklablemock

* trivial pydocstyle fix

* enable extended results in test

* rename Context property task to task_name

* revert PicklableMock and app.set_current for testing-review
John Arnold 6 years ago
parent
commit
1afaa7b049

+ 1 - 0
celery/app/defaults.py

@@ -180,6 +180,7 @@ NAMESPACES = Namespace(
             type='float', old={'celery_task_result_expires'},
         ),
         persistent=Option(None, type='bool'),
+        extended=Option(False, type='bool'),
         serializer=Option('json'),
         backend_transport_options=Option({}, type='dict'),
     ),

+ 2 - 0
celery/app/task.py

@@ -92,6 +92,7 @@ class Context(object):
     errbacks = None
     timelimit = None
     origin = None
+    task_name = None
     _children = None   # see property
     _protected = 0
 
@@ -128,6 +129,7 @@ class Context(object):
             'retries': self.retries,
             'reply_to': self.reply_to,
             'origin': self.origin,
+            'task_name': self.task_name
         }
 
     @property

+ 31 - 5
celery/backends/base.py

@@ -8,6 +8,7 @@
 """
 from __future__ import absolute_import, unicode_literals
 
+import datetime
 import sys
 import time
 from collections import namedtuple
@@ -70,14 +71,13 @@ def unpickle_backend(cls, args, kwargs):
 
 
 class _nulldict(dict):
-
     def ignore(self, *a, **kw):
         pass
+
     __setitem__ = update = setdefault = ignore
 
 
 class Backend(object):
-
     READY_STATES = states.READY_STATES
     UNREADY_STATES = states.UNREADY_STATES
     EXCEPTION_STATES = states.EXCEPTION_STATES
@@ -332,6 +332,7 @@ class Backend(object):
     def get_state(self, task_id):
         """Get the state of a task."""
         return self.get_task_meta(task_id)['status']
+
     get_status = get_state  # XXX compat
 
     def get_traceback(self, task_id):
@@ -448,7 +449,6 @@ class Backend(object):
 
 
 class SyncBackendMixin(object):
-
     def iter_native(self, result, timeout=None, interval=0.5, no_ack=True,
                     on_message=None, on_interval=None):
         self._ensure_not_eager()
@@ -656,13 +656,39 @@ class BaseKeyValueStoreBackend(Backend):
 
     def _store_result(self, task_id, result, state,
                       traceback=None, request=None, **kwargs):
+
+        if state in self.READY_STATES:
+            date_done = datetime.datetime.utcnow()
+        else:
+            date_done = None
+
         meta = {
-            'status': state, 'result': result, 'traceback': traceback,
+            'status': state,
+            'result': result,
+            'traceback': traceback,
             'children': self.current_task_children(request),
             'task_id': bytes_to_str(task_id),
+            'date_done': date_done,
         }
+
         if request and getattr(request, 'group', None):
             meta['group_id'] = request.group
+
+        if self.app.conf.find_value_for_key('extended', 'result'):
+            if request:
+                request_meta = {
+                    'name': getattr(request, 'task_name', None),
+                    'args': getattr(request, 'args', None),
+                    'kwargs': getattr(request, 'kwargs', None),
+                    'worker': getattr(request, 'hostname', None),
+                    'retries': getattr(request, 'retries', None),
+                    'queue': request.delivery_info.get('routing_key')
+                    if hasattr(request, 'delivery_info') and
+                    request.delivery_info else None
+                }
+
+                meta.update(request_meta)
+
         self.set(self.get_key_for_task(task_id), self.encode(meta))
         return result
 
@@ -769,7 +795,7 @@ class KeyValueStoreBackend(BaseKeyValueStoreBackend, SyncBackendMixin):
 class DisabledBackend(BaseBackend):
     """Dummy result backend."""
 
-    _cache = {}   # need this attribute to reset cache in tests.
+    _cache = {}  # need this attribute to reset cache in tests.
 
     def store_result(self, *args, **kwargs):
         pass

+ 28 - 0
celery/result.py

@@ -480,6 +480,34 @@ class AsyncResult(ResultBase):
     def task_id(self, id):
         self.id = id
 
+    @property
+    def name(self):
+        return self._get_task_meta().get('name')
+
+    @property
+    def args(self):
+        return self._get_task_meta().get('args')
+
+    @property
+    def kwargs(self):
+        return self._get_task_meta().get('kwargs')
+
+    @property
+    def worker(self):
+        return self._get_task_meta().get('worker')
+
+    @property
+    def date_done(self):
+        return self._get_task_meta().get('date_done')
+
+    @property
+    def retries(self):
+        return self._get_task_meta().get('retries')
+
+    @property
+    def queue(self):
+        return self._get_task_meta().get('queue')
+
 
 @Thenable.register
 @python_2_unicode_compatible

+ 10 - 0
docs/userguide/configuration.rst

@@ -642,6 +642,16 @@ Default: No compression.
 Optional compression method used for task results.
 Supports the same options as the :setting:`task_serializer` setting.
 
+.. setting:: result_extended
+
+``result_extended``
+~~~~~~~~~~~~~~~~~~~~~~
+
+Default: ``False``
+
+Enables extended task result attributes (name, args, kwargs, worker,
+retries, queue, delivery_info) to be written to backend.
+
 .. setting:: result_expires
 
 ``result_expires``

+ 2 - 3
t/unit/backends/test_base.py

@@ -8,6 +8,7 @@ import pytest
 from case import ANY, Mock, call, patch, skip
 
 from celery import chord, group, states, uuid
+from celery.app.task import Context
 from celery.backends.base import (BaseBackend, DisabledBackend,
                                   KeyValueStoreBackend, _nulldict)
 from celery.exceptions import ChordError, TimeoutError
@@ -426,9 +427,7 @@ class test_KeyValueStoreBackend:
         tid = uuid()
         state = 'SUCCESS'
         result = 10
-        request = Mock()
-        request.group = 'gid'
-        request.children = []
+        request = Context(group='gid', children=[])
         self.b.store_result(
             tid, state=state, result=result, request=request,
         )

+ 26 - 0
t/unit/tasks/test_result.py

@@ -8,6 +8,7 @@ import pytest
 from case import Mock, call, patch, skip
 
 from celery import states, uuid
+from celery.app.task import Context
 from celery.backends.base import SyncBackendMixin
 from celery.exceptions import (CPendingDeprecationWarning,
                                ImproperlyConfigured, IncompleteStream,
@@ -67,6 +68,7 @@ class test_AsyncResult:
     def setup(self):
         self.app.conf.result_cache_max = 100
         self.app.conf.result_serializer = 'pickle'
+        self.app.conf.result_extended = True
         self.task1 = mock_task('task1', states.SUCCESS, 'the')
         self.task2 = mock_task('task2', states.SUCCESS, 'quick')
         self.task3 = mock_task('task3', states.FAILURE, KeyError('brown'))
@@ -392,6 +394,30 @@ class test_AsyncResult:
         result.backend = None
         del result
 
+    def test_get_request_meta(self):
+
+        x = self.app.AsyncResult('1')
+        request = Context(
+            task_name='foo',
+            children=None,
+            args=['one', 'two'],
+            kwargs={'kwarg1': 'three'},
+            hostname="foo",
+            retries=1,
+            delivery_info={'routing_key': 'celery'}
+        )
+        x.backend.store_result(task_id="1", result='foo', state=states.SUCCESS,
+                               traceback=None, request=request)
+        assert x.name == 'foo'
+        assert x.args == ['one', 'two']
+        assert x.kwargs == {'kwarg1': 'three'}
+        assert x.worker == 'foo'
+        assert x.retries == 1
+        assert x.queue == 'celery'
+        assert x.date_done is not None
+        assert x.task_id == "1"
+        assert x.state == "SUCCESS"
+
 
 class test_ResultSet:
 

+ 3 - 1
t/unit/worker/test_request.py

@@ -253,7 +253,7 @@ class test_Request(RequestCase):
         req.on_retry(Mock())
         req.on_ack.assert_called_with(req_logger, req.connection_errors)
 
-    def test_on_failure_Termianted(self):
+    def test_on_failure_Terminated(self):
         einfo = None
         try:
             raise Terminated('9')
@@ -451,6 +451,7 @@ class test_Request(RequestCase):
                 terminated=False, expired=True, signum=None):
             job.revoked()
             assert job.id in revoked
+            self.app.set_current()
             assert self.mytask.backend.get_status(job.id) == states.REVOKED
 
     def test_revoked_expires_not_expired(self):
@@ -597,6 +598,7 @@ class test_Request(RequestCase):
         job = self.xRequest()
         exc_info = get_ei()
         job.on_failure(exc_info)
+        self.app.set_current()
         assert self.mytask.backend.get_status(job.id) == states.FAILURE
 
         self.mytask.ignore_result = True