Browse Source

And here comes the code

Ask Solem 16 years ago
parent
commit
8dd7ac781a
18 changed files with 747 additions and 0 deletions
  1. 1 0
      AUTHORS
  2. 8 0
      Changelog
  3. 28 0
      LICENSE
  4. 6 0
      MANIFEST.in
  5. 121 0
      README
  6. 7 0
      crunchy/__init__.py
  7. 0 0
      crunchy/bin/__init__.py
  8. 83 0
      crunchy/bin/crunchd
  9. 51 0
      crunchy/conf.py
  10. 38 0
      crunchy/discovery.py
  11. 32 0
      crunchy/log.py
  12. 26 0
      crunchy/messaging.py
  13. 102 0
      crunchy/platform.py
  14. 25 0
      crunchy/process.py
  15. 47 0
      crunchy/registry.py
  16. 21 0
      crunchy/task.py
  17. 99 0
      crunchy/worker.py
  18. 52 0
      setup.py

+ 1 - 0
AUTHORS

@@ -0,0 +1 @@
+Ask Solem <askh@opera.com>

+ 8 - 0
Changelog

@@ -0,0 +1,8 @@
+==============
+Change history
+==============
+
+0.1.0 :date:`2009-04-24 11:28 A.M CET` :author:askh@opera.com
+--------------------------------------------------------------
+
+	* Initial release

+ 28 - 0
LICENSE

@@ -0,0 +1,28 @@
+Copyright (c) 2009, Ask Solem
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice,
+      this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above copyright
+      notice, this list of conditions and the following disclaimer in the
+      documentation and/or other materials provided with the distribution.
+
+Neither the name of Ask Solem nor the names of its contributors may be used
+to endorse or promote products derived from this software without specific
+prior written permission. 
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
+BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+

+ 6 - 0
MANIFEST.in

@@ -0,0 +1,6 @@
+include AUTHORS
+include README
+include MANIFEST.in
+include LICENSE
+include Changelog
+recursive-include crunchy *

+ 121 - 0
README

@@ -0,0 +1,121 @@
+============================================
+crunchy - Distributed Task Queue for Django.
+============================================
+
+:Authors:
+    Ask Solem (askh@opera.com)
+:Version: 0.1.0
+
+Introduction
+------------
+
+``crunchy`` is a distributed task queue framework for Django.
+More information will follow.
+
+Installation
+=============
+
+You can install ``crunchy`` either via the Python Package Index (PyPI)
+or from source.
+
+To install using ``pip``,::
+
+    $ pip install crunchy
+
+To install using ``easy_install``,::
+
+    $ easy_install crunchy
+
+If you have downloaded a source tarball you can install it
+by doing the following,::
+
+    $ python setup.py build
+    # python setup.py install # as root
+
+Usage
+=====
+
+Have to write a cool tutorial, but here is some simple usage info.
+
+*Note* You need to have a AMQP message broker running, like `RabbitMQ`_,
+and you need to have the amqp server setup in your settings file, as described
+in the `carrot distribution README`_.
+
+*Note* If you're running ``SQLite`` as the database backend, ``crunchd`` will
+only be able to process one message at a time, this because ``SQLite`` doesn't
+allow concurrent writes.
+
+.. _`RabbitMQ`: http://www.rabbitmq.com
+.. _`carrot distribution README`: http://pypi.python.org/pypi/carrot/0.3.3
+
+
+Defining tasks
+--------------
+
+    >>> from crunchy.task import tasks
+    >>> from crunchy.log import setup_logger
+    >>> def do_something(some_arg, **kwargs):
+    ...     logger = setup_logger(**kwargs)
+    ...     logger.info("Did something: %s" % some_arg)
+    >>> task.register("do_something", some_arg=do_something) 
+
+*Note* Task functions only supports keyword arguments.
+
+Tell the crunch daemon to run a task
+-------------------------------------
+
+    >>> from crunchy.task import delay_task
+    >>> delay_task("do_something", "foo bar baz")
+
+
+Running the crunch daemon
+--------------------------
+
+::
+
+    $ cd mydjangoproject
+    $ env DJANGO_SETTINGS_MODULE=settings crunchd
+    [....]
+    [2009-04-23 17:44:05,115: INFO/Process-1] Did something: foo bar baz
+    [2009-04-23 17:44:05,118: INFO/MainProcess] Waiting for queue.
+
+
+
+
+Autodiscovery of tasks
+-----------------------
+
+``crunchy`` has an autodiscovery feature like the Django Admin, that
+automatically loads any ``tasks.py`` module in the applications listed
+in ``settings.INSTALLED_APPS``.
+
+A good place to add this command could be in your ``urls.py``,
+::
+
+    from crunchy.task import tasks
+    tasks.autodiscover()
+
+
+
+Then you can add new tasks in your applications ``tasks.py`` module,
+::
+
+    from crunchy.task import tasks
+    from crunchy.log import setup_logger
+    from clickcounter.models import ClickCount
+
+    def increment_click(for_url, **kwargs):
+        logger = setup_logger(**kwargs)
+        clicks_for_url, cr = ClickCount.objects.get_or_create(url=for_url)
+        clicks_for_url.clicks = clicks_for_url.clicks + 1
+        clicks_for_url.save()
+        logger.info("Incremented click count for %s (not at %d)" % (
+                        for_url, clicks_for_url.clicks)
+
+License
+=======
+
+This software is licensed under the ``New BSD License``. See the ``LICENSE``
+file in the top distribution directory for the full license text.
+
+.. # vim: syntax=rst expandtab tabstop=4 shiftwidth=4 shiftround

+ 7 - 0
crunchy/__init__.py

@@ -0,0 +1,7 @@
+"""Distributed Task Queue for Django"""
+VERSION = (0, 1, 0)
+__version__ = ".".join(map(str, VERSION))
+__author__ = "Ask Solem"
+__contact__ = "askh@opera.com"
+__homepage__ = "http://github.com/ask/crunchy/"
+__docformat__ = "restructuredtext"

+ 0 - 0
crunchy/bin/__init__.py


+ 83 - 0
crunchy/bin/crunchd

@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+import os
+import sys
+sys.path.append(os.getcwd())
+from django.conf import settings
+from crunchy.platform import PIDFile, daemonize, remove_pidfile
+from crunchy.log import setup_logger
+from crunchy.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
+from crunchy.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
+from crunchy.conf import QUEUE_WAKEUP_AFTER
+from crunchy import discovery
+from crunchy.worker import TaskDaemon
+import traceback
+import optparse
+import atexit
+
+
+def main(concurrency=DAEMON_CONCURRENCY, daemon=False,
+        loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE,
+        pidfile=DAEMON_PID_FILE, queue_wakeup_after=QUEUE_WAKEUP_AFTER):
+    if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
+        import warnings
+        warnings.warn("The sqlite3 database engine doesn't support "
+                "concurrency. We'll be using a single process only.",
+                UserWarning)
+        concurrency = 1
+    if daemon:
+        sys.stderr.write("Launching crunchd in the background...\n")
+        pidfile_handler = PIDFile(pidfile)
+        pidfile_handler.check()
+        daemonize(pidfile=pidfile_handler)
+        atexit.register(remove_pidfile, pidfile)
+    else:
+        logfile = None # log to stderr when not running as daemon.
+
+    discovery.autodiscover()
+    crunchd = TaskDaemon(concurrency=concurrency,
+                               loglevel=loglevel,
+                               logfile=logfile,
+                               queue_wakeup_after=queue_wakeup_after)
+    try:
+        crunchd.run()
+    except Exception, e:
+        raise
+        emergency_error(logfile, "crunchd raised exception %s: %s\n%s" % (
+                            e.__class__, e, traceback.format_exc()))
+
+
+def parse_options(arguments):
+    parser = optparse.OptionParser()
+    parser.add_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
+            action="store", dest="concurrency", type="int",
+            help="Number of child processes processing the queue.")
+    parser.add_option('-f', '--logfile', default=DAEMON_LOG_FILE,
+            action="store", dest="logfile",
+            help="Path to log file.")
+    parser.add_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
+            action="store", dest="loglevel",
+            help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL.")
+    parser.add_option('-p', '--pidfile', default=DAEMON_PID_FILE,
+            action="store", dest="pidfile",
+            help="Path to PID file.")
+    parser.add_option('-w', '--wakeup-after', default=QUEUE_WAKEUP_AFTER,
+            action="store", dest="queue_wakeup_after",
+            help="If the queue is empty, this is the time *in seconds* the "
+                 "daemon sleeps until it wakes up to check if there's any "
+                 "new messages on the queue.")
+    parser.add_option('-d', '--daemon', default=False,
+            action="store_true", dest="daemon",
+            help="Run in background as a daemon.")
+    options, values = parser.parse_args(arguments)
+    if not isinstance(options.loglevel, int):
+        options.loglevel = LOG_LEVELS[options.loglevel.upper()]
+    return options
+
+if __name__ == "__main__":
+    options = parse_options(sys.argv[1:])
+    main(concurrency=options.concurrency,
+         daemon=options.daemon,
+         logfile=options.logfile,
+         loglevel=options.loglevel,
+         pidfile=options.pidfile,
+         queue_wakeup_after=options.queue_wakeup_after)

+ 51 - 0
crunchy/conf.py

@@ -0,0 +1,51 @@
+from django.conf import settings
+import logging
+
+# The number of processes to work simultaneously at processing the queue.
+DEFAULT_DAEMON_CONCURRENCY = 10
+
+# If the queue is empty, this is the time *in seconds* the daemon sleeps
+# until it wakes up to check if there's any new messages on the queue.
+DEFAULT_QUEUE_WAKEUP_AFTER = 0.3 
+
+# As long as the queue is empty, the daemon logs a "Queue is empty" message
+# every ``EMPTY_MSG_EMIT_EVERY`` *seconds*.
+DEFAULT_EMPTY_MSG_EMIT_EVERY = 5 
+
+DEFAULT_DAEMON_PID_FILE = "crunchd.pid"
+
+# The format we log messages in.
+DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
+
+# Default log level [DEBUG|INFO|WARNING|ERROR|CRITICAL|FATAL]
+DEFAULT_DAEMON_LOG_LEVEL = "INFO"
+
+# Default log file
+DEFAULT_DAEMON_LOG_FILE = "refreshd.log"
+
+# Table of loglevels to constants for use in settings.py.
+LOG_LEVELS = {
+    "DEBUG": logging.DEBUG,
+    "INFO": logging.INFO,
+    "WARNING": logging.WARNING,
+    "WARN": logging.WARNING,
+    "ERROR": logging.ERROR,
+    "CRITICAL": logging.CRITICAL,
+    "FATAL": logging.FATAL,
+}
+
+LOG_FORMAT = getattr(settings, "DJANGOFEEDS_DAEMON_LOG_FORMAT",
+                            DEFAULT_LOG_FMT)
+DAEMON_LOG_FILE = getattr(settings, "DJANGOFEEDS_LOG_FILE",
+                            DEFAULT_DAEMON_LOG_FILE)
+DAEMON_LOG_LEVEL = LOG_LEVELS[getattr(settings, "DJANGOFEEDS_DAEMON_LOG_LEVEL",
+                               DEFAULT_DAEMON_LOG_LEVEL).upper()]
+
+QUEUE_WAKEUP_AFTER = getattr(settings, "CRUNCHD_QUEUE_WAKEUP_AFTER",
+                                DEFAULT_QUEUE_WAKEUP_AFTER)
+EMPTY_MSG_EMIT_EVERY = getattr(settings, "CRUNCHD_EMPTY_MSG_EMIT_EVERY",
+                                DEFAULT_EMPTY_MSG_EMIT_EVERY)
+DAEMON_PID_FILE = getattr("settings", "CRUNCHD_PID_FILE",
+                            DEFAULT_DAEMON_PID_FILE)
+DAEMON_CONCURRENCY = getattr("settings", "CRUNCHD_CONCURRENCY",
+                                DEFAULT_DAEMON_CONCURRENCY)

+ 38 - 0
crunchy/discovery.py

@@ -0,0 +1,38 @@
+import imp
+from django.conf import settings
+from django.core import exceptions
+
+
+def autodiscover():
+    """Include tasks for all applications in settings.INSTALLED_APPS."""
+    return filter(None, [tasks_for_app(app)
+                            for app in settings.INSTALLED_APPS])
+
+
+def tasks_for_app(app):
+    """Given an application name, imports any tasks.py file for that app."""
+
+    def found_tasks_module_handler(app_path, app_basename):
+        return __import__("%s.tasks" % app)
+
+    return find_related_module(app, "tasks", found_tasks_module_handler)
+
+
+def find_related_module(app, related_name, handler):
+    """Given an application name and a module name, tries to find that
+    module in the application, and running handler' if it finds it.
+    """
+
+    # See django.contrib.admin.autodiscover for an explanation of this code.
+    try:
+        app_basename = app.split('.')[-1]
+        app_path = __import__(app, {}, {}, app_basename).__path__
+    except AttributeError:
+        return None
+
+    try:
+        imp.find_module(related_name, app_path)
+    except ImportError:
+        return None
+
+    return handler(app_path, app_basename)

+ 32 - 0
crunchy/log.py

@@ -0,0 +1,32 @@
+import multiprocessing
+import os
+import time
+import logging
+from crunchy.conf import LOG_FORMAT, DAEMON_LOG_LEVEL
+
+
+def setup_logger(loglevel=DAEMON_LOG_LEVEL, logfile=None, format=LOG_FORMAT):
+    """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
+    ``stderr`` is used.
+    
+    Returns logger object.
+    """
+    logger = multiprocessing.get_logger()
+    if logfile:
+        log_file_handler = logging.FileHandler(logfile)
+        formatter = logging.Formatter(format)
+        log_file_handler.setFormatter(formatter)
+        logger.addHandler(log_file_handler)
+    else:
+        multiprocessing.log_to_stderr()
+    logger.setLevel(loglevel)
+    return logger
+
+
+def emergency_error(logfile, message):
+    logfh = open(logfile, "a")
+    logfh.write("[%(asctime)s: FATAL/%(pid)d]: %(message)s\n" % {
+                    "asctime": time.asctime(),
+                    "pid": os.getpid(),
+                    "message": message})
+    logfh.close()

+ 26 - 0
crunchy/messaging.py

@@ -0,0 +1,26 @@
+from carrot.messaging import Publisher, Consumer
+import uuid
+
+
+class TaskPublisher(Publisher):
+    queue = "crunchy"
+    exchange = "crunchy"
+    routing_key = "crunchy"
+
+    def delay_task(self, task_name, **kwargs):
+        task_id = uuid.uuid4()
+        message_data = dict(kwargs)
+        message_data["crunchTASK"] = task_name
+        message_data["crunchID"] = str(task_id)
+        self.send(message_data)
+        return task_id
+
+
+class TaskConsumer(Consumer):
+    queue = "crunchy"
+    exchange = "crunchy"
+    routing_key = "crunchy"
+
+    def receive(self, message_data, message):
+        raise NotImplementedError(
+                "Don't use process_next() or wait() with the TaskConsumer!")

+ 102 - 0
crunchy/platform.py

@@ -0,0 +1,102 @@
+import os
+import sys
+import errno
+import resource
+
+# File mode creation mask of the daemon.
+# No point in changing this, as we don't really create any files.
+DAEMON_UMASK = 0
+
+# Default working directory for the daemon.
+DAEMON_WORKDIR = "/"
+
+# Default maximum for the number of available file descriptors.
+DAEMON_MAXFD = 1024
+
+# The standard I/O file descriptors are redirected to /dev/null by default.
+if (hasattr(os, "devnull")):
+   REDIRECT_TO = os.devnull
+else:
+   REDIRECT_TO = "/dev/null"
+
+class PIDFile(object):
+
+    def __init__(self, pidfile):
+        self.pidfile = pidfile
+
+    def get_pid(self):
+        pidfile_fh = file(self.pidfile, "r")
+        pid = int(pidfile_fh.read().strip())
+        pidfile_fh.close()
+        return pid
+
+    def check(self):
+        if os.path.exists(self.pidfile) and os.path.isfile(self.pidfile):
+            pid = self.get_pid()
+            try:
+                os.kill(pid, 0)
+            except os.error, e:
+                if e.errno == errno.ESRCH:
+                   sys.stderr.write("Stale pidfile exists. removing it.\n")
+                   self.remove()
+            else:
+                raise SystemExit("refreshd is already running.")
+
+    def remove(self):
+        os.unlink(self.pidfile)
+
+    def write(self, pid=None):
+        if not pid:
+            pid = os.getpid()
+        pidfile_fh = file(self.pidfile, "w")
+        pidfile_fh.write("%d\n" % pid)
+        pidfile_fh.close()
+
+
+def remove_pidfile(pidfile):
+    os.unlink(pidfile)
+
+
+def daemonize(pidfile):
+    """Detach a process from the controlling terminal and run it in the
+    background as a daemon."""
+
+    try:
+        pid = os.fork()
+    except OSError, e:
+        raise Exception, "%s [%d]" % (e.strerror, e.errno)
+
+    if pid == 0: # child
+        os.setsid()
+
+        try:
+            pid = os.fork() # second child
+        except OSError, e:
+            raise Exception, "%s [%d]" % (e.strerror, e.errno)
+
+        if pid == 0: # second child
+            #os.chdir(DAEMON_WORKDIR)
+            os.umask(DAEMON_UMASK)
+        else: # parent (first child)
+            pidfile.write(pid)
+            os._exit(0)
+    else: # root process
+        os._exit(0)
+
+    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
+    if (maxfd == resource.RLIM_INFINITY):
+        maxfd = DAEMON_MAXFD
+  
+    # Iterate through and close all file descriptors.
+    for fd in range(0, maxfd):
+        try:
+            os.close(fd)
+        except OSError:
+            pass
+
+    os.open(REDIRECT_TO, os.O_RDWR)
+    # Duplicate standard input to standard output and standard error.
+    os.dup2(0, 1) 
+    os.dup2(0, 2)
+
+    return 0

+ 25 - 0
crunchy/process.py

@@ -0,0 +1,25 @@
+from UserList import UserList
+
+
+class ProcessQueue(UserList):
+    """Queue of running child processes, which starts waiting for the
+    processes to finish when the queue limit is reached."""
+
+    def __init__(self, limit, logger=None, done_msg=None):
+        self.limit = limit
+        self.logger = logger
+        self.done_msg = done_msg
+        self.data = []
+        
+    def add(self, result, task_name, task_id):
+        self.data.append([result, task_name, task_id])
+
+        if self.data and len(self.data) >= self.limit:
+            for result, task_name, task_id in self.data:
+                ret_value = result.get()
+                if self.done_msg and self.logger:
+                    self.logger.info(self.done_msg % {
+                        "name": task_name,
+                        "id": task_id,
+                        "return_value": ret_value})
+            self.data = []

+ 47 - 0
crunchy/registry.py

@@ -0,0 +1,47 @@
+from crunchy import discovery
+from UserDict import UserDict
+
+
+class NotRegistered(Exception):
+    """The task is not registered."""
+
+
+class AlreadyRegistered(Exception):
+    """The task is already registered."""
+
+
+class TaskRegistry(UserDict):
+    """Site registry for tasks."""
+
+    AlreadyRegistered = AlreadyRegistered
+    NotRegistered = NotRegistered
+
+    def __init__(self):
+        self.data = {}
+
+    def autodiscover(self):
+        discovery.autodiscover()
+
+    def register(self, task_name, task_func):
+        if task_name in self.data:
+            raise self.AlreadyRegistered(
+                    "Task with name %s is already registered." % task_name)
+        
+        self.data[task_name] = task_func
+
+    def unregister(self, task_name):
+        if task_name not in self.data:
+            raise self.NotRegistered(
+                    "Task with name %s is not registered." % task_name)
+        del self.data[task_name]
+
+    def get_all(self):
+        """Get all task types."""
+        return self.data
+
+    def get_task(self, task_name):
+        """Get task by name."""
+        return self.data[task_name]
+
+"""This is the global task registry."""
+tasks = TaskRegistry()

+ 21 - 0
crunchy/task.py

@@ -0,0 +1,21 @@
+from carrot.connection import DjangoAMQPConnection
+from crunchy.messaging import TaskPublisher, TaskConsumer
+from crunchy.registry import tasks
+from crunchy.discovery import autodiscover
+
+
+def delay_task(task_name, **kwargs):
+    #if task_name not in tasks:
+    #    raise tasks.NotRegistered(
+    #            "Task with name %s not registered in the task registry." % (
+    #                task_name))
+    publisher = TaskPublisher(connection=DjangoAMQPConnection)
+    task_id = publisher.delay_task(task_name, **kwargs)
+    publisher.close()
+    return task_id
+
+def discard_all():
+    consumer = TaskConsumer(connection=DjangoAMQPConnection)
+    discarded_count = consumer.discard_all()
+    consumer.close()
+    return discarded_count

+ 99 - 0
crunchy/worker.py

@@ -0,0 +1,99 @@
+from carrot.connection import DjangoAMQPConnection
+from crunchy.messaging import TaskConsumer
+from crunchy.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
+from crunchy.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
+from crunchy.log import setup_logger
+from crunchy.registry import tasks
+from crunchy.process import ProcessQueue
+import multiprocessing
+import simplejson
+import traceback
+import logging
+import time
+
+
+class EmptyQueue(Exception):
+    """The message queue is currently empty."""
+
+
+class UnknownTask(Exception):
+    """Got an unknown task in the queue. The message is requeued and
+    ignored."""
+
+
+class TaskDaemon(object):
+    """Refreshes feed_urls in the queue using a process pool.
+
+    ``concurrency`` is the number of simultaneous processes.
+    """
+    loglevel = logging.ERROR
+    concurrency = DAEMON_CONCURRENCY
+    logfile = DAEMON_LOG_FILE
+    queue_wakeup_after = QUEUE_WAKEUP_AFTER
+    
+    def __init__(self, concurrency=None, logfile=None, loglevel=None,
+            queue_wakeup_after=None):
+        self.loglevel = loglevel or self.loglevel
+        self.concurrency = concurrency or self.concurrency
+        self.logfile = logfile or self.logfile
+        self.queue_wakeup_after = queue_wakeup_after or \
+                                    self.queue_wakeup_after
+        self.logger = setup_logger(loglevel, logfile)
+        self.pool = multiprocessing.Pool(self.concurrency)
+        self.task_consumer = TaskConsumer(connection=DjangoAMQPConnection)
+        self.task_registry = tasks
+
+    def fetch_next_task(self):
+        message = self.task_consumer.fetch()
+        if message is None: # No messages waiting.
+            raise EmptyQueue()
+
+        message_data = simplejson.loads(message.body)
+        task_name = message_data.pop("crunchTASK")
+        task_id = message_data.pop("crunchID")
+        self.logger.info("Got task from broker: %s[%s]" % (
+                            task_name, task_id))
+        if task_name not in self.task_registry:
+            message.reject()
+            raise UnknownTask(task_name)
+
+        task_func = self.task_registry[task_name]
+        task_func_params = {"loglevel": self.loglevel,
+                            "logfile": self.logfile}
+        task_func_params.update(message_data)
+
+        #try:
+        result = self.pool.apply_async(task_func, [], task_func_params)
+        #except:
+        #    message.reject()
+        #    raise
+
+
+        message.ack()
+        return result, task_name, task_id
+
+    def run(self):
+        results = ProcessQueue(self.concurrency, logger=self.logger,
+                done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
+        last_empty_emit = None
+
+        while True:
+            try:
+                result, task_name, task_id = self.fetch_next_task()
+            except EmptyQueue:
+                if not last_empty_emit or \
+                        time.time() > last_empty_emit + EMPTY_MSG_EMIT_EVERY:
+                    self.logger.info("Waiting for queue.")
+                    last_empty_emit = time.time()
+                time.sleep(self.queue_wakeup_after)
+                continue
+            except UnknownTask, e:
+                self.logger.info("Unknown task %s requeued and ignored." % (
+                                    task_name))
+                continue
+            #except Exception, e:
+            #    self.logger.critical("Raised %s: %s\n%s" % (
+            #                 e.__class__, e, traceback.format_exc()))
+            #    continue
+           
+            results.add(result, task_name, task_id)

+ 52 - 0
setup.py

@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import codecs
+import sys
+
+try:
+    from setuptools import setup, find_packages
+except ImportError:
+    from ez_setup import use_setuptools
+    use_setuptools()
+    from setuptools import setup, find_packages
+
+import crunchy
+
+install_requires = ["carrot", "django"]
+py_version_info = sys.version_info
+py_major_version = py_version_info[0]
+py_minor_version = py_version_info[1]
+
+if (py_major_version == 2 and py_minor_version <=5) or py_major_version < 2:
+    install_requires.append("multiprocessing")    
+
+setup(
+    name='crunchy',
+    version=crunchy.__version__,
+    description=crunchy.__doc__,
+    author=crunchy.__author__,
+    author_email=crunchy.__contact__,
+    url=crunchy.__homepage__,
+    platforms=["any"],
+    packages=find_packages(exclude=['ez_setup']),
+    scripts=["crunchy/bin/crunchd"],
+    install_requires=[
+        'simplejson',
+        'carrot',
+        'django',
+    ],
+    classifiers=[
+        "Development Status :: 3 - Alpha",
+        "Framework :: Django",
+        "Operating System :: OS Independent",
+        "Programming Language :: Python",
+        "Environment :: No Input / Output (Daemon)",
+        "Intended Audience :: Developers",
+        "License :: BSD License",
+        "Operating System :: POSIX",
+        "Topic :: Communications",
+        "Topic :: System :: Distributed Computing",
+        "Topic :: Software Development :: Libraries :: Python Modules",
+    ],
+    long_description=codecs.open('README', "r", "utf-8").read(),
+)