123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- # -*- coding: utf-8 -*-
- """Internal worker state (global)
- This includes the currently active and reserved tasks,
- statistics, and revoked tasks.
- """
- import os
- import sys
- import platform
- import shelve
- import weakref
- import zlib
- from collections import Counter
- from kombu.serialization import pickle, pickle_protocol
- from kombu.utils.objects import cached_property
- from celery import __version__
- from celery.exceptions import WorkerShutdown, WorkerTerminate
- from celery.utils.collections import LimitedSet
- __all__ = [
- 'SOFTWARE_INFO', 'reserved_requests', 'active_requests',
- 'total_count', 'revoked', 'task_reserved', 'maybe_shutdown',
- 'task_accepted', 'task_reserved', 'task_ready', 'Persistent',
- ]
- #: Worker software/platform information.
- SOFTWARE_INFO = {
- 'sw_ident': 'py-celery',
- 'sw_ver': __version__,
- 'sw_sys': platform.system(),
- }
- #: maximum number of revokes to keep in memory.
- REVOKES_MAX = 50000
- #: how many seconds a revoke will be active before
- #: being expired when the max limit has been exceeded.
- REVOKE_EXPIRES = 10800
- #: Mapping of reserved task_id->Request.
- requests = {}
- #: set of all reserved :class:`~celery.worker.request.Request`'s.
- reserved_requests = weakref.WeakSet()
- #: set of currently active :class:`~celery.worker.request.Request`'s.
- active_requests = weakref.WeakSet()
- #: count of tasks accepted by the worker, sorted by type.
- total_count = Counter()
- #: count of all tasks accepted by the worker
- all_total_count = [0]
- #: the list of currently revoked tasks. Persistent if ``statedb`` set.
- revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
- should_stop = None
- should_terminate = None
- def reset_state():
- requests.clear()
- reserved_requests.clear()
- active_requests.clear()
- total_count.clear()
- all_total_count[:] = [0]
- revoked.clear()
- def maybe_shutdown():
- if should_stop is not None and should_stop is not False:
- raise WorkerShutdown(should_stop)
- elif should_terminate is not None and should_terminate is not False:
- raise WorkerTerminate(should_terminate)
- def task_reserved(request,
- add_request=requests.__setitem__,
- add_reserved_request=reserved_requests.add):
- """Update global state when a task has been reserved."""
- add_request(request.id, request)
- add_reserved_request(request)
- def task_accepted(request,
- _all_total_count=all_total_count,
- add_active_request=active_requests.add,
- add_to_total_count=total_count.update):
- """Updates global state when a task has been accepted."""
- add_active_request(request)
- add_to_total_count({request.name: 1})
- all_total_count[0] += 1
- def task_ready(request,
- remove_request=requests.pop,
- discard_active_request=active_requests.discard,
- discard_reserved_request=reserved_requests.discard):
- """Updates global state when a task is ready."""
- remove_request(request.id, None)
- discard_active_request(request)
- discard_reserved_request(request)
- C_BENCH = os.environ.get('C_BENCH') or os.environ.get('CELERY_BENCH')
- C_BENCH_EVERY = int(os.environ.get('C_BENCH_EVERY') or
- os.environ.get('CELERY_BENCH_EVERY') or 1000)
- if C_BENCH: # pragma: no cover
- import atexit
- from time import monotonic
- from billiard.process import current_process
- from celery.utils.debug import memdump, sample_mem
- all_count = 0
- bench_first = None
- bench_start = None
- bench_last = None
- bench_every = C_BENCH_EVERY
- bench_sample = []
- __reserved = task_reserved
- __ready = task_ready
- if current_process()._name == 'MainProcess':
- @atexit.register
- def on_shutdown():
- if bench_first is not None and bench_last is not None:
- print('- Time spent in benchmark: {0!r}'.format(
- bench_last - bench_first))
- print('- Avg: {0}'.format(
- sum(bench_sample) / len(bench_sample)))
- memdump()
- def task_reserved(request): # noqa
- global bench_start
- global bench_first
- now = None
- if bench_start is None:
- bench_start = now = monotonic()
- if bench_first is None:
- bench_first = now
- return __reserved(request)
- def task_ready(request): # noqa
- global all_count
- global bench_start
- global bench_last
- all_count += 1
- if not all_count % bench_every:
- now = monotonic()
- diff = now - bench_start
- print('- Time spent processing {0} tasks (since first '
- 'task received): ~{1:.4f}s\n'.format(bench_every, diff))
- sys.stdout.flush()
- bench_start = bench_last = now
- bench_sample.append(diff)
- sample_mem()
- return __ready(request)
- class Persistent:
- """This is the persistent data stored by the worker when
- :option:`celery worker --statedb` is enabled.
- Currently only stores revoked task id's.
- """
- storage = shelve
- protocol = pickle_protocol
- compress = zlib.compress
- decompress = zlib.decompress
- _is_open = False
- def __init__(self, state, filename, clock=None):
- self.state = state
- self.filename = filename
- self.clock = clock
- self.merge()
- def open(self):
- return self.storage.open(
- self.filename, protocol=self.protocol, writeback=True,
- )
- def merge(self):
- self._merge_with(self.db)
- def sync(self):
- self._sync_with(self.db)
- self.db.sync()
- def close(self):
- if self._is_open:
- self.db.close()
- self._is_open = False
- def save(self):
- self.sync()
- self.close()
- def _merge_with(self, d):
- self._merge_revoked(d)
- self._merge_clock(d)
- return d
- def _sync_with(self, d):
- self._revoked_tasks.purge()
- d.update(
- __proto__=3,
- zrevoked=self.compress(self._dumps(self._revoked_tasks)),
- clock=self.clock.forward() if self.clock else 0,
- )
- return d
- def _merge_clock(self, d):
- if self.clock:
- d['clock'] = self.clock.adjust(d.get('clock') or 0)
- def _merge_revoked(self, d):
- try:
- self._merge_revoked_v3(d['zrevoked'])
- except KeyError:
- try:
- self._merge_revoked_v2(d.pop('revoked'))
- except KeyError:
- pass
- # purge expired items at boot
- self._revoked_tasks.purge()
- def _merge_revoked_v3(self, zrevoked):
- if zrevoked:
- self._revoked_tasks.update(pickle.loads(self.decompress(zrevoked)))
- def _merge_revoked_v2(self, saved):
- if not isinstance(saved, LimitedSet):
- # (pre 3.0.18) used to be stored as a dict
- return self._merge_revoked_v1(saved)
- self._revoked_tasks.update(saved)
- def _merge_revoked_v1(self, saved):
- add = self._revoked_tasks.add
- for item in saved:
- add(item)
- def _dumps(self, obj):
- return pickle.dumps(obj, protocol=self.protocol)
- @property
- def _revoked_tasks(self):
- return self.state.revoked
- @cached_property
- def db(self):
- self._is_open = True
- return self.open()
|