Ver Fonte

Small fixes

Ask Solem há 12 anos atrás
pai
commit
247674ceca

+ 27 - 24
celery/worker/consumer.py

@@ -423,32 +423,35 @@ class Consumer(object):
                 # the number of seconds until we need to fire timers again.
                 poll_timeout = fire_timers() if scheduled else 1
 
-                if qos.prev != qos.value:
-                    update_qos()
-
                 update_readers(on_poll_start())
                 if readers or writers:
                     connection.more_to_read = True
-                    for fileno, event in poll(poll_timeout) or ():
-                        try:
-                            if event & READ:
-                                readers[fileno](fileno, event)
-                            if event & WRITE:
-                                writers[fileno](fileno, event)
-                            if event & ERR:
-                                for handlermap in readers, writers:
-                                    try:
-                                        handlermap[fileno](fileno, event)
-                                    except KeyError:
-                                        pass
-                        except Empty:
-                            break
-                        except socket.error:
-                            if self._state != CLOSE:  # pragma: no cover
-                                raise
-                    while keep_draining and connection.more_to_read:
-                        drain_nowait()
-                    poll_timeout = 0
+                    while connection.more_to_read:
+                        if qos.prev != qos.value:
+                            update_qos()
+
+                        for fileno, event in poll(poll_timeout) or ():
+                            try:
+                                if event & READ:
+                                    readers[fileno](fileno, event)
+                                if event & WRITE:
+                                    writers[fileno](fileno, event)
+                                if event & ERR:
+                                    for handlermap in readers, writers:
+                                        try:
+                                            handlermap[fileno](fileno, event)
+                                        except KeyError:
+                                            pass
+                            except Empty:
+                                continue
+                            except socket.error:
+                                if self._state != CLOSE:  # pragma: no cover
+                                    raise
+                        if keep_draining:
+                            drain_nowait()
+                            poll_timeout = 0
+                        else:
+                            connection.more_to_read = False
                 else:
                     # no sockets yet, startup is probably not done.
                     sleep(min(poll_timeout, 0.1))
@@ -464,7 +467,7 @@ class Consumer(object):
             return
 
         if self._does_info:
-            info('Got task from broker: %s', task.shortinfo())
+            info('Got task from broker: %s', task)
 
         if self.event_dispatcher.enabled:
             self.event_dispatcher.send('task-received', uuid=task.id,

+ 4 - 5
celery/worker/job.py

@@ -412,12 +412,12 @@ class Request(object):
                 'delivery_info': self.delivery_info,
                 'worker_pid': self.worker_pid}
 
-    def shortinfo(self):
+    def __str__(self):
         return '%s[%s]%s%s' % (
                     self.name, self.id,
                     ' eta:[%s]' % (self.eta, ) if self.eta else '',
                     ' expires:[%s]' % (self.expires, ) if self.expires else '')
-    __str__ = shortinfo
+    shortinfo == __str__
 
     def __repr__(self):
         return '<%s %s: %s>' % (type(self).__name__, self.id,
@@ -432,21 +432,20 @@ class Request(object):
     @property
     def store_errors(self):
         return (not self.task.ignore_result
-                or self.task.store_errors_even_if_ignored)
+                 or self.task.store_errors_even_if_ignored)
 
     def _compat_get_task_id(self):
         return self.id
 
     def _compat_set_task_id(self, value):
         self.id = value
+    task_id = property(_compat_get_task_id, _compat_set_task_id)
 
     def _compat_get_task_name(self):
         return self.name
 
     def _compat_set_task_name(self, value):
         self.name = value
-
-    task_id = property(_compat_get_task_id, _compat_set_task_id)
     task_name = property(_compat_get_task_name, _compat_set_task_name)
 
 

+ 1 - 0
celery/worker/mediator.py

@@ -45,6 +45,7 @@ class WorkerComponent(StartStopComponent):
 
 
 class Mediator(bgThread):
+    """Mediator thread."""
 
     #: The task queue, a :class:`~Queue.Queue` instance.
     ready_queue = None

+ 6 - 0
celery/worker/state.py

@@ -120,6 +120,12 @@ if C_BENCH:  # pragma: no cover
 
 
 class Persistent(object):
+    """This is the persistent data stored by the worker when
+    :option:`--statedb` is enabled.
+
+    It currently only stores revoked task id's.
+
+    """
     storage = shelve
     _is_open = False
 

+ 0 - 1
celery/worker/strategy.py

@@ -23,5 +23,4 @@ def default(task, app, consumer):
                          eventer=eventer, task=task,
                          connection_errors=connection_errors,
                          delivery_info=message.delivery_info))
-
     return task_message_handler