scripts: Adopted -k/--keep-open in treemap.py/codemap.py

It probably makes sense to adopt this in all ascii-art scripts that
operate on files.
This commit is contained in:
Christopher Haster
2025-03-15 03:49:43 -05:00
parent 6c36f76ffd
commit cb0119681d
3 changed files with 412 additions and 45 deletions

View File

@ -13,13 +13,21 @@ import bisect
import collections as co
import csv
import fnmatch
import io
import itertools as it
import json
import math as mt
import os
import re
import shlex
import shutil
import subprocess as sp
import time
try:
import inotify_simple
except ModuleNotFoundError:
inotify_simple = None
# we don't actually need that many chars/colors thanks to the
@ -53,6 +61,107 @@ def openio(path, mode='r', buffering=-1):
else:
return open(path, mode, buffering)
if inotify_simple is None:
Inotify = None
else:
class Inotify(inotify_simple.INotify):
def __init__(self, paths):
super().__init__()
# wait for interesting events
flags = (inotify_simple.flags.ATTRIB
| inotify_simple.flags.CREATE
| inotify_simple.flags.DELETE
| inotify_simple.flags.DELETE_SELF
| inotify_simple.flags.MODIFY
| inotify_simple.flags.MOVED_FROM
| inotify_simple.flags.MOVED_TO
| inotify_simple.flags.MOVE_SELF)
# recurse into directories
for path in paths:
if os.path.isdir(path):
for dir, _, files in os.walk(path):
self.add_watch(dir, flags)
for f in files:
self.add_watch(os.path.join(dir, f), flags)
else:
self.add_watch(path, flags)
class RingIO:
def __init__(self, maxlen=None, head=False):
self.maxlen = maxlen
self.head = head
self.lines = co.deque(maxlen=maxlen)
self.tail = io.StringIO()
# trigger automatic sizing
if maxlen == 0:
self.resize(0)
def __len__(self):
return len(self.lines)
def write(self, s):
# note using split here ensures the trailing string has no newline
lines = s.split('\n')
if len(lines) > 1 and self.tail.getvalue():
self.tail.write(lines[0])
lines[0] = self.tail.getvalue()
self.tail = io.StringIO()
self.lines.extend(lines[:-1])
if lines[-1]:
self.tail.write(lines[-1])
def resize(self, maxlen):
self.maxlen = maxlen
if maxlen == 0:
maxlen = shutil.get_terminal_size((80, 5))[1]
if maxlen != self.lines.maxlen:
self.lines = co.deque(self.lines, maxlen=maxlen)
canvas_lines = 1
def draw(self):
# did terminal size change?
if self.maxlen == 0:
self.resize(0)
# copy lines
lines = self.lines.copy()
# pad to fill any existing canvas, but truncate to terminal size
h = shutil.get_terminal_size((80, 5))[1]
lines.extend('' for _ in range(
len(lines),
min(RingIO.canvas_lines, h)))
while len(lines) > h:
if self.head:
lines.pop()
else:
lines.popleft()
# first thing first, give ourself a canvas
while RingIO.canvas_lines < len(lines):
sys.stdout.write('\n')
RingIO.canvas_lines += 1
# write lines from top to bottom so later lines overwrite earlier
# lines, note [xA/[xB stop at terminal boundaries
for i, line in enumerate(lines):
# move cursor, clear line, disable/reenable line wrapping
sys.stdout.write('\r')
if len(lines)-1-i > 0:
sys.stdout.write('\x1b[%dA' % (len(lines)-1-i))
sys.stdout.write('\x1b[K')
sys.stdout.write('\x1b[?7l')
sys.stdout.write(line)
sys.stdout.write('\x1b[?7h')
if len(lines)-1-i > 0:
sys.stdout.write('\x1b[%dB' % (len(lines)-1-i))
sys.stdout.flush()
def iself(path):
# check for an elf file's magic string (\x7fELF)
with open(path, 'rb') as f:
@ -699,7 +808,7 @@ def collect_ctx(obj_paths, *,
return ctx
def main(paths, *,
def main_(f, paths, *,
namespace_depth=2,
labels=[],
chars=[],
@ -718,6 +827,12 @@ def main(paths, *,
label=False,
no_label=False,
**args):
# give f an writeln function
def writeln(s=''):
f.write(s)
f.write('\n')
f.writeln = writeln
# figure out what color should be
if color == 'auto':
color = sys.stdout.isatty()
@ -748,17 +863,17 @@ def main(paths, *,
# figure out width/height
if width is None:
width_ = min(80, shutil.get_terminal_size((80, 5))[0])
elif width:
elif width > 0:
width_ = width
else:
width_ = shutil.get_terminal_size((80, 5))[0]
width_ = max(0, shutil.get_terminal_size((80, 5))[0] + width)
if height is None:
height_ = 2 if not no_header else 1
elif height:
elif height > 0:
height_ = height
else:
height_ = shutil.get_terminal_size((80, 5))[1] - 1
height_ = max(0, shutil.get_terminal_size((80, 5))[1] + height)
# try to parse files as CSV/JSON
results = []
@ -767,25 +882,25 @@ def main(paths, *,
# elf/callgraph files
fs = []
for path in paths:
f = openio(path)
if f.buffer.peek(4)[:4] == b'\x7fELF':
f_ = openio(path)
if f_.buffer.peek(4)[:4] == b'\x7fELF':
for f_ in fs:
f_.close()
raise StopIteration()
fs.append(f)
fs.append(f_)
for f in fs:
with f:
for f_ in fs:
with f_:
# csv or json? assume json starts with [
is_json = (f.buffer.peek(1)[:1] == b'[')
is_json = (f_.buffer.peek(1)[:1] == b'[')
# read csv?
if not is_json:
results.extend(csv.DictReader(f, restval=''))
results.extend(csv.DictReader(f_, restval=''))
# read json?
else:
results.extend(json.load(f))
results.extend(json.load(f_))
# fall back to extracting code/stack/ctx info from elf/callgraph files
except StopIteration:
@ -863,29 +978,29 @@ def main(paths, *,
return f['frame'] + limit
for k, f in functions.items():
if 'stack' in f:
if mt.isinf(f['stack']):
f['limit'] = limitof(k, f)
for k, f_ in functions.items():
if 'stack' in f_:
if mt.isinf(f_['stack']):
f_['limit'] = limitof(k, f_)
else:
f['limit'] = f['stack']
f_['limit'] = f_['stack']
# organize into subsystems
namespace_pattern = re.compile('_*[^_]+(?:_*$)?')
namespace_slice = slice(namespace_depth if namespace_depth else None)
subsystems = {}
for k, f in functions.items():
for k, f_ in functions.items():
# ignore leading/trailing underscores
f['subsystem'] = ''.join(
f_['subsystem'] = ''.join(
namespace_pattern.findall(k)[
namespace_slice])
if f['subsystem'] not in subsystems:
subsystems[f['subsystem']] = {'name': f['subsystem']}
if f_['subsystem'] not in subsystems:
subsystems[f_['subsystem']] = {'name': f_['subsystem']}
# include ctx in subsystems to give them different colors
for _, f in functions.items():
for a in f.get('args', []):
for _, f_ in functions.items():
for a in f_.get('args', []):
a['subsystem'] = a['name']
if a['subsystem'] not in subsystems:
@ -1119,9 +1234,9 @@ def main(paths, *,
# print some summary info
if not no_header:
if title:
print(punescape(title, totals['attrs'] | totals))
f.writeln(punescape(title, totals['attrs'] | totals))
else:
print('code %d stack %s ctx %d' % (
f.writeln('code %d stack %s ctx %d' % (
totals.get('code', 0),
(lambda s: '' if mt.isinf(s) else s)(
totals.get('stack', 0)),
@ -1130,7 +1245,58 @@ def main(paths, *,
# draw canvas
for row in range(canvas.height//canvas.yscale):
line = canvas.draw(row)
print(line)
f.writeln(line)
def main(paths, *,
height=None,
keep_open=False,
head=False,
cat=False,
sleep=False,
**args):
# keep-open?
if keep_open:
try:
while True:
# register inotify before running the command, this avoids
# modification race conditions
if Inotify:
inotify = Inotify(paths)
if cat:
main_(sys.stdout, paths,
# make space for shell prompt
height=height if height is not False else -1,
**args)
else:
ring = RingIO(head=head)
main_(ring, paths,
height=height if height is not False else 0,
**args)
ring.draw()
# try to inotifywait
if Inotify:
ptime = time.time()
inotify.read()
inotify.close()
# sleep a minimum amount of time to avoid flickering
time.sleep(max(0, (sleep or 0.01) - (time.time()-ptime)))
else:
time.sleep(sleep or 2)
except KeyboardInterrupt:
pass
if not cat:
sys.stdout.write('\n')
# single-pass?
else:
main_(sys.stdout, paths,
# make space for shell prompt
height=height if height is not False else -1,
**args)
if __name__ == "__main__":
@ -1238,14 +1404,15 @@ if __name__ == "__main__":
nargs='?',
type=lambda x: int(x, 0),
const=0,
help="Width in columns. 0 uses the terminal width. Defaults to "
"min(terminal, 80).")
help="Width in columns. <=0 uses the terminal width. Defaults "
"to min(terminal, 80).")
parser.add_argument(
'-H', '--height',
nargs='?',
type=lambda x: int(x, 0),
const=0,
help="Height in rows. 0 uses the terminal height. Defaults to 1.")
const=False,
help="Height in rows. <=0 uses the terminal height. Defaults "
"to 1.")
parser.add_argument(
'--no-header',
action='store_true',
@ -1347,6 +1514,23 @@ if __name__ == "__main__":
default=CTX_PATH,
help="Path to the ctx.py script, may include flags. "
"Defaults to %r." % CTX_PATH)
parser.add_argument(
'-k', '--keep-open',
action='store_true',
help="Continue to open and redraw the CSV files in a loop.")
parser.add_argument(
'-^', '--head',
action='store_true',
help="Show the first n lines.")
parser.add_argument(
'-z', '--cat',
action='store_true',
help="Pipe directly to stdout.")
parser.add_argument(
'-s', '--sleep',
type=float,
help="Time in seconds to sleep between redraws when running "
"with -k. Defaults to 2 seconds.")
sys.exit(main(**{k: v
for k, v in vars(parser.parse_intermixed_args()).items()
if v is not None}))