boon

unix terminal framework
git clone https://git.ce9e.org/boon.git

commit
55019329c45fbb0555884bbb34381d9d251483b7
parent
38a7537104e801f0e043496c78330f33c1f8df03
Author
Tobias Bengfort <tobias.bengfort@posteo.de>
Date
2023-11-05 09:33
lint

Diffstat

M boon.py 242 ++++++++++++++++++++++++++++++------------------------------

1 files changed, 121 insertions, 121 deletions


diff --git a/boon.py b/boon.py

@@ -1,5 +1,5 @@
    1    -1 import os
    2     1 import curses
   -1     2 import os
    3     3 import selectors
    4     4 import shutil
    5     5 import signal
@@ -28,151 +28,151 @@ KEY_LEFT = CSI + 'D'
   28    28 
   29    29 
   30    30 def isatty():
   31    -1 	return os.isatty(sys.stdout.fileno())
   -1    31     return os.isatty(sys.stdout.fileno())
   32    32 
   33    33 
   34    34 def get_cap(cap, *args):
   35    -1 	# see `man terminfo` for available capabilities
   36    -1 	if not isatty():
   37    -1 		return ''
   38    -1 	code = curses.tigetstr(cap)
   39    -1 	if not code:
   40    -1 		return ''
   41    -1 	if args:
   42    -1 		code = curses.tparm(code, *args)
   43    -1 	return code.decode('ascii')
   -1    35     # see `man terminfo` for available capabilities
   -1    36     if not isatty():
   -1    37         return ''
   -1    38     code = curses.tigetstr(cap)
   -1    39     if not code:
   -1    40         return ''
   -1    41     if args:
   -1    42         code = curses.tparm(code, *args)
   -1    43     return code.decode('ascii')
   44    44 
   45    45 
   46    46 def move(y, x):
   47    -1 	sys.stdout.write(get_cap('cup', y, x))
   -1    47     sys.stdout.write(get_cap('cup', y, x))
   48    48 
   49    49 
   50    50 class ReusableContextManager:
   51    -1 	def __init__(self, factory, *args, **kwargs):
   52    -1 		self.factory = factory
   53    -1 		self.args = args
   54    -1 		self.kwargs = kwargs
   -1    51     def __init__(self, factory, *args, **kwargs):
   -1    52         self.factory = factory
   -1    53         self.args = args
   -1    54         self.kwargs = kwargs
   55    55 
   56    -1 	def __enter__(self):
   57    -1 		self.mgr = self.factory(*self.args, **self.kwargs)
   58    -1 		return self.mgr.__enter__()
   -1    56     def __enter__(self):
   -1    57         self.mgr = self.factory(*self.args, **self.kwargs)
   -1    58         return self.mgr.__enter__()
   59    59 
   60    -1 	def __exit__(self, *args, **kwargs):
   61    -1 		return self.mgr.__exit__(*args, **kwargs)
   -1    60     def __exit__(self, *args, **kwargs):
   -1    61         return self.mgr.__exit__(*args, **kwargs)
   62    62 
   63    63 
   64    64 def reusable_contextmanager(func):
   65    -1 	factory = contextmanager(func)
   66    -1 	@wraps(func)
   67    -1 	def wrapper(*args, **kwargs):
   68    -1 		return ReusableContextManager(factory, *args, **kwargs)
   69    -1 	return wrapper
   -1    65     factory = contextmanager(func)
   -1    66     @wraps(func)
   -1    67     def wrapper(*args, **kwargs):
   -1    68         return ReusableContextManager(factory, *args, **kwargs)
   -1    69     return wrapper
   70    70 
   71    71 
   72    72 @reusable_contextmanager
   73    73 def tty_restore(fd):
   74    -1 	old = termios.tcgetattr(fd)
   75    -1 	try:
   76    -1 		yield
   77    -1 	finally:
   78    -1 		termios.tcsetattr(fd, termios.TCSADRAIN, old)
   -1    74     old = termios.tcgetattr(fd)
   -1    75     try:
   -1    76         yield
   -1    77     finally:
   -1    78         termios.tcsetattr(fd, termios.TCSADRAIN, old)
   79    79 
   80    80 
   81    81 @reusable_contextmanager
   82    82 def fullscreen():
   83    -1 	sys.stdout.write(get_cap('civis'))
   84    -1 	sys.stdout.write(get_cap('smcup'))
   85    -1 	sys.stdout.flush()
   86    -1 	try:
   87    -1 		fd = sys.stdin.fileno()
   88    -1 		with tty_restore(fd):
   89    -1 			tty.setcbreak(fd)
   90    -1 			yield
   91    -1 	finally:
   92    -1 		sys.stdout.write(get_cap('rmcup'))
   93    -1 		sys.stdout.write(get_cap('cnorm'))
   94    -1 		sys.stdout.flush()
   -1    83     sys.stdout.write(get_cap('civis'))
   -1    84     sys.stdout.write(get_cap('smcup'))
   -1    85     sys.stdout.flush()
   -1    86     try:
   -1    87         fd = sys.stdin.fileno()
   -1    88         with tty_restore(fd):
   -1    89             tty.setcbreak(fd)
   -1    90             yield
   -1    91     finally:
   -1    92         sys.stdout.write(get_cap('rmcup'))
   -1    93         sys.stdout.write(get_cap('cnorm'))
   -1    94         sys.stdout.flush()
   95    95 
   96    96 
   97    97 def getch():
   98    -1 	# NOTE: result might contain more than one key
   99    -1 	fd = sys.stdin.fileno()
  100    -1 	with tty_restore(fd):
  101    -1 		flags = termios.tcgetattr(fd)
  102    -1 		flags[6][termios.VMIN] = 0
  103    -1 		flags[6][termios.VTIME] = 0
  104    -1 		termios.tcsetattr(fd, termios.TCSADRAIN, flags)
  105    -1 		return sys.stdin.read(8)
   -1    98     # NOTE: result might contain more than one key
   -1    99     fd = sys.stdin.fileno()
   -1   100     with tty_restore(fd):
   -1   101         flags = termios.tcgetattr(fd)
   -1   102         flags[6][termios.VMIN] = 0
   -1   103         flags[6][termios.VTIME] = 0
   -1   104         termios.tcsetattr(fd, termios.TCSADRAIN, flags)
   -1   105         return sys.stdin.read(8)
  106   106 
  107   107 
  108   108 class App:
  109    -1 	def __init__(self):
  110    -1 		self.old_lines = []
  111    -1 		self.running = False
  112    -1 		self.timeout = 0.5
  113    -1 		self.selector = selectors.DefaultSelector()
  114    -1 		self.fullscreen = fullscreen()
  115    -1 
  116    -1 		# self-pipe to avoid concurrency issues with signal
  117    -1 		self.sig_in, self.sig_out = os.pipe2(os.O_NONBLOCK)
  118    -1 		signal.signal(signal.SIGWINCH, self.on_resize)
  119    -1 		signal.signal(signal.SIGTSTP, self.on_stop)
  120    -1 
  121    -1 	def update(self, force=False):
  122    -1 		lines = list(self.render(self.rows, self.cols))
  123    -1 		for i, line in enumerate(lines):
  124    -1 			if not force and len(self.old_lines) > i and line == self.old_lines[i]:
  125    -1 				continue
  126    -1 			move(i, 0)
  127    -1 			sys.stdout.write(get_cap('el'))
  128    -1 			sys.stdout.write(line)
  129    -1 
  130    -1 		# clear rest of screen
  131    -1 		if len(lines) < len(self.old_lines):
  132    -1 			move(len(lines), 0)
  133    -1 			sys.stdout.write(get_cap('ed'))
  134    -1 		sys.stdout.flush()
  135    -1 
  136    -1 		self.old_lines = lines
  137    -1 
  138    -1 	def on_resize(self, *args):
  139    -1 		os.write(self.sig_out, b'r')
  140    -1 
  141    -1 	def on_stop(self, *args):
  142    -1 		os.write(self.sig_out, b's')
  143    -1 
  144    -1 	def select(self, *fileobjs):
  145    -1 		with self.selector as sel:
  146    -1 			for fileobj in fileobjs:
  147    -1 				sel.register(fileobj, selectors.EVENT_READ)
  148    -1 			while self.running:
  149    -1 				yield from sel.select()
  150    -1 
  151    -1 	def run(self):
  152    -1 		self.running = True
  153    -1 		with self.fullscreen:
  154    -1 			self.on_resize()
  155    -1 			for key, mask in self.select(sys.stdin, self.sig_in):
  156    -1 				if key.fileobj is self.sig_in:
  157    -1 					b = os.read(self.sig_in, 1)
  158    -1 					if b == b'r':
  159    -1 						self.cols, self.rows = shutil.get_terminal_size()
  160    -1 						self.update(force=True)
  161    -1 					elif b == b's':
  162    -1 						self.fullscreen.__exit__(None, None, None)
  163    -1 						os.kill(os.getpid(), signal.SIGSTOP)
  164    -1 						# paused until SIGCONT
  165    -1 						self.fullscreen.__enter__()
  166    -1 						self.update(force=True)
  167    -1 				else:
  168    -1 					if key.fileobj is sys.stdin:
  169    -1 						self.on_key(getch())
  170    -1 					elif callable(key.data):
  171    -1 						key.data()
  172    -1 					self.update()
  173    -1 
  174    -1 	def render(self, rows, cols):
  175    -1 		return []
  176    -1 
  177    -1 	def on_key(self, key):
  178    -1 		pass
   -1   109     def __init__(self):
   -1   110         self.old_lines = []
   -1   111         self.running = False
   -1   112         self.timeout = 0.5
   -1   113         self.selector = selectors.DefaultSelector()
   -1   114         self.fullscreen = fullscreen()
   -1   115 
   -1   116         # self-pipe to avoid concurrency issues with signal
   -1   117         self.sig_in, self.sig_out = os.pipe2(os.O_NONBLOCK)
   -1   118         signal.signal(signal.SIGWINCH, self.on_resize)
   -1   119         signal.signal(signal.SIGTSTP, self.on_stop)
   -1   120 
   -1   121     def update(self, *, force=False):
   -1   122         lines = list(self.render(self.rows, self.cols))
   -1   123         for i, line in enumerate(lines):
   -1   124             if not force and len(self.old_lines) > i and line == self.old_lines[i]:
   -1   125                 continue
   -1   126             move(i, 0)
   -1   127             sys.stdout.write(get_cap('el'))
   -1   128             sys.stdout.write(line)
   -1   129 
   -1   130         # clear rest of screen
   -1   131         if len(lines) < len(self.old_lines):
   -1   132             move(len(lines), 0)
   -1   133             sys.stdout.write(get_cap('ed'))
   -1   134         sys.stdout.flush()
   -1   135 
   -1   136         self.old_lines = lines
   -1   137 
   -1   138     def on_resize(self, *args):
   -1   139         os.write(self.sig_out, b'r')
   -1   140 
   -1   141     def on_stop(self, *args):
   -1   142         os.write(self.sig_out, b's')
   -1   143 
   -1   144     def select(self, *fileobjs):
   -1   145         with self.selector as sel:
   -1   146             for fileobj in fileobjs:
   -1   147                 sel.register(fileobj, selectors.EVENT_READ)
   -1   148             while self.running:
   -1   149                 yield from sel.select()
   -1   150 
   -1   151     def run(self):
   -1   152         self.running = True
   -1   153         with self.fullscreen:
   -1   154             self.on_resize()
   -1   155             for key, _mask in self.select(sys.stdin, self.sig_in):
   -1   156                 if key.fileobj is self.sig_in:
   -1   157                     b = os.read(self.sig_in, 1)
   -1   158                     if b == b'r':
   -1   159                         self.cols, self.rows = shutil.get_terminal_size()
   -1   160                         self.update(force=True)
   -1   161                     elif b == b's':
   -1   162                         self.fullscreen.__exit__(None, None, None)
   -1   163                         os.kill(os.getpid(), signal.SIGSTOP)
   -1   164                         # paused until SIGCONT
   -1   165                         self.fullscreen.__enter__()
   -1   166                         self.update(force=True)
   -1   167                 else:
   -1   168                     if key.fileobj is sys.stdin:
   -1   169                         self.on_key(getch())
   -1   170                     elif callable(key.data):
   -1   171                         key.data()
   -1   172                     self.update()
   -1   173 
   -1   174     def render(self, rows, cols):
   -1   175         return []
   -1   176 
   -1   177     def on_key(self, key):
   -1   178         pass