Browse Source

More fixes

Ask Solem 13 years ago
parent
commit
81acac32e2

+ 7 - 5
celery/app/amqp.py

@@ -157,10 +157,10 @@ class TaskPublisher(messaging.Publisher):
         self.utc = kwargs.pop("enable_utc", False)
         super(TaskPublisher, self).__init__(*args, **kwargs)
 
-    def declare(self):
-        if self.exchange.name and \
-                not declaration_cached(self.exchange, self.channel):
-            super(TaskPublisher, self).declare()
+    #def declare(self):
+    #    if self.exchange.name and \
+    #            not declaration_cached(self.exchange, self.channel):
+    #        super(TaskPublisher, self).declare()
 
     def _get_queue(self, name):
         if name not in self._queue_cache:
@@ -201,7 +201,9 @@ class TaskPublisher(messaging.Publisher):
         # declare entities
         if queue:
             self._declare_queue(queue, retry, _retry_policy)
-        self._declare_exchange(exchange, exchange_type, retry, _retry_policy)
+        else:
+            self._declare_exchange(exchange, exchange_type,
+                                   retry, _retry_policy)
 
         task_id = task_id or uuid()
         task_args = task_args or []

+ 0 - 1
celery/app/builtins.py

@@ -125,7 +125,6 @@ def add_group_task(app):
 @builtin_task
 def add_chain_task(app):
     from celery.canvas import maybe_subtask
-    from celery.result import EagerResult
 
     class Chain(app.Task):
         name = "celery.chain"

+ 0 - 3
celery/app/utils.py

@@ -5,11 +5,8 @@ import kombu
 import os
 import platform as _platform
 
-from operator import add
-
 from celery import datastructures
 from celery import platforms
-from celery.utils.functional import maybe_list
 from celery.utils.text import pretty
 from celery.utils.imports import qualname
 

+ 0 - 1
celery/apps/beat.py

@@ -1,7 +1,6 @@
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
 
-import atexit
 import socket
 import sys
 

+ 0 - 2
celery/apps/worker.py

@@ -1,7 +1,6 @@
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
 
-import atexit
 import logging
 import os
 import socket
@@ -228,7 +227,6 @@ class Worker(configurated):
                 hostname=self.hostname)
 
 
-
 def _shutdown_handler(worker, sig="TERM", how="stop", exc=SystemExit,
         callback=None, types={"terminate": "Cold", "stop": "Warm"}):
 

+ 1 - 3
celery/bin/celery.py

@@ -6,19 +6,17 @@ import anyjson
 import sys
 
 from importlib import import_module
-from optparse import OptionParser, make_option as Option
 from pprint import pformat
 from textwrap import wrap
 
 from celery import __version__
-from celery.app import app_or_default
 from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
 from celery.utils import term
 from celery.utils.imports import symbol_by_name
 from celery.utils.text import pluralize
 from celery.utils.timeutils import maybe_iso8601
 
-from celery.bin.base import Command as BaseCommand
+from celery.bin.base import Command as BaseCommand, Option
 
 HELP = """
 Type '%(prog_name)s <command> --help' for help using

+ 1 - 0
celery/bin/celeryd.py

@@ -179,6 +179,7 @@ class WorkerCommand(Command):
             Option("--no-execv", action="store_true", default=False),
         )
 
+
 def main():
     # Fix for setuptools generated scripts, so that it will
     # work with multiprocessing fork emulation.

+ 0 - 2
celery/events/snapshot.py

@@ -15,8 +15,6 @@
 """
 from __future__ import absolute_import
 
-import atexit
-
 from kombu.utils.limits import TokenBucket
 
 from celery import platforms

+ 1 - 1
celery/tests/bin/test_celeryd.py

@@ -205,7 +205,7 @@ class test_Worker(AppCase):
         worker2 = self.Worker(include="some.module,another.package")
         self.assertListEqual(worker2.include,
                 ["some.module", "another.package"])
-        worker3 = self.Worker(include=["os", "sys"])
+        self.Worker(include=["os", "sys"])
 
     @disable_stdouts
     def test_unknown_loglevel(self):

+ 14 - 18
celery/worker/__init__.py

@@ -18,7 +18,6 @@ import atexit
 import logging
 import socket
 import sys
-import threading
 import traceback
 
 from billiard import forking_enable
@@ -31,6 +30,7 @@ from celery.exceptions import SystemTerminate
 from celery.utils.functional import noop
 from celery.utils.imports import qualname, reload_from_cwd
 from celery.utils.log import get_logger
+from celery.utils.threads import Event
 
 from . import abstract
 from . import state
@@ -150,8 +150,14 @@ class Timers(abstract.Component):
             w.eta_scheduler_cls = w.pool.Timer
         w.scheduler = self.instantiate(w.eta_scheduler_cls,
                                 precision=w.eta_scheduler_precision,
-                                on_error=w.on_timer_error,
-                                on_tick=w.on_timer_tick)
+                                on_error=self.on_timer_error,
+                                on_tick=self.on_timer_tick)
+
+    def on_timer_error(self, einfo):
+        logger.error("Timer error: %r", einfo[1], exc_info=einfo)
+
+    def on_timer_tick(self, delay):
+        logger.debug("Scheduler wake-up! Next eta %s secs.", delay)
 
 
 class StateDB(abstract.Component):
@@ -212,7 +218,7 @@ class WorkController(configurated):
         # running in the same process.
         set_default_app(self.app)
 
-        self._shutdown_complete = threading.Event()
+        self._shutdown_complete = Event()
         self.setup_defaults(kwargs, namespace="celeryd")
         self.app.select_queues(queues)  # select queues subset.
 
@@ -221,7 +227,6 @@ class WorkController(configurated):
         self.hostname = hostname or socket.gethostname()
         self.ready_callback = ready_callback
         self._finalize = Finalize(self, self.stop, exitpriority=1)
-        self._finalize_db = None
 
         # Initialize boot steps
         self.pool_cls = _concurrency.get_implementation(self.pool_cls)
@@ -251,15 +256,13 @@ class WorkController(configurated):
         # makes sure all greenthreads have exited.
         self._shutdown_complete.wait()
 
-    def process_task(self, request):
+    def process_task(self, req):
         """Process task by sending it to the pool of workers."""
         try:
-            request.task.execute(request, self.pool,
-                                 self.loglevel, self.logfile)
+            req.task.execute(req, self.pool, self.loglevel, self.logfile)
         except Exception, exc:
-            logger.critical("Internal error %s: %s\n%s",
-                            exc.__class__, exc, traceback.format_exc(),
-                            exc_info=True)
+            logger.critical("Internal error: %r\n%s",
+                            exc, traceback.format_exc(), exc_info=True)
         except SystemTerminate:
             self.terminate()
             raise
@@ -288,7 +291,6 @@ class WorkController(configurated):
             self._state = self.TERMINATE
             self._shutdown_complete.set()
             return
-
         self._state = self.CLOSE
 
         for component in reversed(self.components):
@@ -317,12 +319,6 @@ class WorkController(configurated):
                 reload_from_cwd(sys.modules[module], reloader)
         self.pool.restart()
 
-    def on_timer_error(self, einfo):
-        logger.error("Timer error: %r", einfo[1], exc_info=einfo)
-
-    def on_timer_tick(self, delay):
-        logger.debug("Scheduler wake-up! Next eta %s secs.", delay)
-
     @property
     def state(self):
         return state

+ 13 - 17
celery/worker/autoscale.py

@@ -40,9 +40,8 @@ class WorkerComponent(StartStopComponent):
         w.autoscaler = None
 
     def create(self, w):
-        scaler = w.autoscaler = self.instantiate(w.autoscaler_cls, w.pool,
-                                    max_concurrency=w.max_concurrency,
-                                    min_concurrency=w.min_concurrency)
+        scaler = w.autoscaler = self.instantiate(
+            w.autoscaler_cls, w.pool, w.max_concurrency, w.min_concurrency)
         return scaler
 
 
@@ -61,14 +60,12 @@ class Autoscaler(bgThread):
 
     def body(self):
         with self.mutex:
-            current = min(self.qty, self.max_concurrency)
-            if current > self.processes:
-                self.scale_up(current - self.processes)
-            elif current < self.processes:
-                self.scale_down(
-                    (self.processes - current) - self.min_concurrency)
+            procs = self.processes
+            cur = min(self.qty, self.max_concurrency)
+            if cur > procs:
+                self.scale_up(cur - procs)
+            elif cur < procs:
+                self.scale_down((self.processes - cur) - self.min_concurrency)
         sleep(1.0)
-    scale = body  # XXX compat
 
     def update(self, max=None, min=None):
         with self.mutex:
@@ -101,6 +98,12 @@ class Autoscaler(bgThread):
         self._last_action = time()
         return self._grow(n)
 
+    def scale_down(self, n):
+        if n and self._last_action and (
+                time() - self._last_action > self.keepalive):
+            self._last_action = time()
+            return self._shrink(n)
+
     def _grow(self, n):
         info("Scaling up %s processes.", n)
         self.pool.grow(n)
@@ -114,13 +117,6 @@ class Autoscaler(bgThread):
         except Exception, exc:
             error("Autoscaler: scale_down: %r", exc, exc_info=True)
 
-    def scale_down(self, n):
-        if not self._last_action or not n:
-            return
-        if time() - self._last_action > self.keepalive:
-            self._last_action = time()
-            self._shrink(n)
-
     def info(self):
         return {"max": self.max_concurrency,
                 "min": self.min_concurrency,