|
@@ -769,10 +769,31 @@ class Task(object):
|
|
|
from celery import xstarmap
|
|
|
return xstarmap(self.s(), it, app=self.app)
|
|
|
|
|
|
- def send_event(self, type_, **fields):
|
|
|
+ def send_event(self, type_, retry=True, retry_policy=None, **fields):
|
|
|
+ """Send monitoring event message.
|
|
|
+
|
|
|
+ This can be used to add custom event types in :pypi:`Flower`
|
|
|
+ and other monitors.
|
|
|
+
|
|
|
+ Arguments:
|
|
|
+ type_ (str): Type of event, e.g. ``"task-failed"``.
|
|
|
+
|
|
|
+ Keyword Arguments:
|
|
|
+ retry (bool): Retry sending the message
|
|
|
+ if the connection is lost. Default is taken from the
|
|
|
+ :setting:`task_publish_retry` setting.
|
|
|
+ retry_policy (Mapping): Retry settings. Default is taken
|
|
|
+ from the :setting:`task_publish_retry_policy` setting.
|
|
|
+ **fields (**Any): Map containing information about the event.
|
|
|
+ Must be JSON serializable.
|
|
|
+ """
|
|
|
req = self.request
|
|
|
+ if retry_policy is None:
|
|
|
+ retry_policy = self.app.conf.task_publish_retry_policy
|
|
|
with self.app.events.default_dispatcher(hostname=req.hostname) as d:
|
|
|
- return d.send(type_, uuid=req.id, **fields)
|
|
|
+ return d.send(
|
|
|
+ type_,
|
|
|
+ uuid=req.id, retry=retry, retry_policy=retry_policy, **fields)
|
|
|
|
|
|
def replace(self, sig):
|
|
|
"""Replace this task, with a new task inheriting the task id.
|