Skip to content
Snippets Groups Projects
Commit 2c5aeec1f5c6 authored by jfp's avatar jfp
Browse files

Add some examples

parent 559a54bb427a
Branches
No related tags found
No related merge requests found
import sys
import os
import time
from typing import Tuple
from collections.abc import MutableMapping
import logging
from ovms import fileqio
from ovms import starlet
from ovms import stsdef
from ovms import ast as vmsast
from ovms import iosbdef
import ctypes
logging.basicConfig(
format='%(asctime)s %(levelname)s:%(name)s: %(message)s',
level=logging.INFO,
datefmt='%H:%M:%S',
stream=sys.stderr,
)
logger = logging.getLogger('aio')
logging.getLogger('chardet.charsetprober').disabled = True
def copy(
fi: fileqio.FILEQIO,
fo: fileqio.FILEQIO,
fsz: int,
iosizeblk: int = 16,
maxconcio: int = 2,
) -> int:
currio: MutableMapping[
int,
Tuple[
bool, # ast from a qio read
vmsast.AstContext, # ast parameter
iosbdef.IOSB_r_io_64, # iosb from qio
ctypes.Array[ctypes.c_char], # buffer from qio read
int, # file vbn read, for write
],
]
logger.debug(f'copy {fi.filename} -> {fo.filename}')
iosizebytes = iosizeblk * 512
rdsz = fsz
rblkcur = 1
currio = [
None,
] * maxconcio # type: ignore
while rdsz > 0:
for i in range(maxconcio):
if currio[i] is None:
riosize = iosizebytes if rdsz > iosizeblk else rdsz
ast_param = vmsast.AstContext(vmsast.M_WAKE)
iosb_r, buff_r = fi.readblk_nowait(
rblkcur, riosize, ast_param
)
currio[i] = (True, ast_param, iosb_r, buff_r, rblkcur)
rblkcur += iosizeblk
rdsz -= riosize
elif currio[i][1].done:
w, astp, iosb, buff, vbn = currio[i]
if not stsdef.vms_status_success(iosb.iosb_w_status):
raise IOError(iosb.iosb_w_status)
if w:
ast_param = vmsast.AstContext()
iosb = fo.writeblk_nowait(buff.raw, vbn, ast_param)
currio[i] = (False, ast_param, iosb, None, None) # type: ignore
else:
riosize = iosizebytes if rdsz > iosizebytes else rdsz
ast_param = vmsast.AstContext(vmsast.M_WAKE)
iosb_r, buff_r = fi.readblk_nowait(
rblkcur, riosize, ast_param
)
currio[i] = (True, ast_param, iosb_r, buff_r, rblkcur)
rdsz -= riosize
break
else:
starlet.hiber()
while True:
for i in range(maxconcio):
if currio[i] and currio[i][0]:
w, astp, iosb, buff, vbn = currio[i]
ast_param = vmsast.AstContext()
iosb = fo.writeblk_nowait(buff.raw, vbn, ast_param)
currio[i] = ( # type: ignore
False,
ast_param,
iosb,
None,
None,
)
break
if currio[i] and currio[i][1].done == 0:
starlet.hiber()
break
else:
break
for i in range(maxconcio):
if currio[i]:
w, astp, iosb, buff, vbn = currio[i]
if not stsdef.vms_status_success(iosb.iosb_w_status):
raise IOError(iosb.iosb_w_status)
return 1
def main():
from ovms.rtl import lib
started_at = time.monotonic()
s, ctxt = lib.init_timer()
ifn = 'test_fileqio.in'
ofn = 'test_fileqio.out'
sz = os.path.getsize(ifn)
with open(ofn, 'wb') as out:
out.truncate(sz)
assert os.path.getsize(ofn) == sz
with fileqio.FILEQIO(ifn, 'r') as fi, fileqio.FILEQIO(ofn, 'w') as fo:
copy(fi, fo, sz, 64, 4)
lib.show_timer(ctxt)
duration = time.monotonic() - started_at
logger.info(f'total run time: {duration:.2f} seconds')
if __name__ == '__main__':
main()
import sys
import os
import time
import logging
from ovms import fileqio
from ovms import stsdef
from ovms import ast as vmsast
logging.basicConfig(
format='%(asctime)s %(levelname)s:%(name)s: %(message)s',
level=logging.INFO,
datefmt='%H:%M:%S',
stream=sys.stderr,
)
logger = logging.getLogger('aio')
logging.getLogger('chardet.charsetprober').disabled = True
def copy(
fi: fileqio.FILEQIO,
fo: fileqio.FILEQIO,
fsz: int,
iosizeblk: int = 16,
maxconcio: int = 2,
) -> int:
logger.debug(f'copy {fi.filename} -> {fo.filename}')
iosizebytes = iosizeblk * 512
rdsz = fsz
rblkcur = 1
nbqio = 0
for i in range(maxconcio):
riosize = iosizebytes if rdsz > iosizeblk else rdsz
ast_param = vmsast.AstContext(vmsast.M_WAKE | vmsast.M_QUEUE)
iosb, buff = fi.readblk_nowait(rblkcur, riosize, ast_param)
ast_param.param = (True, iosb, buff, rblkcur)
nbqio += 1
rblkcur += iosizeblk
rdsz -= riosize
if rdsz < 0:
break
while rdsz > 0:
ast_param: vmsast.AstContext
ast_param = vmsast.get_completed(True) # type: ignore
nbqio -= 1
iosb = ast_param.param[1] # type: ignore
if not stsdef.vms_status_success(iosb.iosb_w_status):
raise IOError(iosb.iosb_w_status)
if ast_param.param[0]: # type: ignore
# Read, start a write
buff, vbn = ast_param.param[2:] # type:ignore
ast_param = vmsast.AstContext(vmsast.M_WAKE | vmsast.M_QUEUE)
iosb = fo.writeblk_nowait(buff.raw, vbn, ast_param)
nbqio += 1
ast_param.param = (False, iosb)
else: # Write, start a read
riosize = iosizebytes if rdsz > iosizebytes else rdsz
ast_param = vmsast.AstContext(vmsast.M_WAKE | vmsast.M_QUEUE)
iosb, buff = fi.readblk_nowait(rblkcur, riosize, ast_param)
ast_param.param = (True, iosb, buff, rblkcur)
nbqio += 1
rblkcur += iosizeblk
rdsz -= riosize
while nbqio:
ast_param = vmsast.get_completed(True) # type: ignore
nbqio -= 1
iosb = ast_param.param[1] # type: ignore
if not stsdef.vms_status_success(iosb.iosb_w_status):
raise IOError(iosb.iosb_w_status)
if ast_param.param[0]: # type: ignore
# Read, start a write
buff, vbn = ast_param.param[2:] # type: ignore
ast_param = vmsast.AstContext(vmsast.M_WAKE | vmsast.M_QUEUE)
iosb = fo.writeblk_nowait(buff.raw, vbn, ast_param)
ast_param.param = (False, iosb)
nbqio += 1
return 1
def main():
from ovms.rtl import lib
started_at = time.monotonic()
s, ctxt = lib.init_timer()
ifn = 'test_fileqio.in'
ofn = 'test_fileqio.out'
sz = os.path.getsize(ifn)
with open(ofn, 'wb') as out:
out.truncate(sz)
assert os.path.getsize(ofn) == sz
with fileqio.FILEQIO(ifn, 'r') as fi, fileqio.FILEQIO(ofn, 'w') as fo:
copy(fi, fo, sz, 64, 4)
lib.show_timer(ctxt)
duration = time.monotonic() - started_at
logger.info(f'total run time: {duration:.2f} seconds')
if __name__ == '__main__':
main()
# import debugpy
# 5678 is the default attach port in the VS Code debug configurations.
# Unless a host and port are specified, host defaults to 127.0.0.1
# debugpy.configure(subProcess=False)
# debugpy.listen(('0.0.0.0', 5678), in_process_debug_adapter=True)
# print("Waiting for debugger attach")
# debugpy.wait_for_client()
# debugpy.breakpoint()
# print('break on this line')
import sys
import os
import time
import asyncio
import logging
from ovms import fileqio
logging.basicConfig(
format='%(asctime)s %(levelname)s:%(name)s: %(message)s',
level=logging.INFO,
datefmt='%H:%M:%S',
stream=sys.stderr,
)
logger = logging.getLogger('aio')
logging.getLogger('chardet.charsetprober').disabled = True
blkque: asyncio.Queue = asyncio.Queue(10)
async def write(f: fileqio.FILEQIO, n: int = 1) -> int:
logger.debug(f'write{n} {f.filename}')
# vbn = 1
wsz = 0
while True:
r, vbn = await blkque.get()
if r is None:
break
# logger.debug(f'write{n} get done len={len(r)}')
await f.awriteblk(r, vbn)
# logger.debug(f'write{n} write done vbn={vbn} len={len(r)}')
wsz += len(r)
# blkque.task_done()
await asyncio.sleep(0)
logger.debug(f'write{n} done {f.filename}')
return wsz
async def read(
fi: fileqio.FILEQIO,
fo: fileqio.FILEQIO,
fsz: int,
iosize: int = 1024,
n: int = 1,
start: int = 1,
first: bool = True,
) -> int:
logger.debug(f'read{start} {fi.filename}')
tw = asyncio.create_task(write(fo, start))
await asyncio.sleep(0)
maxblk = (fsz + 511) // 512
bsz: int = iosize // 512
vbn: int = 1 + bsz * (start - 1)
wsz: int = 0
logger.debug(f'read{start} maxblk={maxblk} bsz={bsz}')
while True:
# logger.debug(f'read{start} readblk start vbn={vbn} sz={iosize}')
r = await fi.areadblk(vbn, iosize)
# logger.debug(
# f'read{start} readblk done vbn={vbn} sz={iosize} len(r)={len(r)}'
# )
await blkque.put((r, vbn))
# logger.debug(f'read{start} queue put done')
vbn += bsz * n
wsz += len(r)
if vbn + bsz > maxblk:
break
await asyncio.sleep(0)
if first:
vbn -= bsz * n
iosize = (maxblk - vbn + 1) * 512
logger.debug(f'read{start} last readblk start vbn={vbn} sz={iosize}')
r = await fi.areadblk(vbn, iosize)
await blkque.put((r, vbn))
await blkque.put((None, None))
logger.debug(f'read{start} done {fi.filename}')
await tw
return fsz
async def main():
logger.debug('main')
ifn = 'test_fileqio.in'
ofn = 'test_fileqio.out'
sz = os.path.getsize(ifn)
with open(ofn, 'wb') as out:
out.truncate(sz)
assert os.path.getsize(ofn) == sz
async with fileqio.FILEQIO(ifn, 'r') as fi, fileqio.FILEQIO(
ofn, 'w'
) as fo:
ntasks = 3
first = True
tasks = []
for i in range(1, ntasks + 1):
tasks.append(
asyncio.create_task(read(fi, fo, sz, 512 * 64, ntasks, i, first))
)
first = False
# a = await asyncio.gather(*tasks)
for res in asyncio.as_completed(tasks):
await res
return True
if __name__ == '__main__':
from ovms.rtl import lib
started_at = time.monotonic()
s, ctxt = lib.init_timer()
a = asyncio.run(main())
lib.show_timer(ctxt)
duration = time.monotonic() - started_at
logger.info(f'total run time: {duration:.2f} seconds')
# assert a[0] == a[1]
import sys
import os
import time
import logging
import shutil
logging.basicConfig(
format='%(asctime)s %(levelname)s:%(name)s: %(message)s',
level=logging.INFO,
datefmt='%H:%M:%S',
stream=sys.stderr,
)
logger = logging.getLogger('aio')
logging.getLogger('chardet.charsetprober').disabled = True
def main():
from ovms.rtl import lib
started_at = time.monotonic()
s, ctxt = lib.init_timer()
ifn = 'test_fileqio.in'
ofn = 'test_fileqio.out'
sz = os.path.getsize(ifn)
with open(ofn, 'wb') as out:
out.truncate(sz)
assert os.path.getsize(ofn) == sz
shutil.copy(ifn, ofn)
lib.show_timer(ctxt)
duration = time.monotonic() - started_at
logger.info(f'total run time: {duration:.2f} seconds')
if __name__ == '__main__':
main()
\ No newline at end of file
from ovms import mbxqio
from ovms import starlet
from ovms import cmbdef
from ovms import iodef
import time
s, c = starlet.crembx(
lognam='MAILBOX_EXAMPLE',
maxmsg=64,
bufquo=128,
flags=cmbdef.CMB_M_READONLY,
)
with mbxqio.MBXQIO(channel=c) as f:
while True:
wchk = f.writercheck()
if not wchk:
f.writerwait()
r = f.read(64, iodef.IO_M_WRITERCHECK)
time.sleep(0.2)
if r is None: # EOF
break
print(f"Reader received: '{r.decode()}'")
starlet.delmbx(c)
from ovms import mbxqio
from ovms import starlet
from ovms import cmbdef
from ovms import ssdef
from ovms import iodef
from ovms import crtl
from ovms import stsdef
from ovms.rtl import lib
from ovms import iosbdef
from ovms import ast
iosb: iosbdef.IOSB_r_io_64 | None = None
s, chan = starlet.crembx(
lognam='MAILBOX_EXAMPLE',
maxmsg=64,
bufquo=128,
flags=cmbdef.CMB_M_WRITEONLY,
)
efn = lib.get_ef()[1]
msgs = [f'message {i:2}' for i in range(20)]
msgs.reverse()
astctxt = None
with mbxqio.MBXQIO(channel=chan) as f:
msg = msgs.pop() + ' ' * 1
EOF = False
while not EOF:
f.readerwait()
while True:
try:
# print('send', msg, len(msg))
f.write(
msg.encode(),
iodef.IO_M_READERCHECK
| iodef.IO_M_NOW
| iodef.IO_M_NORSWAIT,
)
msg = msgs.pop() + ' ' * 1
except IndexError:
EOF = True
break
except IOError as e:
if e.errno == ssdef.SS__NOREADER:
print('writer lost reader')
crtl.vms_exit(e.errno)
if e.errno == ssdef.SS__MBFULL:
if astctxt is not None:
starlet.waitfr(efn)
astctxt = None
assert iosb is not None
if not stsdef.vms_status_success(iosb.iosb_w_status):
raise IOError(
iosb.iosb_w_status, 'IOSB setmode error'
)
else:
ast_enabled = True
starlet.clref(efn)
astctxt = ast.AstContext(efn=efn)
iosb = f.setmode(
iodef.IO_M_MB_ROOM_NOTIFY, astctxt=astctxt
)
print('writer send EOF')
f.write_eof(iodef.IO_M_READERCHECK)
print('writer exit')
starlet.delmbx(chan)
# import debugpy
# 5678 is the default attach port in the VS Code debug configurations.
# Unless a host and port are specified, host defaults to 127.0.0.1
# debugpy.configure(subProcess=False)
# debugpy.listen(('0.0.0.0', 5678), in_process_debug_adapter=True)
# print("Waiting for debugger attach")
# debugpy.wait_for_client()
# debugpy.breakpoint()
# print('break on this line')
import sys
import os
import asyncio
import logging
from ovms import fileqio
logging.basicConfig(
format='%(asctime)s %(levelname)s:%(name)s: %(message)s',
level=logging.DEBUG,
datefmt='%H:%M:%S',
stream=sys.stderr,
)
logger = logging.getLogger('aio')
logging.getLogger('chardet.charsetprober').disabled = True
async def coro(seq) -> list:
for i in range(seq[0]):
logger.info(f'coro {seq}')
await asyncio.sleep(0)
return list(reversed(seq))
async def aio_qio(fn: str) -> bytes:
logger.info(f'aio_qior {fn}')
with open(fn, 'wb') as out:
out.truncate(1024 * 1024)
assert os.path.getsize(fn) == 1024 * 1024
async with fileqio.FILEQIO(fn, 'w') as f:
logger.info(f'aio_qior write {fn}')
await f.awriteblk(b'0123456789ABCDEF\nGH\n', 1)
logger.info(f'aio_qior read {fn}')
r = await f.areadblk(1, 20)
logger.info(f'aio_qior done {fn}')
return r
async def main():
logger.info('main')
t1 = asyncio.create_task(coro([6, 2, 1]))
t2 = asyncio.create_task(coro([8, 2, 1]))
t3 = asyncio.create_task(aio_qio('test_fileqio.tmp'))
a = await asyncio.gather(t1, t2, t3)
# for res in asyncio.as_completed((t1, t2)):
# compl = await res
print(f'All tasks done: {all((t1.done(), t2.done(), t3.done()))}')
print(f'type(t1): type {type(t1)}')
print(f't1.done(): {t1.done()}')
print(f't3.done(): {t3.done()}')
print(f't3.result(): {t3.result()}')
return a
if __name__ == '__main__':
a = asyncio.run(main())
print(a)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment