Browse Source

Hub now patches and disables more mutexes

Fixes typo
Ask Solem 11 years ago
parent
commit
cd7ea50155
3 changed files with 26 additions and 14 deletions
  1. 10 13
      celery/backends/amqp.py
  2. 11 1
      celery/worker/components.py
  3. 5 0
      celery/worker/consumer.py

+ 10 - 13
celery/backends/amqp.py

@@ -11,7 +11,6 @@
 from __future__ import absolute_import
 
 import socket
-import threading
 import time
 
 from collections import deque
@@ -83,7 +82,6 @@ class AMQPBackend(BaseBackend):
         self.queue_arguments = dictfilter({
             'x-expires': maybe_s_to_ms(self.expires),
         })
-        self.mutex = threading.Lock()
 
     def _create_exchange(self, name, type='direct', persistent=True):
         delivery_mode = persistent and 'persistent' or 'transient'
@@ -110,17 +108,16 @@ class AMQPBackend(BaseBackend):
 
     def _store_result(self, task_id, result, status, traceback=None):
         """Send task return value and status."""
-        with self.mutex:
-            with self.app.amqp.producer_pool.acquire(block=True) as pub:
-                pub.publish({'task_id': task_id, 'status': status,
-                             'result': self.encode_result(result, status),
-                             'traceback': traceback,
-                             'children': self.current_task_children()},
-                            exchange=self.exchange,
-                            routing_key=self._routing_key(task_id),
-                            serializer=self.serializer,
-                            retry=True, retry_policy=self.retry_policy,
-                            declare=self.on_reply_declare(task_id))
+        with self.app.amqp.producer_pool.acquire(block=True) as pub:
+            pub.publish({'task_id': task_id, 'status': status,
+                         'result': self.encode_result(result, status),
+                         'traceback': traceback,
+                         'children': self.current_task_children()},
+                        exchange=self.exchange,
+                        routing_key=self._routing_key(task_id),
+                        serializer=self.serializer,
+                        retry=True, retry_policy=self.retry_policy,
+                        declare=self.on_reply_declare(task_id))
         return result
 
     def on_reply_declare(self, task_id):

+ 11 - 1
celery/worker/components.py

@@ -63,9 +63,19 @@ class Hub(bootsteps.StartStopStep):
 
     def create(self, w):
         w.hub = hub.Hub(w.timer)
+        self._patch_thread_primitives(w)
+        return w.hub
+
+    def _patch_thread_primitives(self, w):
         # make clock use dummy lock
         w.app.clock.lock = hub.DummyLock()
-        return w.hub
+        # multiprocessing's ApplyResult uses this lock.
+        try:
+            from billiard import pool
+        except ImportError:
+            pass
+        else:
+            pool.Lock = hub.DummyLock
 
 
 class Queues(bootsteps.Step):

+ 5 - 0
celery/worker/consumer.py

@@ -41,6 +41,7 @@ from celery.utils.text import truncate
 from celery.utils.timeutils import humanize_seconds, rate
 
 from . import heartbeat, loops, pidbox
+from .hub import DummyLock
 from .state import task_reserved, maybe_shutdown, revoked, reserved_requests
 
 try:
@@ -717,4 +718,8 @@ class Evloop(bootsteps.StartStopStep):
     last = True
 
     def start(self, c):
+        self.patch_all(c)
         c.loop(*c.loop_args())
+
+    def patch_all(self, c):
+        c.qos._mutex = DummyLock()