Browse Source

Fix the tyrant backend and go back to using multiprocessing.Pool, but with a new result-collection algorithm

Ask Solem 16 years ago
parent
commit
27268af673
5 changed files with 43 additions and 34 deletions
  1. 1 1
      README.rst
  2. 1 1
      celery/__init__.py
  3. 5 5
      celery/backends/tyrant.py
  4. 26 18
      celery/datastructures.py
  5. 10 9
      celery/worker.py

+ 1 - 1
README.rst

@@ -2,7 +2,7 @@
 celery - Distributed Task Queue for Django.
 ============================================
 
-:Version: 0.2.0
+:Version: 0.2.2
 
 Introduction
 ============

+ 1 - 1
celery/__init__.py

@@ -1,5 +1,5 @@
 """Distributed Task Queue for Django"""
-VERSION = (0, 2, 1)
+VERSION = (0, 2, 2)
 __version__ = ".".join(map(str, VERSION))
 __author__ = "Ask Solem"
 __contact__ = "askh@opera.com"

+ 5 - 5
celery/backends/tyrant.py

@@ -38,15 +38,15 @@ class Backend(BaseBackend):
         :setting:`TT_HOST` or :setting:`TT_PORT` is not set.
 
         """
-        self.tyrant_host = kwargs.get("tyrant_host",
-                            getattr(settings, "TT_HOST", self.tyrant_host))
-        self.tyrant_port = kwargs.get("tyrant_port",
-                            getattr(settings, "TT_PORT", self.tyrant_port))
+        self.tyrant_host = tyrant_host or \
+                            getattr(settings, "TT_HOST", self.tyrant_host)
+        self.tyrant_port = tyrant_port or \
+                            getattr(settings, "TT_PORT", self.tyrant_port)
         if not self.tyrant_host or not self.tyrant_port:
             raise ImproperlyConfigured(
                 "To use the Tokyo Tyrant backend, you have to "
                 "set the TT_HOST and TT_PORT settings in your settings.py")
-        super(Backend, self).__init__(*args, **kwargs)
+        super(Backend, self).__init__()
         self._cache = {}
 
     def get_server(self):

+ 26 - 18
celery/datastructures.py

@@ -162,27 +162,35 @@ class TaskProcessQueue(UserList):
         self.data.append([result, task_name, task_id])
 
         if self.data and len(self.data) >= self.limit:
-            self.collect()
+            self.wait_for_result()
+        else:
+            self.reap()
 
-    def collect(self):
+
+    def wait_for_result(self):
         """Collect results from processes that are ready."""
-        processes_joined = 0
-        while not processes_joined:
-            for process_no, process_info in enumerate(self.data):
-                result, task_name, task_id = process_info
-                if result.ready():
-                    try:
-                        self.on_ready(result, task_name, task_id)
-                    except multiprocessing.TimeoutError:
-                        pass
-                    else:
-                        del(self[i])
-                        processed_join += 1
-
-    def on_ready(self, result, task_name, task_id):
-        ret_value = result.get(timeout=self.process_timeout)
+        while True:
+            if self.reap():
+                break
+
+    def reap(self):
+        processes_reaped = 0
+        for process_no, process_info in enumerate(self.data):
+            result, task_name, task_id = process_info
+            try:
+                ret_value = result.get(timeout=0.1)
+            except multiprocessing.TimeoutError:
+                continue
+            else:
+                del(self[process_no])
+                self.on_ready(ret_value, task_name, task_id)
+                processes_reaped += 1
+        return processes_reaped
+
+
+    def on_ready(self, ret_value, task_name, task_id):
         if self.done_msg and self.logger:
-            self.logger_info(self.done_msg % {
+            self.logger.info(self.done_msg % {
                 "name": task_name,
                 "id": task_id,
                 "return_value": ret_value})

+ 10 - 9
celery/worker.py

@@ -5,7 +5,7 @@ from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
 from celery.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
 from celery.log import setup_logger
 from celery.registry import tasks
-from celery.datastructures import TaskWorkerPool
+from celery.datastructures import TaskProcessQueue
 from celery.models import PeriodicTaskMeta
 from celery.backends import default_backend
 from celery.timer import EventTimer
@@ -162,9 +162,8 @@ class TaskWrapper(object):
 
         """
         task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
-        jail_args = [self.task_id, self.task_func, self.args,
-                     task_func_kwargs]
-        return pool.add(jail, jail_args, self.task_name, self.task_id)
+        return pool.apply_async(jail, [self.task_id, self.task_func,
+                                       self.args, task_func_kwargs])
 
 
 class TaskDaemon(object):
@@ -232,8 +231,7 @@ class TaskDaemon(object):
         self.queue_wakeup_after = queue_wakeup_after or \
                                     self.queue_wakeup_after
         self.logger = setup_logger(loglevel, logfile)
-        self.pool = TaskWorkerPool(self.concurrency, logger=self.logger,
-                done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
+        self.pool = multiprocessing.Pool(self.concurrency)
         self.task_consumer = None
         self.reset_connection()
 
@@ -331,12 +329,13 @@ class TaskDaemon(object):
 
     def run(self):
         """Starts the workers main loop."""
+        results = TaskProcessQueue(self.concurrency, logger=self.logger,
+                done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
         log_wait = lambda: self.logger.info("Waiting for queue...")
         ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
         events = [
-            EventTimer(self.run_periodic_tasks, 2),
-            EventTimer(self.schedule_retry_tasks, 4),
-            EventTimer(self.pool.reap, 2),
+            EventTimer(self.run_periodic_tasks, 1),
+            EventTimer(self.schedule_retry_tasks, 2),
         ]
 
         while True:
@@ -358,3 +357,5 @@ class TaskDaemon(object):
                 self.logger.critical("Message queue raised %s: %s\n%s" % (
                              e.__class__, e, traceback.format_exc()))
                 continue
+
+            results.add(result, task_name, task_id)