Browse Source

Events: Adds new task-rejected event for basic.reject

Ask Solem 9 years ago
parent
commit
8454428b0f
4 changed files with 23 additions and 7 deletions
  1. 7 5
      celery/events/state.py
  2. 5 2
      celery/states.py
  3. 1 0
      celery/worker/request.py
  4. 10 0
      docs/userguide/monitoring.rst

+ 7 - 5
celery/events/state.py

@@ -205,14 +205,14 @@ class Worker(object):
 class Task(object):
     """Task State."""
     name = received = sent = started = succeeded = failed = retried = \
-        revoked = args = kwargs = eta = expires = retries = worker = result = \
-        exception = timestamp = runtime = traceback = exchange = \
-        routing_key = root_id = parent_id = client = None
+        revoked = rejected = args = kwargs = eta = expires = retries = \
+        worker = result = exception = timestamp = runtime = traceback = \
+        exchange = routing_key = root_id = parent_id = client = None
     state = states.PENDING
     clock = 0
 
     _fields = (
-        'uuid', 'name', 'state', 'received', 'sent', 'started',
+        'uuid', 'name', 'state', 'received', 'sent', 'started', 'rejected',
         'succeeded', 'failed', 'retried', 'revoked', 'args', 'kwargs',
         'eta', 'expires', 'retries', 'worker', 'result', 'exception',
         'timestamp', 'runtime', 'traceback', 'exchange', 'routing_key',
@@ -254,7 +254,7 @@ class Task(object):
               PENDING=states.PENDING, RECEIVED=states.RECEIVED,
               STARTED=states.STARTED, FAILURE=states.FAILURE,
               RETRY=states.RETRY, SUCCESS=states.SUCCESS,
-              REVOKED=states.REVOKED):
+              REVOKED=states.REVOKED, REJECTED=states.REJECTED):
         fields = fields or {}
         if type_ == 'sent':
             state, self.sent = PENDING, timestamp
@@ -270,6 +270,8 @@ class Task(object):
             state, self.succeeded = SUCCESS, timestamp
         elif type_ == 'revoked':
             state, self.revoked = REVOKED, timestamp
+        elif type_ == 'rejected':
+            state, self.rejected = REJECTED, timestamp
         else:
             state = type_.upper()
 

+ 5 - 2
celery/states.py

@@ -72,6 +72,7 @@ PRECEDENCE = ['SUCCESS',
               'REVOKED',
               'STARTED',
               'RECEIVED',
+              'REJECTED',
               'RETRY',
               'PENDING']
 
@@ -126,7 +127,7 @@ class state(str):
 
 #: Task state is unknown (assumed pending since you know the id).
 PENDING = 'PENDING'
-#: Task was received by a worker.
+#: Task was received by a worker (only used in events).
 RECEIVED = 'RECEIVED'
 #: Task was started by a worker (:setting:`task_track_started`).
 STARTED = 'STARTED'
@@ -136,13 +137,15 @@ SUCCESS = 'SUCCESS'
 FAILURE = 'FAILURE'
 #: Task was revoked.
 REVOKED = 'REVOKED'
+#: Task was rejected (only used in events).
+REJECTED = 'REJECTED'
 #: Task is waiting for retry.
 RETRY = 'RETRY'
 IGNORED = 'IGNORED'
 REJECTED = 'REJECTED'
 
 READY_STATES = frozenset({SUCCESS, FAILURE, REVOKED})
-UNREADY_STATES = frozenset({PENDING, RECEIVED, STARTED, RETRY})
+UNREADY_STATES = frozenset({PENDING, RECEIVED, STARTED, REJECTED, RETRY})
 EXCEPTION_STATES = frozenset({RETRY, FAILURE, REVOKED})
 PROPAGATE_STATES = frozenset({FAILURE, REVOKED})
 

+ 1 - 0
celery/worker/request.py

@@ -389,6 +389,7 @@ class Request(object):
         if not self.acknowledged:
             self.on_reject(logger, self.connection_errors, requeue)
             self.acknowledged = True
+            self.send_event('task-rejected', requeue=requeue)
 
     def info(self, safe=False):
         return {'id': self.id,

+ 10 - 0
docs/userguide/monitoring.rst

@@ -696,6 +696,16 @@ task-failed
 
 Sent if the execution of the task failed.
 
+.. event:: task-rejected
+
+task-rejected
+~~~~~~~~~~~~~
+
+:signature: ``task-rejected(uuid, requeued)``
+
+The task was rejected by the worker, possibly to be requeued or moved to a
+dead letter queue.
+
 .. event:: task-revoked
 
 task-revoked