Przeglądaj źródła

Restablish cache/tokyo tyrant connection at start of task process.

Ask Solem 16 lat temu
rodzic
commit
4b2511f654
3 zmienionych plików z 31 dodań i 4 usunięć
  1. 8 0
      celery/backends/base.py
  2. 15 4
      celery/backends/tyrant.py
  3. 8 0
      celery/worker.py

+ 8 - 0
celery/backends/base.py

@@ -174,3 +174,11 @@ class BaseBackend(object):
             elif status == "FAILURE":
                 raise self.get_result(task_id)
             timeout_timer.tick()
+
+    def cleanup_process(self):
+        """Cleanup actions to do at the end of a task worker process.
+
+        See :func:`celery.worker.jail`.
+        
+        """
+        pass

+ 15 - 4
celery/backends/tyrant.py

@@ -50,11 +50,22 @@ class Backend(BaseBackend):
                 "set the TT_HOST and TT_PORT settings in your settings.py")
         super(Backend, self).__init__()
         self._cache = {}
+        self._connection = None
 
-    def get_server(self):
+    def open(self):
         """Get :class:`pytyrant.PyTyrant`` instance with the current
         server configuration."""
-        return pytyrant.PyTyrant.open(self.tyrant_host, self.tyrant_port)
+        if not self._connection:
+            self._connection = pytyrant.PyTyrant.open(self.tyrant_host,
+                                                      self.tyrant_port)
+        return self._connection
+
+    def close(self):
+        if self._connection:
+            self._connection.close()
+
+    def process_cleanup(self):
+        self.close()
 
     def _cache_key(self, task_id):
         """Get the cache key for a task by id."""
@@ -67,7 +78,7 @@ class Backend(BaseBackend):
         elif status == "FAILURE":
             result = self.prepare_exception(result)
         meta = {"status": status, "result": pickle.dumps(result)}
-        self.get_server()[self._cache_key(task_id)] = serialize(meta)
+        self.open()[self._cache_key(task_id)] = serialize(meta)
 
     def get_status(self, task_id):
         """Get the status for a task."""
@@ -89,7 +100,7 @@ class Backend(BaseBackend):
         """Get task metadata for a task by id."""
         if task_id in self._cache:
             return self._cache[task_id]
-        meta = self.get_server().get(self._cache_key(task_id))
+        meta = self.open().get(self._cache_key(task_id))
         if not meta:
             return {"status": "PENDING", "result": None}
         meta = deserialize(meta)

+ 8 - 0
celery/worker.py

@@ -72,6 +72,13 @@ def jail(task_id, func, args, kwargs):
     from django.db import connection
     connection.close()
 
+    # Reset cache connection
+    from django.core.cache import cache
+    cache.close()
+
+    # Backend process cleanup
+    default_backend.process_cleanup()
+
     # Convert any unicode keys in the keyword arguments to ascii.
     kwargs = dict([(k.encode("utf-8"), v)
                         for k, v in kwargs.items()])
@@ -84,6 +91,7 @@ def jail(task_id, func, args, kwargs):
         default_backend.mark_as_done(task_id, result)
         return result
 
+    
 
 class TaskWrapper(object):
     """Class wrapping a task to be run.