Browse Source

Merge branch '3.0'

Conflicts:
	celery/app/builtins.py
Ask Solem 12 years ago
parent
commit
bbbfd63c33

+ 17 - 1
celery/app/base.py

@@ -73,6 +73,9 @@ class Celery(object):
         self.registry_cls = symbol_by_name(self.registry_cls)
         self.accept_magic_kwargs = accept_magic_kwargs
 
+        self.configured = False
+        self._pending_defaults = deque()
+
         self.finalized = False
         self._finalize_mutex = Lock()
         self._pending = deque()
@@ -158,11 +161,18 @@ class Celery(object):
 
                 pending = self._pending
                 while pending:
-                    maybe_evaluate(pending.pop())
+                    maybe_evaluate(pending.popleft())
 
                 for task in self._tasks.itervalues():
                     task.bind(self)
 
+    def add_defaults(self, fun):
+        if not callable(fun):
+            d, fun = fun, lambda: d
+        if self.configured:
+            return self.conf.add_defaults(fun())
+        self._pending_defaults.append(fun)
+
     def config_from_object(self, obj, silent=False):
         del(self.conf)
         return self.loader.config_from_object(obj, silent=silent)
@@ -295,8 +305,14 @@ class Celery(object):
         return backend(app=self, url=url)
 
     def _get_config(self):
+        self.configured = True
         s = Settings({}, [self.prepare_config(self.loader.conf),
                              deepcopy(DEFAULTS)])
+
+        # load lazy config dict initializers.
+        pending = self._pending_defaults
+        while pending:
+            s.add_defaults(pending.popleft()())
         if self._preconf:
             for key, value in self._preconf.iteritems():
                 setattr(s, key, value)

+ 12 - 8
celery/app/builtins.py

@@ -70,11 +70,13 @@ def add_unlock_chord_task(app):
     from celery.canvas import subtask
     from celery import result as _res
 
-    @app.task(name='celery.chord_unlock', max_retries=None)
-    def unlock_chord(group_id, callback, interval=1, propagate=False,
-            max_retries=None, result=None):
-        AR = _res.AsyncResult
-        result = _res.GroupResult(group_id, [AR(r) for r in result])
+    @app.task(name='celery.chord_unlock', max_retries=None,
+              default_retry_delay=1)
+    def unlock_chord(group_id, callback, interval=None, propagate=False,
+            max_retries=None, result=None, Result=_res.AsyncResult):
+        if interval is None:
+            interval = unlock_chord.default_retry_delay
+        result = _res.GroupResult(group_id, [Result(r) for r in result])
         j = result.join_native if result.supports_native_join else result.join
         if result.ready():
             subtask(callback).delay(j(propagate=propagate))
@@ -308,15 +310,17 @@ def add_chord_task(app):
                 return self.apply(args, kwargs, **options)
             group_id = options.pop('group_id', None)
             chord = options.pop('chord', None)
-            header, body = (list(maybe_subtask(kwargs['header'])),
-                            maybe_subtask(kwargs['body']))
+            header = kwargs.pop('header')
+            body = kwargs.pop('body')
+            header, body = (list(maybe_subtask(header)),
+                            maybe_subtask(body))
             if group_id:
                 body.set(group_id=group_id)
             if chord:
                 body.set(chord=chord)
             callback_id = body.options.setdefault('task_id', task_id or uuid())
             parent = super(Chord, self).apply_async((header, body, args),
-                                                    **options)
+                                                     kwargs, **options)
             body_result = self.AsyncResult(callback_id)
             body_result.parent = parent
             return body_result

+ 3 - 1
celery/app/task.py

@@ -542,7 +542,9 @@ class Task(object):
         options.update({'retries': request.retries + 1,
                         'task_id': request.id,
                         'countdown': countdown,
-                        'eta': eta})
+                        'eta': eta,
+                        'link': request.callbacks,
+                        'link_error': request.errbacks})
 
         if max_retries is not None and options['retries'] > max_retries:
             if exc:

+ 16 - 11
celery/canvas.py

@@ -76,7 +76,7 @@ class Signature(dict):
     def from_dict(self, d):
         typ = d.get('subtask_type')
         if typ:
-            return self.TYPES[typ].from_dict(d)
+            return self.TYPES[typ].from_dict(kwdict(d))
         return Signature(d)
 
     def __init__(self, task=None, args=None, kwargs=None, options=None,
@@ -332,25 +332,30 @@ Signature.register_type(group)
 class chord(Signature):
     Chord = Chord
 
-    def __init__(self, header, body=None, **options):
-        Signature.__init__(self, 'celery.chord', (),
-                         {'header': _maybe_group(header),
-                          'body': maybe_subtask(body)}, **options)
+    def __init__(self, header, body=None, task='celery.chord',
+            args=(), kwargs={}, **options):
+        Signature.__init__(self, task, args, dict(kwargs,
+            header=_maybe_group(header), body=maybe_subtask(body)), **options)
         self.subtask_type = 'chord'
 
     @classmethod
     def from_dict(self, d):
-        kwargs = d['kwargs']
-        return chord(kwargs['header'], kwargs.get('body'),
-                     **kwdict(d['options']))
+        args, d['kwargs'] = self._unpack_args(**kwdict(d['kwargs']))
+        return self(*args, **kwdict(d))
 
-    def __call__(self, body=None, **options):
+    @staticmethod
+    def _unpack_args(header=None, body=None, **kwargs):
+        # Python signatures are better at extracting keys from dicts
+        # than manually popping things off.
+        return (header, body), kwargs
+
+    def __call__(self, body=None, **kwargs):
         _chord = self.Chord
         body = self.kwargs['body'] = body or self.kwargs['body']
         if _chord.app.conf.CELERY_ALWAYS_EAGER:
-            return self.apply((), {}, **options)
+            return self.apply((), kwargs)
         callback_id = body.options.setdefault('task_id', uuid())
-        _chord(**self.kwargs)
+        _chord(**dict(self.kwargs, **kwargs))
         return _chord.AsyncResult(callback_id)
 
     def clone(self, *args, **kwargs):

+ 4 - 0
celery/datastructures.py

@@ -304,6 +304,10 @@ class ConfigurationView(AttributeDictMixin):
         self.__dict__.update(changes=changes, defaults=defaults,
                              _order=[changes] + defaults)
 
+    def add_defaults(self, d):
+        self.defaults.insert(0, d)
+        self._order.insert(1, d)
+
     def __getitem__(self, key):
         for d in self._order:
             try:

+ 5 - 3
celery/worker/__init__.py

@@ -320,9 +320,7 @@ class WorkController(configurated):
         self._finalize = Finalize(self, self.stop, exitpriority=1)
         self.pidfile = pidfile
         self.pidlock = None
-        self.use_eventloop = (detect_environment() == 'default' and
-                              self.app.connection().is_evented and
-                              not self.app.IS_WINDOWS)
+        self.use_eventloop = self.should_use_eventloop()
 
         # Update celery_include to have all known task modules, so that we
         # ensure all task modules are imported in case an execv happens.
@@ -388,6 +386,10 @@ class WorkController(configurated):
         except AttributeError:
             pass
 
+    def should_use_eventloop(self):
+        return (detect_environment() == 'default' and
+                self.app.connection().is_evented and not self.app.IS_WINDOWS)
+
     def stop(self, in_sighandler=False):
         """Graceful shutdown of the worker server."""
         self.signal_consumer_close()

+ 0 - 1
celery/worker/job.py

@@ -60,7 +60,6 @@ class Request(object):
     """A request for task execution."""
     __slots__ = ('app', 'name', 'id', 'args', 'kwargs',
                  'on_ack', 'delivery_info', 'hostname',
-                 'callbacks', 'errbacks',
                  'eventer', 'connection_errors',
                  'task', 'eta', 'expires',
                  'request_dict', 'acknowledged', 'success_msg',

+ 4 - 1
docs/getting-started/brokers/redis.rst

@@ -32,7 +32,10 @@ your Redis database::
 
 Where the URL is in the format of::
 
-    redis://userid:password@hostname:port/db_number
+    redis://:password@hostname:port/db_number
+
+all fields after the scheme are optional, and will default to localhost on port 6379,
+using database 0.
 
 .. _redis-results-configuration:
 

+ 17 - 0
docs/reference/celery.rst

@@ -114,6 +114,23 @@ Application
             >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
             >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
 
+    .. method:: Celery.add_defaults(d)
+
+        Add default configuration from dict ``d``.
+
+        If the argument is a callable function then it will be regarded
+        as a promise, and it won't be loaded until the configuration is
+        actually needed.
+
+        This method can be compared to::
+
+            >>> celery.conf.update(d)
+
+        with a difference that 1) no copy will be made and 2) the dict will
+        not be transferred when the worker spawns child processes, so
+        it's important that the same configuration happens at import time
+        when pickle restores the object on the other side.
+
     .. method:: Celery.start(argv=None)
 
         Run :program:`celery` using `argv`.

+ 2 - 1
docs/userguide/routing.rst

@@ -542,7 +542,8 @@ Broadcast
 ---------
 
 Celery can also support broadcast routing.
-Here is an example exchange ``bcast`` that uses this:
+Here is an example exchange ``broadcast_tasks`` that delivers
+copies of tasks to all workers connected to it:
 
 .. code-block:: python
 

+ 1 - 1
extra/generic-init.d/celerybeat

@@ -6,7 +6,7 @@
 # :Usage: /etc/init.d/celerybeat {start|stop|force-reload|restart|try-restart|status}
 # :Configuration file: /etc/default/celerybeat or /etc/default/celeryd
 #
-# See http://docs.celeryq.org/en/latest/cookbook/daemonizing.html#init-script-celerybeat
+# See http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#generic-init-scripts 
 
 ### BEGIN INIT INFO
 # Provides:          celerybeat

+ 1 - 1
extra/generic-init.d/celeryd

@@ -6,7 +6,7 @@
 # :Usage: /etc/init.d/celeryd {start|stop|force-reload|restart|try-restart|status}
 # :Configuration file: /etc/default/celeryd
 #
-# See http://docs.celeryq.org/en/latest/cookbook/daemonizing.html#init-script-celeryd
+# See http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#generic-init-scripts 
 
 
 ### BEGIN INIT INFO