import argparse
import cmd
import configparser
import json
from typing import Any, List, Set, Tuple

from ovms import (
    cmbdef,
    crtl,
    dvidef,
    iodef,
    lnmdef,
    mbxqio,
    prvdef,
    psldef,
    starlet,
)
from ovms.rtl import lib

# Password sent with every command; set from the command line in main().
supervisorctl_pwd = ''
# Device name of the reply mailbox ('MBA<unit>:'); set in main().
supervisorctl_mbxreply = ''
supervisord_table_name = b'SUPERVISORD_TABLE'
# Upper-cased program names found in the configuration file.
programs_name: Set[str] = set()

# Numeric process state -> display label.
_PROCESS_STATES = {
    0: 'STOPPED',
    10: 'STARTING',
    20: 'RUNNING',
    30: 'BACKOFF',
    40: 'STOPPING',
    100: 'EXITED',
    200: 'FATAL',
    1000: 'UNKNOWN',
}


def pstat2str(s: int) -> str:
    """Return the display label for the numeric process state *s*.

    Codes missing from the table map to ``'UNKNOWN'`` so that an unexpected
    value in a reply cannot abort the interactive loop with a ``KeyError``.
    """
    return _PROCESS_STATES.get(s, 'UNKNOWN')


class Controller(cmd.Cmd):
    """Command interpreter that forwards commands to supervisord.

    Commands are written as JSON to ``mcmd`` and the JSON reply is read
    from ``mcmd_r``.
    """

    mcmd: mbxqio.MBXQIO
    mcmd_r: mbxqio.MBXQIO

    def __init__(
        self,
        mcmd: mbxqio.MBXQIO,
        mcmd_r: mbxqio.MBXQIO,
        completekey='tab',
        stdin=None,
        stdout=None,
    ):
        """Store the command/reply mailboxes and initialise ``cmd.Cmd``."""
        self.mcmd = mcmd
        self.mcmd_r = mcmd_r
        super().__init__(completekey, stdin, stdout)

    def write_cmd(self, cmd: dict) -> Any:
        """Send *cmd* to supervisord and return the decoded JSON reply.

        The password and the reply mailbox name are added to the message
        under the ``'pwd'`` and ``'mbxreply'`` keys.  The caller's dict is
        left untouched.  If the write fails with ``OSError`` the process
        exits through ``crtl.vms_exit`` with the error number.
        """
        # Same key order as updating *cmd* in place, without mutating it.
        payload = {
            **cmd,
            'pwd': supervisorctl_pwd,
            'mbxreply': supervisorctl_mbxreply,
        }
        message = json.dumps(payload)
        try:
            self.mcmd.write(message.encode('ascii'), iodef.IO_M_READERCHECK)
        except OSError as e:
            crtl.vms_exit(e.errno)
        # NOTE(review): the reply mailbox is created with maxmsg=32 KiB in
        # mbx_init() but only 4096 bytes are requested here -- confirm that
        # large replies cannot be truncated.  A read using
        # iodef.IO_M_WRITERCHECK was previously tried and disabled.
        return json.loads(self.mcmd_r.read(4096))  # type: ignore

    @staticmethod
    def _names_are_known(names: List[str]) -> bool:
        """Return True if every name is a configured program.

        Prints a message for the first unknown name and returns False.
        """
        for name in names:
            if name not in programs_name:
                print(f'Invalid program name {name}')
                return False
        return True

    def _run_program_action(self, action: str, arg: str) -> None:
        """Validate the program names in *arg*, send *action*, print reply.

        At least one name is required.  The single word ``all`` (any case)
        is sent without being checked against the configured programs.
        """
        names = [name.upper() for name in arg.split()]
        if not names:
            print('*** invalid number of arguments')
            return
        if names != ['ALL'] and not self._names_are_known(names):
            return
        print(self.write_cmd({'cmd': action, 'programs': names}))

    # The docstrings of do_EOF, do_quit and do_shutdown are shown to the
    # user by the built-in ``help`` command, so they are kept verbatim.

    def do_EOF(self, line):
        """To quit, type ^D or use the quit command"""
        return True

    def do_quit(self, line):
        """To quit, type ^D or use the quit command"""
        return True

    def do_shutdown(self, arg):
        """Shutdown supervisord, don't stop programs"""
        if arg.split():
            print('*** invalid number of arguments')
            return
        print(self.write_cmd({'cmd': 'shutdown'}))
        # Returning True here would also end the interactive session; the
        # original code deliberately left that disabled.

    def do_status(self, arg):
        """Display status of programs"""
        # Upper-case once and send the same names that were validated, as
        # do_start/do_stop do; configured names are stored upper-cased.
        names = [name.upper() for name in arg.split()]
        if not self._names_are_known(names):
            return
        reply = self.write_cmd({'cmd': 'status', 'programs': names})
        if 'error' in reply:
            print(reply['error'])
            return
        for pinfo in reply['result']:
            # Element 3 of each entry carries the numeric process state.
            pinfo[3] = pstat2str(pinfo[3])
            print(pinfo)

    def do_start(self, arg):
        """Send a start request for the named programs (or ``all``)."""
        self._run_program_action('start', arg)

    def do_stop(self, arg):
        """Send a stop request for the named programs (or ``all``)."""
        self._run_program_action('stop', arg)

    def help_status(self):
        """Print usage for the status command."""
        print('status <name>\t\tGet status for a single process')
        print('status <gname>:*\tGet status for all ' 'processes in a group')
        print(
            'status <name> <name>\tGet status for multiple named '
            'processes'
        )
        print('status\t\t\tGet all process status info')

    def help_start(self):
        """Print usage for the start command."""
        print('start <name>\t\tStart a process')
        print('start <gname>:*\t\tStart all processes in a group')
        print('start <name> <name>\tStart multiple processes or groups')
        print('start all\t\tStart all processes')

    def help_stop(self):
        """Print usage for the stop command."""
        print('stop <name>\t\tStop a process')
        print('stop <gname>:*\t\tStop all processes in a group')
        print('stop <name> <name>\tStop multiple processes or groups')
        print('stop all\t\tStop all processes')


def logicals_init():
    """Set up the logical names used by the supervisord mailboxes.

    Calls ``starlet.crelnt`` for the ``SUPERVISORD_TABLE`` table under
    ``LNM$SYSTEM_DIRECTORY`` and then ``lib.set_logical`` with
    ``LNM$PERMANENT_MAILBOX``, the table name and ``LNM$PROCESS_DIRECTORY``.
    """
    # LNM_M_CREATE_IF: presumably "create only if it does not exist yet",
    # so repeated runs are harmless -- confirm against the $CRELNT docs.
    starlet.crelnt(
        attr=lnmdef.LNM_M_CREATE_IF,
        promsk=0xE000,
        tabnam=supervisord_table_name,
        partab=b'LNM$SYSTEM_DIRECTORY',
        acmode=psldef.PSL_C_SUPER,
    )
    # NOTE(review): looks like this redirects the logical names of permanent
    # mailboxes created by this process into the supervisord table -- confirm.
    lib.set_logical(
        b'LNM$PERMANENT_MAILBOX',
        supervisord_table_name,
        b'LNM$PROCESS_DIRECTORY',
    )


def mbx_init() -> Tuple[int, int, str]:
    """Create the command and reply mailboxes.

    Returns:
        A tuple ``(command_channel, reply_channel, reply_device)`` where
        ``reply_device`` is ``'MBA<unit>:'`` built from the unit number of
        the reply mailbox.
    """
    # NOTE(review): the status values returned by crembx are not checked;
    # presumably the binding raises on failure -- confirm.
    # Command mailbox: prmflg=1, named SUPERVISORD_CMD, write-only flag.
    _, chancmd = starlet.crembx(
        prmflg=1,
        lognam='SUPERVISORD_CMD',
        maxmsg=2048,
        bufquo=8192,
        promsk=0x0000FF00,
        acmode=psldef.PSL_C_USER,
        flags=cmbdef.CMB_M_WRITEONLY,
    )
    # Reply mailbox: prmflg=0, no logical name (it is identified to the
    # server by device name instead), read-only flag.
    _, chancmd_r = starlet.crembx(
        prmflg=0,
        maxmsg=32 * 1024,
        bufquo=64 * 1024,
        promsk=0x0000FF00,
        acmode=psldef.PSL_C_USER,
        flags=cmbdef.CMB_M_READONLY,
    )
    mbxunit = lib.getdvi(dvidef.DVI__UNIT, chancmd_r)[1]
    return chancmd, chancmd_r, f'MBA{mbxunit}:'


def main():
    """Parse the command line, open the mailboxes and run the controller.

    Arguments not recognised by the parser are joined and executed as a
    single command; without any, an interactive command loop is started.
    """
    global supervisorctl_pwd, supervisorctl_mbxreply

    parser = argparse.ArgumentParser(description='supervisorctl')
    parser.add_argument(
        '-p',
        '--password',
        required=False,
        type=str,
        default='',
        help='password for supervisord',
    )
    parser.add_argument(
        '-c',
        '--configuration',
        type=argparse.FileType('r', encoding='latin1'),
        default='./supervisord.conf',
        help='Configuration file path (default ./supervisord.conf)',
    )
    args, unknownargs = parser.parse_known_args()
    supervisorctl_pwd = args.password

    config = configparser.ConfigParser()
    # Close the configuration file even if parsing raises.
    with args.configuration:
        config.read_file(args.configuration)

    # Every [program:<name>] section declares a program that may be named
    # in status/start/stop commands; names are compared upper-cased.
    for section in config.sections():
        if section.startswith('program:'):
            programs_name.add(section.split(':')[-1].upper())

    # Required privileges: PRMMBX, SYSNAM
    starlet.setprv(1, prvdef.PRV_M_PRMMBX | prvdef.PRV_M_SYSNAM)

    logicals_init()
    chan, chan_r, supervisorctl_mbxreply = mbx_init()

    with (
        mbxqio.MBXQIO(channel=chan) as mcmd,
        mbxqio.MBXQIO(channel=chan_r) as mcmd_r,
    ):
        controller = Controller(mcmd, mcmd_r)
        if unknownargs:
            controller.onecmd(' '.join(unknownargs))
        else:
            controller.cmdloop()


if __name__ == '__main__':
    main()