Browse Source

event hub now supports writers

Ask Solem 13 years ago
parent
commit
2d93e31360
1 changed files with 13 additions and 3 deletions
  1. 13 3
      celery/worker/hub.py

+ 13 - 3
celery/worker/hub.py

@@ -3,7 +3,7 @@ from __future__ import absolute_import
 from collections import deque
 
 from kombu.utils import cached_property
-from kombu.utils.eventio import poll, POLL_READ, POLL_ERR
+from kombu.utils.eventio import poll, POLL_READ, POLL_ERR, POLL_WRITE
 
 from celery.utils.timer2 import Schedule
 
@@ -72,8 +72,17 @@ class Hub(object):
             fileno = fd
         self.fdmap[fileno] = callback
 
-    def update(self, *maps):
-        [self.add(*x) for row in maps for x in row.iteritems()]
+    def add_reader(self, fd, callback):
+        return self.add(fd, callback, POLL_READ|POLL_ERR)
+
+    def add_writer(self, fd, callback):
+        return self.add(fd, callback, POLL_WRITE)
+
+    def update_readers(self, *maps):
+        [self.add_reader(*x) for row in maps for x in row.iteritems()]
+
+    def update_writers(self, *maps):
+        [self.add_writer(*x) for row in maps for x in row.iteritems()]
 
     def remove(self, fd):
         try:
@@ -83,6 +92,7 @@ class Hub(object):
 
     def close(self):
         [self.remove(fd) for fd in self.fdmap.keys()]
+        self.poller.close()
 
     @cached_property
     def scheduler(self):