
Merge branch 'master' into autopool

Conflicts:
	celery/backends/amqp.py
Ask Solem 14 years ago
parent
commit
064a370cbd

+ 8 - 10
celery/app/base.py

@@ -244,22 +244,20 @@ class BaseApp(object):
         return ConfigurationView({},
                 [self.prepare_config(self.loader.conf), deepcopy(DEFAULTS)])
 
-    def _set_pool(self):
-        self._pool = self.broker_connection().Pool(self.conf.BROKER_POOL_LIMIT)
-        self._pool.owner_pid = os.getpid()
-
-    def _reset_after_fork(self):
+    def _after_fork(self, obj_):
         if self._pool:
             self._pool.force_close_all()
+            self._pool = None
 
     @property
     def pool(self):
         if self._pool is None:
-            self._set_pool()
-        elif os.getpid() != self._pool.owner_pid:
-            print("-- RESET POOL AFTER FORK -- ")
-            self._reset_after_fork()
-            self._set_pool()
+            try:
+                from multiprocessing.util import register_after_fork
+                register_after_fork(self, self._after_fork)
+            except ImportError:
+                pass
+            self._pool = self.broker_connection().Pool(self.conf.BROKER_POOL_LIMIT)
         return self._pool
 
     @cached_property

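The new `pool` property replaces the pid-based bookkeeping with multiprocessing's after-fork hooks: the parent registers a callback when it first builds the pool, and every forked child runs that callback and drops the inherited pool, so the child lazily opens its own broker connections. A minimal sketch of the pattern (the `SharesAPool` class and `_create_pool` stand-in are illustrative, not part of this commit):

    try:
        from multiprocessing.util import register_after_fork
    except ImportError:
        register_after_fork = None   # no multiprocessing on this platform


    class SharesAPool(object):
        _pool = None

        @property
        def pool(self):
            if self._pool is None:
                if register_after_fork is not None:
                    # In a forked child, self._after_fork(self) is called.
                    register_after_fork(self, self._after_fork)
                self._pool = self._create_pool()
            return self._pool

        def _after_fork(self, obj_=None):
            # Runs in the child: discard the pool inherited from the
            # parent so the child creates fresh connections on demand.
            self._pool = None

        def _create_pool(self):
            return object()   # stand-in for broker_connection().Pool(limit)
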
+ 1 - 1
celery/app/defaults.py

@@ -115,7 +115,7 @@ NAMESPACES = {
         "LOG_COLOR": Option(type="bool"),
         "LOG_LEVEL": Option("WARN"),
         "LOG_FILE": Option(),
-        "MEDIATOR": Option("celery.worker.controllers.Mediator"),
+        "MEDIATOR": Option("celery.worker.mediator.Mediator"),
         "MAX_TASKS_PER_CHILD": Option(type="int"),
         "POOL": Option(DEFAULT_POOL),
         "POOL_PUTLOCKS": Option(True, type="bool"),

+ 38 - 51
celery/beat.py

@@ -1,8 +1,3 @@
-"""
-
-Periodic Task Scheduler
-
-"""
 import errno
 import os
 import time
@@ -37,38 +32,37 @@ class SchedulingError(Exception):
 class ScheduleEntry(object):
     """An entry in the scheduler.
 
-    :param name: see :attr:`name`.
-    :param schedule: see :attr:`schedule`.
-    :param args: see :attr:`args`.
-    :param kwargs: see :attr:`kwargs`.
+    :keyword name: see :attr:`name`.
+    :keyword schedule: see :attr:`schedule`.
+    :keyword args: see :attr:`args`.
+    :keyword kwargs: see :attr:`kwargs`.
+    :keyword options: see :attr:`options`.
     :keyword last_run_at: see :attr:`last_run_at`.
     :keyword total_run_count: see :attr:`total_run_count`.
+    :keyword relative: Is the time relative to when the server starts?
 
-    .. attribute:: name
-
-        The task name.
-
-    .. attribute:: schedule
-
-        The schedule (run_every/crontab)
-
-    .. attribute:: args
-
-        Args to apply.
+    """
 
-    .. attribute:: kwargs
+    #: The task name
+    name = None
 
-        Keyword arguments to apply.
+    #: The schedule (run_every/crontab)
+    schedule = None
 
-    .. attribute:: last_run_at
+    #: Positional arguments to apply.
+    args = None
 
-        The time and date of when this task was last run.
+    #: Keyword arguments to apply.
+    kwargs = None
 
-    .. attribute:: total_run_count
+    #: Task execution options.
+    options = None
 
-        Total number of times this periodic task has been executed.
+    #: The time and date of when this task was last scheduled.
+    last_run_at = None
 
-    """
+    #: Total number of times this task has been scheduled.
+    total_run_count = 0
 
     def __init__(self, name=None, task=None, last_run_at=None,
             total_run_count=None, schedule=None, args=(), kwargs={},
@@ -112,34 +106,29 @@ class ScheduleEntry(object):
         return vars(self).iteritems()
 
     def __repr__(self):
-        return "<Entry: %s %s(*%s, **%s) {%s}>" % (self.name,
-                                                   self.task,
-                                                   self.args,
-                                                   self.kwargs,
-                                                   self.schedule)
+        return "<Entry: %s %s(*%s, **%s) {%s}>" % (
+                self.name, self.task, self.args, self.kwargs, self.schedule)
 
 
 class Scheduler(object):
     """Scheduler for periodic tasks.
 
     :keyword schedule: see :attr:`schedule`.
-    :keyword logger:  see :attr:`logger`.
+    :keyword logger: see :attr:`logger`.
     :keyword max_interval: see :attr:`max_interval`.
 
-    .. attribute:: schedule
-
-        The schedule dict/shelve.
+    """
 
-    .. attribute:: logger
+    Entry = ScheduleEntry
 
-        The logger to use.
+    #: The schedule dict/shelve.
+    schedule = None
 
-    .. attribute:: max_interval
+    #: Current logger.
+    logger = None
 
-        Maximum time to sleep between re-checking the schedule.
-
-    """
-    Entry = ScheduleEntry
+    #: Maximum time to sleep between re-checking the schedule.
+    max_interval = 1
 
     def __init__(self, schedule=None, logger=None, max_interval=None,
             app=None, Publisher=None, lazy=False, **kwargs):
@@ -301,28 +290,27 @@ class PersistentScheduler(Scheduler):
         Scheduler.__init__(self, *args, **kwargs)
 
     def _remove_db(self):
-        for suffix in "", ".db", ".dat":
+        for suffix in "", ".db", ".dat", ".bak", ".dir":
             try:
                 os.remove(self.schedule_filename + suffix)
             except OSError, exc:
                 if exc.errno != errno.ENOENT:
                     raise
-            else:
-                break
 
     def setup_schedule(self):
         try:
             self._store = self.persistence.open(self.schedule_filename,
                                                 writeback=True)
+            entries = self._store.setdefault("entries", {})
         except Exception, exc:
             self.logger.error("Removing corrupted schedule file %r: %r" % (
                 self.schedule_filename, exc))
             self._remove_db()
             self._store = self.persistence.open(self.schedule_filename,
                                                 writeback=True)
-
-        if "__version__" not in self._store:
-            self._store.clear()   # remove schedule at 2.2.2 upgrade.
+        else:
+            if "__version__" not in self._store:
+                self._store.clear()   # remove schedule at 2.2.2 upgrade.
         entries = self._store.setdefault("entries", {})
         self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
         self.install_default_entries(self.schedule)
@@ -352,8 +340,7 @@ class PersistentScheduler(Scheduler):
 class Service(object):
     scheduler_cls = PersistentScheduler
 
-    def __init__(self, logger=None,
-            max_interval=None, schedule_filename=None,
+    def __init__(self, logger=None, max_interval=None, schedule_filename=None,
             scheduler_cls=None, app=None):
         self.app = app_or_default(app)
         self.max_interval = max_interval or \

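The reworked `setup_schedule` now touches the ``entries`` key inside the `try` block, so corruption anywhere in the underlying dbm files trips the recovery path, and `_remove_db` deletes every suffix (including the `.bak` and `.dir` files some dbm backends create) instead of stopping at the first hit. A hedged sketch of the open-or-recover flow, using plain `shelve` and an illustrative `open_schedule` helper:

    import errno
    import os
    import shelve


    def open_schedule(filename):
        """Open the schedule shelve; wipe and recreate it if corrupted."""

        def remove_db():
            # A dbm backend may split the data over several files.
            for suffix in "", ".db", ".dat", ".bak", ".dir":
                try:
                    os.remove(filename + suffix)
                except OSError, exc:
                    if exc.errno != errno.ENOENT:
                        raise

        try:
            store = shelve.open(filename, writeback=True)
            store.setdefault("entries", {})   # force a read to detect damage
        except Exception:
            remove_db()
            store = shelve.open(filename, writeback=True)
        return store
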
+ 6 - 2
celery/bin/celerybeat.py

@@ -24,6 +24,7 @@
 """
 from celery.bin.base import Command, Option, daemon_options
 from celery.platforms import create_daemon_context
+from celery.utils.functional import partial
 
 
 class BeatCommand(Command):
@@ -33,8 +34,11 @@ class BeatCommand(Command):
             gid=None, umask=None, working_directory=None, **kwargs):
         kwargs.pop("app", None)
 
+        beat = partial(self.app.Beat, logfile=logfile, pidfile=pidfile,
+                       **kwargs)
+
         if not detach:
-            return self.app.Beat(logfile=logfile, **kwargs).run()
+            return beat().run()
 
         context, on_stop = create_daemon_context(
                                 logfile=logfile,
@@ -45,7 +49,7 @@ class BeatCommand(Command):
                                 working_directory=working_directory)
         context.open()
         try:
-            self.app.Beat(pidfile=pidfile, logfile=logfile, **kwargs).run()
+            beat().run()
         finally:
             on_stop()
 

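Binding the shared arguments once with `partial` keeps the detached and non-detached branches from drifting apart; before this change the non-detached branch silently dropped `pidfile`. A toy illustration of the refactor (the `Beat` stub below is hypothetical):

    from functools import partial


    def Beat(logfile=None, pidfile=None, loglevel=None):
        return (logfile, pidfile, loglevel)

    # Freeze the shared keyword arguments once...
    beat = partial(Beat, logfile="beat.log", pidfile="beat.pid")

    # ...so every call site constructs the instance identically.
    assert beat() == ("beat.log", "beat.pid", None)
    assert beat(loglevel="DEBUG") == ("beat.log", "beat.pid", "DEBUG")
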
+ 6 - 1
celery/concurrency/evlet.py

@@ -3,13 +3,13 @@ import sys
 
 from time import time
 
-
 if not os.environ.get("EVENTLET_NOPATCH"):
     import eventlet
     import eventlet.debug
     eventlet.monkey_patch()
     eventlet.debug.hub_prevent_multiple_readers(False)
 
+from celery import signals
 from celery.concurrency import base
 from celery.utils import timer2
 
@@ -106,13 +106,18 @@ class TaskPool(base.BasePool):
 
     def on_start(self):
         self._pool = self.Pool(self.limit)
+        signals.eventlet_pool_started.send(sender=self)
 
     def on_stop(self):
+        signals.eventlet_pool_preshutdown.send(sender=self)
         if self._pool is not None:
             self._pool.waitall()
+        signals.eventlet_pool_postshutdown.send(sender=self)
 
     def on_apply(self, target, args=None, kwargs=None, callback=None,
             accept_callback=None, **_):
+        signals.eventlet_pool_apply.send(sender=self,
+                target=target, args=args, kwargs=kwargs)
         self._pool.spawn_n(apply_target, target, args, kwargs,
                            callback, accept_callback,
                            self.getcurrent)

+ 63 - 26
celery/concurrency/processes/pool.py

@@ -13,6 +13,7 @@ __all__ = ['Pool']
 #
 
 import os
+import sys
 import errno
 import threading
 import Queue
@@ -22,6 +23,7 @@ import time
 import signal
 
 from multiprocessing import Process, cpu_count, TimeoutError
+from multiprocessing import util
 from multiprocessing.util import Finalize, debug
 
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
@@ -58,6 +60,11 @@ def mapstar(args):
     return map(*args)
 
 
+def error(msg, *args, **kwargs):
+    if util._logger:
+        util._logger.error(msg, *args, **kwargs)
+
+
 class LaxBoundedSemaphore(threading._Semaphore):
     """Semaphore that checks that # release is <= # acquires,
     but ignores if # releases >= value."""
@@ -168,6 +175,14 @@ class PoolThread(threading.Thread):
         self._state = RUN
         self.daemon = True
 
+    def run(self):
+        try:
+            return self.body()
+        except Exception, exc:
+            error("Thread %r crashed: %r" % (self.__class__.__name__, exc, ),
+                  exc_info=sys.exc_info())
+            os._exit(1)
+
     def terminate(self):
         self._state = TERMINATE
 
@@ -181,11 +196,11 @@ class Supervisor(PoolThread):
         self.pool = pool
         super(Supervisor, self).__init__()
 
-    def run(self):
+    def body(self):
         debug('worker handler starting')
         while self._state == RUN and self.pool._state == RUN:
             self.pool._maintain_pool()
-            time.sleep(0.1)
+            time.sleep(0.8)
         debug('worker handler exiting')
 
 
@@ -198,7 +213,7 @@ class TaskHandler(PoolThread):
         self.pool = pool
         super(TaskHandler, self).__init__()
 
-    def run(self):
+    def body(self):
         taskqueue = self.taskqueue
         outqueue = self.outqueue
         put = self.put
@@ -249,7 +264,7 @@ class TimeoutHandler(PoolThread):
         self.putlock = putlock
         super(TimeoutHandler, self).__init__()
 
-    def run(self):
+    def body(self):
         processes = self.processes
         cache = self.cache
         putlock = self.putlock
@@ -338,7 +353,7 @@ class ResultHandler(PoolThread):
         self.putlock = putlock
         super(ResultHandler, self).__init__()
 
-    def run(self):
+    def body(self):
         get = self.get
         outqueue = self.outqueue
         cache = self.cache
@@ -521,11 +536,22 @@ class Pool(object):
         w.start()
         return w
 
-    def _join_exited_workers(self):
+    def _join_exited_workers(self, lost_worker_timeout=10.0):
         """Cleanup after any worker processes which have exited due to
         reaching their specified lifetime. Returns True if any workers were
         cleaned up.
         """
+        now = None
+        # The worker may have published a result before being terminated,
+        # but we have no way to accurately tell if it did.  So we wait for
+        # 10 seconds before we mark the job with WorkerLostError.
+        for job in [job for job in self._cache.values()
+                if not job.ready() and job._worker_lost]:
+            now = now or time.time()
+            if now - job._worker_lost > lost_worker_timeout:
+                err = WorkerLostError("Worker exited prematurely.")
+                job._set(None, (False, err))
+
         cleaned = []
         for i in reversed(range(len(self._pool))):
             worker = self._pool[i]
@@ -541,8 +567,7 @@ class Pool(object):
                     if worker_pid in cleaned and not job.ready():
                         if self._putlock is not None:
                             self._putlock.release()
-                        err = WorkerLostError("Worker exited prematurely.")
-                        job._set(None, (False, err))
+                        job._worker_lost = time.time()
                         continue
             return True
         return False
@@ -817,9 +842,11 @@ DynamicPool = Pool
 
 
 class ApplyResult(object):
+    _worker_lost = None
 
     def __init__(self, cache, callback, accept_callback=None,
             timeout_callback=None, error_callback=None):
+        self._mutex = threading.Lock()
         self._cond = threading.Condition(threading.Lock())
         self._job = job_counter.next()
         self._cache = cache
@@ -865,28 +892,38 @@ class ApplyResult(object):
             raise self._value
 
     def _set(self, i, obj):
-        self._success, self._value = obj
-        if self._callback and self._success:
-            self._callback(self._value)
-        if self._errback and not self._success:
-            self._errback(self._value)
-        self._cond.acquire()
+        self._mutex.acquire()
         try:
-            self._ready = True
-            self._cond.notify()
+            self._success, self._value = obj
+            self._cond.acquire()
+            try:
+                self._ready = True
+                self._cond.notify()
+            finally:
+                self._cond.release()
+            if self._accepted:
+                self._cache.pop(self._job, None)
+
+            # apply callbacks last
+            if self._callback and self._success:
+                self._callback(self._value)
+            if self._errback and not self._success:
+                self._errback(self._value)
         finally:
-            self._cond.release()
-        if self._accepted:
-            self._cache.pop(self._job, None)
+            self._mutex.release()
 
     def _ack(self, i, time_accepted, pid):
-        self._accepted = True
-        self._time_accepted = time_accepted
-        self._worker_pid = pid
-        if self._accept_callback:
-            self._accept_callback(pid, time_accepted)
-        if self._ready:
-            self._cache.pop(self._job, None)
+        self._mutex.acquire()
+        try:
+            self._accepted = True
+            self._time_accepted = time_accepted
+            self._worker_pid = pid
+            if self._ready:
+                self._cache.pop(self._job, None)
+            if self._accept_callback:
+                self._accept_callback(pid, time_accepted)
+        finally:
+            self._mutex.release()
 
 #
 # Class whose instances are returned by `Pool.map_async()`

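The `run` → `body` split above gives Supervisor, TaskHandler, TimeoutHandler and ResultHandler a single crash barrier: `run` wraps `body`, logs any escaping exception, and calls `os._exit(1)` so the whole worker dies loudly rather than limping on without a critical thread. The same pattern is applied to the timer, autoscaler and mediator threads further down. A condensed sketch of the idea (class name illustrative):

    import os
    import sys
    import threading


    class CrashHardThread(threading.Thread):

        def run(self):
            try:
                return self.body()
            except Exception, exc:
                sys.stderr.write("Thread %r crashed: %r\n" % (
                    self.__class__.__name__, exc))
                # Raising here would only end this thread; os._exit(1)
                # takes the process down so a supervisor can restart it.
                os._exit(1)

        def body(self):
            raise NotImplementedError("subclasses implement body()")
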
+ 10 - 1
celery/datastructures.py

@@ -84,6 +84,15 @@ class DictAttribute(object):
 
 
 class ConfigurationView(AttributeDictMixin):
+    """A view over an application's configuration dicts.
+
+    If the key does not exist in ``changes``, the ``defaults`` dict
+    is consulted.
+
+    :param changes:  Dict containing changes to the configuration.
+    :param defaults: Dict containing the default configuration.
+
+    """
     changes = None
     defaults = None
 
@@ -198,7 +207,7 @@ class LimitedSet(object):
     consume too much resources.
 
     :keyword maxlen: Maximum number of members before we start
-                     deleting expired members.
+                     evicting expired members.
     :keyword expires: Time in seconds, before a membership expires.
 
     """

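The new docstring spells out `ConfigurationView`'s two-level lookup: `changes` shadows the defaults, much like a two-dict ChainMap. A simplified sketch of that behaviour (single `defaults` dict for brevity; as `base.py` above shows, the real view is built over a list of default dicts):

    class ChainedView(object):
        """Look keys up in `changes` first, then fall back to `defaults`."""

        def __init__(self, changes, defaults):
            self.changes = changes
            self.defaults = defaults

        def __getitem__(self, key):
            try:
                return self.changes[key]
            except KeyError:
                return self.defaults[key]


    view = ChainedView({"BROKER_POOL_LIMIT": 10},
                       {"BROKER_POOL_LIMIT": 1, "LOG_LEVEL": "WARN"})
    assert view["BROKER_POOL_LIMIT"] == 10   # shadowed by changes
    assert view["LOG_LEVEL"] == "WARN"       # falls back to defaults
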
+ 59 - 0
celery/signals.py

@@ -211,6 +211,60 @@ Dispatched in addition to the :signal:`beat_init` signal when celerybeat is
 started as an embedded process.  Sender is the
 :class:`celery.beat.Service` instance.
 
+Eventlet Signals
+----------------
+
+.. signal:: eventlet_pool_started
+
+eventlet_pool_started
+~~~~~~~~~~~~~~~~~~~~~
+
+Sent when the eventlet pool has been started.
+
+Sender is the :class:`celery.concurrency.evlet.TaskPool` instance.
+
+.. signal:: eventlet_pool_preshutdown
+
+eventlet_pool_preshutdown
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sent when the worker shuts down, just before the eventlet pool
+is requested to wait for remaining workers.
+
+Sender is the :class:`celery.concurrency.evlet.TaskPool` instance.
+
+.. signal:: eventlet_pool_postshutdown
+
+eventlet_pool_postshutdown
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sent when the pool has been joined and the worker is ready to shutdown.
+
+Sender is the :class:`celery.concurrency.evlet.TaskPool` instance.
+
+.. signal:: eventlet_pool_apply
+
+eventlet_pool_apply
+~~~~~~~~~~~~~~~~~~~
+
+Sent whenever a task is applied to the pool.
+
+Sender is the :class:`celery.concurrency.evlet.TaskPool` instance.
+
+Provides arguments:
+
+* target
+
+    The target function.
+
+* args
+
+    Positional arguments.
+
+* kwargs
+
+    Keyword arguments.
+
 
 """
 from celery.utils.dispatch import Signal
@@ -239,3 +293,8 @@ setup_logging = Signal(providing_args=["loglevel", "logfile",
 
 beat_init = Signal(providing_args=[])
 beat_embedded_init = Signal(providing_args=[])
+
+eventlet_pool_started = Signal(providing_args=[])
+eventlet_pool_preshutdown = Signal(providing_args=[])
+eventlet_pool_postshutdown = Signal(providing_args=[])
+eventlet_pool_apply = Signal(providing_args=["target", "args", "kwargs"])

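Like the existing Celery signals, the new eventlet signals are subscribed to with `.connect()`; handlers receive `sender` and the documented arguments as keyword arguments. For example (handler name illustrative):

    from celery import signals


    def on_pool_apply(sender=None, target=None, args=None, kwargs=None,
            **extra):
        print("pool %r spawning %r" % (sender, target))

    signals.eventlet_pool_apply.connect(on_pool_apply)
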
+ 1 - 1
celery/tests/test_slow/test_buckets.py

@@ -83,7 +83,7 @@ class test_TokenBucketQueue(unittest.TestCase):
         x = buckets.TokenBucketQueue(fill_rate=1)
         x.put_nowait("The quick brown fox")
         self.assertEqual(x.get_nowait(), "The quick brown fox")
-        self.assertTrue(x.expected_time())
+        self.assertFalse(x.expected_time())
 
     @skip_if_disabled
     def test_qsize(self):

+ 1 - 1
celery/tests/test_worker/test_worker_controllers.py → celery/tests/test_worker/test_worker_mediator.py

@@ -3,7 +3,7 @@ from celery.tests.utils import unittest
 from Queue import Queue
 
 from celery.utils import gen_unique_id
-from celery.worker.controllers import Mediator
+from celery.worker.mediator import Mediator
 from celery.worker.state import revoked as revoked_tasks
 
 

+ 24 - 16
celery/utils/timer2.py

@@ -4,6 +4,8 @@ from __future__ import generators
 
 import atexit
 import heapq
+import logging
+import os
 import sys
 import traceback
 import warnings
@@ -146,6 +148,7 @@ class Timer(Thread):
         self._shutdown = Event()
         self._stopped = Event()
         self.mutex = Lock()
+        self.logger = logging.getLogger("timer2.Timer")
         self.not_empty = Condition(self.mutex)
         self.setDaemon(True)
         self.setName("Timer-%s" % (self._timer_count(), ))
@@ -172,23 +175,28 @@ class Timer(Thread):
         return self.apply_entry(entry)
 
     def run(self):
-        self.running = True
-        self.scheduler = iter(self.schedule)
-
-        while not self._shutdown.isSet():
-            delay = self.next()
-            if delay:
-                if self.on_tick:
-                    self.on_tick(delay)
-                if sleep is None:
-                    break
-                sleep(delay)
         try:
-            self._stopped.set()
-        except TypeError:           # pragma: no cover
-            # we lost the race at interpreter shutdown,
-            # so gc collected built-in modules.
-            pass
+            self.running = True
+            self.scheduler = iter(self.schedule)
+
+            while not self._shutdown.isSet():
+                delay = self.next()
+                if delay:
+                    if self.on_tick:
+                        self.on_tick(delay)
+                    if sleep is None:
+                        break
+                    sleep(delay)
+            try:
+                self._stopped.set()
+            except TypeError:           # pragma: no cover
+                # we lost the race at interpreter shutdown,
+                # so gc collected built-in modules.
+                pass
+        except Exception, exc:
+            self.logger.error("Thread Timer crashed: %r" % (exc, ),
+                  exc_info=sys.exc_info())
+            os._exit(1)
 
     def stop(self):
         if self.running:

+ 1 - 1
celery/utils/timeutils.py

@@ -23,7 +23,7 @@ TIME_UNITS = (("day", 60 * 60 * 24, lambda n: int(math.ceil(n))),
 def maybe_timedelta(delta):
     """Coerces integer to timedelta if `delta` is an integer."""
     if isinstance(delta, int):
-        return timedelta(seconds=int)
+        return timedelta(seconds=delta)
     return delta
 
 

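The one-character fix above matters more than it looks: `timedelta(seconds=int)` passed the builtin *type* where the value was meant, so every integer input raised a `TypeError`. The corrected behaviour:

    from datetime import timedelta


    def maybe_timedelta(delta):
        """Coerces integer to timedelta if `delta` is an integer."""
        if isinstance(delta, int):
            return timedelta(seconds=delta)
        return delta

    assert maybe_timedelta(30) == timedelta(seconds=30)
    assert maybe_timedelta(timedelta(minutes=1)) == timedelta(minutes=1)
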
+ 1 - 1
celery/worker/__init__.py

@@ -88,7 +88,7 @@ class WorkController(object):
     #: processing.
     ready_queue = None
 
-    #: Instance of :class:`celery.worker.controllers.Mediator`.
+    #: Instance of :class:`celery.worker.mediator.Mediator`.
     mediator = None
 
     #: Consumer instance.

+ 7 - 1
celery/worker/autoscale.py

@@ -1,3 +1,4 @@
+import os
 import sys
 import threading
 import traceback
@@ -56,7 +57,12 @@ class Autoscaler(threading.Thread):
 
     def run(self):
         while not self._shutdown.isSet():
-            self.scale()
+            try:
+                self.scale()
+            except Exception, exc:
+                self.logger.error("Thread Autoscaler crashed: %r" % (exc, ),
+                                  exc_info=sys.exc_info())
+                os._exit(1)
         self._stopped.set()
 
     def stop(self):

+ 1 - 1
celery/worker/consumer.py

@@ -43,7 +43,7 @@ up and running.
 * If the task has an ETA/countdown, the task is moved to the `eta_schedule`
   so the :class:`timer2.Timer` can schedule it at its
   deadline. Tasks without an eta are moved immediately to the `ready_queue`,
-  so they can be picked up by the :class:`~celery.worker.mediator.Mediator`
+  so they can be picked up by the :class:`~celery.worker.mediator.Mediator`
   to be sent to the pool.
 
 * When a task with an ETA is received the QoS prefetch count is also

+ 10 - 10
celery/worker/control/builtins.py

@@ -26,7 +26,7 @@ def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
                 request.terminate(panel.consumer.pool, signal=signum)
                 break
 
-    panel.logger.warn("Task %s %s." % (task_id, action))
+    panel.logger.info("Task %s %s." % (task_id, action))
     return {"ok": "task %s %s" % (task_id, action)}
 
 
@@ -36,7 +36,7 @@ def enable_events(panel):
     if not dispatcher.enabled:
         dispatcher.enable()
         dispatcher.send("worker-online")
-        panel.logger.warn("Events enabled by remote.")
+        panel.logger.info("Events enabled by remote.")
         return {"ok": "events enabled"}
     return {"ok": "events already enabled"}
 
@@ -47,7 +47,7 @@ def disable_events(panel):
     if dispatcher.enabled:
         dispatcher.send("worker-offline")
         dispatcher.disable()
-        panel.logger.warn("Events disabled by remote.")
+        panel.logger.info("Events disabled by remote.")
         return {"ok": "events disabled"}
     return {"ok": "events already disabled"}
 
@@ -89,11 +89,11 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
     panel.consumer.ready_queue.refresh()
 
     if not rate_limit:
-        panel.logger.warn("Disabled rate limits for tasks of type %s" % (
+        panel.logger.info("Disabled rate limits for tasks of type %s" % (
                             task_name, ))
         return {"ok": "rate limit disabled successfully"}
 
-    panel.logger.warn("New rate limit for tasks of type %s: %s." % (
+    panel.logger.info("New rate limit for tasks of type %s: %s." % (
                 task_name, rate_limit))
     return {"ok": "new rate limit set successfully"}
 
@@ -110,7 +110,7 @@ def dump_schedule(panel, safe=False, **kwargs):
             item["priority"],
             item["item"])
     info = map(formatitem, enumerate(schedule.info()))
-    panel.logger.info("* Dump of current schedule:\n%s" % (
+    panel.logger.debug("* Dump of current schedule:\n%s" % (
                             "\n".join(info, )))
     scheduled_tasks = []
     for item in schedule.info():
@@ -128,7 +128,7 @@ def dump_reserved(panel, safe=False, **kwargs):
     if not reserved:
         panel.logger.info("--Empty queue--")
         return []
-    panel.logger.info("* Dump of currently reserved tasks:\n%s" % (
+    panel.logger.debug("* Dump of currently reserved tasks:\n%s" % (
                             "\n".join(map(safe_repr, reserved), )))
     return [request.info(safe=safe)
             for request in reserved]
@@ -166,8 +166,8 @@ def dump_tasks(panel, **kwargs):
 
     info = map(_extract_info, (tasks[task]
                                         for task in sorted(tasks.keys())))
-    panel.logger.warn("* Dump of currently registered tasks:\n%s" % (
-                "\n".join(info)))
+    panel.logger.debug("* Dump of currently registered tasks:\n%s" % (
+                    "\n".join(info)))
 
     return info
 
@@ -191,7 +191,7 @@ def pool_shrink(panel, n=1, **kwargs):
 
 @Panel.register
 def shutdown(panel, **kwargs):
-    panel.logger.critical("Got shutdown from remote.")
+    panel.logger.warning("Got shutdown from remote.")
     raise SystemExit("Got shutdown from remote")
 
 

+ 9 - 1
celery/worker/controllers.py → celery/worker/mediator.py

@@ -4,6 +4,7 @@ Worker Controller Threads
 
 """
 import logging
+import os
 import sys
 import threading
 import traceback
@@ -47,6 +48,7 @@ class Mediator(threading.Thread):
             "Mediator: Running callback for task: %s[%s]" % (
                 task.task_name, task.task_id))
 
+
         try:
             self.callback(task)
         except Exception, exc:
@@ -61,7 +63,13 @@ class Mediator(threading.Thread):
     def run(self):
         """Move tasks forever or until :meth:`stop` is called."""
         while not self._shutdown.isSet():
-            self.move()
+            try:
+                self.move()
+            except Exception, exc:
+                self.logger.error("Mediator crash: %r" % (exc, ),
+                    exc_info=sys.exc_info())
+                # exiting by normal means does not work here, so force exit.
+                os._exit(1)
         self._stopped.set()
 
     def stop(self):