Browse Source

Merge branch '3.0'

Conflicts:
	celery/backends/amqp.py
	celery/bin/base.py
	celery/tests/tasks/test_http.py
	celery/tests/utils.py
	docs/history/changelog-2.1.rst
	docs/userguide/monitoring.rst
Ask Solem 12 years ago
parent
commit
9359f8c302

+ 9 - 6
celery/backends/amqp.py

@@ -161,19 +161,22 @@ class AMQPBackend(BaseBackend):
         with self.app.pool.acquire_channel(block=True) as (_, channel):
             binding = self._create_binding(task_id)(channel)
             binding.declare()
-            latest, acc = None, None
-            for i in range(backlog_limit):
-                latest, acc = acc, binding.get(no_ack=True)
+
+            prev = latest = acc = None
+            for i in range(backlog_limit):  ## spool ffwd
+                prev, latest, acc = latest, acc, binding.get(no_ack=False)
                 if not acc:  # no more messages
                     break
+                if prev:
+                    # backends are not expected to keep history,
+                    # so we delete everything except the most recent state.
+                    prev.ack()
             else:
                 raise self.BacklogLimitExceeded(task_id)
 
             if latest:
-                # new state to report
-                self._republish(channel, task_id, latest.body,
-                                latest.content_type, latest.content_encoding)
                 payload = self._cache[task_id] = latest.payload
+                latest.requeue()
                 return payload
             else:
                 # no new state, use previous

+ 1 - 1
celery/platforms.py

@@ -87,7 +87,7 @@ def _find_option_with_arg(argv, short_opts=None, long_opts=None):
 
 
 def maybe_patch_concurrency(argv, short_opts=None, long_opts=None):
-    """With short and long opt alternatives that specify the command-line
+    """With short and long opt alternatives that specify the command line
     option to set the pool, this makes sure that anything that needs
     to be patched is completed as early as possible.
     (e.g. eventlet/gevent monkey patches)."""

+ 10 - 6
celery/tests/backends/test_cache.py

@@ -135,9 +135,11 @@ class MockCacheMixin(object):
         memcache.Client = MemcachedClient
         memcache.Client.__module__ = memcache.__name__
         prev, sys.modules['memcache'] = sys.modules.get('memcache'), memcache
-        yield True
-        if prev is not None:
-            sys.modules['memcache'] = prev
+        try:
+            yield True
+        finally:
+            if prev is not None:
+                sys.modules['memcache'] = prev
 
     @contextmanager
     def mock_pylibmc(self):
@@ -146,9 +148,11 @@ class MockCacheMixin(object):
         pylibmc.Client.__module__ = pylibmc.__name__
         prev = sys.modules.get('pylibmc')
         sys.modules['pylibmc'] = pylibmc
-        yield True
-        if prev is not None:
-            sys.modules['pylibmc'] = prev
+        try:
+            yield True
+        finally:
+            if prev is not None:
+                sys.modules['pylibmc'] = prev
 
 
 class test_get_best_memcache(Case, MockCacheMixin):

+ 4 - 2
celery/tests/tasks/test_chord.py

@@ -43,8 +43,10 @@ def patch_unlock_retry():
     unlock = current_app.tasks['celery.chord_unlock']
     retry = Mock()
     prev, unlock.retry = unlock.retry, retry
-    yield unlock, retry
-    unlock.retry = prev
+    try:
+        yield unlock, retry
+    finally:
+        unlock.retry = prev
 
 
 class test_unlock_chord_task(AppCase):

+ 4 - 3
celery/tests/tasks/test_http.py

@@ -28,9 +28,10 @@ def mock_urlopen(response_method):
 
     http.urlopen = _mocked
 
-    yield True
-
-    http.urlopen = urlopen
+    try:
+        yield True
+    finally:
+        http.urlopen = urlopen
 
 
 def _response(res):

+ 52 - 37
celery/tests/utils.py

@@ -222,9 +222,10 @@ def wrap_logger(logger, loglevel=logging.ERROR):
     siohandler = logging.StreamHandler(sio)
     logger.handlers = [siohandler]
 
-    yield sio
-
-    logger.handlers = old_handlers
+    try:
+        yield sio
+    finally:
+        logger.handlers = old_handlers
 
 
 @contextmanager
@@ -233,10 +234,10 @@ def eager_tasks():
 
     prev = app.conf.CELERY_ALWAYS_EAGER
     app.conf.CELERY_ALWAYS_EAGER = True
-
-    yield True
-
-    app.conf.CELERY_ALWAYS_EAGER = prev
+    try:
+        yield True
+    finally:
+        app.conf.CELERY_ALWAYS_EAGER = prev
 
 
 def with_eager_tasks(fun):
@@ -374,8 +375,10 @@ def mask_modules(*modnames):
             return realimport(name, *args, **kwargs)
 
     builtins.__import__ = myimp
-    yield True
-    builtins.__import__ = realimport
+    try:
+        yield True
+    finally:
+        builtins.__import__ = realimport
 
 
 @contextmanager
@@ -386,10 +389,11 @@ def override_stdouts():
     sys.stdout = sys.__stdout__ = mystdout
     sys.stderr = sys.__stderr__ = mystderr
 
-    yield mystdout, mystderr
-
-    sys.stdout = sys.__stdout__ = prev_out
-    sys.stderr = sys.__stderr__ = prev_err
+    try:
+        yield mystdout, mystderr
+    finally:
+        sys.stdout = sys.__stdout__ = prev_out
+        sys.stderr = sys.__stderr__ = prev_err
 
 
 def patch(module, name, mocked):
@@ -420,14 +424,16 @@ def replace_module_value(module, name, value=None):
             delattr(module, name)
         except AttributeError:
             pass
-    yield
-    if prev is not None:
-        setattr(sys, name, prev)
-    if not has_prev:
-        try:
-            delattr(module, name)
-        except AttributeError:
-            pass
+    try:
+        yield
+    finally:
+        if prev is not None:
+            setattr(sys, name, prev)
+        if not has_prev:
+            try:
+                delattr(module, name)
+            except AttributeError:
+                pass
 pypy_version = partial(
     replace_module_value, sys, 'pypy_version_info',
 )
@@ -439,15 +445,19 @@ platform_pyimp = partial(
 @contextmanager
 def sys_platform(value):
     prev, sys.platform = sys.platform, value
-    yield
-    sys.platform = prev
+    try:
+        yield
+    finally:
+        sys.platform = prev
 
 
 @contextmanager
 def reset_modules(*modules):
     prev = dict((k, sys.modules.pop(k)) for k in modules if k in sys.modules)
-    yield
-    sys.modules.update(prev)
+    try:
+        yield
+    finally:
+        sys.modules.update(prev)
 
 
 @contextmanager
@@ -455,12 +465,14 @@ def patch_modules(*modules):
     prev = {}
     for mod in modules:
         prev[mod], sys.modules[mod] = sys.modules[mod], ModuleType(mod)
-    yield
-    for name, mod in items(prev):
-        if mod is None:
-            sys.modules.pop(name, None)
-        else:
-            sys.modules[name] = mod
+    try:
+        yield
+    finally:
+        for name, mod in items(prev):
+            if mod is None:
+                sys.modules.pop(name, None)
+            else:
+                sys.modules[name] = mod
 
 
 @contextmanager
@@ -505,8 +517,10 @@ def mock_context(mock, typ=Mock):
             reraise(x[0], x[1], x[2])
     context.__exit__.side_effect = on_exit
     context.__enter__.return_value = context
-    yield context
-    context.reset()
+    try:
+        yield context
+    finally:
+        context.reset()
 
 
 @contextmanager
@@ -537,10 +551,11 @@ def patch_settings(app=None, **config):
             pass
         setattr(app.conf, key, value)
 
-    yield app.conf
-
-    for key, value in items(prev):
-        setattr(app.conf, key, value)
+    try:
+        yield app.conf
+    finally:
+        for key, value in items(prev):
+            setattr(app.conf, key, value)
 
 
 @contextmanager

+ 2 - 2
docs/history/changelog-2.1.rst

@@ -390,7 +390,7 @@ News
 * :func:`~celery.task.control.broadcast`: Added callback argument, this can be
   used to process replies immediately as they arrive.
 
-* celeryctl: New command-line utility to manage and inspect worker nodes,
+* celeryctl: New command line utility to manage and inspect worker nodes,
   apply tasks and inspect the results of tasks.
 
     .. seealso::
@@ -472,7 +472,7 @@ News
             stdouts = logging.getLogger("mystdoutslogger")
             log.redirect_stdouts_to_logger(stdouts, loglevel=logging.WARNING)
 
-* worker: Added command-line option :option:`-I`/:option:`--include`:
+* worker Added command line option :option:`-I`/:option:`--include`:
 
     A comma separated list of (task) modules to be imported.
 

+ 1 - 1
docs/history/changelog-2.3.rst

@@ -360,7 +360,7 @@ Fixes
 * ``CELERY_TASK_ERROR_WHITE_LIST`` is now properly initialized
   in all loaders.
 
-* celeryd_detach now passes through command-line configuration.
+* celeryd_detach now passes through command line configuration.
 
 * Remote control command ``add_consumer`` now does nothing if the
   queue is already being consumed from.

+ 0 - 2
docs/userguide/monitoring.rst

@@ -22,11 +22,9 @@ Workers
 
 .. _monitoring-control:
 
-
 Management Command-line Utilities (``inspect``/``control``)
 -----------------------------------------------------------
 
-.. versionadded:: 2.1
 
 :program:`celery` can also be used to inspect
 and manage worker nodes (and to some degree tasks).

+ 1 - 1
docs/userguide/routing.rst

@@ -360,7 +360,7 @@ Hands-on with the API
 ---------------------
 
 Celery comes with a tool called :program:`celery amqp`
-that is used for command-line access to the AMQP API, enabling access to
+that is used for command line access to the AMQP API, enabling access to
 administration tasks like creating/deleting queues and exchanges, purging
 queues or sending messages.  It can also be used for non-AMQP brokers,
 but different implementation may not implement all commands.

+ 2 - 2
docs/whatsnew-3.0.rst

@@ -759,7 +759,7 @@ In Other News
         >>> import celery
         >>> print(celery.bugreport())
 
-    - Using the ``celery`` command-line program:
+    - Using the ``celery`` command line program:
 
         .. code-block:: bash
 
@@ -910,7 +910,7 @@ Internals
 
 - Renamed module ``celery.abstract`` -> :mod:`celery.worker.bootsteps`.
 
-- Command-line docs are now parsed from the module docstrings.
+- Command line docs are now parsed from the module docstrings.
 
 - Test suite directory has been reorganized.