Browse Source

Abstracted out on_task_init, on_worker_init and the reading of settings so the
client doesn't have to use Django.

Ask Solem 16 years ago
parent
commit
dea1636108

+ 1 - 1
celery/backends/__init__.py

@@ -1,6 +1,6 @@
 """celery.backends"""
 from functools import partial
-from django.conf import settings
+from celery.loaders import settings
 import sys
 
 DEFAULT_BACKEND = "database"

+ 1 - 1
celery/backends/tyrant.py

@@ -8,7 +8,7 @@ except ImportError:
             "The Tokyo Tyrant backend requires the pytyrant library.")
 
 from celery.backends.base import BaseBackend
-from django.conf import settings
+from celery.loaders import settings
 from carrot.messaging import serialize, deserialize
 try:
     import cPickle as pickle

+ 5 - 6
celery/bin/celeryd.py

@@ -68,12 +68,9 @@ try:
     import resource
 except ImportError:
     CAN_DETACH = False
-sys.path.append(os.getcwd())
-django_project_dir = os.environ.get("DJANGO_PROJECT_DIR")
-if django_project_dir:
-    sys.path.append(django_project_dir)
 
-from django.conf import settings
+from celery.loaders import current_loader
+from celery.loaders import settings
 from celery import __version__
 from celery.supervisor import OFASupervisor
 from celery.log import emergency_error
@@ -256,7 +253,9 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
                                 gid=gid)
         context.open()
 
-    discovery.autodiscover()
+    # Run the worker init handler.
+    # (Usually imports task modules and such.)
+    current_loader.on_worker_init()
 
     def run_worker():
         worker = WorkController(concurrency=concurrency,

+ 1 - 1
celery/conf.py

@@ -1,5 +1,5 @@
 """celery.conf"""
-from django.conf import settings
+from celery.loaders import settings
 import logging
 
 DEFAULT_AMQP_EXCHANGE = "celery"

+ 0 - 24
celery/loader/default.py

@@ -1,24 +0,0 @@
-
-
-class Loader(object):
-
-    def __init__(self):
-        self._conf_cache = None
-
-    def read_configuration(self):
-        return dict()
-
-    def on_task_preinit(self, task_id, task):
-        pass
-
-    def on_worker_init(self):
-        imports = self.conf.get("imports", [])
-        for module in imports:
-            __import__(module, [], [], {''})
-
-    @property
-    def conf(self):
-        if not self._conf_cache:
-            self._conf_cache = self.read_configuration()
-        return self._conf_cache
-

+ 3 - 0
celery/loaders/__init__.py

@@ -0,0 +1,3 @@
+from celery.loaders.djangoapp import Loader
+current_loader = Loader()
+settings = current_loader.conf

+ 16 - 0
celery/loaders/base.py

@@ -0,0 +1,16 @@
+
+
+class BaseLoader(object):
+    _conf_cache = None
+
+    def on_task_preinit(self, task_id, task):
+        pass
+
+    def on_worker_init(self):
+        pass
+
+    @property
+    def conf(self):
+        if not self._conf_cache:
+            self._conf_cache = self.read_configuration()
+        return self._conf_cache

+ 15 - 0
celery/loaders/default.py

@@ -0,0 +1,15 @@
+from celery.loaders.base import BaseLoader
+
+
+class Loader(BaseLoader):
+
+    def read_configuration(self):
+        import settings
+        from django.core.management import setup_environ
+        setup_environ(settings)
+        return settings
+
+    def on_worker_init(self):
+        imports = getattr(self.conf, "imports", [])
+        for module in imports:
+            __import__(module, [], [], {''})

+ 2 - 10
celery/loader/django.py → celery/loaders/djangoapp.py

@@ -1,8 +1,7 @@
+from celery.loaders.base import BaseLoader
 
-class Loader(object):
 
-    def __init__(self):
-        self._conf_cache = None
+class Loader(BaseLoader):
 
     def read_configuration(self):
         from django.conf import settings
@@ -33,10 +32,3 @@ class Loader(object):
     def on_worker_init(self):
         from celery.discovery import autodiscover
         autodiscover()
-
-    @property
-    def conf(self):
-        if not self._conf_cache:
-            self._conf_cache = self.read_configuration()
-        return self._conf_cache
-

+ 1 - 1
celery/monitoring.py

@@ -5,7 +5,7 @@
 """
 from carrot.connection import DjangoAMQPConnection
 from celery.messaging import StatsPublisher, StatsConsumer
-from django.conf import settings
+from celery.loaders import settings
 from django.core.cache import cache
 import time
 

+ 4 - 0
celery/worker.py

@@ -1,6 +1,7 @@
 """celery.worker"""
 from carrot.connection import DjangoAMQPConnection
 from celery.messaging import TaskConsumer
+from celery.loaders import current_loader
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
 from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
 from celery.log import setup_logger
@@ -66,6 +67,9 @@ def jail(task_id, task_name, func, args, kwargs):
     ignore_result = getattr(func, "ignore_result", False)
     timer_stat = TaskTimerStats.start(task_id, task_name, args, kwargs)
 
+    # Load task pre-init handler
+    current_loader.on_worker_init()
+
     # Backend process cleanup
     default_backend.process_cleanup()