1130 lines
38 KiB
Python
1130 lines
38 KiB
Python
|
#!/usr/bin/python
|
||
|
#
|
||
|
# Urwid raw display module
|
||
|
# Copyright (C) 2004-2009 Ian Ward
|
||
|
#
|
||
|
# This library is free software; you can redistribute it and/or
|
||
|
# modify it under the terms of the GNU Lesser General Public
|
||
|
# License as published by the Free Software Foundation; either
|
||
|
# version 2.1 of the License, or (at your option) any later version.
|
||
|
#
|
||
|
# This library is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||
|
# Lesser General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU Lesser General Public
|
||
|
# License along with this library; if not, write to the Free Software
|
||
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||
|
#
|
||
|
# Urwid web site: https://urwid.org/
|
||
|
|
||
|
|
||
|
"""
|
||
|
Direct terminal UI implementation
|
||
|
"""
|
||
|
|
||
|
from __future__ import annotations
|
||
|
|
||
|
import io
|
||
|
import os
|
||
|
import select
|
||
|
import signal
|
||
|
import struct
|
||
|
import sys
|
||
|
import typing
|
||
|
|
||
|
try:
|
||
|
import fcntl
|
||
|
import termios
|
||
|
import tty
|
||
|
except ImportError:
|
||
|
pass # windows
|
||
|
|
||
|
from subprocess import PIPE, Popen
|
||
|
|
||
|
from urwid import escape, signals, util
|
||
|
from urwid.display_common import (
|
||
|
INPUT_DESCRIPTORS_CHANGED,
|
||
|
UNPRINTABLE_TRANS_TABLE,
|
||
|
UPDATE_PALETTE_ENTRY,
|
||
|
AttrSpec,
|
||
|
BaseScreen,
|
||
|
RealTerminal,
|
||
|
)
|
||
|
|
||
|
if typing.TYPE_CHECKING:
|
||
|
from typing_extensions import Literal
|
||
|
|
||
|
|
||
|
class Screen(BaseScreen, RealTerminal):
|
||
|
def __init__(self, input=sys.stdin, output=sys.stdout, bracketed_paste_mode=False):
|
||
|
"""Initialize a screen that directly prints escape codes to an output
|
||
|
terminal.
|
||
|
|
||
|
bracketed_paste_mode -- enable bracketed paste mode in the host terminal.
|
||
|
If the host terminal supports it, the application will receive `begin paste`
|
||
|
and `end paste` keystrokes when the user pastes text.
|
||
|
"""
|
||
|
super().__init__()
|
||
|
self._pal_escape = {}
|
||
|
self._pal_attrspec = {}
|
||
|
signals.connect_signal(self, UPDATE_PALETTE_ENTRY, self._on_update_palette_entry)
|
||
|
self.colors = 16 # FIXME: detect this
|
||
|
self.has_underline = True # FIXME: detect this
|
||
|
self._keyqueue = []
|
||
|
self.prev_input_resize = 0
|
||
|
self.set_input_timeouts()
|
||
|
self.screen_buf = None
|
||
|
self._screen_buf_canvas = None
|
||
|
self._resized = False
|
||
|
self.maxrow = None
|
||
|
self.gpm_mev: Popen | None = None
|
||
|
self.gpm_event_pending: bool = False
|
||
|
self._mouse_tracking_enabled = False
|
||
|
self.last_bstate = 0
|
||
|
self._setup_G1_done = False
|
||
|
self._rows_used = None
|
||
|
self._cy = 0
|
||
|
self.term = os.environ.get('TERM', '')
|
||
|
self.fg_bright_is_bold = not self.term.startswith("xterm")
|
||
|
self.bg_bright_is_blink = (self.term == "linux")
|
||
|
self.back_color_erase = not self.term.startswith("screen")
|
||
|
self.register_palette_entry( None, 'default','default')
|
||
|
self._next_timeout = None
|
||
|
self.signal_handler_setter = signal.signal
|
||
|
self.bracketed_paste_mode = bracketed_paste_mode
|
||
|
|
||
|
# Our connections to the world
|
||
|
self._term_output_file = output
|
||
|
self._term_input_file = input
|
||
|
|
||
|
# pipe for signalling external event loops about resize events
|
||
|
self._resize_pipe_rd, self._resize_pipe_wr = os.pipe()
|
||
|
fcntl.fcntl(self._resize_pipe_rd, fcntl.F_SETFL, os.O_NONBLOCK)
|
||
|
|
||
|
# These store the previous signal handlers after setting ours
|
||
|
self._prev_sigcont_handler = None
|
||
|
self._prev_sigtstp_handler = None
|
||
|
self._prev_sigwinch_handler = None
|
||
|
|
||
|
def _input_fileno(self):
|
||
|
"""Returns the fileno of the input stream, or None if it doesn't have one. A stream without a fileno can't participate in whatever.
|
||
|
"""
|
||
|
if hasattr(self._term_input_file, 'fileno'):
|
||
|
return self._term_input_file.fileno()
|
||
|
else:
|
||
|
return None
|
||
|
|
||
|
def _on_update_palette_entry(self, name, *attrspecs):
|
||
|
# copy the attribute to a dictionary containing the escape seqences
|
||
|
a = attrspecs[{16:0,1:1,88:2,256:3,2**24:4}[self.colors]]
|
||
|
self._pal_attrspec[name] = a
|
||
|
self._pal_escape[name] = self._attrspec_to_escape(a)
|
||
|
|
||
|
def set_input_timeouts(self, max_wait=None, complete_wait=0.125,
|
||
|
resize_wait=0.125):
|
||
|
"""
|
||
|
Set the get_input timeout values. All values are in floating
|
||
|
point numbers of seconds.
|
||
|
|
||
|
max_wait -- amount of time in seconds to wait for input when
|
||
|
there is no input pending, wait forever if None
|
||
|
complete_wait -- amount of time in seconds to wait when
|
||
|
get_input detects an incomplete escape sequence at the
|
||
|
end of the available input
|
||
|
resize_wait -- amount of time in seconds to wait for more input
|
||
|
after receiving two screen resize requests in a row to
|
||
|
stop Urwid from consuming 100% cpu during a gradual
|
||
|
window resize operation
|
||
|
"""
|
||
|
self.max_wait = max_wait
|
||
|
if max_wait is not None:
|
||
|
if self._next_timeout is None:
|
||
|
self._next_timeout = max_wait
|
||
|
else:
|
||
|
self._next_timeout = min(self._next_timeout, self.max_wait)
|
||
|
self.complete_wait = complete_wait
|
||
|
self.resize_wait = resize_wait
|
||
|
|
||
|
def _sigwinch_handler(self, signum, frame=None):
|
||
|
"""
|
||
|
frame -- will always be None when the GLib event loop is being used.
|
||
|
"""
|
||
|
|
||
|
if not self._resized:
|
||
|
os.write(self._resize_pipe_wr, b'R')
|
||
|
self._resized = True
|
||
|
self.screen_buf = None
|
||
|
|
||
|
if callable(self._prev_sigwinch_handler):
|
||
|
self._prev_sigwinch_handler(signum, frame)
|
||
|
|
||
|
def _sigtstp_handler(self, signum, frame=None):
|
||
|
self.stop() # Restores the previous signal handlers
|
||
|
self._prev_sigcont_handler = self.signal_handler_setter(signal.SIGCONT, self._sigcont_handler)
|
||
|
# Handled by the previous handler.
|
||
|
# If non-default, it may set its own SIGCONT handler which should hopefully call our own.
|
||
|
os.kill(os.getpid(), signal.SIGTSTP)
|
||
|
|
||
|
def _sigcont_handler(self, signum, frame=None):
|
||
|
"""
|
||
|
frame -- will always be None when the GLib event loop is being used.
|
||
|
"""
|
||
|
self.signal_restore()
|
||
|
|
||
|
if callable(self._prev_sigcont_handler):
|
||
|
# May set its own SIGTSTP handler which would be stored and replaced in
|
||
|
# `signal_init()` (via `start()`).
|
||
|
self._prev_sigcont_handler(signum, frame)
|
||
|
|
||
|
self.start()
|
||
|
self._sigwinch_handler(None, None)
|
||
|
|
||
|
def signal_init(self):
|
||
|
"""
|
||
|
Called in the startup of run wrapper to set the SIGWINCH
|
||
|
and SIGTSTP signal handlers.
|
||
|
|
||
|
Override this function to call from main thread in threaded
|
||
|
applications.
|
||
|
"""
|
||
|
self._prev_sigwinch_handler = self.signal_handler_setter(signal.SIGWINCH, self._sigwinch_handler)
|
||
|
self._prev_sigtstp_handler = self.signal_handler_setter(signal.SIGTSTP, self._sigtstp_handler)
|
||
|
|
||
|
def signal_restore(self):
|
||
|
"""
|
||
|
Called in the finally block of run wrapper to restore the
|
||
|
SIGTSTP, SIGCONT and SIGWINCH signal handlers.
|
||
|
|
||
|
Override this function to call from main thread in threaded
|
||
|
applications.
|
||
|
"""
|
||
|
self.signal_handler_setter(signal.SIGTSTP, self._prev_sigtstp_handler or signal.SIG_DFL)
|
||
|
self.signal_handler_setter(signal.SIGCONT, self._prev_sigcont_handler or signal.SIG_DFL)
|
||
|
self.signal_handler_setter(signal.SIGWINCH, self._prev_sigwinch_handler or signal.SIG_DFL)
|
||
|
|
||
|
def set_mouse_tracking(self, enable=True):
|
||
|
"""
|
||
|
Enable (or disable) mouse tracking.
|
||
|
|
||
|
After calling this function get_input will include mouse
|
||
|
click events along with keystrokes.
|
||
|
"""
|
||
|
enable = bool(enable)
|
||
|
if enable == self._mouse_tracking_enabled:
|
||
|
return
|
||
|
|
||
|
self._mouse_tracking(enable)
|
||
|
self._mouse_tracking_enabled = enable
|
||
|
|
||
|
def _mouse_tracking(self, enable):
|
||
|
if enable:
|
||
|
self.write(escape.MOUSE_TRACKING_ON)
|
||
|
self._start_gpm_tracking()
|
||
|
else:
|
||
|
self.write(escape.MOUSE_TRACKING_OFF)
|
||
|
self._stop_gpm_tracking()
|
||
|
|
||
|
def _start_gpm_tracking(self):
|
||
|
if not os.path.isfile("/usr/bin/mev"):
|
||
|
return
|
||
|
if not os.environ.get('TERM', "").lower().startswith("linux"):
|
||
|
return
|
||
|
|
||
|
m = Popen(["/usr/bin/mev", "-e", "158"], stdin=PIPE, stdout=PIPE, close_fds=True, encoding="ascii")
|
||
|
fcntl.fcntl(m.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
|
||
|
self.gpm_mev = m
|
||
|
|
||
|
def _stop_gpm_tracking(self):
|
||
|
if not self.gpm_mev:
|
||
|
return
|
||
|
os.kill(self.gpm_mev.pid, signal.SIGINT)
|
||
|
os.waitpid(self.gpm_mev.pid, 0)
|
||
|
self.gpm_mev = None
|
||
|
|
||
|
def _start(self, alternate_buffer=True):
|
||
|
"""
|
||
|
Initialize the screen and input mode.
|
||
|
|
||
|
alternate_buffer -- use alternate screen buffer
|
||
|
"""
|
||
|
if alternate_buffer:
|
||
|
self.write(escape.SWITCH_TO_ALTERNATE_BUFFER)
|
||
|
self._rows_used = None
|
||
|
else:
|
||
|
self._rows_used = 0
|
||
|
|
||
|
if (self.bracketed_paste_mode):
|
||
|
self.write(escape.ENABLE_BRACKETED_PASTE_MODE)
|
||
|
|
||
|
fd = self._input_fileno()
|
||
|
if fd is not None and os.isatty(fd):
|
||
|
self._old_termios_settings = termios.tcgetattr(fd)
|
||
|
tty.setcbreak(fd)
|
||
|
|
||
|
self.signal_init()
|
||
|
self._alternate_buffer = alternate_buffer
|
||
|
self._next_timeout = self.max_wait
|
||
|
|
||
|
if not self._signal_keys_set:
|
||
|
self._old_signal_keys = self.tty_signal_keys(fileno=fd)
|
||
|
|
||
|
signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED)
|
||
|
# restore mouse tracking to previous state
|
||
|
self._mouse_tracking(self._mouse_tracking_enabled)
|
||
|
|
||
|
return super()._start()
|
||
|
|
||
|
def _stop(self):
|
||
|
"""
|
||
|
Restore the screen.
|
||
|
"""
|
||
|
self.clear()
|
||
|
|
||
|
if (self.bracketed_paste_mode):
|
||
|
self.write(escape.DISABLE_BRACKETED_PASTE_MODE)
|
||
|
|
||
|
signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED)
|
||
|
|
||
|
self.signal_restore()
|
||
|
|
||
|
fd = self._input_fileno()
|
||
|
if fd is not None and os.isatty(fd):
|
||
|
termios.tcsetattr(fd, termios.TCSADRAIN, self._old_termios_settings)
|
||
|
|
||
|
self._mouse_tracking(False)
|
||
|
|
||
|
move_cursor = ""
|
||
|
if self._alternate_buffer:
|
||
|
move_cursor = escape.RESTORE_NORMAL_BUFFER
|
||
|
elif self.maxrow is not None:
|
||
|
move_cursor = escape.set_cursor_position(
|
||
|
0, self.maxrow)
|
||
|
self.write(
|
||
|
self._attrspec_to_escape(AttrSpec('',''))
|
||
|
+ escape.SI
|
||
|
+ move_cursor
|
||
|
+ escape.SHOW_CURSOR)
|
||
|
self.flush()
|
||
|
|
||
|
if self._old_signal_keys:
|
||
|
self.tty_signal_keys(*(self._old_signal_keys + (fd,)))
|
||
|
|
||
|
super()._stop()
|
||
|
|
||
|
|
||
|
def write(self, data):
|
||
|
"""Write some data to the terminal.
|
||
|
|
||
|
You may wish to override this if you're using something other than
|
||
|
regular files for input and output.
|
||
|
"""
|
||
|
self._term_output_file.write(data)
|
||
|
|
||
|
def flush(self):
|
||
|
"""Flush the output buffer.
|
||
|
|
||
|
You may wish to override this if you're using something other than
|
||
|
regular files for input and output.
|
||
|
"""
|
||
|
self._term_output_file.flush()
|
||
|
|
||
|
@typing.overload
|
||
|
def get_input(self, raw_keys: Literal[False]) -> list[str]:
|
||
|
...
|
||
|
|
||
|
@typing.overload
|
||
|
def get_input(self, raw_keys: Literal[True]) -> tuple[list[str], list[int]]:
|
||
|
...
|
||
|
|
||
|
def get_input(self, raw_keys: bool = False) -> list[str] | tuple[list[str], list[int]]:
|
||
|
"""Return pending input as a list.
|
||
|
|
||
|
raw_keys -- return raw keycodes as well as translated versions
|
||
|
|
||
|
This function will immediately return all the input since the
|
||
|
last time it was called. If there is no input pending it will
|
||
|
wait before returning an empty list. The wait time may be
|
||
|
configured with the set_input_timeouts function.
|
||
|
|
||
|
If raw_keys is False (default) this function will return a list
|
||
|
of keys pressed. If raw_keys is True this function will return
|
||
|
a ( keys pressed, raw keycodes ) tuple instead.
|
||
|
|
||
|
Examples of keys returned:
|
||
|
|
||
|
* ASCII printable characters: " ", "a", "0", "A", "-", "/"
|
||
|
* ASCII control characters: "tab", "enter"
|
||
|
* Escape sequences: "up", "page up", "home", "insert", "f1"
|
||
|
* Key combinations: "shift f1", "meta a", "ctrl b"
|
||
|
* Window events: "window resize"
|
||
|
|
||
|
When a narrow encoding is not enabled:
|
||
|
|
||
|
* "Extended ASCII" characters: "\\xa1", "\\xb2", "\\xfe"
|
||
|
|
||
|
When a wide encoding is enabled:
|
||
|
|
||
|
* Double-byte characters: "\\xa1\\xea", "\\xb2\\xd4"
|
||
|
|
||
|
When utf8 encoding is enabled:
|
||
|
|
||
|
* Unicode characters: u"\\u00a5", u'\\u253c"
|
||
|
|
||
|
Examples of mouse events returned:
|
||
|
|
||
|
* Mouse button press: ('mouse press', 1, 15, 13),
|
||
|
('meta mouse press', 2, 17, 23)
|
||
|
* Mouse drag: ('mouse drag', 1, 16, 13),
|
||
|
('mouse drag', 1, 17, 13),
|
||
|
('ctrl mouse drag', 1, 18, 13)
|
||
|
* Mouse button release: ('mouse release', 0, 18, 13),
|
||
|
('ctrl mouse release', 0, 17, 23)
|
||
|
"""
|
||
|
assert self._started
|
||
|
|
||
|
self._wait_for_input_ready(self._next_timeout)
|
||
|
keys, raw = self.parse_input(None, None, self.get_available_raw_input())
|
||
|
|
||
|
# Avoid pegging CPU at 100% when slowly resizing
|
||
|
if keys == ['window resize'] and self.prev_input_resize:
|
||
|
while True:
|
||
|
self._wait_for_input_ready(self.resize_wait)
|
||
|
keys, raw2 = self.parse_input(None, None, self.get_available_raw_input())
|
||
|
raw += raw2
|
||
|
# if not keys:
|
||
|
# keys, raw2 = self._get_input(
|
||
|
# self.resize_wait)
|
||
|
# raw += raw2
|
||
|
if keys != ['window resize']:
|
||
|
break
|
||
|
if keys[-1:] != ['window resize']:
|
||
|
keys.append('window resize')
|
||
|
|
||
|
if keys == ['window resize']:
|
||
|
self.prev_input_resize = 2
|
||
|
elif self.prev_input_resize == 2 and not keys:
|
||
|
self.prev_input_resize = 1
|
||
|
else:
|
||
|
self.prev_input_resize = 0
|
||
|
|
||
|
if raw_keys:
|
||
|
return keys, raw
|
||
|
return keys
|
||
|
|
||
|
def get_input_descriptors(self) -> list[int]:
|
||
|
"""
|
||
|
Return a list of integer file descriptors that should be
|
||
|
polled in external event loops to check for user input.
|
||
|
|
||
|
Use this method if you are implementing your own event loop.
|
||
|
|
||
|
This method is only called by `hook_event_loop`, so if you override
|
||
|
that, you can safely ignore this.
|
||
|
"""
|
||
|
if not self._started:
|
||
|
return []
|
||
|
|
||
|
fd_list = [self._resize_pipe_rd]
|
||
|
fd = self._input_fileno()
|
||
|
if fd is not None:
|
||
|
fd_list.append(fd)
|
||
|
if self.gpm_mev is not None:
|
||
|
fd_list.append(self.gpm_mev.stdout.fileno())
|
||
|
return fd_list
|
||
|
|
||
|
_current_event_loop_handles = ()
|
||
|
|
||
|
def unhook_event_loop(self, event_loop):
|
||
|
"""
|
||
|
Remove any hooks added by hook_event_loop.
|
||
|
"""
|
||
|
for handle in self._current_event_loop_handles:
|
||
|
event_loop.remove_watch_file(handle)
|
||
|
|
||
|
if self._input_timeout:
|
||
|
event_loop.remove_alarm(self._input_timeout)
|
||
|
self._input_timeout = None
|
||
|
|
||
|
def hook_event_loop(self, event_loop, callback):
|
||
|
"""
|
||
|
Register the given callback with the event loop, to be called with new
|
||
|
input whenever it's available. The callback should be passed a list of
|
||
|
processed keys and a list of unprocessed keycodes.
|
||
|
|
||
|
Subclasses may wish to use parse_input to wrap the callback.
|
||
|
"""
|
||
|
if hasattr(self, 'get_input_nonblocking'):
|
||
|
wrapper = self._make_legacy_input_wrapper(event_loop, callback)
|
||
|
else:
|
||
|
wrapper = lambda: self.parse_input(
|
||
|
event_loop, callback, self.get_available_raw_input())
|
||
|
fds = self.get_input_descriptors()
|
||
|
handles = [event_loop.watch_file(fd, wrapper) for fd in fds]
|
||
|
self._current_event_loop_handles = handles
|
||
|
|
||
|
_input_timeout = None
|
||
|
_partial_codes = None
|
||
|
|
||
|
def _make_legacy_input_wrapper(self, event_loop, callback):
|
||
|
"""
|
||
|
Support old Screen classes that still have a get_input_nonblocking and
|
||
|
expect it to work.
|
||
|
"""
|
||
|
def wrapper():
|
||
|
if self._input_timeout:
|
||
|
event_loop.remove_alarm(self._input_timeout)
|
||
|
self._input_timeout = None
|
||
|
timeout, keys, raw = self.get_input_nonblocking()
|
||
|
if timeout is not None:
|
||
|
self._input_timeout = event_loop.alarm(timeout, wrapper)
|
||
|
|
||
|
callback(keys, raw)
|
||
|
|
||
|
return wrapper
|
||
|
|
||
|
def get_available_raw_input(self):
|
||
|
"""
|
||
|
Return any currently-available input. Does not block.
|
||
|
|
||
|
This method is only used by the default `hook_event_loop`
|
||
|
implementation; you can safely ignore it if you implement your own.
|
||
|
"""
|
||
|
codes = self._get_gpm_codes() + self._get_keyboard_codes()
|
||
|
|
||
|
if self._partial_codes:
|
||
|
codes = self._partial_codes + codes
|
||
|
self._partial_codes = None
|
||
|
|
||
|
# clean out the pipe used to signal external event loops
|
||
|
# that a resize has occurred
|
||
|
try:
|
||
|
while True: os.read(self._resize_pipe_rd, 1)
|
||
|
except OSError:
|
||
|
pass
|
||
|
|
||
|
return codes
|
||
|
|
||
|
def parse_input(self, event_loop, callback, codes, wait_for_more=True):
|
||
|
"""
|
||
|
Read any available input from get_available_raw_input, parses it into
|
||
|
keys, and calls the given callback.
|
||
|
|
||
|
The current implementation tries to avoid any assumptions about what
|
||
|
the screen or event loop look like; it only deals with parsing keycodes
|
||
|
and setting a timeout when an incomplete one is detected.
|
||
|
|
||
|
`codes` should be a sequence of keycodes, i.e. bytes. A bytearray is
|
||
|
appropriate, but beware of using bytes, which only iterates as integers
|
||
|
on Python 3.
|
||
|
"""
|
||
|
# Note: event_loop may be None for 100% synchronous support, only used
|
||
|
# by get_input. Not documented because you shouldn't be doing it.
|
||
|
if self._input_timeout and event_loop:
|
||
|
event_loop.remove_alarm(self._input_timeout)
|
||
|
self._input_timeout = None
|
||
|
|
||
|
original_codes = codes
|
||
|
processed = []
|
||
|
try:
|
||
|
while codes:
|
||
|
run, codes = escape.process_keyqueue(
|
||
|
codes, wait_for_more)
|
||
|
processed.extend(run)
|
||
|
except escape.MoreInputRequired:
|
||
|
# Set a timer to wait for the rest of the input; if it goes off
|
||
|
# without any new input having come in, use the partial input
|
||
|
k = len(original_codes) - len(codes)
|
||
|
processed_codes = original_codes[:k]
|
||
|
self._partial_codes = codes
|
||
|
|
||
|
def _parse_incomplete_input():
|
||
|
self._input_timeout = None
|
||
|
self._partial_codes = None
|
||
|
self.parse_input(
|
||
|
event_loop, callback, codes, wait_for_more=False)
|
||
|
if event_loop:
|
||
|
self._input_timeout = event_loop.alarm(
|
||
|
self.complete_wait, _parse_incomplete_input)
|
||
|
|
||
|
else:
|
||
|
processed_codes = original_codes
|
||
|
self._partial_codes = None
|
||
|
|
||
|
if self._resized:
|
||
|
processed.append('window resize')
|
||
|
self._resized = False
|
||
|
|
||
|
if callback:
|
||
|
callback(processed, processed_codes)
|
||
|
else:
|
||
|
# For get_input
|
||
|
return processed, processed_codes
|
||
|
|
||
|
def _get_keyboard_codes(self):
|
||
|
codes = []
|
||
|
while True:
|
||
|
code = self._getch_nodelay()
|
||
|
if code < 0:
|
||
|
break
|
||
|
codes.append(code)
|
||
|
return codes
|
||
|
|
||
|
def _get_gpm_codes(self):
|
||
|
codes = []
|
||
|
try:
|
||
|
while self.gpm_mev is not None and self.gpm_event_pending:
|
||
|
codes.extend(self._encode_gpm_event())
|
||
|
except OSError as e:
|
||
|
if e.args[0] != 11:
|
||
|
raise
|
||
|
return codes
|
||
|
|
||
|
def _wait_for_input_ready(self, timeout):
|
||
|
ready = None
|
||
|
fd_list = [self._resize_pipe_rd]
|
||
|
fd = self._input_fileno()
|
||
|
if fd is not None:
|
||
|
fd_list.append(fd)
|
||
|
if self.gpm_mev is not None:
|
||
|
fd_list.append(self.gpm_mev.stdout.fileno())
|
||
|
while True:
|
||
|
try:
|
||
|
if timeout is None:
|
||
|
ready,w,err = select.select(
|
||
|
fd_list, [], fd_list)
|
||
|
else:
|
||
|
ready,w,err = select.select(
|
||
|
fd_list,[],fd_list, timeout)
|
||
|
break
|
||
|
except OSError as e:
|
||
|
if e.args[0] != 4:
|
||
|
raise
|
||
|
if self._resized:
|
||
|
ready = []
|
||
|
break
|
||
|
return ready
|
||
|
|
||
|
def _getch(self, timeout: int) ->int:
|
||
|
ready = self._wait_for_input_ready(timeout)
|
||
|
if self.gpm_mev is not None:
|
||
|
if self.gpm_mev.stdout.fileno() in ready:
|
||
|
self.gpm_event_pending = True
|
||
|
fd = self._input_fileno()
|
||
|
if fd is not None and fd in ready:
|
||
|
return ord(os.read(fd, 1))
|
||
|
return -1
|
||
|
|
||
|
def _encode_gpm_event(self) -> list[int]:
|
||
|
self.gpm_event_pending = False
|
||
|
s = self.gpm_mev.stdout.readline()
|
||
|
l = s.split(", ")
|
||
|
if len(l) != 6:
|
||
|
# unexpected output, stop tracking
|
||
|
self._stop_gpm_tracking()
|
||
|
signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED)
|
||
|
return []
|
||
|
ev, x, y, ign, b, m = s.split(",")
|
||
|
ev = int(ev.split("x")[-1], 16)
|
||
|
x = int( x.split(" ")[-1] )
|
||
|
y = int( y.lstrip().split(" ")[0] )
|
||
|
b = int( b.split(" ")[-1] )
|
||
|
m = int( m.split("x")[-1].rstrip(), 16 )
|
||
|
|
||
|
# convert to xterm-like escape sequence
|
||
|
|
||
|
last = next = self.last_bstate
|
||
|
l = []
|
||
|
|
||
|
mod = 0
|
||
|
if m & 1:
|
||
|
mod |= 4 # shift
|
||
|
if m & 10:
|
||
|
mod |= 8 # alt
|
||
|
if m & 4:
|
||
|
mod |= 16 # ctrl
|
||
|
|
||
|
def append_button(b):
|
||
|
b |= mod
|
||
|
l.extend([27, ord('['), ord('M'), b+32, x+32, y+32])
|
||
|
|
||
|
def determine_button_release(flag: int) -> None:
|
||
|
if b & 4 and last & 1:
|
||
|
append_button( 0 + flag )
|
||
|
next |= 1
|
||
|
if b & 2 and last & 2:
|
||
|
append_button( 1 + flag )
|
||
|
next |= 2
|
||
|
if b & 1 and last & 4:
|
||
|
append_button( 2 + flag )
|
||
|
next |= 4
|
||
|
|
||
|
if ev == 20 or ev == 36 or ev == 52: # press
|
||
|
if b & 4 and last & 1 == 0:
|
||
|
append_button( 0 )
|
||
|
next |= 1
|
||
|
if b & 2 and last & 2 == 0:
|
||
|
append_button( 1 )
|
||
|
next |= 2
|
||
|
if b & 1 and last & 4 == 0:
|
||
|
append_button( 2 )
|
||
|
next |= 4
|
||
|
elif ev == 146: # drag
|
||
|
if b & 4:
|
||
|
append_button( 0 + escape.MOUSE_DRAG_FLAG )
|
||
|
elif b & 2:
|
||
|
append_button( 1 + escape.MOUSE_DRAG_FLAG )
|
||
|
elif b & 1:
|
||
|
append_button( 2 + escape.MOUSE_DRAG_FLAG )
|
||
|
else: # release
|
||
|
if b & 4 and last & 1:
|
||
|
append_button( 0 + escape.MOUSE_RELEASE_FLAG )
|
||
|
next &= ~ 1
|
||
|
if b & 2 and last & 2:
|
||
|
append_button( 1 + escape.MOUSE_RELEASE_FLAG )
|
||
|
next &= ~ 2
|
||
|
if b & 1 and last & 4:
|
||
|
append_button( 2 + escape.MOUSE_RELEASE_FLAG )
|
||
|
next &= ~ 4
|
||
|
if ev == 40: # double click (release)
|
||
|
if b & 4 and last & 1:
|
||
|
append_button( 0 + escape.MOUSE_MULTIPLE_CLICK_FLAG )
|
||
|
if b & 2 and last & 2:
|
||
|
append_button( 1 + escape.MOUSE_MULTIPLE_CLICK_FLAG )
|
||
|
if b & 1 and last & 4:
|
||
|
append_button( 2 + escape.MOUSE_MULTIPLE_CLICK_FLAG )
|
||
|
elif ev == 52:
|
||
|
if b & 4 and last & 1:
|
||
|
append_button( 0 + escape.MOUSE_MULTIPLE_CLICK_FLAG*2 )
|
||
|
if b & 2 and last & 2:
|
||
|
append_button( 1 + escape.MOUSE_MULTIPLE_CLICK_FLAG*2 )
|
||
|
if b & 1 and last & 4:
|
||
|
append_button( 2 + escape.MOUSE_MULTIPLE_CLICK_FLAG*2 )
|
||
|
|
||
|
self.last_bstate = next
|
||
|
return l
|
||
|
|
||
|
def _getch_nodelay(self):
|
||
|
return self._getch(0)
|
||
|
|
||
|
def get_cols_rows(self):
|
||
|
"""Return the terminal dimensions (num columns, num rows)."""
|
||
|
y, x = 24, 80
|
||
|
try:
|
||
|
if hasattr(self._term_output_file, 'fileno'):
|
||
|
buf = fcntl.ioctl(self._term_output_file.fileno(), termios.TIOCGWINSZ, ' '*4)
|
||
|
y, x = struct.unpack('hh', buf)
|
||
|
except OSError:
|
||
|
# Term size could not be determined
|
||
|
pass
|
||
|
# Provide some lightweight fallbacks in case the TIOCWINSZ doesn't
|
||
|
# give sane answers
|
||
|
if (x <= 0 or y <= 0) and self.term in ('ansi', 'vt100'):
|
||
|
y, x = 24, 80
|
||
|
self.maxrow = y
|
||
|
return x, y
|
||
|
|
||
|
def _setup_G1(self):
|
||
|
"""
|
||
|
Initialize the G1 character set to graphics mode if required.
|
||
|
"""
|
||
|
if self._setup_G1_done:
|
||
|
return
|
||
|
|
||
|
while True:
|
||
|
try:
|
||
|
self.write(escape.DESIGNATE_G1_SPECIAL)
|
||
|
self.flush()
|
||
|
break
|
||
|
except OSError:
|
||
|
pass
|
||
|
self._setup_G1_done = True
|
||
|
|
||
|
|
||
|
def draw_screen(self, maxres, r ):
|
||
|
"""Paint screen with rendered canvas."""
|
||
|
|
||
|
(maxcol, maxrow) = maxres
|
||
|
|
||
|
assert self._started
|
||
|
|
||
|
assert maxrow == r.rows()
|
||
|
|
||
|
# quick return if nothing has changed
|
||
|
if self.screen_buf and r is self._screen_buf_canvas:
|
||
|
return
|
||
|
|
||
|
self._setup_G1()
|
||
|
|
||
|
if self._resized:
|
||
|
# handle resize before trying to draw screen
|
||
|
return
|
||
|
|
||
|
o = [escape.HIDE_CURSOR, self._attrspec_to_escape(AttrSpec('',''))]
|
||
|
|
||
|
def partial_display():
|
||
|
# returns True if the screen is in partial display mode
|
||
|
# ie. only some rows belong to the display
|
||
|
return self._rows_used is not None
|
||
|
|
||
|
if not partial_display():
|
||
|
o.append(escape.CURSOR_HOME)
|
||
|
|
||
|
if self.screen_buf:
|
||
|
osb = self.screen_buf
|
||
|
else:
|
||
|
osb = []
|
||
|
sb = []
|
||
|
cy = self._cy
|
||
|
y = -1
|
||
|
|
||
|
def set_cursor_home():
|
||
|
if not partial_display():
|
||
|
return escape.set_cursor_position(0, 0)
|
||
|
return (escape.CURSOR_HOME_COL +
|
||
|
escape.move_cursor_up(cy))
|
||
|
|
||
|
def set_cursor_row(y):
|
||
|
if not partial_display():
|
||
|
return escape.set_cursor_position(0, y)
|
||
|
return escape.move_cursor_down(y - cy)
|
||
|
|
||
|
def set_cursor_position(x, y):
|
||
|
if not partial_display():
|
||
|
return escape.set_cursor_position(x, y)
|
||
|
if cy > y:
|
||
|
return ('\b' + escape.CURSOR_HOME_COL +
|
||
|
escape.move_cursor_up(cy - y) +
|
||
|
escape.move_cursor_right(x))
|
||
|
return ('\b' + escape.CURSOR_HOME_COL +
|
||
|
escape.move_cursor_down(y - cy) +
|
||
|
escape.move_cursor_right(x))
|
||
|
|
||
|
def is_blank_row(row):
|
||
|
if len(row) > 1:
|
||
|
return False
|
||
|
if row[0][2].strip():
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
def attr_to_escape(a):
|
||
|
if a in self._pal_escape:
|
||
|
return self._pal_escape[a]
|
||
|
elif isinstance(a, AttrSpec):
|
||
|
return self._attrspec_to_escape(a)
|
||
|
# undefined attributes use default/default
|
||
|
# TODO: track and report these
|
||
|
return self._attrspec_to_escape(
|
||
|
AttrSpec('default','default'))
|
||
|
|
||
|
def using_standout_or_underline(a):
|
||
|
a = self._pal_attrspec.get(a, a)
|
||
|
return isinstance(a, AttrSpec) and (a.standout or a.underline)
|
||
|
|
||
|
ins = None
|
||
|
o.append(set_cursor_home())
|
||
|
cy = 0
|
||
|
for row in r.content():
|
||
|
y += 1
|
||
|
if osb and y < len(osb) and osb[y] == row:
|
||
|
# this row of the screen buffer matches what is
|
||
|
# currently displayed, so we can skip this line
|
||
|
sb.append( osb[y] )
|
||
|
continue
|
||
|
|
||
|
sb.append(row)
|
||
|
|
||
|
# leave blank lines off display when we are using
|
||
|
# the default screen buffer (allows partial screen)
|
||
|
if partial_display() and y > self._rows_used:
|
||
|
if is_blank_row(row):
|
||
|
continue
|
||
|
self._rows_used = y
|
||
|
|
||
|
if y or partial_display():
|
||
|
o.append(set_cursor_position(0, y))
|
||
|
# after updating the line we will be just over the
|
||
|
# edge, but terminals still treat this as being
|
||
|
# on the same line
|
||
|
cy = y
|
||
|
|
||
|
whitespace_at_end = False
|
||
|
if row:
|
||
|
a, cs, run = row[-1]
|
||
|
if (run[-1:] == b' ' and self.back_color_erase
|
||
|
and not using_standout_or_underline(a)):
|
||
|
whitespace_at_end = True
|
||
|
row = row[:-1] + [(a, cs, run.rstrip(b' '))]
|
||
|
elif y == maxrow-1 and maxcol > 1:
|
||
|
row, back, ins = self._last_row(row)
|
||
|
|
||
|
first = True
|
||
|
lasta = lastcs = None
|
||
|
for (a,cs, run) in row:
|
||
|
assert isinstance(run, bytes) # canvases should render with bytes
|
||
|
if cs != 'U':
|
||
|
run = run.translate(UNPRINTABLE_TRANS_TABLE)
|
||
|
if first or lasta != a:
|
||
|
o.append(attr_to_escape(a))
|
||
|
lasta = a
|
||
|
if first or lastcs != cs:
|
||
|
assert cs in [None, "0", "U"], repr(cs)
|
||
|
if lastcs == "U":
|
||
|
o.append( escape.IBMPC_OFF )
|
||
|
|
||
|
if cs is None:
|
||
|
o.append( escape.SI )
|
||
|
elif cs == "U":
|
||
|
o.append( escape.IBMPC_ON )
|
||
|
else:
|
||
|
o.append( escape.SO )
|
||
|
lastcs = cs
|
||
|
o.append( run )
|
||
|
first = False
|
||
|
if ins:
|
||
|
(inserta, insertcs, inserttext) = ins
|
||
|
ias = attr_to_escape(inserta)
|
||
|
assert insertcs in [None, "0", "U"], repr(insertcs)
|
||
|
if cs is None:
|
||
|
icss = escape.SI
|
||
|
elif cs == "U":
|
||
|
icss = escape.IBMPC_ON
|
||
|
else:
|
||
|
icss = escape.SO
|
||
|
o += ["\x08" * back,
|
||
|
ias, icss,
|
||
|
escape.INSERT_ON, inserttext,
|
||
|
escape.INSERT_OFF ]
|
||
|
|
||
|
if cs == "U":
|
||
|
o.append(escape.IBMPC_OFF)
|
||
|
if whitespace_at_end:
|
||
|
o.append(escape.ERASE_IN_LINE_RIGHT)
|
||
|
|
||
|
if r.cursor is not None:
|
||
|
x,y = r.cursor
|
||
|
o += [set_cursor_position(x, y),
|
||
|
escape.SHOW_CURSOR ]
|
||
|
self._cy = y
|
||
|
|
||
|
if self._resized:
|
||
|
# handle resize before trying to draw screen
|
||
|
return
|
||
|
try:
|
||
|
for l in o:
|
||
|
if isinstance(l, bytes):
|
||
|
l = l.decode('utf-8', 'replace')
|
||
|
self.write(l)
|
||
|
self.flush()
|
||
|
except OSError as e:
|
||
|
# ignore interrupted syscall
|
||
|
if e.args[0] != 4:
|
||
|
raise
|
||
|
|
||
|
self.screen_buf = sb
|
||
|
self._screen_buf_canvas = r
|
||
|
|
||
|
|
||
|
def _last_row(self, row):
|
||
|
"""On the last row we need to slide the bottom right character
|
||
|
into place. Calculate the new line, attr and an insert sequence
|
||
|
to do that.
|
||
|
|
||
|
eg. last row:
|
||
|
XXXXXXXXXXXXXXXXXXXXYZ
|
||
|
|
||
|
Y will be drawn after Z, shifting Z into position.
|
||
|
"""
|
||
|
|
||
|
new_row = row[:-1]
|
||
|
z_attr, z_cs, last_text = row[-1]
|
||
|
last_cols = util.calc_width(last_text, 0, len(last_text))
|
||
|
last_offs, z_col = util.calc_text_pos(last_text, 0,
|
||
|
len(last_text), last_cols-1)
|
||
|
if last_offs == 0:
|
||
|
z_text = last_text
|
||
|
del new_row[-1]
|
||
|
# we need another segment
|
||
|
y_attr, y_cs, nlast_text = row[-2]
|
||
|
nlast_cols = util.calc_width(nlast_text, 0,
|
||
|
len(nlast_text))
|
||
|
z_col += nlast_cols
|
||
|
nlast_offs, y_col = util.calc_text_pos(nlast_text, 0,
|
||
|
len(nlast_text), nlast_cols-1)
|
||
|
y_text = nlast_text[nlast_offs:]
|
||
|
if nlast_offs:
|
||
|
new_row.append((y_attr, y_cs,
|
||
|
nlast_text[:nlast_offs]))
|
||
|
else:
|
||
|
z_text = last_text[last_offs:]
|
||
|
y_attr, y_cs = z_attr, z_cs
|
||
|
nlast_cols = util.calc_width(last_text, 0,
|
||
|
last_offs)
|
||
|
nlast_offs, y_col = util.calc_text_pos(last_text, 0,
|
||
|
last_offs, nlast_cols-1)
|
||
|
y_text = last_text[nlast_offs:last_offs]
|
||
|
if nlast_offs:
|
||
|
new_row.append((y_attr, y_cs,
|
||
|
last_text[:nlast_offs]))
|
||
|
|
||
|
new_row.append((z_attr, z_cs, z_text))
|
||
|
return new_row, z_col-y_col, (y_attr, y_cs, y_text)
|
||
|
|
||
|
|
||
|
|
||
|
def clear(self):
|
||
|
"""
|
||
|
Force the screen to be completely repainted on the next
|
||
|
call to draw_screen().
|
||
|
"""
|
||
|
self.screen_buf = None
|
||
|
self.setup_G1 = True
|
||
|
|
||
|
|
||
|
def _attrspec_to_escape(self, a):
|
||
|
"""
|
||
|
Convert AttrSpec instance a to an escape sequence for the terminal
|
||
|
|
||
|
>>> s = Screen()
|
||
|
>>> s.set_terminal_properties(colors=256)
|
||
|
>>> a2e = s._attrspec_to_escape
|
||
|
>>> a2e(s.AttrSpec('brown', 'dark green'))
|
||
|
'\\x1b[0;33;42m'
|
||
|
>>> a2e(s.AttrSpec('#fea,underline', '#d0d'))
|
||
|
'\\x1b[0;38;5;229;4;48;5;164m'
|
||
|
"""
|
||
|
if self.term == 'fbterm':
|
||
|
fg = escape.ESC + '[1;%d}' % (a.foreground_number,)
|
||
|
bg = escape.ESC + '[2;%d}' % (a.background_number,)
|
||
|
return fg + bg
|
||
|
|
||
|
if a.foreground_true:
|
||
|
fg = "38;2;%d;%d;%d" %(a.get_rgb_values()[0:3])
|
||
|
elif a.foreground_high:
|
||
|
fg = "38;5;%d" % a.foreground_number
|
||
|
elif a.foreground_basic:
|
||
|
if a.foreground_number > 7:
|
||
|
if self.fg_bright_is_bold:
|
||
|
fg = "1;%d" % (a.foreground_number - 8 + 30)
|
||
|
else:
|
||
|
fg = "%d" % (a.foreground_number - 8 + 90)
|
||
|
else:
|
||
|
fg = "%d" % (a.foreground_number + 30)
|
||
|
else:
|
||
|
fg = "39"
|
||
|
st = ("1;" * a.bold + "3;" * a.italics +
|
||
|
"4;" * a.underline + "5;" * a.blink +
|
||
|
"7;" * a.standout + "9;" * a.strikethrough)
|
||
|
if a.background_true:
|
||
|
bg = "48;2;%d;%d;%d" %(a.get_rgb_values()[3:6])
|
||
|
elif a.background_high:
|
||
|
bg = "48;5;%d" % a.background_number
|
||
|
elif a.background_basic:
|
||
|
if a.background_number > 7:
|
||
|
if self.bg_bright_is_blink:
|
||
|
bg = "5;%d" % (a.background_number - 8 + 40)
|
||
|
else:
|
||
|
# this doesn't work on most terminals
|
||
|
bg = "%d" % (a.background_number - 8 + 100)
|
||
|
else:
|
||
|
bg = "%d" % (a.background_number + 40)
|
||
|
else:
|
||
|
bg = "49"
|
||
|
return f"{escape.ESC}[0;{fg};{st}{bg}m"
|
||
|
|
||
|
|
||
|
def set_terminal_properties(self, colors=None, bright_is_bold=None,
|
||
|
has_underline=None):
|
||
|
"""
|
||
|
colors -- number of colors terminal supports (1, 16, 88, 256, or 2**24)
|
||
|
or None to leave unchanged
|
||
|
bright_is_bold -- set to True if this terminal uses the bold
|
||
|
setting to create bright colors (numbers 8-15), set to False
|
||
|
if this Terminal can create bright colors without bold or
|
||
|
None to leave unchanged
|
||
|
has_underline -- set to True if this terminal can use the
|
||
|
underline setting, False if it cannot or None to leave
|
||
|
unchanged
|
||
|
"""
|
||
|
if colors is None:
|
||
|
colors = self.colors
|
||
|
if bright_is_bold is None:
|
||
|
bright_is_bold = self.fg_bright_is_bold
|
||
|
if has_underline is None:
|
||
|
has_underline = self.has_underline
|
||
|
|
||
|
if colors == self.colors and bright_is_bold == self.fg_bright_is_bold \
|
||
|
and has_underline == self.has_underline:
|
||
|
return
|
||
|
|
||
|
self.colors = colors
|
||
|
self.fg_bright_is_bold = bright_is_bold
|
||
|
self.has_underline = has_underline
|
||
|
|
||
|
self.clear()
|
||
|
self._pal_escape = {}
|
||
|
for p,v in self._palette.items():
|
||
|
self._on_update_palette_entry(p, *v)
|
||
|
|
||
|
|
||
|
|
||
|
def reset_default_terminal_palette(self):
|
||
|
"""
|
||
|
Attempt to set the terminal palette to default values as taken
|
||
|
from xterm. Uses number of colors from current
|
||
|
set_terminal_properties() screen setting.
|
||
|
"""
|
||
|
if self.colors == 1:
|
||
|
return
|
||
|
elif self.colors == 2**24:
|
||
|
colors = 256
|
||
|
else:
|
||
|
colors = self.colors
|
||
|
|
||
|
def rgb_values(n):
|
||
|
if colors == 16:
|
||
|
aspec = AttrSpec("h%d"%n, "", 256)
|
||
|
else:
|
||
|
aspec = AttrSpec("h%d"%n, "", colors)
|
||
|
return aspec.get_rgb_values()[:3]
|
||
|
|
||
|
entries = [(n,) + rgb_values(n) for n in range(min(colors, 256))]
|
||
|
self.modify_terminal_palette(entries)
|
||
|
|
||
|
|
||
|
def modify_terminal_palette(self, entries):
|
||
|
"""
|
||
|
entries - list of (index, red, green, blue) tuples.
|
||
|
|
||
|
Attempt to set part of the terminal palette (this does not work
|
||
|
on all terminals.) The changes are sent as a single escape
|
||
|
sequence so they should all take effect at the same time.
|
||
|
|
||
|
0 <= index < 256 (some terminals will only have 16 or 88 colors)
|
||
|
0 <= red, green, blue < 256
|
||
|
"""
|
||
|
|
||
|
if self.term == 'fbterm':
|
||
|
modify = ["%d;%d;%d;%d" % (index, red, green, blue)
|
||
|
for index, red, green, blue in entries]
|
||
|
self.write(f"\x1b[3;{';'.join(modify)}}}")
|
||
|
else:
|
||
|
modify = ["%d;rgb:%02x/%02x/%02x" % (index, red, green, blue)
|
||
|
for index, red, green, blue in entries]
|
||
|
self.write(f"\x1b]4;{';'.join(modify)}\x1b\\")
|
||
|
self.flush()
|
||
|
|
||
|
|
||
|
# shortcut for creating an AttrSpec with this screen object's
|
||
|
# number of colors
|
||
|
AttrSpec = lambda self, fg, bg: AttrSpec(fg, bg, self.colors)
|
||
|
|
||
|
|
||
|
def _test():
|
||
|
import doctest
|
||
|
doctest.testmod()
|
||
|
|
||
|
if __name__=='__main__':
|
||
|
_test()
|