Browse Source

[block-detect] cleanup branch

Ask Solem 12 years ago
parent
commit
69e8e614f6
3 changed files with 43 additions and 50 deletions
  1. 1 1
      celery/utils/debug.py
  2. 41 44
      celery/worker/consumer.py
  3. 1 5
      celery/worker/job.py

+ 1 - 1
celery/utils/debug.py

@@ -33,7 +33,7 @@ def _on_blocking(signum, frame):
 
 
 @contextmanager
-def blocking_detection(timeout):
+def blockdetection(timeout):
     if not timeout:
         yield
     else:

+ 41 - 44
celery/worker/consumer.py

@@ -88,7 +88,6 @@ from celery.app import app_or_default
 from celery.datastructures import AttributeDict
 from celery.exceptions import InvalidTaskError, SystemTerminate
 from celery.task.trace import build_tracer
-from celery.utils.debug import blocking_detection
 from celery.utils import text
 from celery.utils import timer2
 from celery.utils.functional import noop
@@ -456,50 +455,48 @@ class Consumer(object):
                 # the number of seconds until we need to fire timers again.
                 poll_timeout = (fire_timers(propagate=errors) if scheduled
                                 else 1)
-                print('POLL TIMEOUT: %r' % (poll_timeout, ))
-                with blocking_detection(10):
-
-                    # We only update QoS when there is no more messages to read.
-                    # This groups together qos calls, and makes sure that remote
-                    # control commands will be prioritized over task messages.
-                    if qos.prev != qos.value:
-                        update_qos()
-    
-                    update_readers(on_poll_start())
-                    if readers or writers:
-                        connection.more_to_read = True
-                        while connection.more_to_read:
+
+                # We only update QoS when there is no more messages to read.
+                # This groups together qos calls, and makes sure that remote
+                # control commands will be prioritized over task messages.
+                if qos.prev != qos.value:
+                    update_qos()
+
+                update_readers(on_poll_start())
+                if readers or writers:
+                    connection.more_to_read = True
+                    while connection.more_to_read:
+                        try:
+                            events = poll(poll_timeout)
+                        except ValueError:  # Issue 882
+                            return
+                        if not events:
+                            on_poll_empty()
+                        for fileno, event in events or ():
                             try:
-                                events = poll(poll_timeout)
-                            except ValueError:  # Issue 882
-                                return
-                            if not events:
-                                on_poll_empty()
-                            for fileno, event in events or ():
-                                try:
-                                    if event & READ:
-                                        readers[fileno](fileno, event)
-                                    if event & WRITE:
-                                        writers[fileno](fileno, event)
-                                    if event & ERR:
-                                        for handlermap in readers, writers:
-                                            try:
-                                                handlermap[fileno](fileno, event)
-                                            except KeyError:
-                                                pass
-                                except (KeyError, Empty):
-                                    continue
-                                except socket.error:
-                                    if self._state != CLOSE:  # pragma: no cover
-                                        raise
-                            if keep_draining:
-                                drain_nowait()
-                                poll_timeout = 0
-                            else:
-                                connection.more_to_read = False
-                    else:
-                        # no sockets yet, startup is probably not done.
-                        sleep(min(poll_timeout, 0.1))
+                                if event & READ:
+                                    readers[fileno](fileno, event)
+                                if event & WRITE:
+                                    writers[fileno](fileno, event)
+                                if event & ERR:
+                                    for handlermap in readers, writers:
+                                        try:
+                                            handlermap[fileno](fileno, event)
+                                        except KeyError:
+                                            pass
+                            except (KeyError, Empty):
+                                continue
+                            except socket.error:
+                                if self._state != CLOSE:  # pragma: no cover
+                                    raise
+                        if keep_draining:
+                            drain_nowait()
+                            poll_timeout = 0
+                        else:
+                            connection.more_to_read = False
+                else:
+                    # no sockets yet, startup is probably not done.
+                    sleep(min(poll_timeout, 0.1))
 
     def on_task(self, task, task_reserved=task_reserved,
                 to_system_tz=timezone.to_system):

+ 1 - 5
celery/worker/job.py

@@ -374,11 +374,7 @@ class Request(object):
             # time to write the result.
             if self.store_errors:
                 if isinstance(exc, exceptions.WorkerLostError):
-                    print('+++ WRITE RESULT')
-                    try:
-                        self.task.backend.mark_as_failure(self.id, exc)
-                    finally:
-                        print('--- WRITE RESULT')
+                    self.task.backend.mark_as_failure(self.id, exc)
                 elif isinstance(exc, exceptions.Terminated):
                     self._announce_revoked('terminated', True, str(exc), False)
             # (acks_late) acknowledge after result stored.