|
@@ -100,120 +100,108 @@ Configuring your Django project
|
|
|
Running the celery worker daemon
|
|
|
--------------------------------
|
|
|
|
|
|
-Defining and executing tasks
|
|
|
-----------------------------
|
|
|
-
|
|
|
-*Note* You need to have a AMQP message broker running, like `RabbitMQ`_,
|
|
|
-and you need to have the amqp server setup in your settings file, as described
|
|
|
-in the `carrot distribution README`_.
|
|
|
+To test this we'll be running the worker daemon in the foreground, so we can
|
|
|
+see what's going on without consulting the logfile::
|
|
|
|
|
|
-*Note* If you're running ``SQLite`` as the database back-end, ``celeryd`` will
|
|
|
-only be able to process one message at a time, this is because ``SQLite``
|
|
|
-doesn't allow concurrent writes.
|
|
|
-
|
|
|
-.. _`RabbitMQ`: http://www.rabbitmq.com
|
|
|
-.. _`carrot distribution README`: http://pypi.python.org/pypi/carrot/0.3.3
|
|
|
+::
|
|
|
+
|
|
|
+ $ python manage.py celeryd
|
|
|
|
|
|
|
|
|
-Defining tasks
|
|
|
---------------
|
|
|
+However, in production you'll probably want to run the worker in the
|
|
|
+background as daemon instead::
|
|
|
|
|
|
- >>> from celery.task import tasks
|
|
|
- >>> from celery.log import setup_logger
|
|
|
- >>> def do_something(some_arg, **kwargs):
|
|
|
- ... logger = setup_logger(**kwargs)
|
|
|
- ... logger.info("Did something: %s" % some_arg)
|
|
|
- ... return 42
|
|
|
- >>> tasks.register(do_something, "do_something")
|
|
|
+ $ python manage.py celeryd --daemon
|
|
|
|
|
|
-Tell the celery daemon to run a task
|
|
|
--------------------------------------
|
|
|
+For help on command line arguments to the worker daemon, you can execute the
|
|
|
+help command::
|
|
|
|
|
|
- >>> from celery.task import delay_task
|
|
|
- >>> delay_task("do_something", some_arg="foo bar baz")
|
|
|
+ $ python manage.py help celeryd
|
|
|
|
|
|
+**Note**: If you're using ``SQLite`` as the Django database back-end,
|
|
|
+``celeryd`` will only be able to process one task at a time, this is
|
|
|
+because ``SQLite`` doesn't allow concurrent writes.
|
|
|
|
|
|
-Execute a task, and get its return value.
|
|
|
------------------------------------------
|
|
|
+Defining and executing tasks
|
|
|
+----------------------------
|
|
|
|
|
|
- >>> from celery.task import delay_task
|
|
|
- >>> result = delay_task("do_something", some_arg="foo bar baz")
|
|
|
- >>> result.ready()
|
|
|
- False
|
|
|
- >>> result.get() # Waits until the task is done.
|
|
|
- 42
|
|
|
- >>> result.status()
|
|
|
- 'DONE'
|
|
|
+**Please note** All of these tasks has to be stored in a real module, they can't
|
|
|
+be defined in the python shell or ipython/bpython. This is because the celery
|
|
|
+worker server needs access to the task function to be able to run it.
|
|
|
+So while it looks like we use the python shell to define the tasks in these
|
|
|
+examples, you can't do it this way. Put them in your Django applications
|
|
|
+``tasks`` module (the worker daemon will automatically load any ``tasks.py``
|
|
|
+file for all of the applications listed in ``settings.INSTALLED_APPS``.
|
|
|
+Execution tasks using ``delay`` and ``apply_async`` can be done from the
|
|
|
+python shell, but keep in mind that since arguments are pickled, you can't
|
|
|
+use custom classes defined in the shell session.
|
|
|
+
|
|
|
+While you can use regular functions, the recommended way is creating
|
|
|
+a task class, this way you can cleanly upgrade the task to use the more
|
|
|
+advanced features of celery later.
|
|
|
+
|
|
|
+This is a task that basically does nothing but take some arguments,
|
|
|
+and return value:
|
|
|
+
|
|
|
+ >>> class MyTask(Task):
|
|
|
+ ... name = "myapp.mytask"
|
|
|
+ ... def run(self, some_arg, **kwargs):
|
|
|
+ ... logger = self.get_logger(**kwargs)
|
|
|
+ ... logger.info("Did something: %s" % some_arg)
|
|
|
+ ... return 42
|
|
|
+ >>> tasks.register(MyTask)
|
|
|
|
|
|
-If the task raises an exception, the tasks status will be ``FAILURE``, and
|
|
|
-``result.result`` will contain the exception instance raised.
|
|
|
+Now if we want to execute this task, we can use the ``delay`` method of the
|
|
|
+task class (this is a handy shortcut to the ``apply_async`` method which gives
|
|
|
+you greater control of the task execution).
|
|
|
|
|
|
-Running the celery daemon
|
|
|
---------------------------
|
|
|
+ >>> from myapp.tasks import MyTask
|
|
|
+ >>> MyTask.delay(some_arg="foo")
|
|
|
|
|
|
-::
|
|
|
+At this point, the task has been sent to the message broker. The message
|
|
|
+broker will hold on to the task until a celery worker server has successfully
|
|
|
+picked it up.
|
|
|
|
|
|
- $ cd mydjangoproject
|
|
|
- $ python manage.py celeryd
|
|
|
- [....]
|
|
|
- [2009-04-23 17:44:05,115: INFO/Process-1] Did something: foo bar baz
|
|
|
- [2009-04-23 17:44:05,118: INFO/MainProcess] Waiting for queue.
|
|
|
+Now the task has been executed, but to know what happened with the task we
|
|
|
+have to check the celery logfile to see its return value and output.
|
|
|
+This is because we didn't keep the ``AsyncResult`` object returned by
|
|
|
+``delay``.
|
|
|
|
|
|
-Launching celeryd in the background as a daemon
|
|
|
-------------------------------------------------
|
|
|
+The ``AsyncResult`` lets us find out the state of the task, wait for the task to
|
|
|
+finish and get its return value (or exception if the task failed).
|
|
|
|
|
|
-::
|
|
|
+So, let's execute the task again, but this time we'll keep track of the task:
|
|
|
|
|
|
- $ cd mydjangoproject
|
|
|
- $ python manage.py celeryd --daemon
|
|
|
-
|
|
|
-Getting help for the available options to celeryd
|
|
|
--------------------------------------------------
|
|
|
-
|
|
|
-::
|
|
|
-
|
|
|
- $ cd mydjangoproject
|
|
|
- $ python manage.py help celeryd
|
|
|
+ >>> result = MyTask.delay("do_something", some_arg="foo bar baz")
|
|
|
+ >>> result.ready() # returns True if the task has finished processing.
|
|
|
+ False
|
|
|
+ >>> result.result # task is not ready, so no return value yet.
|
|
|
+ None
|
|
|
+ >>> result.get() # Waits until the task is done and return the retval.
|
|
|
+ 42
|
|
|
+ >>> result.result
|
|
|
+ 42
|
|
|
+ >>> result.success() # returns True if the task didn't end in failure.
|
|
|
+ True
|
|
|
|
|
|
|
|
|
+If the task raises an exception, the ``result.success()`` will be ``False``,
|
|
|
+and ``result.result`` will contain the exception instance raised.
|
|
|
|
|
|
Auto-discovery of tasks
|
|
|
-----------------------
|
|
|
|
|
|
``celery`` has an auto-discovery feature like the Django Admin, that
|
|
|
automatically loads any ``tasks.py`` module in the applications listed
|
|
|
-in ``settings.INSTALLED_APPS``.
|
|
|
-
|
|
|
-A good place to add this command could be in your ``urls.py``,
|
|
|
-::
|
|
|
-
|
|
|
- from celery.task import tasks
|
|
|
- tasks.autodiscover()
|
|
|
-
|
|
|
-
|
|
|
-Then you can add new tasks in your applications ``tasks.py`` module,
|
|
|
-::
|
|
|
-
|
|
|
- from celery.task import tasks
|
|
|
- from celery.log import setup_logger
|
|
|
- from clickcounter.models import ClickCount
|
|
|
-
|
|
|
- def increment_click(for_url, **kwargs):
|
|
|
- logger = setup_logger(**kwargs)
|
|
|
- clicks_for_url, cr = ClickCount.objects.get_or_create(url=for_url)
|
|
|
- clicks_for_url.clicks = clicks_for_url.clicks + 1
|
|
|
- clicks_for_url.save()
|
|
|
- logger.info("Incremented click count for %s (not at %d)" % (
|
|
|
- for_url, clicks_for_url.clicks)
|
|
|
- tasks.register(increment_click, "increment_click")
|
|
|
+in ``settings.INSTALLED_APPS``. This autodiscovery is used by the celery
|
|
|
+worker to find registered tasks for your Django project.
|
|
|
|
|
|
|
|
|
Periodic Tasks
|
|
|
---------------
|
|
|
|
|
|
-Periodic tasks are tasks that are run every ``n`` seconds. They don't
|
|
|
-support extra arguments. Here's an example of a periodic task:
|
|
|
-
|
|
|
+Periodic tasks are tasks that are run every ``n`` seconds.
|
|
|
+Here's an example of a periodic task:
|
|
|
|
|
|
>>> from celery.task import tasks, PeriodicTask
|
|
|
>>> from datetime import timedelta
|
|
@@ -227,6 +215,8 @@ support extra arguments. Here's an example of a periodic task:
|
|
|
...
|
|
|
>>> tasks.register(MyPeriodicTask)
|
|
|
|
|
|
+**Note:** Periodic tasks does not support arguments, as this doesn't
|
|
|
+really make sense.
|
|
|
|
|
|
For periodic tasks to work you need to add ``celery`` to ``INSTALLED_APPS``,
|
|
|
and issue a ``syncdb``.
|