Browse Source

Merge branch 'master' into statistics

Conflicts:
	celery/bin/celeryd.py
	celery/messaging.py
	celery/worker.py
Ask Solem 16 years ago
parent
commit
5657bc0476

+ 81 - 18
Changelog

@@ -2,13 +2,76 @@
 Change history
 ==============
 
-0.3.3 [2009-06-08 01:07 P.M CET] askh@opera.com
+0.3.7 [2008-06-16 11:41 P.M CET] 
+-----------------------------------------------
+
+	* **IMPORTANT** Now uses AMQP's ``basic.consume`` instead of
+		``basic.get``. This means we're no longer polling the broker for
+		new messages.
+
+	* **IMPORTANT** Default concurrency limit is now set to the number of CPUs
+		available on the system.
+
+	* **IMPORTANT** ``tasks.register``: Renamed ``task_name`` argument to
+		``name``, so
+
+			>>> tasks.register(func, task_name="mytask")
+
+		has to be replaced with:
+
+			>>> tasks.register(func, name="mytask")
+
+	* The daemon now correctly runs if the pidlock is stale.
+
+	* Now compatible with carrot 0.4.5
+
+	* Default AMQP connnection timeout is now 4 seconds.
+	* ``AsyncResult.read()`` was always returning ``True``.
+
+	*  Only use README as long_description if the file exists so easy_install
+		doesn't break.
+
+	* ``celery.view``: JSON responses now properly set its mime-type. 
+
+	* ``apply_async`` now has a ``connection`` keyword argument so you
+		can re-use the same AMQP connection if you want to execute
+		more than one task.
+
+	* Handle failures in task_status view such that it won't throw 500s.
+
+	* Fixed typo ``AMQP_SERVER`` in documentation to ``AMQP_HOST``.
+
+	* Worker exception e-mails sent to admins now works properly.
+
+	* No longer depends on ``django``, so installing ``celery`` won't affect
+		the preferred Django version installed.
+
+	* Now works with PostgreSQL (psycopg2) again by registering the
+		``PickledObject`` field.
+
+	* ``celeryd``: Added ``--detach`` option as an alias to ``--daemon``, and
+		it's the term used in the documentation from now on.
+
+	* Make sure the pool and periodic task worker thread is terminated
+		properly at exit. (So ``Ctrl-C`` works again).
+
+	* Now depends on ``python-daemon``.
+
+	* Removed dependency to ``simplejson``
+
+	* Cache Backend: Re-establishes connection for every task process
+		if the Django cache backend is memcached/libmemcached.
+
+	* Tyrant Backend: Now re-establishes the connection for every task
+		executed.
+
+0.3.3 [2009-06-08 01:07 P.M CET] 
 -----------------------------------------------
 
 	* The ``PeriodicWorkController`` now sleeps for 1 second between checking
 		for periodic tasks to execute.
 
-0.3.2 [2009-06-08 01:07 P.M CET] askh@opera.com
+0.3.2 [2009-06-08 01:07 P.M CET]
 -----------------------------------------------
 
 	* celeryd: Added option ``--discard``: Discard (delete!) all waiting
@@ -16,7 +79,7 @@ Change history
 
 	* celeryd: The ``--wakeup-after`` option was not handled as a float.
 
-0.3.1 [2009-06-08 01:07 P.M CET] askh@opera.com
+0.3.1 [2009-06-08 01:07 P.M CET]
 -----------------------------------------------
 
 	* The `PeriodicTask`` worker is now running in its own thread instead
@@ -24,7 +87,7 @@ Change history
 
 	* Default ``QUEUE_WAKEUP_AFTER`` has been lowered to ``0.1`` (was ``0.3``)
 
-0.3.0 [2009-06-08 12:41 P.M CET] askh@opera.com
+0.3.0 [2009-06-08 12:41 P.M CET]
 -----------------------------------------------
 
 **NOTE** This is a development version, for the stable release, please
@@ -98,7 +161,7 @@ arguments, so be sure to flush your task queue before you upgrade.
 	* The pool algorithm has been refactored for greater performance and
 		stability.
 
-0.2.0 [2009-05-20 05:14 P.M CET] askh@opera.com
+0.2.0 [2009-05-20 05:14 P.M CET]
 ------------------------------------------------
 
 	* Final release of 0.2.0
@@ -108,20 +171,20 @@ arguments, so be sure to flush your task queue before you upgrade.
 	* Fixes some syntax errors related to fetching results
 	  from the database backend.
 
-0.2.0-pre3 [2009-05-20 05:14 P.M CET] askh@opera.com
+0.2.0-pre3 [2009-05-20 05:14 P.M CET]
 ----------------------------------------------------
 
 	 * *Internal release*. Improved handling of unpickled exceptions,
 	 	get_result() now tries to recreate something looking like the
 	 	original exception.
 
-0.2.0-pre2 [2009-05-20 01:56 P.M CET] askh@opera.com
+0.2.0-pre2 [2009-05-20 01:56 P.M CET]
 ----------------------------------------------------
 
 	* Now handles unpickleable exceptions (like the dynimically generated
 	  subclasses of ``django.core.exception.MultipleObjectsReturned``).
 
-0.2.0-pre1 [2009-05-20 12:33 P.M CET] askh@opera.com
+0.2.0-pre1 [2009-05-20 12:33 P.M CET]
 ----------------------------------------------------
 
 	* It's getting quite stable, with a lot of new features, so bump
@@ -131,20 +194,20 @@ arguments, so be sure to flush your task queue before you upgrade.
 	  been removed. Use ``celery.backends.default_backend.mark_as_read()``, 
 	  and ``celery.backends.default_backend.mark_as_failure()`` instead.
 
-0.1.15 [2009-05-19 04:13 P.M CET] askh@opera.com
+0.1.15 [2009-05-19 04:13 P.M CET]
 ------------------------------------------------
 
 	* The celery daemon was leaking AMQP connections, this should be fixed,
 	  if you have any problems with too many files open (like ``emfile``
 	  errors in ``rabbit.log``, please contact us!
 
-0.1.14 [2009-05-19 01:08 P.M CET] askh@opera.com
+0.1.14 [2009-05-19 01:08 P.M CET]
 ------------------------------------------------
 
 	* Fixed a syntax error in the ``TaskSet`` class.  (No such variable
 	  ``TimeOutError``).
 
-0.1.13 [2009-05-19 12:36 P.M CET] askh@opera.com
+0.1.13 [2009-05-19 12:36 P.M CET]
 ------------------------------------------------
 
 	* Forgot to add ``yadayada`` to install requirements.
@@ -165,7 +228,7 @@ arguments, so be sure to flush your task queue before you upgrade.
 	
 	  and the result will be in ``docs/.build/html``.
 
-0.1.12 [2009-05-18 04:38 P.M CET] askh@opera.com
+0.1.12 [2009-05-18 04:38 P.M CET]
 ------------------------------------------------
 
     * ``delay_task()`` etc. now returns ``celery.task.AsyncResult`` object,
@@ -203,13 +266,13 @@ arguments, so be sure to flush your task queue before you upgrade.
         TT_HOST = "localhost"; # Hostname for the Tokyo Tyrant server.
         TT_PORT = 6657; # Port of the Tokyo Tyrant server.
 
-0.1.11 [2009-05-12 02:08 P.M CET] askh@opera.com
+0.1.11 [2009-05-12 02:08 P.M CET]
 -------------------------------------------------
 
 	* The logging system was leaking file descriptors, resulting in
 	  servers stopping with the EMFILES (too many open files) error. (fixed)
 
-0.1.10 [2009-05-11 12:46 P.M CET] askh@opera.com
+0.1.10 [2009-05-11 12:46 P.M CET]
 -------------------------------------------------
 
 	* Tasks now supports both positional arguments and keyword arguments.
@@ -218,7 +281,7 @@ arguments, so be sure to flush your task queue before you upgrade.
 
 	* The daemon now tries to reconnect if the connection is lost.
 
-0.1.8 [2009-05-07 12:27 P.M CET] askh@opera.com
+0.1.8 [2009-05-07 12:27 P.M CET]
 ------------------------------------------------
 
 	* Better test coverage
@@ -226,7 +289,7 @@ arguments, so be sure to flush your task queue before you upgrade.
 	* celeryd doesn't emit ``Queue is empty`` message if
 	  ``settings.CELERYD_EMPTY_MSG_EMIT_EVERY`` is 0.
 
-0.1.7 [2009-04-30 1:50 P.M CET] askh@opera.com
+0.1.7 [2009-04-30 1:50 P.M CET]
 -----------------------------------------------
 
 	* Added some unittests
@@ -241,7 +304,7 @@ arguments, so be sure to flush your task queue before you upgrade.
 	  ``settings.CELERY_AMQP_EXCHANGE``, ``settings.CELERY_AMQP_ROUTING_KEY``,
 	  and ``settings.CELERY_AMQP_CONSUMER_QUEUE``.
 
-0.1.6 [2009-04-28 2:13 P.M CET] askh@opera.com
+0.1.6 [2009-04-28 2:13 P.M CET]
 -----------------------------------------------
 
 	* Introducing ``TaskSet``. A set of subtasks is executed and you can
@@ -283,7 +346,7 @@ arguments, so be sure to flush your task queue before you upgrade.
 	* Project changed name from ``crunchy`` to ``celery``. The details of
 	  the name change request is in ``docs/name_change_request.txt``.
 
-0.1.0 [2009-04-24 11:28 A.M CET] askh@opera.com
+0.1.0 [2009-04-24 11:28 A.M CET]
 ------------------------------------------------
 
 	* Initial release

+ 1 - 1
MANIFEST.in

@@ -5,6 +5,6 @@ include MANIFEST.in
 include LICENSE
 include TODO
 include THANKS
-recursive-include celery *
+recursive-include celery *.py
 recursive-include docs *
 recursive-include testproj *

+ 3 - 3
README

@@ -2,7 +2,7 @@
 celery - Distributed Task Queue for Django.
 ============================================
 
-:Version: 0.3.5
+:Version: 0.3.7
 
 Introduction
 ============
@@ -284,8 +284,8 @@ Getting Help
 Mailing list
 ------------
 
-Join the `celery-users`_ mailing list for discussion about using and
-the development of celery.
+For discussions about the usage, development, and future of celery,
+please join the `celery-users`_ mailing list. 
 
 .. _`celery-users`: http://groups.google.com/group/celery-users/
 

+ 1 - 1
celery/__init__.py

@@ -1,5 +1,5 @@
 """Distributed Task Queue for Django"""
-VERSION = (0, 3, 5)
+VERSION = (0, 3, 7)
 __version__ = ".".join(map(str, VERSION))
 __author__ = "Ask Solem"
 __contact__ = "askh@opera.com"

+ 1 - 1
celery/backends/base.py

@@ -182,6 +182,6 @@ class BaseBackend(object):
         """Cleanup actions to do at the end of a task worker process.
 
         See :func:`celery.worker.jail`.
-        
+
         """
         pass

+ 66 - 76
celery/bin/celeryd.py

@@ -35,17 +35,17 @@
     Discard all waiting tasks before the daemon is started.
     **WARNING**: This is unrecoverable, and the tasks will be
     deleted from the messaging server.
-    
+
 .. cmdoption:: -u, --uid
 
     User-id to run ``celeryd`` as when in daemon mode.
 
 .. cmdoption:: -g, --gid
-       
+
     Group-id to run ``celeryd`` as when in daemon mode.
 
 .. cmdoption:: --umask
-    
+
     umask of the process when in daemon mode.
 
 .. cmdoption:: --workdir
@@ -68,6 +68,7 @@ from django.conf import settings
 from celery.log import emergency_error
 from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
+from celery.messaging import TaskConsumer
 from celery import conf
 from celery import discovery
 from celery.task import discard_all
@@ -88,12 +89,50 @@ settings.CELERY_STATISTICS = USE_STATISTICS
 
 STARTUP_INFO_FMT = """
     * Celery loading with the following configuration
-        * Broker -> amqp://%(vhost)s@%(host)s:%(port)s 
+        * Broker -> amqp://%(vhost)s@%(host)s:%(port)s
         * Exchange -> %(exchange)s (%(exchange_type)s)
         * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
         * Concurrency:%(concurrency)s
 """.strip()
 
+OPTION_LIST = (
+    optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
+            action="store", dest="concurrency", type="int",
+            help="Number of child processes processing the queue."),
+    optparse.make_option('--discard', default=False,
+            action="store_true", dest="discard",
+            help="Discard all waiting tasks before the server is started. "
+                 "WARNING: This is unrecoverable, and the tasks will be "
+                 "deleted from the messaging server."),
+    optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
+            action="store", dest="logfile",
+            help="Path to log file."),
+    optparse.make_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
+            action="store", dest="loglevel",
+            help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
+    optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
+            action="store", dest="pidfile",
+            help="Path to pidfile."),
+    optparse.make_option('-d', '--detach', '--daemon', default=False,
+            action="store_true", dest="detach",
+            help="Run in the background as a daemon."),
+    optparse.make_option('-u', '--uid', default=None,
+            action="store", dest="uid",
+            help="User-id to run celeryd as when in daemon mode."),
+    optparse.make_option('-g', '--gid', default=None,
+            action="store", dest="gid",
+            help="Group-id to run celeryd as when in daemon mode."),
+    optparse.make_option('--umask', default=0,
+            action="store", type="int", dest="umask",
+            help="umask of the process when in daemon mode."),
+    optparse.make_option('--workdir', default=None,
+            action="store", dest="working_directory",
+            help="Directory to change to when in daemon mode."),
+    optparse.make_option('--chroot', default=None,
+            action="store", dest="chroot",
+            help="Change root directory to this path when in daemon mode."),
+    )
+
 
 def acquire_pidlock(pidfile):
     """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
@@ -115,8 +154,8 @@ def acquire_pidlock(pidfile):
     except os.error, exc:
         if exc.errno == errno.ESRCH:
             sys.stderr.write("Stale pidfile exists. Removing it.\n")
-            pidlock.release() 
-            return
+            pidlock.release()
+            return PIDLockFile(pidfile)
     else:
         raise SystemExit(
                 "ERROR: Pidfile (%s) already exists.\n"
@@ -125,11 +164,11 @@ def acquire_pidlock(pidfile):
     return pidlock
 
 
-def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
+def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
         loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
         pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
         working_directory=None, chroot=None, statistics=None, **kwargs):
-    """Start a celery worker server."""
+    """Starts the celery worker server."""
 
     print(". Launching celery, please hold on to something...")
 
@@ -145,17 +184,22 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
                 "concurrency. We'll be using a single process only.",
                 UserWarning)
         concurrency = 1
-    
+
+    # Setup logging
     if not isinstance(loglevel, int):
         loglevel = LOG_LEVELS[loglevel.upper()]
+    if not detach:
+        logfile = None # log to stderr when not running in the background.
 
     if discard:
         discarded_count = discard_all()
-        what = discard_count > 1 and "messages" or "message"
-        print("* Discard: Erased %d %s from the queue." % (
+        what = discarded_count > 1 and "messages" or "message"
+        print("discard: Erased %d %s from the queue.\n" % (
                 discarded_count, what))
-    
-    startup_info = STARTUP_INFO_FMT % {
+
+    # Dump configuration to screen so we have some basic information
+    # when users sends e-mails.
+    print(STARTUP_INFO_FMT % {
             "vhost": settings.AMQP_VHOST,
             "host": settings.AMQP_SERVER,
             "port": settings.AMQP_PORT,
@@ -168,30 +212,21 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
             "loglevel": loglevel,
             "pidfile": pidfile,
     }
-    print(startup_info)
     print("* Reporting of statistics is %s..." % (
         settings.CELERY_STATISTICS and "ON" or "OFF"))
 
-    print("* Declaring consumers...")
-    conn = DjangoAMQPConnection()
-    TaskConsumer(connection=conn).close()
-    if settings.CELERY_STATISTICS:
-        StatsConsumer(connection=conn).close()
-
-
-    context = None
-    if daemon:
+    if detach:
         # Since without stderr any errors will be silently suppressed,
         # we need to know that we have access to the logfile
+        if logfile:
+            open(logfile, "a").close()
         pidlock = acquire_pidlock(pidfile)
         if not umask:
             umask = 0
-        if logfile:
-            open(logfile, "a").close()
         uid = uid and int(uid) or os.geteuid()
         gid = gid and int(gid) or os.getegid()
         working_directory = working_directory or os.getcwd()
-        sys.stderr.write("* Launching celeryd in the background...\n")
+        print("* Launching celeryd in the background...")
         context = DaemonContext(chroot_directory=chroot,
                                 working_directory=working_directory,
                                 umask=umask,
@@ -199,16 +234,14 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
                                 uid=uid,
                                 gid=gid)
         context.open()
-    else:
-        logfile = None # log to stderr when not running as daemon.
 
     discovery.autodiscover()
-    celeryd = WorkController(concurrency=concurrency,
-                               loglevel=loglevel,
-                               logfile=logfile,
-                               is_detached=daemon)
+    worker = WorkController(concurrency=concurrency,
+                            loglevel=loglevel,
+                            logfile=logfile,
+                            is_detached=detach)
     try:
-        celeryd.run()
+        worker.run()
     except Exception, e:
         emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
                             e.__class__, e, traceback.format_exc()))
@@ -218,49 +251,6 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
         raise
 
 
-OPTION_LIST = (
-    optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
-            action="store", dest="concurrency", type="int",
-            help="Number of child processes processing the queue."),
-    optparse.make_option('--discard', default=False,
-            action="store_true", dest="discard",
-            help="Discard all waiting tasks before the daemon is started. "
-                 "WARNING: This is unrecoverable, and the tasks will be "
-                 "deleted from the messaging server."),
-    optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
-            action="store", dest="logfile",
-            help="Path to log file."),
-    optparse.make_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
-            action="store", dest="loglevel",
-            help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
-    optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
-            action="store", dest="pidfile",
-            help="Path to pidfile."),
-    optparse.make_option('-s', '--statistics', default=USE_STATISTICS,
-            action="store_true", dest="statistics",
-            help="Turn on reporting of statistics (remember to flush the "
-                 "statistics message queue from time to time)."),
-    optparse.make_option('-d', '--detach', '--daemon', default=False,
-            action="store_true", dest="daemon",
-            help="Run in the background as a daemon."),
-    optparse.make_option('-u', '--uid', default=None,
-            action="store", dest="uid",
-            help="User-id to run celeryd as when in daemon mode."),
-    optparse.make_option('-g', '--gid', default=None,
-            action="store", dest="gid",
-            help="Group-id to run celeryd as when in daemon mode."),
-    optparse.make_option('--umask', default=0,
-            action="store", type="int", dest="umask",
-            help="umask of the process when in daemon mode."),
-    optparse.make_option('--workdir', default=None,
-            action="store", dest="working_directory",
-            help="Directory to change to when in daemon mode."),
-    optparse.make_option('--chroot', default=None,
-            action="store", dest="chroot",
-            help="Change root directory to this path when in daemon mode."),
-    )
-
-
 def parse_options(arguments):
     """Parse the available options to ``celeryd``."""
     parser = optparse.OptionParser(option_list=OPTION_LIST)

+ 10 - 0
celery/conf.py

@@ -12,6 +12,7 @@ DEFAULT_DAEMON_PID_FILE = "celeryd.pid"
 DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
 DEFAULT_DAEMON_LOG_LEVEL = "INFO"
 DEFAULT_DAEMON_LOG_FILE = "celeryd.log"
+DEFAULT_AMQP_CONNECTION_TIMEOUT = 4
 
 """
 .. data:: LOG_LEVELS
@@ -131,6 +132,15 @@ AMQP_CONSUMER_QUEUE = getattr(settings, "CELERY_AMQP_CONSUMER_QUEUE",
                               DEFAULT_AMQP_CONSUMER_QUEUE)
 
 """
+.. data:: AMQP_CONNECTION_TIMEOUT
+
+    The timeout in seconds before we give up establishing a connection
+    to the AMQP server.
+
+"""
+AMQP_CONNECTION_TIMEOUT = getattr(settings, "CELERY_AMQP_CONNECTION_TIMEOUT",
+                                  DEFAULT_AMQP_CONNECTION_TIMEOUT)
+"""
 .. data:: SEND_CELERY_TASK_ERROR_EMAILS
 
     If set to ``True``, errors in tasks will be sent to admins by e-mail.

+ 2 - 1
celery/fields.py

@@ -23,7 +23,8 @@ if settings.DATABASE_ENGINE == "postgresql_psycopg2":
     import psycopg2.extensions
     # register PickledObject as a QuotedString otherwise we will see
     # can't adapt errors from psycopg2.
-    psycopg2.extensions.register_adapter(PickledObject, psycopg2.extensions.QuotedString)
+    psycopg2.extensions.register_adapter(PickledObject,
+            psycopg2.extensions.QuotedString)
 
 
 class PickledObjectField(models.Field):

+ 3 - 5
celery/log.py

@@ -15,10 +15,9 @@ def setup_logger(loglevel=DAEMON_LOG_LEVEL, logfile=None, format=LOG_FORMAT,
     """
     import multiprocessing
     logger = multiprocessing.get_logger()
-    for handler in logger.handlers:
-        if hasattr(handler, "close"):
-            handler.close()
-    logger.handlers = []
+    logger.setLevel(loglevel)
+    if logger.handlers:
+        return logger
     if logfile:
         if hasattr(logfile, "write"):
             log_file_handler = logging.StreamHandler(logfile)
@@ -29,7 +28,6 @@ def setup_logger(loglevel=DAEMON_LOG_LEVEL, logfile=None, format=LOG_FORMAT,
         logger.addHandler(log_file_handler)
     else:
         multiprocessing.log_to_stderr()
-    logger.setLevel(loglevel)
     return logger
 
 

+ 4 - 2
celery/messaging.py

@@ -16,6 +16,7 @@ except ImportError:
 class TaskPublisher(Publisher):
     """The AMQP Task Publisher class."""
     exchange = conf.AMQP_EXCHANGE
+    exchange_type = conf.AMQP_EXCHANGE_TYPE
     routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
     encoder = pickle.dumps
 
@@ -69,8 +70,9 @@ class TaskConsumer(Consumer):
     exchange = conf.AMQP_EXCHANGE
     routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
     exchange_type = conf.AMQP_EXCHANGE_TYPE
-    auto_ack = True
     decoder = pickle.loads
+    auto_ack = False
+    no_ack = False
 
 
 class StatsPublisher(Publisher):
@@ -84,8 +86,8 @@ class StatsConsumer(Consumer):
     exchange = "celerygraph"
     routing_key = "stats"
     exchange_type = "direct"
-    auto_ack = True
     decoder = pickle.loads
+    no_ack=True
 
     def receive(self, message_data, message):
         pass

+ 6 - 5
celery/pool.py

@@ -86,7 +86,7 @@ class TaskPool(object):
         return self._pool._state == POOL_STATE_RUN
 
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
-            errbacks=None, meta=None):
+            errbacks=None, on_acknowledge=None, meta=None):
         """Equivalent of the :func:``apply`` built-in function.
 
         All ``callbacks`` and ``errbacks`` should complete immediately since
@@ -107,9 +107,13 @@ class TaskPool(object):
 
         on_return = lambda r: self.on_return(r, tid, callbacks, errbacks, meta)
 
+
+        if self.full():
+            self.wait_for_result()
         result = self._pool.apply_async(target, args, kwargs,
                                            callback=on_return)
-
+        if on_acknowledge:
+            on_acknowledge()
         self.add(result, callbacks, errbacks, tid, meta)
 
         return result
@@ -147,9 +151,6 @@ class TaskPool(object):
 
         self._processes[tid] = [result, callbacks, errbacks, meta]
 
-        if self.full():
-            self.wait_for_result()
-
     def full(self):
         """Is the pool full?
 

+ 1 - 1
celery/result.py

@@ -68,7 +68,7 @@ class BaseAsyncResult(object):
 
         """
         status = self.backend.get_status(self.task_id)
-        return status != "PENDING" or status != "RETRY"
+        return status not in ["PENDING", "RETRY"]
 
     def successful(self):
         """Alias to :meth:`is_done`."""

+ 32 - 7
celery/task.py

@@ -4,6 +4,7 @@ Working with tasks and task sets.
 
 """
 from carrot.connection import DjangoAMQPConnection
+from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.log import setup_logger
 from celery.registry import tasks
@@ -16,7 +17,7 @@ import pickle
 
 def apply_async(task, args=None, kwargs=None, routing_key=None,
         immediate=None, mandatory=None, connection=None,
-        connect_timeout=None, priority=None):
+        connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None):
     """Run a task asynchronously by the celery daemon(s).
 
     :param task: The task to run (a callable object, or a :class:`Task`
@@ -100,7 +101,7 @@ def delay_task(task_name, *args, **kwargs):
     return apply_async(task, args, kwargs)
 
 
-def discard_all():
+def discard_all(connect_timeout=AMQP_CONNECTION_TIMEOUT):
     """Discard all waiting tasks.
 
     This will ignore all tasks waiting for execution, and they will
@@ -111,7 +112,7 @@ def discard_all():
     :rtype: int
 
     """
-    amqp_connection = DjangoAMQPConnection()
+    amqp_connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
     consumer = TaskConsumer(connection=amqp_connection)
     discarded_count = consumer.discard_all()
     amqp_connection.close()
@@ -231,7 +232,8 @@ class Task(object):
             >>> publisher.connection.close()
 
         """
-        return TaskPublisher(connection=DjangoAMQPConnection())
+        return TaskPublisher(connection=DjangoAMQPConnection(
+                                connect_timeout=AMQP_CONNECTION_TIMEOUT))
 
     def get_consumer(self):
         """Get a celery task message consumer.
@@ -246,7 +248,8 @@ class Task(object):
             >>> consumer.connection.close()
 
         """
-        return TaskConsumer(connection=DjangoAMQPConnection())
+        return TaskConsumer(connection=DjangoAMQPConnection(
+                                connect_timeout=AMQP_CONNECTION_TIMEOUT))
 
     @classmethod
     def delay(cls, *args, **kwargs):
@@ -326,7 +329,7 @@ class TaskSet(object):
         self.arguments = args
         self.total = len(args)
 
-    def run(self):
+    def run(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
         """Run all tasks in the taskset.
 
         :returns: A :class:`celery.result.TaskSetResult` instance.
@@ -357,7 +360,7 @@ class TaskSet(object):
 
         """
         taskset_id = str(uuid.uuid4())
-        conn = DjangoAMQPConnection()
+        conn = DjangoAMQPConnection(connect_timeout=connect_timeout)
         publisher = TaskPublisher(connection=conn)
         subtask_ids = [publisher.delay_task_in_set(task_name=self.task_name,
                                                    taskset_id=taskset_id,
@@ -573,3 +576,25 @@ class DeleteExpiredTaskMetaTask(PeriodicTask):
         logger.info("Deleting expired task meta objects...")
         default_backend.cleanup()
 tasks.register(DeleteExpiredTaskMetaTask)
+
+
+class PingTask(Task):
+    """The task used by :func:`ping`."""
+    name = "celery.ping"
+
+    def run(self, **kwargs):
+        """:returns: the string ``"pong"``."""
+        return "pong"
+tasks.register(PingTask)
+
+
+def ping():
+    """Test if the server is alive.
+
+    Example:
+
+        >>> from celery.task import ping
+        >>> ping()
+        'pong'
+    """
+    return PingTask.apply_async().get()

+ 14 - 0
celery/tests/runners.py

@@ -0,0 +1,14 @@
+from django.conf import settings
+from django.test.simple import run_tests as django_test_runner
+
+
+def run_tests(test_labels, verbosity=1, interactive=True, extra_tests=None,
+        **kwargs):
+    """ Test runner that only runs tests for the apps
+    listed in ``settings.TEST_APPS``.
+    """
+    extra_tests = extra_tests or []
+    app_labels = getattr(settings, "TEST_APPS", test_labels)
+    return django_test_runner(app_labels,
+                              verbosity=verbosity, interactive=interactive,
+                              extra_tests=extra_tests, **kwargs)

+ 1 - 1
celery/tests/test_models.py

@@ -57,7 +57,7 @@ class TestModels(unittest.TestCase):
         self.assertTrue(unicode(p).startswith("<PeriodicTask:"))
         self.assertFalse(p in PeriodicTaskMeta.objects.get_waiting_tasks())
         # Have to avoid save() because it applies the auto_now=True.
-        PeriodicTaskMeta.objects.filter(name=p.name).update (
+        PeriodicTaskMeta.objects.filter(name=p.name).update(
                 last_run_at=datetime.now() - (TestPeriodicTask.run_every +
                 timedelta(seconds=10)))
         self.assertTrue(p in PeriodicTaskMeta.objects.get_waiting_tasks())

+ 2 - 1
celery/views.py

@@ -27,4 +27,5 @@ def task_status(request, task_id):
             "status": status,
             "result": async_result.result,
         }
-    return HttpResponse(JSON_dump({"task": response_data}), mimetype="application/json")
+    return HttpResponse(JSON_dump({"task": response_data}),
+            mimetype="application/json")

+ 32 - 17
celery/worker.py

@@ -104,7 +104,6 @@ def jail(task_id, task_name, func, args, kwargs):
 
     return retval
 
-    
 
 class TaskWrapper(object):
     """Class wrapping a task to be run.
@@ -139,6 +138,10 @@ class TaskWrapper(object):
 
         Mapping of keyword arguments to apply to the task.
 
+    .. attribute:: message
+
+        The original message sent. Used for acknowledging the message.
+
     """
     success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
     fail_msg = """
@@ -149,13 +152,15 @@ class TaskWrapper(object):
     """
     fail_email_body = TASK_FAIL_EMAIL_BODY
 
-    def __init__(self, task_name, task_id, task_func, args, kwargs, **opts):
+    def __init__(self, task_name, task_id, task_func, args, kwargs,
+            on_acknowledge=None, **opts):
         self.task_name = task_name
         self.task_id = task_id
         self.task_func = task_func
         self.args = args
         self.kwargs = kwargs
         self.logger = kwargs.get("logger")
+        self.on_acknowledge = on_acknowledge
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
                 "fail_email_body"):
             setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
@@ -169,7 +174,7 @@ class TaskWrapper(object):
                 self.args, self.kwargs)
 
     @classmethod
-    def from_message(cls, message, logger):
+    def from_message(cls, message, message_data, logger):
         """Create a :class:`TaskWrapper` from a task message sent by
         :class:`celery.messaging.TaskPublisher`.
 
@@ -179,7 +184,6 @@ class TaskWrapper(object):
         :returns: :class:`TaskWrapper` instance.
 
         """
-        message_data = message.decode()
         task_name = message_data["task"]
         task_id = message_data["id"]
         args = message_data["args"]
@@ -192,7 +196,8 @@ class TaskWrapper(object):
         if task_name not in tasks:
             raise UnknownTask(task_name)
         task_func = tasks[task_name]
-        return cls(task_name, task_id, task_func, args, kwargs, logger=logger)
+        return cls(task_name, task_id, task_func, args, kwargs,
+                    on_acknowledge=message.ack, logger=logger)
 
     def extend_with_default_kwargs(self, loglevel, logfile):
         """Extend the tasks keyword arguments with standard task arguments.
@@ -217,6 +222,8 @@ class TaskWrapper(object):
 
         """
         task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
+        if self.on_acknowledge:
+            self.on_acknowledge()
         return jail(self.task_id, self.task_name, [
                         self.task_func, self.args, task_func_kwargs])
 
@@ -266,6 +273,7 @@ class TaskWrapper(object):
                      self.args, task_func_kwargs]
         return pool.apply_async(jail, args=jail_args,
                 callbacks=[self.on_success], errbacks=[self.on_failure],
+                on_acknowledge=self.on_acknowledge,
                 meta={"task_id": self.task_id, "task_name": self.task_name})
 
 
@@ -277,9 +285,9 @@ class PeriodicWorkController(threading.Thread):
     Example
 
         >>> PeriodicWorkController().start()
-    
+
     """
-    
+
     def __init__(self):
         super(PeriodicWorkController, self).__init__()
         self._shutdown = threading.Event()
@@ -293,8 +301,9 @@ class PeriodicWorkController(threading.Thread):
             default_periodic_status_backend.run_periodic_tasks()
             time.sleep(1)
         self._stopped.set() # indicate that we are stopped
-    
+
     def stop(self):
+        """Shutdown the thread."""
         self._shutdown.set()
         self._stopped.wait() # block until this thread is done
 
@@ -339,6 +348,7 @@ class WorkController(object):
     loglevel = logging.ERROR
     concurrency = DAEMON_CONCURRENCY
     logfile = DAEMON_LOG_FILE
+    _state = None
 
     def __init__(self, concurrency=None, logfile=None, loglevel=None,
             is_detached=False):
@@ -353,6 +363,7 @@ class WorkController(object):
         self.task_consumer = None
 
     def close_connection(self):
+        """Close the AMQP connection."""
         if self.task_consumer:
             self.task_consumer.close()
         if self.amqp_connection:
@@ -368,6 +379,7 @@ class WorkController(object):
         self.close_connection()
         self.amqp_connection = DjangoAMQPConnection()
         self.task_consumer = TaskConsumer(connection=self.amqp_connection)
+        self.task_consumer.register_callback(self._message_callback)
         return self.task_consumer
 
     def connection_diagnostics(self):
@@ -381,9 +393,10 @@ class WorkController(object):
             self.reset_connection()
 
     def _message_callback(self, message_data, message):
+        """The method called when we receive a message."""
         try:
             try:
-                self.process_task(message)
+                self.process_task(message_data, message)
             except ValueError:
                 # execute_next_task didn't return a r/name/id tuple,
                 # probably because it got an exception.
@@ -393,15 +406,13 @@ class WorkController(object):
             except Exception, exc:
                 self.logger.critical("Message queue raised %s: %s\n%s" % (
                                 exc.__class__, exc, traceback.format_exc()))
-            except:
-                self.shutdown()
-                raise
         except (SystemExit, KeyboardInterrupt):
             self.shutdown()
 
-    def process_task(self, message):
+    def process_task(self, message_data, message):
         """Process task message by passing it to the pool of workers."""
-        task = TaskWrapper.from_message(message, logger=self.logger)
+        task = TaskWrapper.from_message(message, message_data,
+                                        logger=self.logger)
         self.logger.info("Got task from broker: %s[%s]" % (
             task.task_name, task.task_id))
         self.logger.debug("Got a task: %s. Trying to execute it..." % task)
@@ -414,15 +425,19 @@ class WorkController(object):
         return result
 
     def shutdown(self):
+        """Make sure ``celeryd`` exits cleanly."""
         # shut down the periodic work controller thread
+        if self._state != "RUN":
+            return
+        self._state = "TERMINATE"
         self.periodicworkcontroller.stop()
         self.pool.terminate()
         self.close_connection()
 
     def run(self):
         """Starts the workers main loop."""
+        self._state = "RUN"
         task_consumer = self.reset_connection()
-        task_consumer.register_callback(self._message_callback)
         it = task_consumer.iterconsume(limit=None)
 
         self.pool.run()
@@ -435,9 +450,9 @@ class WorkController(object):
                 "|".join(map(str, self.pool.get_worker_pids()))))
             if not self.is_detached:
                 time.sleep(1)
-        
+
         try:
-            while True: 
+            while True:
                 it.next()
         except (SystemExit, KeyboardInterrupt):
             self.shutdown()

+ 5 - 2
setup.py

@@ -48,7 +48,10 @@ py_minor_version = py_version_info[1]
 if (py_major_version == 2 and py_minor_version <=5) or py_major_version < 2:
     install_requires.append("multiprocessing")
 
-long_description = codecs.open("README", "r", "utf-8").read()
+if os.path.exists("README"):
+    long_description = codecs.open("README", "r", "utf-8").read()
+else:
+    long_description = "See http://pypi.python.org/pypi/celery"
 
 
 setup(
@@ -63,7 +66,7 @@ setup(
     scripts=["bin/celeryd", "bin/celeryctl"],
     zip_safe=False,
     install_requires=[
-        'carrot>=0.4.1',
+        'carrot>=0.4.5',
         'python-daemon',
     ],
     cmdclass = {"test": RunTests},

+ 1 - 1
testproj/settings.py

@@ -12,7 +12,7 @@ ADMINS = (
     # ('Your Name', 'your_email@domain.com'),
 )
 
-TEST_RUNNER = "yadayada.test.run_tests"
+TEST_RUNNER = "celery.tests.runners.run_tests"
 TEST_APPS = (
     "celery",
 )