Explorar o código

Merge branch 'master' of http://github.com/ask/celery

Gunnlaugur Thor Briem %!s(int64=14) %!d(string=hai) anos
pai
achega
91d456bdad

+ 30 - 15
Changelog

@@ -7,7 +7,10 @@
 
 2.0.1
 =====
-:release-date: TBA
+:release-date: 2010-07-09 03:02 P.M CEST
+
+* multiprocessing.pool: Now handles encoding errors, so that pickling errors
+  doesn't crash the worker processes.
 
 * The remote control command replies was not working with RabbitMQ 1.8.0's
   stricter equivalence checks.
@@ -24,25 +27,37 @@
 * A bug sneaked in the ETA scheduler that made it only able to execute
   one task per second(!)
 
-  The scheduler sleeps between iterations so it doesn't consume too much CPU.
-  It keeps a list of the scheduled items sorted by time, at each iteration
-  it sleeps for the remaining time of the item with the nearest deadline.
-  If there are no eta tasks it will sleep for a minimum amount of time, one
-  second by default.
+    The scheduler sleeps between iterations so it doesn't consume too much CPU.
+    It keeps a list of the scheduled items sorted by time, at each iteration
+    it sleeps for the remaining time of the item with the nearest deadline.
+    If there are no eta tasks it will sleep for a minimum amount of time, one
+    second by default.
+
+    A bug sneaked in here, making it sleep for one second for every task
+    that was scheduled. This has been fixed, so now it should move
+    tasks like hot knife through butter.
+
+    In addition a new setting has been added to control the minimum sleep
+    interval; ``CELERYD_ETA_SCHEDULER_PRECISION``. A good
+    value for this would be a float between 0 and 1, depending
+    on the needed precision. A value of 0.8 means that when the ETA of a task
+    is met, it will take at most 0.8 seconds for the task to be moved to the
+    ready queue.
+
+* Pool: Supervisor did not release the semaphore.
 
-  A bug sneaked in here, making it sleep for one second for every task
-  that was scheduled. This has been fixed, so now it should move
-  tasks like hot knife through butter.
+    This would lead to a deadlock if all workers terminated prematurely.
 
-  In addition a new setting has been added to control the minimum sleep
-  interval; ``CELERYD_ETA_SCHEDULER_PRECISION``. A good
-  value for this would be a float between 0 and 1, depending
-  on the needed precision. A value of 0.8 means that when the ETA of a task
-  is met, it will take at most 0.8 seconds for the task to be moved to the
-  ready queue.
+* Added Python version trove classifiers: 2.4, 2.5, 2.6 and 2.7
+
+* Tests now passing on Python 2.7.
 
 * Task.__reduce__: Tasks created using the task decorator can now be pickled.
 
+* setup.py: nose added to ``tests_require``.
+
+* Pickle should now work with SQLAlchemy 0.5.x
+
 * New homepage design by Jan Henrik Helmers: http://celeryproject.org
 
 * New Sphinx theme by Armin Ronacher: http://celeryproject.org/docs

+ 18 - 35
FAQ

@@ -330,58 +330,44 @@ Can I use celery with ActiveMQ/STOMP?
 
 **Answer**: Yes, but this is somewhat experimental for now.
 It is working ok in a test configuration, but it has not
-been tested in production like RabbitMQ has. If you have any problems with
-using STOMP and celery, please report the bugs to the issue tracker:
+been tested in production. If you have any problems
+using STOMP with celery, please report an issue here::
 
     http://github.com/ask/celery/issues/
 
-First you have to use the ``master`` branch of ``celery``::
+The STOMP carrot backend requires the `stompy`_ library::
 
-    $ git clone git://github.com/ask/celery.git
-    $ cd celery
-    $ sudo python setup.py install
-    $ cd ..
-
-Then you need to install the ``stompbackend`` branch of ``carrot``::
-
-    $ git clone git://github.com/ask/carrot.git
-    $ cd carrot
-    $ git checkout stompbackend
-    $ sudo python setup.py install
-    $ cd ..
-
-And my fork of ``python-stomp`` which adds non-blocking support::
-
-    $ hg clone http://bitbucket.org/asksol/python-stomp/
+    $ pip install stompy
     $ cd python-stomp
     $ sudo python setup.py install
     $ cd ..
 
+.. _`stompy`: http://pypi.python.org/pypi/stompy
+
 In this example we will use a queue called ``celery`` which we created in
 the ActiveMQ web admin interface.
 
-**Note**: For ActiveMQ the queue name has to have ``"/queue/"`` prepended to
-it. i.e. the queue ``celery`` becomes ``/queue/celery``.
+**Note**: When using ActiveMQ the queue name needs to have ``"/queue/"``
+prepended to it. i.e. the queue ``celery`` becomes ``/queue/celery``.
 
-Since a STOMP queue is a single named entity and it doesn't have the
-routing capabilities of AMQP you need to set both the ``queue``, and
-``exchange`` settings to your queue name. This is a minor inconvenience since
-carrot needs to maintain the same interface for both AMQP and STOMP (obviously
-the one with the most capabilities won).
+Since STOMP doesn't have exchanges and the routing capabilities of AMQP,
+you need to set ``exchange`` name to the same as the queue name. This is
+a minor inconvenience since carrot needs to maintain the same interface
+for both AMQP and STOMP.
 
-Use the following specific settings in your ``celeryconfig.py``/django ``settings.py``:
+Use the following settings in your ``celeryconfig.py``/django ``settings.py``:
 
 .. code-block:: python
 
-    # Makes python-stomp the default backend for carrot.
+    # Use the stomp carrot backend.
     CARROT_BACKEND = "stomp"
 
     # STOMP hostname and port settings.
     BROKER_HOST = "localhost"
     BROKER_PORT = 61613
 
-    # The queue name to use (both queue and exchange must be set to the
-    # same queue name when using STOMP)
+    # The queue name to use (the exchange *must* be set to the
+    # same as the queue name when using STOMP)
     CELERY_DEFAULT_QUEUE = "/queue/celery"
     CELERY_DEFAULT_EXCHANGE = "/queue/celery" 
 
@@ -389,11 +375,8 @@ Use the following specific settings in your ``celeryconfig.py``/django ``setting
         "/queue/celery": {"exchange": "/queue/celery"}
     }
 
-Now you can go on reading the tutorial in the README, ignoring any AMQP
-specific options. 
-
-What features are not supported when using STOMP?
---------------------------------------------------
+What features are not supported when using ghettoq/STOMP?
+---------------------------------------------------------
 
 This is a (possible incomplete) list of features not available when
 using the STOMP backend:

+ 4 - 3
README.rst

@@ -4,7 +4,7 @@
 
 .. image:: http://cloud.github.com/downloads/ask/celery/celery_favicon_128.png
 
-:Version: 2.0.0
+:Version: 2.0.1
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/ask/celery/
@@ -13,8 +13,9 @@
 
 --
 
-Celery is an asynchronous task queue/job queue based on distributed message passing.
-It is focused on real-time operation, but supports scheduling as well.
+Celery is an open source asynchronous task queue/job queue based on
+distributed message passing. It is focused on real-time operation,
+but supports scheduling as well.
 
 The execution units, called tasks, are executed concurrently on a single or
 more worker servers. Tasks can execute asynchronously (in the background) or synchronously

+ 1 - 1
celery/__init__.py

@@ -1,6 +1,6 @@
 """Distributed Task Queue"""
 
-VERSION = (2, 0, 0)
+VERSION = (2, 0, 1)
 
 __version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
 __author__ = "Ask Solem"

+ 1 - 1
celery/concurrency/processes/__init__.py

@@ -8,7 +8,7 @@ from celery import log
 from celery.datastructures import ExceptionInfo
 from celery.utils.functional import curry
 
-from celery.concurrency.processes.pool import Pool, RUN, MaybeEncodingError
+from celery.concurrency.processes.pool import Pool, RUN
 
 
 class TaskPool(object):

+ 8 - 8
celery/concurrency/processes/pool.py

@@ -51,10 +51,6 @@ def mapstar(args):
 #
 
 
-def soft_timeout_sighandler(signum, frame):
-    raise SoftTimeLimitExceeded()
-
-
 class MaybeEncodingError(Exception):
     """Wraps unpickleable object."""
 
@@ -63,12 +59,17 @@ class MaybeEncodingError(Exception):
         self.value = repr(value)
         super(MaybeEncodingError, self).__init__(self.exc, self.value)
 
-    def __str__(self):
-        return "Error sending result: '%s'. Reason: '%s'." % (self.value,
-                                                              self.exc)
     def __repr__(self):
         return "<MaybeEncodingError: %s>" % str(self)
 
+    def __str__(self):
+        return "Error sending result: '%s'. Reason: '%s'." % (
+                    self.value, self.exc)
+
+
+def soft_timeout_sighandler(signum, frame):
+    raise SoftTimeLimitExceeded()
+
 
 def worker(inqueue, outqueue, ackqueue, initializer=None, initargs=(),
         maxtasks=None):
@@ -109,7 +110,6 @@ def worker(inqueue, outqueue, ackqueue, initializer=None, initargs=(),
             put((job, i, result))
         except Exception, exc:
             wrapped = MaybeEncodingError(exc, result[1])
-            debug('Got possible encoding error while sending result: %s' % wrapped)
             put((job, i, (False, wrapped)))
 
         completed += 1

+ 10 - 4
celery/db/dfd042c7.py

@@ -1,6 +1,9 @@
 """
 dfd042c7
-SQLAlchemy 0.5.8 version of a805d4bd, see the docstring of that module for an explanation of this workaround.
+
+SQLAlchemy 0.5.8 version of a805d4bd, see the docstring of that module
+for an explanation of this workaround.
+
 """
 from sqlalchemy.types import PickleType as _PickleType
 from sqlalchemy import util
@@ -31,11 +34,14 @@ class PickleType(_PickleType):
         if self.comparator:
             return self.comparator(x, y)
         elif self.mutable and not hasattr(x, '__eq__') and x is not None:
-            util.warn_deprecated("Objects stored with PickleType when mutable=True must implement __eq__() for reliable comparison.")
-            return self.pickler.dumps(x, self.protocol) == self.pickler.dumps(y, self.protocol)
+            util.warn_deprecated(
+                    "Objects stored with PickleType when mutable=True "
+                    "must implement __eq__() for reliable comparison.")
+            a = self.pickler.dumps(x, self.protocol)
+            b = self.pickler.dumps(y, self.protocol)
+            return a == b
         else:
             return x == y
 
     def is_mutable(self):
         return self.mutable
-

+ 2 - 0
celery/tests/config.py

@@ -12,6 +12,8 @@ CELERY_DEFAULT_EXCHANGE = "testcelery"
 CELERY_DEFAULT_ROUTING_KEY = "testcelery"
 CELERY_QUEUES = {"testcelery": {"binding_key": "testcelery"}}
 
+CELERYD_LOG_COLOR = False
+
 @atexit.register
 def teardown_testdb():
     import os

+ 7 - 0
celery/tests/functional/__init__.py

@@ -0,0 +1,7 @@
+import os
+
+config = os.environ.setdefault("CELERY_FUNTEST_CONFIG_MODULE",
+                               "celery.tests.functional.config")
+
+os.environ["CELERY_CONFIG_MODULE"] = config
+os.environ["CELERY_LOADER"] = "default"

+ 120 - 0
celery/tests/functional/case.py

@@ -0,0 +1,120 @@
+import atexit
+import logging
+import os
+import signal
+import socket
+import sys
+import traceback
+import unittest2 as unittest
+
+from itertools import count
+
+from celery.task.control import broadcast, ping
+from celery.utils import get_full_cls_name
+
+HOSTNAME = socket.gethostname()
+
+def say(msg):
+    sys.stderr.write("%s\n" % msg)
+
+def flatten_response(response):
+    flat = {}
+    for item in response:
+        flat.update(item)
+    return flat
+
+class Worker(object):
+    started = False
+    next_worker_id = count(1).next
+    _shutdown_called = False
+
+    def __init__(self, hostname, loglevel="error"):
+        self.hostname = hostname
+        self.loglevel = loglevel
+
+    def start(self):
+        if not self.started:
+            self._fork_and_exec()
+            self.started = True
+
+    def _fork_and_exec(self):
+        pid = os.fork()
+        if pid == 0:
+            os.execv(sys.executable,
+                    [sys.executable] + ["-m", "celery.bin.celeryd",
+                                        "-l", self.loglevel,
+                                        "-n", self.hostname])
+            os.exit()
+        self.pid = pid
+
+    def is_alive(self, timeout=1):
+        r = ping(destination=[self.hostname],
+                 timeout=timeout)
+        return self.hostname in flatten_response(r)
+
+    def wait_until_started(self, timeout=10, interval=0.2):
+        for iteration in count(0):
+            if iteration * interval >= timeout:
+                raise Exception(
+                        "Worker won't start (after %s secs.)" % timeout)
+            if self.is_alive(interval):
+                break
+        say("--WORKER %s IS ONLINE--" % self.hostname)
+
+    def ensure_shutdown(self, timeout=10, interval=0.5):
+        os.kill(self.pid, signal.SIGTERM)
+        for iteration in count(0):
+            if iteration * interval >= timeout:
+                raise Exception(
+                        "Worker won't shutdown (after %s secs.)" % timeout)
+            broadcast("shutdown", destination=[self.hostname])
+            if not self.is_alive(interval):
+                break
+        say("--WORKER %s IS SHUTDOWN--" % self.hostname)
+        self._shutdown_called = True
+
+    def ensure_started(self):
+        self.start()
+        self.wait_until_started()
+
+    @classmethod
+    def managed(cls, hostname=None, caller=None):
+        hostname = hostname or socket.gethostname()
+        if caller:
+            hostname = ".".join([get_full_cls_name(caller), hostname])
+        else:
+            hostname += str(cls.next_worker_id())
+        worker = cls(hostname)
+        worker.ensure_started()
+        stack = traceback.format_stack()
+
+        @atexit.register
+        def _ensure_shutdown_once():
+            if not worker._shutdown_called:
+                say("-- Found worker not stopped at shutdown: %s\n%s" % (
+                        worker.hostname,
+                        "\n".join(stack)))
+                worker.ensure_shutdown()
+
+        return worker
+
+
+class WorkerCase(unittest.TestCase):
+    hostname = HOSTNAME
+    worker = None
+
+    @classmethod
+    def setUpClass(cls):
+        logging.getLogger("amqplib").setLevel(logging.ERROR)
+        cls.worker = Worker.managed(cls.hostname, caller=cls)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.worker.ensure_shutdown()
+
+    def assertWorkerAlive(self, timeout=1):
+        self.assertTrue(self.worker.is_alive)
+
+    def my_response(self, response):
+        return flatten_response(response)[self.worker.hostname]
+

+ 28 - 0
celery/tests/functional/config.py

@@ -0,0 +1,28 @@
+import atexit
+import os
+
+CARROT_BACKEND = os.environ.get("CARROT_BACKEND") or "amqplib"
+
+BROKER_HOST = os.environ.get("BROKER_HOST") or "localhost"
+BROKER_USER = os.environ.get("BROKER_USER") or "guest"
+BROKER_PASSWORD = os.environ.get("BROKER_PASSWORD") or "guest"
+BROKER_VHOST = os.environ.get("BROKER_VHOST") or "/"
+
+
+CELERY_RESULT_BACKEND = "amqp"
+CELERY_SEND_TASK_ERROR_EMAILS = False
+
+CELERY_DEFAULT_QUEUE = "testcelery"
+CELERY_DEFAULT_EXCHANGE = "testcelery"
+CELERY_DEFAULT_ROUTING_KEY = "testcelery"
+CELERY_QUEUES = {"testcelery": {"binding_key": "testcelery"}}
+
+CELERYD_LOG_COLOR = False
+
+CELERY_IMPORTS = ("celery.tests.functional.tasks", )
+
+@atexit.register
+def teardown_testdb():
+    import os
+    if os.path.exists("test.db"):
+        os.remove("test.db")

+ 23 - 0
celery/tests/functional/tasks.py

@@ -0,0 +1,23 @@
+import time
+
+from celery.decorators import task
+from celery.task.sets import subtask
+
+
+@task
+def add(x, y):
+    return x + y
+
+
+@task
+def add_cb(x, y, callback=None):
+    result = x + y
+    if callback:
+        return subtask(callback).apply_async(result)
+    return result
+
+
+@task
+def sleeptask(i):
+    time.sleep(i)
+    return i

+ 2 - 0
celery/tests/functional/test.cfg

@@ -0,0 +1,2 @@
+[nose]
+where = celery/tests/functional

+ 32 - 0
celery/tests/functional/test_basic.py

@@ -0,0 +1,32 @@
+import operator
+import time
+
+from celery.task.control import broadcast
+
+from celery.tests.functional import tasks
+from celery.tests.functional.case import WorkerCase
+
+
+class test_basic(WorkerCase):
+
+    def test_started(self):
+        self.assertWorkerAlive()
+
+    def test_roundtrip_simple_task(self):
+        publisher = tasks.add.get_publisher()
+        results = [(tasks.add.apply_async(i, publisher=publisher), i)
+                        for i in zip(xrange(100), xrange(100))]
+        for result, i in results:
+            self.assertEqual(result.get(timeout=10), operator.add(*i))
+
+    def test_dump_active(self):
+        tasks.sleeptask.delay(2)
+        tasks.sleeptask.delay(2)
+        time.sleep(0.2)
+        r = broadcast("dump_active",
+                           arguments={"safe": True}, reply=True)
+        active = self.my_response(r)
+        self.assertEqual(len(active), 2)
+        self.assertEqual(active[0]["name"], tasks.sleeptask.name)
+        self.assertEqual(active[0]["args"], [2])
+

+ 6 - 4
celery/tests/test_task_sets.py

@@ -96,16 +96,17 @@ class test_TaskSet(unittest.TestCase):
             self.assertTrue(log)
             self.assertIn("Using this invocation of TaskSet is deprecated",
                           log[0].message.args[0])
+            self.assertListEqual(ts.tasks,
+                                 [MockTask.subtask((i, i))
+                                    for i in (2, 4, 8)])
             return ts
 
         context = catch_warnings(record=True)
-        ts = execute_context(context, with_catch_warnings)
-        self.assertListEqual(ts.tasks,
-                             [MockTask.subtask((i, i))
-                                 for i in (2, 4, 8)])
+        execute_context(context, with_catch_warnings)
 
         # TaskSet.task (deprecated)
         def with_catch_warnings2(log):
+            ts = TaskSet(MockTask, [[(2, 2)], [(4, 4)], [(8, 8)]])
             self.assertEqual(ts.task, MockTask)
             self.assertTrue(log)
             self.assertIn("TaskSet.task is deprecated",
@@ -115,6 +116,7 @@ class test_TaskSet(unittest.TestCase):
 
         # TaskSet.task_name (deprecated)
         def with_catch_warnings3(log):
+            ts = TaskSet(MockTask, [[(2, 2)], [(4, 4)], [(8, 8)]])
             self.assertEqual(ts.task_name, MockTask.name)
             self.assertTrue(log)
             self.assertIn("TaskSet.task_name is deprecated",

+ 0 - 1
celery/tests/test_utils.py

@@ -101,7 +101,6 @@ class test_utils(unittest.TestCase):
         self.assertEqual("four", utils.firstmethod("m")([
             A(), A(), A(), promise(lambda: A("four")), A("five")]))
 
-
     def test_first(self):
         iterations = [0]
 

+ 1 - 2
celery/utils/__init__.py

@@ -88,6 +88,7 @@ def maybe_promise(value):
         return value.evaluate()
     return value
 
+
 def noop(*args, **kwargs):
     """No operation.
 
@@ -341,5 +342,3 @@ def instantiate(name, *args, **kwargs):
 
     """
     return get_cls_by_name(name)(*args, **kwargs)
-
-

+ 4 - 4
celery/utils/compat.py

@@ -366,7 +366,7 @@ def _compat_chain_from_iterable(iterables):
         for element in it:
             yield element
 
-#try:
-#    chain_from_iterable = getattr(chain, "from_iterable")
-#except AttributeError:
-chain_from_iterable = _compat_chain_from_iterable
+try:
+    chain_from_iterable = getattr(chain, "from_iterable")
+except AttributeError:
+    chain_from_iterable = _compat_chain_from_iterable

+ 0 - 1
celery/worker/control/builtins.py

@@ -3,7 +3,6 @@ from datetime import datetime
 from celery import conf
 from celery.backends import default_backend
 from celery.registry import tasks
-from celery.serialization import pickle
 from celery.utils import timeutils
 from celery.worker import state
 from celery.worker.state import revoked

+ 7 - 9
celery/worker/job.py

@@ -403,15 +403,13 @@ class TaskRequest(object):
                                        exception=repr(exc_info.exception),
                                        traceback=exc_info.traceback)
 
-        context = {
-            "hostname": self.hostname,
-            "id": self.task_id,
-            "name": self.task_name,
-            "exc": repr(exc_info.exception),
-            "traceback": unicode(exc_info.traceback, 'utf-8'),
-            "args": self.args,
-            "kwargs": self.kwargs,
-        }
+        context = {"hostname": self.hostname,
+                   "id": self.task_id,
+                   "name": self.task_name,
+                   "exc": repr(exc_info.exception),
+                   "traceback": unicode(exc_info.traceback, 'utf-8'),
+                   "args": self.args,
+                   "kwargs": self.kwargs}
         self.logger.error(self.error_msg.strip() % context)
 
         task_obj = tasks.get(self.task_name, object)

+ 20 - 0
celery/worker/state.py

@@ -3,20 +3,40 @@ import shelve
 from celery.utils.compat import defaultdict
 from celery.datastructures import LimitedSet
 
+# Maximum number of revokes to keep in memory.
 REVOKES_MAX = 10000
+
+# How many seconds a revoke will be active before
+# being expired when the max limit has been exceeded.
 REVOKE_EXPIRES = 3600 # One hour.
 
+"""
+.. data:: active_requests
+
+Set of currently active :class:`~celery.worker.job.TaskRequest`'s.
+
+.. data:: total_count
+
+Count of tasks executed by the worker, sorted by type.
+
+.. data:: revoked
+
+The list of currently revoked tasks. (PERSISTENT if statedb set).
+
+"""
 active_requests = set()
 total_count = defaultdict(lambda: 0)
 revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
 
 
 def task_accepted(request):
+    """Updates global state when a task has been accepted."""
     active_requests.add(request)
     total_count[request.task_name] += 1
 
 
 def task_ready(request):
+    """Updates global state when a task is ready."""
     try:
         active_requests.remove(request)
     except KeyError:

+ 2 - 1
contrib/release/doc4allmods

@@ -7,7 +7,8 @@ SKIP_FILES="celery.bin.rst celery.contrib.rst
             celery.models.rst
             celery.concurrency.rst
             celery.db.rst
-            celery.db.a805d4bd.rst"
+            celery.db.a805d4bd.rst
+            celery.db.dfd042c7.rst"
 
 modules=$(find "$PACKAGE" -name "*.py")
 

+ 0 - 1
contrib/requirements/default.txt

@@ -3,5 +3,4 @@ python-dateutil
 sqlalchemy
 anyjson
 carrot>=0.10.5
-django-picklefield
 pyparsing

+ 3 - 3
docs/conf.py

@@ -65,7 +65,7 @@ latex_documents = [
 html_theme = "celery"
 html_theme_path = ["_theme"]
 html_sidebars = {
-    'index':    ['sidebarintro.html', 'sourcelink.html', 'searchbox.html'],
-    '**':       ['sidebarlogo.html', 'localtoc.html', 'relations.html',
-                 'sourcelink.html', 'searchbox.html']
+    'index': ['sidebarintro.html', 'sourcelink.html', 'searchbox.html'],
+    '**': ['sidebarlogo.html', 'localtoc.html', 'relations.html',
+           'sourcelink.html', 'searchbox.html'],
 }

+ 4 - 3
docs/includes/introduction.txt

@@ -1,4 +1,4 @@
-:Version: 2.0.0
+:Version: 2.0.1
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/ask/celery/
@@ -7,8 +7,9 @@
 
 --
 
-Celery is an asynchronous task queue/job queue based on distributed message passing.
-It is focused on real-time operation, but supports scheduling as well.
+Celery is an open source asynchronous task queue/job queue based on
+distributed message passing. It is focused on real-time operation,
+but supports scheduling as well.
 
 The execution units, called tasks, are executed concurrently on a single or
 more worker servers. Tasks can execute asynchronously (in the background) or synchronously

+ 0 - 10
docs/internals/reference/celery.worker.revoke.rst

@@ -1,10 +0,0 @@
-==============================================
- Worker Revoked Tasks - celery.worker.revoke
-==============================================
-
-.. data:: revoked
-
-    A :class:`celery.datastructures.LimitedSet` containing revoked task ids.
-
-    Items expire after one hour, and the structure can only hold
-    10000 expired items at a time (about 300kb).

+ 11 - 0
docs/internals/reference/celery.worker.state.rst

@@ -0,0 +1,11 @@
+====================================
+ Worker State - celery.worker.state
+====================================
+
+.. contents::
+    :local:
+.. currentmodule:: celery.worker.state
+
+.. automodule:: celery.worker.state
+    :members:
+    :undoc-members:

+ 1 - 1
docs/internals/reference/index.rst

@@ -18,7 +18,7 @@
     celery.worker.control
     celery.worker.control.builtins
     celery.worker.control.registry
-    celery.worker.revoke
+    celery.worker.state
     celery.concurrency.processes
     celery.concurrency.processes.pool
     celery.concurrency.threads