Ver código fonte

Merge branch 'master' of github.com:celery/celery

Ask Solem 9 anos atrás
pai
commit
a72871d725

+ 1 - 0
CONTRIBUTORS.txt

@@ -156,6 +156,7 @@ Antoine Legrand, 2014/01/09
 Pepijn de Vos, 2014/01/15
 Dan McGee, 2014/01/27
 Paul Kilgo, 2014/01/28
+Môshe van der Sterre, 2014/01/31
 Martin Davidsson, 2014/02/08
 Chris Clark, 2014/02/20
 Matthew Duggan, 2014/04/10

+ 1 - 0
celery/backends/__init__.py

@@ -33,6 +33,7 @@ BACKEND_ALIASES = {
     'couchbase': 'celery.backends.couchbase:CouchBaseBackend',
     'couchdb': 'celery.backends.couchdb:CouchDBBackend',
     'riak': 'celery.backends.riak:RiakBackend',
+    'file': 'celery.backends.filesystem:FilesystemBackend',
     'disabled': 'celery.backends.base:DisabledBackend',
 }
 

+ 93 - 0
celery/backends/filesystem.py

@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+"""
+    celery.backends.filesystem
+    ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    Filesystem result store backend.
+"""
+from __future__ import absolute_import
+
+from kombu.utils.encoding import ensure_bytes
+
+from celery.exceptions import ImproperlyConfigured
+from celery.backends.base import KeyValueStoreBackend
+from celery.utils import uuid
+
+import os
+import locale
+default_encoding = locale.getpreferredencoding(False)
+
+# Python 2 does not have FileNotFoundError and IsADirectoryError
+try:
+    FileNotFoundError
+except NameError:
+    FileNotFoundError = IOError
+    IsADirectoryError = IOError
+
+
+class FilesystemBackend(KeyValueStoreBackend):
+    def __init__(self, url=None, open=open, unlink=os.unlink, sep=os.sep,
+                 encoding=default_encoding, *args, **kwargs):
+        """Initialize the filesystem backend.
+
+        Keyword arguments (in addition to those of KeyValueStoreBackend):
+        url      -- URL to the directory we should use
+        open     -- open function to use when opening files
+        unlink   -- unlink function to use when deleting files
+        sep      -- directory seperator (to join the directory with the key)
+        encoding -- encoding used on the filesystem
+
+        """
+
+        super(FilesystemBackend, self).__init__(*args, **kwargs)
+        path = self._find_path(url)
+
+        # We need the path and seperator as bytes objects
+        self.path = path.encode(encoding)
+        self.sep = sep.encode(encoding)
+
+        self.open = open
+        self.unlink = unlink
+
+        # Lets verify that we have everything setup right
+        self._do_directory_test(b'.fs-backend-' + uuid().encode(encoding))
+
+    def _find_path(self, url):
+        if url is not None and url.startswith('file:///'):
+            return url[7:]
+        if hasattr(self.app.conf, 'CELERY_RESULT_FSPATH'):
+            return self.app.conf.CELERY_RESULT_FSPATH
+        raise ImproperlyConfigured(
+            'You need to configure a path for the Filesystem backend')
+
+    def _do_directory_test(self, key):
+        try:
+            self.set(key, b'test value')
+            assert self.get(key) == b'test value'
+            self.delete(key)
+        except IOError:
+            raise ImproperlyConfigured(
+                'The configured path for the Filesystem backend does not '
+                'work correctly, please make sure that it exists and has '
+                'the correct permissions.')
+
+    def _filename(self, key):
+        return self.sep.join((self.path, key))
+
+    def get(self, key):
+        try:
+            with self.open(self._filename(key), 'rb') as infile:
+                return infile.read()
+        except FileNotFoundError:
+            return None
+
+    def set(self, key, value):
+        with self.open(self._filename(key), 'wb') as outfile:
+            outfile.write(ensure_bytes(value))
+
+    def mget(self, keys):
+        for key in keys:
+            yield self.get(key)
+
+    def delete(self, key):
+        self.unlink(self._filename(key))

+ 79 - 0
celery/tests/backends/test_filesystem.py

@@ -0,0 +1,79 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+from celery import states
+from celery.tests.case import AppCase
+from celery.backends.filesystem import FilesystemBackend
+from celery.exceptions import ImproperlyConfigured
+from celery.utils import uuid
+
+import os
+import shutil
+import tempfile
+
+
+class test_FilesystemBackend(AppCase):
+    def setup(self):
+        self.directory = tempfile.mkdtemp()
+        self.url = 'file://' + self.directory
+        self.path = self.directory.encode('ascii')
+
+    def teardown(self):
+        shutil.rmtree(self.directory)
+
+    def test_a_path_is_required(self):
+        with self.assertRaises(ImproperlyConfigured):
+            FilesystemBackend(app=self.app)
+
+    def test_a_path_in_app_conf(self):
+        self.app.conf.CELERY_RESULT_FSPATH = self.url[7:]
+        tb = FilesystemBackend(app=self.app)
+        self.assertEqual(tb.path, self.path)
+
+    def test_a_path_in_url(self):
+        tb = FilesystemBackend(app=self.app, url=self.url)
+        self.assertEqual(tb.path, self.path)
+
+    def test_path_is_incorrect(self):
+        with self.assertRaises(ImproperlyConfigured):
+            FilesystemBackend(app=self.app, url=self.url + '-incorrect')
+
+    def test_missing_task_is_PENDING(self):
+        tb = FilesystemBackend(app=self.app, url=self.url)
+        self.assertEqual(tb.get_status('xxx-does-not-exist'), states.PENDING)
+
+    def test_mark_as_done_writes_file(self):
+        tb = FilesystemBackend(app=self.app, url=self.url)
+        tb.mark_as_done(uuid(), 42)
+        self.assertEqual(len(os.listdir(self.directory)), 1)
+
+    def test_done_task_is_SUCCESS(self):
+        tb = FilesystemBackend(app=self.app, url=self.url)
+        tid = uuid()
+        tb.mark_as_done(tid, 42)
+        self.assertEqual(tb.get_status(tid), states.SUCCESS)
+
+    def test_correct_result(self):
+        data = {'foo': 'bar'}
+
+        tb = FilesystemBackend(app=self.app, url=self.url)
+        tid = uuid()
+        tb.mark_as_done(tid, data)
+        self.assertEqual(tb.get_result(tid), data)
+
+    def test_get_many(self):
+        data = {uuid(): 'foo', uuid(): 'bar', uuid(): 'baz'}
+
+        tb = FilesystemBackend(app=self.app, url=self.url)
+        for key, value in data.items():
+            tb.mark_as_done(key, value)
+
+        for key, result in tb.get_many(data.keys()):
+            self.assertEqual(result['result'], data[key])
+
+    def test_forget_deletes_file(self):
+        tb = FilesystemBackend(app=self.app, url=self.url)
+        tid = uuid()
+        tb.mark_as_done(tid, 42)
+        tb.forget(tid)
+        self.assertEqual(len(os.listdir(self.directory)), 0)

+ 1 - 0
docs/AUTHORS.txt

@@ -106,6 +106,7 @@ Miguel Hernandez Martos <enlavin@gmail.com>
 Mikhail Gusarov <dottedmag@dottedmag.net>
 Mikhail Korobov <kmike84@gmail.com>
 Mitar <mitar@tnode.com>
+Môshe van der Sterre <me@moshe.nl>
 Neil Chintomby
 Noah Kantrowitz <noah@coderanger.net>
 Norman Richards <orb@nostacktrace.com>

+ 24 - 0
docs/configuration.rst

@@ -527,6 +527,10 @@ Can be one of the following:
     Older AMQP backend (badly) emulating a database-based backend.
     See :ref:`conf-amqp-result-backend`.
 
+* filesystem
+    Use a shared directory to store the results.
+    See :ref:`conf-filesystem-result-backend`.
+
 .. warning:
 
     While the AMQP result backend is very efficient, you must make sure
@@ -1199,6 +1203,26 @@ Example configuration
     result_backend = 'amqp'
     result_expires = 18000  # 5 hours.
 
+.. _conf-filesystem-result-backend:
+
+Filesystem backend settings
+---------------------------
+
+This backend can be configured using a file URL, for example::
+
+    CELERY_RESULT_BACKEND = 'file:///var/celery/results'
+
+The configured directory needs to be shared and writeable by all servers using
+the backend.
+
+If you are trying Celery on a single system you can simply use the backend
+without any further configuration. For larger clusters you could use NFS,
+`GlusterFS`_, CIFS, `HDFS`_ (using FUSE) or any other filesystem.
+
+.. _`GlusterFS`: http://www.gluster.org/
+.. _`HDFS`: http://hadoop.apache.org/
+
+
 .. _conf-messaging:
 
 Message Routing

+ 11 - 0
docs/internals/reference/celery.backends.filesystem.rst

@@ -0,0 +1,11 @@
+==========================================
+ celery.backends.filesystem
+==========================================
+
+.. contents::
+    :local:
+.. currentmodule:: celery.backends.filesystem
+
+.. automodule:: celery.backends.filesystem
+    :members:
+    :undoc-members:

+ 1 - 0
docs/internals/reference/index.rst

@@ -34,6 +34,7 @@
     celery.backends.riak
     celery.backends.cassandra
     celery.backends.couchbase
+    celery.backends.filesystem
     celery.app.trace
     celery.app.annotations
     celery.app.routes