Merge branch 'master' of http://github.com/ask/celery

Gunnlaugur Thor Briem, 14 years ago
commit 511bb9fb65

+ 17 - 10
celery/log.py

@@ -1,14 +1,15 @@
 """celery.log"""
 """celery.log"""
+import logging
+import threading
+import time
 import os
 import os
 import sys
 import sys
-import time
-import logging
 import traceback
 import traceback
 
 
 from celery import conf
 from celery import conf
 from celery.utils import noop
 from celery.utils import noop
-from celery.utils.patch import ensure_process_aware_logger
 from celery.utils.compat import LoggerAdapter
 from celery.utils.compat import LoggerAdapter
+from celery.utils.patch import ensure_process_aware_logger
 
 
 _hijacked = False
 _hijacked = False
 _monkeypatched = False
 _monkeypatched = False
@@ -17,12 +18,10 @@ BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
 RESET_SEQ = "\033[0m"
 COLOR_SEQ = "\033[1;%dm"
 BOLD_SEQ = "\033[1m"
-COLORS = {
-    "WARNING": YELLOW,
-    "DEBUG": BLUE,
-    "CRITICAL": MAGENTA,
-    "ERROR": RED,
-}
+COLORS = {"DEBUG": BLUE,
+          "WARNING": YELLOW,
+          "ERROR": RED,
+          "CRITICAL": MAGENTA}
 
 
 
 
 class ColorFormatter(logging.Formatter):
 class ColorFormatter(logging.Formatter):
@@ -174,6 +173,7 @@ class LoggingProxy(object):
     name = None
     closed = False
     loglevel = logging.ERROR
+    _thread = threading.local()

     def __init__(self, logger, loglevel=None):
         self.logger = logger
@@ -207,10 +207,17 @@ class LoggingProxy(object):
         return map(wrap_handler, self.logger.handlers)

     def write(self, data):
         """Write message to logging object."""
+        if getattr(self._thread, "recurse_protection", False):
+            # Logger is logging back to this file, so stop recursing.
+            return
         data = data.strip()
         if data and not self.closed:
+            self._thread.recurse_protection = True
+            try:
-            self.logger.log(self.loglevel, data)
+                self.logger.log(self.loglevel, data)
+            finally:
+                self._thread.recurse_protection = False

     def writelines(self, sequence):
         """``writelines(sequence_of_strings) -> None``.

+ 51 - 0
celery/task/control.py

@@ -88,6 +88,57 @@ def rate_limit(task_name, rate_limit, destination=None, **kwargs):
                                    **kwargs)


+def flatten_reply(reply):
+    nodes = {}
+    for item in reply:
+        nodes.update(item)
+    return nodes
+
+
+class inspect(object):
+
+    def __init__(self, destination=None, timeout=1):
+        self.destination = destination
+        self.timeout = timeout
+
+    def _prepare(self, reply):
+        if not reply:
+            return
+        by_node = flatten_reply(reply)
+        if self.destination and not isinstance(self.destination, (list, tuple)):
+            return by_node.get(self.destination)
+        return by_node
+
+    def _request(self, command, **kwargs):
+        return self._prepare(broadcast(command, arguments=kwargs,
+                                      timeout=self.timeout, reply=True))
+
+    def active(self, safe=False):
+        return self._request("dump_active", safe=safe)
+
+    def scheduled(self, safe=False):
+        return self._request("dump_schedule", safe=safe)
+
+    def reserved(self, safe=False):
+        return self._request("dump_reserved", safe=safe)
+
+    def stats(self):
+        return self._request("stats")
+
+    def revoked(self):
+        return self._request("dump_revoked")
+
+    def registered_tasks(self):
+        return self._request("dump_registered_tasks")
+
+    def enable_events(self):
+        return self._request("enable_events")
+
+    def disable_events(self):
+        return self._request("disable_events")
+
+
+
 @with_connection
 def broadcast(command, arguments=None, destination=None, connection=None,
         connect_timeout=conf.BROKER_CONNECTION_TIMEOUT, reply=False,
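Editor's note: a hedged usage sketch of the new `inspect` API and `flatten_reply()` from a Python shell. It assumes the broker settings are already configured and a worker named "worker1.example.com" is online; the hostname is only an example.

    from celery.task.control import inspect, flatten_reply, ping

    # Restrict the query to one node; inspect() with no arguments asks all nodes.
    i = inspect("worker1.example.com", timeout=2)

    print(i.registered_tasks())   # task names the worker knows about
    print(i.active())             # tasks currently executing
    print(i.scheduled())          # eta/countdown tasks held by the scheduler
    print(i.reserved())           # tasks received but not yet executing
    print(i.stats())              # worker statistics

    # flatten_reply() merges the per-node reply dicts returned by broadcast/ping,
    # e.g. [{"worker1": "pong"}, {"worker2": "pong"}] -> {"worker1": "pong", ...}
    print(flatten_reply(ping(timeout=2)))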

+ 0 - 7
celery/tests/functional/__init__.py

@@ -1,7 +0,0 @@
-import os
-
-config = os.environ.setdefault("CELERY_FUNTEST_CONFIG_MODULE",
-                               "celery.tests.functional.config")
-
-os.environ["CELERY_CONFIG_MODULE"] = config
-os.environ["CELERY_LOADER"] = "default"

+ 68 - 22
celery/tests/functional/case.py

@@ -9,19 +9,25 @@ import unittest2 as unittest

 from itertools import count

-from celery.task.control import broadcast, ping
+from celery.exceptions import TimeoutError
+from celery.task.control import broadcast, ping, flatten_reply, inspect
 from celery.utils import get_full_cls_name

 HOSTNAME = socket.gethostname()

+
 def say(msg):
     sys.stderr.write("%s\n" % msg)

-def flatten_response(response):
-    flat = {}
-    for item in response:
-        flat.update(item)
-    return flat
+
+def try_while(fun, reason="Timed out", timeout=10, interval=0.5):
+    for iterations in count(0):
+        if iterations * interval >= timeout:
+            raise TimeoutError(reason)
+        ret = fun()
+        if ret:
+            return ret
+

 class Worker(object):
     started = False
@@ -50,26 +56,19 @@ class Worker(object):
     def is_alive(self, timeout=1):
         r = ping(destination=[self.hostname],
                  timeout=timeout)
-        return self.hostname in flatten_response(r)
+        return self.hostname in flatten_reply(r)

     def wait_until_started(self, timeout=10, interval=0.2):
-        for iteration in count(0):
-            if iteration * interval >= timeout:
-                raise Exception(
-                        "Worker won't start (after %s secs.)" % timeout)
-            if self.is_alive(interval):
-                break
+        try_while(lambda: self.is_alive(interval),
+                  "Worker won't start (after %s secs.)" % timeout,
+                  interval=interval, timeout=timeout)
         say("--WORKER %s IS ONLINE--" % self.hostname)

     def ensure_shutdown(self, timeout=10, interval=0.5):
         os.kill(self.pid, signal.SIGTERM)
-        for iteration in count(0):
-            if iteration * interval >= timeout:
-                raise Exception(
-                        "Worker won't shutdown (after %s secs.)" % timeout)
-            broadcast("shutdown", destination=[self.hostname])
-            if not self.is_alive(interval):
-                break
+        try_while(lambda: not self.is_alive(interval),
+                  "Worker won't shutdown (after %s secs.)" % timeout,
+                  interval=interval, timeout=timeout)
         say("--WORKER %s IS SHUTDOWN--" % self.hostname)
         self._shutdown_called = True

@@ -115,6 +114,53 @@ class WorkerCase(unittest.TestCase):
     def assertWorkerAlive(self, timeout=1):
         self.assertTrue(self.worker.is_alive)

-    def my_response(self, response):
-        return flatten_response(response)[self.worker.hostname]
+    def inspect(self, timeout=1):
+        return inspect(self.worker.hostname, timeout=timeout)

+    def my_response(self, response):
+        return flatten_reply(response)[self.worker.hostname]
+
+    def is_accepted(self, task_id, interval=0.5):
+        active = self.inspect(timeout=interval).active()
+        if active:
+            for task in active:
+                if task["id"] == task_id:
+                    return True
+        return False
+
+    def is_reserved(self, task_id, interval=0.5):
+        reserved = self.inspect(timeout=interval).reserved()
+        if reserved:
+            for task in reserved:
+                if task["id"] == task_id:
+                    return True
+        return False
+
+    def is_scheduled(self, task_id, interval=0.5):
+        schedule = self.inspect(timeout=interval).scheduled()
+        if schedule:
+            for item in schedule:
+                if item["request"]["id"] == task_id:
+                    return True
+        return False
+
+    def is_received(self, task_id, interval=0.5):
+        return (self.is_reserved(task_id, interval) or
+                self.is_scheduled(task_id, interval) or
+                self.is_accepted(task_id, interval))
+
+
+    def ensure_accepted(self, task_id, interval=0.5, timeout=10):
+        return try_while(lambda: self.is_accepted(task_id, interval),
+                         "Task not accepted within timeout",
+                         interval=interval, timeout=timeout)
+
+    def ensure_received(self, task_id, interval=0.5, timeout=10):
+        return try_while(lambda: self.is_received(task_id, interval),
+                         "Task not received within timeout",
+                         interval=interval, timeout=timeout)
+
+    def ensure_scheduled(self, task_id, interval=0.5, timeout=10):
+        return try_while(lambda: self.is_scheduled(task_id, interval),
+                         "Task not scheduled within timeout",
+                         interval=interval, timeout=timeout)
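Editor's note: the polling helpers above all funnel through `try_while()`. Below is a self-contained sketch of its behaviour, with the celery-specific imports replaced by local stand-ins so it runs on its own; the `flaky()` function is invented for the example.

    from itertools import count

    class TimeoutError(Exception):
        pass

    def try_while(fun, reason="Timed out", timeout=10, interval=0.5):
        # Poll fun() until it returns something truthy.  Each call to fun() is
        # expected to block for roughly `interval` seconds (e.g. a ping issued
        # with that timeout), so no explicit sleep is needed here.
        for iterations in count(0):
            if iterations * interval >= timeout:
                raise TimeoutError(reason)
            ret = fun()
            if ret:
                return ret

    calls = []

    def flaky():
        calls.append(1)
        return len(calls) >= 3      # becomes true on the third poll

    print(try_while(flaky, "flaky() never became true", timeout=5, interval=0.1))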

+ 0 - 32
celery/tests/functional/test_basic.py

@@ -1,32 +0,0 @@
-import operator
-import time
-
-from celery.task.control import broadcast
-
-from celery.tests.functional import tasks
-from celery.tests.functional.case import WorkerCase
-
-
-class test_basic(WorkerCase):
-
-    def test_started(self):
-        self.assertWorkerAlive()
-
-    def test_roundtrip_simple_task(self):
-        publisher = tasks.add.get_publisher()
-        results = [(tasks.add.apply_async(i, publisher=publisher), i)
-                        for i in zip(xrange(100), xrange(100))]
-        for result, i in results:
-            self.assertEqual(result.get(timeout=10), operator.add(*i))
-
-    def test_dump_active(self):
-        tasks.sleeptask.delay(2)
-        tasks.sleeptask.delay(2)
-        time.sleep(0.2)
-        r = broadcast("dump_active",
-                           arguments={"safe": True}, reply=True)
-        active = self.my_response(r)
-        self.assertEqual(len(active), 2)
-        self.assertEqual(active[0]["name"], tasks.sleeptask.name)
-        self.assertEqual(active[0]["args"], [2])
-

+ 12 - 5
celery/tests/test_worker_control.py

@@ -8,6 +8,7 @@ from celery.task.builtins import PingTask
 from celery.utils import gen_unique_id
 from celery.worker import control
 from celery.worker.buckets import FastQueue
+from celery.worker.job import TaskRequest
 from celery.worker.state import revoked
 from celery.worker.scheduler import Scheduler

@@ -39,7 +40,10 @@ class Listener(object):

     def __init__(self):
         self.ready_queue = FastQueue()
-        self.ready_queue.put("the quick brown fox")
+        self.ready_queue.put(TaskRequest(task_name=mytask.name,
+                                         task_id=gen_unique_id(),
+                                         args=(2, 2),
+                                         kwargs={}))
         self.eta_schedule = Scheduler(self.ready_queue)
         self.event_dispatcher = Dispatcher()

@@ -83,11 +87,14 @@ class test_ControlPanel(unittest.TestCase):
     def test_dump_reserved(self):
         listener = Listener()
         panel = self.create_panel(listener=listener)
-        info = "\n".join(panel.execute("dump_reserved"))
-        self.assertIn("the quick brown fox", info)
+        response = panel.execute("dump_reserved", {"safe": True})
+        self.assertDictContainsSubset({"name": mytask.name,
+                                       "args": (2, 2),
+                                       "kwargs": {},
+                                       "hostname": socket.gethostname()},
+                                       response[0])
         listener.ready_queue = FastQueue()
-        info = "\n".join(panel.execute("dump_reserved"))
-        self.assertFalse(info)
+        self.assertFalse(panel.execute("dump_reserved"))

     def test_rate_limit_when_disabled(self):
         conf.DISABLE_RATE_LIMITS = True

+ 5 - 5
celery/tests/test_worker_job.py

@@ -365,7 +365,7 @@ class test_TaskRequest(unittest.TestCase):
         tid = gen_unique_id()
         tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
         self.assertEqual(tw.execute(), 256)
-        meta = default_backend._get_task_meta_for(tid)
+        meta = default_backend.get_task_meta(tid)
         self.assertEqual(meta["result"], 256)
         self.assertEqual(meta["status"], states.SUCCESS)

@@ -373,7 +373,7 @@
         tid = gen_unique_id()
         tw = TaskRequest(mytask_no_kwargs.name, tid, [4], {})
         self.assertEqual(tw.execute(), 256)
-        meta = default_backend._get_task_meta_for(tid)
+        meta = default_backend.get_task_meta(tid)
         self.assertEqual(meta["result"], 256)
         self.assertEqual(meta["status"], states.SUCCESS)

@@ -381,7 +381,7 @@
         tid = gen_unique_id()
         tw = TaskRequest(mytask_some_kwargs.name, tid, [4], {})
         self.assertEqual(tw.execute(logfile="foobaz.log"), 256)
-        meta = default_backend._get_task_meta_for(tid)
+        meta = default_backend.get_task_meta(tid)
         self.assertEqual(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
         self.assertEqual(meta["result"], 256)
         self.assertEqual(meta["status"], states.SUCCESS)
@@ -391,7 +391,7 @@
         tw = TaskRequest(mytask.name, tid, [4], {"f": "x"},
                         on_ack=on_ack)
         self.assertEqual(tw.execute(), 256)
-        meta = default_backend._get_task_meta_for(tid)
+        meta = default_backend.get_task_meta(tid)
         self.assertTrue(scratch["ACK"])
         self.assertEqual(meta["result"], 256)
         self.assertEqual(meta["status"], states.SUCCESS)
@@ -400,7 +400,7 @@
         tid = gen_unique_id()
         tw = TaskRequest(mytask_raising.name, tid, [4], {"f": "x"})
         self.assertIsInstance(tw.execute(), ExceptionInfo)
-        meta = default_backend._get_task_meta_for(tid)
+        meta = default_backend.get_task_meta(tid)
         self.assertEqual(meta["status"], states.FAILURE)
         self.assertIsInstance(meta["result"], KeyError)


+ 7 - 1
celery/worker/control/__init__.py

@@ -57,7 +57,13 @@ class ControlDispatch(object):
         except KeyError:
             self.logger.error("No such control command: %s" % command)
         else:
-            reply = control(self.panel, **kwdict(kwargs))
+            try:
+                reply = control(self.panel, **kwdict(kwargs))
+            except Exception, exc:
+                self.logger.error(
+                        "Error running control command %s kwargs=%s: %s" % (
+                            command, kwargs, exc))
+                reply = {"error": str(exc)}
             if reply_to:
                 self.reply({self.hostname: reply},
                            exchange=reply_to["exchange"],

+ 22 - 6
celery/worker/control/builtins.py

@@ -1,6 +1,7 @@
 from datetime import datetime

 from celery import conf
+from celery import log
 from celery.backends import default_backend
 from celery.registry import tasks
 from celery.utils import timeutils
@@ -48,6 +49,16 @@ def disable_events(panel):
     return {"ok": "events already disabled"}
     return {"ok": "events already disabled"}
 
 
 
 
+@Panel.register
+def set_loglevel(panel, loglevel=None):
+    if loglevel is not None:
+        if not isinstance(loglevel, int):
+            loglevel = conf.LOG_LEVELS[loglevel.upper()]
+        log.get_default_logger(loglevel=loglevel)
+    return {"ok": loglevel}
+
+
+
 @Panel.register
 def rate_limit(panel, task_name, rate_limit, **kwargs):
     """Set new rate limit for a task type.
@@ -88,7 +99,7 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):


 @Panel.register
-def dump_schedule(panel, **kwargs):
+def dump_schedule(panel, safe=False, **kwargs):
     schedule = panel.listener.eta_schedule
     if not schedule.queue:
         panel.logger.info("--Empty schedule--")
@@ -101,20 +112,25 @@ def dump_schedule(panel, **kwargs):
     info = map(formatitem, enumerate(schedule.info()))
     panel.logger.info("* Dump of current schedule:\n%s" % (
                             "\n".join(info, )))
-    return info
+    scheduled_tasks = []
+    for item in schedule.info():
+        scheduled_tasks.append({"eta": item["eta"],
+                                "priority": item["priority"],
+                                "request": item["item"].info(safe=safe)})
+    return scheduled_tasks


 @Panel.register
-def dump_reserved(panel, **kwargs):
+def dump_reserved(panel, safe=False, **kwargs):
     ready_queue = panel.listener.ready_queue
     reserved = ready_queue.items
     if not reserved:
         panel.logger.info("--Empty queue--")
         return []
-    info = map(repr, reserved)
     panel.logger.info("* Dump of currently reserved tasks:\n%s" % (
-                            "\n".join(info, )))
-    return info
+                            "\n".join(map(repr, reserved), )))
+    return [request.info(safe=safe)
+            for request in reserved]


 @Panel.register
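Editor's note: the new `set_loglevel` command can be driven through the existing `broadcast()` helper. A hedged sketch follows; the worker hostname in the expected reply is illustrative, and "DEBUG" maps to the integer 10 via `conf.LOG_LEVELS` as used in the command above.

    from celery.task.control import broadcast

    # Ask all workers to switch their default logger to DEBUG and collect replies.
    replies = broadcast("set_loglevel", arguments={"loglevel": "DEBUG"},
                        reply=True, timeout=2)
    print(replies)   # e.g. [{'worker1.example.com': {'ok': 10}}]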

+ 58 - 24
docs/userguide/workers.rst

@@ -264,16 +264,33 @@ then import them using the ``CELERY_IMPORTS`` setting::

     CELERY_IMPORTS = ("myapp.worker.control", )

-Debugging
-=========
+Inspecting workers
+==================
+
+:class:`celery.task.control.inspect` lets you inspect running workers. It uses
+remote control commands under the hood.
+
+.. code-block:: python
+
+    >>> from celery.task.control import inspect
+
+    # Inspect all nodes.
+    >>> i = inspect()
+
+    # Specify multiple nodes to inspect.
+    >>> i = inspect(["worker1.example.com", "worker2.example.com"])
+
+    # Specify a single node to inspect.
+    >>> i = inspect("worker1.example.com")
+

 Dump of registered tasks
 ------------------------

 You can get a list of tasks registered in the worker using the
-``dump_tasks`` remote control command::
+:meth:`~celery.task.control.inspect.registered_tasks`::

-    >>> broadcast("dump_tasks", reply=True)
+    >>> i.registered_tasks()
     [{'worker1.example.com': ['celery.delete_expired_task_meta',
                               'celery.execute_remote',
                               'celery.map_async',
@@ -282,38 +299,55 @@ You can get a list of tasks registered in the worker using the
                               'tasks.add',
                               'tasks.sleeptask']}]

+Dump of currently executing tasks
+---------------------------------
+
+You can get a list of active tasks using
+:meth:`~celery.task.control.inspect.active`::
+
+    >>> i.active()
+    [{'worker1.example.com':
+        [{"name": "tasks.sleeptask",
+          "id": "32666e9b-809c-41fa-8e93-5ae0c80afbbf",
+          "args": "(8,)",
+          "kwargs": "{}"}]}]
+
+
 Dump of scheduled (ETA) tasks
 -----------------------------

 You can get a list of tasks waiting to be scheduled by using
-the ``dump_schedule`` remote control command.
+:meth:`~celery.task.control.inspect.scheduled`::

-    >>> broadcast("dump_schedule", reply=True)
+    >>> i.scheduled()
     [{'worker1.example.com':
-        ['0. 2010-06-07 09:07:52 pri0 <TaskRequest: {
-            name:"tasks.sleeptask",
-            id:"1a7980ea-8b19-413e-91d2-0b74f3844c4d",
-            args:"[1]", kwargs:"{}"}>',
-        '1. 2010-06-07 09:07:53 pri0 <TaskRequest: {
-            name:"tasks.sleeptask",
-            id:"49661b9a-aa22-4120-94b7-9ee8031d219d",
-            args:"[2]",
-            kwargs:"{}"}>',
-
-The outputted fields are (in order): position, eta, priority, request.
+        [{"eta": "2010-06-07 09:07:52", "priority": 0,
+          "request": {
+            "name": "tasks.sleeptask",
+            "id": "1a7980ea-8b19-413e-91d2-0b74f3844c4d",
+            "args": "[1]",
+            "kwargs": "{}"}},
+         {"eta": "2010-06-07 09:07:53", "priority": 0,
+          "request": {
+            "name": "tasks.sleeptask",
+            "id": "49661b9a-aa22-4120-94b7-9ee8031d219d",
+            "args": "[2]",
+            "kwargs": "{}"}}]}]
 
 
 Note that these are tasks with an eta/countdown argument, not periodic tasks.
 Note that these are tasks with an eta/countdown argument, not periodic tasks.
 
 
 Dump of reserved tasks
 Dump of reserved tasks
 ----------------------
 ----------------------
 
 
-Reserved tasks are tasks that has been received by the broker and is waiting
-for immediate execution.
+Reserved tasks are tasks that have been received, but are still waiting to be
+executed.

-You can get a list of these using the ``dump_reserved`` remote control command.
+You can get a list of these using
+:meth:`~celery.task.control.inspect.reserved`::

-    >>> broadcast("dump_reserved", reply=True)
+    >>> i.reserved()
     [{'worker1.example.com':
-        ['<TaskRequest: {name:"tasks.sleeptask",
-                         id:"32666e9b-809c-41fa-8e93-5ae0c80afbbf",
-                         args:"(8,)", kwargs:"{}"}>']}]
+        [{"name": "tasks.sleeptask",
+          "id": "32666e9b-809c-41fa-8e93-5ae0c80afbbf",
+          "args": "(8,)",
+          "kwargs": "{}"}]}]

+ 7 - 0
funtests/__init__.py

@@ -0,0 +1,7 @@
+import os
+
+config = os.environ.setdefault("CELERY_FUNTEST_CONFIG_MODULE",
+                               "celery.tests.functional.config")
+
+os.environ["CELERY_CONFIG_MODULE"] = config
+os.environ["CELERY_LOADER"] = "default"

+ 0 - 0
celery/tests/functional/config.py → funtests/config.py


+ 0 - 0
celery/tests/functional/test.cfg → funtests/test.cfg


+ 50 - 0
funtests/test_basic.py

@@ -0,0 +1,50 @@
+import operator
+import time
+
+from celery.tests.functional import tasks
+from celery.tests.functional.case import WorkerCase
+
+from celery.task.control import broadcast
+
+class test_basic(WorkerCase):
+
+    def test_started(self):
+        self.assertWorkerAlive()
+
+    def test_roundtrip_simple_task(self):
+        publisher = tasks.add.get_publisher()
+        results = [(tasks.add.apply_async(i, publisher=publisher), i)
+                        for i in zip(xrange(100), xrange(100))]
+        for result, i in results:
+            self.assertEqual(result.get(timeout=10), operator.add(*i))
+
+    def test_dump_active(self, sleep=1):
+        r1 = tasks.sleeptask.delay(sleep)
+        r2 = tasks.sleeptask.delay(sleep)
+        self.ensure_accepted(r1.task_id)
+        active = self.inspect().active(safe=True)
+        self.assertEqual(len(active), 2)
+        self.assertEqual(active[0]["name"], tasks.sleeptask.name)
+        self.assertEqual(active[0]["args"], [sleep])
+
+    def test_dump_reserved(self, sleep=1):
+        r1 = tasks.sleeptask.delay(sleep)
+        r2 = tasks.sleeptask.delay(sleep)
+        r3 = tasks.sleeptask.delay(sleep)
+        r4 = tasks.sleeptask.delay(sleep)
+        self.ensure_accepted(r1.task_id)
+        reserved = self.inspect().reserved(safe=True)
+        self.assertTrue(reserved)
+        self.assertEqual(reserved[0]["name"], tasks.sleeptask.name)
+        self.assertEqual(reserved[0]["args"], [sleep])
+
+    def test_dump_schedule(self, countdown=1):
+        r1 = tasks.add.apply_async((2, 2), countdown=countdown)
+        r2 = tasks.add.apply_async((2, 2), countdown=countdown)
+        self.ensure_scheduled(r1.task_id, interval=0.1)
+        schedule = self.inspect().scheduled(safe=True)
+        self.assertTrue(schedule)
+        self.assertEqual(len(schedule), 2)
+        self.assertEqual(schedule[0]["request"]["name"], tasks.add.name)
+        self.assertEqual(schedule[0]["request"]["args"], [2, 2])
+