|
@@ -1,17 +1,11 @@
|
|
-"""
|
|
|
|
-
|
|
|
|
-Jobs Executable by the Worker Server.
|
|
|
|
-
|
|
|
|
-"""
|
|
|
|
import sys
|
|
import sys
|
|
import time
|
|
import time
|
|
import socket
|
|
import socket
|
|
import warnings
|
|
import warnings
|
|
|
|
|
|
-
|
|
|
|
from celery import conf
|
|
from celery import conf
|
|
from celery import platform
|
|
from celery import platform
|
|
-from celery.log import get_default_logger
|
|
|
|
|
|
+from celery import log
|
|
from celery.utils import noop, kwdict, fun_takes_kwargs
|
|
from celery.utils import noop, kwdict, fun_takes_kwargs
|
|
from celery.utils.mail import mail_admins
|
|
from celery.utils.mail import mail_admins
|
|
from celery.worker.revoke import revoked
|
|
from celery.worker.revoke import revoked
|
|
@@ -23,7 +17,7 @@ from celery.datastructures import ExceptionInfo
|
|
# pep8.py borks on a inline signature separator and
|
|
# pep8.py borks on a inline signature separator and
|
|
# says "trailing whitespace" ;)
|
|
# says "trailing whitespace" ;)
|
|
EMAIL_SIGNATURE_SEP = "-- "
|
|
EMAIL_SIGNATURE_SEP = "-- "
|
|
-TASK_FAIL_EMAIL_BODY = """
|
|
|
|
|
|
+TASK_ERROR_EMAIL_BODY = """
|
|
Task %%(name)s with id %%(id)s raised exception: %%(exc)s
|
|
Task %%(name)s with id %%(id)s raised exception: %%(exc)s
|
|
|
|
|
|
|
|
|
|
@@ -128,6 +122,13 @@ class WorkerTaskTrace(TaskTrace):
|
|
|
|
|
|
|
|
|
|
def execute_and_trace(task_name, *args, **kwargs):
|
|
def execute_and_trace(task_name, *args, **kwargs):
|
|
|
|
+ """This is a pickleable method used as a target when applying to pools.
|
|
|
|
+
|
|
|
|
+ It's the same as::
|
|
|
|
+
|
|
|
|
+ >>> WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
|
|
|
|
+
|
|
|
|
+ """
|
|
platform.set_mp_process_title("celeryd", info=task_name)
|
|
platform.set_mp_process_title("celeryd", info=task_name)
|
|
try:
|
|
try:
|
|
return WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
|
|
return WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
|
|
@@ -182,20 +183,28 @@ class TaskRequest(object):
|
|
Set to ``True`` if the task has been acknowledged.
|
|
Set to ``True`` if the task has been acknowledged.
|
|
|
|
|
|
"""
|
|
"""
|
|
|
|
+ # Logging output
|
|
success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
|
|
success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
|
|
- fail_msg = """
|
|
|
|
|
|
+ error_msg = """
|
|
Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
|
|
Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
|
|
"""
|
|
"""
|
|
- fail_email_subject = """
|
|
|
|
|
|
+
|
|
|
|
+ # E-mails
|
|
|
|
+ email_subject = """
|
|
[celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
|
|
[celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
|
|
"""
|
|
"""
|
|
- fail_email_body = TASK_FAIL_EMAIL_BODY
|
|
|
|
|
|
+ email_body = TASK_ERROR_EMAIL_BODY
|
|
|
|
+
|
|
|
|
+ # Internal flags
|
|
executed = False
|
|
executed = False
|
|
acknowledged = False
|
|
acknowledged = False
|
|
time_start = None
|
|
time_start = None
|
|
|
|
+ _already_revoked = False
|
|
|
|
|
|
def __init__(self, task_name, task_id, args, kwargs,
|
|
def __init__(self, task_name, task_id, args, kwargs,
|
|
- on_ack=noop, retries=0, delivery_info=None, hostname=None, **opts):
|
|
|
|
|
|
+ on_ack=noop, retries=0, delivery_info=None, hostname=None,
|
|
|
|
+ email_subject=None, email_body=None, logger=None,
|
|
|
|
+ eventer=None, **opts):
|
|
self.task_name = task_name
|
|
self.task_name = task_name
|
|
self.task_id = task_id
|
|
self.task_id = task_id
|
|
self.retries = retries
|
|
self.retries = retries
|
|
@@ -203,16 +212,13 @@ class TaskRequest(object):
|
|
self.kwargs = kwargs
|
|
self.kwargs = kwargs
|
|
self.on_ack = on_ack
|
|
self.on_ack = on_ack
|
|
self.delivery_info = delivery_info or {}
|
|
self.delivery_info = delivery_info or {}
|
|
- self.task = tasks[self.task_name]
|
|
|
|
self.hostname = hostname or socket.gethostname()
|
|
self.hostname = hostname or socket.gethostname()
|
|
- self._already_revoked = False
|
|
|
|
-
|
|
|
|
- for opt in ("success_msg", "fail_msg", "fail_email_subject",
|
|
|
|
- "fail_email_body", "logger", "eventer"):
|
|
|
|
- setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
|
|
|
|
|
|
+ self.logger = logger or log.get_default_logger()
|
|
|
|
+ self.eventer = eventer
|
|
|
|
+ self.email_subject = email_subject or self.email_subject
|
|
|
|
+ self.email_body = email_body or self.email_body
|
|
|
|
|
|
- if not self.logger:
|
|
|
|
- self.logger = get_default_logger()
|
|
|
|
|
|
+ self.task = tasks[self.task_name]
|
|
|
|
|
|
def __repr__(self):
|
|
def __repr__(self):
|
|
return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
|
|
return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
|
|
@@ -407,12 +413,12 @@ class TaskRequest(object):
|
|
"args": self.args,
|
|
"args": self.args,
|
|
"kwargs": self.kwargs,
|
|
"kwargs": self.kwargs,
|
|
}
|
|
}
|
|
- self.logger.error(self.fail_msg.strip() % context)
|
|
|
|
|
|
+ self.logger.error(self.error_msg.strip() % context)
|
|
|
|
|
|
task_obj = tasks.get(self.task_name, object)
|
|
task_obj = tasks.get(self.task_name, object)
|
|
send_error_email = conf.CELERY_SEND_TASK_ERROR_EMAILS and not \
|
|
send_error_email = conf.CELERY_SEND_TASK_ERROR_EMAILS and not \
|
|
task_obj.disable_error_emails
|
|
task_obj.disable_error_emails
|
|
if send_error_email:
|
|
if send_error_email:
|
|
- subject = self.fail_email_subject.strip() % context
|
|
|
|
- body = self.fail_email_body.strip() % context
|
|
|
|
|
|
+ subject = self.email_subject.strip() % context
|
|
|
|
+ body = self.email_body.strip() % context
|
|
mail_admins(subject, body, fail_silently=True)
|
|
mail_admins(subject, body, fail_silently=True)
|