Browse Source

Merge branch '3.0'

Conflicts:
	Changelog
	celery/events/state.py
	celery/worker/__init__.py
	celery/worker/hub.py
Ask Solem 12 years ago
parent
commit
54cc05cabb

+ 3 - 1
celery/concurrency/base.py

@@ -57,11 +57,12 @@ class BasePool(object):
     uses_semaphore = False
     uses_semaphore = False
 
 
     def __init__(self, limit=None, putlocks=True,
     def __init__(self, limit=None, putlocks=True,
-                 forking_enable=True, **options):
+                 forking_enable=True, callbacks_propagate=(), **options):
         self.limit = limit
         self.limit = limit
         self.putlocks = putlocks
         self.putlocks = putlocks
         self.options = options
         self.options = options
         self.forking_enable = forking_enable
         self.forking_enable = forking_enable
+        self.callbacks_propagate = callbacks_propagate
         self._does_debug = logger.isEnabledFor(logging.DEBUG)
         self._does_debug = logger.isEnabledFor(logging.DEBUG)
 
 
     def on_start(self):
     def on_start(self):
@@ -134,6 +135,7 @@ class BasePool(object):
 
 
         return self.on_apply(target, args, kwargs,
         return self.on_apply(target, args, kwargs,
                              waitforslot=self.putlocks,
                              waitforslot=self.putlocks,
+                             callbacks_propagate=self.callbacks_propagate,
                              **options)
                              **options)
 
 
     def _get_info(self):
     def _get_info(self):

+ 0 - 21
celery/tests/concurrency/test_processes.py

@@ -12,7 +12,6 @@ from celery.utils.functional import noop
 from celery.tests.utils import Case
 from celery.tests.utils import Case
 try:
 try:
     from celery.concurrency import processes as mp
     from celery.concurrency import processes as mp
-    from billiard.pool import safe_apply_callback
 except ImportError:
 except ImportError:
 
 
     class _mp(object):
     class _mp(object):
@@ -33,7 +32,6 @@ except ImportError:
             def apply_async(self, *args, **kwargs):
             def apply_async(self, *args, **kwargs):
                 pass
                 pass
     mp = _mp()  # noqa
     mp = _mp()  # noqa
-    safe_apply_callback = None  # noqa
 
 
 
 
 class Object(object):   # for writeable attributes.
 class Object(object):   # for writeable attributes.
@@ -135,25 +133,6 @@ class test_TaskPool(Case):
         pool.terminate()
         pool.terminate()
         self.assertTrue(_pool.terminated)
         self.assertTrue(_pool.terminated)
 
 
-    def test_safe_apply_callback(self):
-        if safe_apply_callback is None:
-            raise SkipTest('multiprocessig not supported')
-        _good_called = [0]
-        _evil_called = [0]
-
-        def good(x):
-            _good_called[0] = 1
-            return x
-
-        def evil(x):
-            _evil_called[0] = 1
-            raise KeyError(x)
-
-        self.assertIsNone(safe_apply_callback(good, 10))
-        self.assertIsNone(safe_apply_callback(evil, 10))
-        self.assertTrue(_good_called[0])
-        self.assertTrue(_evil_called[0])
-
     def test_apply_async(self):
     def test_apply_async(self):
         pool = TaskPool(10)
         pool = TaskPool(10)
         pool.start()
         pool.start()

+ 2 - 0
celery/tests/worker/test_worker.py

@@ -1133,6 +1133,7 @@ class test_WorkController(AppCase):
 
 
     def test_Pool_crate_threaded(self):
     def test_Pool_crate_threaded(self):
         w = Mock()
         w = Mock()
+        w._conninfo.connection_errors = w._conninfo.channel_errors = ()
         w.pool_cls = Mock()
         w.pool_cls = Mock()
         w.use_eventloop = False
         w.use_eventloop = False
         pool = components.Pool(w)
         pool = components.Pool(w)
@@ -1141,6 +1142,7 @@ class test_WorkController(AppCase):
     def test_Pool_create(self):
     def test_Pool_create(self):
         from celery.worker.hub import BoundedSemaphore
         from celery.worker.hub import BoundedSemaphore
         w = Mock()
         w = Mock()
+        w._conninfo.connection_errors = w._conninfo.channel_errors = ()
         w.hub = Mock()
         w.hub = Mock()
         w.hub.on_init = []
         w.hub.on_init = []
         w.pool_cls = Mock()
         w.pool_cls = Mock()

+ 3 - 1
celery/worker/__init__.py

@@ -128,6 +128,8 @@ class WorkController(configurated):
         # Options
         # Options
         self.loglevel = mlevel(self.loglevel)
         self.loglevel = mlevel(self.loglevel)
         self.ready_callback = ready_callback or self.on_consumer_ready
         self.ready_callback = ready_callback or self.on_consumer_ready
+        # this connection is not established, only used for params
+        self._conninfo = self.app.connection()
         self.use_eventloop = self.should_use_eventloop()
         self.use_eventloop = self.should_use_eventloop()
         self.options = kwargs
         self.options = kwargs
 
 
@@ -236,7 +238,7 @@ class WorkController(configurated):
 
 
     def should_use_eventloop(self):
     def should_use_eventloop(self):
         return (detect_environment() == 'default' and
         return (detect_environment() == 'default' and
-                self.app.connection().is_evented and not self.app.IS_WINDOWS)
+                self._conninfo.is_evented and not self.app.IS_WINDOWS)
 
 
     def stop(self, in_sighandler=False):
     def stop(self, in_sighandler=False):
         """Graceful shutdown of the worker server."""
         """Graceful shutdown of the worker server."""

+ 3 - 8
docs/configuration.rst

@@ -808,15 +808,10 @@ manner using TCP/IP alone, so AMQP defines something called heartbeats
 that's is used both by the client and the broker to detect if
 that's is used both by the client and the broker to detect if
 a connection was closed.
 a connection was closed.
 
 
-Heartbeats are currently only supported by the ``pyamqp://`` transport,
-and this requires the :mod:`amqp` module:
+Hartbeats are disabled by default.
 
 
-.. code-block:: bash
-
-    $ pip install amqp
-
-The default heartbeat value is 10 seconds,
-the heartbeat will then be monitored at the interval specified
+If the heartbeat value is 10 seconds, then
+the heartbeat will be monitored at the interval specified
 by the :setting:`BROKER_HEARTBEAT_CHECKRATE` setting, which by default is
 by the :setting:`BROKER_HEARTBEAT_CHECKRATE` setting, which by default is
 double the rate of the heartbeat value
 double the rate of the heartbeat value
 (so for the default 10 seconds, the heartbeat is checked every 5 seconds).
 (so for the default 10 seconds, the heartbeat is checked every 5 seconds).

+ 51 - 0
docs/history/changelog-3.0.rst

@@ -9,6 +9,57 @@
 
 
 If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 
 
+.. _version-3.0.16:
+
+3.0.16
+======
+:release-date: 2013-03-07 XX:XX:XX X.X UTC
+
+- AMQP heartbeats are now disabled by default.
+
+    Some users experiences issues with heartbeats enabled,
+    and it's not strictly necessary to use them.
+
+    If you're experiencing problems detecting connection failures,
+    you can re-enable heartbeats by configuring the :setting:`BROKER_HEARTBEAT`
+    setting.
+
+- Worker: Now propagates connection errors occurring in multiprocessing
+  callbacks, so that the connection can be reset (Issue #1226).
+
+- Worker: Now propagates connection errors occurring in timer callbacks,
+  so that the connection can be reset.
+
+- New bash completion for ``celery`` available in the git repository:
+
+    https://github.com/celery/celery/tree/3.0/extra/bash-completion
+
+    You can source this file or put it in ``bash_completion.d`` to
+    get auto-completion for the ``celery`` command-line utility.
+
+- The node name of a worker can now include unicode characters (Issue #1186).
+
+- The repr of a ``crontab`` object now displays correctly (Issue #972).
+
+- ``events.State`` no longer modifies the original event dictionary.
+
+- No longer uses ``Logger.warn`` deprecated in Python 3.
+
+- Cache Backend: Now works with chords again (Issue #1094).
+
+- Chord unlock now handles errors occurring while calling the callback.
+
+- ``ResultSet.join`` now always works with empty result set (Issue #1219).
+
+- A ``group`` consisting of a single task is now supported (Issue #1219).
+
+- Now supports the ``pycallgraph`` program (Issue #1051).
+
+- Fixed Jython compatibility problems.
+
+- Django tutorial: Now mentions that the example app must be added to
+  ``INSTALLED_APPS`` (Issue #1192).
+
 .. _version-3.0.15:
 .. _version-3.0.15:
 
 
 3.0.15
 3.0.15