Ask Solem committed 15 years ago
commit 4cc234159a

+ 0 - 1
celery/backends/__init__.py

@@ -1,4 +1,3 @@
-import sys
 import importlib
 
 from billiard.utils.functional import curry

+ 3 - 2
celery/backends/redis.py

@@ -31,7 +31,9 @@ class RedisBackend(KeyValueStoreBackend):
     redis_connect_retry = None
 
     def __init__(self, redis_host=None, redis_port=None, redis_db=None,
-            redis_timeout=None, redis_connect_timeout=None):
+            redis_timeout=None,
+            redis_connect_retry=None,
+            redis_connect_timeout=None):
         if not redis:
             raise ImproperlyConfigured(
                     "You need to install the redis library in order to use "
@@ -76,7 +78,6 @@ class RedisBackend(KeyValueStoreBackend):
 
     def close(self):
         """Close the redis connection and remove the cache."""
-        # connection overrides bool() # XXX Was the case with tyrant lib!
         if self._connection is not None:
             self._connection.close()
             self._connection = None
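
The constructor now accepts redis_connect_retry explicitly, mirroring the class attribute above. A hedged usage sketch (values are illustrative; the assignment of the new argument happens in the part of the body not shown here):

    from celery.backends.redis import RedisBackend

    backend = RedisBackend(redis_host="localhost", redis_port=6379,
                           redis_db=0, redis_connect_retry=True)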

+ 1 - 2
celery/beat.py

@@ -9,7 +9,6 @@ from datetime import datetime
 from celery import log
 from celery import conf
 from celery import registry
-from celery.log import setup_logger
 
 TIME_UNITS = (("day", 60 * 60 * 24, lambda n: int(math.ceil(n))),
               ("hour", 60 * 60, lambda n: int(math.ceil(n))),
@@ -162,7 +161,7 @@ class ClockService(object):
     def start(self):
         self.logger.info("ClockService: Starting...")
         schedule = shelve.open(filename=self.schedule_filename)
-        #atexit.register(schedule.close)
+        atexit.register(schedule.close)
         scheduler = self.scheduler_cls(schedule=schedule,
                                        registry=self.registry,
                                        logger=self.logger,
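
The previously commented-out atexit hook is now enabled, so the shelve-backed schedule is flushed and closed when the interpreter exits. The same pattern in isolation (the filename is illustrative):

    import atexit
    import shelve

    schedule = shelve.open("celerybeat-schedule")  # illustrative filename
    atexit.register(schedule.close)                # flush and close on exit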

+ 6 - 9
celery/bin/celerybeat.py

@@ -46,7 +46,6 @@
     Change root directory to this path when in daemon mode.
 
 """
-import os
 import sys
 import traceback
 import optparse
@@ -57,7 +56,6 @@ from celery import __version__
 from celery.log import emergency_error
 from celery.beat import ClockService
 from celery.utils import noop
-from celery.loaders import current_loader, settings
 from celery.messaging import get_connection_info
 
 STARTUP_INFO_FMT = """
@@ -120,11 +118,14 @@ def run_clockservice(detach=False, loglevel=conf.CELERYBEAT_LOG_LEVEL,
     if not detach:
         logfile = None # log to stderr when not running in the background.
 
-    # Dump configuration to screen so we have some basic information
-    # when users sends e-mails.
+    # Run the worker init handler.
+    # (Usually imports task modules and such.)
+    from celery.loaders import current_loader
+    current_loader.on_worker_init()
 
-    from celery.messaging import format_routing_table
 
+    # Dump configuration to screen so we have some basic information
+    # when users send e-mails.
 
     print(STARTUP_INFO_FMT % {
             "conninfo": get_connection_info(),
@@ -150,10 +151,6 @@ def run_clockservice(detach=False, loglevel=conf.CELERYBEAT_LOG_LEVEL,
         redirect_stdouts_to_logger(logger, loglevel)
         platform.set_effective_user(uid, gid)
 
-    # Run the worker init handler.
-    # (Usually imports task modules and such.)
-    current_loader.on_worker_init()
-
     def _run_clock():
         logger = setup_logger(loglevel, logfile)
         clockservice = ClockService(logger=logger, is_detached=detach,

+ 0 - 1
celery/bin/celeryd.py

@@ -64,7 +64,6 @@
 import os
 import sys
 import logging
-import textwrap
 import optparse
 import traceback
 import multiprocessing

+ 0 - 1
celery/conf.py

@@ -2,7 +2,6 @@ import logging
 import warnings
 from datetime import timedelta
 
-from celery.registry import tasks
 from celery.loaders import settings
 
 DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'

+ 1 - 1
celery/execute/__init__.py

@@ -5,7 +5,7 @@ from celery.utils import gen_unique_id, fun_takes_kwargs, mattrgetter
 from celery.result import AsyncResult, EagerResult
 from celery.execute.trace import TaskTrace
 from celery.registry import tasks
-from celery.messaging import TaskPublisher, with_connection
+from celery.messaging import with_connection
 
 extract_exec_options = mattrgetter("routing_key", "exchange",
                                    "immediate", "mandatory",

+ 3 - 5
celery/loaders/__init__.py

@@ -1,16 +1,12 @@
 import os
 import importlib
 
-from django.conf import settings
-from django.core.management import setup_environ
-
-from carrot.utils import rpartition
 from celery.loaders.default import Loader as DefaultLoader
 from celery.loaders.djangoapp import Loader as DjangoLoader
 
+LOADER_CLASS_NAME = "Loader"
 LOADER_ALIASES = {"django": "celery.loaders.djangoapp",
                   "default": "celery.loaders.default"}
-LOADER_CLASS_NAME = "Loader"
 _loader_cache = {}
 
 
@@ -36,6 +32,7 @@ def _detect_loader():
     if loader:
         return get_loader_cls(loader)
 
+    from django.conf import settings
     if settings.configured:
         return DjangoLoader
     try:
@@ -55,6 +52,7 @@ def _detect_loader():
             # used configuration method so as to propagate it to the "child"
             # processes. But this has to be experimented with.
             # [asksol/heyman]
+            from django.core.management import setup_environ
             try:
                 settings_mod = os.environ.get("DJANGO_SETTINGS_MODULE",
                                                 "settings")

+ 4 - 4
celery/managers.py

@@ -19,9 +19,9 @@ class TaskManager(models.Manager):
         """
         try:
             task, created = self.get_or_create(task_id=task_id)
-        except Exception, exc:
-            # depending on the database backend we can get various exceptions,
-            # so we catch every exception type
+        except Exception:
+            # We don't have a map of the different exceptions backends can
+            # throw, so we have to catch everything.
             if exception_retry_count > 0:
                 transaction.rollback_unless_managed()
                 return self.get_task(task_id, exception_retry_count-1)
@@ -74,7 +74,7 @@ class TaskManager(models.Manager):
                 task.result = result
                 task.traceback = traceback
                 task.save()
-        except Exception, exc:
+        except Exception:
             # depending on the database backend we can get various exceptions.
             # for example, psycopg2 raises an exception if some operation
             # breaks transaction, and saving task result won't be possible
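
Both handlers now catch a bare Exception without binding the unused exc. The retry shape they share, as a standalone sketch (names are illustrative):

    def call_with_retry(operation, rollback, retries=1):
        """Retry a database operation after rolling back a broken transaction."""
        try:
            return operation()
        except Exception:
            # Different backends raise different exception types, so catch
            # everything and retry a bounded number of times.
            if retries > 0:
                rollback()
                return call_with_retry(operation, rollback, retries - 1)
            raise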

+ 0 - 3
celery/models.py

@@ -3,15 +3,12 @@
 Django Models.
 
 """
-from datetime import datetime
-
 import django
 from django.db import models
 from django.utils.translation import ugettext_lazy as _
 from picklefield.fields import PickledObjectField
 
 from celery import conf
-from celery.registry import tasks
 from celery.managers import TaskManager
 
 TASK_STATUS_PENDING = "PENDING"

+ 2 - 1
celery/task/__init__.py

@@ -12,7 +12,8 @@ from celery.task.control import discard_all
 from celery.task.builtins import PingTask
 
 __all__ = ["Task", "TaskSet", "PeriodicTask", "tasks", "discard_all",
-           "is_successful", "dmap", "dmap_async", "execute_remote", "ping"]
+           "dmap", "dmap_async", "execute_remote", "ping"]
+
 
 def dmap(fun, args, timeout=None):
     """Distribute processing of the arguments and collect the results.

+ 0 - 1
celery/task/builtins.py

@@ -2,7 +2,6 @@ from datetime import timedelta
 
 from celery.task.base import Task, PeriodicTask
 from celery.backends import default_backend
-from celery.registry import tasks
 
 
 class DeleteExpiredTaskMetaTask(PeriodicTask):

+ 0 - 1
celery/task/rest.py

@@ -9,7 +9,6 @@ from urlparse import urlparse
 from anyjson import serialize, deserialize
 
 from celery import __version__ as celery_version
-from celery.registry import tasks
 from celery.task.base import Task as BaseTask
 
 

+ 4 - 4
celery/tests/test_backends/test_database.py

@@ -1,10 +1,10 @@
 import unittest
-from celery.backends.database import DatabaseBackend
-from celery.utils import gen_unique_id
-from celery.task import PeriodicTask
-from celery import registry
 from datetime import timedelta
 
+from celery.task import PeriodicTask
+from celery.utils import gen_unique_id
+from celery.backends.database import DatabaseBackend
+
 
 class SomeClass(object):
 

+ 2 - 6
celery/tests/test_task.py

@@ -1,12 +1,11 @@
 import unittest
 from StringIO import StringIO
+from datetime import datetime, timedelta
 
 from celery import task
-from celery import registry
 from celery import messaging
 from celery.result import EagerResult
 from celery.backends import default_backend
-from datetime import datetime, timedelta
 from celery.decorators import task as task_dec
 from celery.worker.listener import parse_iso8601
 
@@ -222,17 +221,14 @@ class TestCeleryTasks(unittest.TestCase):
 
         # Discarding all tasks.
         task.discard_all()
-        tid3 = task.apply_async(t1)
+        task.apply_async(t1)
         self.assertEquals(task.discard_all(), 1)
         self.assertTrue(consumer.fetch() is None)
 
-        self.assertFalse(task.is_successful(presult.task_id))
         self.assertFalse(presult.successful())
         default_backend.mark_as_done(presult.task_id, result=None)
-        self.assertTrue(task.is_successful(presult.task_id))
         self.assertTrue(presult.successful())
 
-
         publisher = t1.get_publisher()
         self.assertTrue(isinstance(publisher, messaging.TaskPublisher))
 

+ 2 - 3
celery/tests/test_worker.py

@@ -7,7 +7,6 @@ from carrot.connection import BrokerConnection
 from carrot.backends.base import BaseMessage
 from billiard.serialization import pickle
 
-from celery import registry
 from celery.utils import gen_unique_id
 from celery.worker import CarrotListener, WorkController
 from celery.worker.job import TaskWrapper
@@ -110,14 +109,14 @@ class TestCarrotListener(unittest.TestCase):
         l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
 
-        c = l.reset_connection()
+        l.reset_connection()
         self.assertTrue(isinstance(l.amqp_connection, BrokerConnection))
 
         l.close_connection()
         self.assertTrue(l.amqp_connection is None)
         self.assertTrue(l.task_consumer is None)
 
-        c = l.reset_connection()
+        l.reset_connection()
         self.assertTrue(isinstance(l.amqp_connection, BrokerConnection))
 
         l.stop()

+ 2 - 3
celery/tests/test_worker_controllers.py

@@ -1,7 +1,6 @@
-import unittest
 import time
-from Queue import Queue, Empty
-from datetime import datetime, timedelta
+import unittest
+from Queue import Queue
 
 from celery.worker.controllers import Mediator
 from celery.worker.controllers import BackgroundThread

+ 6 - 8
celery/tests/test_worker_job.py

@@ -15,8 +15,7 @@ from celery.models import TaskMeta
 from celery.result import AsyncResult
 from celery.worker.job import WorkerTaskTrace, TaskWrapper
 from celery.worker.pool import TaskPool
-from celery.registry import tasks, NotRegistered
-from celery.exceptions import RetryTaskError
+from celery.exceptions import RetryTaskError, NotRegistered
 from celery.decorators import task as task_dec
 from celery.datastructures import ExceptionInfo
 
@@ -107,12 +106,11 @@ class TestJail(unittest.TestCase):
             return old_connection_close(*args, **kwargs)
 
         connection.close = monkeypatched_connection_close
-
-        ret = jail(gen_unique_id(),
-                   get_db_connection.name, [2], {})
-        self.assertTrue(connection._was_closed)
-
-        connection.close = old_connection_close
+        try:
+            jail(gen_unique_id(), get_db_connection.name, [2], {})
+            self.assertTrue(connection._was_closed)
+        finally:
+            connection.close = old_connection_close
 
     def test_django_cache_connection_is_closed(self):
         old_cache_close = getattr(cache.cache, "close", None)
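
The monkeypatch is now undone in a finally block, so a failing assertion can no longer leave connection.close patched for subsequent tests. The same pattern in isolation (illustrative objects, not the test's real connection):

    class FakeConnection(object):
        _was_closed = False

        def close(self):
            pass

    connection = FakeConnection()
    old_close = connection.close

    def patched_close(*args, **kwargs):
        connection._was_closed = True
        return old_close(*args, **kwargs)

    connection.close = patched_close
    try:
        connection.close()
        assert connection._was_closed
    finally:
        connection.close = old_close  # always restore, even if asserts fail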

+ 0 - 2
celery/tests/test_worker_revoke.py

@@ -1,6 +1,4 @@
 import unittest
-from Queue import Queue, Empty
-from datetime import datetime, timedelta
 
 from celery.worker import revoke
 

+ 0 - 1
celery/worker/buckets.py

@@ -8,7 +8,6 @@ from celery.utils import all
 RATE_MODIFIER_MAP = {"s": lambda n: n,
                      "m": lambda n: n / 60.0,
                      "h": lambda n: n / 60.0 / 60.0}
-
 BASE_IDENTIFIERS = {"0x": 16, "0o": 8, "0b": 2}
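
For context, RATE_MODIFIER_MAP normalises "n per time unit" rate limits to per-second values; a quick illustration of the entries shown above:

    RATE_MODIFIER_MAP = {"s": lambda n: n,
                         "m": lambda n: n / 60.0,
                         "h": lambda n: n / 60.0 / 60.0}

    assert RATE_MODIFIER_MAP["m"](120) == 2.0    # 120/minute -> 2 per second
    assert RATE_MODIFIER_MAP["h"](7200) == 2.0   # 7200/hour  -> 2 per second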
 
 

+ 0 - 1
celery/worker/controllers.py

@@ -5,7 +5,6 @@ Worker Controller Threads
 """
 import time
 import threading
-from datetime import datetime
 from Queue import Empty as QueueEmpty
 
 from celery import log

+ 1 - 1
celery/worker/listener.py

@@ -209,7 +209,7 @@ class CarrotListener(object):
         def _establish_connection():
             """Establish a connection to the AMQP broker."""
             conn = establish_connection()
-            connected = conn.connection # Connection is established lazily.
+            conn.connection # Connection is established lazily, so connect.
             return conn
 
         if not conf.BROKER_CONNECTION_RETRY:
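
carrot's BrokerConnection connects lazily, so the wrapper touches conn.connection purely for its side effect: it forces the real connect and lets failures surface inside the retry logic. A hedged usage sketch (connection parameters are illustrative and assume carrot's API of this era):

    from carrot.connection import BrokerConnection

    conn = BrokerConnection(hostname="localhost", userid="guest",
                            password="guest", virtual_host="/")
    conn.connection   # accessing the attribute performs the actual connect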