Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import cmd
import json
import sys
from typing import Tuple
from ovms import cmbdef, iodef, mbxqio, psldef, starlet
def pstat2str(s: int) -> str:
dstate = {
0: 'STOPPED',
10: 'STARTING',
20: 'RUNNING',
30: 'BACKOFF',
40: 'STOPPING',
100: 'EXITED',
200: 'FATAL',
1000: 'UNKNOWN',
}
return dstate[s]
class Controller(cmd.Cmd):
mcmd: mbxqio.MBXQIO
mcmd_r: mbxqio.MBXQIO
def __init__(
self,
mcmd: mbxqio.MBXQIO,
mcmd_r: mbxqio.MBXQIO,
completekey='tab',
stdin=None,
stdout=None,
):
self.mcmd = mcmd
self.mcmd_r = mcmd_r
cmd.Cmd.__init__(self, completekey, stdin, stdout)
def do_EOF(self, line):
"""To quit, type ^D or use the quit command"""
return True
def do_quit(self, line):
"""To quit, type ^D or use the quit command"""
return True
def do_shutdown(self, arg):
"""Shutdown supervisord, don't stop programs"""
lst = arg.split()
if len(lst) != 0:
print('*** invalid number of arguments')
return
jscmd = json.dumps({'cmd': 'shutdown'})
self.mcmd.write(jscmd.encode('ascii'), iodef.IO_M_READERCHECK)
def do_status(self, arg):
"""Display status of programs"""
lst = arg.split()
jscmd = json.dumps({'cmd': 'status', 'programs': lst})
self.mcmd.write(jscmd.encode('ascii'), iodef.IO_M_READERCHECK)
res: bytes
res = self.mcmd_r.read(4096, iodef.IO_M_WRITERCHECK) # type: ignore
jscmd = json.loads(res)
for pinfo in jscmd['reply']:
pinfo[3] = pstat2str(pinfo[3])
print(pinfo)
def help_status(self):
print('status <name>\t\tGet status for a single process')
print('status <gname>:*\tGet status for all ' 'processes in a group')
print(
'status <name> <name>\tGet status for multiple named ' 'processes'
)
print('status\t\t\tGet all process status info')
def do_start(self, arg):
lst = arg.split()
if len(lst) < 1:
print('*** invalid number of arguments')
return
jscmd = json.dumps({'cmd': 'start', 'programs': lst})
self.mcmd.write(jscmd.encode('ascii'), iodef.IO_M_READERCHECK)
def help_start(self):
print('start <name>\t\tStart a process')
print('start <gname>:*\t\tStart all processes in a group')
print('start <name> <name>\tStart multiple processes or groups')
print('start all\t\tStart all processes')
def do_stop(self, arg):
lst = arg.split()
if len(lst) < 1:
print('*** invalid number of arguments')
return
jscmd = json.dumps({'cmd': 'stop', 'programs': lst})
self.mcmd.write(jscmd.encode('ascii'), iodef.IO_M_READERCHECK)
def help_stop(self):
print('stop <name>\t\tStop a process')
print('stop <gname>:*\t\tStop all processes in a group')
print('stop <name> <name>\tStop multiple processes or groups')
print('stop all\t\tStop all processes')
def mbx_init() -> Tuple[int, int]:
s, chancmd = starlet.crembx(
prmflg=1,
lognam='SUPERVISORD_CMD',
maxmsg=2048,
bufquo=8192,
promsk=0x0000FF00,
acmode=psldef.PSL_C_USER,
flags=cmbdef.CMB_M_WRITEONLY,
)
s, chancmd_r = starlet.crembx(
prmflg=1,
lognam='SUPERVISORD_CMD_REPLY',
maxmsg=32 * 1024,
bufquo=64 * 1024,
promsk=0x0000FF00,
acmode=psldef.PSL_C_USER,
flags=cmbdef.CMB_M_READONLY,
)
return chancmd, chancmd_r
def main():
chan, chan_r = mbx_init()
with (
mbxqio.MBXQIO(channel=chan) as mcmd,
mbxqio.MBXQIO(channel=chan_r) as mcmd_r,
):
c = Controller(mcmd, mcmd_r)
if len(sys.argv) > 1:
c.onecmd(' '.join(sys.argv[1:]))
else:
c.cmdloop()
if __name__ == '__main__':
main()