Skip to content
Snippets Groups Projects
supervisorctl.py 6.62 KiB
Newer Older
  • Learn to ignore specific revisions
  • jfp's avatar
    jfp committed
    import argparse
    import cmd
    import configparser
    import json
    from typing import Any, Set, Tuple

    from ovms import cmbdef, dvidef, iodef, lnmdef, mbxqio, prvdef, psldef, starlet
    from ovms.rtl import lib
    
    jfp's avatar
    jfp committed
    
    
    # Device name of this process's reply mailbox; main() fills it in from
    # mbx_init() and Controller.write_cmd() sends it with every request.
    supervisorctl_mbxreply = ''

    # Password sent with every request; main() overwrites it from --password.
    supervisorctl_pwd = ''

    # Upper-cased program names taken from the [program:*] configuration
    # sections; main() populates it and the Controller commands validate
    # their arguments against it.
    programs_name: Set[str] = set()

    # Name of the logical name table created by logicals_init().
    supervisord_table_name = b'SUPERVISORD_TABLE'
    
    jfp's avatar
    jfp committed
    
    
    jfp's avatar
    jfp committed
    # Numeric process states reported by supervisord, mapped to display names.
    _PSTAT_NAMES = {
        0: 'STOPPED',
        10: 'STARTING',
        20: 'RUNNING',
        30: 'BACKOFF',
        40: 'STOPPING',
        100: 'EXITED',
        200: 'FATAL',
        1000: 'UNKNOWN',
    }


    def pstat2str(s: int) -> str:
        """Return the display name for the numeric process state *s*.

        A code that is not in the table yields ``'UNKNOWN'`` instead of raising
        ``KeyError``, so one unexpected value in a status reply cannot abort
        the listing or the interactive shell.
        """
        return _PSTAT_NAMES.get(s, 'UNKNOWN')
    
    
    class Controller(cmd.Cmd):
        """Interactive command shell that drives supervisord through mailboxes.

        Each ``do_*`` method is a shell command (see the standard ``cmd``
        module). Commands build a JSON request, send it on the command mailbox
        and print the JSON reply read back from the reply mailbox.
        """

        # Mailbox the JSON requests are written to.
        mcmd: mbxqio.MBXQIO
        # Mailbox the JSON replies are read from.
        mcmd_r: mbxqio.MBXQIO

        def __init__(
            self,
            mcmd: mbxqio.MBXQIO,
            mcmd_r: mbxqio.MBXQIO,
            completekey='tab',
            stdin=None,
            stdout=None,
        ):
            """Store the two mailbox wrappers and initialise the ``cmd.Cmd`` base.

            Args:
                mcmd: mailbox used to send requests.
                mcmd_r: mailbox used to receive replies.
                completekey, stdin, stdout: passed unchanged to ``cmd.Cmd``.
            """
            self.mcmd = mcmd
            self.mcmd_r = mcmd_r
            super().__init__(completekey, stdin, stdout)

        def write_cmd(self, cmd: dict) -> Any:
            """Send one request to supervisord and return its decoded reply.

            The request is *cmd* plus the ``pwd`` and ``mbxreply`` entries taken
            from the module globals ``supervisorctl_pwd`` and
            ``supervisorctl_mbxreply``. The caller's dict is left untouched.

            Args:
                cmd: request fields, e.g. ``{'cmd': 'status', 'programs': []}``.
                    (The name shadows the ``cmd`` module inside this method.)

            Returns:
                The reply parsed with ``json.loads``.
            """
            request = dict(cmd)
            request['pwd'] = supervisorctl_pwd
            request['mbxreply'] = supervisorctl_mbxreply

            payload = json.dumps(request).encode('ascii')
            # NOTE(review): READERCHECK presumably makes the write fail when no
            # process has the command mailbox open for reading -- confirm
            # against the OpenVMS mailbox driver documentation.
            self.mcmd.write(payload, iodef.IO_M_READERCHECK)

            # NOTE(review): the reply mailbox is created with 32 KiB messages in
            # mbx_init() but only 4096 bytes are read here -- confirm that large
            # replies cannot be truncated.
            # Alternative kept from the original source:
            # self.mcmd_r.read(4096, iodef.IO_M_WRITERCHECK)
            return json.loads(self.mcmd_r.read(4096))  # type: ignore

        def _program_names(self, arg: str, allow_all: bool = False):
            """Split *arg* into upper-cased program names and validate them.

            Names are upper-cased because ``programs_name`` holds upper-cased
            names. When *allow_all* is true the single word ``all`` (any case) is
            accepted as-is and returned as ``['ALL']``.

            Returns:
                The list of normalised names (possibly empty), or ``None`` after
                printing a message when a name is not a configured program.
            """
            names = [word.upper() for word in arg.split()]
            if allow_all and names == ['ALL']:
                return names
            for name in names:
                if name not in programs_name:
                    print(f'Invalid program name {name}')
                    return None
            return names

        def _start_or_stop(self, verb: str, arg: str) -> None:
            """Shared body of ``start`` and ``stop``: validate, send, print reply."""
            if not arg.split():
                print('*** invalid number of arguments')
                return
            names = self._program_names(arg, allow_all=True)
            if names is None:
                return
            print(self.write_cmd({'cmd': verb, 'programs': names}))

        def do_EOF(self, line):
            """To quit, type ^D or use the quit command"""
            # A true return value tells cmd.Cmd.cmdloop() to stop.
            return True

        def do_quit(self, line):
            """To quit, type ^D or use the quit command"""
            return True

        def do_shutdown(self, arg):
            """Shutdown supervisord, don't stop programs"""
            if arg.split():
                print('*** invalid number of arguments')
                return

            print(self.write_cmd({'cmd': 'shutdown'}))

        def do_status(self, arg):
            """Display status of programs"""
            # An empty argument list is allowed and sent as an empty list.
            names = self._program_names(arg)
            if names is None:
                return

            reply = self.write_cmd({'cmd': 'status', 'programs': names})
            if 'error' in reply:
                print(reply['error'])
                return
            for pinfo in reply['result']:
                # Element 3 of each entry is the numeric state code.
                pinfo[3] = pstat2str(pinfo[3])
                print(pinfo)

        def do_start(self, arg):
            """Start the named programs, or every program with ``all``."""
            self._start_or_stop('start', arg)

        def do_stop(self, arg):
            """Stop the named programs, or every program with ``all``."""
            self._start_or_stop('stop', arg)

        def help_status(self):
            """Print the usage text for the ``status`` command."""
            print('status <name>\t\tGet status for a single process')
            print('status <gname>:*\tGet status for all processes in a group')
            print('status <name> <name>\tGet status for multiple named processes')
            print('status\t\t\tGet all process status info')

        def help_start(self):
            """Print the usage text for the ``start`` command."""
            print('start <name>\t\tStart a process')
            print('start <gname>:*\t\tStart all processes in a group')
            print('start <name> <name>\tStart multiple processes or groups')
            print('start all\t\tStart all processes')

        def help_stop(self):
            """Print the usage text for the ``stop`` command."""
            print('stop <name>\t\tStop a process')
            print('stop <gname>:*\t\tStop all processes in a group')
            print('stop <name> <name>\tStop multiple processes or groups')
            print('stop all\t\tStop all processes')
    
    
    
    def logicals_init():
        """Create the supervisord logical name table and point mailboxes at it.

        First creates (``CREATE_IF``) the table named by
        ``supervisord_table_name`` under ``LNM$SYSTEM_DIRECTORY`` in supervisor
        mode, then defines ``LNM$PERMANENT_MAILBOX`` as that table name in
        ``LNM$PROCESS_DIRECTORY``.
        """
        table = supervisord_table_name

        starlet.crelnt(
            tabnam=table,
            partab=b'LNM$SYSTEM_DIRECTORY',
            attr=lnmdef.LNM_M_CREATE_IF,
            acmode=psldef.PSL_C_SUPER,
            promsk=0xE000,
        )

        # NOTE(review): presumably this makes logical names of permanent
        # mailboxes created afterwards land in the supervisord table -- confirm.
        lib.set_logical(b'LNM$PERMANENT_MAILBOX', table, b'LNM$PROCESS_DIRECTORY')
    
    
    
    def mbx_init() -> Tuple[int, int, str]:
        """Create the command and reply mailboxes.

        Returns:
            ``(command_channel, reply_channel, reply_device)`` where
            ``reply_device`` is ``'MBA<unit>:'`` built from the reply mailbox's
            unit number.
        """
        # Options common to both mailboxes.
        shared = {'promsk': 0x0000FF00, 'acmode': psldef.PSL_C_USER}

        # Command mailbox: created with prmflg=1 under the logical name
        # SUPERVISORD_CMD and opened write-only.
        _, command_channel = starlet.crembx(
            prmflg=1,
            lognam='SUPERVISORD_CMD',
            maxmsg=2048,
            bufquo=8192,
            flags=cmbdef.CMB_M_WRITEONLY,
            **shared,
        )

        # Reply mailbox: created with prmflg=0 and no logical name, opened
        # read-only; it is identified by the device name returned below.
        _, reply_channel = starlet.crembx(
            prmflg=0,
            maxmsg=32 * 1024,
            bufquo=64 * 1024,
            flags=cmbdef.CMB_M_READONLY,
            **shared,
        )

        reply_unit = lib.getdvi(dvidef.DVI__UNIT, reply_channel)[1]
        reply_device = f'MBA{reply_unit}:'
        return command_channel, reply_channel, reply_device
    
    jfp's avatar
    jfp committed
    
    
    def main():
        """Command-line entry point of supervisorctl.

        Parses ``-p/--password`` and ``-c/--configuration``, collects the
        upper-cased program names from the ``[program:*]`` sections of the
        configuration file, enables the PRMMBX and SYSNAM privileges, creates
        the mailboxes and then either runs the remaining command-line words as a
        single command or starts the interactive loop.
        """
        global supervisorctl_pwd, supervisorctl_mbxreply, programs_name

        parser = argparse.ArgumentParser(description='supervisorctl')
        parser.add_argument(
            '-p',
            '--password',
            required=False,
            default='',
            help='password for supervisord',
        )
        parser.add_argument(
            '-c',
            '--configuration',
            type=argparse.FileType('r', encoding='latin1'),
            default='./supervisord.conf',
            help='Configuration file path (default ./supervisord.conf)',
        )

        # Words argparse does not recognise form the command to execute.
        args, unknownargs = parser.parse_known_args()
        supervisorctl_pwd = args.password

        config = configparser.ConfigParser()
        # Close the file even when parsing the configuration fails.
        with args.configuration as conf_file:
            config.read_file(conf_file)

        # Rebind rather than mutate so this works whether or not the module
        # already defines programs_name.
        programs_name = {
            section.split(':')[-1].upper()
            for section in config.sections()
            if section.startswith('program:')
        }

        # Required privileges: PRMMBX, SYSNAM
        starlet.setprv(1, prvdef.PRV_M_PRMMBX | prvdef.PRV_M_SYSNAM)

        chan, chan_r, supervisorctl_mbxreply = mbx_init()

        with (
            mbxqio.MBXQIO(channel=chan) as mcmd,
            mbxqio.MBXQIO(channel=chan_r) as mcmd_r,
        ):
            c = Controller(mcmd, mcmd_r)

            if unknownargs:
                # One-shot mode: run the given command and exit.
                c.onecmd(' '.join(unknownargs))
            else:
                c.cmdloop()
    
    
    # Run the command-line client only when this file is executed as a script.
    if __name__ == '__main__':
        main()