Browse Source

Now works with Windows again!

Ask Solem 15 years ago
parent
commit
96e01cce6c

+ 8 - 3
bin/celeryd

@@ -1,9 +1,14 @@
 #!/usr/bin/env python
 #!/usr/bin/env python
 import sys
 import sys
-if not '' in sys.path:
-    sys.path.insert(0, '')
+    if not '' in sys.path:
+sys.path.insert(0, '')
 from celery.bin.celeryd import run_worker, parse_options
 from celery.bin.celeryd import run_worker, parse_options
 
 
-if __name__ == "__main__":
+def main():
+    import multiprocessing
+    multiprocessing.freeze_support()
     options = parse_options(sys.argv[1:])
     options = parse_options(sys.argv[1:])
     run_worker(**vars(options))
     run_worker(**vars(options))
+
+if __name__ == "__main__":
+    main()

+ 1 - 1
celery/bin/celerybeat.py

@@ -67,7 +67,7 @@ def run_clockservice(loglevel=conf.CELERYBEAT_LOG_LEVEL,
     # Run the worker init handler.
     # Run the worker init handler.
     # (Usually imports task modules and such.)
     # (Usually imports task modules and such.)
     from celery.loaders import current_loader
     from celery.loaders import current_loader
-    current_loader().on_worker_init()
+    current_loader().init_worker()
 
 
 
 
     # Dump configuration to screen so we have some basic information
     # Dump configuration to screen so we have some basic information

+ 1 - 2
celery/bin/celeryd.py

@@ -178,7 +178,7 @@ class Worker(object):
     def worker_init(self):
     def worker_init(self):
         # Run the worker init handler.
         # Run the worker init handler.
         # (Usually imports task modules and such.)
         # (Usually imports task modules and such.)
-        self.loader.on_worker_init()
+        self.loader.init_worker()
 
 
     def tasklist(self, include_builtins=True):
     def tasklist(self, include_builtins=True):
         from celery.registry import tasks
         from celery.registry import tasks
@@ -292,5 +292,4 @@ def main():
     return run_worker(**vars(options))
     return run_worker(**vars(options))
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
-    multiprocessing.freeze_support()
     main()
     main()

+ 8 - 0
celery/loaders/base.py

@@ -1,3 +1,4 @@
+BUILTIN_MODULES = ["celery.task"]
 
 
 
 
 class BaseLoader(object):
 class BaseLoader(object):
@@ -17,6 +18,7 @@ class BaseLoader(object):
 
 
     """
     """
     _conf_cache = None
     _conf_cache = None
+    worker_initialized = False
 
 
     def on_task_init(self, task_id, task):
     def on_task_init(self, task_id, task):
         """This method is called before a task is executed."""
         """This method is called before a task is executed."""
@@ -31,8 +33,14 @@ class BaseLoader(object):
 
 
     def import_default_modules(self):
     def import_default_modules(self):
         imports = getattr(self.conf, "CELERY_IMPORTS", None) or []
         imports = getattr(self.conf, "CELERY_IMPORTS", None) or []
+        imports = set(list(imports) + BUILTIN_MODULES)
         return map(self.import_task_module, imports)
         return map(self.import_task_module, imports)
 
 
+    def init_worker(self):
+        if not self.worker_initialized:
+            self.worker_initialized = True
+            self.on_worker_init()
+
     @property
     @property
     def conf(self):
     def conf(self):
         """Loader configuration."""
         """Loader configuration."""

+ 2 - 1
celery/tests/test_loaders.py

@@ -2,6 +2,7 @@ import os
 import sys
 import sys
 import unittest2 as unittest
 import unittest2 as unittest
 
 
+from celery import task
 from celery import loaders
 from celery import loaders
 from celery.loaders import base
 from celery.loaders import base
 from celery.loaders import djangoapp
 from celery.loaders import djangoapp
@@ -60,7 +61,7 @@ class TestLoaderBase(unittest.TestCase):
 
 
     def test_import_default_modules(self):
     def test_import_default_modules(self):
         self.assertSameElements(self.loader.import_default_modules(),
         self.assertSameElements(self.loader.import_default_modules(),
-                                [os, sys])
+                                [os, sys, task])
 
 
 
 
 class TestDjangoLoader(unittest.TestCase):
 class TestDjangoLoader(unittest.TestCase):

+ 9 - 1
celery/worker/__init__.py

@@ -28,10 +28,18 @@ def process_initializer():
     # There seems to a bug in multiprocessing (backport?)
     # There seems to a bug in multiprocessing (backport?)
     # when detached, where the worker gets EOFErrors from time to time
     # when detached, where the worker gets EOFErrors from time to time
     # and the logger is left from the parent process causing a crash.
     # and the logger is left from the parent process causing a crash.
-    platform.reset_signal("SIGTERM")
     _hijack_multiprocessing_logger()
     _hijack_multiprocessing_logger()
+
+    platform.reset_signal("SIGTERM")
     platform.set_mp_process_title("celeryd")
     platform.set_mp_process_title("celeryd")
 
 
+    # This is for windows and other platforms not supporting
+    # fork(). Note that init_worker makes sure it's only
+    # run once per process.
+    from celery.loaders import current_loader
+    current_loader().init_worker()
+
+
 
 
 class WorkController(object):
 class WorkController(object):
     """Executes tasks waiting in the task queue.
     """Executes tasks waiting in the task queue.