Browse Source

possibility to set on_message callback for single task (#3477)

* possibility to set on_message callback for single task

* add missed arg for SyncBackendMixin

* exception import added

* PEP8  compatibility

* make flake8 much happier
Ilya 8 years ago
parent
commit
1a0939701d
2 changed files with 12 additions and 5 deletions
  1. 9 3
      celery/backends/base.py
  2. 3 2
      celery/result.py

+ 9 - 3
celery/backends/base.py

@@ -26,7 +26,9 @@ from kombu.utils.url import maybe_sanitize_url
 from celery import states
 from celery import current_app, group, maybe_signature
 from celery.app import current_task
-from celery.exceptions import ChordError, TimeoutError, TaskRevokedError
+from celery.exceptions import (
+    ChordError, TimeoutError, TaskRevokedError, ImproperlyConfigured,
+)
 from celery.five import items
 from celery.result import (
     GroupResult, ResultBase, allow_join_result, result_from_tuple,
@@ -428,9 +430,13 @@ class SyncBackendMixin(object):
         )
 
     def wait_for_pending(self, result, timeout=None, interval=0.5,
-                         no_ack=True, on_interval=None, callback=None,
-                         propagate=True):
+                         no_ack=True, on_message=None, on_interval=None,
+                         callback=None, propagate=True):
         self._ensure_not_eager()
+        if on_message is not None:
+            raise ImproperlyConfigured(
+                'Backend does not support on_message callback')
+
         meta = self.wait_for(
             result.id, timeout=timeout,
             interval=interval,

+ 3 - 2
celery/result.py

@@ -134,8 +134,8 @@ class AsyncResult(ResultBase):
                                 reply=wait, timeout=timeout)
 
     def get(self, timeout=None, propagate=True, interval=0.5,
-            no_ack=True, follow_parents=True, callback=None, on_interval=None,
-            EXCEPTION_STATES=states.EXCEPTION_STATES,
+            no_ack=True, follow_parents=True, callback=None, on_message=None,
+            on_interval=None, EXCEPTION_STATES=states.EXCEPTION_STATES,
             PROPAGATE_STATES=states.PROPAGATE_STATES):
         """Wait until task is ready, and return its result.
 
@@ -185,6 +185,7 @@ class AsyncResult(ResultBase):
             no_ack=no_ack,
             propagate=propagate,
             callback=callback,
+            on_message=on_message,
         )
     wait = get  # deprecated alias to :meth:`get`.