فهرست منبع

Implements the task protocol 2 shadow field

Ask Solem 10 سال پیش
والد
کامیت
2d931569c9
4فایلهای تغییر یافته به همراه56 افزوده شده و 18 حذف شده
  1. 2 2
      celery/app/amqp.py
  2. 4 2
      celery/app/base.py
  3. 35 5
      celery/app/task.py
  4. 15 9
      celery/worker/request.py

+ 2 - 2
celery/app/amqp.py

@@ -270,7 +270,7 @@ class AMQP(object):
                    callbacks=None, errbacks=None, reply_to=None,
                    time_limit=None, soft_time_limit=None,
                    create_sent_event=False, root_id=None, parent_id=None,
-                   now=None, timezone=None):
+                   shadow=None, now=None, timezone=None):
         args = args or ()
         kwargs = kwargs or {}
         utc = self.utc
@@ -337,7 +337,7 @@ class AMQP(object):
                    chord=None, callbacks=None, errbacks=None, reply_to=None,
                    time_limit=None, soft_time_limit=None,
                    create_sent_event=False, root_id=None, parent_id=None,
-                   now=None, timezone=None):
+                   shadow=None, now=None, timezone=None):
         args = args or ()
         kwargs = kwargs or {}
         utc = self.utc

+ 4 - 2
celery/app/base.py

@@ -366,7 +366,8 @@ class Celery(object):
                   publisher=None, link=None, link_error=None,
                   add_to_parent=True, group_id=None, retries=0, chord=None,
                   reply_to=None, time_limit=None, soft_time_limit=None,
-                  root_id=None, parent_id=None, route_name=None, **options):
+                  root_id=None, parent_id=None, route_name=None,
+                  shadow=None, **options):
         amqp = self.amqp
         task_id = task_id or uuid()
         producer = producer or publisher  # XXX compat
@@ -383,7 +384,8 @@ class Celery(object):
             expires, retries, chord,
             maybe_list(link), maybe_list(link_error),
             reply_to or self.oid, time_limit, soft_time_limit,
-            self.conf.CELERY_SEND_TASK_SENT_EVENT, root_id, parent_id,
+            self.conf.CELERY_SEND_TASK_SENT_EVENT,
+            root_id, parent_id, shadow,
         )
 
         if connection:

+ 35 - 5
celery/app/task.py

@@ -360,7 +360,7 @@ class Task(object):
         return self.apply_async(args, kwargs)
 
     def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
-                    link=None, link_error=None, **options):
+                    link=None, link_error=None, shadow=None, **options):
         """Apply tasks asynchronously by sending a message.
 
         :keyword args: The positional arguments to pass on to the
@@ -384,6 +384,9 @@ class Task(object):
                           the task should expire.  The task will not be
                           executed after the expiration time.
 
+        :keyword shadow: Override task name used in logs/monitoring
+            (default from :meth:`shadow_name`).
+
         :keyword connection: Re-use existing broker connection instead
                              of establishing a new one.
 
@@ -440,9 +443,9 @@ class Task(object):
             attribute.  Trailing can also be disabled by default using the
             :attr:`trail` attribute
         :keyword publisher: Deprecated alias to ``producer``.
-        
-        :rtype :class:`celery.result.AsyncResult`: if 
-            :setting:`CELERY_ALWAYS_EAGER` is not set, otherwise 
+
+        :rtype :class:`celery.result.AsyncResult`: if
+            :setting:`CELERY_ALWAYS_EAGER` is not set, otherwise
             :class:`celery.result.EagerResult`:
 
         Also supports all keyword arguments supported by
@@ -468,12 +471,39 @@ class Task(object):
         if self.__self__ is not None:
             args = args if isinstance(args, tuple) else tuple(args or ())
             args = (self.__self__, ) + args
+        final_options = self._get_exec_options()
+        if options:
+            final_options = dict(final_options, **options)
         return app.send_task(
             self.name, args, kwargs, task_id=task_id, producer=producer,
             link=link, link_error=link_error, result_cls=self.AsyncResult,
-            **dict(self._get_exec_options(), **options)
+            shadow=shadow or self.shadow_name(args, kwargs, final_options),
+            **final_options
         )
 
+    def shadow_name(self, args, kwargs, options):
+        """Override for custom task name in worker logs/monitoring.
+
+        :param args: Task positional arguments.
+        :param kwargs: Task keyword arguments.
+        :param options: Task execution options.
+
+        **Example**:
+
+        .. code-block:: python
+
+            from celery.utils.imports import qualname
+
+            def shadow_name(task, args, kwargs, options):
+                return qualname(args[0])
+
+            @app.task(shadow_name=shadow_name, serializer='pickle')
+            def apply_function_async(fun, *args, **kwargs):
+                return fun(*args, **kwargs)
+
+        """
+        pass
+
     def signature_from_request(self, request=None, args=None, kwargs=None,
                                queue=None, **extra_options):
         request = self.request if request is None else request

+ 15 - 9
celery/worker/request.py

@@ -76,7 +76,7 @@ class Request(object):
 
     if not IS_PYPY:  # pragma: no cover
         __slots__ = (
-            'app', 'name', 'id', 'on_ack', 'body',
+            'app', 'type', 'name', 'id', 'on_ack', 'body',
             'hostname', 'eventer', 'connection_errors', 'task', 'eta',
             'expires', 'request_dict', 'on_reject', 'utc',
             'content_type', 'content_encoding',
@@ -105,8 +105,10 @@ class Request(object):
                 message.content_type, message.content_encoding,
             )
 
-        name = self.name = headers['task']
         self.id = headers['id']
+        type = self.type = self.name = headers['task']
+        if 'shadow' in headers:
+            self.name = headers['shadow']
         if 'timelimit' in headers:
             self.time_limits = headers['timelimit']
         self.on_ack = on_ack
@@ -114,7 +116,7 @@ class Request(object):
         self.hostname = hostname or socket.gethostname()
         self.eventer = eventer
         self.connection_errors = connection_errors or ()
-        self.task = task or self.app.tasks[name]
+        self.task = task or self.app.tasks[type]
 
         # timezone means the message is timezone-aware, and the only timezone
         # supported at this point is UTC.
@@ -178,7 +180,7 @@ class Request(object):
         soft_time_limit = soft_time_limit or task.soft_time_limit
         result = pool.apply_async(
             trace_task_ret,
-            args=(self.name, task_id, self.request_dict, self.body,
+            args=(self.type, task_id, self.request_dict, self.body,
                   self.content_type, self.content_encoding),
             accept_callback=self.on_accepted,
             timeout_callback=self.on_timeout,
@@ -377,6 +379,7 @@ class Request(object):
     def info(self, safe=False):
         return {'id': self.id,
                 'name': self.name,
+                'type': self.type,
                 'body': self.body,
                 'hostname': self.hostname,
                 'time_start': self.time_start,
@@ -385,15 +388,18 @@ class Request(object):
                 'worker_pid': self.worker_pid}
 
     def __str__(self):
-        return '{0.name}[{0.id}]{1}{2}'.format(
-            self,
+        return ' '.join([
+            self.humaninfo(),
             ' eta:[{0}]'.format(self.eta) if self.eta else '',
             ' expires:[{0}]'.format(self.expires) if self.expires else '',
-        )
+        ])
     shortinfo = __str__
 
+    def humaninfo(self):
+        return '{0.name}[{0.id}]'.format(self)
+
     def __repr__(self):
-        return '<{0} {1}: {2}>'.format(type(self).__name__, self.id, self.name)
+        return '<{0}: {1}>'.format(type(self).__name__, self.humaninfo())
 
     @property
     def tzlocal(self):
@@ -457,7 +463,7 @@ def create_request_cls(base, task, pool, hostname, eventer,
             soft_time_limit = soft_time_limit or default_soft_time_limit
             result = apply_async(
                 trace,
-                args=(self.name, task_id, self.request_dict, self.body,
+                args=(self.type, task_id, self.request_dict, self.body,
                       self.content_type, self.content_encoding),
                 accept_callback=self.on_accepted,
                 timeout_callback=self.on_timeout,