|
@@ -19,17 +19,17 @@ from datetime import datetime
|
|
|
from kombu.utils import kwdict, reprcall
|
|
|
from kombu.utils.encoding import safe_repr, safe_str
|
|
|
|
|
|
-from celery import exceptions
|
|
|
from celery import signals
|
|
|
from celery.app import app_or_default
|
|
|
from celery.datastructures import ExceptionInfo
|
|
|
-from celery.exceptions import Ignore, TaskRevokedError
|
|
|
+from celery.exceptions import (
|
|
|
+ Ignore, TaskRevokedError, InvalidTaskError,
|
|
|
+ SoftTimeLimitExceeded, TimeLimitExceeded,
|
|
|
+ WorkerLostError, Terminated, RetryTaskError,
|
|
|
+)
|
|
|
from celery.five import items
|
|
|
from celery.platforms import signals as _signals
|
|
|
-from celery.task.trace import (
|
|
|
- trace_task,
|
|
|
- trace_task_ret,
|
|
|
-)
|
|
|
+from celery.task.trace import trace_task, trace_task_ret
|
|
|
from celery.utils import fun_takes_kwargs
|
|
|
from celery.utils.functional import noop
|
|
|
from celery.utils.log import get_logger
|
|
@@ -112,7 +112,7 @@ class Request(object):
|
|
|
try:
|
|
|
self.kwargs.items
|
|
|
except AttributeError:
|
|
|
- raise exceptions.InvalidTaskError(
|
|
|
+ raise InvalidTaskError(
|
|
|
'Task keyword arguments is not a mapping')
|
|
|
if NEEDS_KWDICT:
|
|
|
self.kwargs = kwdict(self.kwargs)
|
|
@@ -131,13 +131,21 @@ class Request(object):
|
|
|
# timezone means the message is timezone-aware, and the only timezone
|
|
|
# supported at this point is UTC.
|
|
|
if eta is not None:
|
|
|
- self.eta = maybe_iso8601(eta)
|
|
|
+ try:
|
|
|
+ self.eta = maybe_iso8601(eta)
|
|
|
+ except (AttributeError, ValueError), exc:
|
|
|
+ raise InvalidTaskError(
|
|
|
+ 'invalid eta value %r: %s' % (eta, exc, ))
|
|
|
if utc:
|
|
|
self.eta = maybe_make_aware(self.eta, self.tzlocal)
|
|
|
else:
|
|
|
self.eta = None
|
|
|
if expires is not None:
|
|
|
- self.expires = maybe_iso8601(expires)
|
|
|
+ try:
|
|
|
+ self.expires = maybe_iso8601(expires)
|
|
|
+ except (AttributeError, ValueError), exc:
|
|
|
+ raise InvalidTaskError(
|
|
|
+ 'invalid expires value %r: %s' % (expires, exc, ))
|
|
|
if utc:
|
|
|
self.expires = maybe_make_aware(self.expires, self.tzlocal)
|
|
|
else:
|
|
@@ -313,11 +321,11 @@ class Request(object):
|
|
|
if soft:
|
|
|
warn('Soft time limit (%ss) exceeded for %s[%s]',
|
|
|
timeout, self.name, self.id)
|
|
|
- exc = exceptions.SoftTimeLimitExceeded(timeout)
|
|
|
+ exc = SoftTimeLimitExceeded(timeout)
|
|
|
else:
|
|
|
error('Hard time limit (%ss) exceeded for %s[%s]',
|
|
|
timeout, self.name, self.id)
|
|
|
- exc = exceptions.TimeLimitExceeded(timeout)
|
|
|
+ exc = TimeLimitExceeded(timeout)
|
|
|
|
|
|
if self.store_errors:
|
|
|
self.task.backend.mark_as_failure(self.id, exc)
|
|
@@ -369,15 +377,15 @@ class Request(object):
|
|
|
if not exc_info.internal:
|
|
|
exc = exc_info.exception
|
|
|
|
|
|
- if isinstance(exc, exceptions.RetryTaskError):
|
|
|
+ if isinstance(exc, RetryTaskError):
|
|
|
return self.on_retry(exc_info)
|
|
|
|
|
|
# These are special cases where the process would not have had
|
|
|
# time to write the result.
|
|
|
if self.store_errors:
|
|
|
- if isinstance(exc, exceptions.WorkerLostError):
|
|
|
+ if isinstance(exc, WorkerLostError):
|
|
|
self.task.backend.mark_as_failure(self.id, exc)
|
|
|
- elif isinstance(exc, exceptions.Terminated):
|
|
|
+ elif isinstance(exc, Terminated):
|
|
|
self._announce_revoked('terminated', True, str(exc), False)
|
|
|
# (acks_late) acknowledge after result stored.
|
|
|
if self.task.acks_late:
|