Browse Source

[synack] Write NACKs and flush acks at connection restart

Ask Solem 12 years ago
parent
commit
b2d7c1e699
2 changed files with 34 additions and 8 deletions
  1. 29 8
      celery/concurrency/processes.py
  2. 5 0
      funtests/stress/stress.py

+ 29 - 8
celery/concurrency/processes.py

@@ -24,7 +24,9 @@ from time import sleep, time
 from billiard import forking_enable
 from billiard import pool as _pool
 from billiard.exceptions import WorkerLostError
-from billiard.pool import RUN, CLOSE, TERMINATE, ACK, WorkersJoined, CoroStop
+from billiard.pool import (
+    RUN, CLOSE, TERMINATE, ACK, NACK, WorkersJoined, CoroStop,
+)
 from billiard.queues import _SimpleQueue
 from kombu.serialization import pickle as _pickle
 from kombu.utils import fxrange
@@ -506,7 +508,7 @@ class TaskPool(BasePool):
         self._pool.on_process_down = on_process_down
 
         class Ack(object):
-            _write_to = None
+            __slots__ = ('id', 'fd', '_payload')
 
             def __init__(self, id, fd, payload):
                 self.id = id
@@ -600,14 +602,23 @@ class TaskPool(BasePool):
                     callback.args = (cor, )  # tricky as we need to pass ref
                     hub_add((ready_fd, ), cor, WRITE)
 
-        ACK_BODY = dumps((ACK, (0, )), protocol=protocol)
-        ACK_SIZE = len(ACK_BODY)
-        ACK_HEAD = pack('>I', ACK_SIZE)
+        def _create_payload(type_, args):
+            body = dumps((type_, args), protocol=protocol)
+            size = len(body)
+            header = pack('>I', size)
+            return header, body, size
 
-        def send_ack(pid, i, fd):
-            msg = Ack(i, fd, (ACK_HEAD, ACK_BODY, ACK_SIZE))
+        MESSAGES = {ACK: _create_payload(ACK, (0, )),
+                    NACK: _create_payload(NACK, (0, ))}
+
+        def send_ack(response, pid, i, fd):
+            msg = Ack(i, fd, MESSAGES[response])
+            callback = promise(write_generator_gone)
+            cor = _write_ack(fd, msg, callback=callback)
+            mark_write_gen_as_active(cor)
             mark_write_fd_as_active(fd)
-            hub_add((fd, ), _write_ack(fd, msg), WRITE)
+            callback.args = (cor, )
+            hub_add((fd, ), cor, WRITE)
         self._pool.send_ack = send_ack
 
         def on_poll_start(hub):
@@ -630,9 +641,19 @@ class TaskPool(BasePool):
             self._pool._timeout_handler.handle_event()
 
     def flush(self):
+        # cancel all tasks that have not been accepted to that NACK is sent.
+        for job in values(self._pool._pool):
+            if not job._accepted:
+                job.cancel()
+
+        # clear the outgoing buffer as the tasks will be redelivered by
+        # the broker anyway.
         if self.outbound_buffer:
             self.outbound_buffer.clear()
         try:
+            # but we must continue to write the payloads we already started
+            # otherwise we will not recover the message boundaries.
+            # the messages will be NACK'ed later.
             if self._pool._state == RUN:
                 # flush outgoing buffers
                 intervals = fxrange(0.01, 0.1, 0.01, repeatlast=True)

+ 5 - 0
funtests/stress/stress.py

@@ -57,6 +57,11 @@ def kill(sig=signal.SIGKILL):
     os.kill(os.getpid(), sig)
 
 
+@celery.task
+def sleeping(i):
+    sleep(i)
+
+
 @celery.task
 def segfault():
     import ctypes