Browse Source

Merge branch '3.0'

Conflicts:
	celery/apps/worker.py
	celery/contrib/batches.py
Ask Solem 12 years ago
parent
commit
104530996b

+ 22 - 21
celery/app/task.py

@@ -321,6 +321,9 @@ class Task(object):
         _task_stack.push(self)
         self.push_request()
         try:
+            # add self if this is a bound task
+            if self.__self__ is not None:
+                return self.run(self.__self__, *args, **kwargs)
             return self.run(*args, **kwargs)
         finally:
             self.pop_request()
@@ -397,28 +400,20 @@ class Task(object):
         :keyword retry_policy:  Override the retry policy used.  See the
                                 :setting:`CELERY_TASK_PUBLISH_RETRY` setting.
 
-        :keyword routing_key: The routing key used to route the task to a
-                              worker server.  Defaults to the
-                              :attr:`routing_key` attribute.
+        :keyword routing_key: Custom routing key used to route the task to a
+                              worker server. If in combination with a
+                              ``queue`` argument only used to specify custom
+                              routing keys to topic exchanges.
 
-        :keyword exchange: The named exchange to send the task to.
-                           Defaults to the :attr:`exchange` attribute.
+        :keyword queue: The queue to route the task to.  This must be a key
+                        present in :setting:`CELERY_QUEUES`, or
+                        :setting:`CELERY_CREATE_MISSING_QUEUES` must be
+                        enabled.  See :ref:`guide-routing` for more
+                        information.
 
-        :keyword exchange_type: The exchange type to initialize the exchange
-                                if not already declared.  Defaults to the
-                                :attr:`exchange_type` attribute.
-
-        :keyword immediate: Request immediate delivery.  Will raise an
-                            exception if the task cannot be routed to a worker
-                            immediately.  (Do not confuse this parameter with
-                            the `countdown` and `eta` settings, as they are
-                            unrelated).  Defaults to the :attr:`immediate`
-                            attribute.
-
-        :keyword mandatory: Mandatory routing. Raises an exception if
-                            there's no running workers able to take on this
-                            task.  Defaults to the :attr:`mandatory`
-                            attribute.
+        :keyword exchange: Named custom exchange to send the task to.
+                           Usually not used in combination with the ``queue``
+                           argument.
 
         :keyword priority: The task priority, a number between 0 and 9.
                            Defaults to the :attr:`priority` attribute.
@@ -448,6 +443,9 @@ class Task(object):
             attribute.
         :keyword publisher: Deprecated alias to ``producer``.
 
+        Also supports all keyword arguments supported by
+        :meth:`kombu.messaging.Producer.publish`.
+
         .. note::
             If the :setting:`CELERY_ALWAYS_EAGER` setting is set, it will
             be replaced by a local :func:`apply` call instead.
@@ -596,7 +594,10 @@ class Task(object):
         from celery.task.trace import eager_trace_task
 
         app = self._get_app()
-        args = args or []
+        args = args or ()
+        # add 'self' if this is a bound method.
+        if self.__self__ is not None:
+            args = (self.__self__, ) + tuple(args)
         kwargs = kwargs or {}
         task_id = options.get('task_id') or uuid()
         retries = options.get('retries', 0)

+ 2 - 3
celery/apps/worker.py

@@ -103,13 +103,12 @@ class Worker(WorkController):
 
     def on_init_namespace(self):
         self.setup_logging()
+        # apply task execution optimizations
+        trace.setup_worker_optimizations(self.app)
 
     def on_start(self):
         WorkController.on_start(self)
 
-        # apply task execution optimizations
-        trace.setup_worker_optimizations(self.app)
-
         # this signal can be used to e.g. change queues after
         # the -Q option has been applied.
         signals.celeryd_after_setup.send(

+ 0 - 1
celery/contrib/batches.py

@@ -44,7 +44,6 @@ from Queue import Empty, Queue
 
 from celery.task import Task
 from celery.utils.log import get_logger
-from celery.worker import state
 from celery.worker.job import Request
 
 

+ 30 - 4
celery/contrib/methods.py

@@ -30,6 +30,33 @@ or with any task decorator:
         def add(self, x, y):
             return x + y
 
+.. note::
+
+    The task must use the new Task base class (:class:`celery.Task`),
+    and the old base class using classmethods (``celery.task.Task``,
+    ``celery.task.base.Task``).
+
+    This means that you have to use the task decorator from a Celery app
+    instance, and not the old-API:
+
+    .. code-block:: python
+
+
+        from celery import task       # BAD
+        from celery.task import task  # ALSO BAD
+
+        # GOOD:
+        celery = Celery(...)
+
+        @celery.task(filter=task_method)
+        def foo(self): pass
+
+        # ALSO GOOD:
+        from celery import current_app
+
+        @current_app.task(filter=task_method)
+        def foo(self): pass
+
 Caveats
 -------
 
@@ -71,9 +98,7 @@ Caveats
 
 from __future__ import absolute_import
 
-from functools import partial
-
-from celery import task as _task
+from celery import current_app
 
 
 class task_method(object):
@@ -89,4 +114,5 @@ class task_method(object):
         return task
 
 
-task = partial(_task, filter=task_method)
+def task(*args, **kwargs):
+    return current_app.task(*args, **dict(kwargs, filter=task_method))

+ 4 - 1
celery/task/trace.py

@@ -346,9 +346,12 @@ def setup_worker_optimizations(app):
 
     trace_task_ret = _fast_trace_task
     try:
-        sys.modules['celery.worker.job'].trace_task_ret = _fast_trace_task
+        job = sys.modules['celery.worker.job']
     except KeyError:
         pass
+    else:
+        job.trace_task_ret = _fast_trace_task
+        job.__optimize__()
 
 
 def reset_worker_optimizations():

+ 10 - 2
celery/worker/job.py

@@ -41,8 +41,16 @@ from . import state
 logger = get_logger(__name__)
 debug, info, warn, error = (logger.debug, logger.info,
                             logger.warn, logger.error)
-_does_debug = logger.isEnabledFor(logging.DEBUG)
-_does_info = logger.isEnabledFor(logging.INFO)
+_does_info = False
+_does_debug = False
+
+
+def __optimize__():
+    global _does_debug
+    global _does_info
+    _does_debug = logger.isEnabledFor(logging.DEBUG)
+    _does_info = logger.isEnabledFor(logging.INFO)
+__optimize__()
 
 # Localize
 tz_utc = timezone.utc

+ 12 - 0
docs/configuration.rst

@@ -556,6 +556,14 @@ use the ``TimeUUID`` type as a comparator::
 
     create column family task_results with comparator = TimeUUIDType;
 
+CASSANDRA_OPTIONS
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Options to be passed to the `pycassa connection pool`_ (optional).
+
+.. _`pycassa connection pool`: http://pycassa.github.com/pycassa/api/pycassa/pool.html
+.. setting:: CASSANDRA_DETAILED_MODE
+
 Example configuration
 ~~~~~~~~~~~~~~~~~~~~~
 
@@ -567,6 +575,10 @@ Example configuration
     CASSANDRA_READ_CONSISTENCY = "ONE"
     CASSANDRA_WRITE_CONSISTENCY = "ONE"
     CASSANDRA_DETAILED_MODE = True
+    CASSANDRA_OPTIONS = {
+        'timeout': 300,
+        'max_retries': 10
+    }
 
 .. _conf-messaging:
 

+ 1 - 1
docs/userguide/tasks.rst

@@ -1085,7 +1085,7 @@ Make your design asynchronous instead, for example by using *callbacks*.
     def update_page_info(url):
         # fetch_page -> parse_page -> store_page
         chain = fetch_page.s() | parse_page.s(url) | store_page_info.s(url)
-        chain.apply_async()
+        chain()
 
     @celery.task(ignore_result=True)
     def fetch_page(url):