Explorar el Código

Merge branch 'master' of github.com:celery/celery

Ask Solem hace 12 años
padre
commit
30b492425f
Se han modificado 5 ficheros con 31 adiciones y 8 borrados
  1. 7 0
      Changelog
  2. 5 2
      celery/app/amqp.py
  3. 7 4
      celery/app/task.py
  4. 5 2
      celery/worker/job.py
  5. 7 0
      docs/internals/protocol.rst

+ 7 - 0
Changelog

@@ -5,6 +5,13 @@
 .. contents::
 .. contents::
     :local:
     :local:
 
 
+.. _version-3.0.2:
+
+3.0.2
+=====
+
+- `Task.apply_async` now supports timeout and soft_timeout arguments (Issue #802)
+
 .. _version-3.0.1:
 .. _version-3.0.1:
 
 
 3.0.1
 3.0.1

+ 5 - 2
celery/app/amqp.py

@@ -157,7 +157,8 @@ class TaskProducer(Producer):
             queue=None, now=None, retries=0, chord=None, callbacks=None,
             queue=None, now=None, retries=0, chord=None, callbacks=None,
             errbacks=None, mandatory=None, priority=None, immediate=None,
             errbacks=None, mandatory=None, priority=None, immediate=None,
             routing_key=None, serializer=None, delivery_mode=None,
             routing_key=None, serializer=None, delivery_mode=None,
-            compression=None, **kwargs):
+            compression=None, timeout=None, soft_timeout=None,
+            **kwargs):
         """Send task message."""
         """Send task message."""
         # merge default and custom policy
         # merge default and custom policy
         retry = self.retry if retry is None else retry
         retry = self.retry if retry is None else retry
@@ -178,6 +179,7 @@ class TaskProducer(Producer):
             expires = now + timedelta(seconds=expires)
             expires = now + timedelta(seconds=expires)
         eta = eta and eta.isoformat()
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
         expires = expires and expires.isoformat()
+        timeouts = (timeout, soft_timeout)
 
 
         body = {'task': task_name,
         body = {'task': task_name,
                 'id': task_id,
                 'id': task_id,
@@ -188,7 +190,8 @@ class TaskProducer(Producer):
                 'expires': expires,
                 'expires': expires,
                 'utc': self.utc,
                 'utc': self.utc,
                 'callbacks': callbacks,
                 'callbacks': callbacks,
-                'errbacks': errbacks}
+                'errbacks': errbacks,
+                'timeouts': timeouts}
         group_id = group_id or taskset_id
         group_id = group_id or taskset_id
         if group_id:
         if group_id:
             body['taskset'] = group_id
             body['taskset'] = group_id

+ 7 - 4
celery/app/task.py

@@ -29,6 +29,7 @@ extract_exec_options = mattrgetter(
     'queue', 'routing_key', 'exchange',
     'queue', 'routing_key', 'exchange',
     'immediate', 'mandatory', 'priority', 'expires',
     'immediate', 'mandatory', 'priority', 'expires',
     'serializer', 'delivery_mode', 'compression',
     'serializer', 'delivery_mode', 'compression',
+    'timeout', 'soft_timeout',
 )
 )
 
 
 
 
@@ -174,15 +175,15 @@ class Task(object):
     #: If enabled the worker will not store task state and return values
     #: If enabled the worker will not store task state and return values
     #: for this task.  Defaults to the :setting:`CELERY_IGNORE_RESULT`
     #: for this task.  Defaults to the :setting:`CELERY_IGNORE_RESULT`
     #: setting.
     #: setting.
-    ignore_result = False
+    ignore_result = None
 
 
     #: When enabled errors will be stored even if the task is otherwise
     #: When enabled errors will be stored even if the task is otherwise
     #: configured to ignore results.
     #: configured to ignore results.
-    store_errors_even_if_ignored = False
+    store_errors_even_if_ignored = None
 
 
     #: If enabled an email will be sent to :setting:`ADMINS` whenever a task
     #: If enabled an email will be sent to :setting:`ADMINS` whenever a task
     #: of this type fails.
     #: of this type fails.
-    send_error_emails = False
+    send_error_emails = None
 
 
     #: The name of a serializer that are registered with
     #: The name of a serializer that are registered with
     #: :mod:`kombu.serialization.registry`.  Default is `'pickle'`.
     #: :mod:`kombu.serialization.registry`.  Default is `'pickle'`.
@@ -213,7 +214,7 @@ class Task(object):
     #:
     #:
     #: The application default can be overridden using the
     #: The application default can be overridden using the
     #: :setting:`CELERY_TRACK_STARTED` setting.
     #: :setting:`CELERY_TRACK_STARTED` setting.
-    track_started = False
+    track_started = None
 
 
     #: When enabled messages for this task will be acknowledged **after**
     #: When enabled messages for this task will be acknowledged **after**
     #: the task has been executed, and not *just before* which is the
     #: the task has been executed, and not *just before* which is the
@@ -487,6 +488,8 @@ class Task(object):
         :keyword eta: Explicit time and date to run the retry at
         :keyword eta: Explicit time and date to run the retry at
                       (must be a :class:`~datetime.datetime` instance).
                       (must be a :class:`~datetime.datetime` instance).
         :keyword max_retries: If set, overrides the default retry limit.
         :keyword max_retries: If set, overrides the default retry limit.
+        :keyword timeout: If set, overrides the default timeout.
+        :keyword soft_timeout: If set, overrides the default soft timeout.
         :keyword \*\*options: Any extra options to pass on to
         :keyword \*\*options: Any extra options to pass on to
                               meth:`apply_async`.
                               meth:`apply_async`.
         :keyword throw: If this is :const:`False`, do not raise the
         :keyword throw: If this is :const:`False`, do not raise the

+ 5 - 2
celery/worker/job.py

@@ -185,6 +185,9 @@ class Request(object):
         request.update({'hostname': hostname, 'is_eager': False,
         request.update({'hostname': hostname, 'is_eager': False,
                         'delivery_info': self.delivery_info,
                         'delivery_info': self.delivery_info,
                         'group': self.request_dict.get('taskset')})
                         'group': self.request_dict.get('taskset')})
+        timeout, soft_timeout = request.get('timeouts', (None, None))
+        timeout = timeout or task.time_limit
+        soft_timeout = soft_timeout or task.soft_time_limit
         result = pool.apply_async(trace_task_ret,
         result = pool.apply_async(trace_task_ret,
                                   args=(self.name, self.id,
                                   args=(self.name, self.id,
                                         self.args, kwargs, request),
                                         self.args, kwargs, request),
@@ -192,8 +195,8 @@ class Request(object):
                                   timeout_callback=self.on_timeout,
                                   timeout_callback=self.on_timeout,
                                   callback=self.on_success,
                                   callback=self.on_success,
                                   error_callback=self.on_failure,
                                   error_callback=self.on_failure,
-                                  soft_timeout=task.soft_time_limit,
-                                  timeout=task.time_limit)
+                                  soft_timeout=soft_timeout,
+                                  timeout=timeout)
         return result
         return result
 
 
     def execute(self, loglevel=None, logfile=None):
     def execute(self, loglevel=None, logfile=None):

+ 7 - 0
docs/internals/protocol.rst

@@ -54,6 +54,13 @@ Message format
     will be expired when the message is received and the expiration date
     will be expired when the message is received and the expiration date
     has been exceeded.
     has been exceeded.
 
 
+* timeouts
+    :`tuple`:
+
+    .. versionadded:: 2.7
+
+    Task execution timeouts. This is a tuple of hard and soft timeouts.
+    Timeout values are `int` or `float`.
 
 
 Extensions
 Extensions
 ==========
 ==========