Ask Solem vor 16 Jahren
Ursprung
Commit
7e824b2025
6 geänderte Dateien mit 89 neuen und 74 gelöschten Zeilen
  1. 1 1
      celery/backends/base.py
  2. 78 64
      celery/bin/celeryd.py
  3. 2 1
      celery/fields.py
  4. 1 1
      celery/tests/test_models.py
  5. 2 1
      celery/views.py
  6. 5 6
      celery/worker.py

+ 1 - 1
celery/backends/base.py

@@ -182,6 +182,6 @@ class BaseBackend(object):
         """Cleanup actions to do at the end of a task worker process.
 
         See :func:`celery.worker.jail`.
-        
+
         """
         pass

+ 78 - 64
celery/bin/celeryd.py

@@ -29,17 +29,17 @@
     Discard all waiting tasks before the daemon is started.
     **WARNING**: This is unrecoverable, and the tasks will be
     deleted from the messaging server.
-    
+
 .. cmdoption:: -u, --uid
 
     User-id to run ``celeryd`` as when in daemon mode.
 
 .. cmdoption:: -g, --gid
-       
+
     Group-id to run ``celeryd`` as when in daemon mode.
 
 .. cmdoption:: --umask
-    
+
     umask of the process when in daemon mode.
 
 .. cmdoption:: --workdir
@@ -62,6 +62,8 @@ from django.conf import settings
 from celery.log import emergency_error
 from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
+from celery.log import setup_logger
+from celery.messaging import TaskConsumer
 from celery import conf
 from celery import discovery
 from celery.task import discard_all
@@ -76,12 +78,50 @@ import errno
 
 STARTUP_INFO_FMT = """
     * Celery loading with the following configuration
-        * Broker -> amqp://%(vhost)s@%(host)s:%(port)s 
+        * Broker -> amqp://%(vhost)s@%(host)s:%(port)s
         * Exchange -> %(exchange)s (%(exchange_type)s)
         * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
         * Concurrency:%(concurrency)s
 """.strip()
 
+OPTION_LIST = (
+    optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
+            action="store", dest="concurrency", type="int",
+            help="Number of child processes processing the queue."),
+    optparse.make_option('--discard', default=False,
+            action="store_true", dest="discard",
+            help="Discard all waiting tasks before the server is started. "
+                 "WARNING: This is unrecoverable, and the tasks will be "
+                 "deleted from the messaging server."),
+    optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
+            action="store", dest="logfile",
+            help="Path to log file."),
+    optparse.make_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
+            action="store", dest="loglevel",
+            help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
+    optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
+            action="store", dest="pidfile",
+            help="Path to pidfile."),
+    optparse.make_option('-d', '--detach', '--daemon', default=False,
+            action="store_true", dest="detach",
+            help="Run in the background as a daemon."),
+    optparse.make_option('-u', '--uid', default=None,
+            action="store", dest="uid",
+            help="User-id to run celeryd as when in daemon mode."),
+    optparse.make_option('-g', '--gid', default=None,
+            action="store", dest="gid",
+            help="Group-id to run celeryd as when in daemon mode."),
+    optparse.make_option('--umask', default=0,
+            action="store", type="int", dest="umask",
+            help="umask of the process when in daemon mode."),
+    optparse.make_option('--workdir', default=None,
+            action="store", dest="working_directory",
+            help="Directory to change to when in daemon mode."),
+    optparse.make_option('--chroot', default=None,
+            action="store", dest="chroot",
+            help="Change root directory to this path when in daemon mode."),
+    )
+
 
 def acquire_pidlock(pidfile):
     """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
@@ -103,7 +143,7 @@ def acquire_pidlock(pidfile):
     except os.error, exc:
         if exc.errno == errno.ESRCH:
             sys.stderr.write("Stale pidfile exists. Removing it.\n")
-            pidlock.release() 
+            pidlock.release()
             return
     else:
         raise SystemExit(
@@ -113,11 +153,12 @@ def acquire_pidlock(pidfile):
     return pidlock
 
 
-def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
+def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
         loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
         pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
         working_directory=None, chroot=None, **kwargs):
-    """Run the celery daemon."""
+    """Starts the celery worker server."""
+
     if not concurrency:
         concurrency = multiprocessing.cpu_count()
     if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
@@ -126,16 +167,30 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
                 "concurrency. We'll be using a single process only.",
                 UserWarning)
         concurrency = 1
-    
+
+    # Setup logging
     if not isinstance(loglevel, int):
         loglevel = LOG_LEVELS[loglevel.upper()]
+    if not detach:
+        logfile = None # log to stderr when not running in the background.
+    logger = setup_logger(logfile=logfile, loglevel=loglevel)
+
+    def say(msg):
+        """Log the message using loglevel ``INFO`` if running in the
+        background, else just print the message to ``stdout``."""
+        if detach:
+            return logger.info(msg)
+        print(msg)
 
     if discard:
         discarded_count = discard_all()
         what = discard_count > 1 and "messages" or "message"
-        sys.stderr.write("Discard: Erased %d %s from the queue.\n" % (
-            discarded_count, what))
-    startup_info = STARTUP_INFO_FMT % {
+        say("discard: Erased %d %s from the queue.\n" % (
+                discarded_count, what))
+
+    # Dump configuration to screen so we have some basic information
+    # when users sends e-mails.
+    say(STARTUP_INFO_FMT % {
             "vhost": settings.AMQP_VHOST,
             "host": settings.AMQP_SERVER,
             "port": settings.AMQP_PORT,
@@ -147,20 +202,20 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
             "concurrency": concurrency,
             "loglevel": loglevel,
             "pidfile": pidfile,
-    }
-    sys.stderr.write(startup_info + "\n")
-    if daemon:
+    })
+
+    if detach:
         # Since without stderr any errors will be silently suppressed,
         # we need to know that we have access to the logfile
+        if logfile:
+            open(logfile, "a").close()
         pidlock = acquire_pidlock(pidfile)
         if not umask:
             umask = 0
-        if logfile:
-            open(logfile, "a").close()
         uid = uid and int(uid) or os.geteuid()
         gid = gid and int(gid) or os.getegid()
         working_directory = working_directory or os.getcwd()
-        sys.stderr.write("* Launching celeryd in the background...\n")
+        print("* Launching celeryd in the background...")
         context = DaemonContext(chroot_directory=chroot,
                                 working_directory=working_directory,
                                 umask=umask,
@@ -168,64 +223,23 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
                                 uid=uid,
                                 gid=gid)
         context.open()
-    else:
-        logfile = None # log to stderr when not running as daemon.
 
     discovery.autodiscover()
-    celeryd = WorkController(concurrency=concurrency,
-                               loglevel=loglevel,
-                               logfile=logfile,
-                               is_detached=daemon)
+    worker = WorkController(concurrency=concurrency,
+                            loglevel=loglevel,
+                            logfile=logfile,
+                            is_detached=detach)
     try:
-        celeryd.run()
+        worker.run()
     except Exception, e:
         emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
                             e.__class__, e, traceback.format_exc()))
     except:
-        if daemon:
+        if context:
             context.close()
         raise
 
 
-OPTION_LIST = (
-    optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
-            action="store", dest="concurrency", type="int",
-            help="Number of child processes processing the queue."),
-    optparse.make_option('--discard', default=False,
-            action="store_true", dest="discard",
-            help="Discard all waiting tasks before the daemon is started. "
-                 "WARNING: This is unrecoverable, and the tasks will be "
-                 "deleted from the messaging server."),
-    optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
-            action="store", dest="logfile",
-            help="Path to log file."),
-    optparse.make_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
-            action="store", dest="loglevel",
-            help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
-    optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
-            action="store", dest="pidfile",
-            help="Path to pidfile."),
-    optparse.make_option('-d', '--detach', '--daemon', default=False,
-            action="store_true", dest="daemon",
-            help="Run in the background as a daemon."),
-    optparse.make_option('-u', '--uid', default=None,
-            action="store", dest="uid",
-            help="User-id to run celeryd as when in daemon mode."),
-    optparse.make_option('-g', '--gid', default=None,
-            action="store", dest="gid",
-            help="Group-id to run celeryd as when in daemon mode."),
-    optparse.make_option('--umask', default=0,
-            action="store", type="int", dest="umask",
-            help="umask of the process when in daemon mode."),
-    optparse.make_option('--workdir', default=None,
-            action="store", dest="working_directory",
-            help="Directory to change to when in daemon mode."),
-    optparse.make_option('--chroot', default=None,
-            action="store", dest="chroot",
-            help="Change root directory to this path when in daemon mode."),
-    )
-
-
 def parse_options(arguments):
     """Parse the available options to ``celeryd``."""
     parser = optparse.OptionParser(option_list=OPTION_LIST)

+ 2 - 1
celery/fields.py

@@ -23,7 +23,8 @@ if settings.DATABASE_ENGINE == "postgresql_psycopg2":
     import psycopg2.extensions
     # register PickledObject as a QuotedString otherwise we will see
     # can't adapt errors from psycopg2.
-    psycopg2.extensions.register_adapter(PickledObject, psycopg2.extensions.QuotedString)
+    psycopg2.extensions.register_adapter(PickledObject,
+            psycopg2.extensions.QuotedString)
 
 
 class PickledObjectField(models.Field):

+ 1 - 1
celery/tests/test_models.py

@@ -57,7 +57,7 @@ class TestModels(unittest.TestCase):
         self.assertTrue(unicode(p).startswith("<PeriodicTask:"))
         self.assertFalse(p in PeriodicTaskMeta.objects.get_waiting_tasks())
         # Have to avoid save() because it applies the auto_now=True.
-        PeriodicTaskMeta.objects.filter(name=p.name).update (
+        PeriodicTaskMeta.objects.filter(name=p.name).update(
                 last_run_at=datetime.now() - (TestPeriodicTask.run_every +
                 timedelta(seconds=10)))
         self.assertTrue(p in PeriodicTaskMeta.objects.get_waiting_tasks())

+ 2 - 1
celery/views.py

@@ -27,4 +27,5 @@ def task_status(request, task_id):
             "status": status,
             "result": async_result.result,
         }
-    return HttpResponse(JSON_dump({"task": response_data}), mimetype="application/json")
+    return HttpResponse(JSON_dump({"task": response_data}),
+            mimetype="application/json")

+ 5 - 6
celery/worker.py

@@ -97,7 +97,6 @@ def jail(task_id, func, args, kwargs):
         default_backend.mark_as_done(task_id, result)
         return result
 
-    
 
 class TaskWrapper(object):
     """Class wrapping a task to be run.
@@ -270,9 +269,9 @@ class PeriodicWorkController(threading.Thread):
     Example
 
         >>> PeriodicWorkController().start()
-    
+
     """
-    
+
     def __init__(self):
         super(PeriodicWorkController, self).__init__()
         self._shutdown = threading.Event()
@@ -286,7 +285,7 @@ class PeriodicWorkController(threading.Thread):
             default_periodic_status_backend.run_periodic_tasks()
             time.sleep(1)
         self._stopped.set() # indicate that we are stopped
-    
+
     def stop(self):
         self._shutdown.set()
         self._stopped.wait() # block until this thread is done
@@ -428,9 +427,9 @@ class WorkController(object):
                 "|".join(map(str, self.pool.get_worker_pids()))))
             if not self.is_detached:
                 time.sleep(1)
-        
+
         try:
-            while True: 
+            while True:
                 it.next()
         except (SystemExit, KeyboardInterrupt):
             self.shutdown()