
Merge branch 'master' of git://github.com/ask/celery

Jerzy Kozera, 15 years ago
Parent commit: b9b49c7cd4
6 files changed with 80 additions and 13 deletions
  1. celery/execute.py (+0, -1)
  2. celery/log.py (+10, -3)
  3. celery/messaging.py (+3, -0)
  4. celery/signals.py (+30, -0)
  5. celery/worker/controllers.py (+5, -5)
  6. celery/worker/job.py (+32, -4)

+ 0 - 1
celery/execute.py

@@ -6,7 +6,6 @@ from celery.registry import tasks
 from celery.utils import gen_unique_id, noop, fun_takes_kwargs
 from functools import partial as curry
 from datetime import datetime, timedelta
-from multiprocessing import get_logger
 from celery.exceptions import RetryTaskError
 from celery.datastructures import ExceptionInfo
 from celery.backends import default_backend

+ 10 - 3
celery/log.py

@@ -7,6 +7,13 @@ import traceback
 from celery.conf import LOG_FORMAT, DAEMON_LOG_LEVEL
 
 
+def get_default_logger(loglevel=None):
+    import multiprocessing
+    logger = multiprocessing.get_logger()
+    loglevel is not None and logger.setLevel(loglevel)
+    return logger
+
+
 def setup_logger(loglevel=DAEMON_LOG_LEVEL, logfile=None, format=LOG_FORMAT,
         **kwargs):
     """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
@@ -14,10 +21,9 @@ def setup_logger(loglevel=DAEMON_LOG_LEVEL, logfile=None, format=LOG_FORMAT,
 
     Returns logger object.
     """
-    import multiprocessing
-    logger = multiprocessing.get_logger()
-    logger.setLevel(loglevel)
+    logger = get_default_logger(loglevel=loglevel)
     if logger.handlers:
+        # Logger already configured
         return logger
     if logfile:
         if hasattr(logfile, "write"):
@@ -28,6 +34,7 @@ def setup_logger(loglevel=DAEMON_LOG_LEVEL, logfile=None, format=LOG_FORMAT,
         log_file_handler.setFormatter(formatter)
         logger.addHandler(log_file_handler)
     else:
+        import multiprocessing
         multiprocessing.log_to_stderr()
     return logger
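
A minimal usage sketch of the new helper (illustrative, not part of this commit): worker code can now fetch the shared multiprocessing logger through celery.log instead of importing multiprocessing directly.

    import logging
    from celery.log import get_default_logger

    # Fetch the shared multiprocessing logger, optionally forcing a level.
    logger = get_default_logger(loglevel=logging.INFO)
    logger.info("worker component starting up")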
 

+ 3 - 0
celery/messaging.py

@@ -5,6 +5,7 @@ Sending and Receiving Messages
 """
 from carrot.messaging import Publisher, Consumer, ConsumerSet
 from celery import conf
+from celery import signals
 from celery.utils import gen_unique_id
 from celery.utils import mitemgetter
 from celery.serialization import pickle
@@ -58,6 +59,8 @@ class TaskPublisher(Publisher):
             message_data["taskset"] = part_of_set
 
         self.send(message_data, **extract_msg_options(kwargs))
+        signals.task_sent.send(sender=task_name, **message_data)
+
         return task_id
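
For context, a sketch of the receiving end of the new task_sent signal; the handler name is illustrative, and the keyword arguments follow the message_data keys documented in celery/signals.py (sender is the task name, per the send() call above).

    from celery import signals

    def on_task_sent(sender=None, task_id=None, task=None, args=None,
                     kwargs=None, eta=None, **rest):
        # "rest" absorbs the signal/taskset keywords and any extra fields.
        print("Sent %s[%s] eta=%s" % (sender, task_id, eta))

    signals.task_sent.connect(on_task_sent)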
 
 

+ 30 - 0
celery/signals.py

@@ -1,8 +1,38 @@
 from django.dispatch import Signal
 
+"""
+
+.. DATA: task_sent
+
+Triggered when a task has been sent to the broker.
+
+Provides arguments:
+
+* task_id
+    Id of the task to be executed.
+
+* task
+    The task being executed.
+
+* args
+    The task's positional arguments.
+
+* kwargs
+    The task's keyword arguments.
+
+* eta
+    The time to execute the task.
+
+* taskset
+    Id of the taskset this task is part of (if any).
+
 
 """
+task_sent = Signal(providing_args=[
+                        "task_id", "task", "args", "kwargs", "eta",
+                        "taskset"])
 
+"""
 .. DATA: task_prerun
 
 Triggered before a task is executed.
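
A standalone illustration of the django.dispatch pattern used throughout this module (the signal and handler names below are made up for the example; providing_args only documents the expected keywords and is not enforced by Django):

    from django.dispatch import Signal

    example_signal = Signal(providing_args=["task_id", "task"])

    def handler(sender=None, task_id=None, task=None, **rest):
        print("got %s from %s" % (task_id, sender))

    example_signal.connect(handler)
    example_signal.send(sender="example", task_id="42", task="example.task")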

+ 5 - 5
celery/worker/controllers.py

@@ -6,7 +6,7 @@ Worker Controller Threads
 from celery.backends import default_periodic_status_backend
 from Queue import Empty as QueueEmpty
 from datetime import datetime
-from multiprocessing import get_logger
+from celery.log import get_default_logger
 import traceback
 import threading
 import time
@@ -84,7 +84,7 @@ class Mediator(BackgroundThread):
         self.callback = callback
 
     def on_iteration(self):
-        logger = get_logger()
+        logger = get_default_logger()
         try:
             logger.debug("Mediator: Trying to get message from bucket_queue")
             # This blocks until there's a message in the queue.
@@ -119,7 +119,7 @@ class PeriodicWorkController(BackgroundThread):
         default_periodic_status_backend.init_periodic_tasks()
 
     def on_iteration(self):
-        logger = get_logger()
+        logger = get_default_logger()
         logger.debug("PeriodicWorkController: Running periodic tasks...")
         try:
             self.run_periodic_tasks()
@@ -133,7 +133,7 @@ class PeriodicWorkController(BackgroundThread):
         time.sleep(1)
 
     def run_periodic_tasks(self):
-        logger = get_logger()
+        logger = get_default_logger()
         applied = default_periodic_status_backend.run_periodic_tasks()
         for task, task_id in applied:
             logger.debug(
@@ -143,7 +143,7 @@ class PeriodicWorkController(BackgroundThread):
     def process_hold_queue(self):
         """Finds paused tasks that are ready for execution and move
         them to the :attr:`bucket_queue`."""
-        logger = get_logger()
+        logger = get_default_logger()
         try:
             logger.debug(
                 "PeriodicWorkController: Getting next task from hold queue..")

+ 32 - 4
celery/worker/job.py

@@ -7,11 +7,13 @@ from celery.registry import tasks
 from celery.exceptions import NotRegistered
 from celery.execute import ExecuteWrapper
 from celery.utils import noop, fun_takes_kwargs
+from celery.log import get_default_logger
 from django.core.mail import mail_admins
 import multiprocessing
 import socket
 import sys
 
+
 # pep8.py borks on a inline signature separator and
 # says "trailing whitespace" ;)
 EMAIL_SIGNATURE_SEP = "-- "
@@ -19,19 +21,26 @@ TASK_FAIL_EMAIL_BODY = """
 Task %%(name)s with id %%(id)s raised exception: %%(exc)s
 
 
-Task was called with args:%%(args)s kwargs:%%(kwargs)s.
+Task was called with args: %%(args)s kwargs: %%(kwargs)s.
+
 The contents of the full traceback was:
 
 %%(traceback)s
 
 %(EMAIL_SIGNATURE_SEP)s
-Just thought I'd let you know!
+Just to let you know,
 celeryd at %%(hostname)s.
 """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
 
 
+class AlreadyExecutedError(Exception):
+    """Tasks can only be executed once, as they might change
+    world-wide state."""
+
+
 class TaskWrapper(object):
-    """Class wrapping a task to be run.
+    """Class wrapping a task to be passed around and finally
+    executed inside of the worker.
 
     :param task_name: see :attr:`task_name`.
 
@@ -67,6 +76,11 @@ class TaskWrapper(object):
 
         The original message sent. Used for acknowledging the message.
 
+    .. attribute executed
+
+    Set if the task has been executed. A task should only be executed
+    once.
+
     """
     success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
     fail_msg = """
@@ -87,11 +101,12 @@ class TaskWrapper(object):
         self.kwargs = kwargs
         self.logger = kwargs.get("logger")
         self.on_ack = on_ack
+        self.executed = False
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
                 "fail_email_body"):
             setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
         if not self.logger:
-            self.logger = multiprocessing.get_logger()
+            self.logger = get_default_logger()
 
     def __repr__(self):
         return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
@@ -154,6 +169,13 @@ class TaskWrapper(object):
         return ExecuteWrapper(self.task_func, self.task_id, self.task_name,
                               self.args, task_func_kwargs)
 
+    def _set_executed_bit(self):
+        if self.executed:
+            raise AlreadyExecutedError(
+                   "Task %s[%s] has already been executed" % (
+                       self.task_name, self.task_id))
+        self.executed = True
+
     def execute(self, loglevel=None, logfile=None):
         """Execute the task in a :class:`celery.execute.ExecuteWrapper`.
 
@@ -162,6 +184,9 @@ class TaskWrapper(object):
         :keyword logfile: The logfile used by the task.
 
         """
+        # Make sure task has not already been executed.
+        self._set_executed_bit()
+
         # acknowledge task as being processed.
         self.on_ack()
 
@@ -179,6 +204,9 @@ class TaskWrapper(object):
         :returns :class:`multiprocessing.AsyncResult` instance.
 
         """
+        # Make sure task has not already been executed.
+        self._set_executed_bit()
+
         wrapper = self._executeable(loglevel, logfile)
         return pool.apply_async(wrapper,
                 callbacks=[self.on_success], errbacks=[self.on_failure],
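
The execute-once guard in isolation, as a self-contained sketch (the wrapper below is simplified for illustration and is not celery's actual TaskWrapper):

    class AlreadyExecutedError(Exception):
        """Raised when a wrapper is executed a second time."""

    class SimpleWrapper(object):
        def __init__(self):
            self.executed = False

        def _set_executed_bit(self):
            # Refuse to run the same task twice, since it may change
            # world-wide state.
            if self.executed:
                raise AlreadyExecutedError("already executed")
            self.executed = True

        def execute(self):
            self._set_executed_bit()
            return "result"

    w = SimpleWrapper()
    w.execute()                # fine the first time
    try:
        w.execute()            # second call raises
    except AlreadyExecutedError:
        print("refused: task already executed")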