Browse Source

Merge branch '3.0'

Conflicts:
	celery/app/base.py
	celery/app/builtins.py
	celery/task/sets.py
	celery/worker/__init__.py
	docs/userguide/monitoring.rst
	funtests/suite/test_leak.py
Ask Solem 12 years ago
parent
commit
8a2b06d7ff

+ 7 - 2
celery/app/builtins.py

@@ -144,6 +144,7 @@ def add_group_task(app):
             return result
 
         def prepare(self, options, tasks, args, **kwargs):
+            AsyncResult = self.AsyncResult
             options['group_id'] = group_id = \
                     options.setdefault('task_id', uuid())
 
@@ -155,9 +156,13 @@ def add_group_task(app):
                     tid = opts['task_id']
                 except KeyError:
                     tid = opts['task_id'] = uuid()
-                return task, self.AsyncResult(tid)
+                return task, AsyncResult(tid)
 
-            tasks, res = list(zip(*[prepare_member(task) for task in tasks]))
+            try:
+                tasks, res = list(zip(*[prepare_member(task)
+                                                for task in tasks]))
+            except ValueError:  # tasks empty
+                tasks, res = [], []
             return (tasks, self.app.GroupResult(group_id, res), group_id, args)
 
         def apply_async(self, partial_args=(), kwargs={}, **options):

+ 4 - 1
celery/worker/__init__.py

@@ -30,7 +30,7 @@ from celery import concurrency as _concurrency
 from celery import platforms
 from celery.app import app_or_default, set_default_app
 from celery.app.abstract import configurated, from_config
-from celery.exceptions import SystemTerminate
+from celery.exceptions import SystemTerminate, TaskRevokedError
 from celery.task import trace
 from celery.utils.functional import noop
 from celery.utils.imports import qualname, reload_from_cwd
@@ -369,6 +369,9 @@ class WorkController(configurated):
         """Process task by sending it to the pool of workers."""
         try:
             req.execute_using_pool(self.pool)
+        except TaskRevokedError:
+            if self.semaphore:  # (Issue #877)
+                self.semaphore.release()
         except Exception as exc:
             logger.critical('Internal error: %r\n%s',
                             exc, traceback.format_exc(), exc_info=True)

+ 7 - 3
celery/worker/job.py

@@ -23,6 +23,7 @@ from celery import exceptions
 from celery import signals
 from celery.app import app_or_default
 from celery.datastructures import ExceptionInfo
+from celery.exceptions import TaskRevokedError
 from celery.platforms import signals as _signals
 from celery.task.trace import (
     trace_task,
@@ -170,10 +171,13 @@ class Request(object):
 
         :param pool: A :class:`celery.concurrency.base.TaskPool` instance.
 
+        :raises celery.exceptions.TaskRevokedError: if the task was revoked
+            and ignored.
+
         """
         task = self.task
         if self.revoked():
-            return
+            raise TaskRevokedError(self.id)
 
         hostname = self.hostname
         kwargs = self.kwargs
@@ -228,8 +232,6 @@ class Request(object):
         """If expired, mark the task as revoked."""
         if self.expires and datetime.now(self.tzlocal) > self.expires:
             revoked_tasks.add(self.id)
-            if self.store_errors:
-                self.task.backend.mark_as_revoked(self.id)
             return True
 
     def terminate(self, pool, signal=None):
@@ -251,6 +253,8 @@ class Request(object):
         if self.id in revoked_tasks:
             warn('Skipping revoked task: %s[%s]', self.name, self.id)
             self.send_event('task-revoked', uuid=self.id)
+            if self.store_errors:
+                self.task.backend.mark_as_revoked(self.id)
             self.acknowledge()
             self._already_revoked = True
             send_revoked(self.task, terminated=False,

+ 0 - 3
celery/worker/mediator.py

@@ -66,9 +66,6 @@ class Mediator(bgThread):
         except Empty:
             return
 
-        if task.revoked():
-            return
-
         if self._does_debug:
             logger.debug('Mediator: Running callback for task: %s[%s]',
                          task.name, task.id)

+ 1 - 1
docs/userguide/monitoring.rst

@@ -182,7 +182,7 @@ Usage
 
 Install Celery Flower: ::
 
-    $ pip flower
+    $ pip install flower
 
 Launch Celery Flower and open http://localhost:8008 in browser: ::