|
@@ -50,6 +50,14 @@ from celery.utils import timer2
|
|
|
from celery.worker import state
|
|
|
|
|
|
|
|
|
+def apply_batches_task(task, args, loglevel, logfile):
|
|
|
+ task.request.update({"loglevel": loglevel, "logfile": logfile})
|
|
|
+ try:
|
|
|
+ return task(*args)
|
|
|
+ finally:
|
|
|
+ task.request.clear()
|
|
|
+
|
|
|
+
|
|
|
class SimpleRequest(object):
|
|
|
"""Pickleable request."""
|
|
|
|
|
@@ -99,6 +107,7 @@ class Batches(Task):
|
|
|
self._count = count(1).next
|
|
|
self._tref = None
|
|
|
self._pool = None
|
|
|
+ self._logging = None
|
|
|
|
|
|
def run(self, requests):
|
|
|
raise NotImplementedError("%r must implement run(requests)" % (self, ))
|
|
@@ -110,6 +119,8 @@ class Batches(Task):
|
|
|
def execute(self, request, pool, loglevel, logfile):
|
|
|
if not self._pool: # just take pool from first task.
|
|
|
self._pool = pool
|
|
|
+ if not self._logging:
|
|
|
+ self._logging = loglevel, logfile
|
|
|
|
|
|
state.task_ready(request) # immediately remove from worker state.
|
|
|
self._buffer.put(request)
|
|
@@ -145,7 +156,9 @@ class Batches(Task):
|
|
|
def on_return(result):
|
|
|
[req.acknowledge() for req in acks_late[True]]
|
|
|
|
|
|
- return self._pool.apply_async(self, args,
|
|
|
+ loglevel, logfile = self._logging
|
|
|
+ return self._pool.apply_async(apply_batches_task,
|
|
|
+ (self, args, loglevel, logfile),
|
|
|
accept_callback=on_accepted,
|
|
|
callbacks=acks_late[True] and [on_return] or [])
|
|
|
|