Przeglądaj źródła

[synack] acks must always be flushed at restart (even if not started)

Ask Solem 12 lat temu
rodzic
commit
ddc0a2fd18
2 zmienionych plików z 9 dodań i 3 usunięć
  1. 8 3
      celery/concurrency/processes.py
  2. 1 0
      celery/worker/consumer.py

+ 8 - 3
celery/concurrency/processes.py

@@ -551,7 +551,7 @@ class TaskPool(BasePool):
                     callback()
                 active_writes.discard(fd)
 
-        def _write_to(fd, job, callback=None):
+        def _write_job(fd, job, callback=None):
             header, body, body_size = job._payload
             try:
                 try:
@@ -596,7 +596,7 @@ class TaskPool(BasePool):
             else:
                 if not job._accepted:
                     callback = promise(write_generator_gone)
-                    cor = _write_to(ready_fd, job, callback=callback)
+                    cor = _write_job(ready_fd, job, callback=callback)
                     mark_write_gen_as_active(cor)
                     mark_write_fd_as_active(ready_fd)
                     callback.args = (cor, )  # tricky as we need to pass ref
@@ -660,7 +660,12 @@ class TaskPool(BasePool):
                 while self._active_writers:
                     writers = list(self._active_writers)
                     for gen in writers:
-                        if gen.gi_frame.f_lasti != -1:  # generator started?
+                        if (gen.__name__ == '_write_job' and
+                                gen.gi_frame.f_lasti != -1):
+                            # has not started writing the job so can
+                            # safely discard
+                            self._active_writers.discard(gen)
+                        else:
                             try:
                                 next(gen)
                             except StopIteration:

+ 1 - 0
celery/worker/consumer.py

@@ -234,6 +234,7 @@ class Consumer(object):
                         self.connection.collect()
                     except Exception:
                         pass
+                    self.on_close()
                     ns.restart(self)
 
     def shutdown(self):