Bläddra i källkod

Fixes issue #802: apply_async accepts time limits

Mher Movsisyan 13 år sedan
förälder
incheckning
1ec9d17e0d
3 ändrade filer med 13 tillägg och 4 borttagningar
  1. 5 2
      celery/app/amqp.py
  2. 3 0
      celery/app/task.py
  3. 5 2
      celery/worker/job.py

+ 5 - 2
celery/app/amqp.py

@@ -156,7 +156,8 @@ class TaskProducer(Producer):
             queue=None, now=None, retries=0, chord=None, callbacks=None,
             errbacks=None, mandatory=None, priority=None, immediate=None,
             routing_key=None, serializer=None, delivery_mode=None,
-            compression=None, **kwargs):
+            compression=None, time_limit=None, soft_time_limit=None,
+            **kwargs):
         """Send task message."""
         # merge default and custom policy
         _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
@@ -176,6 +177,7 @@ class TaskProducer(Producer):
             expires = now + timedelta(seconds=expires)
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
+        time_limits = (time_limit, soft_time_limit)
 
         body = {'task': task_name,
                 'id': task_id,
@@ -186,7 +188,8 @@ class TaskProducer(Producer):
                 'expires': expires,
                 'utc': self.utc,
                 'callbacks': callbacks,
-                'errbacks': errbacks}
+                'errbacks': errbacks,
+                'time_limits': time_limits}
         group_id = group_id or taskset_id
         if group_id:
             body['taskset'] = group_id

+ 3 - 0
celery/app/task.py

@@ -35,6 +35,7 @@ extract_exec_options = mattrgetter(
     'queue', 'routing_key', 'exchange',
     'immediate', 'mandatory', 'priority', 'expires',
     'serializer', 'delivery_mode', 'compression',
+    'time_limit', 'soft_time_limit',
 )
 
 #: Billiard sets this when execv is enabled.
@@ -517,6 +518,8 @@ class Task(object):
         :keyword eta: Explicit time and date to run the retry at
                       (must be a :class:`~datetime.datetime` instance).
         :keyword max_retries: If set, overrides the default retry limit.
+        :keyword time_limit: If set, overrides the default time limit.
+        :keyword soft_time_limit: If set, overrides the default soft time limit.
         :keyword \*\*options: Any extra options to pass on to
                               meth:`apply_async`.
         :keyword throw: If this is :const:`False`, do not raise the

+ 5 - 2
celery/worker/job.py

@@ -182,6 +182,9 @@ class Request(object):
         request.update({'hostname': hostname, 'is_eager': False,
                         'delivery_info': self.delivery_info,
                         'group': self.request_dict.get('taskset')})
+        time_limit, soft_time_limit = request.get('time_limits', (None, None))
+        time_limit = time_limit or task.time_limit
+        soft_time_limit = soft_time_limit or task.soft_time_limit
         result = pool.apply_async(trace_task_ret,
                                   args=(self.name, self.id,
                                         self.args, kwargs, request),
@@ -189,8 +192,8 @@ class Request(object):
                                   timeout_callback=self.on_timeout,
                                   callback=self.on_success,
                                   error_callback=self.on_failure,
-                                  soft_timeout=task.soft_time_limit,
-                                  timeout=task.time_limit)
+                                  soft_timeout=soft_time_limit,
+                                  timeout=time_limit)
         return result
 
     def execute(self, loglevel=None, logfile=None):