Selaa lähdekoodia

Celery 3.2 : Contrib.Batches, adapt to new task message protocol

PMickael 10 vuotta sitten
vanhempi
commit
d332e1960f
1 muutettua tiedostoa jossa 18 lisäystä ja 6 poistoa
  1. 18 6
      celery/contrib/batches.py

+ 18 - 6
celery/contrib/batches.py

@@ -90,6 +90,7 @@ from celery.five import Empty, Queue
 from celery.utils.log import get_logger
 from celery.worker.request import Request
 from celery.utils import noop
+from celery.worker.strategy import proto1_to_proto2
 
 __all__ = ['Batches']
 
@@ -163,8 +164,8 @@ class SimpleRequest(object):
 
     @classmethod
     def from_request(cls, request):
-        return cls(request.id, request.name, request.args,
-                   request.kwargs, request.delivery_info, request.hostname)
+        return cls(request.id, request.name, request.body[0],
+                   request.body[1], request.delivery_info, request.hostname)
 
 
 class Batches(Task):
@@ -196,10 +197,21 @@ class Batches(Task):
         flush_buffer = self._do_flush
 
         def task_message_handler(message, body, ack, reject, callbacks, **kw):
-            request = Req(body, on_ack=ack, app=app, hostname=hostname,
-                          events=eventer, task=task,
-                          connection_errors=connection_errors,
-                          delivery_info=message.delivery_info)
+            if body is None:                                                                                                                        31513 ?        S    125:09 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery6@ns326150.ip-37-187-158.eu --app=mai
+                body, headers, decoded, utc = (                                                                                                     n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery6.pid
+                    message.body, message.headers, False, True,                                                                                     31528 ?        R    128:34 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery7@ns326150.ip-37-187-158.eu --app=mai
+                )                                                                                                                                   n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery7.pid
+                if not body_can_be_buffer:                                                                                                          31543 ?        S    124:32 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery8@ns326150.ip-37-187-158.eu --app=mai
+                    body = bytes(body) if isinstance(body, buffer_t) else body                                                                      n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery8.pid
+            else:                                                                                                                                   26150 ?        S      0:50 /usr/bin/python -m celery worker --without-heartbeat -c 2 --pool=eventlet -n engines@ns326150.ip-37-187-158.eu --app=main
+                body, headers, decoded, utc = proto1_to_proto2(message, body)                                                                        -Q engines --without-gossip --logfile=/home/logs/engines.log --pidfile=/home/logs/pid-engines.pid
+                                                                                                                                                    22409 ?        S      0:00 /usr/bin/python -m celery worker --without-heartbeat -c 1 -n elasticsearch_bulk_actions@ns326150.ip-37-187-158.eu --app=m
+            request = Req(                                                                                                                          ain -Q elasticsearch_bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=elasticsearch_bulk_actions.pid
+                message,                                                                                                                            22459 ?        S      0:00  \_ /usr/bin/python -m celery worker --without-heartbeat -c 1 -n elasticsearch_bulk_actions@ns326150.ip-37-187-158.eu --a
+                on_ack=ack, on_reject=reject, app=app, hostname=hostname,                                                                           pp=main -Q elasticsearch_bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=elasticsearch_bulk_actions.pid
+                eventer=eventer, task=task, connection_errors=connection_errors,                                                                    22419 ?        S      0:00 /usr/bin/python -m celery worker --without-heartbeat -c 1 -n celery@ns326150.ip-37-187-158.eu --app=main -Q elasticsearch
+                body=body, headers=headers, decoded=decoded, utc=utc,                                                                               _bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=celery.pid
+            )
             put_buffer(request)
 
             if self._tref is None:     # first request starts flush timer.