Browse Source

Merge branch 'master' into gossip2

Ask Solem 12 years ago
parent
commit
18137641d6
6 changed files with 108 additions and 25 deletions
  1. 37 0
      Changelog
  2. 0 1
      celery/__init__.py
  3. 20 3
      celery/app/amqp.py
  4. 1 0
      celery/app/defaults.py
  5. 24 21
      celery/app/task.py
  6. 26 0
      docs/configuration.rst

+ 37 - 0
Changelog

@@ -20,6 +20,43 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 - `Task.apply_async` now supports timeout and soft_timeout arguments (Issue #802)
 - `App.control.Inspect.conf` can be used for inspecting worker configuration
 
+.. _version-3.0.12:
+
+3.0.12
+======
+:release-date: TBA
+
+- Event state's ``tasks_by_name`` applied limit before filtering by name.
+
+    Fix contributed by Alexander A. Sosnovskiy.
+
+- Results get_many method did not respect timeout argument.
+
+    Fix contributed by Remigiusz Modrzejewski
+
+- generic_init.d scripts now support setting :envvar:`CELERY_CREATE_DIRS` to
+  always create log and pid directories (Issue #1045).
+
+    This can be set in your :file:`/etc/default/celeryd`.
+
+- Fixed strange kombu import problem on Python 3.2 (Issue #1034).
+
+- Workers ETA scheduler now uses millisecond precision (Issue #1040).
+
+- The :param:`--config` argument to programs is now supported by all loaders.
+
+- The :setting:`CASSANDRA_OPTIONS` setting has now been documented.
+
+    Contributed by Jared Biel.
+
+- Task methods (:mod:`celery.contrib.methods`) can not be used with the old
+  task base class.
+
+- An optimization was too eager and caused some logging messages to never emit.
+
+- :mod:`celery.contrib.batches` now works again.
+>>>>>>> 3.0
+
 .. _version-3.0.11:
 
 3.0.11

+ 0 - 1
celery/__init__.py

@@ -38,7 +38,6 @@ if os.environ.get('C_IMPDEBUG'):
         return real_import(name, locals, globals, fromlist, level)
     __builtin__.__import__ = debug_import
 
-
 STATICA_HACK = True
 globals()['kcah_acitats'[::-1].upper()] = False
 if STATICA_HACK:

+ 20 - 3
celery/app/amqp.py

@@ -38,6 +38,7 @@ class Queues(dict):
                              added automatically, but if disabled
                              the occurrence of unknown queues
                              in `wanted` will raise :exc:`KeyError`.
+    :keyword ha_policy: Default HA policy for queues with none set.
 
 
     """
@@ -46,11 +47,12 @@ class Queues(dict):
     _consume_from = None
 
     def __init__(self, queues=None, default_exchange=None,
-            create_missing=True):
+            create_missing=True, ha_policy=None):
         dict.__init__(self)
         self.aliases = WeakValueDictionary()
         self.default_exchange = default_exchange
         self.create_missing = create_missing
+        self.ha_policy = ha_policy
         if isinstance(queues, (tuple, list)):
             queues = dict((q.name, q) for q in queues)
         for name, q in (queues or {}).iteritems():
@@ -87,6 +89,10 @@ class Queues(dict):
         """
         if not isinstance(queue, Queue):
             return self.add_compat(queue, **kwargs)
+        if self.ha_policy:
+            if queue.queue_arguments is None:
+                queue.queue_arguments = {}
+            self._set_ha_policy(queue.queue_arguments)
         self[queue.name] = queue
         return queue
 
@@ -95,9 +101,18 @@ class Queues(dict):
         options.setdefault('routing_key', options.get('binding_key'))
         if options['routing_key'] is None:
             options['routing_key'] = name
+        if self.ha_policy is not None:
+            self._set_ha_policy(options.setdefault('queue_arguments', {}))
         q = self[name] = entry_to_queue(name, **options)
         return q
 
+    def _set_ha_policy(self, args):
+        policy = self.ha_policy
+        if isinstance(policy, (list, tuple)):
+            return args.update({'x-ha-policy': 'nodes',
+                                'x-ha-policy-params': list(policy)})
+        args['x-ha-policy'] = policy
+
     def format(self, indent=0, indent_first=True):
         """Format routing table into string for log dumps."""
         active = self.consume_from
@@ -276,17 +291,19 @@ class AMQP(object):
     def flush_routes(self):
         self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)
 
-    def Queues(self, queues, create_missing=None):
+    def Queues(self, queues, create_missing=None, ha_policy=None):
         """Create new :class:`Queues` instance, using queue defaults
         from the current configuration."""
         conf = self.app.conf
         if create_missing is None:
             create_missing = conf.CELERY_CREATE_MISSING_QUEUES
+        if ha_policy is None:
+            ha_policy = conf.CELERY_QUEUE_HA_POLICY
         if not queues and conf.CELERY_DEFAULT_QUEUE:
             queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
                             exchange=self.default_exchange,
                             routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
-        return Queues(queues, self.default_exchange, create_missing)
+        return Queues(queues, self.default_exchange, create_missing, ha_policy)
 
     def Router(self, queues=None, create_missing=None):
         """Returns the current task router."""

+ 1 - 0
celery/app/defaults.py

@@ -144,6 +144,7 @@ NAMESPACES = {
         'REDIRECT_STDOUTS': Option(True, type='bool'),
         'REDIRECT_STDOUTS_LEVEL': Option('WARNING'),
         'QUEUES': Option(type='dict'),
+        'QUEUE_HA_POLICY': Option(None, type='string'),
         'SECURITY_KEY': Option(type='string'),
         'SECURITY_CERTIFICATE': Option(type='string'),
         'SECURITY_CERT_STORE': Option(type='string'),

+ 24 - 21
celery/app/task.py

@@ -488,6 +488,23 @@ class Task(object):
                 parent.request.children.append(result)
         return result
 
+    def subtask_from_request(self, request=None, args=None, kwargs=None,
+            **extra_options):
+
+        request = self.request if request is None else request
+        args = request.args if args is None else args
+        kwargs = request.kwargs if kwargs is None else kwargs
+        delivery_info = request.delivery_info or {}
+        options = {
+            'task_id': request.id,
+            'link': request.callbacks,
+            'link_error': request.errbacks,
+            'exchange': delivery_info.get('exchange'),
+            'routing_key': delivery_info.get('routing_key'),
+            'expires': delivery_info.get('expires'),
+        }
+        return self.subtask(args, kwargs, options, **extra_options)
+
     def retry(self, args=None, kwargs=None, exc=None, throw=True,
             eta=None, countdown=None, max_retries=None, **options):
         """Retry the task.
@@ -536,45 +553,31 @@ class Task(object):
 
         """
         request = self.request
+        retries = request.retries + 1
         max_retries = self.max_retries if max_retries is None else max_retries
-        args = request.args if args is None else args
-        kwargs = request.kwargs if kwargs is None else kwargs
-        delivery_info = request.delivery_info
 
         # Not in worker or emulated by (apply/always_eager),
         # so just raise the original exception.
         if request.called_directly:
-            maybe_reraise()
+            maybe_reraise()  # raise orig stack if PyErr_Occurred
             raise exc or RetryTaskError('Task can be retried', None)
 
-        if delivery_info:
-            options.setdefault('exchange', delivery_info.get('exchange'))
-            options.setdefault('routing_key', delivery_info.get('routing_key'))
-        options.setdefault('expires', request.expires)
-
         if not eta and countdown is None:
             countdown = self.default_retry_delay
 
-        options.update({'retries': request.retries + 1,
-                        'task_id': request.id,
-                        'countdown': countdown,
-                        'eta': eta,
-                        'link': request.callbacks,
-                        'link_error': request.errbacks})
+        S = self.subtask_from_request(request, args, kwargs,
+            countdown=countdown, eta=eta, retries=retries)
 
-        if max_retries is not None and options['retries'] > max_retries:
+        if max_retries is not None and retries > max_retries:
             if exc:
                 maybe_reraise()
             raise self.MaxRetriesExceededError(
                     "Can't retry {0}[{1}] args:{2} kwargs:{3}".format(
-                        self.name, options['task_id'], args, kwargs))
+                        self.name, request.id, S.args, S.kwargs))
 
         # If task was executed eagerly using apply(),
         # then the retry must also be executed eagerly.
-        if request.is_eager:
-            self.apply(args=args, kwargs=kwargs, **options).get()
-        else:
-            self.apply_async(args=args, kwargs=kwargs, **options)
+        S.apply() if request.is_eager else S.apply_async()
         ret = RetryTaskError(exc=exc, when=eta or countdown)
         if throw:
             raise ret

+ 26 - 0
docs/configuration.rst

@@ -609,6 +609,32 @@ A list of routers, or a single router used to route tasks to queues.
 When deciding the final destination of a task the routers are consulted
 in order.  See :ref:`routers` for more information.
 
+.. setting:: CELERY_QUEUE_HA_POLICY
+
+CELERY_QUEUE_HA_POLICY
+~~~~~~~~~~~~~~~~~~~~~~
+:brokers: RabbitMQ
+
+This will set the default HA policy for a queue, and the value
+can either be a string (usually ``all``):
+
+.. code-block:: python
+
+    CELERY_QUEUE_HA_POLICY = 'all'
+
+Using 'all' will replicate the queue to all current nodes,
+Or you can give it a list of nodes to replicate to:
+
+.. code-block:: python
+
+    CELERY_QUEUE_HA_POLICY = ['rabbit@host1', 'rabbit@host2']
+
+
+Using a list will implicitly set ``x-ha-policy`` to 'nodes' and
+``x-ha-policy-params`` to the given list of nodes.
+
+See http://www.rabbitmq.com/ha.html for more information.
+
 .. setting:: CELERY_WORKER_DIRECT
 
 CELERY_WORKER_DIRECT