|
@@ -20,7 +20,7 @@ if pymongo:
|
|
|
from bson.binary import Binary
|
|
|
except ImportError: # pragma: no cover
|
|
|
from pymongo.binary import Binary # noqa
|
|
|
- from pymongo.errors import InvalidDocument # noqa
|
|
|
+ from pymongo.errors import InvalidDocument # noqa
|
|
|
else: # pragma: no cover
|
|
|
Binary = None # noqa
|
|
|
InvalidDocument = None # noqa
|
|
@@ -39,13 +39,16 @@ from .base import BaseBackend
|
|
|
|
|
|
__all__ = ['MongoBackend']
|
|
|
|
|
|
-BINARY_CODECS = frozenset(['pickle','msgpack'])
|
|
|
+BINARY_CODECS = frozenset(['pickle', 'msgpack'])
|
|
|
+
|
|
|
+# register a fake bson serializer which will return the document as it is
|
|
|
+
|
|
|
|
|
|
-#register a fake bson serializer which will return the document as it is
|
|
|
class bson_serializer():
|
|
|
+
|
|
|
@staticmethod
|
|
|
def loads(obj, *args, **kwargs):
|
|
|
- if isinstance(obj,string_t):
|
|
|
+ if isinstance(obj, string_t):
|
|
|
try:
|
|
|
from anyjson import loads
|
|
|
return loads(obj)
|
|
@@ -58,10 +61,11 @@ class bson_serializer():
|
|
|
return obj
|
|
|
|
|
|
register('bson', bson_serializer.loads, bson_serializer.dumps,
|
|
|
- content_type='application/data',
|
|
|
- content_encoding='utf-8')
|
|
|
+ content_type='application/data',
|
|
|
+ content_encoding='utf-8')
|
|
|
+
|
|
|
+disable_insecure_serializers(['json', 'bson'])
|
|
|
|
|
|
-disable_insecure_serializers(['json','bson'])
|
|
|
|
|
|
class Bunch(object):
|
|
|
|
|
@@ -77,6 +81,7 @@ class MongoBackend(BaseBackend):
|
|
|
password = None
|
|
|
database_name = 'celery'
|
|
|
taskmeta_collection = 'celery_taskmeta'
|
|
|
+ groupmeta_collection = 'celery_groupmeta'
|
|
|
max_pool_size = 10
|
|
|
options = None
|
|
|
|
|
@@ -123,7 +128,7 @@ class MongoBackend(BaseBackend):
|
|
|
'taskmeta_collection', self.taskmeta_collection,
|
|
|
)
|
|
|
self.groupmeta_collection = config.pop(
|
|
|
- 'groupmeta_collection', self.taskmeta_collection,
|
|
|
+ 'groupmeta_collection', self.groupmeta_collection,
|
|
|
)
|
|
|
|
|
|
self.options = dict(config, **config.pop('options', None) or {})
|
|
@@ -138,7 +143,6 @@ class MongoBackend(BaseBackend):
|
|
|
# Specifying backend as an URL
|
|
|
self.host = url
|
|
|
|
|
|
-
|
|
|
def _get_connection(self):
|
|
|
"""Connect to the MongoDB server."""
|
|
|
if self._connection is None:
|
|
@@ -172,7 +176,7 @@ class MongoBackend(BaseBackend):
|
|
|
|
|
|
def encode(self, data):
|
|
|
payload = super(MongoBackend, self).encode(data)
|
|
|
- #serializer which are in a unsupported format (pickle/binary)
|
|
|
+ # serializer which are in a unsupported format (pickle/binary)
|
|
|
if self.serializer in BINARY_CODECS:
|
|
|
payload = Binary(payload)
|
|
|
|
|
@@ -213,7 +217,7 @@ class MongoBackend(BaseBackend):
|
|
|
# if collection don't contain it try searching in the
|
|
|
# group_collection it could be a groupresult instead
|
|
|
obj = self.collection.find_one({'_id': task_id}) or \
|
|
|
- self.group_collection.find_one({'_id': task_id})
|
|
|
+ self.group_collection.find_one({'_id': task_id})
|
|
|
if not obj:
|
|
|
return {'status': states.PENDING, 'result': None}
|
|
|
|
|
@@ -231,7 +235,7 @@ class MongoBackend(BaseBackend):
|
|
|
def _save_group(self, group_id, result):
|
|
|
"""Save the group result."""
|
|
|
|
|
|
- task_ids = [ i.id for i in result ]
|
|
|
+ task_ids = [i.id for i in result]
|
|
|
|
|
|
meta = {'_id': group_id,
|
|
|
'result': self.encode(task_ids),
|
|
@@ -248,7 +252,7 @@ class MongoBackend(BaseBackend):
|
|
|
|
|
|
tasks = self.decode(obj['result'])
|
|
|
|
|
|
- tasks = [ AsyncResult(task) for task in tasks ]
|
|
|
+ tasks = [AsyncResult(task) for task in tasks]
|
|
|
|
|
|
meta = {
|
|
|
'task_id': obj['_id'],
|