Ver Fonte

Merge branch 'PMickael/patch-1'

Ask Solem há 10 anos atrás
pai
commit
74f42d8338
2 ficheiros alterados com 20 adições e 6 exclusões
  1. 2 0
      celery/bin/worker.py
  2. 18 6
      celery/contrib/batches.py

+ 2 - 0
celery/bin/worker.py

@@ -242,6 +242,8 @@ class worker(Command):
                    default=conf.CELERYD_TASK_SOFT_TIME_LIMIT, type='float'),
             Option('--maxtasksperchild', dest='max_tasks_per_child',
                    default=conf.CELERYD_MAX_TASKS_PER_CHILD, type='int'),
+            Option('--prefetch-multiplier', dest='prefetch_multiplier',
+                   default=conf.CELERYD_PREFETCH_MULTIPLIER, type='int'),
             Option('--queues', '-Q', default=[]),
             Option('--exclude-queues', '-X', default=[]),
             Option('--include', '-I', default=[]),

+ 18 - 6
celery/contrib/batches.py

@@ -90,6 +90,7 @@ from celery.five import Empty, Queue
 from celery.utils.log import get_logger
 from celery.worker.request import Request
 from celery.utils import noop
+from celery.worker.strategy import proto1_to_proto2
 
 __all__ = ['Batches']
 
@@ -163,8 +164,8 @@ class SimpleRequest(object):
 
     @classmethod
     def from_request(cls, request):
-        return cls(request.id, request.name, request.args,
-                   request.kwargs, request.delivery_info, request.hostname)
+        return cls(request.id, request.name, request.body[0],
+                   request.body[1], request.delivery_info, request.hostname)
 
 
 class Batches(Task):
@@ -196,10 +197,21 @@ class Batches(Task):
         flush_buffer = self._do_flush
 
         def task_message_handler(message, body, ack, reject, callbacks, **kw):
-            request = Req(body, on_ack=ack, app=app, hostname=hostname,
-                          events=eventer, task=task,
-                          connection_errors=connection_errors,
-                          delivery_info=message.delivery_info)
+            if body is None:                                                                                                                        31513 ?        S    125:09 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery6@ns326150.ip-37-187-158.eu --app=mai
+                body, headers, decoded, utc = (                                                                                                     n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery6.pid
+                    message.body, message.headers, False, True,                                                                                     31528 ?        R    128:34 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery7@ns326150.ip-37-187-158.eu --app=mai
+                )                                                                                                                                   n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery7.pid
+                if not body_can_be_buffer:                                                                                                          31543 ?        S    124:32 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery8@ns326150.ip-37-187-158.eu --app=mai
+                    body = bytes(body) if isinstance(body, buffer_t) else body                                                                      n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery8.pid
+            else:                                                                                                                                   26150 ?        S      0:50 /usr/bin/python -m celery worker --without-heartbeat -c 2 --pool=eventlet -n engines@ns326150.ip-37-187-158.eu --app=main
+                body, headers, decoded, utc = proto1_to_proto2(message, body)                                                                        -Q engines --without-gossip --logfile=/home/logs/engines.log --pidfile=/home/logs/pid-engines.pid
+                                                                                                                                                    22409 ?        S      0:00 /usr/bin/python -m celery worker --without-heartbeat -c 1 -n elasticsearch_bulk_actions@ns326150.ip-37-187-158.eu --app=m
+            request = Req(                                                                                                                          ain -Q elasticsearch_bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=elasticsearch_bulk_actions.pid
+                message,                                                                                                                            22459 ?        S      0:00  \_ /usr/bin/python -m celery worker --without-heartbeat -c 1 -n elasticsearch_bulk_actions@ns326150.ip-37-187-158.eu --a
+                on_ack=ack, on_reject=reject, app=app, hostname=hostname,                                                                           pp=main -Q elasticsearch_bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=elasticsearch_bulk_actions.pid
+                eventer=eventer, task=task, connection_errors=connection_errors,                                                                    22419 ?        S      0:00 /usr/bin/python -m celery worker --without-heartbeat -c 1 -n celery@ns326150.ip-37-187-158.eu --app=main -Q elasticsearch
+                body=body, headers=headers, decoded=decoded, utc=utc,                                                                               _bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=celery.pid
+            )
             put_buffer(request)
 
             if self._tref is None:     # first request starts flush timer.