
Billiard moved back into the celery repo, as Debian doesn't much approve of it.

See http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=571656
Ask Solem 14 years ago
parent
commit
4e57d705e9
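
For code that imported these helpers from billiard directly, the change amounts to swapping the billiard modules for their new celery counterparts. A minimal before/after sketch (illustrative, matching the import swaps in the diff below):

    # before
    from billiard.serialization import pickle
    from billiard.utils.functional import curry, wraps

    # after
    from celery.serialization import pickle
    from celery.utils.functional import curry, wraps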

+ 1 - 2
celery/backends/__init__.py

@@ -1,7 +1,6 @@
-from billiard.utils.functional import curry
-
 from celery import conf
 from celery.utils import get_cls_by_name
+from celery.utils.functional import curry
 from celery.loaders import current_loader

 BACKEND_ALIASES = {

+ 2 - 4
celery/backends/base.py

@@ -1,13 +1,11 @@
 """celery.backends.base"""
 """celery.backends.base"""
 import time
 import time
 
 
-from billiard.serialization import pickle
-from billiard.serialization import get_pickled_exception
-from billiard.serialization import get_pickleable_exception
-
 from celery import conf
 from celery import conf
 from celery import states
 from celery import states
 from celery.exceptions import TimeoutError, TaskRevokedError
 from celery.exceptions import TimeoutError, TaskRevokedError
+from celery.serialization import pickle, get_pickled_exception
+from celery.serialization import get_pickleable_exception
 from celery.datastructures import LocalCache
 from celery.datastructures import LocalCache
 
 
 
 

+ 1 - 1
celery/backends/mongodb.py

@@ -1,7 +1,6 @@
 """MongoDB backend for celery."""
 """MongoDB backend for celery."""
 from datetime import datetime
 from datetime import datetime
 
 
-from billiard.serialization import pickle
 try:
 try:
     import pymongo
     import pymongo
 except ImportError:
 except ImportError:
@@ -12,6 +11,7 @@ from celery import states
 from celery.loaders import load_settings
 from celery.loaders import load_settings
 from celery.backends.base import BaseDictBackend
 from celery.backends.base import BaseDictBackend
 from celery.exceptions import ImproperlyConfigured
 from celery.exceptions import ImproperlyConfigured
+from celery.serialization import pickle
 
 
 
 
 class Bunch:
 class Bunch:

+ 972 - 0
celery/concurrency/processes.py

@@ -0,0 +1,972 @@
+#
+# Module providing the `Pool` class for managing a process pool
+#
+# multiprocessing/pool.py
+#
+# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
+#
+
+__all__ = ['Pool']
+
+#
+# Imports
+#
+
+import os
+import errno
+import threading
+import Queue
+import itertools
+import collections
+import time
+import signal
+
+from multiprocessing import Process, cpu_count, TimeoutError
+from multiprocessing.util import Finalize, debug
+
+#
+# Constants representing the state of a pool
+#
+
+RUN = 0
+CLOSE = 1
+TERMINATE = 2
+
+# Signal used for soft time limits.
+SIG_SOFT_TIMEOUT = getattr(signal, "SIGUSR1", None)
+
+#
+# Miscellaneous
+#
+
+job_counter = itertools.count()
+
+def mapstar(args):
+    return map(*args)
+
+#
+# Code run by worker processes
+#
+
+class TimeLimitExceeded(Exception):
+    """The time limit has been exceeded and the job has been terminated."""
+
+class SoftTimeLimitExceeded(Exception):
+    """The soft time limit has been exceeded. This exception is raised
+    to give the job a chance to clean up."""
+
+def soft_timeout_sighandler(signum, frame):
+    raise SoftTimeLimitExceeded()
+
+def worker(inqueue, outqueue, ackqueue, initializer=None, initargs=(),
+        maxtasks=None):
+    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
+    pid = os.getpid()
+    put = outqueue.put
+    get = inqueue.get
+    ack = ackqueue.put
+    if hasattr(inqueue, '_writer'):
+        inqueue._writer.close()
+        outqueue._reader.close()
+
+    if initializer is not None:
+        initializer(*initargs)
+
+    if SIG_SOFT_TIMEOUT is not None:
+        signal.signal(SIG_SOFT_TIMEOUT, soft_timeout_sighandler)
+
+    completed = 0
+    while maxtasks is None or (maxtasks and completed < maxtasks):
+        try:
+            task = get()
+        except (EOFError, IOError):
+            debug('worker got EOFError or IOError -- exiting')
+            break
+
+        if task is None:
+            debug('worker got sentinel -- exiting')
+            break
+
+        job, i, func, args, kwds = task
+        ack((job, i, time.time(), pid))
+        try:
+            result = (True, func(*args, **kwds))
+        except Exception, e:
+            result = (False, e)
+        put((job, i, result))
+        completed += 1
+    debug('worker exiting after %d tasks' % completed)
+
+
+#
+# Class representing a process pool
+#
+
+
+class PoolThread(threading.Thread):
+
+    def __init__(self, *args, **kwargs):
+        threading.Thread.__init__(self)
+        self._state = RUN
+        self.daemon = True
+
+    def terminate(self):
+        self._state = TERMINATE
+
+    def close(self):
+        self._state = CLOSE
+
+
+class Supervisor(PoolThread):
+
+    def __init__(self, pool):
+        self.pool = pool
+        super(Supervisor, self).__init__()
+
+    def run(self):
+        debug('worker handler starting')
+        while self._state == RUN and self.pool._state == RUN:
+            self.pool._maintain_pool()
+            time.sleep(0.1)
+        debug('worker handler exiting')
+
+
+class TaskHandler(PoolThread):
+
+    def __init__(self, taskqueue, put, outqueue, pool):
+        self.taskqueue = taskqueue
+        self.put = put
+        self.outqueue = outqueue
+        self.pool = pool
+        super(TaskHandler, self).__init__()
+
+    def run(self):
+        taskqueue = self.taskqueue
+        outqueue = self.outqueue
+        put = self.put
+        pool = self.pool
+
+        for taskseq, set_length in iter(taskqueue.get, None):
+            i = -1
+            for i, task in enumerate(taskseq):
+                if self._state:
+                    debug('task handler found thread._state != RUN')
+                    break
+                try:
+                    put(task)
+                except IOError:
+                    debug('could not put task on queue')
+                    break
+            else:
+                if set_length:
+                    debug('doing set_length()')
+                    set_length(i+1)
+                continue
+            break
+        else:
+            debug('task handler got sentinel')
+
+        try:
+            # tell result handler to finish when cache is empty
+            debug('task handler sending sentinel to result handler')
+            outqueue.put(None)
+
+            # tell workers there is no more work
+            debug('task handler sending sentinel to workers')
+            for p in pool:
+                put(None)
+        except IOError:
+            debug('task handler got IOError when sending sentinels')
+
+        debug('task handler exiting')
+
+
+class AckHandler(PoolThread):
+
+    def __init__(self, ackqueue, get, cache):
+        self.ackqueue = ackqueue
+        self.get = get
+        self.cache = cache
+
+        super(AckHandler, self).__init__()
+
+    def run(self):
+        debug('ack handler starting')
+        ackqueue = self.ackqueue
+        get = self.get
+        cache = self.cache
+
+        while 1:
+            try:
+                task = get()
+            except (IOError, EOFError), exc:
+                debug('ack handler got %s -- exiting',
+                        exc.__class__.__name__)
+
+            if self._state:
+                assert self._state == TERMINATE
+                debug('ack handler found thread._state=TERMINATE')
+                break
+
+            if task is None:
+                debug('ack handler got sentinel')
+                break
+
+            job, i, time_accepted, pid = task
+            try:
+                cache[job]._ack(i, time_accepted, pid)
+            except (KeyError, AttributeError), exc:
+                # Object gone, or doesn't support _ack (e.g. IMapIterator)
+                pass
+
+        while cache and self._state != TERMINATE:
+            try:
+                task = get()
+            except (IOError, EOFError), exc:
+                debug('ack handler got %s -- exiting',
+                        exc.__class__.__name__)
+                return
+
+            if task is None:
+                debug('result handler ignoring extra sentinel')
+                continue
+
+            job, i, time_accepted, pid = task
+            try:
+                cache[job]._ack(i, time_accepted, pid)
+            except KeyError:
+                pass
+
+        debug('ack handler exiting: len(cache)=%s, thread._state=%s',
+                len(cache), self._state)
+
+
+class TimeoutHandler(PoolThread):
+
+    def __init__(self, processes, cache, t_soft, t_hard):
+        self.processes = processes
+        self.cache = cache
+        self.t_soft = t_soft
+        self.t_hard = t_hard
+        super(TimeoutHandler, self).__init__()
+
+    def run(self):
+        processes = self.processes
+        cache = self.cache
+        t_hard, t_soft = self.t_hard, self.t_soft
+        dirty = set()
+
+        def _process_by_pid(pid):
+            for index, process in enumerate(processes):
+                if process.pid == pid:
+                    return process, index
+            return None, None
+
+        def _pop_by_pid(pid):
+            process, index = _process_by_pid(pid)
+            if not process:
+                return
+            p = processes.pop(index)
+            assert p is process
+            return process
+
+        def _timed_out(start, timeout):
+            if not start or not timeout:
+                return False
+            if time.time() >= start + timeout:
+                return True
+
+        def _on_soft_timeout(job, i):
+            debug('soft time limit exceeded for %i' % i)
+            process, _index = _process_by_pid(job._accept_pid)
+            if not process:
+                return
+
+            # Run timeout callback
+            if job._timeout_callback is not None:
+                job._timeout_callback(soft=True)
+
+            try:
+                os.kill(job._accept_pid, SIG_SOFT_TIMEOUT)
+            except OSError, exc:
+                if exc.errno == errno.ESRCH:
+                    pass
+                else:
+                    raise
+
+            dirty.add(i)
+
+        def _on_hard_timeout(job, i):
+            debug('hard time limit exceeded for %i', i)
+            # Remove from _pool
+            process = _pop_by_pid(job._accept_pid)
+            # Remove from cache and set return value to an exception
+            job._set(i, (False, TimeLimitExceeded()))
+            # Run timeout callback
+            if job._timeout_callback is not None:
+                job._timeout_callback(soft=False)
+            if not process:
+                return
+            # Terminate the process
+            process.terminate()
+
+        # Inner-loop
+        while self._state == RUN:
+
+            # Remove dirty items not in cache anymore
+            if dirty:
+                dirty = set(k for k in dirty if k in cache)
+
+            for i, job in cache.items():
+                ack_time = job._time_accepted
+                if _timed_out(ack_time, t_hard):
+                    _on_hard_timeout(job, i)
+                elif i not in dirty and _timed_out(ack_time, t_soft):
+                    _on_soft_timeout(job, i)
+
+            time.sleep(0.5) # Don't waste CPU cycles.
+
+        debug('timeout handler exiting')
+
+
+class ResultHandler(PoolThread):
+
+    def __init__(self, outqueue, get, cache, putlock):
+        self.outqueue = outqueue
+        self.get = get
+        self.cache = cache
+        self.putlock = putlock
+        super(ResultHandler, self).__init__()
+
+    def run(self):
+        get = self.get
+        outqueue = self.outqueue
+        cache = self.cache
+        putlock = self.putlock
+
+        debug('result handler starting')
+        while 1:
+            try:
+                task = get()
+            except (IOError, EOFError), exc:
+                debug('result handler got %s -- exiting',
+                        exc.__class__.__name__)
+                return
+
+            if putlock is not None:
+                putlock.release()
+
+            if self._state:
+                assert self._state == TERMINATE
+                debug('result handler found thread._state=TERMINATE')
+                break
+
+            if task is None:
+                debug('result handler got sentinel')
+                break
+
+            job, i, obj = task
+            try:
+                cache[job]._set(i, obj)
+            except KeyError:
+                pass
+
+        if putlock is not None:
+            putlock.release()
+
+        while cache and self._state != TERMINATE:
+            try:
+                task = get()
+            except (IOError, EOFError), exc:
+                debug('result handler got %s -- exiting',
+                        exc.__class__.__name__)
+                return
+
+            if task is None:
+                debug('result handler ignoring extra sentinel')
+                continue
+            job, i, obj = task
+            try:
+                cache[job]._set(i, obj)
+            except KeyError:
+                pass
+
+        if hasattr(outqueue, '_reader'):
+            debug('ensuring that outqueue is not full')
+            # If we don't make room available in outqueue then
+            # attempts to add the sentinel (None) to outqueue may
+            # block.  There is guaranteed to be no more than 2 sentinels.
+            try:
+                for i in range(10):
+                    if not outqueue._reader.poll():
+                        break
+                    get()
+            except (IOError, EOFError):
+                pass
+
+        debug('result handler exiting: len(cache)=%s, thread._state=%s',
+              len(cache), self._state)
+
+
+class Pool(object):
+    '''
+    Class which supports an async version of the `apply()` builtin
+    '''
+    Process = Process
+    Supervisor = Supervisor
+    TaskHandler = TaskHandler
+    AckHandler = AckHandler
+    TimeoutHandler = TimeoutHandler
+    ResultHandler = ResultHandler
+    SoftTimeLimitExceeded = SoftTimeLimitExceeded
+
+    def __init__(self, processes=None, initializer=None, initargs=(),
+            maxtasksperchild=None, timeout=None, soft_timeout=None):
+        self._setup_queues()
+        self._taskqueue = Queue.Queue()
+        self._cache = {}
+        self._state = RUN
+        self.timeout = timeout
+        self.soft_timeout = soft_timeout
+        self._maxtasksperchild = maxtasksperchild
+        self._initializer = initializer
+        self._initargs = initargs
+
+        if self.soft_timeout and SIG_SOFT_TIMEOUT is None:
+            raise NotImplementedError("Soft timeouts not supported: "
+                    "Your platform does not have the SIGUSR1 signal.")
+
+        if processes is None:
+            try:
+                processes = cpu_count()
+            except NotImplementedError:
+                processes = 1
+        self._processes = processes
+
+        if initializer is not None and not hasattr(initializer, '__call__'):
+            raise TypeError('initializer must be a callable')
+
+        self._pool = []
+        for i in range(processes):
+            self._create_worker_process()
+
+        self._worker_handler = self.Supervisor(self)
+        self._worker_handler.start()
+
+        self._putlock = threading.Semaphore(self._processes)
+
+        self._task_handler = self.TaskHandler(self._taskqueue, self._quick_put,
+                                         self._outqueue, self._pool)
+        self._task_handler.start()
+
+        # Thread processing acknowledgements from the ackqueue.
+        self._ack_handler = self.AckHandler(self._ackqueue,
+                self._quick_get_ack, self._cache)
+        self._ack_handler.start()
+
+        # Thread killing timedout jobs.
+        if self.timeout or self.soft_timeout:
+            self._timeout_handler = self.TimeoutHandler(
+                    self._pool, self._cache,
+                    self.soft_timeout, self.timeout)
+            self._timeout_handler.start()
+        else:
+            self._timeout_handler = None
+
+        # Thread processing results in the outqueue.
+        self._result_handler = self.ResultHandler(self._outqueue,
+                                        self._quick_get, self._cache,
+                                        self._putlock)
+        self._result_handler.start()
+
+        self._terminate = Finalize(
+            self, self._terminate_pool,
+            args=(self._taskqueue, self._inqueue, self._outqueue,
+                  self._ackqueue, self._pool, self._ack_handler,
+                  self._worker_handler, self._task_handler,
+                  self._result_handler, self._cache,
+                  self._timeout_handler),
+            exitpriority=15
+            )
+
+    def _create_worker_process(self):
+        w = self.Process(
+            target=worker,
+            args=(self._inqueue, self._outqueue, self._ackqueue,
+                    self._initializer, self._initargs, self._maxtasksperchild)
+            )
+        self._pool.append(w)
+        w.name = w.name.replace('Process', 'PoolWorker')
+        w.daemon = True
+        w.start()
+        return w
+
+    def _join_exited_workers(self):
+        """Cleanup after any worker processes which have exited due to
+        reaching their specified lifetime. Returns True if any workers were
+        cleaned up.
+        """
+        for i in reversed(range(len(self._pool))):
+            worker = self._pool[i]
+            if worker.exitcode is not None:
+                # worker exited
+                debug('cleaning up worker %d' % i)
+                worker.join()
+                del self._pool[i]
+        return len(self._pool) < self._processes
+
+    def _repopulate_pool(self):
+        """Bring the number of pool processes up to the specified number,
+        for use after reaping workers which have exited.
+        """
+        debug('repopulating pool')
+        for i in range(self._processes - len(self._pool)):
+            self._create_worker_process()
+            debug('added worker')
+
+    def _maintain_pool(self):
+        """"Clean up any exited workers and start replacements for them.
+        """
+        if self._join_exited_workers():
+            self._repopulate_pool()
+
+    def _setup_queues(self):
+        from multiprocessing.queues import SimpleQueue
+        self._inqueue = SimpleQueue()
+        self._outqueue = SimpleQueue()
+        self._ackqueue = SimpleQueue()
+        self._quick_put = self._inqueue._writer.send
+        self._quick_get = self._outqueue._reader.recv
+        self._quick_get_ack = self._ackqueue._reader.recv
+
+    def apply(self, func, args=(), kwds={}):
+        '''
+        Equivalent of `apply()` builtin
+        '''
+        assert self._state == RUN
+        return self.apply_async(func, args, kwds).get()
+
+    def map(self, func, iterable, chunksize=None):
+        '''
+        Equivalent of `map()` builtin
+        '''
+        assert self._state == RUN
+        return self.map_async(func, iterable, chunksize).get()
+
+    def imap(self, func, iterable, chunksize=1):
+        '''
+        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
+        '''
+        assert self._state == RUN
+        if chunksize == 1:
+            result = IMapIterator(self._cache)
+            self._taskqueue.put((((result._job, i, func, (x,), {})
+                         for i, x in enumerate(iterable)), result._set_length))
+            return result
+        else:
+            assert chunksize > 1
+            task_batches = Pool._get_tasks(func, iterable, chunksize)
+            result = IMapIterator(self._cache)
+            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
+                     for i, x in enumerate(task_batches)), result._set_length))
+            return (item for chunk in result for item in chunk)
+
+    def imap_unordered(self, func, iterable, chunksize=1):
+        '''
+        Like `imap()` method but ordering of results is arbitrary
+        '''
+        assert self._state == RUN
+        if chunksize == 1:
+            result = IMapUnorderedIterator(self._cache)
+            self._taskqueue.put((((result._job, i, func, (x,), {})
+                         for i, x in enumerate(iterable)), result._set_length))
+            return result
+        else:
+            assert chunksize > 1
+            task_batches = Pool._get_tasks(func, iterable, chunksize)
+            result = IMapUnorderedIterator(self._cache)
+            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
+                     for i, x in enumerate(task_batches)), result._set_length))
+            return (item for chunk in result for item in chunk)
+
+    def apply_async(self, func, args=(), kwds={},
+            callback=None, accept_callback=None, timeout_callback=None,
+            waitforslot=False):
+        '''
+        Asynchronous equivalent of `apply()` builtin.
+
+        Callback is called when the functions return value is ready.
+        The accept callback is called when the job is accepted to be executed.
+
+        Simplified the flow is like this:
+
+            >>> if accept_callback:
+            ...     accept_callback()
+            >>> retval = func(*args, **kwds)
+            >>> if callback:
+            ...     callback(retval)
+
+        '''
+        assert self._state == RUN
+        result = ApplyResult(self._cache, callback,
+                             accept_callback, timeout_callback)
+        if waitforslot:
+            self._putlock.acquire()
+        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
+        return result
+
+    def map_async(self, func, iterable, chunksize=None, callback=None):
+        '''
+        Asynchronous equivalent of `map()` builtin
+        '''
+        assert self._state == RUN
+        if not hasattr(iterable, '__len__'):
+            iterable = list(iterable)
+
+        if chunksize is None:
+            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
+            if extra:
+                chunksize += 1
+        if len(iterable) == 0:
+            chunksize = 0
+
+        task_batches = Pool._get_tasks(func, iterable, chunksize)
+        result = MapResult(self._cache, chunksize, len(iterable), callback)
+        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
+                              for i, x in enumerate(task_batches)), None))
+        return result
+
+    @staticmethod
+    def _get_tasks(func, it, size):
+        it = iter(it)
+        while 1:
+            x = tuple(itertools.islice(it, size))
+            if not x:
+                return
+            yield (func, x)
+
+    def __reduce__(self):
+        raise NotImplementedError(
+              'pool objects cannot be passed between processes or pickled'
+              )
+
+    def close(self):
+        debug('closing pool')
+        if self._state == RUN:
+            self._state = CLOSE
+            self._worker_handler.close()
+            self._taskqueue.put(None)
+
+    def terminate(self):
+        debug('terminating pool')
+        self._state = TERMINATE
+        self._worker_handler.terminate()
+        self._terminate()
+
+    def join(self):
+        assert self._state in (CLOSE, TERMINATE)
+        self._worker_handler.join()
+        self._task_handler.join()
+        self._result_handler.join()
+        for p in self._pool:
+            p.join()
+        debug('after join()')
+
+    @staticmethod
+    def _help_stuff_finish(inqueue, task_handler, size):
+        # task_handler may be blocked trying to put items on inqueue
+        debug('removing tasks from inqueue until task handler finished')
+        inqueue._rlock.acquire()
+        while task_handler.is_alive() and inqueue._reader.poll():
+            inqueue._reader.recv()
+            time.sleep(0)
+
+    @classmethod
+    def _terminate_pool(cls, taskqueue, inqueue, outqueue, ackqueue, pool,
+                        ack_handler, worker_handler, task_handler,
+                        result_handler, cache, timeout_handler):
+
+        # this is guaranteed to only be called once
+        debug('finalizing pool')
+
+        worker_handler.terminate()
+
+        task_handler.terminate()
+        taskqueue.put(None)                 # sentinel
+
+        debug('helping task handler/workers to finish')
+        cls._help_stuff_finish(inqueue, task_handler, len(pool))
+
+        assert result_handler.is_alive() or len(cache) == 0
+
+        result_handler.terminate()
+        outqueue.put(None)                  # sentinel
+
+        ack_handler.terminate()
+        ackqueue.put(None)                  # sentinel
+
+        if timeout_handler is not None:
+            timeout_handler.terminate()
+
+        # Terminate workers which haven't already finished
+        if pool and hasattr(pool[0], 'terminate'):
+            debug('terminating workers')
+            for p in pool:
+                if p.exitcode is None:
+                    p.terminate()
+
+        debug('joining task handler')
+        task_handler.join(1e100)
+
+        debug('joining result handler')
+        result_handler.join(1e100)
+
+        debug('joining ack handler')
+        ack_handler.join(1e100)
+
+        if timeout_handler is not None:
+            debug('joining timeout handler')
+            timeout_handler.join(1e100)
+
+        if pool and hasattr(pool[0], 'terminate'):
+            debug('joining pool workers')
+            for p in pool:
+                if p.is_alive():
+                    # worker has not yet exited
+                    debug('cleaning up worker %d' % p.pid)
+                    p.join()
+DynamicPool = Pool
+
+#
+# Class whose instances are returned by `Pool.apply_async()`
+#
+
+class ApplyResult(object):
+
+    def __init__(self, cache, callback, accept_callback=None,
+            timeout_callback=None):
+        self._cond = threading.Condition(threading.Lock())
+        self._job = job_counter.next()
+        self._cache = cache
+        self._accepted = False
+        self._accept_pid = None
+        self._time_accepted = None
+        self._ready = False
+        self._callback = callback
+        self._accept_callback = accept_callback
+        self._timeout_callback = timeout_callback
+        cache[self._job] = self
+
+    def ready(self):
+        return self._ready
+
+    def accepted(self):
+        return self._accepted
+
+    def successful(self):
+        assert self._ready
+        return self._success
+
+    def wait(self, timeout=None):
+        self._cond.acquire()
+        try:
+            if not self._ready:
+                self._cond.wait(timeout)
+        finally:
+            self._cond.release()
+
+    def get(self, timeout=None):
+        self.wait(timeout)
+        if not self._ready:
+            raise TimeoutError
+        if self._success:
+            return self._value
+        else:
+            raise self._value
+
+    def _set(self, i, obj):
+        self._success, self._value = obj
+        if self._callback and self._success:
+            self._callback(self._value)
+        self._cond.acquire()
+        try:
+            self._ready = True
+            self._cond.notify()
+        finally:
+            self._cond.release()
+        if self._accepted:
+            del self._cache[self._job]
+
+    def _ack(self, i, time_accepted, pid):
+        self._accepted = True
+        self._time_accepted = time_accepted
+        self._accept_pid = pid
+        if self._accept_callback:
+            self._accept_callback()
+        if self._ready:
+            del self._cache[self._job]
+
+#
+# Class whose instances are returned by `Pool.map_async()`
+#
+
+class MapResult(ApplyResult):
+
+    def __init__(self, cache, chunksize, length, callback):
+        ApplyResult.__init__(self, cache, callback)
+        self._success = True
+        self._value = [None] * length
+        self._chunksize = chunksize
+        if chunksize <= 0:
+            self._number_left = 0
+            self._ready = True
+        else:
+            self._number_left = length//chunksize + bool(length % chunksize)
+
+    def _set(self, i, success_result):
+        success, result = success_result
+        if success:
+            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
+            self._number_left -= 1
+            if self._number_left == 0:
+                if self._callback:
+                    self._callback(self._value)
+                del self._cache[self._job]
+                self._cond.acquire()
+                try:
+                    self._ready = True
+                    self._cond.notify()
+                finally:
+                    self._cond.release()
+
+        else:
+            self._success = False
+            self._value = result
+            del self._cache[self._job]
+            self._cond.acquire()
+            try:
+                self._ready = True
+                self._cond.notify()
+            finally:
+                self._cond.release()
+
+#
+# Class whose instances are returned by `Pool.imap()`
+#
+
+class IMapIterator(object):
+
+    def __init__(self, cache):
+        self._cond = threading.Condition(threading.Lock())
+        self._job = job_counter.next()
+        self._cache = cache
+        self._items = collections.deque()
+        self._index = 0
+        self._length = None
+        self._unsorted = {}
+        cache[self._job] = self
+
+    def __iter__(self):
+        return self
+
+    def next(self, timeout=None):
+        self._cond.acquire()
+        try:
+            try:
+                item = self._items.popleft()
+            except IndexError:
+                if self._index == self._length:
+                    raise StopIteration
+                self._cond.wait(timeout)
+                try:
+                    item = self._items.popleft()
+                except IndexError:
+                    if self._index == self._length:
+                        raise StopIteration
+                    raise TimeoutError
+        finally:
+            self._cond.release()
+
+        success, value = item
+        if success:
+            return value
+        raise value
+
+    __next__ = next                    # XXX
+
+    def _set(self, i, obj):
+        self._cond.acquire()
+        try:
+            if self._index == i:
+                self._items.append(obj)
+                self._index += 1
+                while self._index in self._unsorted:
+                    obj = self._unsorted.pop(self._index)
+                    self._items.append(obj)
+                    self._index += 1
+                self._cond.notify()
+            else:
+                self._unsorted[i] = obj
+
+            if self._index == self._length:
+                del self._cache[self._job]
+        finally:
+            self._cond.release()
+
+    def _set_length(self, length):
+        self._cond.acquire()
+        try:
+            self._length = length
+            if self._index == self._length:
+                self._cond.notify()
+                del self._cache[self._job]
+        finally:
+            self._cond.release()
+
+#
+# Class whose instances are returned by `Pool.imap_unordered()`
+#
+
+class IMapUnorderedIterator(IMapIterator):
+
+    def _set(self, i, obj):
+        self._cond.acquire()
+        try:
+            self._items.append(obj)
+            self._index += 1
+            self._cond.notify()
+            if self._index == self._length:
+                del self._cache[self._job]
+        finally:
+            self._cond.release()
+
+#
+#
+#
+
+class ThreadPool(Pool):
+
+    from multiprocessing.dummy import Process
+
+    def __init__(self, processes=None, initializer=None, initargs=()):
+        Pool.__init__(self, processes, initializer, initargs)
+
+    def _setup_queues(self):
+        self._inqueue = Queue.Queue()
+        self._outqueue = Queue.Queue()
+        self._ackqueue = Queue.Queue()
+        self._quick_put = self._inqueue.put
+        self._quick_get = self._outqueue.get
+        self._quick_get_ack = self._ackqueue.get
+
+    @staticmethod
+    def _help_stuff_finish(inqueue, task_handler, size):
+        # put sentinels at head of inqueue to make workers finish
+        inqueue.not_empty.acquire()
+        try:
+            inqueue.queue.clear()
+            inqueue.queue.extend([None] * size)
+            inqueue.not_empty.notify_all()
+        finally:
+            inqueue.not_empty.release()
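
The new pool mirrors `multiprocessing.Pool` but adds per-job acknowledgements, soft/hard time limits and extra callbacks on `apply_async`. A minimal usage sketch (assumes a Unix platform where SIGUSR1 is available for soft timeouts; `work` is just a stand-in function):

    from celery.concurrency.processes import Pool

    def work(x):
        return x * 2

    if __name__ == '__main__':
        # hard limit of 60 seconds per job, soft limit of 30 seconds
        pool = Pool(processes=2, timeout=60, soft_timeout=30)
        result = pool.apply_async(work, (21, ),
                                  accept_callback=lambda: None,
                                  timeout_callback=lambda soft: None)
        print result.get()          # 42
        pool.close()
        pool.join()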

+ 1 - 1
celery/concurrency/threads.py

@@ -1,9 +1,9 @@

 import threading
-from billiard.utils.functional import curry
 from threadpool import ThreadPool, WorkRequest

 from celery import log
+from celery.utils.functional import curry
 from celery.datastructures import ExceptionInfo



+ 1 - 2
celery/decorators.py

@@ -5,9 +5,8 @@ Decorators
 """
 """
 from inspect import getargspec
 from inspect import getargspec
 
 
-from billiard.utils.functional import wraps
-
 from celery.task.base import Task, PeriodicTask
 from celery.task.base import Task, PeriodicTask
+from celery.utils.functional import wraps
 
 
 
 
 def task(*args, **options):
 def task(*args, **options):

+ 2 - 2
celery/exceptions.py

@@ -3,7 +3,7 @@
 Common Exceptions

 """
-from billiard.pool import SoftTimeLimitExceeded as _SoftTimeLimitExceeded
+from celery.concurrency.processes import SoftTimeLimitExceeded as _STLE

 UNREGISTERED_FMT = """
 Task of kind %s is not registered, please make sure it's imported.
@@ -14,7 +14,7 @@ class RouteNotFound(KeyError):
     """Task routed to a queue not in the routing table (CELERY_QUEUES)."""


-class SoftTimeLimitExceeded(_SoftTimeLimitExceeded):
+class SoftTimeLimitExceeded(_STLE):
     """The soft time limit has been exceeded. This exception is raised
     to give the task a chance to clean up."""
     pass
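
In a task body, the soft limit can be caught to clean up before the hard limit terminates the worker. A small sketch of that pattern (the `expensive_parse` and `cleanup` helpers are hypothetical; the decorator and exception are celery's own):

    from celery.decorators import task
    from celery.exceptions import SoftTimeLimitExceeded

    @task
    def process_upload(path):
        try:
            return expensive_parse(path)      # hypothetical long-running call
        except SoftTimeLimitExceeded:
            cleanup(path)                     # hypothetical cleanup helper
            raise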

+ 1 - 1
celery/messaging.py

@@ -9,11 +9,11 @@ from itertools import count

 from carrot.connection import DjangoBrokerConnection
 from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
-from billiard.utils.functional import wraps

 from celery import conf
 from celery import signals
 from celery.utils import gen_unique_id, mitemgetter, noop
+from celery.utils.functional import wraps
 from celery.routes import lookup_route, expand_destination
 from celery.loaders import load_settings


+ 137 - 0
celery/serialization.py

@@ -0,0 +1,137 @@
+import sys
+import types
+import operator
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+from copy import deepcopy
+
+try:
+    _error_bases = (BaseException, )
+except NameError:
+    _error_bases = (SystemExit, KeyboardInterrupt)
+
+
+def is_unwanted_exception_cls(exc_cls):
+    unwanted_classes = (Exception, ) + _error_bases + (object, )
+    for unwanted_cls in unwanted_classes:
+        if exc_cls is unwanted_cls:
+            return True
+    return False
+
+if sys.version_info < (2, 5):
+
+    # Prior to Python 2.5, Exception was an old-style class
+    def subclass_exception(name, parent, unused):
+        return types.ClassType(name, (parent,), {})
+else:
+    def subclass_exception(name, parent, module):
+        return type(name, (parent,), {'__module__': module})
+
+
+def find_nearest_pickleable_exception(exc):
+    """With an exception instance, iterate over its super classes (by mro)
+    and find the first super exception that is pickleable. It does
+    not go below :exc:`Exception` (i.e. it skips :exc:`Exception`,
+    :class:`BaseException` and :class:`object`). If that happens
+    you should use :exc:`UnpickleableException` instead.
+
+    :param exc: An exception instance.
+
+    :returns: the nearest exception if it's not :exc:`Exception` or below,
+        if it is it returns ``None``.
+
+    :rtype: :exc:`Exception`
+
+    """
+
+    mro_ = getattr(exc.__class__, "mro", lambda: [])
+    for supercls in mro_():
+        if is_unwanted_exception_cls(supercls):
+            # only BaseException and object, from here on down,
+            # we don't care about these.
+            return None
+        try:
+            exc_args = getattr(exc, "args", [])
+            superexc = supercls(*exc_args)
+            pickle.dumps(superexc)
+        except:
+            pass
+        else:
+            return superexc
+    return None
+
+
+def create_exception_cls(name, module, parent=None):
+    """Dynamically create an exception class."""
+    if not parent:
+        parent = Exception
+    return subclass_exception(name, parent, module)
+
+
+class UnpickleableExceptionWrapper(Exception):
+    """Wraps unpickleable exceptions.
+
+    :param exc_module: see :attr:`exc_module`.
+
+    :param exc_cls_name: see :attr:`exc_cls_name`.
+
+    :param exc_args: see :attr:`exc_args`
+
+    .. attribute:: exc_module
+
+        The module of the original exception.
+
+    .. attribute:: exc_cls_name
+
+        The name of the original exception class.
+
+    .. attribute:: exc_args
+
+        The arguments for the original exception.
+
+    Example
+
+        >>> try:
+        ...     something_raising_unpickleable_exc()
+        >>> except Exception, e:
+        ...     exc = UnpickleableException(e.__class__.__module__,
+        ...                                 e.__class__.__name__,
+        ...                                 e.args)
+        ...     pickle.dumps(exc) # Works fine.
+
+    """
+
+    def __init__(self, exc_module, exc_cls_name, exc_args):
+        self.exc_module = exc_module
+        self.exc_cls_name = exc_cls_name
+        self.exc_args = exc_args
+        Exception.__init__(self, exc_module, exc_cls_name, exc_args)
+
+
+def get_pickleable_exception(exc):
+    """Make sure exception is pickleable."""
+    nearest = find_nearest_pickleable_exception(exc)
+    if nearest:
+        return nearest
+
+    try:
+        pickle.dumps(deepcopy(exc))
+    except Exception:
+        excwrapper = UnpickleableExceptionWrapper(
+                        exc.__class__.__module__,
+                        exc.__class__.__name__,
+                        getattr(exc, "args", []))
+        return excwrapper
+    return exc
+
+
+def get_pickled_exception(exc):
+    """Get original exception from exception pickled using
+    :meth:`get_pickleable_exception`."""
+    if isinstance(exc, UnpickleableExceptionWrapper):
+        exc_cls = create_exception_cls(exc.exc_cls_name,
+                                       exc.exc_module)
+        return exc_cls(*exc.exc_args)
+    return exc

+ 2 - 2
celery/task/__init__.py

@@ -3,13 +3,13 @@
 Working with tasks and task sets.

 """
-from billiard.serialization import pickle

 from celery.execute import apply_async
 from celery.registry import tasks
+from celery.serialization import pickle
 from celery.task.base import Task, TaskSet, PeriodicTask, ExecuteRemoteTask
-from celery.task.control import discard_all
 from celery.task.builtins import PingTask
+from celery.task.control import discard_all
 from celery.task.http import HttpDispatchTask

 __all__ = ["Task", "TaskSet", "PeriodicTask", "tasks", "discard_all",

+ 1 - 2
celery/task/base.py

@@ -1,8 +1,6 @@
 import sys
 from datetime import timedelta

-from billiard.serialization import pickle
-
 from celery import conf
 from celery.log import setup_task_logger
 from celery.utils import gen_unique_id, padlist
@@ -14,6 +12,7 @@ from celery.backends import default_backend
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.messaging import establish_connection as _establish_connection
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
+from celery.serialization import pickle

 from celery.task.schedules import schedule


+ 4 - 4
celery/tests/test_backends/test_base.py

@@ -2,10 +2,10 @@ import sys
 import types
 import unittest2 as unittest

-from billiard.serialization import subclass_exception
-from billiard.serialization import find_nearest_pickleable_exception as fnpe
-from billiard.serialization import UnpickleableExceptionWrapper
-from billiard.serialization import get_pickleable_exception as gpe
+from celery.serialization import subclass_exception
+from celery.serialization import find_nearest_pickleable_exception as fnpe
+from celery.serialization import UnpickleableExceptionWrapper
+from celery.serialization import get_pickleable_exception as gpe

 from celery import states
 from celery.backends.base import BaseBackend, KeyValueStoreBackend

+ 1 - 1
celery/tests/test_buckets.py

@@ -6,11 +6,11 @@ import time
 import unittest2 as unittest
 from itertools import chain, izip

-from billiard.utils.functional import curry

 from celery.task.base import Task
 from celery.utils import timeutils
 from celery.utils import gen_unique_id
+from celery.utils.functional import curry
 from celery.worker import buckets
 from celery.registry import TaskRegistry


+ 1 - 1
celery/tests/test_pickle.py

@@ -1,6 +1,6 @@
 import unittest2 as unittest

-from billiard.serialization import pickle
+from celery.serialization import pickle


 class RegularException(Exception):

+ 1 - 1
celery/tests/test_routes.py

@@ -1,10 +1,10 @@
 import unittest2 as unittest

-from billiard.utils.functional import wraps

 from celery import conf
 from celery import routes
 from celery.utils import gen_unique_id
+from celery.utils.functional import wraps
 from celery.exceptions import RouteNotFound



+ 3 - 3
celery/tests/test_serialization.py

@@ -7,10 +7,10 @@ from celery.tests.utils import execute_context, mask_modules
 class TestAAPickle(unittest.TestCase):

     def test_no_cpickle(self):
-        prev = sys.modules.pop("billiard.serialization")
+        prev = sys.modules.pop("celery.serialization")
         try:
             def with_cPickle_masked(_val):
-                from billiard.serialization import pickle
+                from celery.serialization import pickle
                 import pickle as orig_pickle
                 self.assertIs(pickle.dumps, orig_pickle.dumps)

@@ -18,4 +18,4 @@
             execute_context(context, with_cPickle_masked)

         finally:
-            sys.modules["billiard.serialization"] = prev
+            sys.modules["celery.serialization"] = prev

+ 1 - 1
celery/tests/test_task.py

@@ -4,7 +4,6 @@ from datetime import datetime, timedelta

 from pyparsing import ParseException

-from billiard.utils.functional import wraps

 from celery import conf
 from celery import task
@@ -12,6 +11,7 @@ from celery import messaging
 from celery.task.schedules import crontab, crontab_parser
 from celery.utils import timeutils
 from celery.utils import gen_unique_id
+from celery.utils.functional import wraps
 from celery.result import EagerResult
 from celery.execute import send_task
 from celery.backends import default_backend

+ 1 - 2
celery/tests/test_task_builtins.py

@@ -1,9 +1,8 @@
 import unittest2 as unittest

-from billiard.serialization import pickle
-
 from celery.task.base import ExecuteRemoteTask
 from celery.task.builtins import PingTask, DeleteExpiredTaskMetaTask
+from celery.serialization import pickle


 def some_func(i):

+ 1 - 1
celery/tests/test_task_http.py

@@ -13,10 +13,10 @@ try:
 except ImportError:
     from StringIO import StringIO

-from billiard.utils.functional import wraps
 from anyjson import serialize

 from celery.task import http
+from celery.utils.functional import wraps

 from celery.tests.utils import eager_tasks, execute_context


+ 1 - 1
celery/tests/test_worker.py

@@ -5,7 +5,6 @@ from multiprocessing import get_logger

 from carrot.connection import BrokerConnection
 from carrot.backends.base import BaseMessage
-from billiard.serialization import pickle

 from celery import conf
 from celery.utils import gen_unique_id
@@ -16,6 +15,7 @@ from celery.worker.listener import CarrotListener, QoS, RUN
 from celery.worker.scheduler import Scheduler
 from celery.decorators import task as task_dec
 from celery.decorators import periodic_task as periodic_task_dec
+from celery.serialization import pickle

 from celery.tests.utils import execute_context
 from celery.tests.compat import catch_warnings

+ 2 - 1
celery/tests/utils.py

@@ -5,9 +5,10 @@ import sys
 import __builtin__
 from StringIO import StringIO

-from billiard.utils.functional import wraps
 from nose import SkipTest

+from celery.utils.functional import wraps
+

 class GeneratorContextManager(object):
     def __init__(self, gen):

+ 1 - 1
celery/utils/__init__.py

@@ -16,10 +16,10 @@ from inspect import getargspec
 from itertools import islice

 from carrot.utils import rpartition
-from billiard.utils.functional import curry

 from celery.utils.compat import all, any, defaultdict
 from celery.utils.timeutils import timedelta_seconds # was here before
+from celery.utils.functional import curry


 def noop(*args, **kwargs):

+ 135 - 0
celery/utils/functional.py

@@ -0,0 +1,135 @@
+"""Functional utilities for Python 2.4 compatibility."""
+# License for code in this file that was taken from Python 2.5.
+
+# PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2
+# --------------------------------------------
+#
+# 1. This LICENSE AGREEMENT is between the Python Software Foundation
+# ("PSF"), and the Individual or Organization ("Licensee") accessing and
+# otherwise using this software ("Python") in source or binary form and
+# its associated documentation.
+#
+# 2. Subject to the terms and conditions of this License Agreement, PSF
+# hereby grants Licensee a nonexclusive, royalty-free, world-wide
+# license to reproduce, analyze, test, perform and/or display publicly,
+# prepare derivative works, distribute, and otherwise use Python
+# alone or in any derivative version, provided, however, that PSF's
+# License Agreement and PSF's notice of copyright, i.e., "Copyright (c)
+# 2001, 2002, 2003, 2004, 2005, 2006, 2007 Python Software Foundation;
+# All Rights Reserved" are retained in Python alone or in any derivative
+# version prepared by Licensee.
+#
+# 3. In the event Licensee prepares a derivative work that is based on
+# or incorporates Python or any part thereof, and wants to make
+# the derivative work available to others as provided herein, then
+# Licensee hereby agrees to include in any such work a brief summary of
+# the changes made to Python.
+#
+# 4. PSF is making Python available to Licensee on an "AS IS"
+# basis.  PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+# IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
+# DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
+# FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT
+# INFRINGE ANY THIRD PARTY RIGHTS.
+#
+# 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
+# FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
+# A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON,
+# OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+#
+# 6. This License Agreement will automatically terminate upon a material
+# breach of its terms and conditions.
+#
+# 7. Nothing in this License Agreement shall be deemed to create any
+# relationship of agency, partnership, or joint venture between PSF and
+# Licensee.  This License Agreement does not grant permission to use PSF
+# trademarks or trade name in a trademark sense to endorse or promote
+# products or services of Licensee, or any third party.
+#
+# 8. By copying, installing or otherwise using Python, Licensee
+# agrees to be bound by the terms and conditions of this License
+# Agreement.
+
+### Begin from Python 2.5 functools.py ########################################
+
+# Summary of changes made to the Python 2.5 code below:
+#   * swapped ``partial`` for ``curry`` to maintain backwards-compatibility
+#     in Django.
+#   * Wrapped the ``setattr`` call in ``update_wrapper`` with a try-except
+#     block to make it compatible with Python 2.3, which doesn't allow
+#     assigning to ``__name__``.
+
+# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007 Python Software
+# Foundation. All Rights Reserved.
+
+###############################################################################
+
+# update_wrapper() and wraps() are tools to help write
+# wrapper functions that can handle naive introspection
+
+def _compat_curry(fun, *args, **kwargs):
+    """New function with partial application of the given arguments
+    and keywords."""
+
+    def _curried(*addargs, **addkwargs):
+        return fun(*(args+addargs), **dict(kwargs, **addkwargs))
+    return _curried
+
+
+try:
+    from functools import partial as curry
+except ImportError:
+    curry = _compat_curry
+
+WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__doc__')
+WRAPPER_UPDATES = ('__dict__',)
+def _compat_update_wrapper(wrapper, wrapped, assigned=WRAPPER_ASSIGNMENTS,
+        updated=WRAPPER_UPDATES):
+    """Update a wrapper function to look like the wrapped function
+
+       wrapper is the function to be updated
+       wrapped is the original function
+       assigned is a tuple naming the attributes assigned directly
+       from the wrapped function to the wrapper function (defaults to
+       functools.WRAPPER_ASSIGNMENTS)
+       updated is a tuple naming the attributes off the wrapper that
+       are updated with the corresponding attribute from the wrapped
+       function (defaults to functools.WRAPPER_UPDATES)
+
+    """
+    for attr in assigned:
+        try:
+            setattr(wrapper, attr, getattr(wrapped, attr))
+        except TypeError: # Python 2.3 doesn't allow assigning to __name__.
+            pass
+    for attr in updated:
+        getattr(wrapper, attr).update(getattr(wrapped, attr))
+    # Return the wrapper so this can be used as a decorator via curry()
+    return wrapper
+
+try:
+    from functools import update_wrapper
+except ImportError:
+    update_wrapper = _compat_update_wrapper
+
+
+def _compat_wraps(wrapped, assigned=WRAPPER_ASSIGNMENTS,
+        updated=WRAPPER_UPDATES):
+    """Decorator factory to apply update_wrapper() to a wrapper function
+
+    Returns a decorator that invokes update_wrapper() with the decorated
+    function as the wrapper argument and the arguments to wraps() as the
+    remaining arguments. Default arguments are as for update_wrapper().
+    This is a convenience function to simplify applying curry() to
+    update_wrapper().
+
+    """
+    return curry(update_wrapper, wrapped=wrapped,
+                 assigned=assigned, updated=updated)
+
+try:
+    from functools import wraps
+except ImportError:
+    wraps = _compat_wraps
+
+### End from Python 2.5 functools.py ##########################################
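
These are drop-in stand-ins for `functools.partial`, `functools.update_wrapper` and `functools.wraps` on interpreters that lack them; on Python 2.5+ the real functools objects are used. A small usage sketch (`with_logging` and `add` are illustrative names):

    from celery.utils.functional import curry, wraps

    def with_logging(fun):

        @wraps(fun)                     # preserves __name__, __doc__, ...
        def _inner(*args, **kwargs):
            print "calling %s" % fun.__name__
            return fun(*args, **kwargs)
        return _inner

    @with_logging
    def add(x, y):
        return x + y

    add_two = curry(add, 2)             # like functools.partial: add_two(3) == 5
    print add.__name__                  # "add", thanks to wraps()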

+ 2 - 2
celery/worker/pool.py

@@ -3,11 +3,11 @@
 Process Pools.

 """
-from billiard.pool import Pool, RUN
-from billiard.utils.functional import curry

 from celery import log
+from celery.concurrency.processes import Pool, RUN
 from celery.datastructures import ExceptionInfo
+from celery.utils.functional import curry


 class TaskPool(object):

+ 0 - 1
contrib/requirements/default.txt

@@ -4,5 +4,4 @@ sqlalchemy
 anyjson
 carrot>=0.10.5
 django-picklefield
-billiard>=0.3.0
 pyparsing

+ 0 - 1
setup.cfg

@@ -20,4 +20,3 @@ requires = python-uuid
            python-dateutil
            python-anyjson
            python-carrot>=0.10.4
-           python-billiard>=0.3.0

+ 0 - 1
setup.py

@@ -46,7 +46,6 @@ install_requires.extend([
     "sqlalchemy",
     "sqlalchemy",
     "anyjson",
     "anyjson",
     "carrot>=0.10.5",
     "carrot>=0.10.5",
-    "billiard>=0.3.0",
     "pyparsing"])
     "pyparsing"])
 
 
 py_version = sys.version_info
 py_version = sys.version_info