Prechádzať zdrojové kódy

Documents Ignore, Reject, Retry. Closes #1369

Ask Solem 11 rokov pred
rodič
commit
197c2dd384
2 zmenil súbory, kde vykonal 117 pridanie a 0 odobranie
  1. 1 0
      celery/worker/job.py
  2. 116 0
      docs/userguide/tasks.rst

+ 1 - 0
celery/worker/job.py

@@ -163,6 +163,7 @@ class Request(object):
             'exchange': delivery_info.get('exchange'),
             'routing_key': delivery_info.get('routing_key'),
             'priority': delivery_info.get('priority'),
+            'redelivered': delivery_info.get('redelivered'),
         }
         self.request_dict = body
 

+ 116 - 0
docs/userguide/tasks.rst

@@ -791,6 +791,122 @@ you have to pass them as regular args:
 
             super(HttpError, self).__init__(status_code, headers, body)
 
+.. _task-semipredicates:
+
+Semipredicates
+==============
+
+The worker wraps the task in a tracing function which records the final
+state of the task.  There are a number of exceptions that can be used to
+signal this function to change how it treats the return of the task.
+
+.. _task-semipred-ignore:
+
+Ignore
+------
+
+The task may raise :exc:`~@Ignore` to force the worker to ignore the
+task.  This means that no state will be recorded for the task, but the
+message is still acknowledged (removed from queue).
+
+This is can be used if you want to implement custom revoke-like
+functionality, or manually store the result of a task.
+
+Example keeping revoked tasks in a Redis set:
+
+.. code-block:: python
+
+    from celery.exceptions import Ignore
+
+    @app.task(bind=True)
+    def some_task(self):
+        if redis.ismember('tasks.revoked', self.request.id):
+            raise Ignore()
+
+Example that stores results manually:
+
+.. code-block:: python
+
+    from celery import states
+    from celery.exceptions import Ignore
+
+    @app.task(bind=True)
+    def get_tweets(self, user):
+        timeline = twitter.get_timeline(user)
+        self.update_state(sate=states.SUCCESS, meta=timeline)
+        raise Ignore()
+
+.. _task-semipred-reject:
+
+Reject
+------
+
+The task may raise :exc:`~@Reject` to reject the task message using
+AMQPs ``basic_reject`` method.  This will not have any effect unless
+:attr:`Task.acks_late` is enabled.
+
+Rejecting a message has the same effect as acking it, but some
+brokers may implement additional functionality that can be used.
+For example RabbitMQ supports the concept of `Dead Letter Exchanges`_
+where a queue can be configured to use a dead letter exchange that rejected
+messages are redelivered to.
+
+.. _`Dead Letter Exchanges`: http://www.rabbitmq.com/dlx.html
+
+Reject can also be used to requeue messages, but please be very careful
+when using this as it can easily result in an infinite message loop.
+
+Example using reject when a task causes an out of memory condition:
+
+.. code-block:: python
+
+    import errno
+    from celery.exceptions import Reject
+
+    @app.task(bind=True, acks_late=True)
+    def render_scene(self, path):
+        file = get_file(path)
+        try:
+            renderer.render_scene(file)
+
+        # if the file is too big to fit in memory
+        # we reject it so that it's redelivered to the dead letter exchange
+        # and we can manually inspect the situation.
+        except MemoryError as exc:
+            raise Reject(exc, requeue=False)
+        except OSError as exc:
+            if exc.errno == errno.ENOMEM:
+                raise Reject(exc, requeue=False)
+
+        # For any other error we retry after 10 seconds.
+        except Exception as exc:
+            raise self.retry(exc, countdown=10)
+
+Example requeuing the message:
+
+.. code-block:: python
+
+    import errno
+    from celery.exceptions import Reject
+
+    @app.task(bind=True, acks_late=True)
+    def requeues(self):
+        if not self.request.delivery_info['redelivered']:
+            raise Requeue('no reason', requeue=True)
+        print('received two times')
+
+Consult your broker documentation for more details about the ``basic_reject``
+method.
+
+
+.. _task-semipred-retry:
+
+Retry
+-----
+
+The :exc:`~@Retry` exception is raised by the ``Task.retry`` method
+to tell the worker that the task is being retried.
+
 .. _task-custom-classes:
 
 Custom task classes