
More events.state optimizations

Ask Solem, 11 years ago
commit e1be15cd79
4 changed files with 132 additions and 143 deletions
  1. celery/concurrency/asynpool.py  +4 -2
  2. celery/events/state.py  +121 -137
  3. celery/utils/__init__.py  +3 -3
  4. celery/worker/job.py  +4 -1

+ 4 - 2
celery/concurrency/asynpool.py

@@ -96,6 +96,8 @@ SCHED_STRATEGIES = {
     'fair': SCHED_STRATEGY_FAIR,
 }
 
+RESULT_MAXLEN = 128
+
 Ack = namedtuple('Ack', ('id', 'fd', 'payload'))
 
 
@@ -168,9 +170,9 @@ class Worker(_pool.Worker):
         # is writable.
         self.outq.put((WORKER_UP, (pid, )))
 
-    def prepare_result(self, result):
+    def prepare_result(self, result, RESULT_MAXLEN=RESULT_MAXLEN):
         if not isinstance(result, ExceptionInfo):
-            return truncate(repr(result), 46)
+            return truncate(repr(result), RESULT_MAXLEN)
         return result
 
 

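The `prepare_result` change doubles as a small micro-optimization: the new `RESULT_MAXLEN` constant is bound as a keyword default, so each call resolves it from the function's defaults rather than through a module-global lookup, and the truncation limit grows from 46 to 128 characters. A minimal standalone sketch of the pattern follows; the `truncate` helper here is a simplified stand-in for celery's text helper, not the real implementation:

    RESULT_MAXLEN = 128  # module-level constant

    def truncate(text, maxlen):
        # simplified stand-in for celery's truncate helper
        return text if len(text) <= maxlen else text[:maxlen - 3] + '...'

    def prepare_result(result, RESULT_MAXLEN=RESULT_MAXLEN):
        # RESULT_MAXLEN is evaluated once at definition time; calls read it
        # from the defaults tuple instead of doing a global dict lookup.
        return truncate(repr(result), RESULT_MAXLEN)

    print(prepare_result(list(range(100))))
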
+ 121 - 137
celery/events/state.py

@@ -26,11 +26,14 @@ from heapq import heappush, heappop
 from itertools import islice
 from operator import itemgetter
 from time import time
+from weakref import ref
 
 from kombu.clocks import timetuple
+from kombu.utils import cached_property
 
 from celery import states
-from celery.five import items, values
+from celery.five import class_property, items, values
+from celery.utils import deprecated
 from celery.utils.functional import LRUCache
 from celery.utils.log import get_logger
 
@@ -92,14 +95,14 @@ def with_unique_field(attr):
 @with_unique_field('hostname')
 class Worker(object):
     """Worker State."""
+    clock = 0
     heartbeat_max = 4
     expire_window = HEARTBEAT_EXPIRE_WINDOW
     pid = None
-    _defaults = {'hostname': None, 'pid': None, 'freq': 60}
 
+    _fields = ('hostname', 'pid', 'freq', 'heartbeats', 'clock')
     if not PYPY:
-        __slots__ = ('hostname', 'pid', 'freq',
-                     'heartbeats', 'clock', '__dict__')
+        __slots__ = _fields + ('event', '__dict__', '__weakref__')
 
     def __init__(self, hostname=None, pid=None, freq=60):
         self.hostname = hostname
@@ -142,24 +145,6 @@ class Worker(object):
         for k, v in items(dict(f, **kw) if kw else f):
             setattr(self, k, v)
 
-    def update_heartbeat(self, received, timestamp):
-        self.event(None, timestamp, received)
-
-    def on_online(self, timestamp=None, local_received=None, **fields):
-        """Deprecated, to be removed in 3.1, use:
-        ``.event('online', timestamp, local_received, fields)``."""
-        self.event('online', timestamp, local_received, fields)
-
-    def on_offline(self, timestamp=None, local_received=None, **fields):
-        """Deprecated, to be removed in 3.1, use:
-        ``.event('offline', timestamp, local_received, fields)``."""
-        self.event('offline', timestamp, local_received, fields)
-
-    def on_heartbeat(self, timestamp=None, local_received=None, **fields):
-        """Deprecated, to be removed in 3.1, use:
-        ``.event('heartbeat', timestamp, local_received, fields)``."""
-        self.event('heartbeat', timestamp, local_received, fields)
-
     def __repr__(self):
         return R_WORKER.format(self)
 
@@ -180,19 +165,46 @@ class Worker(object):
     def id(self):
         return '{0.hostname}.{0.pid}'.format(self)
 
+    @deprecated('3.2', '3.3')
+    def update_heartbeat(self, received, timestamp):
+        self.event(None, timestamp, received)
+
+    @deprecated('3.2', '3.3')
+    def on_online(self, timestamp=None, local_received=None, **fields):
+        self.event('online', timestamp, local_received, fields)
+
+    @deprecated('3.2', '3.3')
+    def on_offline(self, timestamp=None, local_received=None, **fields):
+        self.event('offline', timestamp, local_received, fields)
+
+    @deprecated('3.2', '3.3')
+    def on_heartbeat(self, timestamp=None, local_received=None, **fields):
+        self.event('heartbeat', timestamp, local_received, fields)
+
+    @class_property
+    def _defaults(cls):
+        """Deprecated, to be removed in 3.2"""
+        source = cls()
+        return dict((k, getattr(source, k)) for k in cls._fields)
+
 
 @with_unique_field('uuid')
 class Task(object):
     """Task State."""
+    name = received = sent = started = succeeded = failed = retried = \
+        revoked = args = kwargs = eta = expires = retries = worker = result = \
+        exception = timestamp = runtime = traceback = exchange = \
+        routing_key = None
+    state = states.PENDING
+    clock = 0
+
+    _fields = ('uuid', 'name', 'state', 'received', 'sent', 'started',
+               'succeeded', 'failed', 'retried', 'revoked', 'args', 'kwargs',
+               'eta', 'expires', 'retries', 'worker', 'result', 'exception',
+               'timestamp', 'runtime', 'traceback', 'exchange', 'routing_key',
+               'clock')
     if not PYPY:
-        __slots__ = (
-            'uuid', 'name', 'state', 'received', 'sent', 'started',
-            'succeeded', 'failed', 'retried', 'revoked', 'args', 'kwargs',
-            'eta', 'expires', 'retries', 'worker', 'result',
-            'exception', 'timestamp', 'runtime',
-            'traceback', 'exchange', 'routing_key', 'clock',
-            '__dict__',
-        )
+        __slots__ = _fields + ('__dict__', '__weakref__')
 
     #: How to merge out of order events.
     #: Disorder is detected by logical ordering (e.g. :event:`task-received`
@@ -202,77 +214,18 @@ class Task(object):
 #: that state. ``(RECEIVED, ('name', 'args'))``, means the name and args
     #: fields are always taken from the RECEIVED state, and any values for
     #: these fields received before or after is simply ignored.
-    merge_rules = {
-        states.RECEIVED: ('name', 'args', 'kwargs',
-                          'retries', 'eta', 'expires'),
-    }
+    merge_rules = {states.RECEIVED: ('name', 'args', 'kwargs',
+                                     'retries', 'eta', 'expires')}
 
     #: meth:`info` displays these fields by default.
-    _info_fields = ('args', 'kwargs', 'retries', 'result',
-                    'eta', 'runtime', 'expires', 'exception',
-                    'exchange', 'routing_key')
-
-    def __init__(self, uuid=None, name=None, state=states.PENDING,
-                 received=None, sent=None, started=None, succeeded=None,
-                 failed=None, retried=None, revoked=None, args=None,
-                 kwargs=None, eta=None, expires=None, retries=None,
-                 worker=None, result=None, exception=None, timestamp=None,
-                 runtime=None, traceback=None, exchange=None,
-                 routing_key=None, clock=0, **fields):
-        self.uuid = uuid
-        self.name = name
-        self.state = state
-        self.received = received
-        self.sent = sent
-        self.started = started
-        self.succeeded = succeeded
-        self.failed = failed
-        self.retried = retried
-        self.revoked = revoked
-        self.args = args
-        self.kwargs = kwargs
-        self.eta = eta
-        self.expires = expires
-        self.retries = retries
-        self.worker = worker
-        self.result = result
-        self.exception = exception
-        self.timestamp = timestamp
-        self.runtime = runtime
-        self.traceback = traceback
-        self.exchange = exchange
-        self.routing_key = routing_key
-        self.clock = clock
-
-    def update(self, state, timestamp, fields,
-               _state=states.state, RETRY=states.RETRY):
-        """Update state from new event.
-
-        :param state: State from event.
-        :param timestamp: Timestamp from event.
-        :param fields: Event data.
+    _info_fields = ('args', 'kwargs', 'retries', 'result', 'eta', 'runtime',
+                    'expires', 'exception', 'exchange', 'routing_key')
 
-        """
-        time_received = fields.get('local_received') or 0
-        if self.worker and time_received:
-            self.worker.update_heartbeat(time_received, timestamp)
-        if state != RETRY and self.state != RETRY and \
-                _state(state) < _state(self.state):
-            # this state logically happens-before the current state, so merge.
-            self.merge(state, timestamp, fields)
-        else:
-            self.state = state
-            self.timestamp = timestamp
-            for key, value in items(self.fields):
-                setattr(self, key, value)
-
-    def merge(self, state, timestamp, fields):
-        """Merge with out of order event."""
-        keep = self.merge_rules.get(state)
-        if keep is not None:
-            fields = dict((k, v) for k, v in items(fields) if k in keep)
-        for key, value in items(fields):
-            setattr(self, key, value)
+    def __init__(self, uuid=None, **kwargs):
+        self.uuid = uuid
+        if kwargs:
+            for k, v in items(kwargs):
+                setattr(self, k, v)
 
     def event(self, type_, timestamp=None, local_received=None, fields=None,
               precedence=states.precedence, items=items, dict=dict,
@@ -335,49 +288,62 @@ class Task(object):
     def ready(self):
         return self.state in states.READY_STATES
 
+    @deprecated('3.2', '3.3')
     def on_sent(self, timestamp=None, **fields):
-        """Deprecated, to be removed in 3.2, use:
-        ``.event('sent', timestamp, fields)``."""
         self.event('sent', timestamp, fields)
 
+    @deprecated('3.2', '3.3')
     def on_received(self, timestamp=None, **fields):
-        """Deprecated, to be removed in 3.2, use:
-        ``.event('received', timestamp, fields)``."""
         self.event('received', timestamp, fields)
 
+    @deprecated('3.2', '3.3')
     def on_started(self, timestamp=None, **fields):
-        """Deprecated, to be removed in 3.2, use:
-        ``.event('started', timestamp, fields)``."""
         self.event('started', timestamp, fields)
 
+    @deprecated('3.2', '3.3')
     def on_failed(self, timestamp=None, **fields):
-        """Deprecated, to be removed in 3.2, use:
-        ``.event('failed', timestamp, fields)``."""
         self.event('failed', timestamp, fields)
 
+    @deprecated('3.2', '3.3')
     def on_retried(self, timestamp=None, **fields):
-        """Deprecated, to be removed in 3.2, use:
-        ``.event('retried', timestamp, fields)``."""
         self.event('retried', timestamp, fields)
 
+    @deprecated('3.2', '3.3')
     def on_succeeded(self, timestamp=None, **fields):
-        """Deprecated, to be removed in 3.2, use:
-        ``.event('succeeded', timestamp, fields)``."""
         self.event('succeeded', timestamp, fields)
 
+    @deprecated('3.2', '3.3')
     def on_revoked(self, timestamp=None, **fields):
-        """Deprecated, to be removed in 3.2, use:
-        ``.event('revoked', timestamp, fields)``."""
         self.event('revoked', timestamp, fields)
 
+    @deprecated('3.2', '3.3')
     def on_unknown_event(self, shortype, timestamp=None, **fields):
-        """Deprecated, to be removed in 3.2, use:
-        ``.event(type, timestamp, fields)``."""
         self.event(shortype, timestamp, fields)
 
+    @deprecated('3.2', '3.3')
+    def update(self, state, timestamp, fields,
+               _state=states.state, RETRY=states.RETRY):
+        return self.event(state, timestamp, None, fields)
+
+    @deprecated('3.2', '3.3')
+    def merge(self, state, timestamp, fields):
+        keep = self.merge_rules.get(state)
+        if keep is not None:
+            fields = dict((k, v) for k, v in items(fields) if k in keep)
+        for key, value in items(fields):
+            setattr(self, key, value)
+
+    @class_property
+    def _defaults(cls):
+        """Deprecated, to be removed in 3.2."""
+        source = cls()
+        return dict((k, getattr(source, k)) for k in source._fields)
+
 
 class State(object):
     """Records clusters state."""
+    Worker = Worker
+    Task = Task
     event_count = 0
     task_count = 0
 
@@ -394,7 +360,11 @@ class State(object):
         self.max_tasks_in_memory = max_tasks_in_memory
         self._mutex = threading.Lock()
         self.handlers = {}
-        self._event = self._create_dispatcher()
+        self._seen_types = set()
+
+    @cached_property
+    def _event(self):
+        return self._create_dispatcher()
 
     def freeze_while(self, fun, *args, **kwargs):
         clear_after = kwargs.pop('clear_after', False)
@@ -441,8 +411,8 @@ class State(object):
                 worker.update(kwargs)
             return worker, False
         except KeyError:
-            worker = self.workers[hostname] = Worker(
-                hostname=hostname, **kwargs)
+            worker = self.workers[hostname] = self.Worker(
+                hostname, **kwargs)
             return worker, True
 
     def get_or_create_task(self, uuid):
@@ -450,7 +420,7 @@ class State(object):
         try:
             return self.tasks[uuid], False
         except KeyError:
-            task = self.tasks[uuid] = Task(uuid=uuid)
+            task = self.tasks[uuid] = self.Task(uuid)
             return task, True
 
     def event(self, event):
@@ -468,14 +438,18 @@ class State(object):
     def _create_dispatcher(self):
         get_handler = self.handlers.__getitem__
         event_callback = self.event_callback
-        get_or_create_worker = self.get_or_create_worker
-        get_or_create_task = self.get_or_create_task
         wfields = itemgetter('hostname', 'timestamp', 'local_received')
         tfields = itemgetter('uuid', 'hostname', 'timestamp', 'local_received')
         taskheap = self._taskheap
         maxtasks = self.max_tasks_in_memory * 2
-
-        def _event(event, timetuple=timetuple):
+        add_type = self._seen_types.add
+        tasks, Task = self.tasks, self.Task
+        workers, Worker = self.workers, self.Worker
+        # avoid updating LRU entry at getitem
+        get_worker, get_task = workers.data.__getitem__, tasks.data.__getitem__
+
+        def _event(event,
+                   timetuple=timetuple, KeyError=KeyError, created=True):
             self.event_count += 1
             if event_callback:
                 event_callback(self, event)
@@ -493,30 +467,39 @@ class State(object):
                 except KeyError:
                     pass
                 else:
-                    worker, created = get_or_create_worker(hostname)
+                    try:
+                        worker, created = get_worker(hostname), False
+                    except KeyError:
+                        worker = workers[hostname] = Worker(hostname)
                     worker.event(subject, timestamp, local_received, event)
                     return created
             elif group == 'task':
                 uuid, hostname, timestamp, local_received = tfields(event)
                 # task-sent event is sent by client, not worker
                 is_client_event = subject == 'sent'
-                task, created = get_or_create_task(uuid)
+                try:
+                    task, created = get_task(uuid), False
+                except KeyError:
+                    task = tasks[uuid] = Task(uuid)
                 if not is_client_event:
-                    worker, _ = get_or_create_worker(hostname)
+                    try:
+                        worker, created = get_worker(hostname), False
+                    except KeyError:
+                        worker = workers[hostname] = Worker(hostname)
                     task.worker = worker
                     if worker is not None and local_received:
                         worker.event(None, local_received, timestamp)
                 clock = 0 if is_client_event else event.get('clock')
-                heappush(
-                    taskheap, timetuple(clock, timestamp, worker.id, task),
-                )
-                while len(taskheap) > maxtasks:
+                heappush(taskheap,
+                         timetuple(clock, timestamp, worker.id, ref(task)))
+                if len(taskheap) > maxtasks:
                     heappop(taskheap)
-                #if len(taskheap) > maxtasks:
-                #    heappop(taskheap)
                 if subject == 'received':
                     self.task_count += 1
                 task.event(subject, timestamp, local_received, event)
+                task_name = task.name
+                if task_name is not None:
+                    add_type(task_name)
                 return created
         return _event
 
@@ -531,10 +514,12 @@ class State(object):
         in ``(uuid, Task)`` tuples."""
         seen = set()
         for evtup in islice(reversed(self._taskheap), 0, limit):
-            uuid = evtup[3].uuid
-            if uuid not in seen:
-                yield uuid, evtup[3]
-                seen.add(uuid)
+            task = evtup[3]()
+            if task is not None:
+                uuid = task.uuid
+                if uuid not in seen:
+                    yield uuid, task
+                    seen.add(uuid)
     tasks_by_timestamp = tasks_by_time
 
     def tasks_by_type(self, name, limit=None):
@@ -561,8 +546,7 @@ class State(object):
 
     def task_types(self):
         """Return a list of all seen task types."""
-        return list(sorted(set(task.name for task in values(self.tasks)
-                               if task.name is not None)))
+        return sorted(self._seen_types)
 
     def alive_workers(self):
         """Return a list of (seemingly) alive workers."""

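Most of the state.py savings come from two ideas: `Worker`/`Task` defaults move from a long `__init__` into class attributes (with `_fields` driving both `__slots__` and the deprecated `_defaults` mapping), and the bounded task heap now stores `weakref.ref(task)` rather than the task itself, so a task evicted from the LRU cache is no longer kept alive by a stale heap entry; `tasks_by_time` simply skips dead references. A rough, self-contained sketch of the weakref-heap idea, using illustrative names rather than the real celery API:

    from heapq import heappush, heappop
    from weakref import ref

    class Task(object):
        # __weakref__ is needed so instances with __slots__ can be weak-referenced
        __slots__ = ('uuid', '__weakref__')

        def __init__(self, uuid):
            self.uuid = uuid

    taskheap, tasks, maxtasks = [], {}, 2

    for clock, uuid in enumerate(['a', 'b', 'c']):
        task = tasks[uuid] = Task(uuid)
        heappush(taskheap, (clock, ref(task)))   # store only a weak reference
        if len(taskheap) > maxtasks:
            heappop(taskheap)                    # drop the oldest entry

    del tasks['b']   # simulate eviction from the LRU cache

    # Dereference each entry and skip tasks that no longer exist,
    # mirroring what tasks_by_time does in this commit.
    alive = [r().uuid for _, r in taskheap if r() is not None]
    print(alive)   # on CPython: ['c'] (the 'b' entry died with its task)
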
+ 3 - 3
celery/utils/__init__.py

@@ -85,18 +85,18 @@ def warn_deprecated(description=None, deprecation=None,
     warnings.warn(w)
 
 
-def deprecated(description=None, deprecation=None,
-               removal=None, alternative=None):
+def deprecated(deprecation=None, removal=None,
+               alternative=None, description=None):
     """Decorator for deprecated functions.
 
     A deprecation warning will be emitted when the function is called.
 
-    :keyword description: Description of what is being deprecated.
     :keyword deprecation: Version that marks first deprecation, if this
       argument is not set a ``PendingDeprecationWarning`` will be emitted
       instead.
     :keyword removal:  Future version when this feature will be removed.
     :keyword alternative:  Instructions for an alternative solution (if any).
+    :keyword description: Description of what is being deprecated.
 
     """
     def _inner(fun):

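The signature change makes `deprecation` and `removal` the leading positional parameters, which is what allows the bare `@deprecated('3.2', '3.3')` usage added to events/state.py above. A hedged sketch of how such a decorator can be built, as a simplified stand-in rather than the exact celery.utils implementation:

    import warnings
    from functools import wraps

    def deprecated(deprecation=None, removal=None,
                   alternative=None, description=None):
        # Emits a DeprecationWarning when a deprecation version is given,
        # otherwise a PendingDeprecationWarning, each time the function runs.
        def _inner(fun):
            @wraps(fun)
            def __inner(*args, **kwargs):
                cls = DeprecationWarning if deprecation else PendingDeprecationWarning
                msg = '{0} is deprecated'.format(description or fun.__name__)
                if removal:
                    msg += ' and scheduled for removal in {0}'.format(removal)
                if alternative:
                    msg += '; use {0} instead'.format(alternative)
                warnings.warn(cls(msg), stacklevel=2)
                return fun(*args, **kwargs)
            return __inner
        return _inner

    @deprecated('3.2', '3.3')
    def on_sent(timestamp=None, **fields):
        pass

    on_sent()   # triggers the DeprecationWarning
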
+ 4 - 1
celery/worker/job.py

@@ -48,6 +48,9 @@ debug, info, warn, error = (logger.debug, logger.info,
 _does_info = False
 _does_debug = False
 
+#: Max length of result representation
+RESULT_MAXLEN = 128
+
 
 def __optimize__():
     # this is also called by celery.app.trace.setup_worker_optimizations
@@ -512,7 +515,7 @@ class Request(object):
             self.on_reject(logger, self.connection_errors, requeue)
             self.acknowledged = True
 
-    def repr_result(self, result, maxlen=46):
+    def repr_result(self, result, maxlen=RESULT_MAXLEN):
         # 46 is the length needed to fit
         #     'the quick brown fox jumps over the lazy dog' :)
         if not isinstance(result, string_t):