Ask Solem 8 năm trước cách đây
mục cha
commit
9b6bebf4df
2 tập tin đã thay đổi với 14 bổ sung6 xóa
  1. 8 0
      .landscape.yml
  2. 6 6
      celery/concurrency/asynpool.py

+ 8 - 0
.landscape.yml

@@ -29,3 +29,11 @@ pylint:
         - too-many-arguments
         - too-many-locals
         - redefined-builtin
+        - not-callable
+        - cyclic-import
+        - expression-not-assigned
+        - lost-exception
+        - dangerous-default-value
+        - unused-argument
+    options:
+        exclude-protected: _reader, _writer, _popen, _sentinel_poll, _job, _is_alive, _write_to, _scheduled_for, _terminated, _accepted, _set_terminated, _payload, _cancel

+ 6 - 6
celery/concurrency/asynpool.py

@@ -477,7 +477,7 @@ class AsynPool(_pool.Pool):
 
         hub.on_tick.add(self.on_poll_start)
 
-    def _create_timelimit_handlers(self, hub, now=time.time):
+    def _create_timelimit_handlers(self, hub):
         """Create handlers used to implement time limits."""
         call_later = hub.call_later
         trefs = self._tref_for_id = WeakValueDictionary()
@@ -506,7 +506,7 @@ class AsynPool(_pool.Pool):
             _discard_tref(R._job)
         self.on_timeout_cancel = on_timeout_cancel
 
-    def _on_soft_timeout(self, job, soft, hard, hub, now=time.time):
+    def _on_soft_timeout(self, job, soft, hard, hub):
         # only used by async pool.
         if hard:
             self._tref_for_id[job] = hub.call_at(
@@ -538,7 +538,7 @@ class AsynPool(_pool.Pool):
     def on_job_ready(self, job, i, obj, inqW_fd):
         self._mark_worker_as_available(inqW_fd)
 
-    def _create_process_handlers(self, hub, READ=READ, ERR=ERR):
+    def _create_process_handlers(self, hub):
         """Create handlers called on process up/down, etc."""
         add_reader, remove_reader, remove_writer = (
             hub.add_reader, hub.remove_reader, hub.remove_writer,
@@ -748,7 +748,7 @@ class AsynPool(_pool.Pool):
             # have to test further]
             num_ready = len(ready_fds)
 
-            for i in range(num_ready):
+            for _ in range(num_ready):
                 ready_fd = ready_fds[total_write_count[0] % num_ready]
                 total_write_count[0] += 1
                 if ready_fd in active_writes:
@@ -873,7 +873,7 @@ class AsynPool(_pool.Pool):
                 active_writes.discard(fd)
                 write_generator_done(job._writer())  # is a weakref
 
-        def send_ack(response, pid, job, fd, WRITE=WRITE, ERR=ERR):
+        def send_ack(response, pid, job, fd):
             # Only used when synack is enabled.
             # Schedule writing ack response for when the fd is writable.
             msg = Ack(job, fd, precalc[response])
@@ -1164,7 +1164,7 @@ class AsynPool(_pool.Pool):
         on_state_change = self._result_handler.on_state_change
         fds = {resq}
         while fds and not resq.closed and self._state != TERMINATE:
-            readable, _, again = _select(fds, None, fds, timeout=0.01)
+            readable, _, _ = _select(fds, None, fds, timeout=0.01)
             if readable:
                 try:
                     task = resq.recv()