浏览代码

Now sending all task events except for task-retried

Ask Solem 15 年之前
父节点
当前提交
024fe6a109
共有 5 个文件被更改，包括 98 次插入和 38 次删除
  1. 50 11
      celery/conf.py
  2. 11 9
      celery/events.py
  3. 10 10
      celery/monitoring.py
  4. 8 4
      celery/worker/__init__.py
  5. 19 4
      celery/worker/job.py

+ 50 - 11
celery/conf.py

@@ -27,6 +27,9 @@ DEFAULT_CELERYBEAT_PID_FILE = "celerybeat.pid"
 DEFAULT_CELERYBEAT_LOG_LEVEL = "INFO"
 DEFAULT_CELERYBEAT_LOG_FILE = "celerybeat.log"
 DEFAULT_CELERYBEAT_SCHEDULE_FILENAME = "celerybeat-schedule"
+DEFAULT_CELERYMON_PID_FILE = "celerymon.pid"
+DEFAULT_CELERYMON_LOG_LEVEL = "INFO"
+DEFAULT_CELERYMON_LOG_FILE = "celerymon.log"
 
 
 """
@@ -258,17 +261,6 @@ cache backend in ``CACHE_BACKEND`` will be used.
 """
 CELERY_CACHE_BACKEND = getattr(settings, "CELERY_CACHE_BACKEND", None)
 
-"""
-
-.. data:: CELERYBEAT_PID_FILE
-
-Name of celerybeats pid file.
-Default is: ``celerybeat.pid``.
-
-"""
-CELERYBEAT_PID_FILE = getattr(settings, "CELERYBEAT_PID_FILE",
-                              DEFAULT_CELERYBEAT_PID_FILE)
-
 
 """
 
@@ -293,6 +285,18 @@ DISABLE_RATE_LIMITS = getattr(settings, "CELERY_DISABLE_RATE_LIMITS",
 
 """
 
+.. data:: CELERYBEAT_PID_FILE
+
+Name of celerybeats pid file.
+Default is: ``celerybeat.pid``.
+
+"""
+CELERYBEAT_PID_FILE = getattr(settings, "CELERYBEAT_PID_FILE",
+                              DEFAULT_CELERYBEAT_PID_FILE)
+
+
+"""
+
 .. data:: CELERYBEAT_LOG_LEVEL
 
 Default log level for celerybeat.
@@ -324,3 +328,38 @@ Default is: ``celerybeat-schedule``.
 CELERYBEAT_SCHEDULE_FILENAME = getattr(settings,
                                        "CELERYBEAT_SCHEDULE_FILENAME",
                                        DEFAULT_CELERYBEAT_SCHEDULE_FILENAME)
+
+"""
+
+.. data:: CELERYMON_PID_FILE
+
+Name of celerymons pid file.
+Default is: ``celerymon.pid``.
+
+"""
+CELERYMON_PID_FILE = getattr(settings, "CELERYMON_PID_FILE",
+                              DEFAULT_CELERYMON_PID_FILE)
+
+
+"""
+
+.. data:: CELERYMON_LOG_LEVEL
+
+Default log level for celerymon.
+Default is: ``INFO``.
+
+"""
+CELERYMON_LOG_LEVEL = getattr(settings, "CELERYMON_LOG_LEVEL",
+                               DEFAULT_CELERYMON_LOG_LEVEL)
+
+"""
+
+.. data:: CELERYMON_LOG_FILE
+
+Default log file for celerymon.
+Default is: ``celerymon.log``.
+
+"""
+CELERYMON_LOG_FILE = getattr(settings, "CELERYMON_LOG_FILE",
+                              DEFAULT_CELERYMON_LOG_FILE)
+

+ 11 - 9
celery/events.py

@@ -8,11 +8,11 @@ Events
 
 WORKER-ONLINE    hostname timestamp
 WORKER-OFFLINE   hostname timestamp
-TASK-RECEIVED    id name args kwargs retries eta queue exchange rkey timestamp
-TASK-ACCEPTED    id timestamp
-TASK-SUCCEEDED   id result timestamp
-TASK-FAILED      id exception timestamp
-TASK-RETRIED     id exception timestamp
+TASK-RECEIVED    uuid name args kwargs retries eta timestamp
+TASK-ACCEPTED    uuid timestamp
+TASK-SUCCEEDED   uuid result timestamp
+TASK-FAILED      uuid exception timestamp
+TASK-RETRIED     uuid exception timestamp
 WORKER-HEARTBEAT hostname timestamp
 
 """
@@ -32,6 +32,7 @@ class EventDispatcher(object):
         self.publisher = EventPublisher(self.connection)
 
     def send(self, type, **fields):
+        fields["timestamp"] = time.time()
         self.publisher.send(Event(type, **fields))
 
 
@@ -43,13 +44,14 @@ class EventReceiver(object):
         if handlers is not None:
             self.handlers = handlers
 
-    def process(self, event):
-        type = event["type"]
+    def process(self, type, event):
+        print("Received event: %s" % event)
         handler = self.handlers.get(type) or self.handlers.get("*")
         handler and handler(event)
 
-    def _receive(message, message_data):
-        self.process(message_data)
+    def _receive(self, message_data, message):
+        type = message_data.pop("type").lower()
+        self.process(type, Event(type, **message_data))
 
     def consume(self, limit=None):
         consumer = EventConsumer(self.connection)

+ 10 - 10
celery/monitoring.py

@@ -51,15 +51,15 @@ class MonitorListener(object):
 
     def __init__(self, state):
         self.connection = DjangoBrokerConnection()
-        self.receiver = EventReceiver(connection, handlers={
-            "worker-heartbeat": self.state.receive_heartbeat,
-            "worker-online": self.state.receive_worker_event,
-            "worker-offline": self.state.receive_worker_event,
-            "task-received": self.state.receive_task_event,
-            "task-accepted": self.state.receive_task_event,
-            "task-succeeded": self.state.receive_task_event,
-            "task-failed": self.state.receive_task_event,
-            "task-retried": self.state.receive_task_event
+        self.receiver = EventReceiver(self.connection, handlers={
+            "worker-heartbeat": state.receive_heartbeat,
+            "worker-online": state.receive_worker_event,
+            "worker-offline": state.receive_worker_event,
+            "task-received": state.receive_task_event,
+            "task-accepted": state.receive_task_event,
+            "task-succeeded": state.receive_task_event,
+            "task-failed": state.receive_task_event,
+            "task-retried": state.receive_task_event
         })
 
     def start(self):
@@ -72,7 +72,7 @@ class MonitorService(object):
         self.logger = logger
         self.is_detached = is_detached
 
-    def start():
+    def start(self):
         state = MonitorState()
         listener = MonitorListener(state)
 

+ 8 - 4
celery/worker/__init__.py

@@ -103,16 +103,20 @@ class CarrotListener(object):
         """
         try:
             task = TaskWrapper.from_message(message, message_data,
-                                            logger=self.logger)
+                                            logger=self.logger,
+                                            eventer=self.event_dispatcher)
         except NotRegistered, exc:
             self.logger.error("Unknown task ignored: %s" % (exc))
             return
 
         eta = message_data.get("eta")
 
-        print(message_data)
-        self.event_dispatcher.send("task-received", **message_data)
-
+        self.event_dispatcher.send("task-received", uuid=task.task_id,
+                                                    name=task.task_name,
+                                                    args=task.args,
+                                                    kwargs=task.kwargs,
+                                                    retries=task.retries,
+                                                    eta=eta)
         if eta:
             if not isinstance(eta, datetime):
                 eta = parse_iso8601(eta)

+ 19 - 4
celery/worker/job.py

@@ -181,7 +181,8 @@ class TaskWrapper(object):
         self.retries = retries
         self.args = args
         self.kwargs = kwargs
-        self.logger = kwargs.get("logger")
+        self.logger = opts.get("logger")
+        self.eventer = opts.get("eventer")
         self.on_ack = on_ack
         self.executed = False
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
@@ -200,7 +201,7 @@ class TaskWrapper(object):
                 self.args, self.kwargs)
 
     @classmethod
-    def from_message(cls, message, message_data, logger=None):
+    def from_message(cls, message, message_data, logger=None, eventer=None):
         """Create a :class:`TaskWrapper` from a task message sent by
         :class:`celery.messaging.TaskPublisher`.
 
@@ -221,7 +222,8 @@ class TaskWrapper(object):
                         for key, value in kwargs.items())
 
         return cls(task_name, task_id, args, kwargs,
-                    retries=retries, on_ack=message.ack, logger=logger)
+                    retries=retries, on_ack=message.ack,
+                    logger=logger, eventer=eventer)
 
     def extend_with_default_kwargs(self, loglevel, logfile):
         """Extend the tasks keyword arguments with standard task arguments.
@@ -275,6 +277,10 @@ class TaskWrapper(object):
         tracer = WorkerTaskTrace(*self._get_tracer_args(loglevel, logfile))
         return tracer.execute()
 
+    def send_event(self, type, **fields):
+        if self.eventer:
+            self.eventer.send(type, **fields)
+
     def execute_using_pool(self, pool, loglevel=None, logfile=None):
         """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.
 
@@ -290,6 +296,8 @@ class TaskWrapper(object):
         # Make sure task has not already been executed.
         self._set_executed_bit()
 
+        self.send_event("task-accepted", uuid=self.task_id)
+
         args = self._get_tracer_args(loglevel, logfile)
         return pool.apply_async(execute_and_trace, args=args,
                 callbacks=[self.on_success], errbacks=[self.on_failure],
@@ -298,6 +306,9 @@ class TaskWrapper(object):
     def on_success(self, ret_value):
         """The handler used if the task was successfully processed (
         without raising an exception)."""
+
+        self.send_event("task-succeeded", uuid=self.task_id, result=ret_value)
+
         msg = self.success_msg.strip() % {
                 "id": self.task_id,
                 "name": self.task_name,
@@ -308,6 +319,10 @@ class TaskWrapper(object):
         """The handler used if the task raised an exception."""
         from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
 
+        self.send_event("task-failed", uuid=self.task_id,
+                                       exception=exc_info.exception,
+                                       traceback=exc_info.traceback)
+
         context = {
             "hostname": socket.gethostname(),
             "id": self.task_id,
@@ -321,7 +336,7 @@ class TaskWrapper(object):
 
         task_obj = tasks.get(self.task_name, object)
         send_error_email = SEND_CELERY_TASK_ERROR_EMAILS and not \
-                getattr(task_obj, "disable_error_emails", False)
+                                task_obj.disable_error_emails
         if send_error_email:
             subject = self.fail_email_subject.strip() % context
             body = self.fail_email_body.strip() % context