diff --git a/python/local/ovms_module/ovms/ptd/run.py b/python/local/ovms_module/ovms/ptd/run.py new file mode 100644 index 0000000000000000000000000000000000000000..4a21e2cca2bf60611e179cd9e45ff3f89f69a367_cHl0aG9uL2xvY2FsL292bXNfbW9kdWxlL292bXMvcHRkL3J1bi5weQ== --- /dev/null +++ b/python/local/ovms_module/ovms/ptd/run.py @@ -0,0 +1,151 @@ +# -*- coding: utf-8 -*- +# +# This code shoud run on Python 3.10 +# def foo(x, *, y): +import argparse + +# import configparser +import logging +import logging.handlers +import queue +import threading +import time +from functools import partial +from typing import Callable, List, Tuple + +from ovms import dvidef, ssdef +from ovms.ptd import Ptd +from ovms.rtl import lib + +logger: logging.Logger +# get a logger specific for this module +logger = logging.getLogger(__name__) + +print_if_verbose: Callable + + +def get_page_size() -> int: + try: + return lib.getdvi(dvidef.DVI__TT_PAGE, device_name="sys$input")[1] + except OSError: + return 24 + + +def print_internal(verbose: bool, *args, **kwargs): + if verbose: + print(*args, **kwargs, end="", flush=True) + + +def ptd_reader( + ptd: Ptd, + queue: queue.Queue[str], + exit_reader: threading.Event, + exit_reader_done: threading.Event, +): + global print_if_verbose + exit_reader_done.clear() + r: str | None = None + while True: + try: + r = ptd.read() + except OSError as e: + if e.errno != ssdef.SS__CANCEL: + raise + r = None + if r is not None: + print_if_verbose(r) + queue.put(r) + if exit_reader.is_set(): + exit_reader_done.set() + return + + +def wait_for_value( + qread: queue.Queue[str], + value: str, + ignore_case: bool = False, +) -> str: + while True: + if value in (r := (qread.get().upper() if ignore_case else qread.get())): + return r + + +def empty_queue(qread: queue.Queue[str]): + while True: + try: + qread.get_nowait() + except queue.Empty: + break + + +def run(ptd_fct: Callable) -> List[Tuple[str, str]]: + ptd = Ptd() + qread: queue.Queue[str] = queue.Queue() + # Use to end reader thread + exit_reader = threading.Event() + exit_reader_done = threading.Event() + tptd = threading.Thread( + target=ptd_reader, args=(ptd, qread, exit_reader, exit_reader_done) + ) + tptd.start() + wait_for_value(qread, ptd.prompt) # wait for initial prompt + print_if_verbose(ptd.writeln("set process/token=ext", True)) + wait_for_value(qread, ptd.prompt) + print_if_verbose( + ptd.writeln(f"set terminal/dev=vt100/page={get_page_size()}", True) + ) + wait_for_value(qread, ptd.prompt) + res = ptd_fct(ptd, qread) + exit_reader.set() + while not exit_reader_done.is_set(): + ptd.cancel() + time.sleep(0.1) + empty_queue(qread) + tptd.join() + return res + + +def main(): + def run_cmd( + cmd: str, + ptd: Ptd, + qread: queue.Queue[str], + ) -> str: + print_if_verbose(ptd.writeln(cmd)) + wait_for_value(qread, ptd.prompt) # wait for prompt + return f"{cmd} executed" + + global print_if_verbose + args: argparse.Namespace + logging_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + logging.basicConfig(filename="/dev/null", level=logging.INFO, format=logging_format) + main_log = logging.getLogger() # root handler + main_log.setLevel(logging.INFO) + parser = argparse.ArgumentParser(description="Ptd demo") + parser.add_argument( + "-c", + "--command", + type=str, + dest="command", + help="Command to execute", + default="show sys/noproc", + ) + parser.add_argument( + "-v", + "--verbose", + help="increase output verbosity", + dest="verbose", + action="store_true", + ) + + args = parser.parse_args() + + print_if_verbose = partial(print_internal, args.verbose) + ptd_fct = partial(run_cmd, args.command) + + r = run(ptd_fct) + print(r) + + +if __name__ == "__main__": + main()