Forráskód Böngészése

Removed celerymon, moved to http://github.com/ask/celerymon/

Ask Solem 15 éve
szülő
commit
2038edb8db

+ 0 - 7
celerymon/bin/celerymon

@@ -1,7 +0,0 @@
-#!/usr/bin/env python
-import sys
-from celerymon.bin.celerymon import run_monitor, parse_options
-
-if __name__ == "__main__":
-    options = parse_options(sys.argv[1:])
-    run_monitor(**vars(options))

+ 0 - 20
celerymon/celerymon/__init__.py

@@ -1,20 +0,0 @@
-"""Real-time monitoring of Celery workers."""
-
-VERSION = (0, 1, 0)
-
-__version__ = ".".join(map(str, VERSION))
-__author__ = "Ask Solem"
-__contact__ = "askh@opera.com"
-__homepage__ = "http://github.com/ask/celerymon/"
-__docformat__ = "restructuredtext"
-
-
-def is_stable_release():
-    return bool(not VERSION[1] % 2)
-
-
-def version_with_meta():
-    meta = "unstable"
-    if is_stable_release():
-        meta = "stable"
-    return "%s (%s)" % (__version__, meta)

+ 0 - 0
celerymon/celerymon/bin/__init__


+ 0 - 177
celerymon/celerymon/bin/celerymon.py

@@ -1,177 +0,0 @@
-#!/usr/bin/env python
-"""celerymon
-
-.. program:: celerymon
-
-.. cmdoption:: -P, --port
-
-    Port the webserver should listen to. Default: ``8989``.
-
-.. cmdoption:: -f, --logfile
-
-    Path to log file. If no logfile is specified, ``stderr`` is used.
-
-.. cmdoption:: -l, --loglevel
-
-    Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
-    ``ERROR``, ``CRITICAL``, or ``FATAL``.
-
-.. cmdoption:: -p, --pidfile
-
-    Path to pidfile.
-
-.. cmdoption:: -d, --detach, --daemon
-
-    Run in the background as a daemon.
-
-.. cmdoption:: -u, --uid
-
-    User-id to run ``celerymon`` as when in daemon mode.
-
-.. cmdoption:: -g, --gid
-
-    Group-id to run ``celerymon`` as when in daemon mode.
-
-.. cmdoption:: --umask
-
-    umask of the process when in daemon mode.
-
-.. cmdoption:: --workdir
-
-    Directory to change to when in daemon mode.
-
-.. cmdoption:: --chroot
-
-    Change root directory to this path when in daemon mode.
-
-"""
-import os
-import sys
-import traceback
-import optparse
-
-from celery import conf
-from celery import platform
-from celery import __version__
-from celery.log import emergency_error
-from celery.loaders import settings
-from celery.messaging import get_connection_info
-
-from celerymon.service import MonitorService
-
-STARTUP_INFO_FMT = """
-Configuration ->
-    . broker -> %(conninfo)s
-    . exchange -> %(exchange)s (%(exchange_type)s)
-    . consumer -> queue:%(consumer_queue)s binding:%(consumer_rkey)s
-    . webserver -> http://localhost:%(http_port)s
-""".strip()
-
-OPTION_LIST = (
-    optparse.make_option('-f', '--logfile', default=conf.CELERYMON_LOG_FILE,
-            action="store", dest="logfile",
-            help="Path to log file."),
-    optparse.make_option('-l', '--loglevel',
-            default=conf.CELERYMON_LOG_LEVEL,
-            action="store", dest="loglevel",
-            help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
-    optparse.make_option('-P', '--port',
-            action="store", type="int", dest="http_port", default=8989,
-            help="Port the webserver should listen to."),
-    optparse.make_option('-p', '--pidfile',
-            default=conf.CELERYMON_PID_FILE,
-            action="store", dest="pidfile",
-            help="Path to pidfile."),
-    optparse.make_option('-d', '--detach', '--daemon', default=False,
-            action="store_true", dest="detach",
-            help="Run in the background as a daemon."),
-    optparse.make_option('-u', '--uid', default=None,
-            action="store", dest="uid",
-            help="User-id to run celerymon as when in daemon mode."),
-    optparse.make_option('-g', '--gid', default=None,
-            action="store", dest="gid",
-            help="Group-id to run celerymon as when in daemon mode."),
-    optparse.make_option('--umask', default=0,
-            action="store", type="int", dest="umask",
-            help="umask of the process when in daemon mode."),
-    optparse.make_option('--workdir', default=None,
-            action="store", dest="working_directory",
-            help="Directory to change to when in daemon mode."),
-    optparse.make_option('--chroot', default=None,
-            action="store", dest="chroot",
-            help="Change root directory to this path when in daemon mode."),
-    )
-
-
-def run_monitor(detach=False, loglevel=conf.CELERYMON_LOG_LEVEL,
-        logfile=conf.CELERYMON_LOG_FILE, pidfile=conf.CELERYMON_PID_FILE,
-        umask=0, uid=None, gid=None, working_directory=None, chroot=None,
-        http_port=8989, **kwargs):
-    """Starts the celery monitor."""
-
-    print("celerymon %s is starting." % __version__)
-
-    # Setup logging
-    if not isinstance(loglevel, int):
-        loglevel = conf.LOG_LEVELS[loglevel.upper()]
-    if not detach:
-        logfile = None # log to stderr when not running in the background.
-
-    # Dump configuration to screen so we have some basic information
-    # when users sends e-mails.
-    print(STARTUP_INFO_FMT % {
-            "http_port": http_port,
-            "conninfo": get_connection_info(),
-            "exchange": conf.AMQP_EXCHANGE,
-            "exchange_type": conf.AMQP_EXCHANGE_TYPE,
-            "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
-            "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
-            "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
-            "loglevel": loglevel,
-            "pidfile": pidfile,
-    })
-
-    from celery.log import setup_logger, redirect_stdouts_to_logger
-    print("celerymon has started.")
-    if detach:
-        context = platform.create_daemon_context(logfile, pidfile,
-                                        chroot_directory=chroot,
-                                        working_directory=working_directory,
-                                        umask=umask,
-                                        uid=uid,
-                                        gid=gid)
-        context.open()
-        logger = setup_logger(loglevel, logfile)
-        redirect_stdouts_to_logger(logger, loglevel)
-
-    def _run_clock():
-        logger = setup_logger(loglevel, logfile)
-        monitor = MonitorService(logger=logger,
-                                 is_detached=detach,
-                                 http_port=http_port)
-
-        try:
-            monitor.start()
-        except Exception, e:
-            emergency_error(logfile,
-                    "celerymon raised exception %s: %s\n%s" % (
-                            e.__class__, e, traceback.format_exc()))
-
-    try:
-        _run_clock()
-    except:
-        if detach:
-            context.close()
-        raise
-
-
-def parse_options(arguments):
-    """Parse the available options to ``celerymon``."""
-    parser = optparse.OptionParser(option_list=OPTION_LIST)
-    options, values = parser.parse_args(arguments)
-    return options
-
-
-if __name__ == "__main__":
-    options = parse_options(sys.argv[1:])
-    run_monitor(**vars(options))

+ 0 - 0
celerymon/celerymon/handlers/__init__.py


+ 0 - 85
celerymon/celerymon/handlers/api.py

@@ -1,85 +0,0 @@
-from functools import wraps
-
-import simplejson
-from tornado.web import RequestHandler, Application
-
-from celery.task import revoke
-from celerymon.state import monitor_state
-
-
-def JSON(fun):
-
-    @wraps(fun)
-    def _write_json(self, *args, **kwargs):
-        content = fun(self, *args, **kwargs)
-        self.write(simplejson.dumps(content))
-
-    return _write_json
-
-
-class APIHandler(RequestHandler):
-
-    def __init__(self, *args, **kwargs):
-        super(APIHandler, self).__init__(*args, **kwargs)
-        self.set_header("Content-Type", "application/javascript")
-
-
-def api_handler(fun):
-
-    @JSON
-    def get(self, *args, **kwargs):
-        return fun(self, *args, **kwargs)
-
-    return type(fun.__name__, (APIHandler, ), {"get": get})
-
-
-@api_handler
-def task_state(request, task_id):
-    return monitor_state.get_task_info(task_id)
-
-
-@api_handler
-def list_tasks(request):
-    return monitor_state.tasks_by_time()
-
-
-@api_handler
-def list_tasks_by_name(request, name):
-    return monitor_state.tasks_by_type()[name]
-
-
-@api_handler
-def list_task_types(request):
-    return monitor_state.tasks_by_type()
-
-
-@api_handler
-def list_workers(request):
-    return monitor_state.list_workers()
-
-
-@api_handler
-def list_worker_tasks(request, hostname):
-    return monitor_state.list_worker_tasks(hostname)
-
-
-class RevokeTaskHandler(APIHandler):
-
-    SUPPORTED_METHODS = ["POST"]
-
-    @JSON
-    def post(self):
-        task_id = self.get_argument("task_id")
-        revoke(task_id)
-        return {"ok": True}
-
-
-API = [
-       (r"/task/name/$", list_task_types),
-       (r"/task/name/(.+?)", list_tasks_by_name),
-       (r"/task/$", list_tasks),
-       (r"/revoke/task/", RevokeTaskHandler),
-       (r"/task/(.+)", task_state),
-       (r"/worker/", list_workers),
-       (r"/worker/(.+?)/tasks", list_worker_tasks),
-]

+ 0 - 0
celerymon/celerymon/management/__init__.py


+ 0 - 0
celerymon/celerymon/management/commands/__init__.py


+ 0 - 18
celerymon/celerymon/management/commands/celerymon.py

@@ -1,18 +0,0 @@
-"""
-
-Start the celery clock service from the Django management command.
-
-"""
-from django.core.management.base import BaseCommand
-
-from celery.bin.celerymon import run_monitor, OPTION_LIST
-
-
-class Command(BaseCommand):
-    """Run the celery monitor."""
-    option_list = BaseCommand.option_list + OPTION_LIST
-    help = 'Run the celery monitor'
-
-    def handle(self, *args, **options):
-        """Handle the management command."""
-        run_monitor(**options)

+ 0 - 39
celerymon/celerymon/service.py

@@ -1,39 +0,0 @@
-from carrot.connection import DjangoBrokerConnection
-from celery.events import EventReceiver
-
-from celerymon.state import monitor_state
-from celerymon.web import WebServerThread
-
-
-class MonitorListener(object):
-    """Capture events sent by messages and store them in memory."""
-
-    def __init__(self, state):
-        self.connection = DjangoBrokerConnection()
-        self.receiver = EventReceiver(self.connection, handlers={
-            "task-received": state.receive_task_received,
-            "task-accepted": state.receive_task_event,
-            "task-succeeded": state.receive_task_event,
-            "task-retried": state.receive_task_event,
-            "task-failed": state.receive_task_event,
-            "worker-online": state.receive_worker_event,
-            "worker-offline": state.receive_worker_event,
-            "worker-heartbeat": state.receive_heartbeat,
-        })
-
-    def start(self):
-        self.receiver.capture()
-
-
-class MonitorService(object):
-    """celerymon"""
-
-
-    def __init__(self, logger, is_detached=False, http_port=8989):
-        self.logger = logger
-        self.is_detached = is_detached
-        self.http_port = http_port
-
-    def start(self):
-        MonitorListener(monitor_state).start()
-        WebServerThread(port=self.http_port).start()

+ 0 - 98
celerymon/celerymon/state.py

@@ -1,98 +0,0 @@
-import time
-from collections import defaultdict
-from datetime import datetime
-
-HEARTBEAT_EXPIRE = 120 # Heartbeats must be at most 2 minutes apart.
-
-
-class MonitorState(object):
-
-    def __init__(self):
-        self.hearts = {}
-        self.tasks = {}
-        self.task_events = defaultdict(lambda: [])
-        self.workers = defaultdict(lambda: [])
-
-    def tasks_by_type(self):
-        t = defaultdict(lambda: [])
-        for id, events in self.task_events.items():
-            try:
-                task_type = self.tasks[id]["name"]
-            except KeyError:
-                pass
-            else:
-                t[task_type].append(id)
-        return t
-
-    def get_task_info(self, task_id):
-        task_info = dict(self.tasks[task_id])
-
-        task_events = []
-        for event in self.task_events[task_id]:
-            if event["state"] in ("task-failed", "task-retried"):
-                task_info["exception"] = event["exception"]
-                task_info["traceback"] = event["traceback"]
-            elif event["state"] == "task-succeeded":
-                task_info["result"] = event["result"]
-            task_events.append({event["state"]: event["when"]})
-        task_info["events"] = task_events
-
-        return task_info
-
-    def receive_task_event(self, event):
-        event["state"] = event.pop("type")
-        event["when"] = self.timestamp_to_isoformat(event["timestamp"])
-        self.task_events[event["uuid"]].append(event)
-
-    def timestamp_to_isoformat(self, timestamp):
-        return datetime.fromtimestamp(timestamp).isoformat()
-
-    def receive_heartbeat(self, event):
-        self.hearts[event["hostname"]] = event["timestamp"]
-
-    def receive_task_received(self, event):
-        task_info = dict(event)
-        event = dict(event)
-        task_info.pop("type")
-        event["state"] = event.pop("type")
-        event["when"] = self.timestamp_to_isoformat(event["timestamp"])
-        self.tasks[task_info["uuid"]] = task_info
-        self.task_events[event["uuid"]].append(event)
-
-    def list_workers(self):
-        alive_workers = []
-        for hostname, events in self.workers.items():
-            if events[-1]["state"] == "worker-online":
-                alive_workers.append({hostname: events[-1]["when"]})
-        return alive_workers
-
-    def list_worker_tasks(self, hostname):
-        alive_workers = self.list_workers()
-        tasks_for_worker = defaultdict(lambda: [])
-        for hostname, when in alive_workers.items():
-            for task_id, task_info in self.tasks:
-                if task_info["hostname"] == hostname:
-                    tasks_for_worker[hostname].append(task_id)
-        return tasks_for_worker
-
-    def receive_worker_event(self, event):
-        event["state"] = event.pop("type")
-        event["when"] = self.timestamp_to_isoformat(event["timestamp"])
-        self.workers[event["hostname"]].append(event)
-
-    def worker_is_alive(self, hostname):
-        last_worker_event = self.workers[hostname][-1]
-        if last_worker_event and last_worker_event == "worker-online":
-            time_of_last_heartbeat = self.hearts[hostname]
-            if time.time() < time_of_last_heartbeat + HEARTBEAT_EXPIRE:
-                return True
-        return False
-
-    def tasks_by_time(self):
-        return dict(sorted(self.task_events.items(),
-                        key=lambda uuid__events: uuid__events[1][-1]["timestamp"]))
-
-    def tasks_by_last_state(self):
-        return [events[-1] for event in self.task_by_time()]
-
-monitor_state = MonitorState()

+ 0 - 35
celerymon/celerymon/web.py

@@ -1,35 +0,0 @@
-import threading
-
-from tornado import httpserver
-from tornado import ioloop
-from tornado.web import Application
-
-from celerymon.handlers import api
-
-
-class Site(Application):
-    """Tornado Website with multiple :class:`Application`'s."""
-
-    def __init__(self, applications, *args, **kwargs):
-        handlers = []
-        for urlprefix, application in applications:
-            for urlmatch, handler in application:
-                handlers.append((urlprefix + urlmatch, handler))
-        kwargs["handlers"] = handlers
-        super(Site, self).__init__(*args, **kwargs)
-
-
-class WebServerThread(threading.Thread):
-
-    def __init__(self, port=8989):
-        super(WebServerThread, self).__init__()
-        self.port = port
-        self.setDaemon(True)
-
-    def run(self):
-        site = Site([
-            (r"/api", api.API),
-        ])
-        http_server = httpserver.HTTPServer(site)
-        http_server.listen(self.port)
-        ioloop.IOLoop.instance().start()

+ 0 - 91
celerymon/setup.py

@@ -1,91 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-import codecs
-import sys
-import os
-import platform
-
-try:
-    from setuptools import setup, find_packages, Command
-except ImportError:
-    from ez_setup import use_setuptools
-    use_setuptools()
-    from setuptools import setup, find_packages, Command
-
-import celerymon as distmeta
-
-
-class RunTests(Command):
-    description = "Run the test suite from the testproj dir."
-
-    user_options = []
-
-    def initialize_options(self):
-        pass
-
-    def finalize_options(self):
-        pass
-
-    def run(self):
-        this_dir = os.getcwd()
-        testproj_dir = os.path.join(this_dir, "testproj")
-        os.chdir(testproj_dir)
-        sys.path.append(testproj_dir)
-        from django.core.management import execute_manager
-        os.environ["DJANGO_SETTINGS_MODULE"] = os.environ.get(
-                        "DJANGO_SETTINGS_MODULE", "settings")
-        settings_file = os.environ["DJANGO_SETTINGS_MODULE"]
-        settings_mod = __import__(settings_file, {}, {}, [''])
-        execute_manager(settings_mod, argv=[
-            __file__, "test"])
-        os.chdir(this_dir)
-
-install_requires = [
-    "celery",
-    "tornado"
-]
-
-# python-daemon doesn't run on windows, so check current platform
-if platform.system() == "Windows":
-    print("""
-    ***WARNING***
-    I see you are using windows. You will not be able to run celerymon
-    in daemon mode with the --detach parameter.""")
-else:
-    install_requires.append("python-daemon>=1.4.8")
-
-if os.path.exists("README.rst"):
-    long_description = codecs.open("README.rst", "r", "utf-8").read()
-else:
-    long_description = "See http://pypi.python.org/pypi/celerymon"
-
-
-setup(
-    name='celerymon',
-    version=distmeta.__version__,
-    description=distmeta.__doc__,
-    author=distmeta.__author__,
-    author_email=distmeta.__contact__,
-    url=distmeta.__homepage__,
-    platforms=["any"],
-    license="BSD",
-    packages=find_packages(exclude=['ez_setup']),
-    scripts=["bin/celerymon"],
-    zip_safe=False,
-    install_requires=install_requires,
-    cmdclass = {"test": RunTests},
-    classifiers=[
-        "Development Status :: 3 - Alpha",
-        "Operating System :: OS Independent",
-        "Programming Language :: Python",
-        "Programming Language :: Python :: 2.4",
-        "Programming Language :: Python :: 2.5",
-        "Programming Language :: Python :: 2.6",
-        "Programming Language :: Python :: 2.7",
-        "License :: OSI Approved :: BSD License",
-        "Operating System :: POSIX",
-        "Topic :: System :: Monitoring",
-        "Topic :: System :: Distributed Computing",
-    ],
-    long_description=long_description,
-)