Browse Source

Tests passing

Ask Solem 13 years ago
parent
commit
184bda8987

+ 4 - 1
celery/apps/worker.py

@@ -198,7 +198,10 @@ class Worker(configurated):
             appr += " (%s)" % loader
         if self.autoscale:
             concurrency = "{min=%s, max=%s}" % tuple(self.autoscale)
-        concurrency += " (%s)" % self.pool_cls.__module__.split('.')[-1]
+        pool = self.pool_cls
+        if not isinstance(pool, basestring):
+            pool = pool.__module__
+        concurrency += " (%s)" % pool.split('.')[-1]
         events = "ON"
         if not self.send_events:
             events = "OFF (enable -E to monitor this worker)"

+ 5 - 4
celery/backends/amqp.py

@@ -94,9 +94,7 @@ class AMQPBackend(BaseDictBackend):
     def revive(self, channel):
         pass
 
-    def _store_result(self, task_id, result, status, traceback=None,
-            max_retries=20, interval_start=0, interval_step=1,
-            interval_max=1):
+    def _store_result(self, task_id, result, status, traceback=None):
         """Send task return value and status."""
         with self.mutex:
             with self.app.amqp.producer_pool.acquire(block=True) as pub:
@@ -157,6 +155,7 @@ class AMQPBackend(BaseDictBackend):
                 except KeyError:
                     # result probably pending.
                     return {"status": states.PENDING, "result": None}
+    poll = get_task_meta  # XXX compat
 
     def drain_events(self, connection, consumer, timeout=None, now=time.time):
         wait = connection.drain_events
@@ -189,6 +188,7 @@ class AMQPBackend(BaseDictBackend):
     def get_many(self, task_ids, timeout=None, **kwargs):
         with self.app.pool.acquire_channel(block=True) as (conn, channel):
             ids = set(task_ids)
+            cached_ids = set()
             for task_id in ids:
                 try:
                     cached = self._cache[task_id]
@@ -197,7 +197,8 @@ class AMQPBackend(BaseDictBackend):
                 else:
                     if cached["status"] in states.READY_STATES:
                         yield task_id, cached
-                        ids.discard(task_id)
+                        cached_ids.add(task_id)
+            ids ^= cached_ids
 
             bindings = [self._create_binding(task_id) for task_id in task_ids]
             with self.Consumer(channel, bindings, no_ack=True) as consumer:

+ 2 - 0
celery/bin/celery.py

@@ -64,6 +64,7 @@ class Command(BaseCommand):
         self.colored = term.colored(enabled=not no_color)
         self.stdout = stdout
         self.stderr = stderr
+        self.quiet = False
 
     def __call__(self, *args, **kwargs):
         try:
@@ -298,6 +299,7 @@ class rate_limit(Command):
             self.say_remote_command_reply(reply)
 rate_limit = command(rate_limit)
 
+
 class inspect(Command):
     choices = {"active": 1.0,
                "active_queues": 1.0,

+ 1 - 0
celery/concurrency/processes/__init__.py

@@ -33,6 +33,7 @@ WORKER_SIGIGNORE = frozenset(["SIGINT"])
 
 def process_initializer(app, hostname):
     """Initializes the process so it can be used to process tasks."""
+    app.set_current()
     set_default_app(app)
     trace._tasks = app._tasks  # make sure this optimization is set.
     platforms.signals.reset(*WORKER_SIGRESET)

+ 3 - 5
celery/tests/app/test_app.py

@@ -203,15 +203,13 @@ class test_App(Case):
                                        "userid": "guest",
                                        "password": "guest",
                                        "virtual_host": "/"},
-                                      self.app.broker_connection(
-                                          transport="amqplib").info())
+                            self.app.broker_connection("amqplib://").info())
         self.app.conf.BROKER_PORT = 1978
         self.app.conf.BROKER_VHOST = "foo"
         self.assertDictContainsSubset({"port": 1978,
                                        "virtual_host": "foo"},
-                                      self.app.broker_connection(
-                                          transport="amqplib").info())
-        conn = self.app.broker_connection(virtual_host="/value")
+                    self.app.broker_connection("amqplib://:1978/foo").info())
+        conn = self.app.broker_connection("amqplib:////value")
         self.assertDictContainsSubset({"virtual_host": "/value"},
                                       conn.info())
 

+ 7 - 3
celery/tests/app/test_builtins.py

@@ -63,9 +63,13 @@ class test_group(Case):
     def test_apply_async_with_parent(self):
         _task_stack.push(add)
         try:
-            x = group([add.s(4, 4), add.s(8, 8)])
-            x.apply_async()
-            self.assertTrue(add.request.children)
+            add.push_request(called_directly=False)
+            try:
+                x = group([add.s(4, 4), add.s(8, 8)])
+                x.apply_async()
+                self.assertTrue(add.request.children)
+            finally:
+                add.pop_request()
         finally:
             _task_stack.pop()
 

+ 1 - 0
celery/tests/app/test_loaders.py

@@ -183,6 +183,7 @@ class test_DefaultLoader(Case):
 
     @patch("celery.loaders.default.find_module")
     def test_read_configuration_importerror(self, find_module):
+        default.C_WNOCONF = True
         find_module.side_effect = ImportError()
         l = default.Loader()
         with self.assertWarnsRegex(NotConfigured, r'make sure it exists'):

+ 27 - 27
celery/tests/backends/test_amqp.py

@@ -14,7 +14,7 @@ from celery.datastructures import ExceptionInfo
 from celery.exceptions import TimeoutError
 from celery.utils import uuid
 
-from celery.tests.utils import Case, sleepdeprived
+from celery.tests.utils import AppCase, sleepdeprived
 
 
 class SomeClass(object):
@@ -23,7 +23,7 @@ class SomeClass(object):
         self.data = data
 
 
-class test_AMQPBackend(Case):
+class test_AMQPBackend(AppCase):
 
     def create_backend(self, **opts):
         opts = dict(dict(serializer="pickle", persistent=False), **opts)
@@ -101,35 +101,35 @@ class test_AMQPBackend(Case):
 
     @sleepdeprived()
     def test_store_result_retries(self):
+        iterations = [0]
+        stop_raising_at = [5]
 
-        class _Producer(object):
-            iterations = 0
-            stop_raising_at = 5
-
-            def __init__(self, *args, **kwargs):
-                pass
-
-            def publish(self, msg, *args, **kwargs):
-                if self.iterations > self.stop_raising_at:
-                    return
-                raise KeyError("foo")
-
-        class Backend(AMQPBackend):
-            Producer = _Producer
+        def publish(*args, **kwargs):
+            if iterations[0] > stop_raising_at[0]:
+                return
+            iterations[0] += 1
+            raise KeyError("foo")
 
-        backend = Backend()
-        with self.assertRaises(KeyError):
-            backend.store_result("foo", "bar", "STARTED", max_retries=None)
+        backend = AMQPBackend()
+        from celery.app.amqp import TaskProducer
+        prod, TaskProducer.publish = TaskProducer.publish, publish
+        try:
+            with self.assertRaises(KeyError):
+                backend.retry_policy["max_retries"] = None
+                backend.store_result("foo", "bar", "STARTED")
 
-        with self.assertRaises(KeyError):
-            backend.store_result("foo", "bar", "STARTED", max_retries=10)
+            with self.assertRaises(KeyError):
+                backend.retry_policy["max_retries"] = 10
+                backend.store_result("foo", "bar", "STARTED")
+        finally:
+            TaskProducer.publish = prod
 
     def assertState(self, retval, state):
         self.assertEqual(retval["status"], state)
 
     def test_poll_no_messages(self):
         b = self.create_backend()
-        self.assertState(b.poll(uuid()), states.PENDING)
+        self.assertState(b.get_task_meta(uuid()), states.PENDING)
 
     def test_poll_result(self):
 
@@ -167,7 +167,7 @@ class test_AMQPBackend(Case):
         results.put(Message(status=states.RECEIVED, seq=1))
         results.put(Message(status=states.STARTED, seq=2))
         results.put(Message(status=states.FAILURE, seq=3))
-        r1 = backend.poll(uuid())
+        r1 = backend.get_task_meta(uuid())
         self.assertDictContainsSubset({"status": states.FAILURE,
                                        "seq": 3}, r1,
                                        "FFWDs to the last state")
@@ -175,14 +175,14 @@ class test_AMQPBackend(Case):
         # Caches last known state.
         results.put(Message())
         tid = uuid()
-        backend.poll(tid)
+        backend.get_task_meta(tid)
         self.assertIn(tid, backend._cache, "Caches last known state")
 
         # Returns cache if no new states.
         results.queue.clear()
         assert not results.qsize()
         backend._cache[tid] = "hello"
-        self.assertEqual(backend.poll(tid), "hello",
+        self.assertEqual(backend.get_task_meta(tid), "hello",
                          "Returns cache if no new states")
 
     def test_wait_for(self):
@@ -217,7 +217,7 @@ class test_AMQPBackend(Case):
         b = self.create_backend()
         with current_app.pool.acquire_channel(block=False) as (_, channel):
             binding = b._create_binding(uuid())
-            consumer = b._create_consumer(binding, channel)
+            consumer = b.Consumer(channel, binding, no_ack=True)
             with self.assertRaises(socket.timeout):
                 b.drain_events(Connection(), consumer, timeout=0.1)
 
@@ -249,7 +249,7 @@ class test_AMQPBackend(Case):
 
         class Backend(AMQPBackend):
 
-            def _create_consumer(self, *args, **kwargs):
+            def Consumer(*args, **kwargs):
                 raise KeyError("foo")
 
         b = Backend()

+ 13 - 8
celery/tests/bin/test_base.py

@@ -105,14 +105,19 @@ class test_Command(AppCase):
 
     def test_with_cmdline_config(self):
         cmd = MockCommand()
-        cmd.enable_config_from_cmdline = True
-        cmd.namespace = "celeryd"
-        rest = cmd.setup_app_from_commandline(argv=[
-            "--loglevel=INFO", "--", "broker.host=broker.example.com",
-            ".prefetch_multiplier=100"])
-        self.assertEqual(cmd.app.conf.BROKER_HOST, "broker.example.com")
-        self.assertEqual(cmd.app.conf.CELERYD_PREFETCH_MULTIPLIER, 100)
-        self.assertListEqual(rest, ["--loglevel=INFO"])
+        try:
+            cmd.enable_config_from_cmdline = True
+            cmd.namespace = "celeryd"
+            rest = cmd.setup_app_from_commandline(argv=[
+                "--loglevel=INFO", "--",
+                "broker.url=amqp://broker.example.com",
+                ".prefetch_multiplier=100"])
+            self.assertEqual(cmd.app.conf.BROKER_URL,
+                             "amqp://broker.example.com")
+            self.assertEqual(cmd.app.conf.CELERYD_PREFETCH_MULTIPLIER, 100)
+            self.assertListEqual(rest, ["--loglevel=INFO"])
+        finally:
+            cmd.app.conf.BROKER_URL = "memory://"
 
     def test_parse_preload_options_shortopt(self):
         cmd = Command()

+ 3 - 0
celery/tests/config.py

@@ -6,6 +6,9 @@ from kombu import Queue
 
 BROKER_URL = "memory://"
 
+#: warn if config module not found
+os.environ["C_WNOCONF"] = "yes"
+
 #: Don't want log output when running suite.
 CELERYD_HIJACK_ROOT_LOGGER = False
 

+ 3 - 3
celery/tests/worker/test_control.py

@@ -409,7 +409,7 @@ class test_ControlPanel(Case):
 
     def test_pool_restart(self):
         consumer = Consumer()
-        consumer.controller = _WC()
+        consumer.controller = _WC(app=current_app)
         consumer.controller.pool.restart = Mock()
         panel = self.create_panel(consumer=consumer)
         panel.app = self.app
@@ -423,7 +423,7 @@ class test_ControlPanel(Case):
 
     def test_pool_restart_import_modules(self):
         consumer = Consumer()
-        consumer.controller = _WC()
+        consumer.controller = _WC(app=current_app)
         consumer.controller.pool.restart = Mock()
         panel = self.create_panel(consumer=consumer)
         panel.app = self.app
@@ -440,7 +440,7 @@ class test_ControlPanel(Case):
 
     def test_pool_restart_relaod_modules(self):
         consumer = Consumer()
-        consumer.controller = _WC()
+        consumer.controller = _WC(app=current_app)
         consumer.controller.pool.restart = Mock()
         panel = self.create_panel(consumer=consumer)
         panel.app = self.app

+ 1 - 1
celery/tests/worker/test_request.py

@@ -716,7 +716,7 @@ class test_TaskRequest(Case):
         p = MockPool()
         tw.execute_using_pool(p)
         self.assertTrue(p.target)
-        self.assertEqual(p.args[0], mytask)
+        self.assertEqual(p.args[0], mytask.name)
         self.assertEqual(p.args[1], tid)
         self.assertEqual(p.args[2], [4])
         self.assertIn("f", p.args[3])

+ 1 - 1
celery/tests/worker/test_worker.py

@@ -755,7 +755,7 @@ class test_WorkController(AppCase):
         worker.logger = self._logger
 
     def create_worker(self, **kw):
-        worker = WorkController(concurrency=1, loglevel=0, **kw)
+        worker = self.app.WorkController(concurrency=1, loglevel=0, **kw)
         worker._shutdown_complete.set()
         return worker
 

+ 1 - 2
celery/worker/__init__.py

@@ -298,7 +298,6 @@ class WorkController(configurated):
     def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
             queues=None, app=None, pidfile=None, **kwargs):
         self.app = app_or_default(app or self.app)
-
         # all new threads start without a current app, so if an app is not
         # passed on to the thread it will fall back to the "default app",
         # which then could be the wrong app.  So for the worker
@@ -306,7 +305,7 @@ class WorkController(configurated):
         # and means that only a single app can be used for workers
         # running in the same process.
         set_default_app(self.app)
-        app.finalize()
+        self.app.finalize()
         trace._tasks = self.app._tasks
 
         self._shutdown_complete = Event()