Quellcode durchsuchen

Merge branch 'master' of github.com:celery/celery

Ask Solem vor 8 Jahren
Ursprung
Commit
c88bfa498b
7 geänderte Dateien mit 71 neuen und 25 gelöschten Zeilen
  1. 1 0
      CONTRIBUTORS.txt
  2. 9 3
      celery/backends/base.py
  3. 13 13
      celery/beat.py
  4. 3 2
      celery/result.py
  5. 38 0
      docs/userguide/calling.rst
  6. 1 1
      docs/userguide/tasks.rst
  7. 6 6
      t/unit/app/test_beat.py

+ 1 - 0
CONTRIBUTORS.txt

@@ -218,3 +218,4 @@ Adriano Martins de Jesus, 2016/06/22
 Kevin Richardson, 2016/06/29
 Andrew Stewart, 2016/07/04
 Xin Li, 2016/08/03
+Alli Witheford, 2016/09/29

+ 9 - 3
celery/backends/base.py

@@ -26,7 +26,9 @@ from kombu.utils.url import maybe_sanitize_url
 from celery import states
 from celery import current_app, group, maybe_signature
 from celery.app import current_task
-from celery.exceptions import ChordError, TimeoutError, TaskRevokedError
+from celery.exceptions import (
+    ChordError, TimeoutError, TaskRevokedError, ImproperlyConfigured,
+)
 from celery.five import items
 from celery.result import (
     GroupResult, ResultBase, allow_join_result, result_from_tuple,
@@ -428,9 +430,13 @@ class SyncBackendMixin(object):
         )
 
     def wait_for_pending(self, result, timeout=None, interval=0.5,
-                         no_ack=True, on_interval=None, callback=None,
-                         propagate=True):
+                         no_ack=True, on_message=None, on_interval=None,
+                         callback=None, propagate=True):
         self._ensure_not_eager()
+        if on_message is not None:
+            raise ImproperlyConfigured(
+                'Backend does not support on_message callback')
+
         meta = self.wait_for(
             result.id, timeout=timeout,
             interval=interval,

+ 13 - 13
celery/beat.py

@@ -433,55 +433,55 @@ class PersistentScheduler(Scheduler):
 
         for _ in (1, 2):
             try:
-                self._store[b'entries']
+                self._store[str(b'entries')]
             except KeyError:
                 # new schedule db
                 try:
-                    self._store[b'entries'] = {}
+                    self._store[str(b'entries')] = {}
                 except KeyError as exc:
                     self._store = self._destroy_open_corrupted_schedule(exc)
                     continue
             else:
-                if b'__version__' not in self._store:
+                if str(b'__version__') not in self._store:
                     warning('DB Reset: Account for new __version__ field')
                     self._store.clear()   # remove schedule at 2.2.2 upgrade.
-                elif b'tz' not in self._store:
+                elif str(b'tz') not in self._store:
                     warning('DB Reset: Account for new tz field')
                     self._store.clear()   # remove schedule at 3.0.8 upgrade
-                elif b'utc_enabled' not in self._store:
+                elif str(b'utc_enabled') not in self._store:
                     warning('DB Reset: Account for new utc_enabled field')
                     self._store.clear()   # remove schedule at 3.0.9 upgrade
             break
 
         tz = self.app.conf.timezone
-        stored_tz = self._store.get(b'tz')
+        stored_tz = self._store.get(str(b'tz'))
         if stored_tz is not None and stored_tz != tz:
             warning('Reset: Timezone changed from %r to %r', stored_tz, tz)
             self._store.clear()   # Timezone changed, reset db!
         utc = self.app.conf.enable_utc
-        stored_utc = self._store.get(b'utc_enabled')
+        stored_utc = self._store.get(str(b'utc_enabled'))
         if stored_utc is not None and stored_utc != utc:
             choices = {True: 'enabled', False: 'disabled'}
             warning('Reset: UTC changed from %s to %s',
                     choices[stored_utc], choices[utc])
             self._store.clear()   # UTC setting changed, reset db!
-        entries = self._store.setdefault(b'entries', {})
+        entries = self._store.setdefault(str(b'entries'), {})
         self.merge_inplace(self.app.conf.beat_schedule)
         self.install_default_entries(self.schedule)
         self._store.update({
-            b'__version__': __version__,
-            b'tz': tz,
-            b'utc_enabled': utc,
+            str(b'__version__'): __version__,
+            str(b'tz'): tz,
+            str(b'utc_enabled'): utc,
         })
         self.sync()
         debug('Current schedule:\n' + '\n'.join(
             repr(entry) for entry in values(entries)))
 
     def get_schedule(self):
-        return self._store[b'entries']
+        return self._store[str(b'entries')]
 
     def set_schedule(self, schedule):
-        self._store[b'entries'] = schedule
+        self._store[str(b'entries')] = schedule
     schedule = property(get_schedule, set_schedule)
 
     def sync(self):

+ 3 - 2
celery/result.py

@@ -134,8 +134,8 @@ class AsyncResult(ResultBase):
                                 reply=wait, timeout=timeout)
 
     def get(self, timeout=None, propagate=True, interval=0.5,
-            no_ack=True, follow_parents=True, callback=None, on_interval=None,
-            EXCEPTION_STATES=states.EXCEPTION_STATES,
+            no_ack=True, follow_parents=True, callback=None, on_message=None,
+            on_interval=None, EXCEPTION_STATES=states.EXCEPTION_STATES,
             PROPAGATE_STATES=states.PROPAGATE_STATES):
         """Wait until task is ready, and return its result.
 
@@ -185,6 +185,7 @@ class AsyncResult(ResultBase):
             no_ack=no_ack,
             propagate=propagate,
             callback=callback,
+            on_message=on_message,
         )
     wait = get  # deprecated alias to :meth:`get`.
 

+ 38 - 0
docs/userguide/calling.rst

@@ -172,6 +172,44 @@ The callbacks/errbacks will then be called in order, and all
 callbacks will be called with the return value of the parent task
 as a partial argument.
 
+.. _calling-on-message:
+
+On message
+============================
+
+Celery supports catching all states changes by setting on_message callback.
+
+For example for long-running tasks to send task progress you can do something like this:
+
+.. code-block:: python
+
+    @app.task(bind=True)
+    def hello(self, a, b):
+        time.sleep(1)
+        self.update_state(state="PROGRESS", meta={'progress': 50})
+        time.sleep(1)
+        self.update_state(state="PROGRESS", meta={'progress': 90})
+        time.sleep(1)
+        return 'hello world: %i' % (a+b)
+
+.. code-block:: python
+
+    def on_raw_message(body):
+        print(body)
+
+    r = hello.apply_async()
+    print(r.get(on_message=on_raw_message, propagate=False))
+
+Will generate output like this:
+
+.. code-block:: 
+
+    {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7', 'result': {'progress': 50}, 'children': [], 'status': 'PROGRESS', 'traceback': None}
+    {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7', 'result': {'progress': 90}, 'children': [], 'status': 'PROGRESS', 'traceback': None}
+    {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7', 'result': 'hello world: 10', 'children': [], 'status': 'SUCCESS', 'traceback': None}
+    hello world: 10
+
+
 .. _calling-eta:
 
 ETA and Countdown

+ 1 - 1
docs/userguide/tasks.rst

@@ -1631,7 +1631,7 @@ re-fetch the article in the task body:
 
 .. code-block:: pycon
 
-    >>> expand_abbreviations(article_id)
+    >>> expand_abbreviations.delay(article_id)
 
 There might even be performance benefits to this approach, as sending large
 messages may be expensive.

+ 6 - 6
t/unit/app/test_beat.py

@@ -398,17 +398,17 @@ class test_PersistentScheduler:
         s.setup_schedule()
         s._remove_db.assert_called_with()
 
-        s._store = {b'__version__': 1}
+        s._store = {str(b'__version__'): 1}
         s.setup_schedule()
 
         s._store.clear = Mock()
         op = s.persistence.open = Mock()
         op.return_value = s._store
-        s._store[b'tz'] = 'FUNKY'
+        s._store[str(b'tz')] = 'FUNKY'
         s.setup_schedule()
         op.assert_called_with(s.schedule_filename, writeback=True)
         s._store.clear.assert_called_with()
-        s._store[b'utc_enabled'] = False
+        s._store[str(b'utc_enabled')] = False
         s._store.clear = Mock()
         s.setup_schedule()
         s._store.clear.assert_called_with()
@@ -417,10 +417,10 @@ class test_PersistentScheduler:
         s = create_persistent_scheduler()[0](
             schedule_filename='schedule', app=self.app,
         )
-        s._store = {b'entries': {}}
+        s._store = {str(b'entries'): {}}
         s.schedule = {'foo': 'bar'}
         assert s.schedule == {'foo': 'bar'}
-        assert s._store[b'entries'] == s.schedule
+        assert s._store[str(b'entries')] == s.schedule
 
 
 class test_Service:
@@ -439,7 +439,7 @@ class test_Service:
         assert isinstance(schedule, dict)
         assert isinstance(s.scheduler, beat.Scheduler)
         scheduled = list(schedule.keys())
-        for task_name in keys(sh[b'entries']):
+        for task_name in keys(sh[str(b'entries')]):
             assert task_name in scheduled
 
         s.sync()