Browse Source

Organizes Django fixups by worker/client

Ask Solem 11 years ago
parent
commit
ba0bbb3702
2 changed files with 48 additions and 34 deletions
  1. 4 3
      celery/app/base.py
  2. 44 31
      celery/fixups/django.py

+ 4 - 3
celery/app/base.py

@@ -84,6 +84,7 @@ class Celery(object):
     control_cls = 'celery.app.control:Control'
     task_cls = 'celery.app.task:Task'
     registry_cls = TaskRegistry
+    _fixups = None
     _pool = None
     builtin_fixups = BUILTIN_FIXUPS
 
@@ -131,10 +132,10 @@ class Celery(object):
         if include:
             self._preconf['CELERY_IMPORTS'] = include
 
-        # Apply fixups.
+        # - Apply fixups.
         self.fixups = set(self.builtin_fixups) if fixups is None else fixups
-        for fixup in self.fixups:
-            symbol_by_name(fixup)(self)
+        # ...store fixup instances in _fixups to keep weakrefs alive.
+        self._fixups = [symbol_by_name(fixup)(self) for fixup in self.fixups]
 
         if self.set_as_current:
             self.set_current()

+ 44 - 31
celery/fixups/django.py

@@ -4,7 +4,7 @@ import os
 import sys
 import warnings
 
-from kombu.utils import symbol_by_name
+from kombu.utils import cached_property, symbol_by_name
 
 from datetime import datetime
 from importlib import import_module
@@ -19,7 +19,6 @@ Environment variable DJANGO_SETTINGS_MODULE is defined
 but Django is not installed.  Will not apply Django fixups!
 """
 
-
 def _maybe_close_fd(fh):
     try:
         os.close(fh.fileno())
@@ -40,23 +39,52 @@ def fixup(app, env='DJANGO_SETTINGS_MODULE'):
 
 
 class DjangoFixup(object):
-    _db_recycles = 0
 
     def __init__(self, app):
         self.app = app
         self.app.set_default()
 
+    def install(self):
+        # Need to add project directory to path
+        sys.path.append(os.getcwd())
+
+        self.app.loader.now = self.now
+        self.app.loader.mail_admins = self.mail_admins
+
+        signals.worker_init.connect(self.on_worker_init)
+        return self
+
+    def on_worker_init(self, **kwargs):
+        # keep reference
+        self._worker_fixup = DjangoWorkerFixup(self.app).install()
+
+    def now(self, utc=False):
+        return datetime.utcnow() if utc else self._now()
+
+    def mail_admins(self, subject, body, fail_silently=False, **kwargs):
+        return self._mail_admins(subject, body, fail_silently=fail_silently)
+
+    @cached_property
+    def _mail_admins(self):
+        return symbol_by_name('django.core.mail:mail_admins')
+
+    @cached_property
+    def _now(self):
+        try:
+            return symbol_by_name('django.utils.timezone:now')
+        except (AttributeError, ImportError):  # pre django-1.4
+            return datetime.now
+
+
+class DjangoWorkerFixup(object):
+    _db_recycles = 0
+
+    def __init__(self, app):
+        self.app = app
         self.db_reuse_max = self.app.conf.get('CELERY_DB_REUSE_MAX', None)
         self._db = import_module('django.db')
         self._cache = import_module('django.core.cache')
         self._settings = symbol_by_name('django.conf:settings')
-        self._mail_admins = symbol_by_name('django.core.mail:mail_admins')
-
-        # Current time and date
-        try:
-            self._now = symbol_by_name('django.utils.timezone:now')
-        except ImportError:  # pre django-1.4
-            self._now = datetime.now  # noqa
 
         # Database-related exceptions.
         DatabaseError = symbol_by_name('django.db:DatabaseError')
@@ -104,35 +132,20 @@ class DjangoFixup(object):
         )
 
     def install(self):
-        # Need to add project directory to path
-        sys.path.append(os.getcwd())
-        signals.beat_embedded_init.connect(self.close_database)
-        signals.worker_ready.connect(self.on_worker_ready)
-        signals.task_prerun.connect(self.on_task_prerun)
-        signals.task_postrun.connect(self.on_task_postrun)
-        signals.worker_init.connect(self.on_worker_init)
-        signals.worker_process_init.connect(self.on_worker_process_init)
-
-        self.app.loader.now = self.now
-        self.app.loader.mail_admins = self.mail_admins
-
-        return self
-
-    def now(self, utc=False):
-        return datetime.utcnow() if utc else self._now()
-
-    def mail_admins(self, subject, body, fail_silently=False, **kwargs):
-        return self._mail_admins(subject, body, fail_silently=fail_silently)
-
-    def on_worker_init(self, **kwargs):
         """Called when the worker starts.
 
         Automatically discovers any ``tasks.py`` files in the applications
         listed in ``INSTALLED_APPS``.
 
         """
+        signals.beat_embedded_init.connect(self.close_database)
+        signals.worker_ready.connect(self.on_worker_ready)
+        signals.task_prerun.connect(self.on_task_prerun)
+        signals.task_postrun.connect(self.on_task_postrun)
+        signals.worker_process_init.connect(self.on_worker_process_init)
         self.close_database()
         self.close_cache()
+        return self
 
     def on_worker_process_init(self, **kwargs):
         # the parent process may have established these,