Просмотр исходного кода

Merge branch 'master' of github.com:celery/celery

Ask Solem 10 лет назад
Родитель
Сommit
10fccfddbb

+ 2 - 0
CONTRIBUTORS.txt

@@ -187,3 +187,5 @@ Ilya Georgievsky, 2015/03/31
 Fatih Sucu, 2015/04/17
 James Pulec, 2015/04/19
 Alexander Lebedev, 2015/04/25
+Frantisek Holop, 2015/05/21
+Feanil Patel, 2015/05/21

+ 20 - 0
README.rst

@@ -395,6 +395,26 @@ Wiki
 
 http://wiki.github.com/celery/celery/
 
+
+.. _maintainers:
+
+Maintainers
+===========
+
+- `@ask`_ (primary maintainer)
+- `@thedrow`_
+- `@chrisgogreen`_
+- `@PMickael`_
+- `@malinoff`_
+- And you? We really need more: https://github.com/celery/celery/issues/2534 
+
+.. _`@ask`: http://github.com/ask
+.. _`@thedrow`: http://github.com/thedrow
+.. _`@chrisgogreen`: http://github.com/chrisgogreen
+.. _`@PMickael`: http://github.com/PMickael
+.. _`@malinoff`: http://github.com/malinoff
+
+
 .. _contributing-short:
 
 Contributing

+ 1 - 1
celery/app/builtins.py

@@ -23,7 +23,7 @@ def add_backend_cleanup_task(app):
     backend.
 
     If the configured backend requires periodic cleanup this task is also
-    automatically configured to run every day at midnight (requires
+    automatically configured to run every day at 4am (requires
     :program:`celery beat` to be running).
 
     """

+ 5 - 1
celery/app/task.py

@@ -471,13 +471,15 @@ class Task(object):
         if self.__self__ is not None:
             args = args if isinstance(args, tuple) else tuple(args or ())
             args = (self.__self__, ) + args
+            shadow = shadow or self.shadow_name(args, kwargs, final_options)
+
         final_options = self._get_exec_options()
         if options:
             final_options = dict(final_options, **options)
         return app.send_task(
             self.name, args, kwargs, task_id=task_id, producer=producer,
             link=link, link_error=link_error, result_cls=self.AsyncResult,
-            shadow=shadow or self.shadow_name(args, kwargs, final_options),
+            shadow=shadow,
             **final_options
         )
 
@@ -625,6 +627,8 @@ class Task(object):
             # if task was executed eagerly using apply(),
             # then the retry must also be executed eagerly.
             S.apply().get()
+            if throw:
+                raise ret
             return ret
 
         try:

+ 1 - 1
celery/backends/amqp.py

@@ -195,7 +195,7 @@ class AMQPBackend(BaseBackend):
 
         def callback(meta, message):
             if meta['status'] in states.READY_STATES:
-                results[meta['task_id']] = meta
+                results[meta['task_id']] = self.meta_from_decoded(meta)
 
         consumer.callbacks[:] = [callback]
         time_start = now()

+ 3 - 1
celery/fixups/django.py

@@ -15,6 +15,7 @@ from datetime import datetime
 from importlib import import_module
 
 from celery import signals
+from celery.app import default_app
 from celery.exceptions import FixupWarning
 
 __all__ = ['DjangoFixup', 'fixup']
@@ -48,7 +49,8 @@ class DjangoFixup(object):
 
     def __init__(self, app):
         self.app = app
-        self.app.set_default()
+        if default_app is None:
+            self.app.set_default()
         self._worker_fixup = None
 
     def install(self):

+ 13 - 2
celery/tests/backends/test_amqp.py

@@ -13,6 +13,7 @@ from celery import states
 from celery.backends.amqp import AMQPBackend
 from celery.exceptions import TimeoutError
 from celery.five import Empty, Queue, range
+from celery.result import AsyncResult
 from celery.utils import uuid
 
 from celery.tests.case import (
@@ -246,10 +247,20 @@ class test_AMQPBackend(AppCase):
         with self.assertRaises(TimeoutError):
             b.wait_for(tid, timeout=0.01, cache=False)
 
-    def test_drain_events_remaining_timeouts(self):
+    def test_drain_events_decodes_exceptions_in_meta(self):
+        tid = uuid()
+        b = self.create_backend(serializer="json")
+        b.store_result(tid, RuntimeError("aap"), states.FAILURE)
+        result = AsyncResult(tid, backend=b)
 
-        class Connection(object):
+        with self.assertRaises(Exception) as cm:
+            result.get()
 
+        self.assertEqual(cm.exception.__class__.__name__, "RuntimeError")
+        self.assertEqual(str(cm.exception), "aap")
+
+    def test_drain_events_remaining_timeouts(self):
+        class Connection(object):
             def drain_events(self, timeout=None):
                 pass
 

+ 87 - 1
docs/configuration.rst

@@ -849,6 +849,8 @@ Also see :ref:`routing-basics` for more information.
 The default is a queue/exchange/binding key of ``celery``, with
 exchange type ``direct``.
 
+See also :setting:`CELERY_ROUTES`
+
 .. setting:: CELERY_ROUTES
 
 CELERY_ROUTES
@@ -856,7 +858,91 @@ CELERY_ROUTES
 
 A list of routers, or a single router used to route tasks to queues.
 When deciding the final destination of a task the routers are consulted
-in order.  See :ref:`routers` for more information.
+in order.
+
+A router can be specified as either:
+
+*  A router class instances
+*  A string which provides the path to a router class
+*  A dict containing router specification. It will be converted to a :class:`celery.routes.MapRoute` instance.
+
+Examples:
+
+.. code-block:: python
+
+    CELERY_ROUTES = {"celery.ping": "default",
+                     "mytasks.add": "cpu-bound",
+                     "video.encode": {
+                         "queue": "video",
+                         "exchange": "media"
+                         "routing_key": "media.video.encode"}}
+
+    CELERY_ROUTES = ("myapp.tasks.Router", {"celery.ping": "default})
+
+Where ``myapp.tasks.Router`` could be:
+
+.. code-block:: python
+
+    class Router(object):
+
+        def route_for_task(self, task, args=None, kwargs=None):
+            if task == "celery.ping":
+                return "default"
+
+``route_for_task`` may return a string or a dict. A string then means
+it's a queue name in :setting:`CELERY_QUEUES`, a dict means it's a custom route.
+
+When sending tasks, the routers are consulted in order. The first
+router that doesn't return ``None`` is the route to use. The message options
+is then merged with the found route settings, where the routers settings
+have priority.
+
+Example if :func:`~celery.execute.apply_async` has these arguments:
+
+.. code-block:: python
+
+   Task.apply_async(immediate=False, exchange="video",
+                    routing_key="video.compress")
+
+and a router returns:
+
+.. code-block:: python
+
+    {"immediate": True, "exchange": "urgent"}
+
+the final message options will be:
+
+.. code-block:: python
+
+    immediate=True, exchange="urgent", routing_key="video.compress"
+
+(and any default message options defined in the
+:class:`~celery.task.base.Task` class)
+
+Values defined in :setting:`CELERY_ROUTES` have precedence over values defined in
+:setting:`CELERY_QUEUES` when merging the two.
+
+With the follow settings:
+
+.. code-block:: python
+
+    CELERY_QUEUES = {"cpubound": {"exchange": "cpubound",
+                                  "routing_key": "cpubound"}}
+
+    CELERY_ROUTES = {"tasks.add": {"queue": "cpubound",
+                                   "routing_key": "tasks.add",
+                                   "serializer": "json"}}
+
+The final routing options for ``tasks.add`` will become:
+
+.. code-block:: python
+
+    {"exchange": "cpubound",
+     "routing_key": "tasks.add",
+     "serializer": "json"}
+
+See :ref:`routers` for more examples.
+
 
 .. setting:: CELERY_QUEUE_HA_POLICY
 

+ 8 - 5
docs/faq.rst

@@ -446,14 +446,17 @@ It is essential that you protect against unauthorized
 access to your broker, databases and other services transmitting pickled
 data.
 
-For the task messages you can set the :setting:`CELERY_TASK_SERIALIZER`
-setting to "json" or "yaml" instead of pickle. There is
-currently no alternative solution for task results (but writing a
-custom result backend using JSON is a simple task)
-
 Note that this is not just something you should be aware of with Celery, for
 example also Django uses pickle for its cache client.
 
+For the task messages you can set the :setting:`CELERY_TASK_SERIALIZER`
+setting to "json" or "yaml" instead of pickle.
+
+Similarly for task results you can set :setting:`CELERY_RESULT_SERIALIZER`.
+
+For more details of the formats used and the lookup order when
+checking which format to use for a task see :ref:`calling-serializers`
+
 Can messages be encrypted?
 --------------------------
 

+ 3 - 2
docs/userguide/tasks.rst

@@ -657,8 +657,9 @@ General
 
 .. attribute:: Task.backend
 
-    The result store backend to use for this task.  Defaults to the
-    :setting:`CELERY_RESULT_BACKEND` setting.
+    The result store backend to use for this task. An instance of one of the
+    backend classes in `celery.backends`. Defaults to `app.backend` which is
+    defined by the :setting:`CELERY_RESULT_BACKEND` setting.
 
 .. attribute:: Task.acks_late
 

+ 1 - 1
docs/userguide/workers.rst

@@ -318,7 +318,7 @@ the `terminate` option is set.
     a task is stuck.  It's not for terminating the task,
     it's for terminating the process that is executing the task, and that
     process may have already started processing another task at the point
-    when the signal is sent, so for this rason you must never call this
+    when the signal is sent, so for this reason you must never call this
     programatically.
 
 If `terminate` is set the worker child process processing the task

+ 2 - 2
tox.ini

@@ -48,7 +48,7 @@ deps = -r{toxinidir}/requirements/default.txt
        -r{toxinidir}/requirements/dev.txt
 setenv = C_DEBUG_TEST = 1
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
-           pip install -U -r{toxinidir}/requirements/dev.txt
+           pip install -q -U -r{toxinidir}/requirements/dev.txt
            nosetests -xsv --with-coverage --cover-inclusive --cover-erase []
 
 [testenv:pypy3]
@@ -59,7 +59,7 @@ deps = -r{toxinidir}/requirements/default.txt
        -r{toxinidir}/requirements/dev.txt
 setenv = C_DEBUG_TEST = 1
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
-           pip install -U -r{toxinidir}/requirements/dev.txt
+           pip install -q -U -r{toxinidir}/requirements/dev.txt
            nosetests -xsv --with-coverage --cover-inclusive --cover-erase []
 
 [testenv:docs]