Skip to content
Snippets Groups Projects
supervisorctl.py 3.97 KiB
Newer Older
  • Learn to ignore specific revisions
  • jfp's avatar
    jfp committed
    import cmd
    import json
    import sys
    from typing import Tuple
    
    from ovms import cmbdef, iodef, mbxqio, psldef, starlet
    
    
    def pstat2str(s: int) -> str:
        """Return the symbolic name of a process state code.

        Args:
            s: Numeric process state code.

        Returns:
            The name associated with ``s``, or ``'UNKNOWN'`` when the code is
            not one of the values listed below (instead of raising
            ``KeyError``, so one unexpected code cannot abort a whole status
            listing).
        """
        dstate = {
            0: 'STOPPED',
            10: 'STARTING',
            20: 'RUNNING',
            30: 'BACKOFF',
            40: 'STOPPING',
            100: 'EXITED',
            200: 'FATAL',
            1000: 'UNKNOWN',
        }
        return dstate.get(s, 'UNKNOWN')
    
    
    class Controller(cmd.Cmd):
        """Command interpreter that sends JSON requests to supervisord.

        Every ``do_*`` command builds a JSON object and writes it to the command
        mailbox ``mcmd``.  ``status`` additionally reads a JSON answer from the
        reply mailbox ``mcmd_r`` and prints one line per returned entry.
        """

        # Buffer size used when reading a status reply.  The reply mailbox is
        # created with maxmsg=32 * 1024 in this file, so a smaller read buffer
        # could cut a long reply short and leave invalid JSON.
        # NOTE(review): assumes the first argument of MBXQIO.read is a byte
        # count -- confirm against the ovms package.
        REPLY_BUFSIZE = 32 * 1024

        # Annotations are quoted so that defining the class does not require
        # the ovms package to be importable.
        mcmd: 'mbxqio.MBXQIO'
        mcmd_r: 'mbxqio.MBXQIO'

        def __init__(
            self,
            mcmd: 'mbxqio.MBXQIO',
            mcmd_r: 'mbxqio.MBXQIO',
            completekey='tab',
            stdin=None,
            stdout=None,
        ):
            """Store both mailboxes and initialise the ``cmd.Cmd`` base class.

            Args:
                mcmd: Mailbox object the requests are written to.
                mcmd_r: Mailbox object the ``status`` reply is read from.
                completekey: Passed through to ``cmd.Cmd``.
                stdin: Passed through to ``cmd.Cmd``.
                stdout: Passed through to ``cmd.Cmd``.
            """
            self.mcmd = mcmd
            self.mcmd_r = mcmd_r
            super().__init__(completekey, stdin, stdout)

        def emptyline(self):
            """Ignore an empty input line.

            The ``cmd.Cmd`` default repeats the last command, which would
            re-send e.g. ``stop`` or ``shutdown`` just because Enter was hit.
            """
            return False

        def _send(self, request: dict) -> None:
            """Serialise ``request`` as JSON and write it to the command mailbox."""
            payload = json.dumps(request).encode('ascii')
            self.mcmd.write(payload, iodef.IO_M_READERCHECK)

        def do_EOF(self, line):
            """To quit, type ^D or use the quit command"""
            # A true return value makes cmd.Cmd leave its command loop.
            return True

        def do_quit(self, line):
            """To quit, type ^D or use the quit command"""
            return True

        def do_shutdown(self, arg):
            """Shutdown supervisord, don't stop programs"""
            # The command takes no argument at all.
            if arg.split():
                print('*** invalid number of arguments')
                return
            self._send({'cmd': 'shutdown'})

        def do_status(self, arg):
            """Display status of programs"""
            self._send({'cmd': 'status', 'programs': arg.split()})
            res: bytes
            res = self.mcmd_r.read(  # type: ignore
                self.REPLY_BUFSIZE, iodef.IO_M_WRITERCHECK
            )
            # A truncated, empty or otherwise unexpected reply must not take
            # the whole interactive session down with a traceback.
            try:
                entries = json.loads(res)['reply']
            except (TypeError, ValueError, KeyError) as exc:
                print(f'*** invalid reply from supervisord: {exc!r}')
                return
            for pinfo in entries:
                # Element 3 of each entry is the numeric state code; show its
                # symbolic name instead.
                pinfo[3] = pstat2str(pinfo[3])
                print(pinfo)

        def help_status(self):
            """Print the usage text of the ``status`` command."""
            print('status <name>\t\tGet status for a single process')
            print('status <gname>:*\tGet status for all processes in a group')
            print('status <name> <name>\tGet status for multiple named processes')
            print('status\t\t\tGet all process status info')

        def do_start(self, arg):
            """Send a ``start`` request for the names given in ``arg``."""
            programs = arg.split()
            if not programs:
                print('*** invalid number of arguments')
                return
            self._send({'cmd': 'start', 'programs': programs})

        def help_start(self):
            """Print the usage text of the ``start`` command."""
            print('start <name>\t\tStart a process')
            print('start <gname>:*\t\tStart all processes in a group')
            print('start <name> <name>\tStart multiple processes or groups')
            print('start all\t\tStart all processes')

        def do_stop(self, arg):
            """Send a ``stop`` request for the names given in ``arg``."""
            programs = arg.split()
            if not programs:
                print('*** invalid number of arguments')
                return
            self._send({'cmd': 'stop', 'programs': programs})

        def help_stop(self):
            """Print the usage text of the ``stop`` command."""
            print('stop <name>\t\tStop a process')
            print('stop <gname>:*\t\tStop all processes in a group')
            print('stop <name> <name>\tStop multiple processes or groups')
            print('stop all\t\tStop all processes')
    
    
    def mbx_init() -> Tuple[int, int]:
        """Create the command and reply mailboxes used to talk to supervisord.

        Both mailboxes are created with ``starlet.crembx``: a write-only one
        under the logical name ``SUPERVISORD_CMD`` and a read-only one under
        ``SUPERVISORD_CMD_REPLY``.  The status value returned by each call is
        discarded.

        Returns:
            The second element of each ``crembx`` result (presumably the
            channel number -- confirm against the ovms package), command
            mailbox first, reply mailbox second.
        """
        # Settings that are identical for both mailboxes.
        shared = {
            'prmflg': 1,
            'promsk': 0x0000FF00,
            'acmode': psldef.PSL_C_USER,
        }

        _, cmd_channel = starlet.crembx(
            lognam='SUPERVISORD_CMD',
            maxmsg=2048,
            bufquo=8192,
            flags=cmbdef.CMB_M_WRITEONLY,
            **shared,
        )
        _, reply_channel = starlet.crembx(
            lognam='SUPERVISORD_CMD_REPLY',
            maxmsg=32 * 1024,
            bufquo=64 * 1024,
            flags=cmbdef.CMB_M_READONLY,
            **shared,
        )
        return cmd_channel, reply_channel
    
    
    def main():
        """Run one command from the command line, or an interactive session.

        The two mailboxes from ``mbx_init`` are wrapped in ``MBXQIO`` context
        managers for the lifetime of the controller.  When command-line
        arguments are present they are joined with spaces and executed as a
        single command; otherwise the interactive command loop is started.
        """
        cmd_channel, reply_channel = mbx_init()
        with mbxqio.MBXQIO(channel=cmd_channel) as mcmd:
            with mbxqio.MBXQIO(channel=reply_channel) as mcmd_r:
                controller = Controller(mcmd, mcmd_r)
                cli_args = sys.argv[1:]
                if not cli_args:
                    controller.cmdloop()
                else:
                    controller.onecmd(' '.join(cli_args))
    
    
    # Run the controller only when this file is executed as a script, not when
    # it is imported as a module.
    if __name__ == '__main__':
        main()