Browse Source

Magic keyword arguments are now available as task.request. Tasks can choose to not accept magic keyword arguments by setting task.accept_magic_kwargs=False. task.retry/task.get_logger and task.update_state now uses the values in task.request by default.

Ask Solem 14 năm trước cách đây
mục cha
commit
d4a75425a3
5 tập tin đã thay đổi với 191 bổ sung149 xóa
  1. 6 1
      celery/execute/trace.py
  2. 64 23
      celery/task/base.py
  3. 16 2
      celery/worker/job.py
  4. 100 121
      docs/userguide/tasks.rst
  5. 5 2
      examples/app/myapp.py

+ 6 - 1
celery/execute/trace.py

@@ -40,12 +40,14 @@ class TraceInfo(object):
 
 class TaskTrace(object):
 
-    def __init__(self, task_name, task_id, args, kwargs, task=None, **_):
+    def __init__(self, task_name, task_id, args, kwargs, task=None,
+            request=None, **_):
         self.task_id = task_id
         self.task_name = task_name
         self.args = args
         self.kwargs = kwargs
         self.task = task or tasks[self.task_name]
+        self.request = request or {}
         self.status = states.PENDING
         self.strtb = None
         self._trace_handlers = {states.FAILURE: self.handle_failure,
@@ -56,6 +58,8 @@ class TaskTrace(object):
         return self.execute()
 
     def execute(self):
+        self.task.request.update(self.request, args=self.args,
+                                               kwargs=self.kwargs)
         signals.task_prerun.send(sender=self.task, task_id=self.task_id,
                                  task=self.task, args=self.args,
                                  kwargs=self.kwargs)
@@ -64,6 +68,7 @@ class TaskTrace(object):
         signals.task_postrun.send(sender=self.task, task_id=self.task_id,
                                   task=self.task, args=self.args,
                                   kwargs=self.kwargs, retval=retval)
+        self.task.request.clear()
         return retval
 
     def _trace(self):

+ 64 - 23
celery/task/base.py

@@ -1,4 +1,5 @@
 import sys
+import threading
 import warnings
 
 from celery.app import app_or_default
@@ -32,12 +33,34 @@ extract_exec_options = mattrgetter("queue", "routing_key",
                                    "exchange", "immediate",
                                    "mandatory", "priority",
                                    "serializer", "delivery_mode")
-
+_default_context = {"logfile": None,
+                    "loglevel": None,
+                    "id": None,
+                    "args": None,
+                    "kwargs": None,
+                    "retries": 0,
+                    "is_eager": False,
+                    "delivery_info": None}
 
 def _unpickle_task(name):
     return tasks[name]
 
 
+
+
+class Context(threading.local):
+
+    def update(self, d, **kwargs):
+        self.__dict__.update(d, **kwargs)
+
+    def clear(self):
+        self.__dict__.clear()
+        self.update(_default_context)
+
+    def get(self, key, default=None):
+        return self.__dict__.get(key, default)
+
+
 class TaskType(type):
     """Metaclass for tasks.
 
@@ -235,6 +258,8 @@ class BaseTask(object):
     abstract = True
     autoregister = True
     type = "regular"
+    accept_magic_kwargs = True
+    request = Context()
 
     queue = None
     routing_key = None
@@ -297,9 +322,13 @@ class BaseTask(object):
         See :func:`celery.log.setup_task_logger`.
 
         """
+        if loglevel is None:
+            loglevel = self.request.loglevel
+        if logfile is None:
+            logfile = self.request.logfile
         return self.app.log.setup_task_logger(loglevel=loglevel,
                                               logfile=logfile,
-                                              task_kwargs=kwargs)
+                                              task_kwargs=self.request.kwargs)
 
     @classmethod
     def establish_connection(self, connect_timeout=None):
@@ -506,16 +535,17 @@ class BaseTask(object):
             ...                        countdown=60 * 5, exc=exc)
 
         """
-        if not kwargs:
-            raise TypeError(
-                    "kwargs argument to retries can't be empty. "
-                    "Task must accept **kwargs, see http://bit.ly/cAx3Bg")
+        request = self.request
+        if args is None:
+            args = request.args
+        if kwargs is None:
+            kwargs = request.kwargs
 
-        delivery_info = kwargs.pop("delivery_info", {})
+        delivery_info = request.delivery_info
         options.setdefault("exchange", delivery_info.get("exchange"))
         options.setdefault("routing_key", delivery_info.get("routing_key"))
 
-        options["retries"] = kwargs.pop("task_retries", 0) + 1
+        options["retries"] = request.retries + 1
         options["task_id"] = kwargs.pop("task_id", None)
         options["countdown"] = options.get("countdown",
                                         self.default_retry_delay)
@@ -528,7 +558,7 @@ class BaseTask(object):
 
         # If task was executed eagerly using apply(),
         # then the retry must also be executed eagerly.
-        if kwargs.get("task_is_eager", False):
+        if request.is_eager:
             result = self.apply(args=args, kwargs=kwargs, **options)
             if isinstance(result, EagerResult):
                 return result.get()             # propogates exceptions.
@@ -565,19 +595,28 @@ class BaseTask(object):
         # Make sure we get the task instance, not class.
         task = tasks[self.name]
 
-        default_kwargs = {"task_name": task.name,
-                          "task_id": task_id,
-                          "task_retries": retries,
-                          "task_is_eager": True,
-                          "logfile": options.get("logfile"),
-                          "delivery_info": {"is_eager": True},
-                          "loglevel": options.get("loglevel", 0)}
-        supported_keys = fun_takes_kwargs(task.run, default_kwargs)
-        extend_with = dict((key, val) for key, val in default_kwargs.items()
-                            if key in supported_keys)
-        kwargs.update(extend_with)
-
-        trace = TaskTrace(task.name, task_id, args, kwargs, task=task)
+        request = {"id": task_id,
+                   "retries": retries,
+                   "is_eager": True,
+                   "logfile": options.get("logfile"),
+                   "loglevel": options.get("loglevel", 0),
+                   "delivery_info": {"is_eager": True}}
+        if self.accept_magic_kwargs:
+            default_kwargs = {"task_name": task.name,
+                              "task_id": task_id,
+                              "task_retries": retries,
+                              "task_is_eager": True,
+                              "logfile": options.get("logfile"),
+                              "loglevel": options.get("loglevel", 0),
+                              "delivery_info": {"is_eager": True}}
+            supported_keys = fun_takes_kwargs(task.run, default_kwargs)
+            extend_with = dict((key, val)
+                                    for key, val in default_kwargs.items()
+                                        if key in supported_keys)
+            kwargs.update(extend_with)
+
+        trace = TaskTrace(task.name, task_id, args, kwargs,
+                          task=task, request=request)
         retval = trace.execute()
         if isinstance(retval, ExceptionInfo):
             if throw:
@@ -595,7 +634,7 @@ class BaseTask(object):
         """
         return self.app.AsyncResult(task_id, backend=self.backend)
 
-    def update_state(self, task_id, state, meta=None):
+    def update_state(self, task_id=None, state=None, meta=None):
         """Update task state.
 
         :param task_id: Id of the task to update.
@@ -603,6 +642,8 @@ class BaseTask(object):
         :param meta: State metadata (:class:`dict`).
 
         """
+        if task_id is None:
+            task_id = self.request.id
         self.backend.store_result(task_id, meta, state)
 
     def on_retry(self, exc, task_id, args, kwargs, einfo=None):

+ 16 - 2
celery/worker/job.py

@@ -291,6 +291,14 @@ class TaskRequest(object):
                    eventer=eventer, hostname=hostname,
                    eta=eta, expires=expires, app=app)
 
+    def get_instance_attrs(self, loglevel, logfile):
+        return {"logfile": logfile,
+                "loglevel": loglevel,
+                "id": self.task_id,
+                "retries": self.retries,
+                "is_eager": False,
+                "delivery_info": self.delivery_info}
+
     def extend_with_default_kwargs(self, loglevel, logfile):
         """Extend the tasks keyword arguments with standard task arguments.
 
@@ -300,6 +308,8 @@ class TaskRequest(object):
         See :meth:`celery.task.base.Task.run` for more information.
 
         """
+        if not self.task.accept_magic_kwargs:
+            return self.kwargs
         kwargs = dict(self.kwargs)
         default_kwargs = {"logfile": logfile,
                           "loglevel": loglevel,
@@ -345,9 +355,11 @@ class TaskRequest(object):
         if not self.task.acks_late:
             self.acknowledge()
 
+        instance_attrs = self.get_instance_attrs(loglevel, logfile)
         tracer = WorkerTaskTrace(*self._get_tracer_args(loglevel, logfile),
                                  **{"hostname": self.hostname,
-                                    "loader": self.app.loader})
+                                    "loader": self.app.loader,
+                                    "request": instance_attrs})
         retval = tracer.execute()
         self.acknowledge()
         return retval
@@ -372,10 +384,12 @@ class TaskRequest(object):
         self._set_executed_bit()
 
         args = self._get_tracer_args(loglevel, logfile)
+        instance_attrs = self.get_instance_attrs(loglevel, logfile)
         self.time_start = time.time()
         result = pool.apply_async(execute_and_trace,
                                   args=args,
-                                  kwargs={"hostname": self.hostname},
+                                  kwargs={"hostname": self.hostname,
+                                          "request": instance_attrs},
                                   accept_callback=self.on_accepted,
                                   timeout_callback=self.on_timeout,
                                   callbacks=[self.on_success],

+ 100 - 121
docs/userguide/tasks.rst

@@ -10,7 +10,7 @@
 
 This guide gives an overview of how tasks are defined. For a complete
 listing of task attributes and methods, please see the
-:class:`API reference <celery.task.base.Task>`.
+:class:`API reference <celery.task.base.BaseTask>`.
 
 .. _task-basics:
 
@@ -23,70 +23,64 @@ Given a function ``create_user``, that takes two arguments: ``username`` and
 
 .. code-block:: python
 
-    from celery.task import Task
     from django.contrib.auth import User
 
-    class CreateUserTask(Task):
-        def run(self, username, password):
-            User.objects.create(username=username, password=password)
-
-For convenience there is a shortcut decorator that turns any function into
-a task:
-
-.. code-block:: python
-
-    from celery.decorators import task
-    from django.contrib.auth import User
-
-    @task
+    @celery.task()
     def create_user(username, password):
         User.objects.create(username=username, password=password)
 
-The task decorator takes the same execution options as the
-:class:`~celery.task.base.Task` class does:
+
+Task options are added as arguments to ``task``::
 
 .. code-block:: python
 
-    @task(serializer="json")
+    @celery.task(serializer="json")
     def create_user(username, password):
         User.objects.create(username=username, password=password)
 
-.. _task-keyword-arguments:
+.. _task-request-info:
 
-Default keyword arguments
-=========================
+Task Request Info
+=================
 
-Celery supports a set of default arguments that can be forwarded to any task.
-Tasks can choose not to take these, or list the ones they want.
-The worker will do the right thing.
+The ``task.request`` attribute contains information about
+the task being executed, and contains the following attributes:
 
-The current default keyword arguments are:
+:id: The unique id of the executing task.
 
-:task_id: The unique id of the executing task.
+:args: Positional arguments.
 
-:task_name: Name of the currently executing task.
+:kwargs: Keyword arguments.
 
-:task_retries: How many times the current task has been retried.
-               An integer starting at ``0``.
+:retries: How many times the current task has been retried.
+          An integer starting at ``0``.
 
-:task_is_eager: Set to :const:`True` if the task is executed locally in
-                the client, kand not by a worker.
+:is_eager: Set to :const:`True` if the task is executed locally in
+           the client, kand not by a worker.
 
-:logfile: The log file, can be passed on to
-          :meth:`~celery.task.base.Task.get_logger` to gain access to
-          the workers log file. See `Logging`_.
+:logfile: The file the worker logs to.  See `Logging`_.
 
 :loglevel: The current loglevel used.
 
-
 :delivery_info: Additional message delivery information. This is a mapping
                 containing the exchange and routing key used to deliver this
-                task. It's used by e.g. :meth:`~celery.task.base.Task.retry`
+                task.  Used by e.g. :meth:`~celery.task.base.BaseTask.retry`
                 to resend the task to the same destination queue.
 
   **NOTE** As some messaging backends doesn't have advanced routing
   capabilities, you can't trust the availability of keys in this mapping.
 
+
+Example Usage
+-------------
+
+::
+
+    @celery.task
+    def add(x, y):
+        print("Executing task id %r, args: %r kwargs: %r" % (
+            add.request.id, add.request.args, add.request.kwargs))
+
 .. _task-logging:
 
 Logging
@@ -97,20 +91,9 @@ the worker log:
 
 .. code-block:: python
 
-    class AddTask(Task):
-
-        def run(self, x, y, **kwargs):
-            logger = self.get_logger(**kwargs)
-            logger.info("Adding %s + %s" % (x, y))
-            return x + y
-
-or using the decorator syntax:
-
-.. code-block:: python
-
-    @task()
-    def add(x, y, **kwargs):
-        logger = add.get_logger(**kwargs)
+    @celery.task()
+    def add(x, y):
+        logger = add.get_logger()
         logger.info("Adding %s + %s" % (x, y))
         return x + y
 
@@ -125,34 +108,27 @@ out/-err will be written to the logfile as well.
 Retrying a task if something fails
 ==================================
 
-Simply use :meth:`~celery.task.base.Task.retry` to re-send the task.
+Simply use :meth:`~celery.task.base.BaseTask.retry` to re-send the task.
 It will do the right thing, and respect the
-:attr:`~celery.task.base.Task.max_retries` attribute:
+:attr:`~celery.task.base.BaseTask.max_retries` attribute:
 
 .. code-block:: python
 
-    @task()
-    def send_twitter_status(oauth, tweet, **kwargs):
+    @celery.task()
+    def send_twitter_status(oauth, tweet):
         try:
             twitter = Twitter(oauth)
             twitter.update_status(tweet)
         except (Twitter.FailWhaleError, Twitter.LoginError), exc:
-            send_twitter_status.retry(args=[oauth, tweet], kwargs=kwargs, exc=exc)
+            send_twitter_status.retry(exc=exc)
 
 Here we used the ``exc`` argument to pass the current exception to
-:meth:`~celery.task.base.Task.retry`. At each step of the retry this exception
+:meth:`~celery.task.base.BaseTask.retry`. At each step of the retry this exception
 is available as the tombstone (result) of the task. When
-:attr:`~celery.task.base.Task.max_retries` has been exceeded this is the
-exception raised. However, if an ``exc`` argument is not provided the
+:attr:`~celery.task.base.BaseTask.max_retries` has been exceeded this is the
+exception raised.  However, if an ``exc`` argument is not provided the
 :exc:`~celery.exceptions.RetryTaskError` exception is raised instead.
 
-**Important note:** The task has to take the magic keyword arguments
-in order for max retries to work properly, this is because it keeps track
-of the current number of retries using the ``task_retries`` keyword argument
-passed on to the task. In addition, it also uses the ``task_id`` keyword
-argument to use the same task id, and ``delivery_info`` to route the
-retried task to the same destination.
-
 .. _task-retry-custom-delay:
 
 Using a custom retry delay
@@ -160,25 +136,22 @@ Using a custom retry delay
 
 When a task is to be retried, it will wait for a given amount of time
 before doing so. The default delay is in the
-:attr:`~celery.task.base.Task.default_retry_delay` 
+:attr:`~celery.task.base.BaseTask.default_retry_delay` 
 attribute on the task. By default this is set to 3 minutes. Note that the
 unit for setting the delay is in seconds (int or float).
 
 You can also provide the ``countdown`` argument to
-:meth:`~celery.task.base.Task.retry` to override this default.
+:meth:`~celery.task.base.BaseTask.retry` to override this default.
 
 .. code-block:: python
 
-    class MyTask(Task):
-        default_retry_delay = 30 * 60 # retry in 30 minutes
-
-        def run(self, x, y, **kwargs):
-            try:
-                ...
-            except Exception, exc:
-                self.retry([x, y], kwargs, exc=exc,
-                           countdown=60) # override the default and
-                                         # - retry in 1 minute
+    @celery.task(default_retry_delay=30 * 60)  # retry in 30 minutes.
+    def add(x, y):
+        try:
+            ...
+        except Exception, exc:
+            self.retry(exc=exc, countdown=60)  # override the default and
+                                               # retry in 1 minute
 
 .. _task-options:
 
@@ -198,6 +171,13 @@ General
     automatically generated using the module and class name.  See
     :ref:`task-names`.
 
+.. attribute Task.request
+
+    If the task is being executed this will contain information
+    about the current request.  Thread local storage is used.
+
+    See :ref:`task-request-info`.
+
 .. attribute:: Task.abstract
 
     Abstract classes are not registered, but are used as the
@@ -301,7 +281,7 @@ General
 
 .. seealso::
 
-    The API reference for :class:`~celery.task.base.Task`.
+    The API reference for :class:`~celery.task.base.BaseTask`.
 
 .. _task-message-options:
 
@@ -367,7 +347,7 @@ For example:
 
 .. code-block:: python
 
-    >>> @task(name="sum-of-two-numbers")
+    >>> @celery.task(name="sum-of-two-numbers")
     >>> def add(x, y):
     ...     return x + y
 
@@ -380,7 +360,7 @@ another module:
 
 .. code-block:: python
 
-    >>> @task(name="tasks.add")
+    >>> @celery.task(name="tasks.add")
     >>> def add(x, y):
     ...     return x + y
 
@@ -393,7 +373,7 @@ task if the module name is "tasks.py":
 
 .. code-block:: python
 
-    >>> @task()
+    >>> @celery.task()
     >>> def add(x, y):
     ...     return x + y
 
@@ -535,15 +515,14 @@ The name of the state is usually an uppercase string.  As an example
 you could have a look at :mod:`abortable tasks <~celery.contrib.abortable>`
 wich defines its own custom :state:`ABORTED` state.
 
-Use :meth:`Task.update_state <celery.task.base.Task.update_state>` to
+Use :meth:`Task.update_state <celery.task.base.BaseTask.update_state>` to
 update a tasks state::
 
-    @task
-    def upload_files(filenames, **kwargs):
-
+    @celery.task
+    def upload_files(filenames):
         for i, file in enumerate(filenames):
-            upload_files.update_state(kwargs["task_id"], "PROGRESS",
-                {"current": i, "total": len(filenames)})
+            upload_files.update_state(state="PROGRESS",
+                meta={"current": i, "total": len(filenames)})
 
 
 Here we created the state ``"PROGRESS"``, which tells any application
@@ -588,10 +567,10 @@ The default loader imports any modules listed in the
 
 The entity responsible for registering your task in the registry is a
 meta class, :class:`~celery.task.base.TaskType`.  This is the default
-meta class for :class:`~celery.task.base.Task`.
+meta class for :class:`~celery.task.base.BaseTask`.
 
 If you want to register your task manually you can set mark the
-task as :attr:`~celery.task.base.Task.abstract`:
+task as :attr:`~celery.task.base.BaseTask.abstract`:
 
 .. code-block:: python
 
@@ -619,12 +598,12 @@ Ignore results you don't want
 -----------------------------
 
 If you don't care about the results of a task, be sure to set the
-:attr:`~celery.task.base.Task.ignore_result` option, as storing results
+:attr:`~celery.task.base.BaseTask.ignore_result` option, as storing results
 wastes time and resources.
 
 .. code-block:: python
 
-    @task(ignore_result=True)
+    @celery.task(ignore_result=True)
     def mytask(...)
         something()
 
@@ -661,21 +640,21 @@ Make your design asynchronous instead, for example by using *callbacks*.
 
 .. code-block:: python
 
-    @task()
+    @celery.task()
     def update_page_info(url):
         page = fetch_page.delay(url).get()
         info = parse_page.delay(url, page).get()
         store_page_info.delay(url, info)
 
-    @task()
+    @celery.task()
     def fetch_page(url):
         return myhttplib.get(url)
 
-    @task()
+    @celery.task()
     def parse_page(url, page):
         return myparser.parse_document(page)
 
-    @task()
+    @celery.task()
     def store_page_info(url, info):
         return PageInfo.objects.create(url, info)
 
@@ -684,13 +663,13 @@ Make your design asynchronous instead, for example by using *callbacks*.
 
 .. code-block:: python
 
-    @task(ignore_result=True)
+    @celery.task(ignore_result=True)
     def update_page_info(url):
         # fetch_page -> parse_page -> store_page
         fetch_page.delay(url, callback=subtask(parse_page,
                                     callback=subtask(store_page_info)))
 
-    @task(ignore_result=True)
+    @celery.task(ignore_result=True)
     def fetch_page(url, callback=None):
         page = myhttplib.get(url)
         if callback:
@@ -699,13 +678,13 @@ Make your design asynchronous instead, for example by using *callbacks*.
             # into a subtask object.
             subtask(callback).delay(url, page)
 
-    @task(ignore_result=True)
+    @celery.task(ignore_result=True)
     def parse_page(url, page, callback=None):
         info = myparser.parse_document(page)
         if callback:
             subtask(callback).delay(url, info)
 
-    @task(ignore_result=True)
+    @celery.task(ignore_result=True)
     def store_page_info(url, info):
         PageInfo.objects.create(url, info)
 
@@ -805,7 +784,7 @@ that automatically expands some abbreviations in it:
         title = models.CharField()
         body = models.TextField()
 
-    @task
+    @celery.task
     def expand_abbreviations(article):
         article.body.replace("MyCorp", "My Corporation")
         article.save()
@@ -826,7 +805,7 @@ re-fetch the article in the task body:
 
 .. code-block:: python
 
-    @task
+    @celery.task
     def expand_abbreviations(article_id):
         article = Article.objects.get(id=article_id)
         article.body.replace("MyCorp", "My Corporation")
@@ -987,26 +966,26 @@ blog/tasks.py
 
 
     @task
-    def spam_filter(comment_id, remote_addr=None, **kwargs):
-            logger = spam_filter.get_logger(**kwargs)
-            logger.info("Running spam filter for comment %s" % comment_id)
-
-            comment = Comment.objects.get(pk=comment_id)
-            current_domain = Site.objects.get_current().domain
-            akismet = Akismet(settings.AKISMET_KEY, "http://%s" % domain)
-            if not akismet.verify_key():
-                raise ImproperlyConfigured("Invalid AKISMET_KEY")
-
-
-            is_spam = akismet.comment_check(user_ip=remote_addr,
-                                comment_content=comment.comment,
-                                comment_author=comment.name,
-                                comment_author_email=comment.email_address)
-            if is_spam:
-                comment.is_spam = True
-                comment.save()
-
-            return is_spam
+    def spam_filter(comment_id, remote_addr=None):
+        logger = spam_filter.get_logger()
+        logger.info("Running spam filter for comment %s" % comment_id)
+
+        comment = Comment.objects.get(pk=comment_id)
+        current_domain = Site.objects.get_current().domain
+        akismet = Akismet(settings.AKISMET_KEY, "http://%s" % domain)
+        if not akismet.verify_key():
+            raise ImproperlyConfigured("Invalid AKISMET_KEY")
+
+
+        is_spam = akismet.comment_check(user_ip=remote_addr,
+                            comment_content=comment.comment,
+                            comment_author=comment.name,
+                            comment_author_email=comment.email_address)
+        if is_spam:
+            comment.is_spam = True
+            comment.save()
+
+        return is_spam
 
 .. _`Akismet`: http://akismet.com/faq/
 .. _`akismet.py`: http://www.voidspace.org.uk/downloads/akismet.py

+ 5 - 2
examples/app/myapp.py

@@ -16,8 +16,11 @@ from celery import Celery
 celery = Celery("myapp")
 celery.conf.update(BROKER_HOST="localhost")
 
-@celery.task()
-def add(x, y):
+@celery.task(accept_magic_kwargs=False)
+def add(x, y, **kwargs):
+    print("add id: %r %r %r" % (add.request.id, add.request.args,
+        add.request.kwargs))
+    print("kwargs: %r" % (kwargs, ))
     return x + y
 
 if __name__ == "__main__":