Ask Solem 16 роки тому
батько
коміт
bdd354e9e3
4 змінених файлів з 49 додано та 2 видалено
  1. 7 1
      celery/bin/celeryd.py
  2. 12 0
      celery/conf.py
  3. 17 1
      celery/monitoring.py
  4. 13 0
      celery/task.py

+ 7 - 1
celery/bin/celeryd.py

@@ -104,6 +104,9 @@ OPTION_LIST = (
             help="Discard all waiting tasks before the server is started. "
                  "WARNING: This is unrecoverable, and the tasks will be "
                  "deleted from the messaging server."),
+    optparse.make_option('-s', '--statistics', default=USE_STATISTICS,
+            action="store_true", dest="statistics",
+            help="Collect statistics."),
     optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
             action="store", dest="logfile",
             help="Path to log file."),
@@ -173,6 +176,9 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
     print(". Launching celery, please hold on to something...")
 
     if statistics:
+        from celery.task import tasks, CollectStatisticsTask
+        tasks.register(CollectStatisticsTask)
+
         settings.CELERY_STATISTICS = statistics
 
     if not concurrency:
@@ -211,7 +217,7 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
             "concurrency": concurrency,
             "loglevel": loglevel,
             "pidfile": pidfile,
-    }
+    })
     print("* Reporting of statistics is %s..." % (
         settings.CELERY_STATISTICS and "ON" or "OFF"))
 

+ 12 - 0
celery/conf.py

@@ -13,6 +13,8 @@ DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
 DEFAULT_DAEMON_LOG_LEVEL = "INFO"
 DEFAULT_DAEMON_LOG_FILE = "celeryd.log"
 DEFAULT_AMQP_CONNECTION_TIMEOUT = 4
+DEFAULT_STATISTICS = False
+DEFAULT_STATISTICS_COLLECT_INTERVAL = 60 * 5
 
 """
 .. data:: LOG_LEVELS
@@ -150,3 +152,13 @@ AMQP_CONNECTION_TIMEOUT = getattr(settings, "CELERY_AMQP_CONNECTION_TIMEOUT",
 SEND_CELERY_TASK_ERROR_EMAILS = getattr(settings,
                                         "SEND_CELERY_TASK_ERROR_EMAILS",
                                         not settings.DEBUG)
+
+"""
+.. data:: STATISTICS_COLLECT_INTERVAL
+    The interval in seconds of which the
+    :class:`celery.task.CollectStatisticsTask`` is run.
+
+"""
+STATISTICS_COLLECT_INTERVAL = getattr(settings,
+                                "CELERY_STATISTICS_COLLECT_INTERVAL",
+                                DEFAULT_STATISTICS_COLLECT_INTERVAL)

+ 17 - 1
celery/monitoring.py

@@ -6,8 +6,13 @@
 from carrot.connection import DjangoAMQPConnection
 from celery.messaging import StatsPublisher, StatsConsumer
 from django.conf import settings
+from django.core.cache import cache
+import socket
 import time
 
+HOSTNAME = socket.gethostname()
+DEFAULT_CACHE_KEY_PREFIX = "celery-statistics-%s" % HOSTNAME
+
 
 class Statistics(object):
     """Base class for classes publishing celery statistics.
@@ -71,6 +76,7 @@ class Statistics(object):
         raise NotImplementedError(
                 "Statistics classes must define a on_stop handler.")
 
+
 class TimerStats(Statistics):
     """A generic timer producing ``celery`` statistics.
 
@@ -166,6 +172,16 @@ class StatsCollector(object):
                 handler = getattr(self, stat_type)
                 handler(**stats_entry["data"])
 
+    def dump_to_cache(self, cache_key_prefix=DEFAULT_CACHE_KEY_PREFIX):
+        cache.add("%s-total_tasks_processed" % cache_key_prefix,
+                self.total_tasks_processed)
+        cache.add("%s-total_tasks_processed_by_type" % cache_key_prefix,
+                    self.total_tasks_processed_by_type)
+        cache.add("%s-total_task_time_running" % cache_key_prefix,
+                    self.total_task_time_running)
+        cache.add("%s-total_task_time_running_by_type" % cache_key_prefix,
+                    self.total_task_time_running_by_type)
+
     def task_time_running(self, task_id, task_name, args, kwargs, nsecs):
         """Process statistics regarding how long a task has been running
         (the :class:TaskTimerStats` class is responsible for sending these).
@@ -201,7 +217,7 @@ class StatsCollector(object):
 
             * Total task processing time.
 
-            * Total number of tasks executed.
+            * Total number of tasks executed
         
         """
         print("Total processing time by task type:")

+ 13 - 0
celery/task.py

@@ -5,6 +5,7 @@ Working with tasks and task sets.
 """
 from carrot.connection import DjangoAMQPConnection
 from celery.conf import AMQP_CONNECTION_TIMEOUT
+from celery.conf import STATISTICS_COLLECT_INTERVAL
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.log import setup_logger
 from celery.registry import tasks
@@ -598,3 +599,15 @@ def ping():
         'pong'
     """
     return PingTask.apply_async().get()
+
+
+class CollectStatisticsTask(PeriodicTask):
+    name = "celery.collect-statistics"
+    run_every = timedelta(seconds=STATISTICS_COLLECT_INTERVAL)
+
+    def run(self, **kwargs):
+        from celery.monitoring import StatsCollector
+        stats = StatsCollector()
+        stats.collect()
+        stats.dump_to_cache()
+        return True