Browse Source

Merge branch 'master' into 2.5

Ask Solem 13 years ago
parent
commit
a3041a0d6e

+ 4 - 9
celery/backends/amqp.py

@@ -83,9 +83,9 @@ class AMQPBackend(BaseDictBackend):
                           auto_delete=self.auto_delete,
                           queue_arguments=self.queue_arguments)
 
-    def _create_producer(self, task_id, channel):
-        self._create_binding(task_id)(channel).declare()
-        return self.Producer(channel, exchange=self.exchange,
+    def _create_producer(self, task_id, connection):
+        self._create_binding(task_id)(connection.default_channel).declare()
+        return self.Producer(connection, exchange=self.exchange,
                              routing_key=task_id.replace("-", ""),
                              serializer=self.serializer)
 
@@ -94,12 +94,7 @@ class AMQPBackend(BaseDictBackend):
 
     def _publish_result(self, connection, task_id, meta):
         # cache single channel
-        if connection._default_channel is not None and \
-                connection._default_channel.connection is None:
-            connection.maybe_close_channel(connection._default_channel)
-        channel = connection.default_channel
-
-        self._create_producer(task_id, channel).publish(meta)
+        self._create_producer(task_id, connection).publish(meta)
 
     def revive(self, channel):
         pass

+ 13 - 1
celery/bin/celeryctl.py

@@ -7,6 +7,7 @@ if __name__ == "__main__" and __package__ is None:
 
 import sys
 
+from importlib import import_module
 from optparse import OptionParser, make_option as Option
 from pprint import pformat
 from textwrap import wrap
@@ -372,11 +373,22 @@ class shell(Command):
                 Option("--without-tasks", "-T", action="store_true",
                     dest="without_tasks", default=False,
                     help="Don't add tasks to locals."),
+                Option("--eventlet", action="store_true",
+                    dest="eventlet", default=False,
+                    help="Use eventlet."),
+                Option("--gevent", action="store_true",
+                    dest="gevent", default=False,
+                    help="Use gevent."),
     )
 
     def run(self, force_ipython=False, force_bpython=False,
-            force_python=False, without_tasks=False, **kwargs):
+            force_python=False, without_tasks=False, eventlet=False,
+            gevent=False, **kwargs):
         from .. import registry
+        if eventlet:
+            import_module("celery.concurrency.eventlet")
+        if gevent:
+            import_module("celery.concurrency.gevent")
         self.app.loader.import_default_modules()
         self.locals = {"celery": self.app}
 

+ 2 - 2
celery/bin/celeryd.py

@@ -96,8 +96,8 @@ class WorkerCommand(Command):
         # Pools like eventlet/gevent needs to patch libs as early
         # as possible.
         from celery import concurrency
-        kwargs["pool"] = concurrency.get_implementation(
-                    kwargs.get("pool") or self.app.conf.CELERYD_POOL)
+        kwargs["pool_cls"] = concurrency.get_implementation(
+                    kwargs.get("pool_cls") or self.app.conf.CELERYD_POOL)
         return self.app.Worker(**kwargs).run()
 
     def get_options(self):

+ 4 - 4
celery/concurrency/eventlet.py

@@ -2,16 +2,16 @@
 from __future__ import absolute_import
 
 import os
-import sys
-
-from time import time
-
 if not os.environ.get("EVENTLET_NOPATCH"):
     import eventlet
     import eventlet.debug
     eventlet.monkey_patch()
     eventlet.debug.hub_prevent_multiple_readers(False)
 
+import sys
+
+from time import time
+
 from .. import signals
 from ..utils import timer2
 

+ 4 - 4
celery/concurrency/gevent.py

@@ -2,14 +2,14 @@
 from __future__ import absolute_import
 
 import os
-import sys
-
-from time import time
-
 if not os.environ.get("GEVENT_NOPATCH"):
     from gevent import monkey
     monkey.patch_all()
 
+import sys
+
+from time import time
+
 from ..utils import timer2
 
 from .base import apply_target, BasePool

+ 1 - 1
celery/concurrency/processes/pool.py

@@ -562,13 +562,13 @@ class Pool(object):
 
         self._pool = []
         self._poolctrl = {}
+        self._putlock = LaxBoundedSemaphore(self._processes)
         for i in range(processes):
             self._create_worker_process()
 
         self._worker_handler = self.Supervisor(self)
         self._worker_handler.start()
 
-        self._putlock = LaxBoundedSemaphore(self._processes)
         self._task_handler = self.TaskHandler(self._taskqueue,
                                               self._quick_put,
                                               self._outqueue,