瀏覽代碼

Merge branch 'master' of github.com:celery/celery

Ask Solem 10 年之前
父節點
當前提交
b286238401

+ 14 - 13
README.rst

@@ -166,19 +166,19 @@ Framework Integration
 Celery is easy to integrate with web frameworks, some of which even have
 Celery is easy to integrate with web frameworks, some of which even have
 integration packages:
 integration packages:
 
 
-    +--------------------+------------------------+
-    | `Django`_          | not needed             |
-    +--------------------+------------------------+
-    | `Pyramid`_         | `pyramid_celery`_      |
-    +--------------------+------------------------+
-    | `Pylons`_          | `celery-pylons`_       |
-    +--------------------+------------------------+
-    | `Flask`_           | not needed             |
-    +--------------------+------------------------+
-    | `web2py`_          | `web2py-celery`_       |
-    +--------------------+------------------------+
-    | `Tornado`_         | `tornado-celery`_      |
-    +--------------------+------------------------+
+    +--------------------+----------------------------------------------------+
+    | `Django`_          | not needed                                         |
+    +--------------------+----------------------------------------------------+
+    | `Pyramid`_         | `pyramid_celery`_                                  |
+    +--------------------+----------------------------------------------------+
+    | `Pylons`_          | `celery-pylons`_                                   |
+    +--------------------+----------------------------------------------------+
+    | `Flask`_           | not needed                                         |
+    +--------------------+----------------------------------------------------+
+    | `web2py`_          | `web2py-celery`_                                   |
+    +--------------------+----------------------------------------------------+
+    | `Tornado`_         | `tornado-celery`_ | `another tornado-celery`_      |
+    +--------------------+----------------------------------------------------+
 
 
 The integration packages are not strictly necessary, but they can make
 The integration packages are not strictly necessary, but they can make
 development easier, and sometimes they add important hooks like closing
 development easier, and sometimes they add important hooks like closing
@@ -196,6 +196,7 @@ database connections at ``fork``.
 .. _`web2py-celery`: http://code.google.com/p/web2py-celery/
 .. _`web2py-celery`: http://code.google.com/p/web2py-celery/
 .. _`Tornado`: http://www.tornadoweb.org/
 .. _`Tornado`: http://www.tornadoweb.org/
 .. _`tornado-celery`: http://github.com/mher/tornado-celery/
 .. _`tornado-celery`: http://github.com/mher/tornado-celery/
+.. _`another tornado-celery`: https://github.com/mayflaver/tornado-celery
 
 
 .. _celery-documentation:
 .. _celery-documentation:
 
 

+ 2 - 1
celery/app/task.py

@@ -471,13 +471,14 @@ class Task(object):
         if self.__self__ is not None:
         if self.__self__ is not None:
             args = args if isinstance(args, tuple) else tuple(args or ())
             args = args if isinstance(args, tuple) else tuple(args or ())
             args = (self.__self__, ) + args
             args = (self.__self__, ) + args
+            shadow = shadow or self.shadow_name(args, kwargs, final_options)
 
 
         preopts = self._get_exec_options()
         preopts = self._get_exec_options()
         options = dict(preopts, **options) if options else preopts
         options = dict(preopts, **options) if options else preopts
         return app.send_task(
         return app.send_task(
             self.name, args, kwargs, task_id=task_id, producer=producer,
             self.name, args, kwargs, task_id=task_id, producer=producer,
             link=link, link_error=link_error, result_cls=self.AsyncResult,
             link=link, link_error=link_error, result_cls=self.AsyncResult,
-            shadow=shadow or self.shadow_name(args, kwargs, options),
+            shadow=shadow,
             **options
             **options
         )
         )
 
 

+ 1 - 1
celery/backends/base.py

@@ -118,7 +118,7 @@ class BaseBackend(object):
                                  status=states.SUCCESS, request=request)
                                  status=states.SUCCESS, request=request)
 
 
     def mark_as_failure(self, task_id, exc, traceback=None, request=None):
     def mark_as_failure(self, task_id, exc, traceback=None, request=None):
-        """Mark task as executed with failure. Stores the execption."""
+        """Mark task as executed with failure. Stores the exception."""
         return self.store_result(task_id, exc, status=states.FAILURE,
         return self.store_result(task_id, exc, status=states.FAILURE,
                                  traceback=traceback, request=request)
                                  traceback=traceback, request=request)
 
 

+ 4 - 0
celery/backends/couchbase.py

@@ -17,6 +17,7 @@ try:
 except ImportError:
 except ImportError:
     Couchbase = Connection = NotFoundError = None   # noqa
     Couchbase = Connection = NotFoundError = None   # noqa
 
 
+from kombu.utils.encoding import str_t
 from kombu.utils.url import _parse_url
 from kombu.utils.url import _parse_url
 
 
 from celery.exceptions import ImproperlyConfigured
 from celery.exceptions import ImproperlyConfigured
@@ -38,6 +39,9 @@ class CouchBaseBackend(KeyValueStoreBackend):
     timeout = 2.5
     timeout = 2.5
     transcoder = None
     transcoder = None
     # supports_autoexpire = False
     # supports_autoexpire = False
+    
+    # Use str as couchbase key not bytes
+    key_t = str_t
 
 
     def __init__(self, url=None, *args, **kwargs):
     def __init__(self, url=None, *args, **kwargs):
         """Initialize CouchBase backend instance.
         """Initialize CouchBase backend instance.

+ 1 - 0
celery/backends/redis.py

@@ -85,6 +85,7 @@ class RedisBackend(KeyValueStoreBackend):
             'port': _get('PORT') or 6379,
             'port': _get('PORT') or 6379,
             'db': _get('DB') or 0,
             'db': _get('DB') or 0,
             'password': _get('PASSWORD'),
             'password': _get('PASSWORD'),
+            'socket_timeout': _get('SOCKET_TIMEOUT'),
             'max_connections': self.max_connections,
             'max_connections': self.max_connections,
         }
         }
         if url:
         if url:

+ 51 - 13
celery/tests/backends/test_couchbase.py

@@ -1,5 +1,9 @@
+"""Tests for the CouchBaseBackend."""
+
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
+from kombu.utils.encoding import str_t
+
 from celery.backends import couchbase as module
 from celery.backends import couchbase as module
 from celery.backends.couchbase import CouchBaseBackend
 from celery.backends.couchbase import CouchBaseBackend
 from celery.exceptions import ImproperlyConfigured
 from celery.exceptions import ImproperlyConfigured
@@ -18,32 +22,42 @@ COUCHBASE_BUCKET = 'celery_bucket'
 
 
 class test_CouchBaseBackend(AppCase):
 class test_CouchBaseBackend(AppCase):
 
 
+    """CouchBaseBackend TestCase."""
+
     def setup(self):
     def setup(self):
+        """Skip the test if couchbase cannot be imported."""
         if couchbase is None:
         if couchbase is None:
             raise SkipTest('couchbase is not installed.')
             raise SkipTest('couchbase is not installed.')
         self.backend = CouchBaseBackend(app=self.app)
         self.backend = CouchBaseBackend(app=self.app)
 
 
     def test_init_no_couchbase(self):
     def test_init_no_couchbase(self):
-        """test init no couchbase raises"""
-        prev, module.couchbase = module.couchbase, None
+        """
+        Test init no couchbase raises.
+
+        If celery.backends.couchbase cannot import the couchbase client, it
+        sets the couchbase.Couchbase to None and then handles this in the
+        CouchBaseBackend __init__ method.
+        """
+        prev, module.Couchbase = module.Couchbase, None
         try:
         try:
             with self.assertRaises(ImproperlyConfigured):
             with self.assertRaises(ImproperlyConfigured):
                 CouchBaseBackend(app=self.app)
                 CouchBaseBackend(app=self.app)
         finally:
         finally:
-            module.couchbase = prev
+            module.Couchbase = prev
 
 
     def test_init_no_settings(self):
     def test_init_no_settings(self):
-        """test init no settings"""
+        """Test init no settings."""
         self.app.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = []
         self.app.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = []
         with self.assertRaises(ImproperlyConfigured):
         with self.assertRaises(ImproperlyConfigured):
             CouchBaseBackend(app=self.app)
             CouchBaseBackend(app=self.app)
 
 
     def test_init_settings_is_None(self):
     def test_init_settings_is_None(self):
-        """Test init settings is None"""
+        """Test init settings is None."""
         self.app.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = None
         self.app.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = None
         CouchBaseBackend(app=self.app)
         CouchBaseBackend(app=self.app)
 
 
     def test_get_connection_connection_exists(self):
     def test_get_connection_connection_exists(self):
+        """Test _get_connection works."""
         with patch('couchbase.connection.Connection') as mock_Connection:
         with patch('couchbase.connection.Connection') as mock_Connection:
             self.backend._connection = sentinel._connection
             self.backend._connection = sentinel._connection
 
 
@@ -53,12 +67,13 @@ class test_CouchBaseBackend(AppCase):
             self.assertFalse(mock_Connection.called)
             self.assertFalse(mock_Connection.called)
 
 
     def test_get(self):
     def test_get(self):
-        """test_get
+        """
+        Test get method.
 
 
         CouchBaseBackend.get should return  and take two params
         CouchBaseBackend.get should return  and take two params
         db conn to couchbase is mocked.
         db conn to couchbase is mocked.
-        TODO Should test on key not exists
 
 
+        TODO Should test on key not exists
         """
         """
         self.app.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = {}
         self.app.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = {}
         x = CouchBaseBackend(app=self.app)
         x = CouchBaseBackend(app=self.app)
@@ -70,11 +85,11 @@ class test_CouchBaseBackend(AppCase):
         x._connection.get.assert_called_once_with('1f3fab')
         x._connection.get.assert_called_once_with('1f3fab')
 
 
     def test_set(self):
     def test_set(self):
-        """test_set
+        """
+        Test set method.
 
 
         CouchBaseBackend.set should return None and take two params
         CouchBaseBackend.set should return None and take two params
         db conn to couchbase is mocked.
         db conn to couchbase is mocked.
-
         """
         """
         self.app.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = None
         self.app.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = None
         x = CouchBaseBackend(app=self.app)
         x = CouchBaseBackend(app=self.app)
@@ -84,12 +99,13 @@ class test_CouchBaseBackend(AppCase):
         self.assertIsNone(x.set(sentinel.key, sentinel.value))
         self.assertIsNone(x.set(sentinel.key, sentinel.value))
 
 
     def test_delete(self):
     def test_delete(self):
-        """test_delete
+        """
+        Test delete method.
 
 
         CouchBaseBackend.delete should return and take two params
         CouchBaseBackend.delete should return and take two params
         db conn to couchbase is mocked.
         db conn to couchbase is mocked.
-        TODO Should test on key not exists
 
 
+        TODO Should test on key not exists.
         """
         """
         self.app.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = {}
         self.app.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = {}
         x = CouchBaseBackend(app=self.app)
         x = CouchBaseBackend(app=self.app)
@@ -101,9 +117,10 @@ class test_CouchBaseBackend(AppCase):
         x._connection.delete.assert_called_once_with('1f3fab')
         x._connection.delete.assert_called_once_with('1f3fab')
 
 
     def test_config_params(self):
     def test_config_params(self):
-        """test_config_params
+        """
+        Test config params are correct.
 
 
-        celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS is properly set
+        celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS is properly set.
         """
         """
         self.app.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = {
         self.app.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = {
             'bucket': 'mycoolbucket',
             'bucket': 'mycoolbucket',
@@ -120,12 +137,14 @@ class test_CouchBaseBackend(AppCase):
         self.assertEqual(x.port, 1234)
         self.assertEqual(x.port, 1234)
 
 
     def test_backend_by_url(self, url='couchbase://myhost/mycoolbucket'):
     def test_backend_by_url(self, url='couchbase://myhost/mycoolbucket'):
+        """Test that a CouchBaseBackend is loaded from the couchbase url."""
         from celery.backends.couchbase import CouchBaseBackend
         from celery.backends.couchbase import CouchBaseBackend
         backend, url_ = backends.get_backend_by_url(url, self.app.loader)
         backend, url_ = backends.get_backend_by_url(url, self.app.loader)
         self.assertIs(backend, CouchBaseBackend)
         self.assertIs(backend, CouchBaseBackend)
         self.assertEqual(url_, url)
         self.assertEqual(url_, url)
 
 
     def test_backend_params_by_url(self):
     def test_backend_params_by_url(self):
+        """Test config params are correct from config url."""
         url = 'couchbase://johndoe:mysecret@myhost:123/mycoolbucket'
         url = 'couchbase://johndoe:mysecret@myhost:123/mycoolbucket'
         with self.Celery(backend=url) as app:
         with self.Celery(backend=url) as app:
             x = app.backend
             x = app.backend
@@ -134,3 +153,22 @@ class test_CouchBaseBackend(AppCase):
             self.assertEqual(x.username, 'johndoe')
             self.assertEqual(x.username, 'johndoe')
             self.assertEqual(x.password, 'mysecret')
             self.assertEqual(x.password, 'mysecret')
             self.assertEqual(x.port, 123)
             self.assertEqual(x.port, 123)
+
+    def test_correct_key_types(self):
+        """
+        Test that the key is the correct type for the couchbase python API.
+
+        We check that get_key_for_task, get_key_for_chord, and
+        get_key_for_group always returns a python string. Need to use str_t
+        for cross Python reasons.
+        """
+        keys = [
+            self.backend.get_key_for_task('task_id', bytes('key')),
+            self.backend.get_key_for_chord('group_id', bytes('key')),
+            self.backend.get_key_for_group('group_id', bytes('key')),
+            self.backend.get_key_for_task('task_id', 'key'),
+            self.backend.get_key_for_chord('group_id', 'key'),
+            self.backend.get_key_for_group('group_id', 'key'),
+        ]
+        for key in keys:
+            self.assertIsInstance(key, str_t)

+ 5 - 5
docs/userguide/monitoring.rst

@@ -188,16 +188,16 @@ Features
 
 
 - Real-time monitoring using Celery Events
 - Real-time monitoring using Celery Events
 
 
-    - Task progress and history.
+    - Task progress and history
     - Ability to show task details (arguments, start time, runtime, and more)
     - Ability to show task details (arguments, start time, runtime, and more)
     - Graphs and statistics
     - Graphs and statistics
 
 
 - Remote Control
 - Remote Control
 
 
-    - View worker status and statistics.
-    - Shutdown and restart worker instances.
-    - Control worker pool size and autoscale settings.
-    - View and modify the queues a worker instance consumes from.
+    - View worker status and statistics
+    - Shutdown and restart worker instances
+    - Control worker pool size and autoscale settings
+    - View and modify the queues a worker instance consumes from
     - View currently running tasks
     - View currently running tasks
     - View scheduled tasks (ETA/countdown)
     - View scheduled tasks (ETA/countdown)
     - View reserved and revoked tasks
     - View reserved and revoked tasks

+ 1 - 1
docs/userguide/workers.rst

@@ -887,7 +887,7 @@ You can get a list of tasks waiting to be scheduled by using
 Dump of reserved tasks
 Dump of reserved tasks
 ----------------------
 ----------------------
 
 
-Reserved tasks are tasks that has been received, but is still waiting to be
+Reserved tasks are tasks that have been received, but are still waiting to be
 executed.
 executed.
 
 
 You can get a list of these using
 You can get a list of these using