
Merge branch 'master' into hub_semaphore

Conflicts:
	celery/worker/__init__.py
	celery/worker/consumer.py
	celery/worker/hub.py
Ask Solem 13 years ago
commit 22d11ddc82

+ 1 - 1
README.rst

@@ -4,7 +4,7 @@
 
 .. image:: http://cloud.github.com/downloads/ask/celery/celery_128.png
 
-:Version: 2.6.0rc2
+:Version: 2.6.0rc3
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/ask/celery/

+ 1 - 1
celery/__init__.py

@@ -5,7 +5,7 @@
 
 from __future__ import absolute_import
 
-VERSION = (2, 6, 0, "rc2")
+VERSION = (2, 6, 0, "rc3")
 __version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
 __author__ = "Ask Solem"
 __contact__ = "ask@celeryproject.org"

+ 3 - 3
celery/app/defaults.py

@@ -164,11 +164,11 @@ NAMESPACES = {
         "AUTORELOADER": Option("celery.worker.autoreload.Autoreloader"),
         "BOOT_STEPS": Option((), type="tuple"),
         "CONCURRENCY": Option(0, type="int"),
-        "ETA_SCHEDULER": Option(None, type="string"),
-        "ETA_SCHEDULER_PRECISION": Option(1.0, type="float"),
+        "TIMER": Option(None, type="string"),
+        "TIMER_PRECISION": Option(1.0, type="float"),
         "FORCE_EXECV": Option(True, type="bool"),
         "HIJACK_ROOT_LOGGER": Option(True, type="bool"),
-        "CONSUMER": Option("celery.worker.consumer.Consumer"),
+        "CONSUMER": Option(None, type="string"),
         "LOG_FORMAT": Option(DEFAULT_PROCESS_LOG_FMT),
         "LOG_COLOR": Option(type="bool"),
         "LOG_LEVEL": Option("WARN", deprecate_by="2.4", remove_by="3.0",

+ 22 - 12
celery/app/task.py

@@ -613,7 +613,7 @@ class BaseTask(object):
             ...         twitter.post_status_update(message)
             ...     except twitter.FailWhale, exc:
             ...         # Retry in 5 minutes.
-            ...         return tweet.retry(countdown=60 * 5, exc=exc)
+            ...         raise tweet.retry(countdown=60 * 5, exc=exc)
 
         Although the task will never return above as `retry` raises an
         exception to notify the worker, we use `return` in front of the retry
@@ -654,13 +654,14 @@ class BaseTask(object):
         # If task was executed eagerly using apply(),
         # then the retry must also be executed eagerly.
         if request.is_eager:
-            return self.apply(args=args, kwargs=kwargs, **options).get()
-
-        self.apply_async(args=args, kwargs=kwargs, **options)
+            self.apply(args=args, kwargs=kwargs, **options).get()
+        else:
+            self.apply_async(args=args, kwargs=kwargs, **options)
+        ret = RetryTaskError(eta and "Retry at %s" % eta
+                                  or "Retry in %s secs." % countdown, exc)
         if throw:
-            raise RetryTaskError(
-                eta and "Retry at %s" % (eta, )
-                     or "Retry in %s secs." % (countdown, ), exc)
+            raise ret
+        return ret
 
     def apply(self, args=None, kwargs=None, **options):
         """Execute this task locally, by blocking until the task returns.
@@ -736,24 +737,34 @@ class BaseTask(object):
         """``.s(*a, **k) -> .subtask(a, k)``"""
         return self.subtask(args, kwargs)
 
+    def si(self, *args, **kwargs):
+        """``.si(*a, **k) -> .subtask(a, k, immutable=True)``"""
+        return self.subtask(args, kwargs, immutable=True)
+
     def chunks(self, it, n):
+        """Creates a :class:`~celery.canvas.chunks` task for this task."""
         from celery import chunks
         return chunks(self.s(), it, n)
 
     def map(self, it):
+        """Creates a :class:`~celery.canvas.xmap` task from ``it``."""
         from celery import xmap
         return xmap(self.s(), it)
 
     def starmap(self, it):
+        """Creates a :class:`~celery.canvas.xstarmap` task from ``it``."""
         from celery import xstarmap
         return xstarmap(self.s(), it)
 
     def update_state(self, task_id=None, state=None, meta=None):
         """Update task state.
 
-        :param task_id: Id of the task to update.
-        :param state: New state (:class:`str`).
-        :param meta: State metadata (:class:`dict`).
+        :keyword task_id: Id of the task to update, defaults to the
+                          id of the current task
+        :keyword state: New state (:class:`str`).
+        :keyword meta: State metadata (:class:`dict`).
+
+
 
         """
         if task_id is None:
@@ -817,8 +828,7 @@ class BaseTask(object):
 
     def send_error_email(self, context, exc, **kwargs):
         if self.send_error_emails and not self.disable_error_emails:
-            sender = self.ErrorMail(self, **kwargs)
-            sender.send(context, exc)
+            self.ErrorMail(self, **kwargs).send(context, exc)
 
     def on_success(self, retval, task_id, args, kwargs):
         """Success handler.

+ 10 - 0
celery/concurrency/base.py

@@ -46,6 +46,12 @@ class BasePool(object):
     _state = None
     _pool = None
 
+    #: only used by multiprocessing pool
+    on_process_started = None
+
+    #: only used by multiprocessing pool
+    on_process_down = None
+
     def __init__(self, limit=None, putlocks=True, **options):
         self.limit = limit
         self.putlocks = putlocks
@@ -124,3 +130,7 @@ class BasePool(object):
     @property
     def eventmap(self):
         return {}
+
+    @property
+    def timers(self):
+        return {}

+ 23 - 0
celery/concurrency/processes/__init__.py

@@ -104,6 +104,25 @@ class TaskPool(BasePool):
                 "put-guarded-by-semaphore": self.putlocks,
                 "timeouts": (self._pool.soft_timeout, self._pool.timeout)}
 
+    def set_on_process_started(self, callback):
+        self._pool.on_process_started = callback
+
+
+    def _get_on_process_started(self):
+        return self._pool.on_process_started
+
+    def _set_on_process_started(self, fun):
+        self._pool.on_process_started = fun
+    on_process_started = property(_get_on_process_started,
+                                  _set_on_process_started)
+    def _get_on_process_down(self):
+        return self._pool.on_process_down
+
+    def _set_on_process_down(self, fun):
+        self._pool.on_process_down = fun
+    on_process_down = property(_get_on_process_down,
+                               _set_on_process_down)
+
     @property
     def num_processes(self):
         return self._pool._processes
@@ -111,3 +130,7 @@ class TaskPool(BasePool):
     @property
     def eventmap(self):
         return self._pool.eventmap
+
+    @property
+    def timers(self):
+        return self._pool.timers
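Illustrative sketch (not part of this commit): the getter/setter pairs above just forward the callbacks to the wrapped pool object; the same pattern in isolation, with toy classes:

    class _Pool(object):
        on_process_started = None

    class Wrapper(object):
        def __init__(self):
            self._pool = _Pool()

        def _get_on_process_started(self):
            return self._pool.on_process_started

        def _set_on_process_started(self, fun):
            self._pool.on_process_started = fun
        # property() wires the pair so assigning on the wrapper
        # transparently updates the wrapped pool.
        on_process_started = property(_get_on_process_started,
                                      _set_on_process_started)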

+ 5 - 2
celery/events/__init__.py

@@ -35,7 +35,7 @@ event_exchange = Exchange("celeryev", type="topic")
 
 def get_exchange(conn):
     ex = copy(event_exchange)
-    if "redis" in conn.transport_cls:
+    if "redis" in type(conn.transport).__module__:
         # quick hack for #436
         ex.type = "fanout"
     return ex
@@ -103,7 +103,10 @@ class EventDispatcher(object):
         self.close()
 
     def get_exchange(self):
-        return get_exchange(self.connection)
+        if self.connection:
+            return get_exchange(self.connection)
+        else:
+            return get_exchange(self.channel.connection.client)
 
     def enable(self):
         self.publisher = Producer(self.channel or self.connection,
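Illustrative sketch (not part of this commit): the new check inspects the module of the instantiated transport rather than the `transport_cls` string; roughly, assuming kombu with the redis client library installed:

    >>> from kombu import BrokerConnection
    >>> conn = BrokerConnection("redis://localhost:6379/0")
    >>> type(conn.transport).__module__
    'kombu.transport.redis'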

+ 4 - 8
celery/tests/tasks/test_tasks.py

@@ -64,7 +64,7 @@ def retry_task(arg1, arg2, kwarg=1, max_retries=None, care=True):
     if care and retries >= rmax:
         return arg1
     else:
-        return current.retry(countdown=0, max_retries=rmax)
+        raise current.retry(countdown=0, max_retries=rmax)
 
 
 @task.task(max_retries=3, iterations=0)
@@ -75,7 +75,7 @@ def retry_task_noargs(**kwargs):
     if retries >= 3:
         return 42
     else:
-        return current.retry(countdown=0)
+        raise current.retry(countdown=0)
 
 
 @task.task(max_retries=3, iterations=0, base=MockApplyTask)
@@ -87,7 +87,7 @@ def retry_task_mockapply(arg1, arg2, kwarg=1, **kwargs):
         return arg1
     else:
         kwargs.update(kwarg=kwarg)
-    return current.retry(countdown=0)
+    raise current.retry(countdown=0)
 
 
 class MyCustomException(Exception):
@@ -106,7 +106,7 @@ def retry_task_customexc(arg1, arg2, kwarg=1, **kwargs):
             raise MyCustomException("Elaine Marie Benes")
         except MyCustomException, exc:
             kwargs.update(kwarg=kwarg)
-            return current.retry(countdown=0, exc=exc)
+            raise current.retry(countdown=0, exc=exc)
 
 
 class test_task_retries(Case):
@@ -115,20 +115,17 @@ class test_task_retries(Case):
         retry_task.__class__.max_retries = 3
         retry_task.iterations = 0
         result = retry_task.apply([0xFF, 0xFFFF])
-        self.assertEqual(result.get(), 0xFF)
         self.assertEqual(retry_task.iterations, 4)
 
         retry_task.__class__.max_retries = 3
         retry_task.iterations = 0
         result = retry_task.apply([0xFF, 0xFFFF], {"max_retries": 10})
-        self.assertEqual(result.get(), 0xFF)
         self.assertEqual(retry_task.iterations, 11)
 
     def test_retry_no_args(self):
         retry_task_noargs.__class__.max_retries = 3
         retry_task_noargs.iterations = 0
         result = retry_task_noargs.apply()
-        self.assertEqual(result.get(), 42)
         self.assertEqual(retry_task_noargs.iterations, 4)
 
     def test_retry_kwargs_can_be_empty(self):
@@ -158,7 +155,6 @@ class test_task_retries(Case):
         retry_task_customexc.__class__.max_retries = 3
         retry_task_customexc.iterations = 0
         result = retry_task_customexc.apply([0xFF, 0xFFFF], {"kwarg": 0xF})
-        self.assertEqual(result.get(), 0xFF + 0xF)
         self.assertEqual(retry_task_customexc.iterations, 4)
 
     def test_retry_with_custom_exception(self):

+ 3 - 3
celery/tests/worker/test_control.py

@@ -43,7 +43,7 @@ class Consumer(object):
                                          uuid(),
                                          args=(2, 2),
                                          kwargs={}))
-        self.eta_schedule = Timer()
+        self.timer = Timer()
         self.app = current_app
         self.event_dispatcher = Mock()
         self.controller = WorkController()
@@ -243,8 +243,8 @@ class test_ControlPanel(Case):
         panel = self.create_panel(consumer=consumer)
         self.assertFalse(panel.handle("dump_schedule"))
         r = TaskRequest(mytask.name, "CAFEBABE", (), {})
-        consumer.eta_schedule.schedule.enter(
-                consumer.eta_schedule.Entry(lambda x: x, (r, )),
+        consumer.timer.schedule.enter(
+                consumer.timer.Entry(lambda x: x, (r, )),
                     datetime.now() + timedelta(seconds=10))
         self.assertTrue(panel.handle("dump_schedule"))
 

+ 53 - 82
celery/tests/worker/test_worker.py

@@ -24,7 +24,7 @@ from celery.utils import uuid
 from celery.worker import WorkController, Queues, Timers
 from celery.worker.buckets import FastQueue
 from celery.worker.job import Request
-from celery.worker.consumer import Consumer as MainConsumer
+from celery.worker.consumer import BlockingConsumer
 from celery.worker.consumer import QoS, RUN, PREFETCH_COUNT_MAX, CLOSE
 from celery.utils.serialization import pickle
 from celery.utils.timer2 import Timer
@@ -37,7 +37,7 @@ class PlaceHolder(object):
         pass
 
 
-class MyKombuConsumer(MainConsumer):
+class MyKombuConsumer(BlockingConsumer):
     broadcast_consumer = Mock()
     task_consumer = Mock()
 
@@ -208,14 +208,13 @@ class test_Consumer(Case):
 
     def setUp(self):
         self.ready_queue = FastQueue()
-        self.eta_schedule = Timer()
+        self.timer = Timer()
 
     def tearDown(self):
-        self.eta_schedule.stop()
+        self.timer.stop()
 
     def test_info(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.qos = QoS(l.task_consumer, 10)
         info = l.info
         self.assertEqual(info["prefetch_count"], 10)
@@ -226,14 +225,12 @@ class test_Consumer(Case):
         self.assertTrue(info["broker"])
 
     def test_start_when_closed(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                            send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l._state = CLOSE
         l.start()
 
     def test_connection(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 
         l.reset_connection()
         self.assertIsInstance(l.connection, BrokerConnection)
@@ -258,13 +255,11 @@ class test_Consumer(Case):
         self.assertIsNone(l.task_consumer)
 
     def test_close_connection(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l._state = RUN
         l.close_connection()
 
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         eventer = l.event_dispatcher = Mock()
         eventer.enabled = True
         heart = l.heart = MockHeart()
@@ -275,8 +270,7 @@ class test_Consumer(Case):
 
     @patch("celery.worker.consumer.warn")
     def test_receive_message_unknown(self, warn):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         backend = Mock()
         m = create_message(backend, unknown={"baz": "!!!"})
         l.event_dispatcher = Mock()
@@ -288,8 +282,7 @@ class test_Consumer(Case):
     @patch("celery.utils.timer2.to_timestamp")
     def test_receive_message_eta_OverflowError(self, to_timestamp):
         to_timestamp.side_effect = OverflowError()
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                             send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         m = create_message(Mock(), task=foo_task.name,
                                    args=("2, 2"),
                                    kwargs={},
@@ -304,8 +297,7 @@ class test_Consumer(Case):
 
     @patch("celery.worker.consumer.error")
     def test_receive_message_InvalidTaskError(self, error):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         m = create_message(Mock(), task=foo_task.name,
                            args=(1, 2), kwargs="foobarbaz", id=1)
         l.update_strategies()
@@ -317,8 +309,7 @@ class test_Consumer(Case):
 
     @patch("celery.worker.consumer.crit")
     def test_on_decode_error(self, crit):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 
         class MockMessage(Mock):
             content_type = "application/x-msgpack"
@@ -331,8 +322,7 @@ class test_Consumer(Case):
         self.assertIn("Can't decode message body", crit.call_args[0][0])
 
     def test_receieve_message(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         m = create_message(Mock(), task=foo_task.name,
                            args=[2, 4, 8], kwargs={})
         l.update_strategies()
@@ -344,11 +334,11 @@ class test_Consumer(Case):
         self.assertIsInstance(in_bucket, Request)
         self.assertEqual(in_bucket.name, foo_task.name)
         self.assertEqual(in_bucket.execute(), 2 * 4 * 8)
-        self.assertTrue(self.eta_schedule.empty())
+        self.assertTrue(self.timer.empty())
 
     def test_start_connection_error(self):
 
-        class MockConsumer(MainConsumer):
+        class MockConsumer(BlockingConsumer):
             iterations = 0
 
             def consume_messages(self):
@@ -357,19 +347,19 @@ class test_Consumer(Case):
                     raise KeyError("foo")
                 raise SyntaxError("bar")
 
-        l = MockConsumer(self.ready_queue, self.eta_schedule,
+        l = MockConsumer(self.ready_queue, timer=self.timer,
                              send_events=False, pool=BasePool())
         l.connection_errors = (KeyError, )
         with self.assertRaises(SyntaxError):
             l.start()
         l.heart.stop()
-        l.priority_timer.stop()
+        l.timer.stop()
 
     def test_start_channel_error(self):
         # Regression test for AMQPChannelExceptions that can occur within the
         # consumer. (i.e. 404 errors)
 
-        class MockConsumer(MainConsumer):
+        class MockConsumer(BlockingConsumer):
             iterations = 0
 
             def consume_messages(self):
@@ -378,13 +368,13 @@ class test_Consumer(Case):
                     raise KeyError("foo")
                 raise SyntaxError("bar")
 
-        l = MockConsumer(self.ready_queue, self.eta_schedule,
+        l = MockConsumer(self.ready_queue, timer=self.timer,
                              send_events=False, pool=BasePool())
 
         l.channel_errors = (KeyError, )
         self.assertRaises(SyntaxError, l.start)
         l.heart.stop()
-        l.priority_timer.stop()
+        l.timer.stop()
 
     def test_consume_messages_ignores_socket_timeout(self):
 
@@ -395,8 +385,7 @@ class test_Consumer(Case):
                 self.obj.connection = None
                 raise socket.timeout(10)
 
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                            send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.connection = Connection()
         l.task_consumer = Mock()
         l.connection.obj = l
@@ -412,8 +401,7 @@ class test_Consumer(Case):
                 self.obj.connection = None
                 raise socket.error("foo")
 
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                            send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l._state = RUN
         c = l.connection = Connection()
         l.connection.obj = l
@@ -434,8 +422,7 @@ class test_Consumer(Case):
             def drain_events(self, **kwargs):
                 self.obj.connection = None
 
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                             send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.connection = Connection()
         l.connection.obj = l
         l.task_consumer = Mock()
@@ -450,8 +437,7 @@ class test_Consumer(Case):
         l.task_consumer.qos.assert_called_with(prefetch_count=9)
 
     def test_maybe_conn_error(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                             send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.connection_errors = (KeyError, )
         l.channel_errors = (SyntaxError, )
         l.maybe_conn_error(Mock(side_effect=AttributeError("foo")))
@@ -462,8 +448,7 @@ class test_Consumer(Case):
 
     def test_apply_eta_task(self):
         from celery.worker import state
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                             send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.qos = QoS(None, 10)
 
         task = object()
@@ -474,8 +459,7 @@ class test_Consumer(Case):
         self.assertIs(self.ready_queue.get_nowait(), task)
 
     def test_receieve_message_eta_isoformat(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                             send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         m = create_message(Mock(), task=foo_task.name,
                            eta=datetime.now().isoformat(),
                            args=[2, 4, 8], kwargs={})
@@ -486,20 +470,19 @@ class test_Consumer(Case):
         l.enabled = False
         l.update_strategies()
         l.receive_message(m.decode(), m)
-        l.eta_schedule.stop()
+        l.timer.stop()
 
-        items = [entry[2] for entry in self.eta_schedule.queue]
+        items = [entry[2] for entry in self.timer.queue]
         found = 0
         for item in items:
             if item.args[0].name == foo_task.name:
                 found = True
         self.assertTrue(found)
         self.assertTrue(l.task_consumer.qos.call_count)
-        l.eta_schedule.stop()
+        l.timer.stop()
 
     def test_on_control(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                             send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.pidbox_node = Mock()
         l.reset_pidbox_node = Mock()
 
@@ -519,8 +502,7 @@ class test_Consumer(Case):
 
     def test_revoke(self):
         ready_queue = FastQueue()
-        l = MyKombuConsumer(ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(ready_queue, timer=self.timer)
         backend = Mock()
         id = uuid()
         t = create_message(backend, task=foo_task.name, args=[2, 4, 8],
@@ -532,8 +514,7 @@ class test_Consumer(Case):
         self.assertTrue(ready_queue.empty())
 
     def test_receieve_message_not_registered(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                          send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         backend = Mock()
         m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
 
@@ -541,13 +522,12 @@ class test_Consumer(Case):
         self.assertFalse(l.receive_message(m.decode(), m))
         with self.assertRaises(Empty):
             self.ready_queue.get_nowait()
-        self.assertTrue(self.eta_schedule.empty())
+        self.assertTrue(self.timer.empty())
 
     @patch("celery.worker.consumer.warn")
    @patch("celery.worker.consumer.logger")
     def test_receieve_message_ack_raises(self, logger, warn):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                          send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         backend = Mock()
         m = create_message(backend, args=[2, 4, 8], kwargs={})
 
@@ -559,13 +539,12 @@ class test_Consumer(Case):
         self.assertTrue(warn.call_count)
         with self.assertRaises(Empty):
             self.ready_queue.get_nowait()
-        self.assertTrue(self.eta_schedule.empty())
+        self.assertTrue(self.timer.empty())
         m.reject.assert_called_with()
         self.assertTrue(logger.critical.call_count)
 
     def test_receieve_message_eta(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                            send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.event_dispatcher = Mock()
         l.event_dispatcher._outbound_buffer = deque()
         backend = Mock()
@@ -584,8 +563,8 @@ class test_Consumer(Case):
         l.stop_consumers()
         l.event_dispatcher = Mock()
         l.receive_message(m.decode(), m)
-        l.eta_schedule.stop()
-        in_hold = self.eta_schedule.queue[0]
+        l.timer.stop()
+        in_hold = l.timer.queue[0]
         self.assertEqual(len(in_hold), 3)
         eta, priority, entry = in_hold
         task = entry.args[0]
@@ -596,8 +575,7 @@ class test_Consumer(Case):
             self.ready_queue.get_nowait()
 
     def test_reset_pidbox_node(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                          send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.pidbox_node = Mock()
         chan = l.pidbox_node.channel = Mock()
         l.connection = Mock()
@@ -607,16 +585,14 @@ class test_Consumer(Case):
         chan.close.assert_called_with()
 
     def test_reset_pidbox_node_green(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                          send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.pool = Mock()
         l.pool.is_green = True
         l.reset_pidbox_node()
         l.pool.spawn_n.assert_called_with(l._green_pidbox_node)
 
     def test__green_pidbox_node(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                          send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.pidbox_node = Mock()
 
         class BConsumer(Mock):
@@ -673,8 +649,7 @@ class test_Consumer(Case):
     @patch("kombu.connection.BrokerConnection._establish_connection")
     @patch("kombu.utils.sleep")
     def test_open_connection_errback(self, sleep, connect):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                      send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         from kombu.transport.memory import Transport
         Transport.connection_errors = (StdChannelError, )
 
@@ -687,8 +662,7 @@ class test_Consumer(Case):
         connect.assert_called_with()
 
     def test_stop_pidbox_node(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                      send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l._pidbox_node_stopped = Event()
         l._pidbox_node_shutdown = Event()
         l._pidbox_node_stopped.set()
@@ -711,8 +685,8 @@ class test_Consumer(Case):
                     raise KeyError("foo")
 
         init_callback = Mock()
-        l = _Consumer(self.ready_queue, self.eta_schedule,
-                      send_events=False, init_callback=init_callback)
+        l = _Consumer(self.ready_queue, timer=self.timer,
+                      init_callback=init_callback)
         l.task_consumer = Mock()
         l.broadcast_consumer = Mock()
         l.qos = _QoS()
@@ -734,7 +708,7 @@ class test_Consumer(Case):
         self.assertEqual(l.qos.prev, l.qos.value)
 
         init_callback.reset_mock()
-        l = _Consumer(self.ready_queue, self.eta_schedule,
+        l = _Consumer(self.ready_queue, timer=self.timer,
                       send_events=False, init_callback=init_callback)
         l.qos = _QoS()
         l.task_consumer = Mock()
@@ -747,21 +721,18 @@ class test_Consumer(Case):
         self.assertTrue(l.consume_messages.call_count)
 
     def test_reset_connection_with_no_node(self):
-        l = MainConsumer(self.ready_queue, self.eta_schedule,
-                         send_events=False)
+        l = BlockingConsumer(self.ready_queue, timer=self.timer)
         self.assertEqual(None, l.pool)
         l.reset_connection()
 
     def test_on_task_revoked(self):
-        l = MainConsumer(self.ready_queue, self.eta_schedule,
-                         send_events=False)
+        l = BlockingConsumer(self.ready_queue, timer=self.timer)
         task = Mock()
         task.revoked.return_value = True
         l.on_task(task)
 
     def test_on_task_no_events(self):
-        l = MainConsumer(self.ready_queue, self.eta_schedule,
-                         send_events=False)
+        l = BlockingConsumer(self.ready_queue, timer=self.timer)
         task = Mock()
         task.revoked.return_value = False
         l.event_dispatcher = Mock()
@@ -834,8 +805,8 @@ class test_WorkController(AppCase):
 
     def test_attrs(self):
         worker = self.worker
-        self.assertIsInstance(worker.scheduler, Timer)
-        self.assertTrue(worker.scheduler)
+        self.assertIsInstance(worker.timer, Timer)
+        self.assertTrue(worker.timer)
         self.assertTrue(worker.pool)
         self.assertTrue(worker.consumer)
         self.assertTrue(worker.mediator)
@@ -848,7 +819,7 @@ class test_WorkController(AppCase):
 
     def test_with_autoscaler(self):
         worker = self.create_worker(autoscale=[10, 3], send_events=False,
-                                eta_scheduler_cls="celery.utils.timer2.Timer")
+                                timer_cls="celery.utils.timer2.Timer")
         self.assertTrue(worker.autoscaler)
 
     def test_dont_stop_or_terminate(self):

+ 13 - 3
celery/utils/timer2.py

@@ -17,6 +17,7 @@ import heapq
 import os
 import sys
 
+from functools import wraps
 from itertools import count
 from threading import Condition, Event, Lock, Thread
 from time import time, sleep, mktime
@@ -83,7 +84,7 @@ class Schedule(object):
 
     on_error = None
 
-    def __init__(self, max_interval=None, on_error=None):
+    def __init__(self, max_interval=None, on_error=None, **kwargs):
         self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
         self.on_error = on_error or self.on_error
         self._queue = []
@@ -137,15 +138,24 @@ class Schedule(object):
 
     def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
         tref = self.Entry(fun, args, kwargs)
+        secs = msecs * 1000.0
 
+        @wraps(fun)
        def _reschedules(*args, **kwargs):
+            last, now = tref._last_run, time()
+            lsince = (now - tref._last_run) * 1000.0 if last else msecs
             try:
-                return fun(*args, **kwargs)
+                if lsince and lsince >= msecs:
+                    tref._last_run = now
+                    return fun(*args, **kwargs)
             finally:
                 if not tref.cancelled:
-                    self.enter_after(msecs, tref, priority)
+                    last = tref._last_run
+                    next = secs - (now - last) if last else secs
+                    self.enter_after(next / 1000.0, tref, priority)
 
         tref.fun = _reschedules
+        tref._last_run = None
         return self.enter_after(msecs, tref, priority)
 
     def __iter__(self):
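Illustrative sketch (not part of this commit): `apply_interval` now records `_last_run` so a late wake-up shortens the next delay instead of letting drift accumulate; the idea in plain seconds, as a standalone toy:

    import time

    def next_delay(interval, last_run, now=None):
        # Sleep only for the part of the interval that has not already
        # elapsed since the previous run of the callback.
        now = time.time() if now is None else now
        return interval - (now - last_run) if last_run else interval

    # A 5s interval whose callback last fired 1.2s ago is due in ~3.8s:
    print next_delay(5.0, time.time() - 1.2)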

+ 30 - 25
celery/worker/__init__.py

@@ -90,23 +90,24 @@ class Pool(abstract.StartStopComponent):
         w.no_execv = no_execv
         if w.autoscale:
             w.max_concurrency, w.min_concurrency = w.autoscale
-        w.use_eventloop = (detect_environment() == "default" and
-                           w.app.broker_connection().is_evented)
 
-    def create(self, w, semaphore=None):
+    def create(self, w):
+        threaded = not w.use_eventloop
         forking_enable(w.no_execv or not w.force_execv)
         procs = w.min_concurrency
-        if w.use_eventloop:
+        if not threaded:
             semaphore = w.semaphore = BoundedSemaphore(procs)
-        pool = w.pool = self.instantiate(w.pool_cls, procs,
-                initargs=(w.app, w.hostname),
-                maxtasksperchild=w.max_tasks_per_child,
-                timeout=w.task_time_limit,
-                soft_timeout=w.task_soft_time_limit,
-                putlocks=w.pool_putlocks and not w.use_eventloop,
-                lost_worker_timeout=w.worker_lost_wait,
-                start_result_thread=not w.use_eventloop,
-                semaphore=semaphore)
+        pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
+                            initargs=(w.app, w.hostname),
+                            maxtasksperchild=w.max_tasks_per_child,
+                            timeout=w.task_time_limit,
+                            soft_timeout=w.task_soft_time_limit,
+                            putlocks=w.pool_putlocks,
+                            lost_worker_timeout=w.worker_lost_wait,
+                            with_task_thread=threaded,
+                            with_result_thread=threaded,
+                            with_supervisor_thread=threaded,
+                            semaphore=semaphore)
         return pool
 
 
@@ -156,24 +157,26 @@ class Timers(abstract.Component):
     requires = ("pool", )
 
     def create(self, w):
+        options = {"on_error": self.on_timer_error,
+                   "on_tick": self.on_timer_tick}
+
         if w.use_eventloop:
-            w.scheduler = w.priority_timer = Schedule(max_interval=10)
+            # the timers are fired by the hub, so don't use the Timer thread.
+            w.timer = Schedule(max_interval=10, **options)
         else:
-            w.priority_timer = self.instantiate(w.pool.Timer)
-            if not w.eta_scheduler_cls:
+            if not w.timer_cls:
                 # Default Timer is set by the pool, as e.g. eventlet
                 # needs a custom implementation.
-                w.eta_scheduler_cls = w.pool.Timer
-            w.scheduler = self.instantiate(w.eta_scheduler_cls,
-                                    max_interval=w.eta_scheduler_precision,
-                                    on_error=self.on_timer_error,
-                                    on_tick=self.on_timer_tick)
+                w.timer_cls = w.pool.Timer
+            w.timer = self.instantiate(w.pool.Timer,
+                                       max_interval=w.timer_precision,
+                                       **options)
 
     def on_timer_error(self, exc):
         logger.error("Timer error: %r", exc, exc_info=True)
 
     def on_timer_tick(self, delay):
-        logger.debug("Scheduler wake-up! Next eta %s secs.", delay)
+        logger.debug("Timer wake-up! Next eta %s secs.", delay)
 
 
 class StateDB(abstract.Component):
@@ -203,8 +206,8 @@ class WorkController(configurated):
     pool_cls = from_config("pool")
     consumer_cls = from_config("consumer")
     mediator_cls = from_config("mediator")
-    eta_scheduler_cls = from_config("eta_scheduler")
-    eta_scheduler_precision = from_config()
+    timer_cls = from_config("timer")
+    timer_precision = from_config("timer_precision")
     autoscaler_cls = from_config("autoscaler")
     autoreloader_cls = from_config("autoreloader")
     schedule_filename = from_config()
@@ -245,6 +248,8 @@ class WorkController(configurated):
         self._finalize = Finalize(self, self.stop, exitpriority=1)
         self.pidfile = pidfile
         self.pidlock = None
+        self.use_eventloop = (detect_environment() == "default" and
+                              self.app.broker_connection().is_evented)
 
         # Initialize boot steps
         self.pool_cls = _concurrency.get_implementation(self.pool_cls)
@@ -333,7 +338,7 @@ class WorkController(configurated):
                 stop = getattr(component, "terminate", None) or stop
             stop()
 
-        self.priority_timer.stop()
+        self.timer.stop()
         self.consumer.close_connection()
 
         if self.pidlock:
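Illustrative sketch (not part of this commit): condensed, the timer selection above amounts to the following, with simplified, illustrative names:

    def create_timer(w, Schedule, on_error, on_tick):
        options = {"on_error": on_error, "on_tick": on_tick}
        if w.use_eventloop:
            # The hub fires timer entries itself, so a plain (threadless)
            # Schedule is enough.
            return Schedule(max_interval=10, **options)
        # Otherwise fall back to the pool's threaded Timer class.
        timer_cls = w.timer_cls or w.pool.Timer
        return timer_cls(max_interval=w.timer_precision, **options)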

+ 76 - 60
celery/worker/consumer.py

@@ -46,7 +46,7 @@ up and running.
   are acknowledged immediately and logged, so the message is not resent
   again, and again.
 
-* If the task has an ETA/countdown, the task is moved to the `eta_schedule`
+* If the task has an ETA/countdown, the task is moved to the `timer`
   so the :class:`timer2.Timer` can schedule it at its
   deadline. Tasks without an eta are moved immediately to the `ready_queue`,
   so they can be picked up by the :class:`~celery.worker.mediator.Mediator`
@@ -157,19 +157,22 @@ class Component(StartStopComponent):
     name = "worker.consumer"
     last = True
 
+    def Consumer(self, w):
+        return (w.consumer_cls or
+                Consumer if w.use_eventloop else BlockingConsumer)
+
     def create(self, w):
         prefetch_count = w.concurrency * w.prefetch_multiplier
-        c = w.consumer = self.instantiate(
-                w.consumer_cls, w.ready_queue, w.scheduler,
+        c = w.consumer = self.instantiate(self.Consumer(w),
+                w.ready_queue,
                 hostname=w.hostname,
                 send_events=w.send_events,
                 init_callback=w.ready_callback,
                 initial_prefetch_count=prefetch_count,
                 pool=w.pool,
-                priority_timer=w.priority_timer,
+                timer=w.timer,
                 app=w.app,
-                controller=w,
-                use_eventloop=w.use_eventloop)
+                controller=w)
         return c
 
 
@@ -244,16 +247,13 @@ class Consumer(object):
     move them to the ready queue for task processing.
 
     :param ready_queue: See :attr:`ready_queue`.
-    :param eta_schedule: See :attr:`eta_schedule`.
+    :param timer: See :attr:`timer`.
 
     """
 
     #: The queue that holds tasks ready for immediate processing.
     ready_queue = None
 
-    #: Timer for tasks with an ETA/countdown.
-    eta_schedule = None
-
     #: Enable/disable events.
     send_events = False
 
@@ -295,27 +295,21 @@ class Consumer(object):
 
     #: A timer used for high-priority internal tasks, such
     #: as sending heartbeats.
-    priority_timer = None
+    timer = None
 
     # Consumer state, can be RUN or CLOSE.
     _state = None
 
-    #: If true then pool results and broker messages will be
-    #: handled in an event loop.
-    use_eventloop = False
-
-    def __init__(self, ready_queue, eta_schedule,
+    def __init__(self, ready_queue,
             init_callback=noop, send_events=False, hostname=None,
             initial_prefetch_count=2, pool=None, app=None,
-            priority_timer=None, controller=None, use_eventloop=False,
-            **kwargs):
+            timer=None, controller=None, **kwargs):
         self.app = app_or_default(app)
         self.connection = None
         self.task_consumer = None
         self.controller = controller
         self.broadcast_consumer = None
         self.ready_queue = ready_queue
-        self.eta_schedule = eta_schedule
         self.send_events = send_events
         self.init_callback = init_callback
         self.hostname = hostname or socket.gethostname()
@@ -323,8 +317,7 @@ class Consumer(object):
         self.event_dispatcher = None
         self.heart = None
         self.pool = pool
-        self.priority_timer = priority_timer or timer2.default_timer
-        self.use_eventloop = use_eventloop
+        self.timer = timer or timer2.default_timer
         pidbox_state = AttributeDict(app=self.app,
                                      hostname=self.hostname,
                                      listener=self,     # pre 2.2
@@ -358,62 +351,60 @@ class Consumer(object):
         self.init_callback(self)
 
         while self._state != CLOSE:
+            self.maybe_shutdown()
             try:
                 self.reset_connection()
                 self.consume_messages()
             except self.connection_errors + self.channel_errors:
                 error(RETRY_CONNECTION, exc_info=True)
 
-    def consume_messages(self):
+    def consume_messages(self, sleep=sleep, min=min, Empty=Empty):
+        """Consume messages forever (or until an exception is raised)."""
         self.task_consumer.consume()
         debug("Ready to accept tasks!")
 
-        # evented version
-        if self.use_eventloop:
-            return self._eventloop()
-
-        while self._state != CLOSE and self.connection:
-            if state.should_stop:
-                raise SystemExit()
-            elif state.should_terminate:
-                raise SystemTerminate()
-            if self.qos.prev != self.qos.value:     # pragma: no cover
-                self.qos.update()
-            try:
-                self.connection.drain_events(timeout=1.0)
-            except socket.timeout:
-                pass
-            except socket.error:
-                if self._state != CLOSE:            # pragma: no cover
-                    raise
-
-    def _eventloop(self):
-        """Consume messages forever (or until an exception is raised)."""
-        on_poll_start = self.connection.transport.on_poll_start
-
-        qos = self.qos
         with self.hub as hub:
-            update = hub.update
+            qos = self.qos
+            update_qos = qos.update
+            update_fds = hub.update
             fdmap = hub.fdmap
             poll = hub.poller.poll
             fire_timers = hub.fire_timers
-            scheduled = hub.schedule._queue
-            update(self.connection.eventmap,
-                       self.pool.eventmap)
-            self.connection.transport.on_poll_init(hub.poller)
+            scheduled = hub.timer._queue
+            transport = self.connection.transport
+            on_poll_start = transport.on_poll_start
+
+            self.task_consumer.callbacks.append(fire_timers)
+
+            update_fds(self.connection.eventmap, self.pool.eventmap)
+            for handler, interval in self.pool.timers.iteritems():
+                self.timer.apply_interval(interval * 1000.0, handler)
+
+            def on_process_started(w):
+                hub.add(w._popen.sentinel, self.pool._pool.maintain_pool)
+            self.pool.on_process_started = on_process_started
+
+            def on_process_down(w):
+                hub.remove(w._popen.sentinel)
+            self.pool.on_process_down = on_process_down
+
+            transport.on_poll_init(hub.poller)
 
             while self._state != CLOSE and self.connection:
+                # shutdown if signal handlers told us to.
                 if state.should_stop:
                     raise SystemExit()
                 elif state.should_terminate:
                     raise SystemTerminate()
 
+                # fire any ready timers, this also determines
+                # when we need to wake up next.
                 time_to_sleep = fire_timers() if scheduled else 1
 
 
-                if qos.prev != qos.value:     # pragma: no cover
-                    qos.update()
+                if qos.prev != qos.value:
+                    update_qos()
 
 
-                update(on_poll_start())
+                update_fds(on_poll_start())
                 if fdmap:
                 if fdmap:
                     for fileno, event in poll(time_to_sleep) or ():
                     for fileno, event in poll(time_to_sleep) or ():
                         try:
                         try:
@@ -457,9 +448,8 @@ class Consumer(object):
                 task.acknowledge()
                 task.acknowledge()
             else:
             else:
                 self.qos.increment()
                 self.qos.increment()
-                self.eta_schedule.apply_at(eta,
-                                           self.apply_eta_task, (task, ),
-                                           priority=6)
+                self.timer.apply_at(eta, self.apply_eta_task, (task, ),
+                                    priority=6)
         else:
         else:
             state.task_reserved(task)
             state.task_reserved(task)
             self.ready_queue.put(task)
             self.ready_queue.put(task)
@@ -645,7 +635,7 @@ class Consumer(object):
         # They can't be acked anyway, as a delivery tag is specific
         # They can't be acked anyway, as a delivery tag is specific
         # to the current channel.
         # to the current channel.
         self.ready_queue.clear()
         self.ready_queue.clear()
-        self.eta_schedule.clear()
+        self.timer.clear()
 
 
         # Re-establish the broker connection and setup the task consumer.
         # Re-establish the broker connection and setup the task consumer.
         self.connection = self._open_connection()
         self.connection = self._open_connection()
@@ -687,7 +677,7 @@ class Consumer(object):
         can tell if the worker is off-line/missing.
         can tell if the worker is off-line/missing.
 
 
         """
         """
-        self.heart = Heart(self.priority_timer, self.event_dispatcher)
+        self.heart = Heart(self.timer, self.event_dispatcher)
         self.heart.start()
         self.heart.start()
 
 
     def _open_connection(self):
     def _open_connection(self):
@@ -713,7 +703,8 @@ class Consumer(object):
             return conn
             return conn
 
 
         return conn.ensure_connection(_error_handler,
         return conn.ensure_connection(_error_handler,
-                    self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
+                    self.app.conf.BROKER_CONNECTION_MAX_RETRIES,
+                    callback=self.maybe_shutdown)
 
 
     def stop(self):
     def stop(self):
         """Stop consuming.
         """Stop consuming.
@@ -731,6 +722,12 @@ class Consumer(object):
     def close(self):
     def close(self):
         self._state = CLOSE
         self._state = CLOSE
 
 
+    def maybe_shutdown(self):
+        if state.should_stop:
+            raise SystemExit()
+        elif state.should_terminate:
+            raise SystemTerminate()
+
     @property
     @property
     def info(self):
     def info(self):
         """Returns information about this consumer instance
         """Returns information about this consumer instance
@@ -745,3 +742,22 @@ class Consumer(object):
             conninfo = self.connection.info()
             conninfo = self.connection.info()
             conninfo.pop("password", None)  # don't send password.
             conninfo.pop("password", None)  # don't send password.
         return {"broker": conninfo, "prefetch_count": self.qos.value}
         return {"broker": conninfo, "prefetch_count": self.qos.value}
+
+
+class BlockingConsumer(Consumer):
+
+    def consume_messages(self):
+        self.task_consumer.consume()
+        debug("Ready to accept tasks!")
+
+        while self._state != CLOSE and self.connection:
+            self.maybe_shutdown()
+            if self.qos.prev != self.qos.value:     # pragma: no cover
+                self.qos.update()
+            try:
+                self.connection.drain_events(timeout=10.0)
+            except socket.timeout:
+                pass
+            except socket.error:
+                if self._state != CLOSE:            # pragma: no cover
+                    raise
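
The loop above fires any due timers, uses the returned delay as the poll timeout, and dispatches each ready file descriptor to its registered handler, while ``BlockingConsumer`` keeps the old ``drain_events`` behaviour for transports without event-loop support. A minimal, self-contained sketch of the same poll-driven pattern (the names ``fdmap``, ``fire_timers`` and ``should_stop`` are stand-ins, not Celery APIs):

    import select

    def event_loop(fdmap, fire_timers, should_stop):
        """Toy poll-driven loop.

        fdmap: {fileno: callback}; fire_timers() runs due timers and
        returns the number of seconds until the next one is due.
        """
        poller = select.poll()
        for fd in fdmap:
            poller.register(fd, select.POLLIN | select.POLLERR)
        while not should_stop():
            timeout = fire_timers()                 # also tells us how long to wait
            for fd, event in poller.poll(int(timeout * 1000)):
                try:
                    fdmap[fd](fd, event)            # dispatch readable fd to its handler
                except KeyError:
                    pass                            # fd was removed by an earlier callback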

+ 1 - 1
celery/worker/control.py

@@ -144,7 +144,7 @@ def time_limit(panel, task_name=None, hard=None, soft=None, **kwargs):
 
 
 @Panel.register
 @Panel.register
 def dump_schedule(panel, safe=False, **kwargs):
 def dump_schedule(panel, safe=False, **kwargs):
-    schedule = panel.consumer.eta_schedule.schedule
+    schedule = panel.consumer.timer.schedule
     if not schedule.queue:
     if not schedule.queue:
         logger.info("--Empty schedule--")
         logger.info("--Empty schedule--")
         return []
         return []
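
With the rename, the panel command reads pending ETA entries from ``consumer.timer``. From client code the command is reached through the inspect API; a small sketch assuming the 2.x ``celery.task.control`` interface and a running worker:

    from celery.task.control import inspect

    # Ask online workers for their ETA/countdown schedule; the reply is
    # produced by the dump_schedule panel command shown above.
    print inspect().scheduled()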

+ 11 - 10
celery/worker/hub.py

@@ -43,11 +43,10 @@ class BoundedSemaphore(object):
 class Hub(object):
 class Hub(object):
     eventflags = POLL_READ | POLL_ERR
     eventflags = POLL_READ | POLL_ERR
 
 
-    def __init__(self, schedule=None):
+    def __init__(self, timer=None):
         self.fdmap = {}
         self.fdmap = {}
         self.poller = poll()
         self.poller = poll()
-        self.schedule = Schedule() if schedule is None else schedule
-        self._on_event = set()
+        self.timer = Schedule() if timer is None else timer
 
 
     def __enter__(self):
     def __enter__(self):
         return self
         return self
@@ -55,12 +54,14 @@ class Hub(object):
     def __exit__(self, *exc_info):
     def __exit__(self, *exc_info):
         return self.close()
         return self.close()
 
 
-    def fire_timers(self, min_delay=10, max_delay=10):
-        while 1:
-            delay, entry = self.scheduler.next()
-            if entry is None:
-                break
-            self.schedule.apply_entry(entry)
+    def fire_timers(self, min_delay=1, max_delay=10, max_timers=10):
+        delay = None
+        if self.timer._queue:
+            for i in xrange(max_timers):
+                delay, entry = self.scheduler.next()
+                if entry is None:
+                    break
+                self.timer.apply_entry(entry)
         return min(max(delay, min_delay), max_delay)
         return min(max(delay, min_delay), max_delay)
 
 
     def add(self, fd, callback, flags=None):
     def add(self, fd, callback, flags=None):
@@ -86,4 +87,4 @@ class Hub(object):
 
 
     @cached_property
     @cached_property
     def scheduler(self):
     def scheduler(self):
-        return iter(self.schedule)
+        return iter(self.timer)
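
``fire_timers`` now runs at most ``max_timers`` due entries per call and clamps the returned delay into ``[min_delay, max_delay]``, so the hub's poll timeout neither spins nor oversleeps. A rough standalone sketch of that cap-and-clamp idea (plain heap entries, not the Hub's actual timer types):

    import heapq
    import time

    def fire_timers(queue, min_delay=1, max_delay=10, max_timers=10):
        # queue: heap of (eta, callback) pairs.  Run at most max_timers
        # entries that are due, then report how long the caller may poll.
        delay = None
        for _ in range(max_timers):
            if not queue:
                break
            eta, callback = queue[0]
            delay = eta - time.time()
            if delay > 0:
                break                       # next entry is in the future
            heapq.heappop(queue)
            callback()
        # clamp so the event loop neither busy-loops nor sleeps too long
        return min(max(delay or min_delay, min_delay), max_delay)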

+ 31 - 10
docs/configuration.rst

@@ -612,8 +612,16 @@ If enabled (default), any queues specified that is not defined in
 CELERY_DEFAULT_QUEUE
 CELERY_DEFAULT_QUEUE
 ~~~~~~~~~~~~~~~~~~~~
 ~~~~~~~~~~~~~~~~~~~~
 
 
-The queue used by default, if no custom queue is specified.  This queue must
-be listed in :setting:`CELERY_QUEUES`.  The default is: `celery`.
+The name of the default queue used by `.apply_async` if the message has
+no route or no custom queue has been specified.
+
+
+This queue must be listed in :setting:`CELERY_QUEUES`.
+If :setting:`CELERY_QUEUES` is not specified then it is automatically
+created, containing one queue entry where this name is used as the name of
+that queue.
+
+The default is: `celery`.
 
 
 .. seealso::
 .. seealso::
 
 
@@ -625,14 +633,17 @@ CELERY_DEFAULT_EXCHANGE
 ~~~~~~~~~~~~~~~~~~~~~~~
 ~~~~~~~~~~~~~~~~~~~~~~~
 
 
 Name of the default exchange to use when no custom exchange is
 Name of the default exchange to use when no custom exchange is
-specified.  The default is: `celery`.
+specified for a key in the :setting:`CELERY_QUEUES` setting.
+
+The default is: `celery`.
 
 
 .. setting:: CELERY_DEFAULT_EXCHANGE_TYPE
 .. setting:: CELERY_DEFAULT_EXCHANGE_TYPE
 
 
 CELERY_DEFAULT_EXCHANGE_TYPE
 CELERY_DEFAULT_EXCHANGE_TYPE
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 
-Default exchange type used when no custom exchange is specified.
+Default exchange type used when no custom exchange type is specified
+for a key in the :setting:`CELERY_QUEUES` setting.
 The default is: `direct`.
 The default is: `direct`.
 
 
 .. setting:: CELERY_DEFAULT_ROUTING_KEY
 .. setting:: CELERY_DEFAULT_ROUTING_KEY
@@ -640,7 +651,9 @@ The default is: `direct`.
 CELERY_DEFAULT_ROUTING_KEY
 CELERY_DEFAULT_ROUTING_KEY
 ~~~~~~~~~~~~~~~~~~~~~~~~~~
 ~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 
-The default routing key used when sending tasks.
+The default routing key used when no custom routing key
+is specified for a key in the :setting:`CELERY_QUEUES` setting.
+
 The default is: `celery`.
 The default is: `celery`.
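
Spelled out explicitly, the four defaults above describe the queue Celery creates when :setting:`CELERY_QUEUES` is left unset; a sketch of the equivalent configuration (the values shown are the documented defaults):

    # celeryconfig.py -- equivalent to leaving CELERY_QUEUES unset
    CELERY_DEFAULT_QUEUE = "celery"
    CELERY_DEFAULT_EXCHANGE = "celery"
    CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
    CELERY_DEFAULT_ROUTING_KEY = "celery"

    CELERY_QUEUES = {
        CELERY_DEFAULT_QUEUE: {
            "exchange": CELERY_DEFAULT_EXCHANGE,
            "exchange_type": CELERY_DEFAULT_EXCHANGE_TYPE,
            "binding_key": CELERY_DEFAULT_ROUTING_KEY,
        },
    }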
 
 
 .. setting:: CELERY_DEFAULT_DELIVERY_MODE
 .. setting:: CELERY_DEFAULT_DELIVERY_MODE
@@ -1007,6 +1020,14 @@ A sequence of modules to import when the celery daemon starts.
 This is used to specify the task modules to import, but also
 This is used to specify the task modules to import, but also
 to import signal handlers and additional remote control commands, etc.
 to import signal handlers and additional remote control commands, etc.
 
 
+.. setting:: CELERY_INCLUDE
+
+CELERY_INCLUDE
+~~~~~~~~~~~~~~
+
+Has the exact same semantics as :setting:`CELERY_IMPORTS`, but can be used
+as a means to keep different categories of imports apart.
+
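Since the two settings share semantics, a configuration might simply use them to separate import categories; the module names below are hypothetical:

    CELERY_IMPORTS = ("myapp.tasks", )                    # task modules
    CELERY_INCLUDE = ("myapp.signals", "myapp.panel")     # handlers, extra control commands
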
 .. setting:: CELERYD_FORCE_EXECV
 .. setting:: CELERYD_FORCE_EXECV
 
 
 CELERYD_FORCE_EXECV
 CELERYD_FORCE_EXECV
@@ -1099,10 +1120,10 @@ Can also be set via the :option:`--statedb` argument to
 
 
 Not enabled by default.
 Not enabled by default.
 
 
-.. setting:: CELERYD_ETA_SCHEDULER_PRECISION
+.. setting:: CELERYD_TIMER_PRECISION
 
 
-CELERYD_ETA_SCHEDULER_PRECISION
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+CELERYD_TIMER_PRECISION
+~~~~~~~~~~~~~~~~~~~~~~~
 
 
 Set the maximum time in seconds that the ETA scheduler can sleep between
 Set the maximum time in seconds that the ETA scheduler can sleep between
 rechecking the schedule.  Default is 1 second.
 rechecking the schedule.  Default is 1 second.
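
Lowering the value trades extra wakeups for ETA tasks firing closer to their deadline, for example:

    CELERYD_TIMER_PRECISION = 0.1   # sleep at most 0.1s between rechecks of the ETA schedule
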
@@ -1471,9 +1492,9 @@ CELERYD_MEDIATOR
 Name of the mediator class used by the worker.
 Name of the mediator class used by the worker.
 Default is :class:`celery.worker.controllers.Mediator`.
 Default is :class:`celery.worker.controllers.Mediator`.
 
 
-.. setting:: CELERYD_ETA_SCHEDULER
+.. setting:: CELERYD_TIMER
 
 
-CELERYD_ETA_SCHEDULER
+CELERYD_TIMER
 ~~~~~~~~~~~~~~~~~~~~~
 ~~~~~~~~~~~~~~~~~~~~~
 
 
 Name of the ETA scheduler class used by the worker.
 Name of the ETA scheduler class used by the worker.

+ 5 - 1
docs/getting-started/next-steps.rst

@@ -161,7 +161,11 @@ Sometimes you want to specify a callback that does not take
 additional arguments, and in that case you can set the subtask
 additional arguments, and in that case you can set the subtask
 to be immutable::
 to be immutable::
 
 
-    >>> add.s(2, 2).link( reset_buffers.subtask(immutable=True) )
+    >>> add.apply_async((2, 2), link=reset_buffers.subtask(immutable=True))
+
+The ``.si()`` shortcut can also be used to create immutable subtasks::
+
+    >>> add.apply_async((2, 2), link=reset_buffers.si())
 
 
 Only the execution options can be set when a subtask is immutable,
 Only the execution options can be set when a subtask is immutable,
 and it's not possible to apply the subtask with partial args/kwargs.
 and it's not possible to apply the subtask with partial args/kwargs.

+ 1 - 1
docs/includes/introduction.txt

@@ -1,4 +1,4 @@
-:Version: 2.6.0rc2
+:Version: 2.6.0rc3
 :Web: http://celeryproject.org/
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/ask/celery/
 :Source: http://github.com/ask/celery/

+ 0 - 11
docs/internals/reference/celery.app.state.rst

@@ -1,11 +0,0 @@
-================================
- celery.app.state
-================================
-
-.. contents::
-    :local:
-.. currentmodule:: celery.app.state
-
-.. automodule:: celery.app.state
-    :members:
-    :undoc-members:

+ 0 - 1
docs/internals/reference/index.rst

@@ -41,7 +41,6 @@
     celery.task.trace
     celery.task.trace
     celery.app.abstract
     celery.app.abstract
     celery.app.annotations
     celery.app.annotations
-    celery.app.state
     celery.app.routes
     celery.app.routes
     celery.security.certificate
     celery.security.certificate
     celery.security.key
     celery.security.key

+ 0 - 2
docs/reference/index.rst

@@ -21,9 +21,7 @@
     celery.task
     celery.task
     celery.task.base
     celery.task.base
     celery.task.sets
     celery.task.sets
-    celery.task.chords
     celery.result
     celery.result
-    celery.task.control
     celery.task.http
     celery.task.http
     celery.schedules
     celery.schedules
     celery.signals
     celery.signals

+ 1 - 1
docs/userguide/groups.rst

@@ -247,7 +247,7 @@ Example implementation:
     def unlock_chord(taskset, callback, interval=1, max_retries=None):
     def unlock_chord(taskset, callback, interval=1, max_retries=None):
         if taskset.ready():
         if taskset.ready():
             return subtask(callback).delay(taskset.join())
             return subtask(callback).delay(taskset.join())
-        unlock_chord.retry(countdown=interval, max_retries=max_retries)
+        raise unlock_chord.retry(countdown=interval, max_retries=max_retries)
 
 
 
 
 This is used by all result backends except Redis and Memcached, which increment a
 This is used by all result backends except Redis and Memcached, which increment a

+ 18 - 1
docs/userguide/periodic-tasks.rst

@@ -100,7 +100,7 @@ Crontab schedules
 
 
 If you want more control over when the task is executed, for
 If you want more control over when the task is executed, for
 example, a particular time of day or day of the week, you can use
 example, a particular time of day or day of the week, you can use
-the `crontab` schedule type:
+the :class:`~celery.schedules.crontab` schedule type:
 
 
 .. code-block:: python
 .. code-block:: python
 
 
@@ -156,6 +156,23 @@ The syntax of these crontab expressions are very flexible.  Some examples:
 | ``crontab(minute=0, hour="*/3,8-17")``  | Execute every hour divisible by 3, and     |
 | ``crontab(minute=0, hour="*/3,8-17")``  | Execute every hour divisible by 3, and     |
 |                                         | every hour during office hours (8am-5pm).  |
 |                                         | every hour during office hours (8am-5pm).  |
 +-----------------------------------------+--------------------------------------------+
 +-----------------------------------------+--------------------------------------------+
+| ``crontab(day_of_month="2")``           | Execute on the second day of every month.  |
+|                                         |                                            |
++-----------------------------------------+--------------------------------------------+
+| ``crontab(day_of_month="2-30/2")``      | Execute on every even numbered day.        |
+|                                         |                                            |
++-----------------------------------------+--------------------------------------------+
+| ``crontab(day_of_month="1-7,15-21")``   | Execute on the first and third weeks of    |
+|                                         | the month.                                 |
++-----------------------------------------+--------------------------------------------+
+| ``crontab(day_of_month="11",``          | Execute on 11th of May every year.         |
+|         ``month_of_year="5")``          |                                            |
++-----------------------------------------+--------------------------------------------+
+| ``crontab(month_of_year="*/3")``        | Execute on the first month of every        |
+|                                         | quarter.                                   |
++-----------------------------------------+--------------------------------------------+
+
+See :class:`celery.schedules.crontab` for more documentation.
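
The new ``day_of_month``/``month_of_year`` arguments slot directly into a beat schedule entry; a sketch with a hypothetical task name:

    from celery.schedules import crontab

    CELERYBEAT_SCHEDULE = {
        "quarterly-cleanup": {
            # 07:30 on the first day of the first month of every quarter.
            "task": "myapp.tasks.cleanup",        # hypothetical task
            "schedule": crontab(minute=30, hour=7,
                                day_of_month="1", month_of_year="*/3"),
        },
    }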
 
 
 .. _beat-timezones:
 .. _beat-timezones:
 
 

+ 3 - 3
docs/userguide/tasks.rst

@@ -133,7 +133,7 @@ of temporary failure.
             twitter = Twitter(oauth)
             twitter = Twitter(oauth)
             twitter.update_status(tweet)
             twitter.update_status(tweet)
         except (Twitter.FailWhaleError, Twitter.LoginError), exc:
         except (Twitter.FailWhaleError, Twitter.LoginError), exc:
-            send_twitter_status.retry(exc=exc)
+            raise send_twitter_status.retry(exc=exc)
 
 
 Here we used the `exc` argument to pass the current exception to
 Here we used the `exc` argument to pass the current exception to
 :meth:`@-Task.retry`. At each step of the retry this exception
 :meth:`@-Task.retry`. At each step of the retry this exception
@@ -173,8 +173,8 @@ override this default.
         try:
         try:
             ...
             ...
         except Exception, exc:
         except Exception, exc:
-            add.retry(exc=exc, countdown=60)  # override the default and
-                                              # retry in 1 minute
+            raise add.retry(exc=exc, countdown=60)  # override the default and
+                                                    # retry in 1 minute
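
The corrected examples re-raise the value returned by `retry`; a compact sketch of the full pattern (the task and helper names are made up):

    from celery.task import task

    @task(max_retries=5, default_retry_delay=60)
    def refresh_feed(url):
        try:
            return fetch_url(url)                 # hypothetical helper
        except IOError, exc:
            # retry() raises RetryTaskError; re-raising makes it obvious to
            # readers (and to the worker) that execution ends here.
            raise refresh_feed.retry(exc=exc)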
 
 
 .. _task-options:
 .. _task-options: