瀏覽代碼

Redis: ?new_join=1 is now the default

Ask Solem 9 年之前
父節點
當前提交
2ccf237436
共有 4 個文件被更改,包括 24 次插入48 次删除
  1. 3 2
      celery/app/task.py
  2. 4 12
      celery/backends/redis.py
  3. 15 30
      celery/tests/backends/test_redis.py
  4. 2 4
      funtests/stress/stress/templates.py

+ 3 - 2
celery/app/task.py

@@ -788,8 +788,9 @@ class Task(object):
         :param lazy: If enabled the new task will not actually be called,
                       and ``sig.delay()`` must be called manually.
 
-        Currently only supported by the Redis result backend when
-        ``?new_join=1`` is enabled.
+        .. versionadded:: 4.0
+
+        Currently only supported by the Redis result backend.
 
         """
         if not self.request.chord:

+ 4 - 12
celery/backends/redis.py

@@ -58,7 +58,7 @@ class RedisBackend(KeyValueStoreBackend):
 
     def __init__(self, host=None, port=None, db=None, password=None,
                  max_connections=None, url=None,
-                 connection_pool=None, new_join=False, **kwargs):
+                 connection_pool=None, **kwargs):
         super(RedisBackend, self).__init__(expires_type=int, **kwargs)
         _get = self.app.conf.get
         if self.redis is None:
@@ -87,14 +87,6 @@ class RedisBackend(KeyValueStoreBackend):
             self.connparams = self._params_from_url(url, self.connparams)
         self.url = url
 
-        try:
-            new_join = strtobool(self.connparams.pop('new_join'))
-        except KeyError:
-            pass
-        if new_join:
-            self.apply_chord = self._new_chord_apply
-            self.on_chord_part_return = self._new_chord_return
-
         self.connection_errors, self.channel_errors = (
             get_redis_error_classes() if get_redis_error_classes
             else ((), ()))
@@ -185,13 +177,13 @@ class RedisBackend(KeyValueStoreBackend):
             raise ChordError('Dependency {0} raised {1!r}'.format(tid, retval))
         return retval
 
-    def _new_chord_apply(self, header, partial_args, group_id, body,
-                         result=None, options={}, **kwargs):
+    def apply_chord(self, header, partial_args, group_id, body,
+                    result=None, options={}, **kwargs):
         # avoids saving the group in the redis db.
         options['task_id'] = group_id
         return header(*partial_args, **options or {})
 
-    def _new_chord_return(self, request, state, result, propagate=None):
+    def on_chord_part_return(self, request, state, result, propagate=None):
         app = self.app
         tid, gid = request.id, request.group
         if not gid or not tid:

+ 15 - 30
celery/tests/backends/test_redis.py

@@ -128,7 +128,7 @@ class test_RedisBackend(AppCase):
     def test_reduce(self):
         try:
             from celery.backends.redis import RedisBackend
-            x = RedisBackend(app=self.app, new_join=True)
+            x = RedisBackend(app=self.app)
             self.assertTrue(loads(dumps(x)))
         except ImportError:
             raise SkipTest('redis not installed')
@@ -136,12 +136,11 @@ class test_RedisBackend(AppCase):
     def test_no_redis(self):
         self.Backend.redis = None
         with self.assertRaises(ImproperlyConfigured):
-            self.Backend(app=self.app, new_join=True)
+            self.Backend(app=self.app)
 
     def test_url(self):
         x = self.Backend(
             'redis://:bosco@vandelay.com:123//1', app=self.app,
-            new_join=True,
         )
         self.assertTrue(x.connparams)
         self.assertEqual(x.connparams['host'], 'vandelay.com')
@@ -152,7 +151,6 @@ class test_RedisBackend(AppCase):
     def test_socket_url(self):
         x = self.Backend(
             'socket:///tmp/redis.sock?virtual_host=/3', app=self.app,
-            new_join=True,
         )
         self.assertTrue(x.connparams)
         self.assertEqual(x.connparams['path'], '/tmp/redis.sock')
@@ -167,7 +165,6 @@ class test_RedisBackend(AppCase):
     def test_compat_propertie(self):
         x = self.Backend(
             'redis://:bosco@vandelay.com:123//1', app=self.app,
-            new_join=True,
         )
         with self.assertPendingDeprecation():
             self.assertEqual(x.host, 'vandelay.com')
@@ -185,65 +182,53 @@ class test_RedisBackend(AppCase):
             'result_expires': None,
             'accept_content': ['json'],
         })
-        self.Backend(app=self.app, new_join=True)
+        self.Backend(app=self.app)
 
     def test_expires_defaults_to_config(self):
         self.app.conf.result_expires = 10
-        b = self.Backend(expires=None, app=self.app, new_join=True)
+        b = self.Backend(expires=None, app=self.app)
         self.assertEqual(b.expires, 10)
 
     def test_expires_is_int(self):
-        b = self.Backend(expires=48, app=self.app, new_join=True)
+        b = self.Backend(expires=48, app=self.app)
         self.assertEqual(b.expires, 48)
 
-    def test_set_new_join_from_url_query(self):
-        b = self.Backend('redis://?new_join=True;foobar=1', app=self.app)
-        self.assertEqual(b.on_chord_part_return, b._new_chord_return)
-        self.assertEqual(b.apply_chord, b._new_chord_apply)
-
     def test_add_to_chord(self):
-        b = self.Backend('redis://?new_join=True', app=self.app)
+        b = self.Backend('redis://', app=self.app)
         gid = uuid()
         b.add_to_chord(gid, 'sig')
         b.client.incr.assert_called_with(b.get_key_for_group(gid, '.t'), 1)
 
-    def test_default_is_old_join(self):
-        b = self.Backend(app=self.app)
-        self.assertNotEqual(b.on_chord_part_return, b._new_chord_return)
-        self.assertNotEqual(b.apply_chord, b._new_chord_apply)
-
     def test_expires_is_None(self):
-        b = self.Backend(expires=None, app=self.app, new_join=True)
+        b = self.Backend(expires=None, app=self.app)
         self.assertEqual(
             b.expires,
             self.app.conf.result_expires.total_seconds(),
         )
 
     def test_expires_is_timedelta(self):
-        b = self.Backend(
-            expires=timedelta(minutes=1), app=self.app, new_join=1,
-        )
+        b = self.Backend(expires=timedelta(minutes=1), app=self.app)
         self.assertEqual(b.expires, 60)
 
     def test_apply_chord(self):
-        self.Backend(app=self.app, new_join=True).apply_chord(
+        self.Backend(app=self.app).apply_chord(
             group(app=self.app), (), 'group_id', {},
             result=[self.app.AsyncResult(x) for x in [1, 2, 3]],
         )
 
     def test_mget(self):
-        b = self.Backend(app=self.app, new_join=True)
+        b = self.Backend(app=self.app)
         self.assertTrue(b.mget(['a', 'b', 'c']))
         b.client.mget.assert_called_with(['a', 'b', 'c'])
 
     def test_set_no_expire(self):
-        b = self.Backend(app=self.app, new_join=True)
+        b = self.Backend(app=self.app)
         b.expires = None
         b.set('foo', 'bar')
 
     @patch('celery.result.GroupResult.restore')
     def test_on_chord_part_return(self, restore):
-        b = self.Backend(app=self.app, new_join=True)
+        b = self.Backend(app=self.app)
 
         def create_task():
             tid = uuid()
@@ -271,10 +256,10 @@ class test_RedisBackend(AppCase):
         ])
 
     def test_process_cleanup(self):
-        self.Backend(app=self.app, new_join=True).process_cleanup()
+        self.Backend(app=self.app).process_cleanup()
 
     def test_get_set_forget(self):
-        b = self.Backend(app=self.app, new_join=True)
+        b = self.Backend(app=self.app)
         tid = uuid()
         b.store_result(tid, 42, states.SUCCESS)
         self.assertEqual(b.get_status(tid), states.SUCCESS)
@@ -283,7 +268,7 @@ class test_RedisBackend(AppCase):
         self.assertEqual(b.get_status(tid), states.PENDING)
 
     def test_set_expires(self):
-        b = self.Backend(expires=512, app=self.app, new_join=True)
+        b = self.Backend(expires=512, app=self.app)
         tid = uuid()
         key = b.get_key_for_task(tid)
         b.store_result(tid, 42, states.SUCCESS)

+ 2 - 4
funtests/stress/stress/templates.py

@@ -85,14 +85,12 @@ class redis(default):
         'fanout_prefix': True,
         'fanout_patterns': True,
     }
-    result_backend = os.environ.get(
-        'CSTRESS_BACKEND', 'redis://?new_join=1',
-    )
+    result_backend = os.environ.get('CSTRESS_BACKEND', 'redis://')
 
 
 @template()
 class redistore(default):
-    result_backend = 'redis://?new_join=1'
+    result_backend = 'redis://'
 
 
 @template()