Skip to content
Snippets Groups Projects
Commit 4a21e2cca2bf authored by jfp's avatar jfp
Browse files

Add ptd.run module

parent 83854973a3a2
Branches
No related tags found
No related merge requests found
# -*- coding: utf-8 -*-
#
# This code shoud run on Python 3.10
# def foo(x, *, y):
import argparse
# import configparser
import logging
import logging.handlers
import queue
import threading
import time
from functools import partial
from typing import Callable, List, Tuple
from ovms import dvidef, ssdef
from ovms.ptd import Ptd
from ovms.rtl import lib
logger: logging.Logger
# get a logger specific for this module
logger = logging.getLogger(__name__)
print_if_verbose: Callable
def get_page_size() -> int:
try:
return lib.getdvi(dvidef.DVI__TT_PAGE, device_name="sys$input")[1]
except OSError:
return 24
def print_internal(verbose: bool, *args, **kwargs):
if verbose:
print(*args, **kwargs, end="", flush=True)
def ptd_reader(
ptd: Ptd,
queue: queue.Queue[str],
exit_reader: threading.Event,
exit_reader_done: threading.Event,
):
global print_if_verbose
exit_reader_done.clear()
r: str | None = None
while True:
try:
r = ptd.read()
except OSError as e:
if e.errno != ssdef.SS__CANCEL:
raise
r = None
if r is not None:
print_if_verbose(r)
queue.put(r)
if exit_reader.is_set():
exit_reader_done.set()
return
def wait_for_value(
qread: queue.Queue[str],
value: str,
ignore_case: bool = False,
) -> str:
while True:
if value in (r := (qread.get().upper() if ignore_case else qread.get())):
return r
def empty_queue(qread: queue.Queue[str]):
while True:
try:
qread.get_nowait()
except queue.Empty:
break
def run(ptd_fct: Callable) -> List[Tuple[str, str]]:
ptd = Ptd()
qread: queue.Queue[str] = queue.Queue()
# Use to end reader thread
exit_reader = threading.Event()
exit_reader_done = threading.Event()
tptd = threading.Thread(
target=ptd_reader, args=(ptd, qread, exit_reader, exit_reader_done)
)
tptd.start()
wait_for_value(qread, ptd.prompt) # wait for initial prompt
print_if_verbose(ptd.writeln("set process/token=ext", True))
wait_for_value(qread, ptd.prompt)
print_if_verbose(
ptd.writeln(f"set terminal/dev=vt100/page={get_page_size()}", True)
)
wait_for_value(qread, ptd.prompt)
res = ptd_fct(ptd, qread)
exit_reader.set()
while not exit_reader_done.is_set():
ptd.cancel()
time.sleep(0.1)
empty_queue(qread)
tptd.join()
return res
def main():
def run_cmd(
cmd: str,
ptd: Ptd,
qread: queue.Queue[str],
) -> str:
print_if_verbose(ptd.writeln(cmd))
wait_for_value(qread, ptd.prompt) # wait for prompt
return f"{cmd} executed"
global print_if_verbose
args: argparse.Namespace
logging_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(filename="/dev/null", level=logging.INFO, format=logging_format)
main_log = logging.getLogger() # root handler
main_log.setLevel(logging.INFO)
parser = argparse.ArgumentParser(description="Ptd demo")
parser.add_argument(
"-c",
"--command",
type=str,
dest="command",
help="Command to execute",
default="show sys/noproc",
)
parser.add_argument(
"-v",
"--verbose",
help="increase output verbosity",
dest="verbose",
action="store_true",
)
args = parser.parse_args()
print_if_verbose = partial(print_internal, args.verbose)
ptd_fct = partial(run_cmd, args.command)
r = run(ptd_fct)
print(r)
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment