Ask Solem 14 yıl önce
ebeveyn
işleme
fb7023dfe8

+ 2 - 2
celery/app/__init__.py

@@ -45,6 +45,7 @@ class App(base.BaseApp):
     """
     """
 
 
     def set_current(self):
     def set_current(self):
+        """Make this the current app for this thread."""
         _tls.current_app = self
         _tls.current_app = self
 
 
     def on_init(self):
     def on_init(self):
@@ -89,8 +90,7 @@ class App(base.BaseApp):
     def TaskSet(self, *args, **kwargs):
     def TaskSet(self, *args, **kwargs):
         """Create new :class:`~celery.task.sets.TaskSet`."""
         """Create new :class:`~celery.task.sets.TaskSet`."""
         from celery.task.sets import TaskSet
         from celery.task.sets import TaskSet
-        kwargs["app"] = self
-        return TaskSet(*args, **kwargs)
+        return TaskSet(*args, app=self, **kwargs)
 
 
     def worker_main(self, argv=None):
     def worker_main(self, argv=None):
         """Run :program:`celeryd` using `argv`.  Uses :data:`sys.argv`
         """Run :program:`celeryd` using `argv`.  Uses :data:`sys.argv`

+ 16 - 11
celery/app/base.py

@@ -129,7 +129,7 @@ class BaseApp(object):
     amqp_cls = "celery.app.amqp.AMQP"
     amqp_cls = "celery.app.amqp.AMQP"
     backend_cls = None
     backend_cls = None
     events_cls = "celery.events.Events"
     events_cls = "celery.events.Events"
-    loader_cls = "app"
+    loader_cls = "celery.loaders.app.AppLoader"
     log_cls = "celery.log.Logging"
     log_cls = "celery.log.Logging"
     control_cls = "celery.task.control.Control"
     control_cls = "celery.task.control.Control"
 
 
@@ -147,16 +147,17 @@ class BaseApp(object):
         self.control_cls = control or self.control_cls
         self.control_cls = control or self.control_cls
         self.set_as_current = set_as_current
         self.set_as_current = set_as_current
         self.accept_magic_kwargs = accept_magic_kwargs
         self.accept_magic_kwargs = accept_magic_kwargs
-        self.on_init()
         self.clock = LamportClock()
         self.clock = LamportClock()
 
 
+        self.on_init()
+
     def on_init(self):
     def on_init(self):
         """Called at the end of the constructor."""
         """Called at the end of the constructor."""
         pass
         pass
 
 
     def config_from_object(self, obj, silent=False):
     def config_from_object(self, obj, silent=False):
         """Read configuration from object, where object is either
         """Read configuration from object, where object is either
-        a real object, or the name of an object to import.
+        a object, or the name of a module to import.
 
 
             >>> celery.config_from_object("myapp.celeryconfig")
             >>> celery.config_from_object("myapp.celeryconfig")
 
 
@@ -171,7 +172,7 @@ class BaseApp(object):
         """Read configuration from environment variable.
         """Read configuration from environment variable.
 
 
         The value of the environment variable must be the name
         The value of the environment variable must be the name
-        of an object to import.
+        of a module to import.
 
 
             >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
             >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
             >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
             >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
@@ -227,8 +228,7 @@ class BaseApp(object):
     def AsyncResult(self, task_id, backend=None, task_name=None):
     def AsyncResult(self, task_id, backend=None, task_name=None):
         """Create :class:`celery.result.BaseAsyncResult` instance."""
         """Create :class:`celery.result.BaseAsyncResult` instance."""
         from celery.result import BaseAsyncResult
         from celery.result import BaseAsyncResult
-        return BaseAsyncResult(task_id, app=self,
-                               task_name=task_name,
+        return BaseAsyncResult(task_id, app=self, task_name=task_name,
                                backend=backend or self.backend)
                                backend=backend or self.backend)
 
 
     def TaskSetResult(self, taskset_id, results, **kwargs):
     def TaskSetResult(self, taskset_id, results, **kwargs):
@@ -272,7 +272,13 @@ class BaseApp(object):
     @contextmanager
     @contextmanager
     def default_connection(self, connection=None, connect_timeout=None):
     def default_connection(self, connection=None, connect_timeout=None):
         """For use within a with-statement to get a connection from the pool
         """For use within a with-statement to get a connection from the pool
-        if one is not already provided."""
+        if one is not already provided.
+
+        :keyword connection: If not provided, then a connection will be
+                             acquired from the connection pool.
+        :keyword connect_timeout: *No longer used.*
+
+        """
         if connection:
         if connection:
             yield connection
             yield connection
         else:
         else:
@@ -295,8 +301,7 @@ class BaseApp(object):
         @wraps(fun)
         @wraps(fun)
         def _inner(*args, **kwargs):
         def _inner(*args, **kwargs):
             connection = kwargs.pop("connection", None)
             connection = kwargs.pop("connection", None)
-            connect_timeout = kwargs.get("connect_timeout")
-            with self.default_connection(connection, connect_timeout) as c:
+            with self.default_connection(connection) as c:
                 return fun(*args, **dict(kwargs, connection=c))
                 return fun(*args, **dict(kwargs, connection=c))
         return _inner
         return _inner
 
 
@@ -365,8 +370,8 @@ class BaseApp(object):
                 register_after_fork(self, self._after_fork)
                 register_after_fork(self, self._after_fork)
             except ImportError:
             except ImportError:
                 pass
                 pass
-            self._pool = self.broker_connection().Pool(
-                            self.conf.BROKER_POOL_LIMIT)
+            limit = self.conf.BROKER_POOL_LIMIT
+            self._pool = self.broker_connection().Pool(limit)
         return self._pool
         return self._pool
 
 
     @cached_property
     @cached_property

+ 11 - 21
celery/app/task/__init__.py

@@ -48,7 +48,7 @@ class TaskType(type):
     Automatically registers the task in the task registry, except
     Automatically registers the task in the task registry, except
     if the `abstract` attribute is set.
     if the `abstract` attribute is set.
 
 
-    If no `name` attribute is provided, the name is automatically
+    If no `name` attribute is provided, then no name is automatically
     set to the name of the module it was defined in, and the class name.
     set to the name of the module it was defined in, and the class name.
 
 
     """
     """
@@ -240,15 +240,10 @@ class BaseTask(object):
     def get_logger(self, loglevel=None, logfile=None, propagate=False,
     def get_logger(self, loglevel=None, logfile=None, propagate=False,
             **kwargs):
             **kwargs):
         """Get task-aware logger object."""
         """Get task-aware logger object."""
-        if loglevel is None:
-            loglevel = self.request.loglevel
-        if logfile is None:
-            logfile = self.request.logfile
-        return self.app.log.setup_task_logger(loglevel=loglevel,
-                                              logfile=logfile,
-                                              propagate=propagate,
-                                              task_name=self.name,
-                                              task_id=self.request.id)
+        return self.app.log.setup_task_logger(
+            loglevel=self.request.loglevel if loglevel is None else loglevel,
+            logfile=self.request.logfile if logfile is None else logfile,
+            propagate=propagate, task_name=self.name, task_id=self.request.id)
 
 
     @classmethod
     @classmethod
     def establish_connection(self, connect_timeout=None):
     def establish_connection(self, connect_timeout=None):
@@ -278,8 +273,7 @@ class BaseTask(object):
                 ...     # ... do something with publisher
                 ...     # ... do something with publisher
 
 
         """
         """
-        if exchange is None:
-            exchange = self.exchange
+        exchange = self.exchange if exchange is None else exchange
         if exchange_type is None:
         if exchange_type is None:
             exchange_type = self.exchange_type
             exchange_type = self.exchange_type
         connection = connection or self.establish_connection(connect_timeout)
         connection = connection or self.establish_connection(connect_timeout)
@@ -341,7 +335,7 @@ class BaseTask(object):
 
 
         :keyword countdown: Number of seconds into the future that the
         :keyword countdown: Number of seconds into the future that the
                             task should execute. Defaults to immediate
                             task should execute. Defaults to immediate
-                            delivery (do not confuse with the
+                            execution (do not confuse with the
                             `immediate` flag, as they are unrelated).
                             `immediate` flag, as they are unrelated).
 
 
         :keyword eta: A :class:`~datetime.datetime` object describing
         :keyword eta: A :class:`~datetime.datetime` object describing
@@ -496,12 +490,9 @@ class BaseTask(object):
 
 
         """
         """
         request = self.request
         request = self.request
-        if max_retries is None:
-            max_retries = self.max_retries
-        if args is None:
-            args = request.args
-        if kwargs is None:
-            kwargs = request.kwargs
+        max_retries = self.max_retries if max_retries is None else max_retries
+        args = request.args if args is None else args
+        kwargs = request.kwargs if kwargs is None else kwargs
         delivery_info = request.delivery_info
         delivery_info = request.delivery_info
 
 
         if delivery_info:
         if delivery_info:
@@ -534,8 +525,7 @@ class BaseTask(object):
 
 
     @classmethod
     @classmethod
     def apply(self, args=None, kwargs=None, **options):
     def apply(self, args=None, kwargs=None, **options):
-        """Execute this task locally, by blocking until the task
-        returns.
+        """Execute this task locally, by blocking until the task returns.
 
 
         :param args: positional arguments passed on to the task.
         :param args: positional arguments passed on to the task.
         :param kwargs: keyword arguments passed on to the task.
         :param kwargs: keyword arguments passed on to the task.

+ 3 - 8
celery/loaders/__init__.py

@@ -15,18 +15,13 @@ def get_loader_cls(loader):
     return get_cls_by_name(loader, LOADER_ALIASES)
     return get_cls_by_name(loader, LOADER_ALIASES)
 
 
 
 
-def setup_loader():
-    # XXX Deprecate
+def setup_loader():     # XXX Deprecate
     return get_loader_cls(os.environ.setdefault("CELERY_LOADER", "default"))()
     return get_loader_cls(os.environ.setdefault("CELERY_LOADER", "default"))()
 
 
 
 
-def current_loader():
-    """Detect and return the current loader."""
-    # XXX Deprecate
+def current_loader():   # XXX Deprecate
     return current_app.loader
     return current_app.loader
 
 
 
 
-def load_settings():
-    """Load the global settings object."""
-    # XXX Deprecate
+def load_settings():    # XXX Deprecate
     return current_app.conf
     return current_app.conf

+ 2 - 2
celery/tests/test_concurrency/test_concurrency_processes.py

@@ -189,10 +189,10 @@ class test_TaskPool(unittest.TestCase):
         pool.start()
         pool.start()
         pool.apply_async(lambda x: x, (2, ), {})
         pool.apply_async(lambda x: x, (2, ), {})
 
 
-    def test_terminate_job(self, _kill):
+    def test_terminate_job(self):
 
 
         @patch("celery.concurrency.processes._kill")
         @patch("celery.concurrency.processes._kill")
-        def _do_test():
+        def _do_test(_kill):
             pool = TaskPool(10)
             pool = TaskPool(10)
             pool.terminate_job(1341)
             pool.terminate_job(1341)
             _kill.assert_called_with(1341, signal.SIGTERM)
             _kill.assert_called_with(1341, signal.SIGTERM)