
Working on removing usage of configuration globals.

Ask Solem 14 years ago
parent
commit
ad6b172201

+ 5 - 0
Changelog

@@ -189,6 +189,11 @@ News
 Fixes
 -----
 
+* AMQP result backend: ``result.get()`` returned and cached
+  ``None`` for states other than success and failure states.
+
+   See http://github.com/ask/celery/issues/issue/179
+
 * Compat ``LoggerAdapter`` implementation: Now works for Python 2.4.
 
     Also added support for several new methods:

+ 1 - 1
README.rst

@@ -4,7 +4,7 @@
 
 .. image:: http://cloud.github.com/downloads/ask/celery/celery_favicon_128.png
 
-:Version: 2.1.0a2
+:Version: 2.1.0a5
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/ask/celery/

+ 1 - 1
celery/__init__.py

@@ -1,6 +1,6 @@
 """Distributed Task Queue"""
 
-VERSION = (2, 1, 0, "a2")
+VERSION = (2, 1, 0, "a5")
 
 __version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
 __author__ = "Ask Solem"
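
For reference, the ``__version__`` expression above derives the display
string from the tuple; with the new value::

    >>> VERSION = (2, 1, 0, "a5")
    >>> ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
    '2.1.0a5'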

+ 2 - 3
celery/apps/beat.py

@@ -23,10 +23,9 @@ class Beat(object):
             max_interval=None, scheduler_cls=None, defaults=None, **kwargs):
         """Starts the celerybeat task scheduler."""
 
+        if defaults is None:
+            from celery import conf as defaults
         self.defaults = defaults
-        if self.defaults is None:
-            from celery import conf
-            self.defaults = conf
 
         self.loglevel = loglevel or defaults.CELERYBEAT_LOG_LEVEL
         self.logfile = logfile or defaults.CELERYBEAT_LOG_FILE
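
The change above inverts the dependency: instead of the class importing
``celery.conf`` unconditionally, any object exposing the same attributes
can be injected. A minimal sketch, using a hypothetical ``TestConfig``
stand-in (only the attributes ``Beat`` actually reads are required)::

    from celery.apps.beat import Beat

    class TestConfig(object):
        """Hypothetical stand-in for the ``celery.conf`` module."""
        CELERYBEAT_LOG_LEVEL = "INFO"
        CELERYBEAT_LOG_FILE = "beat.log"
        # ... any other setting Beat reads from its ``defaults``

    b = Beat(defaults=TestConfig())  # no module-level globals consulted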

+ 15 - 13
celery/apps/worker.py

@@ -64,12 +64,12 @@ class Worker(object):
         self.max_tasks_per_child = (max_tasks_per_child or
                                     defaults.CELERYD_MAX_TASKS_PER_CHILD)
         self.db = db
-        self.queues = queues or []
+        self.use_queues = queues or []
         self.include = include or []
         self._isatty = sys.stdout.isatty()
 
-        if isinstance(self.queues, basestring):
-            self.queues = self.queues.split(",")
+        if isinstance(self.use_queues, basestring):
+            self.use_queues = self.use_queues.split(",")
         if isinstance(self.include, basestring):
             self.include = self.include.split(",")
 
@@ -103,17 +103,20 @@ class Worker(object):
 
     def init_queues(self):
         conf = self.defaults
-        if self.queues:
-            conf.QUEUES = dict((queue, options)
-                                for queue, options in conf.QUEUES.items()
-                                    if queue in self.queues)
-            for queue in self.queues:
-                if queue not in conf.QUEUES:
+        from celery.conf import prepare_queues
+        queues = prepare_queues(conf.QUEUES, conf)
+        if self.use_queues:
+            queues = dict((queue, options)
+                                for queue, options in queues.items()
+                                    if queue in self.use_queues)
+            for queue in self.use_queues:
+                if queue not in queues:
                     if conf.CREATE_MISSING_QUEUES:
-                        Router(queues=conf.QUEUES).add_queue(queue)
+                        Router(queues=queues).add_queue(queue)
                     else:
                         raise ImproperlyConfigured(
                             "Queue '%s' not defined in CELERY_QUEUES" % queue)
+        self.queues = queues
 
     def init_loader(self):
         from celery.loaders import current_loader, load_settings
@@ -159,11 +162,9 @@ class Worker(object):
             include_builtins = self.loglevel <= logging.DEBUG
             tasklist = self.tasklist(include_builtins=include_builtins)
 
-        queues = self.defaults.get_queues()
-
         return STARTUP_INFO_FMT % {
             "conninfo": info.format_broker_info(),
-            "queues": info.format_queues(queues, indent=8),
+            "queues": info.format_queues(self.queues, indent=8),
             "concurrency": self.concurrency,
             "loglevel": LOG_LEVELS[self.loglevel],
             "logfile": self.logfile or "[stderr]",
@@ -183,6 +184,7 @@ class Worker(object):
                                 schedule_filename=self.schedule,
                                 send_events=self.events,
                                 db=self.db,
+                                queues=self.queues,
                                 max_tasks_per_child=self.max_tasks_per_child,
                                 task_time_limit=self.task_time_limit,
                                 task_soft_time_limit=self.task_soft_time_limit)
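
The rename to ``use_queues`` keeps the command-line queue selection
(``-Q``) separate from the full queue table, which ``init_queues()`` now
builds locally via ``prepare_queues()`` instead of mutating the global
``conf.QUEUES`` in place. The filtering idiom in isolation, with
hypothetical queue names::

    all_queues = {"celery": {}, "images": {}, "video": {}}
    use_queues = "celery,images".split(",")  # e.g. from ``-Q celery,images``

    selected = dict((queue, options)
                        for queue, options in all_queues.items()
                            if queue in use_queues)
    assert sorted(selected) == ["celery", "images"]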

+ 8 - 3
celery/conf.py

@@ -250,7 +250,7 @@ def prepare(m, source=settings, defaults=_DEFAULTS):
 
 prepare(sys.modules[__name__])
 
-def _init_queues(queues):
+def _init_queues(queues, default_exchange=None, default_exchange_type=None):
     """Convert configuration mapping to a table of queues digestible
     by a :class:`carrot.messaging.ConsumerSet`."""
 
@@ -264,5 +264,10 @@ def _init_queues(queues):
     return dict((queue, _defaults(opts)) for queue, opts in queues.items())
 
 
-def get_queues():
-    return _init_queues(QUEUES)
+def get_queues(): # TODO deprecate
+    return _init_queues(QUEUES, DEFAULT_EXCHANGE, DEFAULT_EXCHANGE_TYPE)
+
+
+def prepare_queues(queues, defaults):
+    return _init_queues(queues, defaults.DEFAULT_EXCHANGE,
+                        defaults.DEFAULT_EXCHANGE_TYPE)
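
``prepare_queues()`` is the injectable counterpart of the now-deprecated
``get_queues()``: the caller supplies both the queue mapping and the
object the exchange defaults are read from. Called with the module
itself it is equivalent to the old behaviour::

    from celery import conf

    # same result as the deprecated conf.get_queues(), but any
    # conf-like defaults object may be substituted
    queues = conf.prepare_queues(conf.QUEUES, conf)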

+ 3 - 2
celery/execute/__init__.py

@@ -17,7 +17,7 @@ extract_exec_options = mattrgetter("queue", "routing_key", "exchange",
 @with_connection
 def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
         task_id=None, publisher=None, connection=None, connect_timeout=None,
-        router=None, expires=None, **options):
+        router=None, expires=None, queues=None, **options):
     """Run a task asynchronously by the celery daemon(s).
 
     :param task: The :class:`~celery.task.base.Task` to run.
@@ -84,7 +84,8 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     replaced by a local :func:`apply` call instead.
 
     """
-    router = router or Router(conf.ROUTES, conf.get_queues(),
+    queues = conf.prepare_queues(queues or conf.QUEUES, conf)
+    router = router or Router(conf.ROUTES, queues,
                               conf.CREATE_MISSING_QUEUES)
 
     if conf.ALWAYS_EAGER:
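
With the new ``queues`` argument a caller can route through an explicit
queue table rather than the global ``CELERY_QUEUES``. A usage sketch,
assuming ``mytask`` is a registered task (queue name and options are
hypothetical)::

    from celery.execute import apply_async

    result = apply_async(mytask, args=[1, 2],
                         queues={"hipri": {"binding_key": "hipri"}})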

+ 1 - 1
celery/messaging.py

@@ -291,7 +291,7 @@ def get_consumer_set(connection, queues=None, **options):
     Defaults to the queues in ``CELERY_QUEUES``.
 
     """
-    queues = queues or conf.get_queues()
+    queues = conf.prepare_queues(queues, conf)
     cset = ConsumerSet(connection)
     for queue_name, queue_options in queues.items():
         queue_options = dict(queue_options)

+ 1 - 1
celery/task/base.py

@@ -230,7 +230,7 @@ class Task(object):
 
     ignore_result = conf.IGNORE_RESULT
     store_errors_even_if_ignored = conf.STORE_ERRORS_EVEN_IF_IGNORED
-    send_error_emails = conf.CELERYD_SEND_TASK_ERROR_EMAILS
+    send_error_emails = conf.CELERY_SEND_TASK_ERROR_EMAILS
     error_whitelist = conf.CELERY_TASK_ERROR_WHITELIST
     disable_error_emails = False # FIXME
     max_retries = 3

+ 5 - 3
celery/tests/test_bin/test_celerybeat.py

@@ -4,7 +4,8 @@ import unittest2 as unittest
 
 from celery import beat
 from celery import platform
-from celery.bin import celerybeat as celerybeat
+from celery.bin import celerybeat as celerybeat_bin
+from celery.apps import beat as celerybeat
 
 
 class MockService(beat.Service):
@@ -94,7 +95,7 @@ class test_div(unittest.TestCase):
     def test_main(self):
         sys.argv = [sys.argv[0], "-s", "foo"]
         try:
-            celerybeat.main()
+            celerybeat_bin.main()
             self.assertTrue(MockBeat.running)
         finally:
             MockBeat.running = False
@@ -107,5 +108,6 @@ class test_div(unittest.TestCase):
             MockBeat.running = False
 
     def test_parse_options(self):
-        options = celerybeat.parse_options(["-s", "foo"])
+        cmd = celerybeat_bin.BeatCommand()
+        options, args = cmd.parse_options("celerybeat", ["-s", "foo"])
         self.assertEqual(options.schedule, "foo")

+ 3 - 6
celery/tests/test_worker.py

@@ -457,12 +457,9 @@ class test_WorkController(unittest.TestCase):
         self.worker.logger = MockLogger()
 
     def test_with_rate_limits_disabled(self):
-        conf.DISABLE_RATE_LIMITS = True
-        try:
-            worker = WorkController(concurrency=1, loglevel=0)
-            self.assertIsInstance(worker.ready_queue, FastQueue)
-        finally:
-            conf.DISABLE_RATE_LIMITS = False
+        worker = WorkController(concurrency=1, loglevel=0,
+                                disable_rate_limits=True)
+        self.assertIsInstance(worker.ready_queue, FastQueue)
 
     def test_attrs(self):
         worker = self.worker

+ 4 - 4
celery/tests/test_worker_job.py

@@ -128,14 +128,14 @@ class test_TaskRequest(unittest.TestCase):
         from celery import conf
         from celery.worker import job
         old_mail_admins = job.mail_admins
-        old_enable_mails = conf.CELERY_SEND_TASK_ERROR_EMAILS
+        old_enable_mails = mytask.send_error_emails
         mail_sent = [False]
 
         def mock_mail_admins(*args, **kwargs):
             mail_sent[0] = True
 
         job.mail_admins = mock_mail_admins
-        conf.CELERY_SEND_TASK_ERROR_EMAILS = True
+        mytask.send_error_emails = True
         try:
             tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
             try:
@@ -147,13 +147,13 @@ class test_TaskRequest(unittest.TestCase):
             self.assertTrue(mail_sent[0])
 
             mail_sent[0] = False
-            conf.CELERY_SEND_TASK_ERROR_EMAILS = False
+            mytask.send_error_emails = False
             tw.on_failure(einfo)
             self.assertFalse(mail_sent[0])
 
         finally:
             job.mail_admins = old_mail_admins
-            conf.CELERY_SEND_TASK_ERROR_EMAILS = old_enable_mails
+            mytask.send_error_emails = old_enable_mails
 
     def test_already_revoked(self):
         tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})

+ 52 - 89
celery/worker/__init__.py

@@ -9,7 +9,6 @@ import traceback
 from multiprocessing.util import Finalize
 
 from celery import beat
-from celery import conf
 from celery import log
 from celery import registry
 from celery import platform
@@ -50,99 +49,61 @@ def process_initializer():
 
 
 class WorkController(object):
-    """Executes tasks waiting in the task queue.
-
-    :param concurrency: see :attr:`concurrency`.
-    :param logfile: see :attr:`logfile`.
-    :param loglevel: see :attr:`loglevel`.
-    :param embed_clockservice: see :attr:`embed_clockservice`.
-    :param send_events: see :attr:`send_events`.
-
-    .. attribute:: concurrency
-
-        The number of simultaneous processes doing work (default:
-        ``conf.CELERYD_CONCURRENCY``)
-
-    .. attribute:: loglevel
-
-        The loglevel used (default: :const:`logging.INFO`)
-
-    .. attribute:: logfile
-
-        The logfile used, if no logfile is specified it uses ``stderr``
-        (default: `celery.conf.CELERYD_LOG_FILE`).
-
-    .. attribute:: embed_clockservice
-
-        If ``True``, celerybeat is embedded, running in the main worker
-        process as a thread.
-
-    .. attribute:: send_events
-
-        Enable the sending of monitoring events, these events can be captured
-        by monitors (celerymon).
-
-    .. attribute:: logger
-
-        The :class:`logging.Logger` instance used for logging.
-
-    .. attribute:: pool
-
-        The :class:`multiprocessing.Pool` instance used.
-
-    .. attribute:: ready_queue
-
-        The :class:`Queue.Queue` that holds tasks ready for immediate
-        processing.
-
-    .. attribute:: schedule_controller
-
-        Instance of :class:`celery.worker.controllers.ScheduleController`.
-
-    .. attribute:: mediator
-
-        Instance of :class:`celery.worker.controllers.Mediator`.
-
-    .. attribute:: listener
-
-        Instance of :class:`CarrotListener`.
-
-    """
     loglevel = logging.ERROR
-    concurrency = conf.CELERYD_CONCURRENCY
-    logfile = conf.CELERYD_LOG_FILE
     _state = None
     _running = 0
 
     def __init__(self, concurrency=None, logfile=None, loglevel=None,
-            send_events=conf.SEND_EVENTS, hostname=None,
-            ready_callback=noop, embed_clockservice=False,
-            pool_cls=conf.CELERYD_POOL, listener_cls=conf.CELERYD_LISTENER,
-            mediator_cls=conf.CELERYD_MEDIATOR,
-            eta_scheduler_cls=conf.CELERYD_ETA_SCHEDULER,
-            schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME,
-            task_time_limit=conf.CELERYD_TASK_TIME_LIMIT,
-            task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
-            max_tasks_per_child=conf.CELERYD_MAX_TASKS_PER_CHILD,
-            pool_putlocks=conf.CELERYD_POOL_PUTLOCKS,
-            db=conf.CELERYD_STATE_DB):
+            send_events=None, hostname=None, ready_callback=noop,
+            embed_clockservice=False, pool_cls=None, listener_cls=None,
+            mediator_cls=None, eta_scheduler_cls=None,
+            schedule_filename=None, task_time_limit=None,
+            task_soft_time_limit=None, max_tasks_per_child=None,
+            pool_putlocks=None, db=None, prefetch_multiplier=None,
+            eta_scheduler_precision=None, queues=None,
+            disable_rate_limits=None, defaults=None):
+
+        if defaults is None:
+            from celery import conf as defaults
+        self.defaults = defaults
 
         # Options
         self.loglevel = loglevel or self.loglevel
-        self.concurrency = concurrency or self.concurrency
-        self.logfile = logfile or self.logfile
+        self.concurrency = concurrency or defaults.CELERYD_CONCURRENCY
+        self.logfile = logfile or defaults.CELERYD_LOG_FILE
         self.logger = log.get_default_logger()
+        if send_events is None:
+            send_events = defaults.SEND_EVENTS
+        self.send_events = send_events
+        self.pool_cls = pool_cls or defaults.CELERYD_POOL
+        self.listener_cls = listener_cls or defaults.CELERYD_LISTENER
+        self.mediator_cls = mediator_cls or defaults.CELERYD_MEDIATOR
+        self.eta_scheduler_cls = eta_scheduler_cls or \
+                                    defaults.CELERYD_ETA_SCHEDULER
+        self.schedule_filename = schedule_filename or \
+                                    defaults.CELERYBEAT_SCHEDULE_FILENAME
         self.hostname = hostname or socket.gethostname()
         self.embed_clockservice = embed_clockservice
         self.ready_callback = ready_callback
-        self.send_events = send_events
-        self.task_time_limit = task_time_limit
-        self.task_soft_time_limit = task_soft_time_limit
-        self.max_tasks_per_child = max_tasks_per_child
-        self.pool_putlocks = pool_putlocks
+        self.task_time_limit = task_time_limit or \
+                                defaults.CELERYD_TASK_TIME_LIMIT
+        self.task_soft_time_limit = task_soft_time_limit or \
+                                defaults.CELERYD_TASK_SOFT_TIME_LIMIT
+        self.max_tasks_per_child = max_tasks_per_child or \
+                                defaults.CELERYD_MAX_TASKS_PER_CHILD
+        self.pool_putlocks = pool_putlocks or \
+                                defaults.CELERYD_POOL_PUTLOCKS
+        self.eta_scheduler_precision = eta_scheduler_precision or \
+                                defaults.CELERYD_ETA_SCHEDULER_PRECISION
+        self.prefetch_multiplier = prefetch_multiplier or \
+                                defaults.CELERYD_PREFETCH_MULTIPLIER
         self.timer_debug = log.SilenceRepeated(self.logger.debug,
                                                max_iterations=10)
-        self.db = db
+        self.db = db or defaults.CELERYD_STATE_DB
+        self.disable_rate_limits = disable_rate_limits or \
+                                defaults.DISABLE_RATE_LIMITS
+        self.queues = queues
+
         self._finalize = Finalize(self, self.stop, exitpriority=1)
 
         if self.db:
@@ -150,7 +111,7 @@ class WorkController(object):
             Finalize(persistence, persistence.save, exitpriority=5)
 
         # Queues
-        if conf.DISABLE_RATE_LIMITS:
+        if disable_rate_limits:
             self.ready_queue = FastQueue()
         else:
             self.ready_queue = TaskBucket(task_registry=registry.tasks)
@@ -158,28 +119,28 @@ class WorkController(object):
         self.logger.debug("Instantiating thread components...")
 
         # Threads + Pool + Consumer
-        self.pool = instantiate(pool_cls, self.concurrency,
+        self.pool = instantiate(self.pool_cls, self.concurrency,
                                 logger=self.logger,
                                 initializer=process_initializer,
                                 maxtasksperchild=self.max_tasks_per_child,
                                 timeout=self.task_time_limit,
                                 soft_timeout=self.task_soft_time_limit,
                                 putlocks=self.pool_putlocks)
-        self.mediator = instantiate(mediator_cls, self.ready_queue,
+        self.mediator = instantiate(self.mediator_cls, self.ready_queue,
                                     callback=self.process_task,
                                     logger=self.logger)
-        self.scheduler = instantiate(eta_scheduler_cls,
-                               precision=conf.CELERYD_ETA_SCHEDULER_PRECISION,
+        self.scheduler = instantiate(self.eta_scheduler_cls,
+                               precision=eta_scheduler_precision,
                                on_error=self.on_timer_error,
                                on_tick=self.on_timer_tick)
 
         self.beat = None
         if self.embed_clockservice:
             self.beat = beat.EmbeddedService(logger=self.logger,
-                                    schedule_filename=schedule_filename)
+                                    schedule_filename=self.schedule_filename)
 
-        prefetch_count = self.concurrency * conf.CELERYD_PREFETCH_MULTIPLIER
-        self.listener = instantiate(listener_cls,
+        prefetch_count = self.concurrency * self.prefetch_multiplier
+        self.listener = instantiate(self.listener_cls,
                                     self.ready_queue,
                                     self.scheduler,
                                     logger=self.logger,
@@ -187,7 +148,9 @@ class WorkController(object):
                                     send_events=self.send_events,
                                     init_callback=self.ready_callback,
                                     initial_prefetch_count=prefetch_count,
-                                    pool=self.pool)
+                                    pool=self.pool,
+                                    queues=self.queues,
+                                    defaults=self.defaults)
 
         # The order is important here;
         #   the first in the list is the first to start,
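
With every setting now exposed as a keyword argument, a
``WorkController`` can be configured per instance, which is exactly what
the updated test above exercises. A minimal sketch::

    from celery.worker import WorkController

    worker = WorkController(concurrency=1, loglevel=0,
                            disable_rate_limits=True)
    # disable_rate_limits=True selects a plain FastQueue as the ready
    # queue instead of the rate-limiting TaskBucket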

+ 3 - 4
celery/worker/control/builtins.py

@@ -1,10 +1,9 @@
 from datetime import datetime
 
-from celery import conf
 from celery import log
 from celery.backends import default_backend
 from celery.registry import tasks
-from celery.utils import timeutils
+from celery.utils import timeutils, LOG_LEVELS
 from celery.worker import state
 from celery.worker.state import revoked
 from celery.worker.control.registry import Panel
@@ -65,7 +64,7 @@ def disable_events(panel):
 def set_loglevel(panel, loglevel=None):
     if loglevel is not None:
         if not isinstance(loglevel, int):
-            loglevel = conf.LOG_LEVELS[loglevel.upper()]
+            loglevel = LOG_LEVELS[loglevel.upper()]
         log.get_default_logger(loglevel=loglevel)
     return {"ok": loglevel}
 
@@ -93,7 +92,7 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
             task_name, ))
         return {"error": "unknown task"}
 
-    if conf.DISABLE_RATE_LIMITS:
+    if not hasattr(panel.listener.ready_queue, "refresh"):
         panel.logger.error("Rate limit attempt, but rate limits disabled.")
         return {"error": "rate limits disabled"}
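
Rather than consulting the removed global, the control command now
duck-types on the ready queue: the rate-limiting ``TaskBucket`` exposes
a ``refresh()`` method while the plain ``FastQueue`` presumably does
not, so its absence means rate limits are off for this worker. The
check in isolation::

    def rate_limits_disabled(ready_queue):
        # mirrors the hasattr() test above: no refresh() method means
        # the worker was started with rate limits disabled
        return not hasattr(ready_queue, "refresh")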
 

+ 1 - 1
celery/worker/job.py

@@ -441,7 +441,7 @@ class TaskRequest(object):
 
         task_obj = tasks.get(self.task_name, object)
         self.send_error_email(task_obj, context, exc_info.exception,
-                              enabled=task_obj.send_error_emails
+                              enabled=task_obj.send_error_emails,
                               whitelist=task_obj.error_whitelist)
 
     def send_error_email(self, task, context, exc,

+ 13 - 7
celery/worker/listener.py

@@ -80,7 +80,6 @@ import warnings
 
 from carrot.connection import AMQPConnectionException
 
-from celery import conf
 from celery.utils import noop, retry_over_time
 from celery.worker.job import TaskRequest, InvalidTaskError
 from celery.worker.control import ControlDispatch
@@ -200,7 +199,12 @@ class CarrotListener(object):
 
     def __init__(self, ready_queue, eta_schedule, logger,
             init_callback=noop, send_events=False, hostname=None,
-            initial_prefetch_count=2, pool=None):
+            initial_prefetch_count=2, pool=None, queues=None, defaults=None):
+
+        if defaults is None:
+            from celery import conf as defaults
+        self.defaults = defaults
+
         self.connection = None
         self.task_consumer = None
         self.ready_queue = ready_queue
@@ -216,6 +220,7 @@ class CarrotListener(object):
         self.control_dispatch = ControlDispatch(logger=logger,
                                                 hostname=self.hostname,
                                                 listener=self)
+        self.queues = queues
 
     def start(self):
         """Start the consumer.
@@ -376,7 +381,8 @@ class CarrotListener(object):
 
         self.connection = self._open_connection()
         self.logger.debug("CarrotListener: Connection Established.")
-        self.task_consumer = get_consumer_set(connection=self.connection)
+        self.task_consumer = get_consumer_set(connection=self.connection,
+                                              queues=self.queues)
         # QoS: Reset prefetch window.
         self.qos = QoS(self.task_consumer,
                        self.initial_prefetch_count, self.logger)
@@ -426,16 +432,16 @@ class CarrotListener(object):
 
         def _establish_connection():
             """Establish a connection to the broker."""
-            conn = establish_connection()
+            conn = establish_connection(defaults=self.defaults)
             conn.connect() # Connection is established lazily, so connect.
             return conn
 
-        if not conf.BROKER_CONNECTION_RETRY:
+        if not self.defaults.BROKER_CONNECTION_RETRY:
             return _establish_connection()
 
         conn = retry_over_time(_establish_connection, (socket.error, IOError),
-                               errback=_connection_error_handler,
-                               max_retries=conf.BROKER_CONNECTION_MAX_RETRIES)
+                    errback=_connection_error_handler,
+                    max_retries=self.defaults.BROKER_CONNECTION_MAX_RETRIES)
         return conn
 
     def stop(self):
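
On the retry logic above: ``retry_over_time`` (from ``celery.utils``)
keeps calling the connection factory until it succeeds, an exception
outside ``catch`` escapes, or ``max_retries`` is exceeded. A hedged
sketch, assuming the errback receives the exception and the next retry
interval, as the listener's handler does::

    import socket

    from celery.utils import retry_over_time

    def on_error(exc, interval):
        print("Connection error: %r. Retrying in %ss..." % (exc, interval))

    # _establish_connection is the zero-argument factory defined above
    conn = retry_over_time(_establish_connection, (socket.error, IOError),
                           errback=on_error, max_retries=100)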

+ 1 - 1
docs/includes/introduction.txt

@@ -1,4 +1,4 @@
-:Version: 2.1.0a2
+:Version: 2.1.0a5
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/ask/celery/