Procházet zdrojové kódy

Backends: Use state in API consistently over status

- Backend.store_result() -> status argument renamed to state
- Backend.get_status() -> renamed to Backend.get_state()
Ask Solem před 10 roky
rodič
revize
8432fdc5f0

+ 4 - 4
celery/backends/amqp.py

@@ -111,16 +111,16 @@ class AMQPBackend(BaseBackend):
             return self.rkey(task_id), request.correlation_id or task_id
         return self.rkey(task_id), task_id
 
-    def store_result(self, task_id, result, status,
+    def store_result(self, task_id, result, state,
                      traceback=None, request=None, **kwargs):
-        """Send task return value and status."""
+        """Send task return value and state."""
         routing_key, correlation_id = self.destination_for(task_id, request)
         if not routing_key:
             return
         with self.app.amqp.producer_pool.acquire(block=True) as producer:
             producer.publish(
-                {'task_id': task_id, 'status': status,
-                 'result': self.encode_result(result, status),
+                {'task_id': task_id, 'status': state,
+                 'result': self.encode_result(result, state),
                  'traceback': traceback,
                  'children': self.current_task_children(request)},
                 exchange=self.exchange,

+ 12 - 11
celery/backends/base.py

@@ -263,8 +263,8 @@ class BaseBackend(object):
         p = self.app.conf.result_persistent
         return self.persistent if p is None else p
 
-    def encode_result(self, result, status):
-        if status in self.EXCEPTION_STATES and isinstance(result, Exception):
+    def encode_result(self, result, state):
+        if state in self.EXCEPTION_STATES and isinstance(result, Exception):
             return self.prepare_exception(result)
         else:
             return self.prepare_value(result)
@@ -272,11 +272,11 @@ class BaseBackend(object):
     def is_cached(self, task_id):
         return task_id in self._cache
 
-    def store_result(self, task_id, result, status,
+    def store_result(self, task_id, result, state,
                      traceback=None, request=None, **kwargs):
         """Update task state and result."""
-        result = self.encode_result(result, status)
-        self._store_result(task_id, result, status, traceback,
+        result = self.encode_result(result, state)
+        self._store_result(task_id, result, state, traceback,
                            request=request, **kwargs)
         return result
 
@@ -287,9 +287,10 @@ class BaseBackend(object):
     def _forget(self, task_id):
         raise NotImplementedError('backend does not implement forget.')
 
-    def get_status(self, task_id):
-        """Get the status of a task."""
+    def get_state(self, task_id):
+        """Get the state of a task."""
         return self.get_task_meta(task_id)['status']
+    get_status = get_state  # XXX compat
 
     def get_traceback(self, task_id):
         """Get the traceback for a failed task."""
@@ -521,9 +522,9 @@ class KeyValueStoreBackend(BaseBackend):
     def _forget(self, task_id):
         self.delete(self.get_key_for_task(task_id))
 
-    def _store_result(self, task_id, result, status,
+    def _store_result(self, task_id, result, state,
                       traceback=None, request=None, **kwargs):
-        meta = {'status': status, 'result': result, 'traceback': traceback,
+        meta = {'status': state, 'result': result, 'traceback': traceback,
                 'children': self.current_task_children(request)}
         self.set(self.get_key_for_task(task_id), self.encode(meta))
         return result
@@ -639,5 +640,5 @@ class DisabledBackend(BaseBackend):
         raise NotImplementedError(
             'No result backend configured.  '
             'Please see the documentation for more information.')
-    wait_for = get_status = get_result = get_traceback = _is_disabled
-    get_many = _is_disabled
+    get_state = get_status = get_result = get_traceback = _is_disabled
+    wait_for = get_many = _is_disabled

+ 3 - 3
celery/backends/cassandra.py

@@ -188,14 +188,14 @@ class CassandraBackend(BaseBackend):
                 self._session = None
                 raise   # we did fail after all - reraise
 
-    def _store_result(self, task_id, result, status,
+    def _store_result(self, task_id, result, state,
                       traceback=None, request=None, **kwargs):
-        """Store return value and status of an executed task."""
+        """Store return value and state of an executed task."""
         self._get_connection(write=True)
 
         self._session.execute(self._write_stmt, (
             task_id,
-            status,
+            state,
             buf_t(self.encode(result)),
             self.app.now(),
             buf_t(self.encode(traceback)),

+ 3 - 3
celery/backends/database/__init__.py

@@ -106,9 +106,9 @@ class DatabaseBackend(BaseBackend):
         )
 
     @retry
-    def _store_result(self, task_id, result, status,
+    def _store_result(self, task_id, result, state,
                       traceback=None, max_retries=3, **kwargs):
-        """Store return value and status of an executed task."""
+        """Store return value and state of an executed task."""
         session = self.ResultSession()
         with session_cleanup(session):
             task = list(session.query(Task).filter(Task.task_id == task_id))
@@ -118,7 +118,7 @@ class DatabaseBackend(BaseBackend):
                 session.add(task)
                 session.flush()
             task.result = result
-            task.status = status
+            task.status = state
             task.traceback = traceback
             session.commit()
             return result

+ 3 - 3
celery/backends/mongodb.py

@@ -181,12 +181,12 @@ class MongoBackend(BaseBackend):
             return data
         return super(MongoBackend, self).decode(data)
 
-    def _store_result(self, task_id, result, status,
+    def _store_result(self, task_id, result, state,
                       traceback=None, request=None, **kwargs):
-        """Store return value and status of an executed task."""
+        """Store return value and state of an executed task."""
 
         meta = {'_id': task_id,
-                'status': status,
+                'status': state,
                 'result': self.encode(result),
                 'date_done': datetime.utcnow(),
                 'traceback': self.encode(traceback),

+ 2 - 2
celery/contrib/abortable.py

@@ -132,9 +132,9 @@ class AbortableAsyncResult(AsyncResult):
 
         """
         # TODO: store_result requires all four arguments to be set,
-        # but only status should be updated here
+        # but only state should be updated here
         return self.backend.store_result(self.id, result=None,
-                                         status=ABORTED, traceback=None)
+                                         state=ABORTED, traceback=None)
 
 
 class AbortableTask(Task):

+ 3 - 3
celery/result.py

@@ -170,8 +170,8 @@ class AsyncResult(ResultBase):
         )
         if meta:
             self._maybe_set_cache(meta)
-            status = meta['status']
-            if status in PROPAGATE_STATES and propagate:
+            state = meta['status']
+            if state in PROPAGATE_STATES and propagate:
                 raise meta['result']
             if callback is not None:
                 callback(self.id, meta['result'])
@@ -395,7 +395,7 @@ class AsyncResult(ResultBase):
 
         """
         return self._get_task_meta()['status']
-    status = state
+    status = state  # XXX compat
 
     @property
     def task_id(self):

+ 2 - 2
celery/tests/backends/test_amqp.py

@@ -57,7 +57,7 @@ class test_AMQPBackend(AppCase):
         tid = uuid()
 
         tb1.mark_as_done(tid, 42)
-        self.assertEqual(tb2.get_status(tid), states.SUCCESS)
+        self.assertEqual(tb2.get_state(tid), states.SUCCESS)
         self.assertEqual(tb2.get_result(tid), 42)
         self.assertTrue(tb2._cache.get(tid))
         self.assertTrue(tb2.get_result(tid), 42)
@@ -92,7 +92,7 @@ class test_AMQPBackend(AppCase):
         except KeyError as exception:
             einfo = ExceptionInfo()
             tb1.mark_as_failure(tid3, exception, traceback=einfo.traceback)
-            self.assertEqual(tb2.get_status(tid3), states.FAILURE)
+            self.assertEqual(tb2.get_state(tid3), states.FAILURE)
             self.assertIsInstance(tb2.get_result(tid3), KeyError)
             self.assertEqual(tb2.get_traceback(tid3), einfo.traceback)
 

+ 4 - 4
celery/tests/backends/test_base.py

@@ -337,9 +337,9 @@ class test_KeyValueStoreBackend(AppCase):
         tid = uuid()
         self.b.mark_as_done(tid, 'Hello world')
         self.assertEqual(self.b.get_result(tid), 'Hello world')
-        self.assertEqual(self.b.get_status(tid), states.SUCCESS)
+        self.assertEqual(self.b.get_state(tid), states.SUCCESS)
         self.b.forget(tid)
-        self.assertEqual(self.b.get_status(tid), states.PENDING)
+        self.assertEqual(self.b.get_state(tid), states.PENDING)
 
     def test_strip_prefix(self):
         x = self.b.get_key_for_task('x1b34')
@@ -529,7 +529,7 @@ class test_KeyValueStoreBackend(AppCase):
 
     def test_get_missing_meta(self):
         self.assertIsNone(self.b.get_result('xxx-missing'))
-        self.assertEqual(self.b.get_status('xxx-missing'), states.PENDING)
+        self.assertEqual(self.b.get_state('xxx-missing'), states.PENDING)
 
     def test_save_restore_delete_group(self):
         tid = uuid()
@@ -583,4 +583,4 @@ class test_DisabledBackend(AppCase):
 
     def test_is_disabled(self):
         with self.assertRaises(NotImplementedError):
-            DisabledBackend(self.app).get_status('foo')
+            DisabledBackend(self.app).get_state('foo')

+ 7 - 7
celery/tests/backends/test_cache.py

@@ -41,11 +41,11 @@ class test_CacheBackend(AppCase):
             CacheBackend(backend=None, app=self.app)
 
     def test_mark_as_done(self):
-        self.assertEqual(self.tb.get_status(self.tid), states.PENDING)
+        self.assertEqual(self.tb.get_state(self.tid), states.PENDING)
         self.assertIsNone(self.tb.get_result(self.tid))
 
         self.tb.mark_as_done(self.tid, 42)
-        self.assertEqual(self.tb.get_status(self.tid), states.SUCCESS)
+        self.assertEqual(self.tb.get_state(self.tid), states.SUCCESS)
         self.assertEqual(self.tb.get_result(self.tid), 42)
 
     def test_is_pickled(self):
@@ -61,7 +61,7 @@ class test_CacheBackend(AppCase):
             raise KeyError('foo')
         except KeyError as exception:
             self.tb.mark_as_failure(self.tid, exception)
-            self.assertEqual(self.tb.get_status(self.tid), states.FAILURE)
+            self.assertEqual(self.tb.get_state(self.tid), states.FAILURE)
             self.assertIsInstance(self.tb.get_result(self.tid), KeyError)
 
     def test_apply_chord(self):
@@ -219,7 +219,7 @@ class test_memcache_key(AppCase, MockCacheMixin):
                     cache._imp = [None]
                     task_id, result = string(uuid()), 42
                     b = cache.CacheBackend(backend='memcache', app=self.app)
-                    b.store_result(task_id, result, status=states.SUCCESS)
+                    b.store_result(task_id, result, state=states.SUCCESS)
                     self.assertEqual(b.get_result(task_id), result)
 
     def test_memcache_bytes_key(self):
@@ -230,7 +230,7 @@ class test_memcache_key(AppCase, MockCacheMixin):
                     cache._imp = [None]
                     task_id, result = str_to_bytes(uuid()), 42
                     b = cache.CacheBackend(backend='memcache', app=self.app)
-                    b.store_result(task_id, result, status=states.SUCCESS)
+                    b.store_result(task_id, result, state=states.SUCCESS)
                     self.assertEqual(b.get_result(task_id), result)
 
     def test_pylibmc_unicode_key(self):
@@ -240,7 +240,7 @@ class test_memcache_key(AppCase, MockCacheMixin):
                 cache._imp = [None]
                 task_id, result = string(uuid()), 42
                 b = cache.CacheBackend(backend='memcache', app=self.app)
-                b.store_result(task_id, result, status=states.SUCCESS)
+                b.store_result(task_id, result, state=states.SUCCESS)
                 self.assertEqual(b.get_result(task_id), result)
 
     def test_pylibmc_bytes_key(self):
@@ -250,5 +250,5 @@ class test_memcache_key(AppCase, MockCacheMixin):
                 cache._imp = [None]
                 task_id, result = str_to_bytes(uuid()), 42
                 b = cache.CacheBackend(backend='memcache', app=self.app)
-                b.store_result(task_id, result, status=states.SUCCESS)
+                b.store_result(task_id, result, state=states.SUCCESS)
                 self.assertEqual(b.get_result(task_id), result)

+ 7 - 7
celery/tests/backends/test_database.py

@@ -90,7 +90,7 @@ class test_DatabaseBackend(AppCase):
 
     def test_missing_task_id_is_PENDING(self):
         tb = DatabaseBackend(self.uri, app=self.app)
-        self.assertEqual(tb.get_status('xxx-does-not-exist'), states.PENDING)
+        self.assertEqual(tb.get_state('xxx-does-not-exist'), states.PENDING)
 
     def test_missing_task_meta_is_dict_with_pending(self):
         tb = DatabaseBackend(self.uri, app=self.app)
@@ -106,11 +106,11 @@ class test_DatabaseBackend(AppCase):
 
         tid = uuid()
 
-        self.assertEqual(tb.get_status(tid), states.PENDING)
+        self.assertEqual(tb.get_state(tid), states.PENDING)
         self.assertIsNone(tb.get_result(tid))
 
         tb.mark_as_done(tid, 42)
-        self.assertEqual(tb.get_status(tid), states.SUCCESS)
+        self.assertEqual(tb.get_state(tid), states.SUCCESS)
         self.assertEqual(tb.get_result(tid), 42)
 
     def test_is_pickled(self):
@@ -128,13 +128,13 @@ class test_DatabaseBackend(AppCase):
         tb = DatabaseBackend(self.uri, app=self.app)
         tid = uuid()
         tb.mark_as_started(tid)
-        self.assertEqual(tb.get_status(tid), states.STARTED)
+        self.assertEqual(tb.get_state(tid), states.STARTED)
 
     def test_mark_as_revoked(self):
         tb = DatabaseBackend(self.uri, app=self.app)
         tid = uuid()
         tb.mark_as_revoked(tid)
-        self.assertEqual(tb.get_status(tid), states.REVOKED)
+        self.assertEqual(tb.get_state(tid), states.REVOKED)
 
     def test_mark_as_retry(self):
         tb = DatabaseBackend(self.uri, app=self.app)
@@ -145,7 +145,7 @@ class test_DatabaseBackend(AppCase):
             import traceback
             trace = '\n'.join(traceback.format_stack())
             tb.mark_as_retry(tid, exception, traceback=trace)
-            self.assertEqual(tb.get_status(tid), states.RETRY)
+            self.assertEqual(tb.get_state(tid), states.RETRY)
             self.assertIsInstance(tb.get_result(tid), KeyError)
             self.assertEqual(tb.get_traceback(tid), trace)
 
@@ -159,7 +159,7 @@ class test_DatabaseBackend(AppCase):
             import traceback
             trace = '\n'.join(traceback.format_stack())
             tb.mark_as_failure(tid3, exception, traceback=trace)
-            self.assertEqual(tb.get_status(tid3), states.FAILURE)
+            self.assertEqual(tb.get_state(tid3), states.FAILURE)
             self.assertIsInstance(tb.get_result(tid3), KeyError)
             self.assertEqual(tb.get_traceback(tid3), trace)
 

+ 2 - 2
celery/tests/backends/test_filesystem.py

@@ -45,7 +45,7 @@ class test_FilesystemBackend(AppCase):
 
     def test_missing_task_is_PENDING(self):
         tb = FilesystemBackend(app=self.app, url=self.url)
-        self.assertEqual(tb.get_status('xxx-does-not-exist'), states.PENDING)
+        self.assertEqual(tb.get_state('xxx-does-not-exist'), states.PENDING)
 
     def test_mark_as_done_writes_file(self):
         tb = FilesystemBackend(app=self.app, url=self.url)
@@ -56,7 +56,7 @@ class test_FilesystemBackend(AppCase):
         tb = FilesystemBackend(app=self.app, url=self.url)
         tid = uuid()
         tb.mark_as_done(tid, 42)
-        self.assertEqual(tb.get_status(tid), states.SUCCESS)
+        self.assertEqual(tb.get_state(tid), states.SUCCESS)
 
     def test_correct_result(self):
         data = {'foo': 'bar'}

+ 2 - 2
celery/tests/backends/test_redis.py

@@ -385,10 +385,10 @@ class test_RedisBackend(AppCase):
     def test_get_set_forget(self):
         tid = uuid()
         self.b.store_result(tid, 42, states.SUCCESS)
-        self.assertEqual(self.b.get_status(tid), states.SUCCESS)
+        self.assertEqual(self.b.get_state(tid), states.SUCCESS)
         self.assertEqual(self.b.get_result(tid), 42)
         self.b.forget(tid)
-        self.assertEqual(self.b.get_status(tid), states.PENDING)
+        self.assertEqual(self.b.get_state(tid), states.PENDING)
 
     def test_set_expires(self):
         self.b = self.Backend(expires=512, app=self.app)