|
@@ -197,20 +197,20 @@ class Batches(Task):
|
|
|
flush_buffer = self._do_flush
|
|
|
|
|
|
def task_message_handler(message, body, ack, reject, callbacks, **kw):
|
|
|
- if body is None: 31513 ? S 125:09 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery6@ns326150.ip-37-187-158.eu --app=mai
|
|
|
- body, headers, decoded, utc = ( n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery6.pid
|
|
|
- message.body, message.headers, False, True, 31528 ? R 128:34 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery7@ns326150.ip-37-187-158.eu --app=mai
|
|
|
- ) n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery7.pid
|
|
|
- if not body_can_be_buffer: 31543 ? S 124:32 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery8@ns326150.ip-37-187-158.eu --app=mai
|
|
|
- body = bytes(body) if isinstance(body, buffer_t) else body n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery8.pid
|
|
|
- else: 26150 ? S 0:50 /usr/bin/python -m celery worker --without-heartbeat -c 2 --pool=eventlet -n engines@ns326150.ip-37-187-158.eu --app=main
|
|
|
- body, headers, decoded, utc = proto1_to_proto2(message, body) -Q engines --without-gossip --logfile=/home/logs/engines.log --pidfile=/home/logs/pid-engines.pid
|
|
|
- 22409 ? S 0:00 /usr/bin/python -m celery worker --without-heartbeat -c 1 -n elasticsearch_bulk_actions@ns326150.ip-37-187-158.eu --app=m
|
|
|
- request = Req( ain -Q elasticsearch_bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=elasticsearch_bulk_actions.pid
|
|
|
- message, 22459 ? S 0:00 \_ /usr/bin/python -m celery worker --without-heartbeat -c 1 -n elasticsearch_bulk_actions@ns326150.ip-37-187-158.eu --a
|
|
|
- on_ack=ack, on_reject=reject, app=app, hostname=hostname, pp=main -Q elasticsearch_bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=elasticsearch_bulk_actions.pid
|
|
|
- eventer=eventer, task=task, connection_errors=connection_errors, 22419 ? S 0:00 /usr/bin/python -m celery worker --without-heartbeat -c 1 -n celery@ns326150.ip-37-187-158.eu --app=main -Q elasticsearch
|
|
|
- body=body, headers=headers, decoded=decoded, utc=utc, _bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=celery.pid
|
|
|
+ if body is None:
|
|
|
+ body, headers, decoded, utc = (
|
|
|
+ message.body, message.headers, False, True,
|
|
|
+ )
|
|
|
+ if not body_can_be_buffer:
|
|
|
+ body = bytes(body) if isinstance(body, buffer_t) else body
|
|
|
+ else:
|
|
|
+ body, headers, decoded, utc = proto1_to_proto2(message, body)
|
|
|
+
|
|
|
+ request = Req(
|
|
|
+ message,
|
|
|
+ on_ack=ack, on_reject=reject, app=app, hostname=hostname,
|
|
|
+ eventer=eventer, task=task, connection_errors=connection_errors,
|
|
|
+ body=body, headers=headers, decoded=decoded, utc=utc,
|
|
|
)
|
|
|
put_buffer(request)
|
|
|
|