Browse source

Events are off by default; they can now be turned on by using -E|--events or CELERY_SEND_EVENTS.

Ask Solem committed 15 years ago
Parent
commit
3345f160ea

+ 16 - 6
celery/bin/celeryd.py

@@ -26,6 +26,10 @@
     Also run the ``celerybeat`` periodic task scheduler. Please note that
     there must only be one instance of this service.
 
+.. cmdoption:: -E, --events
+
+    Send events that can be captured by monitors like ``celerymon``.
+
 .. cmdoption:: -d, --detach, --daemon
 
     Run in the background as a daemon.
@@ -77,11 +81,12 @@ from celery.messaging import get_connection_info
 
 STARTUP_INFO_FMT = """
 Configuration ->
-    * Broker -> %(conninfo)s
-    * Exchange -> %(exchange)s (%(exchange_type)s)
-    * Consumer -> Queue:%(consumer_queue)s Binding:%(consumer_rkey)s
-    * Concurrency -> %(concurrency)s
-    * Celerybeat -> %(celerybeat)s
+    . broker -> %(conninfo)s
+    . exchange -> %(exchange)s (%(exchange_type)s)
+    . consumer -> queue:%(consumer_queue)s binding:%(consumer_rkey)s
+    . concurrency -> %(concurrency)s
+    . events -> %(events)s
+    . beat -> %(celerybeat)s
 """.strip()
 
 OPTION_LIST = (
@@ -107,6 +112,9 @@ OPTION_LIST = (
             action="store_true", dest="run_clockservice",
             help="Also run the celerybeat periodic task scheduler. \
                   Please note that only one instance must be running."),
+    optparse.make_option('-E', '--events', default=conf.CELERY_SEND_EVENTS,
+            action="store_true", dest="events",
+            help="Send events so celery can be monitored by e.g. celerymon."),
     optparse.make_option('-d', '--detach', '--daemon', default=False,
             action="store_true", dest="detach",
             help="Run in the background as a daemon."),
@@ -132,7 +140,7 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
         loglevel=conf.DAEMON_LOG_LEVEL, logfile=conf.DAEMON_LOG_FILE,
         discard=False, pidfile=conf.DAEMON_PID_FILE, umask=0,
         uid=None, gid=None, working_directory=None,
-        chroot=None, run_clockservice=False, **kwargs):
+        chroot=None, run_clockservice=False, events=False, **kwargs):
     """Starts the celery worker server."""
 
     print("Celery %s is starting." % __version__)
@@ -175,6 +183,7 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
             "loglevel": loglevel,
             "pidfile": pidfile,
             "celerybeat": run_clockservice and "ON" or "OFF",
+            "events": events and "ON" or "OFF",
     })
 
     print("Celery has started.")
@@ -199,6 +208,7 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
                                 loglevel=loglevel,
                                 logfile=logfile,
                                 embed_clockservice=run_clockservice,
+                                send_events=events,
                                 is_detached=detach)
 
         # Install signal handler that restarts celeryd on SIGHUP,

+ 13 - 0
celery/conf.py

@@ -29,6 +29,7 @@ DEFAULT_CELERYBEAT_SCHEDULE_FILENAME = "celerybeat-schedule"
 DEFAULT_CELERYMON_PID_FILE = "celerymon.pid"
 DEFAULT_CELERYMON_LOG_LEVEL = "INFO"
 DEFAULT_CELERYMON_LOG_FILE = "celerymon.log"
+DEFAULT_SEND_EVENTS = False
 
 
 """
@@ -362,3 +363,15 @@ Default is: ``celerymon.log``.
 CELERYMON_LOG_FILE = getattr(settings, "CELERYMON_LOG_FILE",
                               DEFAULT_CELERYMON_LOG_FILE)
 
+
+"""
+
+.. data:: CELERY_SEND_EVENTS
+
+If set, celery will send events that can be captured by monitors like
+``celerymon``.
+Default is: ``False``.
+
+"""
+CELERY_SEND_EVENTS = getattr(settings, "CELERY_SEND_EVENTS",
+                             DEFAULT_SEND_EVENTS)

+ 7 - 1
celery/events.py

@@ -17,14 +17,18 @@ class EventDispatcher(object):
     :keyword hostname: Hostname to identify ourselves as,
         by default uses the hostname returned by :func:`socket.gethostname`.
 
+    :keyword enabled: Set to ``False`` to not actually publish any events,
+        making :meth:`send` a noop operation.
+
     You need to :meth:`close` this after use.
 
     """
 
-    def __init__(self, connection, hostname=None):
+    def __init__(self, connection, hostname=None, enabled=True):
         self.connection = connection
         self.publisher = EventPublisher(self.connection)
         self.hostname = hostname or socket.gethostname()
+        self.enabled = enabled
         self._lock = threading.Lock()
 
     def send(self, type, **fields):
@@ -34,6 +38,8 @@ class EventDispatcher(object):
         :keyword \*\*fields: Event arguments.
 
         """
+        if not self.enabled:
+            return
         self._lock.acquire()
         try:
             fields["timestamp"] = time.time()

+ 1 - 1
celery/monitoring/__init__.py

@@ -14,7 +14,7 @@ class MonitorListener(object):
             "task-received": state.receive_task_received,
             "task-accepted": state.receive_task_event,
             "task-succeeded": state.receive_task_event,
-            "task-retried": state.receive_task_event
+            "task-retried": state.receive_task_event,
             "task-failed": state.receive_task_event,
             "worker-online": state.receive_worker_event,
             "worker-offline": state.receive_worker_event,

+ 15 - 0
celery/worker/__init__.py

@@ -24,6 +24,8 @@ class WorkController(object):
     :param concurrency: see :attr:`concurrency`.
     :param logfile: see :attr:`logfile`.
     :param loglevel: see :attr:`loglevel`.
+    :param embed_clockservice: see :attr:`run_clockservice`.
+    :param send_events: see :attr:`send_events`.
 
 
     .. attribute:: concurrency
@@ -40,6 +42,16 @@ class WorkController(object):
         The logfile used, if no logfile is specified it uses ``stderr``
         (default: :const:`celery.conf.DAEMON_LOG_FILE`).
 
+    .. attribute:: embed_clockservice
+
+        If ``True``, celerybeat is embedded, running in the main worker
+        process as a thread.
+
+    .. attribute:: send_events
+
+        Enable the sending of monitoring events, these events can be captured
+        by monitors (celerymon).
+
     .. attribute:: logger
 
         The :class:`logging.Logger` instance used for logging.
@@ -82,6 +94,7 @@ class WorkController(object):
     _state = None
 
     def __init__(self, concurrency=None, logfile=None, loglevel=None,
+            send_events=conf.CELERY_SEND_EVENTS,
             is_detached=False, embed_clockservice=False):
 
         # Options
@@ -91,6 +104,7 @@ class WorkController(object):
         self.is_detached = is_detached
         self.logger = setup_logger(loglevel, logfile)
         self.embed_clockservice = embed_clockservice
+        self.send_events = send_events
 
         # Queues
         if conf.DISABLE_RATE_LIMITS:
@@ -107,6 +121,7 @@ class WorkController(object):
         self.broker_listener = CarrotListener(self.ready_queue,
                                         self.eta_scheduler,
                                         logger=self.logger,
+                                        send_events=send_events,
                                         initial_prefetch_count=concurrency)
         self.mediator = Mediator(self.ready_queue, self.safe_process_task)
 

+ 4 - 2
celery/worker/listener.py

@@ -41,11 +41,12 @@ class CarrotListener(object):
     """
 
     def __init__(self, ready_queue, eta_scheduler, logger,
-            initial_prefetch_count=2):
+            send_events=False, initial_prefetch_count=2):
         self.amqp_connection = None
         self.task_consumer = None
         self.ready_queue = ready_queue
         self.eta_scheduler = eta_scheduler
+        self.send_events = send_events
         self.logger = logger
         self.prefetch_count = SharedCounter(initial_prefetch_count)
         self.event_dispatcher = None
@@ -172,7 +173,8 @@ class CarrotListener(object):
         self.broadcast_consumer = BroadcastConsumer(self.amqp_connection)
         self.task_consumer.add_consumer(self.broadcast_consumer)
         self.task_consumer.register_callback(self.receive_message)
-        self.event_dispatcher = EventDispatcher(self.amqp_connection)
+        self.event_dispatcher = EventDispatcher(self.amqp_connection,
+                                                enabled=self.send_events)
         self.heart = Heart(self.event_dispatcher)
         self.heart.start()