Browse Source

celery.process.ProcessQueue moved to celery.datastructures.TaskProcessQueue

Ask Solem 16 years ago
parent
commit
d84f3dba77
3 changed files with 24 additions and 27 deletions
  1. 22 0
      celery/datastructures.py
  2. 0 25
      celery/process.py
  3. 2 2
      celery/worker.py

+ 22 - 0
celery/datastructures.py

@@ -29,3 +29,25 @@ class PositionQueue(UserList):
                       self)
 
         
class TaskProcessQueue(UserList):
    """Queue of running child-process results, which blocks and drains
    itself once the queue limit is reached.

    :param limit: number of pending results to collect before waiting
        for all of them to finish.
    :param logger: optional logger; used together with ``done_msg``.
    :param done_msg: optional %-format string logged per finished task,
        given the keys ``name``, ``id`` and ``return_value``.
    """

    def __init__(self, limit, logger=None, done_msg=None):
        self.limit = limit
        self.logger = logger
        self.done_msg = done_msg
        # UserList exposes the queue contents through ``self.data``.
        self.data = []

    def add(self, result, task_name, task_id):
        """Append a pending ``result`` to the queue.

        When the queue reaches ``limit`` entries, block on every pending
        result (``result.get()``) in FIFO order, optionally logging
        ``done_msg`` for each, then empty the queue.
        """
        # Store an immutable record; entries are never mutated in place.
        self.data.append((result, task_name, task_id))

        # We just appended, so data is non-empty; only the limit matters.
        if len(self.data) >= self.limit:
            # Use fresh names so the method parameters aren't shadowed.
            for pending, name, id_ in self.data:
                ret_value = pending.get()  # blocks until the task finishes
                if self.done_msg and self.logger:
                    self.logger.info(self.done_msg % {
                        "name": name,
                        "id": id_,
                        "return_value": ret_value})
            self.data = []

+ 0 - 25
celery/process.py

@@ -1,25 +0,0 @@
-from UserList import UserList
-
-
-class ProcessQueue(UserList):
-    """Queue of running child processes, which starts waiting for the
-    processes to finish when the queue limit is reached."""
-
-    def __init__(self, limit, logger=None, done_msg=None):
-        self.limit = limit
-        self.logger = logger
-        self.done_msg = done_msg
-        self.data = []
-        
-    def add(self, result, task_name, task_id):
-        self.data.append([result, task_name, task_id])
-
-        if self.data and len(self.data) >= self.limit:
-            for result, task_name, task_id in self.data:
-                ret_value = result.get()
-                if self.done_msg and self.logger:
-                    self.logger.info(self.done_msg % {
-                        "name": task_name,
-                        "id": task_id,
-                        "return_value": ret_value})
-            self.data = []

+ 2 - 2
celery/worker.py

@@ -4,7 +4,7 @@ from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
 from celery.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
 from celery.log import setup_logger
 from celery.registry import tasks
-from celery.process import ProcessQueue
+from celery.datastructures import TaskProcessQueue
 from celery.models import PeriodicTaskMeta
 from celery.task import mark_as_done, mark_as_failure
 import multiprocessing
@@ -164,7 +164,7 @@ class TaskDaemon(object):
 
     def run(self):
         """The worker server's main loop."""
-        results = ProcessQueue(self.concurrency, logger=self.logger,
+        results = TaskProcessQueue(self.concurrency, logger=self.logger,
                 done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
         log_wait = lambda: self.logger.info("Waiting for queue...")
         ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)