
Merge branch 'master' into backendrefactor

Ask Solem 15 years ago
parent
commit
b9914120ff

+ 1 - 0
AUTHORS

@@ -16,3 +16,4 @@ Ordered by date of first contribution:
   Jason Baker <amnorvend@gmail.com>
   Wes Turner <wes.turner@gmail.com>
   Maxim Bodyansky <bodyansky@gmail.com>
+  Rune Halvorsen <runefh@gmail.com>

+ 57 - 0
celery/contrib/batches.py

@@ -0,0 +1,57 @@
+from itertools import count
+from collections import deque, defaultdict
+
+from celery.task.base import Task
+
+
+class Batches(Task):
+    abstract = True
+    flush_every = 10
+
+    def __init__(self):
+        self._buffer = deque()
+        # Start counting at 1 so the buffer is flushed after every
+        # `flush_every` tasks, not already on the first one.
+        self._count = count(1).next
+
+    def execute(self, wrapper, pool, loglevel, logfile):
+        self._buffer.append((wrapper, pool, loglevel, logfile))
+
+        if not self._count() % self.flush_every:
+            self.flush(self._buffer)
+            self._buffer.clear()
+
+    def flush(self, tasks):
+        for wrapper, pool, loglevel, logfile in tasks:
+            wrapper.execute_using_pool(pool, loglevel, logfile)
+
+
+class Counter(Task):
+    abstract = True
+    flush_every = 10
+
+    def __init__(self):
+        self._buffer = deque()
+        # Start counting at 1 so the buffer is flushed after every
+        # `flush_every` tasks.
+        self._count = count(1).next
+
+    def execute(self, wrapper, pool, loglevel, logfile):
+        self._buffer.append((wrapper.args, wrapper.kwargs))
+
+        if not self._count() % self.flush_every:
+            self.flush(self._buffer)
+            self._buffer.clear()
+
+    def flush(self, buffer):
+        raise NotImplementedError("Counters must implement 'flush'")
+
+
+class ClickCounter(Counter):
+    flush_every = 1000
+
+    def flush(self, buffer):
+        urlcount = defaultdict(lambda: 0)
+        for args, kwargs in buffer:
+            urlcount[kwargs["url"]] += 1
+
+        for url, count in urlcount.items():
+            print(">>> Clicks: %s -> %s" % (url, count))
+            # increment_in_db(url, n=count)
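
As a quick orientation for the new module, here is a minimal usage sketch
(the ``WordCounter`` task and its flush logic are hypothetical, not part of
this commit)::

    from collections import defaultdict

    from celery.contrib.batches import Counter


    class WordCounter(Counter):
        """Count words in batches of 100 messages instead of one at a time."""
        flush_every = 100

        def flush(self, buffer):
            # Counter.execute() buffers (args, kwargs) tuples; flush() then
            # receives the whole batch at once.
            counts = defaultdict(int)
            for args, kwargs in buffer:
                counts[kwargs["word"]] += 1
            for word, n in counts.items():
                print("%s: %d" % (word, n))

    # Clients call the task as usual; the worker buffers the messages and
    # calls flush() once flush_every of them have arrived.
    WordCounter.delay(word="hello")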

+ 13 - 0
celery/task/base.py

@@ -401,6 +401,19 @@ class Task(object):
         """
         pass
 
+    def execute(self, wrapper, pool, loglevel, logfile):
+        """The method the worker calls to execute the task.
+
+        :param wrapper: A :class:`celery.worker.job.TaskWrapper`.
+        :param pool: A :class:`celery.worker.pool.TaskPool` object.
+        :param loglevel: Current loglevel.
+        :param logfile: Name of the currently used logfile.
+
+        """
+        wrapper.execute_using_pool(pool, loglevel, logfile)
+
 
 class ExecuteRemoteTask(Task):
     """Execute an arbitrary function or object.

+ 4 - 4
celery/task/http.py

@@ -32,7 +32,7 @@ def maybe_utf8(value):
     return value
 
 
-def utf8dict(self, tup):
+def utf8dict(tup):
     """With a dict's items() tuple return a new dict with any utf-8
     keys/values encoded."""
     return dict((key.encode("utf-8"), maybe_utf8(value))
@@ -83,7 +83,6 @@ class MutableURL(object):
     def _set_query(self, query):
         self._query = query
 
-
     query = property(_get_query, _set_query)
 
 
@@ -127,10 +126,11 @@ class HttpDispatch(object):
         warnings.warn(DeprecationWarning(
             "execute() has been deprecated and is scheduled for removal in \
             celery v1.2, please use dispatch() instead."))
+        return self.dispatch()
 
     def dispatch(self):
         """Dispatch callback and return result."""
-        response = self._dispatch()
+        response = self._dispatch_raw()
         if not response:
             raise InvalidResponseError("Empty response")
         try:
@@ -182,7 +182,7 @@ class HttpDispatchTask(BaseTask):
         url = url or self.url
         method = method or self.method
         logger = self.get_logger(**kwargs)
-        return HttpDispatch(url, method, kwargs, logger).execute()
+        return HttpDispatch(url, method, kwargs, logger).dispatch()
 
 
 class URL(MutableURL):
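
With ``HttpDispatchTask`` now calling ``dispatch()`` instead of the deprecated
``execute()``, usage stays the same. A brief sketch (the URL is only an
example)::

    from celery.task.http import HttpDispatchTask

    # Performs GET http://example.com/mul?x=10&y=10 and expects a JSON body
    # such as {"status": "success", "retval": 100}.
    result = HttpDispatchTask.delay(url="http://example.com/mul",
                                    method="GET", x=10, y=10)
    print(result.get())  # -> 100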

+ 145 - 0
celery/tests/test_task_http.py

@@ -0,0 +1,145 @@
+# -*- coding: utf-8 -*-
+from __future__ import with_statement
+
+import logging
+import unittest
+from urllib import addinfourl
+from contextlib import contextmanager
+try:
+    from cStringIO import StringIO
+except ImportError:
+    from StringIO import StringIO
+
+from billiard.utils.functional import wraps
+from anyjson import serialize
+
+from celery.task import http
+from celery.tests.utils import eager_tasks
+
+
+@contextmanager
+def mock_urlopen(response_method):
+
+    import urllib2
+    urlopen = urllib2.urlopen
+
+    @wraps(urlopen)
+    def _mocked(url, *args, **kwargs):
+        response_data, headers = response_method(url)
+        return addinfourl(StringIO(response_data), headers, url)
+
+    urllib2.urlopen = _mocked
+
+    try:
+        yield
+    finally:
+        # Restore the real urlopen even if the test body raises.
+        urllib2.urlopen = urlopen
+
+
+def _response(res):
+    return lambda r: (res, [])
+
+
+def success_response(value):
+    return _response(serialize({"status": "success", "retval": value}))
+
+
+def fail_response(reason):
+    return _response(serialize({"status": "failure", "reason": reason}))
+
+
+def unknown_response():
+    return _response(serialize({"status": "u.u.u.u", "retval": True}))
+
+
+class TestEncodings(unittest.TestCase):
+
+    def test_utf8dict(self):
+        d = {u"følelser ær langé": "ærbadægzaååÆØÅ",
+              "foobar": "xuzzybaz"}
+
+        for key, value in http.utf8dict(d.items()).items():
+            self.assertTrue(isinstance(key, str))
+            self.assertTrue(isinstance(value, str))
+
+
+class TestMutableURL(unittest.TestCase):
+
+    def test_url_query(self):
+        url = http.MutableURL("http://example.com?x=10&y=20&z=Foo")
+        self.assertEquals(url.query.get("x"), "10")
+        self.assertEquals(url.query.get("y"), "20")
+        self.assertEquals(url.query.get("z"), "Foo")
+        url.query["name"] = "George"
+        url = http.MutableURL(str(url))
+        self.assertEquals(url.query.get("x"), "10")
+        self.assertEquals(url.query.get("y"), "20")
+        self.assertEquals(url.query.get("z"), "Foo")
+        self.assertEquals(url.query.get("name"), "George")
+
+
+    def test_url_keeps_everything(self):
+        url = "https://e.com:808/foo/bar#zeta?x=10&y=20"
+        url = http.MutableURL(url)
+
+        self.assertEquals(str(url).split("?")[0],
+            "https://e.com:808/foo/bar#zeta")
+
+
+class TestHttpDispatch(unittest.TestCase):
+
+    def test_dispatch_success(self):
+        logger = logging.getLogger("celery.unittest")
+        with mock_urlopen(success_response(100)):
+            d = http.HttpDispatch("http://example.com/mul", "GET", {
+                                    "x": 10, "y": 10}, logger)
+            self.assertEquals(d.dispatch(), 100)
+
+    def test_dispatch_failure(self):
+        logger = logging.getLogger("celery.unittest")
+        with mock_urlopen(fail_response("Invalid moon alignment")):
+            d = http.HttpDispatch("http://example.com/mul", "GET", {
+                                    "x": 10, "y": 10}, logger)
+            self.assertRaises(http.RemoteExecuteError, d.dispatch)
+
+    def test_dispatch_empty_response(self):
+        logger = logging.getLogger("celery.unittest")
+        with mock_urlopen(_response("")):
+            d = http.HttpDispatch("http://example.com/mul", "GET", {
+                                    "x": 10, "y": 10}, logger)
+            self.assertRaises(http.InvalidResponseError, d.dispatch)
+
+    def test_dispatch_non_json(self):
+        logger = logging.getLogger("celery.unittest")
+        with mock_urlopen(_response("{'#{:'''")):
+            d = http.HttpDispatch("http://example.com/mul", "GET", {
+                                    "x": 10, "y": 10}, logger)
+            self.assertRaises(http.InvalidResponseError, d.dispatch)
+
+    def test_dispatch_unknown_status(self):
+        logger = logging.getLogger("celery.unittest")
+        with mock_urlopen(unknown_response()):
+            d = http.HttpDispatch("http://example.com/mul", "GET", {
+                                    "x": 10, "y": 10}, logger)
+            self.assertRaises(http.UnknownStatusError, d.dispatch)
+
+    def test_dispatch_POST(self):
+        logger = logging.getLogger("celery.unittest")
+        with mock_urlopen(success_response(100)):
+            d = http.HttpDispatch("http://example.com/mul", "POST", {
+                                    "x": 10, "y": 10}, logger)
+            self.assertEquals(d.dispatch(), 100)
+
+
+class TestURL(unittest.TestCase):
+
+    def test_URL_get_async(self):
+        with eager_tasks():
+            with mock_urlopen(success_response(100)):
+                d = http.URL("http://example.com/mul").get_async(x=10, y=10)
+                self.assertEquals(d.get(), 100)
+
+    def test_URL_post_async(self):
+        with eager_tasks():
+            with mock_urlopen(success_response(100)):
+                d = http.URL("http://example.com/mul").post_async(x=10, y=10)
+                self.assertEquals(d.get(), 100)

+ 12 - 0
celery/tests/utils.py

@@ -10,6 +10,18 @@ from contextlib import contextmanager
 from celery.utils import noop
 
 
+@contextmanager
+def eager_tasks():
+
+    from celery import conf
+    prev = conf.ALWAYS_EAGER
+    conf.ALWAYS_EAGER = True
+
+    try:
+        yield
+    finally:
+        # Restore the previous setting even if the block raises.
+        conf.ALWAYS_EAGER = prev
+
+
 def with_environ(env_name, env_value):
 
     def _envpatched(fun):
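
A short sketch of how the new ``eager_tasks`` context manager is meant to be
used in tests (the ``add`` task here is hypothetical)::

    from celery.decorators import task
    from celery.tests.utils import eager_tasks

    @task()
    def add(x, y):
        return x + y

    with eager_tasks():
        # With ALWAYS_EAGER enabled, delay() runs the task locally and the
        # result is available right away.
        assert add.delay(2, 2).get() == 4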

+ 3 - 3
celery/worker/__init__.py

@@ -168,12 +168,12 @@ class WorkController(object):
         finally:
             self.stop()
 
-    def process_task(self, task):
+    def process_task(self, wrapper):
         """Process task by sending it to the pool of workers."""
         try:
             try:
-                task.execute_using_pool(self.pool, self.loglevel,
-                                        self.logfile)
+                wrapper.task.execute(wrapper, self.pool,
+                                     self.loglevel, self.logfile)
             except Exception, exc:
                 self.logger.critical("Internal error %s: %s\n%s" % (
                                 exc.__class__, exc, traceback.format_exc()))

+ 5 - 6
celery/worker/job.py

@@ -175,6 +175,8 @@ class TaskWrapper(object):
         [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
     """
     fail_email_body = TASK_FAIL_EMAIL_BODY
+    executed = False
+    time_start = None
 
     def __init__(self, task_name, task_id, args, kwargs,
             on_ack=noop, retries=0, **opts):
@@ -183,17 +185,14 @@ class TaskWrapper(object):
         self.retries = retries
         self.args = args
         self.kwargs = kwargs
-        self.logger = opts.get("logger")
-        self.eventer = opts.get("eventer")
         self.on_ack = on_ack
-        self.executed = False
-        self.time_start = None
+        self.task = tasks[self.task_name]
+
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
-                "fail_email_body"):
+                "fail_email_body", "logger", "eventer"):
             setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
         if not self.logger:
             self.logger = get_default_logger()
-        self.task = tasks[self.task_name]
 
     def __repr__(self):
         return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (

+ 1 - 1
contrib/doc2ghpages

@@ -8,5 +8,5 @@
 git checkout gh-pages
 cp -r docs/.build/html/* .
 git commit . -m "Autogenerated documentation for github."
-git push --all
+git push origin gh-pages
 git checkout master

+ 13 - 0
docs/cookbook/daemonizing.rst

@@ -0,0 +1,13 @@
+=============================
+ Running celeryd as a daemon
+=============================
+
+Celery does not daemonize itself. When running it in production you should
+use an appropriate daemonizing tool for your platform.
+
+For example start-up scripts, see ``contrib/debian/init.d`` for using
+``start-stop-daemon`` on Debian/Ubuntu, or ``contrib/mac/org.celeryq.*`` for
+using ``launchd`` on Mac OS X.
+
+.. _`supervisord`: http://supervisord.org/
+

+ 46 - 44
docs/getting-started/first-steps-with-python.rst → docs/getting-started/first-steps-with-celery.rst

@@ -1,14 +1,16 @@
-=========================
- First steps with Python
-=========================
+========================
+ First steps with Celery
+========================
 
 Creating a simple task
 ======================
 
-We put tasks in a dedicated ``tasks.py`` module. Your tasks can be in
-any module, but it's a good convention.
+In this example we are creating a simple task that adds two
+numbers. Tasks are defined in a normal python module. The module can
+be named whatever you like, but the convention is to call it
+``tasks.py``.
 
-Our task is simple, just adding two numbers
+Our addition task looks like this:
 
 ``tasks.py``:
 
@@ -21,30 +23,29 @@ Our task is simple, just adding two numbers
         return x + y
 
 
-Tasks in celery are actually classes inheriting from the ``Task`` class.
-When you create a new task it is automatically registered in a registry, but
-for this to happen in the worker you need to give a list of modules the worker
-should import.
+All celery tasks are classes that inherit from the ``Task``
+class. In this case we're using a decorator that wraps the add
+function in an appropriate class for us automatically. The full
+documentation on how to create tasks and task classes is in
+:doc:`Executing Tasks<../userguide/tasks>`.
+
+
 
 Configuration
 =============
 
-Celery needs a configuration module, usually called ``celeryconfig.py``.
-This module must be importable and located in the Python path.
+Celery is configured by using a configuration module. By convention,
+this module is called ``celeryconfig.py``. This module must be in the
+Python path so it can be imported.
 
 You can set a custom name for the configuration module with the
-``CELERY_CONFIG_MODULE`` variable. In these examples we use the default name.
-
-Let's create our ``celeryconfig.py``.
+``CELERY_CONFIG_MODULE`` variable. In these examples we use the
+default name.
 
-1. Start by making sure Python is able to import modules from the current
-   directory::
 
-        import os
-        import sys
-        sys.path.insert(0, os.getcwd())
+Let's create our ``celeryconfig.py``.
 
-2. Configure the broker::
+1. Configure how we communicate with the broker::
 
         BROKER_HOST = "localhost"
         BROKER_PORT = 5672
@@ -52,53 +53,54 @@ Let's create our ``celeryconfig.py``.
         BROKER_PASSWORD = "mypassword"
         BROKER_VHOST = "myvhost"
 
-3. We don't want to store the results, so we'll just use the simplest
-   backend available; the AMQP backend::
+2. In this example we don't want to store the results of the tasks, so
+   we'll use the simplest backend available; the AMQP backend::
 
         CELERY_BACKEND = "amqp"
 
-4. Finally, we list the modules to import. We only have a single module; the
-   ``tasks.py`` module we added earlier::
+3. Finally, we list the modules to import, that is, all the modules
+   that contain tasks. This is so celery knows what tasks it can
+   be asked to perform. We only have a single task module,
+   ``tasks.py``, which we added earlier::
 
         CELERY_IMPORTS = ("tasks", )
 
 That's it.
 
-There are more options available, like how many processes you want to process
-work in parallel (the ``CELERY_CONCURRENCY`` setting), and we could use a
-persistent result store backend, but for now, this should do. For all of
-the options available, please see the :doc:`configuration directive
-reference<../configuration>`.
+There are more options available, like how many processes you want to
+process work in parallel (the ``CELERY_CONCURRENCY`` setting), and we
+could use a persistent result store backend, but for now, this should
+do. For all of the options available, see the 
+:doc:`configuration directive reference<../configuration>`.
 
 Running the celery worker server
 ================================
 
-To test this we'll be running the worker server in the foreground, so we can
-see what's going on without consulting the logfile::
+To test this, we will run the worker server in the foreground, so we can
+see what's going on in the terminal::
 
     $ celeryd --loglevel=INFO
 
 However, in production you probably want to run the worker in the
-background as a daemon. To do this you need to use to tools provided by your
-platform, or something like `supervisord`_.
+background as a daemon. To do this you need to use the tools provided
+by your platform, or something like `supervisord`_.
 
-For example start-up scripts see ``contrib/debian/init.d`` for using
-``start-stop-daemon`` on Debian/Ubuntu, or ``contrib/mac/org.celeryq.*`` for using
-``launchd`` on Mac OS X.
+For a complete listing of the command line options available, use the
+help command::
 
-.. _`supervisord`: http://supervisord.org/
+    $  celeryd --help
 
-For a complete listing of the command line arguments available, with a short
-description, you can use the help command::
+For info on how to run celery as a standalone daemon, see the
+:doc:`daemon mode reference<../cookbook/daemonizing>`.
 
-    $  celeryd --help
 
 
 Executing the task
 ==================
 
-Now if we want to execute our task, we can use the
-``delay`` method of the task class.
+Whenever we want to execute our task, we can use the ``delay`` method
+of the task class.
+
 This is a handy shortcut to the ``apply_async`` method which gives
 greater control of the task execution.
 See :doc:`Executing Tasks<../userguide/executing>` for more information.
@@ -111,7 +113,7 @@ At this point, the task has been sent to the message broker. The message
 broker will hold on to the task until a celery worker server has successfully
 picked it up.
 
-*Note* If everything is just hanging when you execute ``delay``, please check
+*Note:* If everything is just hanging when you execute ``delay``, please check
 that RabbitMQ is running, and that the user/password has access to the virtual
 host you configured earlier.
 

+ 15 - 21
docs/getting-started/first-steps-with-django.rst

@@ -25,13 +25,15 @@ You only need three simple steps to use celery with your Django project.
 
 That's it.
 
-There are more options available, like how many processes you want to process
-work in parallel (the ``CELERY_CONCURRENCY`` setting), and the backend used
-for storing task statuses. But for now, this should do. For all of the options
-available, please see the :doc:`configuration directive
+There are more options available, like how many worker processes you want
+running in parallel (the ``CELERY_CONCURRENCY`` setting). You can also
+configure the backend used for storing task statuses. For now though,
+this should do. For all of the options available, please see the
+:doc:`configuration directive
 reference<../configuration>`.
 
-**Note**: If you're using SQLite as the Django database back-end,
+**Note:** If you're using SQLite as the Django database back-end,
 ``celeryd`` will only be able to process one task at a time, this is
 because SQLite doesn't allow concurrent writes.
 
@@ -47,16 +49,9 @@ see what's going on without consulting the logfile::
 
 However, in production you probably want to run the worker in the
 background as a daemon. To do this you need to use the tools provided by your
-platform, or something like `supervisord`_.
-
-For example start-up scripts see ``contrib/debian/init.d`` for using
-``start-stop-daemon`` on Debian/Ubuntu, or ``contrib/mac/org.celeryq.*`` for using
-``launchd`` on Mac OS X.
-
-.. _`supervisord`: http://supervisord.org/
+platform. See :doc:`daemon mode reference<../cookbook/daemonizing>`.
 
-For a complete listing of the command line arguments available, with a short
-description, you can use the help command::
+For a complete listing of the command line options available, use the help command::
 
     $ python manage.py help celeryd
 
@@ -64,12 +59,12 @@ description, you can use the help command::
 Defining and executing tasks
 ============================
 
-**Please note** All of these tasks has to be stored in a real module, they can't
+**Please note:** All the tasks have to be stored in a real module; they can't
 be defined in the python shell or ipython/bpython. This is because the celery
 worker server needs access to the task function to be able to run it.
-Put them in the ``tasks`` module of your
-Django application. The worker server will automatically load any ``tasks.py``
-file for all of the applications listed in ``settings.INSTALLED_APPS``.
+Put them in the ``tasks`` module of your Django application. The
+worker server will automatically load any ``tasks.py`` file for all
+of the applications listed in ``settings.INSTALLED_APPS``.
 Executing tasks using ``delay`` and ``apply_async`` can be done from the
 python shell, but keep in mind that since arguments are pickled, you can't
 use custom classes defined in the shell session.
@@ -84,8 +79,7 @@ This is a task that adds two numbers:
     def add(x, y):
         return x + y
 
-Now if we want to execute this task, we can use the
-``delay`` method of the task class.
+To execute this task, we can use the ``delay`` method of the task class.
 This is a handy shortcut to the ``apply_async`` method which gives
 greater control of the task execution.
 See :doc:`Executing Tasks<../userguide/executing>` for more information.
@@ -97,7 +91,7 @@ At this point, the task has been sent to the message broker. The message
 broker will hold on to the task until a celery worker server has successfully
 picked it up.
 
-*Note* If everything is just hanging when you execute ``delay``, please check
+*Note:* If everything is just hanging when you execute ``delay``, please check
 that RabbitMQ is running, and that the user/password has access to the virtual
 host you configured earlier.
 

+ 1 - 1
docs/getting-started/index.rst

@@ -10,7 +10,7 @@
 
     introduction
     broker-installation
-    first-steps-with-python
+    first-steps-with-celery
     first-steps-with-django
     periodic-tasks
     resources

+ 6 - 2
docs/links.rst

@@ -12,9 +12,13 @@ celery
 AMQP
 ----
 
-* `Shovel`_: An AMQP Relay
+* `RabbitMQ-shovel`_: Message Relocation Equipment (as a plug-in to RabbitMQ)
 
-.. _`Shovel`: http://botland.oebfare.com/logger/celery/
+* `Shovel`_: An AMQP Relay (generic AMQP shovel)
+
+.. _`RabbitMQ-shovel`: http://bit.ly/b1hVW2
+
+.. _`Shovel`: http://bit.ly/xFEde
 
 RabbitMQ
 --------

+ 1 - 1
docs/reference/celery.signals.rst

@@ -5,7 +5,7 @@ Signals - celery.signals
 .. data:: task_sent
 
     Triggered when a task has been sent to the broker.
-    Please note that this is executed in the client, the process sending
+    Note that this is executed in the client process, the one sending
     the task, not in the worker.
 
     Provides arguments:

+ 25 - 17
docs/userguide/executing.rst

@@ -2,7 +2,7 @@
  Executing Tasks
 =================
 
-Executing tasks is done with ``apply_async``, and it's shortcut ``delay``.
+Executing tasks is done with ``apply_async``, and its shortcut: ``delay``.
 
 ``delay`` is simple and convenient, as it looks like calling a regular
 function:
@@ -23,8 +23,8 @@ the ``Task`` class: ``routing_key``, ``exchange``, ``immediate``, ``mandatory``,
 ``priority``, and ``serializer``.  In addition you can set a countdown/eta, or provide
 a custom broker connection.
 
-Let's go over these in more detail. The following examples uses this simple
-task, used to add two numbers:
+Let's go over these in more detail. The following examples use this simple
+task, which adds together two numbers:
 
 .. code-block:: python
 
@@ -36,9 +36,9 @@ task, used to add two numbers:
 ETA and countdown
 -----------------
 
-The ETA (estimated time of arrival) lets you set a specific date and time for
-when after, your task should execute. ``countdown`` is a shortcut to set this
-by seconds into the future.
+The ETA (estimated time of arrival) lets you set a specific date and time
+that is the earliest point at which your task will be executed. ``countdown``
+is a shortcut for setting this as a number of seconds into the future.
 
 .. code-block:: python
 
@@ -65,14 +65,22 @@ using time in seconds is not very readable.
         UnbanTask.apply_async(args=[username], eta=tomorrow)
 
 
-Serializer
-----------
+Serializers
+-----------
 
-The default serializer used is :mod:`pickle`, but you can change this for each
+Data passed between celery and workers has to be serialized to be
+transferred. The default serializer is :mod:`pickle`, but you can 
+change this for each
 task. There is built-in support for using ``pickle``, ``JSON`` and ``YAML``,
 and you can add your own custom serializers by registering them into the
 carrot serializer registry.
 
+The default serializer (pickle) supports Python objects, like ``datetime`` and
+any custom datatypes you define yourself. But since pickle has poor support
+outside of the Python language, you need to choose another serializer if you
+need to communicate with other languages. In that case, ``JSON`` is a very
+popular choice.
+
 The serialization method is sent with the message, so the worker knows how to
 deserialize any task. Of course, if you use a custom serializer, this must
 also be registered in the worker.
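
For example, to pick JSON instead of pickle (a sketch; passing the option via
the ``task`` decorator is assumed to work the same way as setting the
``serializer`` attribute on a task class)::

    from celery.decorators import task

    @task(serializer="json")
    def add(x, y):
        return x + y

    # Or override the serializer for a single call:
    add.apply_async(args=[10, 10], serializer="json")
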
@@ -133,14 +141,14 @@ In Python 2.5 and above, you can use the ``with`` statement:
     print([res.get() for res in results])
 
 The connection timeout is the number of seconds to wait before we give up
-establishing the connection, you can set this with the ``connect_timeout``
+establishing the connection. You can set this with the ``connect_timeout``
 argument to ``apply_async``:
 
 .. code-block:: python
 
     add.apply_async([10, 10], connect_timeout=3)
 
-or if you handle the connection manually:
+Or if you handle the connection manually:
 
 .. code-block:: python
 
@@ -157,10 +165,10 @@ Messages (tasks) are sent to exchanges, a queue binds to an exchange with a
 routing key. Let's look at an example:
 
 Our application has a lot of tasks, some process video, others process images,
-and some gathers collective intelligence about users. Some of these have
+and some gather collective intelligence about users. Some of these have
 higher priority than others so we want to make sure the high priority tasks
 get sent to powerful machines, while low priority tasks are sent to dedicated
-machines that can handle these at their own pace, uninterrupted.
+machines that can handle these at their own pace.
 
 For the sake of example we have only one exchange called ``tasks``.
 There are different types of exchanges that matches the routing key in
@@ -181,8 +189,8 @@ different ways, the exchange types are:
 
 (there are also other exchange types, but these are not used by celery)
 
-So, we create three queues, ``video``, ``image`` and ``lowpri`` that binds to
-our ``tasks`` exchange, for the queues we use the following binding keys::
+So, we create three queues, ``video``, ``image`` and ``lowpri`` that bind to
+our ``tasks`` exchange. For the queues we use the following binding keys::
 
     video: video.#
     image: image.#
@@ -204,7 +212,7 @@ listen to different queues:
     >>> UpdateReccomendationsTask.apply_async(routing_key="misc.recommend")
 
 
-Later, if suddenly the image crop task is consuming a lot of resources,
+Later, if the crop task is consuming a lot of resources,
 we can bind some new workers to handle just the ``"image.crop"`` task,
 by creating a new queue that binds to ``"image.crop"``.
 
@@ -226,5 +234,5 @@ if the task cannot be routed to a worker immediately.
 
 A number between ``0`` and ``9``, where ``0`` is the highest priority.
 Note that RabbitMQ does not implement AMQP priorities, and maybe your broker
-does not either, please consult your brokers documentation for more
+does not either; consult your broker's documentation for more
 information.

+ 3 - 2
docs/userguide/remote-tasks.rst

@@ -10,7 +10,7 @@ Executing tasks on a web server
 If you need to call into another language, framework or similar, you can
 do so by using HTTP callback tasks.
 
-The HTTP callback tasks uses GET/POST arguments and uses a simple JSON response
+The HTTP callback tasks use GET/POST arguments and a simple JSON response
 to return results. The scheme to call a task is::
 
     GET http://example.com/mytask/?arg1=a&arg2=b&arg3=c
@@ -19,6 +19,7 @@ or using POST::
 
     POST http://example.com/mytask
 
+**Note:** POST data has to be form encoded.
+
 Whether to use GET or POST is up to you and your requirements.
 
 The web page should then return a response in the following format
@@ -61,7 +62,7 @@ or in Ruby on Rails:
     end
 
 You can easily port this scheme to any language/framework;
-New examples and libraries are very welcome.
+new examples and libraries are very welcome.
 
 To execute the task you use the :class:`URL` class:
 

+ 53 - 43
docs/userguide/tasks.rst

@@ -5,7 +5,7 @@
 .. module:: celery.task.base
 
 A task is a class that encapsulates a function and its execution options.
-With a function ``create_user``, that takes two arguments: ``username`` and
+Given a function ``create_user``, that takes two arguments: ``username`` and
 ``password``, you can create a task like this:
 
 .. code-block:: python
@@ -51,19 +51,19 @@ Default keyword arguments
 =========================
 
 Celery supports a set of default arguments that can be forwarded to any task.
-task can choose not to take these, or only list the ones it want
-(the worker will do the right thing).
+Tasks can choose not to take these, or list the ones they want.
+The worker will do the right thing.
 
 The current default keyword arguments are:
 
 * logfile
 
-    The currently used log file, can be passed on to ``self.get_logger``
+    The log file, can be passed on to ``self.get_logger``
     to gain access to the workers log file. See `Logging`_.
 
 * loglevel
 
-    The current loglevel used.
+    The loglevel used.
 
 * task_id
 
@@ -76,12 +76,13 @@ The current default keyword arguments are:
 * task_retries
 
     How many times the current task has been retried.
-    (an integer starting a ``0``).
+    An integer starting at ``0``.
+
 
 Logging
 =======
 
-You can use the workers logger to add some diagnostic output to
+You can use the workers logger to add diagnostic output to
 the worker log:
 
 .. code-block:: python
@@ -103,12 +104,13 @@ or using the decorator syntax:
         return x + y
 
 There are several logging levels available, and the workers ``loglevel``
-setting decides whether they will be sent to the log file or not.
+setting decides whether or not they will be written to the log file.
+
 
 Retrying a task if something fails
 ==================================
 
-Simply use :meth:`Task.retry` to re-sent the task, it will
+Simply use :meth:`Task.retry` to re-send the task. It will
 do the right thing, and respect the :attr:`Task.max_retries`
 attribute:
 
@@ -124,7 +126,7 @@ attribute:
 
 Here we used the ``exc`` argument to pass the current exception to
 :meth:`Task.retry`. At each step of the retry this exception
-is available as the tombstone (result) of the task, when
+is available as the tombstone (result) of the task. When
 :attr:`Task.max_retries` has been exceeded this is the exception
 raised. However, if an ``exc`` argument is not provided the
 :exc:`RetryTaskError` exception is raised instead.
@@ -132,9 +134,10 @@ raised. However, if an ``exc`` argument is not provided the
 Using a custom retry delay
 --------------------------
 
-The default countdown is in the tasks
-:attr:`Task.default_retry_delay` attribute, which by
-default is set to 3 minutes.
+When a task is to be retried, it will wait for a given amount of time
+before doing so. The default delay is in the :attr:`Task.default_retry_delay`
+attribute on the task. By default this is set to 3 minutes. Note that the
+unit for the delay is seconds (int or float).
 
 You can also provide the ``countdown`` argument to
 :meth:`Task.retry` to override this default.
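
A brief sketch putting both together (``send_reminder`` and ``TemporaryError``
are hypothetical)::

    from celery.task.base import Task

    class SendReminderTask(Task):
        default_retry_delay = 30 * 60  # retry in 30 minutes by default

        def run(self, address, **kwargs):
            try:
                send_reminder(address)
            except TemporaryError, exc:
                # Override the default delay and retry in 60 seconds instead.
                self.retry(args=[address], kwargs=kwargs, exc=exc,
                           countdown=60)
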
@@ -159,14 +162,14 @@ Task options
 
 * name
 
-    This is the name the task is registered as.
+    The name the task is registered as.
     You can set this name manually, or just use the default which is
     automatically generated using the module and class name.
 
 * abstract
 
-    Abstract classes are not registered, so they're
-    only used for making new task types by sub classing.
+    Abstract classes are not registered, but are used as the superclass
+    when making new task types by subclassing.
 
 * max_retries
 
@@ -178,15 +181,20 @@ Task options
 * default_retry_delay
 
     Default time in seconds before a retry of the task should be
-    executed. Default is a 1 minute delay.
+    executed. Can be either an ``int`` or a ``float``.
+    Default is a 1 minute delay (``60 seconds``).
 
 * rate_limit
 
-  Set the rate limit for this task type,
-  if this is ``None`` no rate limit is in effect.
+  Set the rate limit for this task type, that is, how many times the task
+  is allowed to run in a given period of time.
+
+  If this is ``None`` no rate limit is in effect.
+  If it is an integer, it is interpreted as "tasks per second". 
+
   The rate limits can be specified in seconds, minutes or hours
-  by appending ``"/s"``, ``"/m"`` or "``/h"``". If this is an integer
-  it is interpreted as seconds. Example: ``"100/m" (hundred tasks a
+  by appending ``"/s"``, ``"/m"`` or ``"/h"`` to the value.
+  Example: ``"100/m"`` (hundred tasks a
   minute). Default is the ``CELERY_DEFAULT_RATE_LIMIT`` setting (which
   is off if not specified).
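
For example, limiting a task type to a hundred runs per minute (a minimal
sketch; ``import_photo`` is a hypothetical helper)::

    from celery.task.base import Task

    class ImportPhotosTask(Task):
        rate_limit = "100/m"

        def run(self, photo_id, **kwargs):
            import_photo(photo_id)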
 
@@ -200,7 +208,9 @@ Task options
 
 * disable_error_emails
 
-    Disable all error e-mails for this task.
+    Disable error e-mails for this task. Default is ``False``.
+    *Note:* You can also turn off error e-mails globally using the
+    ``CELERY_SEND_TASK_ERROR_EMAILS`` setting.
 
 * serializer
 
@@ -236,15 +246,16 @@ Message and routing options
 
 * priority
     The message priority. A number from ``0`` to ``9``, where ``0`` is the
-    highest. Note that RabbitMQ doesn't support priorities yet.
+    highest. **Note:** RabbitMQ does not support priorities yet.
 
-Please see :doc:`executing` for descriptions of these options.
+See :doc:`executing` for more information about the messaging options
+available.
 
 Example
 =======
 
 Let's take a real world example: a blog where comments posted need to be
-filtered for spam. When the comment is created, we run the spam filter in the
+filtered for spam. When the comment is created, the spam filter runs in the
 background, so the user doesn't have to wait for it to finish.
 
 We have a Django blog application allowing comments
@@ -370,11 +381,11 @@ How it works
 ============
 
 Here come the technical details. This part isn't something you need to know,
-but you may be interested, so here goes.
+but you may be interested.
 
 All defined tasks are listed in a registry. The registry contains
 a list of task names and their task classes. You can investigate this registry
-by yourself:
+yourself:
 
 .. code-block:: python
 
@@ -394,15 +405,15 @@ by yourself:
 
 This is the list of tasks built-in to celery. Note that we had to import
 ``celery.task`` first for these to show up. This is because the tasks will
-only be registered when the module it is defined in is imported.
+only be registered when the module they are defined in is imported.
 
-When using the default loader the loader imports any modules listed in the
+The default loader imports any modules listed in the
 ``CELERY_IMPORTS`` setting. If using Django it loads all ``tasks.py`` modules
 for the applications listed in ``INSTALLED_APPS``. If you want to do something
 special you can create your own loader to do what you want.
 
 The entity responsible for registering your task in the registry is a
-meta class, :class:`TaskType`, this is the default meta class for
+meta class, :class:`TaskType`. This is the default meta class for
 ``Task``. If you want to register your task manually you can set the
 ``abstract`` attribute:
 
@@ -411,14 +422,14 @@ meta class, :class:`TaskType`, this is the default meta class for
     class MyTask(Task):
         abstract = True
 
-This way the task won't be registered, but any task sub classing it will.
+This way the task won't be registered, but any task subclassing it will.
 
-So when we send a task, we don't send the function code, we just send the name
-of the task, so when the worker receives the message it can just look it up in
+When tasks are sent, we don't send the function code, just the name
+of the task. When the worker receives the message it can just look it up in
 the task registry to find the execution code.
 
-This means that your workers must optimally be updated with the same software
-as the client, this is a drawback, but the alternative is a technical
+This means that your workers should always be updated with the same software
+as the client. This is a drawback, but the alternative is a technical
 challenge that has yet to be solved.
 
 Performance and Strategies
@@ -427,14 +438,13 @@ Performance and Strategies
 Granularity
 -----------
 
-The tasks granularity is the degree of parallelization your task have.
-It's better to have a lot of small tasks, than just a few long running
-ones.
+The task's granularity is the degree of parallelization your task has.
+It's better to have many small tasks than a few long-running ones.
 
 With smaller tasks, you can process more tasks in parallel and the tasks
 won't run long enough to block the worker from processing other waiting tasks.
 
-But there's a limit, sending messages takes processing power and bandwidth. If
+However, there's a limit. Sending messages takes processing power and bandwidth. If
 your tasks are so short the overhead of passing them around is worse than
 just executing them in-line, you should reconsider your strategy. There is no
 universal answer here.
@@ -442,7 +452,7 @@ universal answer here.
 Data locality
 -------------
 
-The worker processing the task should optimally be as close to the data as
+The worker processing the task should be as close to the data as
 possible. The best would be to have a copy in memory, the worst being a
 full transfer from another continent.
 
@@ -463,14 +473,14 @@ State
 -----
 
 Since celery is a distributed system, you can't know in which process, or even
-on what machine the task will run, also you can't even know if the task will
+on what machine the task will run. Indeed you can't even know if the task will
 run in a timely manner, so please be wary of the state you pass on to tasks.
 
-One gotcha is Django model objects, they shouldn't be passed on as arguments
+One gotcha is Django model objects. They shouldn't be passed on as arguments
 to task classes, it's almost always better to re-fetch the object from the
 database instead, as there are possible race conditions involved.
 
-Imagine the following scenario where you have an article, and a task
+Imagine the following scenario where you have an article and a task
 that automatically expands some abbreviations in it.
 
 .. code-block:: python