Ask Solem 15 éve
szülő
commit
7985de1f53

+ 1 - 4
celery/decorators.py

@@ -16,7 +16,6 @@ def task(**options):
         def refresh_feed(url):
             return Feed.objects.get(url=url).refresh()
 
-
     With setting extra options and using retry.
 
     .. code-block:: python
@@ -28,7 +27,7 @@ def task(**options):
             except socket.error, exc:
                 refresh_feed.retry(args=[url], kwargs=kwargs, exc=exc)
 
-    Calling the resulting task.
+    Calling the resulting task:
 
         >>> refresh_feed("http://example.com/rss") # Regular
         <Feed: http://example.com/rss>
@@ -55,8 +54,6 @@ def task(**options):
 def periodic_task(**options):
     """Task decorator to create a periodic task.
 
-    **Usage**
-
     Run a task once every day:
 
     .. code-block:: python

+ 18 - 18
celery/tests/test_worker.py

@@ -102,12 +102,12 @@ class TestCarrotListener(unittest.TestCase):
 
     def setUp(self):
         self.ready_queue = Queue()
-        self.eta_scheduler = Scheduler(self.ready_queue)
+        self.eta_schedule = Scheduler(self.ready_queue)
         self.logger = get_logger()
         self.logger.setLevel(0)
 
     def test_connection(self):
-        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger,
+        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
 
         c = l.reset_connection()
@@ -125,7 +125,7 @@ class TestCarrotListener(unittest.TestCase):
         self.assertTrue(l.task_consumer is None)
 
     def test_receieve_message(self):
-        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger,
+        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
@@ -138,11 +138,11 @@ class TestCarrotListener(unittest.TestCase):
         self.assertTrue(isinstance(in_bucket, TaskWrapper))
         self.assertEquals(in_bucket.task_name, foo_task.name)
         self.assertEquals(in_bucket.execute(), 2 * 4 * 8)
-        self.assertTrue(self.eta_scheduler.empty())
+        self.assertTrue(self.eta_schedule.empty())
 
     def test_revoke(self):
         ready_queue = Queue()
-        l = CarrotListener(ready_queue, self.eta_scheduler, self.logger,
+        l = CarrotListener(ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         backend = MockBackend()
         id = gen_unique_id()
@@ -159,7 +159,7 @@ class TestCarrotListener(unittest.TestCase):
         self.assertTrue(ready_queue.empty())
 
     def test_receieve_message_not_registered(self):
-        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger,
+        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                           send_events=False)
         backend = MockBackend()
         m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
@@ -167,10 +167,10 @@ class TestCarrotListener(unittest.TestCase):
         l.event_dispatcher = MockEventDispatcher()
         self.assertFalse(l.receive_message(m.decode(), m))
         self.assertRaises(Empty, self.ready_queue.get_nowait)
-        self.assertTrue(self.eta_scheduler.empty())
+        self.assertTrue(self.eta_schedule.empty())
 
     def test_receieve_message_eta(self):
-        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger,
+        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                           send_events=False)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
@@ -181,7 +181,7 @@ class TestCarrotListener(unittest.TestCase):
         l.reset_connection()
         l.receive_message(m.decode(), m)
 
-        in_hold = self.eta_scheduler.queue[0]
+        in_hold = self.eta_schedule.queue[0]
         self.assertEquals(len(in_hold), 4)
         eta, priority, task, on_accept = in_hold
         self.assertTrue(isinstance(task, TaskWrapper))
@@ -201,41 +201,41 @@ class TestWorkController(unittest.TestCase):
 
     def test_attrs(self):
         worker = self.worker
-        self.assertTrue(isinstance(worker.eta_scheduler, Scheduler))
-        self.assertTrue(worker.schedule_controller)
+        self.assertTrue(isinstance(worker.eta_schedule, Scheduler))
+        self.assertTrue(worker.scheduler)
         self.assertTrue(worker.pool)
-        self.assertTrue(worker.broker_listener)
+        self.assertTrue(worker.listener)
         self.assertTrue(worker.mediator)
         self.assertTrue(worker.components)
 
-    def test_safe_process_task(self):
+    def test_process_task(self):
         worker = self.worker
         worker.pool = MockPool()
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
                            kwargs={})
         task = TaskWrapper.from_message(m, m.decode())
-        worker.safe_process_task(task)
+        worker.process_task(task)
         worker.pool.stop()
 
-    def test_safe_process_task_raise_base(self):
+    def test_process_task_raise_base(self):
         worker = self.worker
         worker.pool = MockPool(raise_base=True)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
                            kwargs={})
         task = TaskWrapper.from_message(m, m.decode())
-        worker.safe_process_task(task)
+        worker.process_task(task)
         worker.pool.stop()
 
-    def test_safe_process_task_raise_regular(self):
+    def test_process_task_raise_regular(self):
         worker = self.worker
         worker.pool = MockPool(raise_regular=True)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
                            kwargs={})
         task = TaskWrapper.from_message(m, m.decode())
-        worker.safe_process_task(task)
+        worker.process_task(task)
         worker.pool.stop()
 
     def test_start_stop(self):

+ 20 - 25
celery/worker/__init__.py

@@ -89,7 +89,7 @@ class WorkController(object):
 
         Instance of :class:`celery.worker.controllers.Mediator`.
 
-    .. attribute:: broker_listener
+    .. attribute:: listener
 
         Instance of :class:`CarrotListener`.
 
@@ -117,38 +117,39 @@ class WorkController(object):
             self.ready_queue = Queue()
         else:
             self.ready_queue = TaskBucket(task_registry=registry.tasks)
-        self.eta_scheduler = Scheduler(self.ready_queue)
+        self.eta_schedule = Scheduler(self.ready_queue)
 
         self.logger.debug("Instantiating thread components...")
 
-        # Threads+Pool
-        self.schedule_controller = ScheduleController(self.eta_scheduler,
-                                                      logger=self.logger)
-        self.pool = TaskPool(self.concurrency, logger=self.logger,
+        # Threads + Pool + Consumer
+        self.pool = TaskPool(self.concurrency,
+                             logger=self.logger,
                              initializer=process_initializer)
-        self.broker_listener = CarrotListener(self.ready_queue,
-                                        self.eta_scheduler,
-                                        logger=self.logger,
-                                        send_events=send_events,
-                                        initial_prefetch_count=concurrency)
-        self.mediator = Mediator(self.ready_queue, self.safe_process_task,
+        self.mediator = Mediator(self.ready_queue,
+                                 callback=self.process_task,
                                  logger=self.logger)
-
+        self.scheduler = ScheduleController(self.eta_schedule,
+                                            logger=self.logger)
         # Need a tight loop interval when embedded so the program
         # can be stopped in a sensible short time.
         self.clockservice = self.embed_clockservice and ClockServiceThread(
                                 logger=self.logger,
                                 is_detached=self.is_detached,
                                 max_interval=1) or None
+        self.listener = CarrotListener(self.ready_queue,
+                                       self.eta_schedule,
+                                       logger=self.logger,
+                                       send_events=send_events,
+                                       initial_prefetch_count=concurrency)
 
         # The order is important here;
         #   the first in the list is the first to start,
         # and they must be stopped in reverse order.
         self.components = filter(None, (self.pool,
                                         self.mediator,
-                                        self.schedule_controller,
+                                        self.scheduler,
                                         self.clockservice,
-                                        self.broker_listener))
+                                        self.listener))
 
     def start(self):
         """Starts the workers main loop."""
@@ -162,30 +163,24 @@ class WorkController(object):
         finally:
             self.stop()
 
-    def safe_process_task(self, task):
-        """Same as :meth:`process_task`, but catches all exceptions
-        the task raises and log them as errors, to make sure the
-        worker doesn't die."""
+    def process_task(self, task):
+        """Process task by sending it to the pool of workers."""
         try:
             try:
-                self.process_task(task)
+                task.execute_using_pool(self.pool, self.loglevel,
+                                        self.logfile)
             except Exception, exc:
                 self.logger.critical("Internal error %s: %s\n%s" % (
                                 exc.__class__, exc, traceback.format_exc()))
         except (SystemExit, KeyboardInterrupt):
             self.stop()
 
-    def process_task(self, task):
-        """Process task by sending it to the pool of workers."""
-        task.execute_using_pool(self.pool, self.loglevel, self.logfile)
-
     def stop(self):
         """Gracefully shutdown the worker server."""
         if self._state != "RUN":
             return
 
         signals.worker_shutdown.send(sender=self)
-
         [component.stop() for component in reversed(self.components)]
 
         self._state = "STOP"

+ 1 - 2
celery/worker/controllers.py

@@ -20,7 +20,6 @@ class BackgroundThread(threading.Thread):
     the :meth:`stop` method.
 
     """
-    is_infinite = True
 
     def __init__(self):
         super(BackgroundThread, self).__init__()
@@ -36,7 +35,7 @@ class BackgroundThread(threading.Thread):
         """
         self.on_start()
 
-        while self.is_infinite:
+        while 1:
             if self._shutdown.isSet():
                 break
             self.on_iteration()

+ 6 - 6
celery/worker/listener.py

@@ -25,13 +25,13 @@ class CarrotListener(object):
     move them the the ready queue for task processing.
 
     :param ready_queue: See :attr:`ready_queue`.
-    :param eta_scheduler: See :attr:`eta_scheduler`.
+    :param eta_schedule: See :attr:`eta_schedule`.
 
     .. attribute:: ready_queue
 
         The queue that holds tasks ready for processing immediately.
 
-    .. attribute:: eta_scheduler
+    .. attribute:: eta_schedule
 
         Scheduler for paused tasks. Reasons for being paused include
         a countdown/eta or that it's waiting for retry.
@@ -42,12 +42,12 @@ class CarrotListener(object):
 
     """
 
-    def __init__(self, ready_queue, eta_scheduler, logger,
+    def __init__(self, ready_queue, eta_schedule, logger,
             send_events=False, initial_prefetch_count=2):
         self.amqp_connection = None
         self.task_consumer = None
         self.ready_queue = ready_queue
-        self.eta_scheduler = eta_scheduler
+        self.eta_schedule = eta_schedule
         self.send_events = send_events
         self.logger = logger
         self.hostname = socket.gethostname()
@@ -116,8 +116,8 @@ class CarrotListener(object):
             self.prefetch_count.increment()
             self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
                     task.task_name, task.task_id, eta))
-            self.eta_scheduler.enter(task, eta=eta,
-                                     callback=self.prefetch_count.decrement)
+            self.eta_schedule.enter(task, eta=eta,
+                                    callback=self.prefetch_count.decrement)
         else:
             self.logger.info("Got task from broker: %s[%s]" % (
                     task.task_name, task.task_id))