|
@@ -2,11 +2,8 @@ import multiprocessing
|
|
|
import itertools
|
|
|
import threading
|
|
|
import uuid
|
|
|
-import time
|
|
|
-import os
|
|
|
|
|
|
from multiprocessing.pool import RUN as POOL_STATE_RUN
|
|
|
-from celery.timer import TimeoutTimer, TimeoutError
|
|
|
from celery.datastructures import ExceptionInfo
|
|
|
|
|
|
|
|
@@ -70,31 +67,31 @@ class TaskPool(object):
|
|
|
callbacks = callbacks or []
|
|
|
errbacks = errbacks or []
|
|
|
meta = meta or {}
|
|
|
- id = str(uuid.uuid4())
|
|
|
+ tid = str(uuid.uuid4())
|
|
|
|
|
|
if not self._pool_is_running():
|
|
|
self._start()
|
|
|
|
|
|
self._processed_total = self._process_counter.next()
|
|
|
|
|
|
- on_return = lambda r: self.on_return(r, id, callbacks, errbacks, meta)
|
|
|
+ on_return = lambda r: self.on_return(r, tid, callbacks, errbacks, meta)
|
|
|
|
|
|
result = self._pool.apply_async(target, args, kwargs,
|
|
|
callback=on_return)
|
|
|
|
|
|
- self.add(result, callbacks, errbacks, id, meta)
|
|
|
+ self.add(result, callbacks, errbacks, tid, meta)
|
|
|
|
|
|
return result
|
|
|
|
|
|
- def on_return(self, ret_val, id, callbacks, errbacks, meta):
|
|
|
+ def on_return(self, ret_val, tid, callbacks, errbacks, meta):
|
|
|
try:
|
|
|
- del(self._processes[id])
|
|
|
+ del(self._processes[tid])
|
|
|
except KeyError:
|
|
|
pass
|
|
|
else:
|
|
|
self.on_ready(ret_val, callbacks, errbacks, meta)
|
|
|
|
|
|
- def add(self, result, callbacks, errbacks, id, meta):
|
|
|
+ def add(self, result, callbacks, errbacks, tid, meta):
|
|
|
"""Add a process to the queue.
|
|
|
|
|
|
If the queue is full, it will wait for the first task to finish,
|
|
@@ -112,11 +109,11 @@ class TaskPool(object):
|
|
|
and exception. Must have the function signature:
|
|
|
``myerrback(exc, meta)``.
|
|
|
|
|
|
- :option id: Explicitly set the id for this task.
|
|
|
-st
|
|
|
+ :option tid: The tid for this task (unqiue pool id).
|
|
|
+
|
|
|
"""
|
|
|
|
|
|
- self._processes[id] = [result, callbacks, errbacks, meta]
|
|
|
+ self._processes[tid] = [result, callbacks, errbacks, meta]
|
|
|
|
|
|
if self.full():
|
|
|
self.wait_for_result()
|
|
@@ -138,14 +135,14 @@ st
|
|
|
self.logger.debug("Reaping processes...")
|
|
|
processes_reaped = 0
|
|
|
for process_no, entry in enumerate(self._processes.items()):
|
|
|
- id, process_info = entry
|
|
|
+ tid, process_info = entry
|
|
|
result, callbacks, errbacks, meta = process_info
|
|
|
try:
|
|
|
ret_value = result.get(timeout=0.3)
|
|
|
except multiprocessing.TimeoutError:
|
|
|
continue
|
|
|
else:
|
|
|
- self.on_return(ret_value, id, callbacks, errbacks, meta)
|
|
|
+ self.on_return(ret_value, tid, callbacks, errbacks, meta)
|
|
|
processes_reaped += 1
|
|
|
return processes_reaped
|
|
|
|