|
@@ -35,7 +35,7 @@ from weakref import WeakValueDictionary, ref
|
|
from amqp.utils import promise
|
|
from amqp.utils import promise
|
|
from billiard.pool import RUN, TERMINATE, ACK, NACK, WorkersJoined
|
|
from billiard.pool import RUN, TERMINATE, ACK, NACK, WorkersJoined
|
|
from billiard import pool as _pool
|
|
from billiard import pool as _pool
|
|
-from billiard.compat import setblocking, isblocking
|
|
|
|
|
|
+from billiard.compat import buf_t, setblocking, isblocking
|
|
from billiard.einfo import ExceptionInfo
|
|
from billiard.einfo import ExceptionInfo
|
|
from billiard.queues import _SimpleQueue
|
|
from billiard.queues import _SimpleQueue
|
|
from kombu.async import READ, WRITE, ERR
|
|
from kombu.async import READ, WRITE, ERR
|
|
@@ -680,7 +680,7 @@ class AsynPool(_pool.Pool):
|
|
header = pack('>I', body_size)
|
|
header = pack('>I', body_size)
|
|
# index 1,0 is the job ID.
|
|
# index 1,0 is the job ID.
|
|
job = get_job(tup[1][0])
|
|
job = get_job(tup[1][0])
|
|
- job._payload = header, body, body_size
|
|
|
|
|
|
+ job._payload = buf_t(header), buf_t(body), body_size
|
|
put_message(job)
|
|
put_message(job)
|
|
self._quick_put = send_job
|
|
self._quick_put = send_job
|
|
|
|
|