# Released under the MIT License. See LICENSE for details.
#
"""Utilities for debugging memory leaks or other issues.
IMPORTANT - these functions use the gc module which looks 'under the hood'
at Python and sometimes returns not-fully-initialized objects, which may
cause crashes or errors due to suddenly having references to them that they
didn't expect, etc. See https://github.com/python/cpython/issues/59313.
For this reason, these methods should NEVER be called in production code.
Enable them only for debugging situations and be aware that their use may
itself cause problems. The same is true for the gc module itself.
"""
from __future__ import annotations

import gc
import sys
import threading
import time
import types
import weakref
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any, TextIO
    from logging import Logger

ABS_MAX_LEVEL = 10
# NOTE: In general we want this toolset to allow us to explore
# which objects are holding references to others so we can diagnose
# leaks/etc. It is a bit tricky to do that, however, without
# affecting the objects we are looking at by adding temporary references
# from module dicts, function scopes, etc. So we need to try to be
# careful about cleaning up after ourselves and explicitly avoid
# returning these temporary references wherever possible.
# A good test is running printrefs() repeatedly on some object that is
# known to be static. If the list of references or the ids of any of
# the listed references changes with each run, it's a good sign that
# we're showing some temporary objects that we should be ignoring.
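#
# For instance, a quick version of that test (a sketch only; 'obj' is a
# placeholder for any object known to be static):
#
#   printrefs(obj, exclude_objs=[locals()])
#   printrefs(obj, exclude_objs=[locals()])  # Output should be identical.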
def getobjs(
    cls: type | str, contains: str | None = None, expanded: bool = False
) -> list[Any]:
    """Return all garbage-collected objects matching criteria.

    'cls' can be an actual type or a string, in which case objects
    whose type names contain that string will be returned.

    If 'contains' is provided, objects will be filtered to those
    containing it in their str() representations.
    """
    # Don't wanna return stuff waiting to be garbage-collected.
    gc.collect()

    if not isinstance(cls, type | str):
        raise TypeError('Expected a type or string for cls')
    if not isinstance(contains, str | None):
        raise TypeError('Expected a string or None for contains')

    allobjs = _get_all_objects(expanded=expanded)

    if isinstance(cls, str):
        objs = [o for o in allobjs if cls in str(type(o))]
    else:
        objs = [o for o in allobjs if isinstance(o, cls)]
    if contains is not None:
        objs = [o for o in objs if contains in str(o)]

    return objs
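
# A usage sketch for getobjs() ('MyWidget' is a hypothetical type name):
#
#   all_lists = getobjs(list)                # Match by exact type.
#   widgets = getobjs('MyWidget')            # Match by type-name substring.
#   hellos = getobjs(str, contains='hello')  # Filter by str() contents.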

# Recursively expand slist's objects into olist, using seen to track
# already-processed objects.
def _getr(slist: list[Any], olist: list[Any], seen: set[int]) -> None:
    for obj in slist:
        if id(obj) in seen:
            continue
        seen.add(id(obj))
        olist.append(obj)
        tll = gc.get_referents(obj)
        if tll:
            _getr(tll, olist, seen)
def _get_all_objects(expanded: bool) -> list[Any]:
    """Return a list of all tracked objects, optionally expanded.

    See https://utcc.utoronto.ca/~cks/space/blog/python/GetAllObjects
    """
    gcl = gc.get_objects()
    if not expanded:
        return gcl
    olist: list[Any] = []
    seen: set[int] = set()
    # Just in case:
    seen.add(id(gcl))
    seen.add(id(olist))
    seen.add(id(seen))
    # _getr does the real work.
    _getr(gcl, olist, seen)
    return olist
def getobj(objid: int, expanded: bool = False) -> Any:
    """Return a garbage-collected object by its id.

    Remember that this is VERY inefficient and should only ever be used
    for debugging.
    """
    if not isinstance(objid, int):
        raise TypeError(f'Expected an int for objid; got a {type(objid)}.')

    # Don't wanna return stuff waiting to be garbage-collected.
    gc.collect()

    allobjs = _get_all_objects(expanded=expanded)
    for obj in allobjs:
        if id(obj) == objid:
            return obj
    raise RuntimeError(f'Object with id {objid} not found.')
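
# A usage sketch: ids shown in printrefs() output can be fed back into
# getobj() to fish out the live object (the id value here is hypothetical):
#
#   obj = getobj(139865011884352)
#   print(type(obj), repr(obj))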
def getrefs(obj: Any) -> list[Any]:
    """Given an object, return things referencing it."""
    v = vars()  # Ignore ref coming from locals.
    return [o for o in gc.get_referrers(obj) if o is not v]
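
# Note that a caller's own locals can still appear among the referrers
# returned here; printrefs() handles that via its exclude_objs arg. A
# minimal sketch ('someobj' is a placeholder):
#
#   for ref in getrefs(someobj):
#       print(type(ref), id(ref))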
def printfiles(file: TextIO | None = None) -> None:
    """Print info about open files in the current app."""
    import io

    file = sys.stderr if file is None else file
    try:
        import psutil
    except ImportError:
        print(
            "Error: printfiles requires the 'psutil' module to be installed.",
            file=file,
        )
        return

    proc = psutil.Process()

    # Let's grab all Python file handles so we can associate raw files
    # with their Python objects when possible.
    fileio_ids = {obj.fileno(): obj for obj in getobjs(io.FileIO)}
    textio_ids = {obj.fileno(): obj for obj in getobjs(io.TextIOWrapper)}

    # FIXME: we could do a more limited version of this when psutil is
    # not present that simply includes Python's files.
    print("Files open by this app (not limited to Python's):", file=file)
    for i, ofile in enumerate(proc.open_files()):
        # Mypy doesn't know about the mode attr apparently (and we can't
        # use a type: ignore since we don't require psutil, and mypy then
        # complains about an unused ignore comment when it's not present).
        mode = getattr(ofile, 'mode')
        assert isinstance(mode, str)
        textio = textio_ids.get(ofile.fd)
        textio_s = id(textio) if textio is not None else '<not found>'
        fileio = fileio_ids.get(ofile.fd)
        fileio_s = id(fileio) if fileio is not None else '<not found>'
        print(
            f'#{i+1}: path={ofile.path!r},'
            f' fd={ofile.fd}, mode={mode!r}, TextIOWrapper={textio_s},'
            f' FileIO={fileio_s}',
            file=file,
        )
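
# Usage sketch: simply call it (requires psutil; output goes to stderr
# unless a file is passed):
#
#   printfiles()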
def printrefs(
    obj: Any,
    max_level: int = 2,
    exclude_objs: list[Any] | None = None,
    expand_ids: list[int] | None = None,
    file: TextIO | None = None,
) -> None:
    """Print a human-readable list of objects referring to an object.

    'max_level' specifies how many levels of recursion are printed.

    'exclude_objs' can be a list of exact objects to skip if found in the
    referrers list. This can be useful to avoid printing the local context
    where the object was passed in from (locals(), etc.).

    'expand_ids' can be a list of object ids; if a particular object is
    found, it will always be expanded even if max_level has been reached.
    """
    _printrefs(
        obj,
        level=0,
        max_level=max_level,
        exclude_objs=[] if exclude_objs is None else exclude_objs,
        expand_ids=[] if expand_ids is None else expand_ids,
        file=sys.stderr if file is None else file,
    )
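
# A typical printrefs() session might look like this (a sketch; 'MyWidget'
# is a hypothetical type). Passing exclude_objs=[locals()] keeps the
# calling frame's own reference out of the listing:
#
#   leaked = getobjs('MyWidget')[0]
#   printrefs(leaked, max_level=3, exclude_objs=[locals()])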
def printtypes(
    limit: int = 50, file: TextIO | None = None, expanded: bool = False
) -> None:
    """Print a human-readable list of which types have the most instances."""
    assert limit > 0
    objtypes: dict[str, int] = {}
    gc.collect()  # Recommended before get_objects().
    allobjs = _get_all_objects(expanded=expanded)
    allobjc = len(allobjs)
    for obj in allobjs:
        modname = type(obj).__module__
        tpname = type(obj).__qualname__
        if modname != 'builtins':
            tpname = f'{modname}.{tpname}'
        objtypes[tpname] = objtypes.get(tpname, 0) + 1

    # Presumably allobjs contains stack-frame/dict type stuff
    # from this function call which in turn contain refs to allobjs.
    # Let's try to prevent these huge lists from accumulating until
    # the cyclical collector (hopefully) gets to them.
    allobjs.clear()
    del allobjs

    print(f'Types most allocated ({allobjc} total objects):', file=file)
    for i, tpitem in enumerate(
        sorted(objtypes.items(), key=lambda x: x[1], reverse=True)[:limit]
    ):
        tpname, tpval = tpitem
        percent = tpval / allobjc * 100.0
        print(f'{i+1}: {tpname}: {tpval} ({percent:.2f}%)', file=file)
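
# Usage sketch; each output line looks like 'N: typename: count (percent%)',
# with values here purely hypothetical:
#
#   printtypes(limit=10)
#   # 1: dict: 2344 (18.02%)
#   # ...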
def printsizes(
    limit: int = 50, file: TextIO | None = None, expanded: bool = False
) -> None:
    """Print total allocated sizes of different types."""
    assert limit > 0
    objsizes: dict[str, int] = {}
    gc.collect()  # Recommended before get_objects().
    allobjs = _get_all_objects(expanded=expanded)
    totalobjsize = 0

    for obj in allobjs:
        modname = type(obj).__module__
        tpname = type(obj).__qualname__
        if modname != 'builtins':
            tpname = f'{modname}.{tpname}'
        objsize = sys.getsizeof(obj)
        objsizes[tpname] = objsizes.get(tpname, 0) + objsize
        totalobjsize += objsize

    totalobjmb = totalobjsize / (1024 * 1024)
    print(
        f'Types with most allocated bytes ({totalobjmb:.2f} MB total):',
        file=file,
    )
    for i, tpitem in enumerate(
        sorted(objsizes.items(), key=lambda x: x[1], reverse=True)[:limit]
    ):
        tpname, tpval = tpitem
        percent = tpval / totalobjsize * 100.0
        print(f'{i+1}: {tpname}: {tpval} ({percent:.2f}%)', file=file)
def _desctype(obj: Any) -> str:
    cls = type(obj)
    if cls is types.ModuleType:
        return f'{type(obj).__name__} {obj.__name__}'
    if cls is types.MethodType:
        bnd = 'bound' if hasattr(obj, '__self__') else 'unbound'
        return f'{bnd} {type(obj).__name__} {obj.__name__}'
    return f'{type(obj).__name__}'
def _desc(obj: Any) -> str:
    extra: str | None = None
    if isinstance(obj, list | tuple):
        # Print length and the first few types.
        tps = [_desctype(i) for i in obj[:3]]
        tpsj = ', '.join(tps)
        tpss = (
            f', contains [{tpsj}, ...]'
            if len(obj) > 3
            else f', contains [{tpsj}]' if tps else ''
        )
        extra = f' (len {len(obj)}{tpss})'
    elif isinstance(obj, dict):
        # If it seems to be the vars() for a type or module, try to
        # identify what.
        for ref in getrefs(obj):
            if hasattr(ref, '__dict__') and vars(ref) is obj:
                extra = f' (vars for {_desctype(ref)} @ {id(ref)})'

        # Generic dict: print length and the first few key:type pairs.
        if extra is None:
            pairs = [
                f'{repr(n)}: {_desctype(v)}' for n, v in list(obj.items())[:3]
            ]
            pairsj = ', '.join(pairs)
            pairss = (
                f', contains {{{pairsj}, ...}}'
                if len(obj) > 3
                else f', contains {{{pairsj}}}' if pairs else ''
            )
            extra = f' (len {len(obj)}{pairss})'
    if extra is None:
        extra = ''
    return f'{_desctype(obj)} @ {id(obj)}{extra}'
def _printrefs(
    obj: Any,
    *,
    level: int,
    max_level: int,
    exclude_objs: list,
    expand_ids: list[int],
    file: TextIO,
) -> None:
    ind = '  ' * level
    print(ind + _desc(obj), file=file)
    v = vars()
    if level < max_level or (id(obj) in expand_ids and level < ABS_MAX_LEVEL):
        refs = getrefs(obj)
        for ref in refs:
            # It seems we tend to get a transient cell object with
            # contents set to obj. Would be nice to understand why that
            # happens but just ignoring it for now.
            if isinstance(ref, types.CellType) and ref.cell_contents is obj:
                continue

            # Ignore anything we were asked to ignore.
            if exclude_objs is not None:
                if any(ref is eobj for eobj in exclude_objs):
                    continue

            # Ignore references from our locals.
            if ref is v:
                continue

            # The 'refs' list we just made will be listed as a referrer
            # of this obj, so explicitly exclude it from the obj's
            # listing.
            _printrefs(
                ref,
                level=level + 1,
                max_level=max_level,
                exclude_objs=exclude_objs + [refs],
                expand_ids=expand_ids,
                file=file,
            )
class DeadlockDumper:
    """Dumps thread states if still around after timeout seconds.

    This uses low-level Python functionality so should still fire
    even in the case of deadlock.
    """

    # faulthandler has a single traceback-dump-later state, so only
    # one of us can control it at a time.
    lock = threading.Lock()
    watch_in_progress = False

    def __init__(self, timeout: float) -> None:
        import faulthandler

        cls = type(self)

        with cls.lock:
            if cls.watch_in_progress:
                print(
                    'Existing DeadlockDumper found; new one will be a no-op.',
                    file=sys.stderr,
                )
                self.active = False
                return

            # Ok; no watch is in progress; we can be the active one.
            cls.watch_in_progress = True
            self.active = True
            faulthandler.dump_traceback_later(timeout=timeout)

    def __del__(self) -> None:
        import faulthandler

        cls = type(self)

        # If we made it to here, call off the dump. But only if we are
        # the active dump.
        if self.active:
            faulthandler.cancel_dump_traceback_later()
            cls.watch_in_progress = False
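
# Usage sketch: hold a DeadlockDumper across something that might hang;
# if it isn't cleaned up within the timeout, faulthandler dumps all
# thread stacks to stderr ('might_deadlock' is a hypothetical call):
#
#   dumper = DeadlockDumper(timeout=10.0)
#   might_deadlock()
#   del dumper  # Finished in time; call off the dump.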
class DeadlockWatcher:
    """Individual watcher for deadlock conditions.

    Use enable_deadlock_watchers() to enable this system.

    Next, wrap one of these in a with statement around some operation
    that may deadlock. If the with statement does not complete within the
    timeout period, a traceback of all threads will be dumped.

    Note that the checker thread runs a cycle every ~5 seconds, so
    something stuck needs to remain stuck for 5 seconds or so to be
    caught for sure.
    """

    watchers_lock: threading.Lock | None = None
    watchers: list[weakref.ref[DeadlockWatcher]] | None = None

    def __init__(
        self,
        timeout: float = 10.0,
        logger: Logger | None = None,
        logextra: dict | None = None,
    ) -> None:
        from efro.util import caller_source_location

        # pylint: disable=not-context-manager
        cls = type(self)
        if cls.watchers_lock is None or cls.watchers is None:
            print(
                'DeadlockWatcher created without watchers enabled.',
                file=sys.stderr,
            )
            return

        # All we do is record when we were made and how long till we
        # expire.
        self.create_time = time.monotonic()
        self.timeout = timeout
        self.noted_expire = False
        self.logger = logger
        self.logextra = logextra
        self.caller_source_loc = caller_source_location()
        curthread = threading.current_thread()
        self.thread_id = (
            '<unknown>'
            if curthread.ident is None
            else hex(curthread.ident).removeprefix('0x')
        )
        self.active = False

        with cls.watchers_lock:
            cls.watchers.append(weakref.ref(self))

    # Support the with statement.
    def __enter__(self) -> Any:
        self.active = True
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, exc_tb: Any) -> None:
        self.active = False

        # Print if we lived past our deadline. This is just an extra
        # data point; the watcher thread should be doing the actual
        # stack dumps/etc.
        if self.logger is None or self.logextra is None:
            return

        duration = time.monotonic() - self.create_time
        if duration > self.timeout:
            self.logger.error(
                'DeadlockWatcher %s at %s in thread %s lived %.2fs,'
                ' past its timeout of %.2fs. This should have triggered'
                ' a deadlock dump.',
                id(self),
                self.caller_source_loc,
                self.thread_id,
                duration,
                self.timeout,
                extra=self.logextra,
            )
    @classmethod
    def enable_deadlock_watchers(cls) -> None:
        """Spins up deadlock-watcher functionality.

        Must be explicitly called before any DeadlockWatchers are
        created.
        """
        assert cls.watchers_lock is None
        cls.watchers_lock = threading.Lock()
        assert cls.watchers is None
        cls.watchers = []

        threading.Thread(
            target=cls._deadlock_watcher_thread_main, daemon=True
        ).start()
    @classmethod
    def _deadlock_watcher_thread_main(cls) -> None:
        # pylint: disable=not-context-manager
        # pylint: disable=not-an-iterable
        assert cls.watchers_lock is not None and cls.watchers is not None

        # Spin in a loop checking our watchers periodically and dumping
        # state if any have timed out. The trick here is that we don't
        # explicitly dump state, but rather we set up a "dead man's
        # switch" that does so after some amount of time if we don't
        # explicitly cancel it. This way we'll hopefully get state dumps
        # even for things like total GIL deadlocks.
        while True:
            # Set a dead man's switch for this pass.
            dumper = DeadlockDumper(timeout=5.171)

            # Sleep most of the way through it but give ourselves time
            # to turn it off if we're still responsive.
            time.sleep(4.129)
            now = time.monotonic()

            found_fresh_expired = False

            # If any watcher is still active and expired, sleep past the
            # timeout to force the dumper to do its thing.
            with cls.watchers_lock:
                for wref in cls.watchers:
                    w = wref()
                    if (
                        w is not None
                        and now - w.create_time > w.timeout
                        and not w.noted_expire
                        and w.active
                    ):
                        # If they supplied a logger, let them know they
                        # should check stderr for a dump.
                        if w.logger is not None:
                            w.logger.error(
                                'DeadlockWatcher %s at %s in thread %s'
                                ' with timeout %.2f expired;'
                                ' check stderr for stack traces.',
                                id(w),
                                w.caller_source_loc,
                                w.thread_id,
                                w.timeout,
                                extra=w.logextra,
                            )
                        found_fresh_expired = True
                        w.noted_expire = True

                    # Important to clear this ref; otherwise we can keep
                    # a random watcher alive until our next time through.
                    w = None

                # Prune dead watchers and reset for the next pass.
                cls.watchers = [w for w in cls.watchers if w() is not None]

            if found_fresh_expired:
                # Push us over the dumper's time limit, which gives us a
                # lovely dump. Technically we could just do an immediate
                # dump here instead, but that would give us two paths to
                # maintain instead of this single one.
                time.sleep(2.0)

            # Call off the dump if it hasn't happened yet.
            del dumper
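
# A usage sketch for the watcher system as a whole: call
# enable_deadlock_watchers() once at app startup, then wrap suspect
# operations ('some_lock' and 'do_stuff' are hypothetical):
#
#   DeadlockWatcher.enable_deadlock_watchers()
#   ...
#   with DeadlockWatcher(timeout=10.0):
#       with some_lock:
#           do_stuff()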