
Set the process name for pool workers as well; they also display the currently executing task.

Ask Solem 15 years ago
parent commit c01b0ca376
4 changed files with 20 additions and 6 deletions
  1. celery/bin/celeryd.py (+2, -1)
  2. celery/worker/__init__.py (+7, -1)
  3. celery/worker/job.py (+7, -2)
  4. celery/worker/pool.py (+4, -2)

+ 2 - 1
celery/bin/celeryd.py

@@ -258,7 +258,8 @@ def parse_options(arguments):
 
 def set_process_status(info):
     arg_start = "manage" in sys.argv[0] and 2 or 1
-    info = "%s (%s)" % (info, " ".join(sys.argv[arg_start:]))
+    if sys.argv[arg_start:]:
+        info = "%s (%s)" % (info, " ".join(sys.argv[arg_start:]))
     platform.set_mp_process_title("celeryd", info=info)
 
 

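The guard added above keeps set_process_status from appending an empty "()" suffix when celeryd is started with no extra arguments; arg_start skips the script name (and the extra "manage.py" subcommand when run through Django). A small hypothetical helper, not part of the repo, showing the resulting titles:

    def format_status(info, argv):
        # Hypothetical stand-in for set_process_status, for illustration only.
        arg_start = "manage" in argv[0] and 2 or 1  # pre-2.5 conditional idiom
        if argv[arg_start:]:
            info = "%s (%s)" % (info, " ".join(argv[arg_start:]))
        return info

    assert format_status("celeryd", ["celeryd"]) == "celeryd"
    assert format_status("celeryd", ["celeryd", "-l", "INFO"]) == "celeryd (-l INFO)"
    assert format_status("celeryd", ["manage.py", "celeryd", "-B"]) == "celeryd (-B)"
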
+ 7 - 1
celery/worker/__init__.py

@@ -9,6 +9,7 @@ from Queue import Queue
 
 from celery import conf
 from celery import registry
+from celery import platform
 from celery.log import setup_logger
 from celery.beat import ClockServiceThread
 from celery.worker.pool import TaskPool
@@ -18,6 +19,10 @@ from celery.worker.scheduler import Scheduler
 from celery.worker.controllers import Mediator, ScheduleController
 
 
+def process_initializer():
+    platform.set_mp_process_title("celeryd")
+
+
 class WorkController(object):
     """Executes tasks waiting in the task queue.
 
@@ -117,7 +122,8 @@ class WorkController(object):
 
         # Threads+Pool
         self.schedule_controller = ScheduleController(self.eta_scheduler)
-        self.pool = TaskPool(self.concurrency, logger=self.logger)
+        self.pool = TaskPool(self.concurrency, logger=self.logger,
+                             initializer=process_initializer)
         self.broker_listener = CarrotListener(self.ready_queue,
                                         self.eta_scheduler,
                                         logger=self.logger,

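The initializer passed to TaskPool here is the standard multiprocessing pool initializer: it runs once in every child process before any task is handed to it, which is how each pool worker ends up with a "celeryd" title of its own. A minimal standalone sketch of the same idea, using the third-party setproctitle package directly rather than celery's own platform module:

    import multiprocessing

    from setproctitle import setproctitle  # third-party package, assumed installed

    def process_initializer():
        # Runs once in each pool worker, right after it is started.
        setproctitle("celeryd")

    def square(n):
        return n * n

    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=4, initializer=process_initializer)
        print(pool.map(square, range(10)))
        pool.close()
        pool.join()
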
+ 7 - 2
celery/worker/job.py

@@ -10,6 +10,7 @@ import warnings
 
 from django.core.mail import mail_admins
 
+from celery import platform
 from celery.log import get_default_logger
 from celery.utils import noop, fun_takes_kwargs
 from celery.loaders import current_loader
@@ -118,8 +119,12 @@ class WorkerTaskTrace(TaskTrace):
                 stored_exc, type_, tb, strtb)
 
 
-def execute_and_trace(*args, **kwargs):
-    return WorkerTaskTrace(*args, **kwargs).execute_safe()
+def execute_and_trace(task_name, *args, **kwargs):
+    platform.set_mp_process_title("celeryd", info=task_name)
+    try:
+        return WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
+    finally:
+        platform.set_mp_process_title("celeryd")
 
 
 class TaskWrapper(object):

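execute_and_trace now renames the worker process to the task name for the lifetime of the task and restores the plain worker title in a finally block, so a failing task cannot leave a stale title behind in ps output. A hedged sketch of the same set/reset pattern outside celery (the exact title format celery produces is not shown here):

    from setproctitle import setproctitle  # third-party package, assumed installed

    def run_with_title(task_name, fun, *args, **kwargs):
        # Illustrative only: show the task name while it runs, then reset.
        setproctitle("celeryd: %s" % task_name)
        try:
            return fun(*args, **kwargs)
        finally:
            setproctitle("celeryd")
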
+ 4 - 2
celery/worker/pool.py

@@ -29,9 +29,10 @@ class TaskPool(object):
 
     """
 
-    def __init__(self, limit, logger=None):
+    def __init__(self, limit, logger=None, initializer=None):
         self.limit = limit
         self.logger = logger or get_logger()
+        self.initializer = initializer
         self._pool = None
 
     def start(self):
@@ -40,7 +41,8 @@ class TaskPool(object):
         Will pre-fork all workers so they're ready to accept tasks.
 
         """
-        self._pool = DynamicPool(processes=self.limit)
+        self._pool = DynamicPool(processes=self.limit,
+                                 initializer=self.initializer)
 
     def stop(self):
         """Terminate the pool."""