Browse Source

Merge branch 'issue802'

Mher Movsisyan 12 years ago
parent
commit
a803238c25
4 changed files with 20 additions and 4 deletions
  1. 5 2
      celery/app/amqp.py
  2. 3 0
      celery/app/task.py
  3. 5 2
      celery/worker/job.py
  4. 7 0
      docs/internals/protocol.rst

+ 5 - 2
celery/app/amqp.py

@@ -157,7 +157,8 @@ class TaskProducer(Producer):
             queue=None, now=None, retries=0, chord=None, callbacks=None,
             queue=None, now=None, retries=0, chord=None, callbacks=None,
             errbacks=None, mandatory=None, priority=None, immediate=None,
             errbacks=None, mandatory=None, priority=None, immediate=None,
             routing_key=None, serializer=None, delivery_mode=None,
             routing_key=None, serializer=None, delivery_mode=None,
-            compression=None, **kwargs):
+            compression=None, timeout=None, soft_timeout=None,
+            **kwargs):
         """Send task message."""
         """Send task message."""
         # merge default and custom policy
         # merge default and custom policy
         retry = self.retry if retry is None else retry
         retry = self.retry if retry is None else retry
@@ -178,6 +179,7 @@ class TaskProducer(Producer):
             expires = now + timedelta(seconds=expires)
             expires = now + timedelta(seconds=expires)
         eta = eta and eta.isoformat()
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
         expires = expires and expires.isoformat()
+        timeouts = (timeout, soft_timeout)
 
 
         body = {'task': task_name,
         body = {'task': task_name,
                 'id': task_id,
                 'id': task_id,
@@ -188,7 +190,8 @@ class TaskProducer(Producer):
                 'expires': expires,
                 'expires': expires,
                 'utc': self.utc,
                 'utc': self.utc,
                 'callbacks': callbacks,
                 'callbacks': callbacks,
-                'errbacks': errbacks}
+                'errbacks': errbacks,
+                'timeouts': timeouts}
         group_id = group_id or taskset_id
         group_id = group_id or taskset_id
         if group_id:
         if group_id:
             body['taskset'] = group_id
             body['taskset'] = group_id

+ 3 - 0
celery/app/task.py

@@ -29,6 +29,7 @@ extract_exec_options = mattrgetter(
     'queue', 'routing_key', 'exchange',
     'queue', 'routing_key', 'exchange',
     'immediate', 'mandatory', 'priority', 'expires',
     'immediate', 'mandatory', 'priority', 'expires',
     'serializer', 'delivery_mode', 'compression',
     'serializer', 'delivery_mode', 'compression',
+    'timeout', 'soft_timeout',
 )
 )
 
 
 
 
@@ -487,6 +488,8 @@ class Task(object):
         :keyword eta: Explicit time and date to run the retry at
         :keyword eta: Explicit time and date to run the retry at
                       (must be a :class:`~datetime.datetime` instance).
                       (must be a :class:`~datetime.datetime` instance).
         :keyword max_retries: If set, overrides the default retry limit.
         :keyword max_retries: If set, overrides the default retry limit.
+        :keyword timeout: If set, overrides the default timeout.
+        :keyword soft_timeout: If set, overrides the default soft timeout.
         :keyword \*\*options: Any extra options to pass on to
         :keyword \*\*options: Any extra options to pass on to
                               meth:`apply_async`.
                               meth:`apply_async`.
         :keyword throw: If this is :const:`False`, do not raise the
         :keyword throw: If this is :const:`False`, do not raise the

+ 5 - 2
celery/worker/job.py

@@ -185,6 +185,9 @@ class Request(object):
         request.update({'hostname': hostname, 'is_eager': False,
         request.update({'hostname': hostname, 'is_eager': False,
                         'delivery_info': self.delivery_info,
                         'delivery_info': self.delivery_info,
                         'group': self.request_dict.get('taskset')})
                         'group': self.request_dict.get('taskset')})
+        timeout, soft_timeout = request.get('timeouts', (None, None))
+        timeout = timeout or task.time_limit
+        soft_timeout = soft_timeout or task.soft_time_limit
         result = pool.apply_async(trace_task_ret,
         result = pool.apply_async(trace_task_ret,
                                   args=(self.name, self.id,
                                   args=(self.name, self.id,
                                         self.args, kwargs, request),
                                         self.args, kwargs, request),
@@ -192,8 +195,8 @@ class Request(object):
                                   timeout_callback=self.on_timeout,
                                   timeout_callback=self.on_timeout,
                                   callback=self.on_success,
                                   callback=self.on_success,
                                   error_callback=self.on_failure,
                                   error_callback=self.on_failure,
-                                  soft_timeout=task.soft_time_limit,
-                                  timeout=task.time_limit)
+                                  soft_timeout=soft_timeout,
+                                  timeout=timeout)
         return result
         return result
 
 
     def execute(self, loglevel=None, logfile=None):
     def execute(self, loglevel=None, logfile=None):

+ 7 - 0
docs/internals/protocol.rst

@@ -54,6 +54,13 @@ Message format
     will be expired when the message is received and the expiration date
     will be expired when the message is received and the expiration date
     has been exceeded.
     has been exceeded.
 
 
+* timeouts
+    :`tuple`:
+
+    .. versionadded:: 2.7
+
+    Task execution timeouts. This is a tuple of hard and soft timeouts.
+    Timeout values are `int` or `float`.
 
 
 Extensions
 Extensions
 ==========
 ==========