Browse Source

Merge branch '3.0'

Conflicts:
	celery/apps/worker.py
Ask Solem 12 years ago
parent
commit
f2017db47a

+ 7 - 3
celery/apps/worker.py

@@ -247,15 +247,19 @@ install_worker_int_handler = partial(
 )
 
 
+def _clone_current_worker():
+    if os.fork() == 0:
+        os.execv(sys.executable, [sys.executable] + sys.argv)
+
+
 def install_worker_restart_handler(worker, sig='SIGHUP'):
 
     def restart_worker_sig_handler(signum, frame):
         """Signal handler restarting the current python program."""
         set_in_sighandler(True)
         safe_say('Restarting celeryd ({0})'.format(' '.join(sys.argv)))
-        pid = os.fork()
-        if pid == 0:
-            os.execv(sys.executable, [sys.executable] + sys.argv)
+        import atexit
+        atexit.register(_clone_current_worker)
         from celery.worker import state
         state.should_stop = True
     platforms.signals[sig] = restart_worker_sig_handler

+ 5 - 1
celery/result.py

@@ -148,7 +148,7 @@ class AsyncResult(ResultBase):
             [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
 
         """
-        for _, R in self.iterdeps():
+        for _, R in self.iterdeps(intermediate=intermediate):
             yield R, R.get(**kwargs)
 
     def get_leaf(self):
@@ -612,6 +612,10 @@ class GroupResult(ResultSet):
     def serializable(self):
         return self.id, [r.serializable() for r in self.results]
 
+    @property
+    def children(self):
+        return self.results
+
     @classmethod
     def restore(self, id, backend=None):
         """Restore previously saved group result."""

+ 3 - 3
celery/task/base.py

@@ -21,8 +21,8 @@ from celery.utils.log import get_task_logger
 
 #: list of methods that must be classmethods in the old API.
 _COMPAT_CLASSMETHODS = (
-    'delay', 'apply_async', 'retry', 'apply',
-    'AsyncResult', 'subtask', 'push_request', 'pop_request')
+    'delay', 'apply_async', 'retry', 'apply', 'AsyncResult', 'subtask',
+)
 
 
 class Task(BaseTask):
@@ -46,7 +46,7 @@ class Task(BaseTask):
     priority = None
     type = 'regular'
     disable_error_emails = False
-    accept_magic_kwargs = None  # get default from app
+    accept_magic_kwargs = False
 
     from_config = BaseTask.from_config + (
         ('exchange_type', 'CELERY_DEFAULT_EXCHANGE_TYPE'),

+ 2 - 1
celery/tests/app/test_app.py

@@ -163,7 +163,8 @@ class test_App(Case):
 
         app = Celery(set_as_current=False, accept_magic_kwargs=True)
         timkX.bind(app)
-        self.assertTrue(timkX.accept_magic_kwargs)
+        # see #918
+        self.assertFalse(timkX.accept_magic_kwargs)
 
         from celery import Task as NewTask
 

+ 6 - 2
celery/tests/bin/test_celeryd.py

@@ -602,8 +602,9 @@ class test_signal_handlers(AppCase):
             state.should_stop = False
 
     @disable_stdouts
+    @patch('atexit.register')
     @patch('os.fork')
-    def test_worker_restart_handler(self, fork):
+    def test_worker_restart_handler(self, fork, register):
         fork.return_value = 0
         if getattr(os, 'execv', None) is None:
             raise SkipTest('platform does not have excv')
@@ -618,10 +619,13 @@ class test_signal_handlers(AppCase):
             handlers = self.psig(cd.install_worker_restart_handler, worker)
             handlers['SIGHUP']('SIGHUP', object())
             self.assertTrue(state.should_stop)
+            self.assertTrue(register.called)
+            callback = register.call_args[0][0]
+            callback()
             self.assertTrue(argv)
             argv[:] = []
             fork.return_value = 1
-            handlers['SIGHUP']('SIGHUP', object())
+            callback()
             self.assertFalse(argv)
         finally:
             os.execv = execv

+ 3 - 0
celery/worker/consumer.py

@@ -390,6 +390,7 @@ class Consumer(object):
             hb = self.amqheartbeat
             hbtick = connection.heartbeat_check
             on_poll_start = connection.transport.on_poll_start
+            on_poll_empty = connection.transport.on_poll_empty
             strategies = self.strategies
             drain_nowait = connection.drain_nowait
             on_task_callbacks = hub.on_task
@@ -444,6 +445,8 @@ class Consumer(object):
                             events = poll(poll_timeout)
                         except ValueError:  # Issue 882
                             return
+                        if not events:
+                            on_poll_empty()
                         for fileno, event in events or ():
                             try:
                                 if event & READ:

+ 0 - 9
docs/getting-started/brokers/mongodb.rst

@@ -47,12 +47,3 @@ Results
 
 If you also want to store the state and return values of tasks in MongoDB,
 you should see :ref:`conf-mongodb-result-backend`.
-
-.. _broker-mongodb-limitations:
-
-Limitations
-===========
-
-The mongodb message transport currently does not support:
-
-    * Remote control commands (celeryctl, broadcast)

+ 1 - 1
docs/getting-started/first-steps-with-celery.rst

@@ -255,7 +255,7 @@ since it turns the asynchronous call into a synchronous one::
     4
 
 In case the task raised an exception, :meth:`~@AsyncResult.get` will
-re-raise the exception, but you can override this by specyfing
+re-raise the exception, but you can override this by specifying
 the ``propagate`` argument::
 
     >>> result.get(propagate=True)

+ 7 - 7
docs/userguide/canvas.rst

@@ -199,9 +199,9 @@ The Primitives
 
     - ``chord``
 
-        A chord is just like a group but with a callback.  A group consists
+        A chord is just like a group but with a callback.  A chord consists
         of a header group and a body,  where the body is a task that should execute
-        after all of the tasks in the header is complete.
+        after all of the tasks in the header are complete.
 
     - ``map``
 
@@ -314,7 +314,7 @@ Here's some examples:
 - Simple chord
 
     The chord primitive enables us to add callback to be called when
-    all of the tasks in a group has finished executing, which is often
+    all of the tasks in a group have finished executing, which is often
     required for algorithms that aren't embarrassingly parallel::
 
         >>> from celery import chord
@@ -323,7 +323,7 @@ Here's some examples:
         90
 
     The above example creates 10 task that all start in parallel,
-    and when all of them is complete the return values is combined
+    and when all of them are complete the return values are combined
     into a list and sent to the ``xsum`` task.
 
     The body of a chord can also be immutable, so that the return value
@@ -586,7 +586,7 @@ Group also supports iterators::
 
     >>> group(add.s(i, i) for i in xrange(100))()
 
-A group is a subclass instance, so it can be used in combination
+A group is a subtask instance, so it can be used in combination
 with other subtasks.
 
 Group Results
@@ -667,7 +667,7 @@ Chords
 
 .. versionadded:: 2.3
 
-A chord is a task that only executes after all of the tasks in a taskset has
+A chord is a task that only executes after all of the tasks in a taskset have
 finished executing.
 
 
@@ -717,7 +717,7 @@ Let's break the chord expression down::
     9900
 
 Remember, the callback can only be executed after all of the tasks in the
-header has returned.  Each step in the header is executed as a task, in
+header have returned.  Each step in the header is executed as a task, in
 parallel, possibly on different nodes.  The callback is then applied with
 the return value of each task in the header.  The task id returned by
 :meth:`chord` is the id of the callback, so you can wait for it to complete

+ 24 - 4
docs/userguide/tasks.rst

@@ -537,8 +537,8 @@ Result Backends
 ---------------
 
 Celery needs to store or send the states somewhere.  There are several
-built-in backends to choose from: SQLAlchemy/Django ORM, Memcached, Redis,
-RabbitMQ (amqp), MongoDB, Tokyo Tyrant and Redis -- or you can define your own.
+built-in backends to choose from: SQLAlchemy/Django ORM, Memcached,
+RabbitMQ (amqp), MongoDB, and Redis -- or you can define your own.
 
 No backend works well for every use case.
 You should read about the strengths and weaknesses of each backend, and choose
@@ -811,17 +811,37 @@ If you have a task,
 And you route every request to the same process, then it
 will keep state between requests.
 
-This can also be useful to keep cached resources::
+This can also be useful to cache resources,
+e.g. a base Task class that caches a database connection:
+
+.. code-block:: python
+
+    from celery import Task
 
     class DatabaseTask(Task):
+        abstract = True
         _db = None
 
         @property
         def db(self):
-            if self._db = None:
+            if self._db is None:
                 self._db = Database.connect()
             return self._db
 
+
+that can be added to tasks like this:
+
+.. code-block:: python
+
+
+    @celery.task(base=DatabaseTask)
+    def process_rows():
+        for row in self.db.table.all():
+            ...
+
+The ``db`` attribute of the ``process_rows`` task will then
+always stay the same in each process.
+
 Abstract classes
 ----------------