Skip to content
Snippets Groups Projects
Commit b8c0658b3bf3 authored by jfp's avatar jfp
Browse files

Parameter itmlst for starlet and rtl.lib routines can be an ctype Array of iledef.ILE3

parent bcf87515e43c
No related branches found
No related tags found
No related merge requests found
......@@ -67,9 +67,9 @@
# Length of ILEA
ILEA_64_C_LENGTH = 24
from ctypes import Structure, c_ushort, c_void_p, c_char, POINTER, cast, sizeof, pointer
from ctypes import Structure, c_ushort, c_uint32, c_void_p, c_char, POINTER, cast, sizeof, pointer
class ILE3(Structure):
_fields_ = [("ile3_w_length", c_ushort), # Length of buffer in bytes
("ile3_w_code", c_ushort), # Item code value
("ile3_ps_bufaddr", c_void_p), # Buffer address
......@@ -71,9 +71,9 @@
class ILE3(Structure):
_fields_ = [("ile3_w_length", c_ushort), # Length of buffer in bytes
("ile3_w_code", c_ushort), # Item code value
("ile3_ps_bufaddr", c_void_p), # Buffer address
("ile3_ps_retlen_addr", POINTER(c_ushort)) # Address of word
("ile3_ps_retlen_addr", POINTER(c_uint32)) # Address of word or longword
# for returned length
]
......
......@@ -3,7 +3,6 @@
Copyright (c) 2002, Jean-Francois Pieronne jf.pieronne@laposte.net
See the documentation for further information on copyrights,
or contact the author. All Rights Reserved.
"""
# vms Extension Series version number
# (note that subpackages have their own version number)
......@@ -16,4 +15,5 @@
or contact the author. All Rights Reserved.
"""
from ._itemList import *
__version__ = '1.0'
......@@ -19,2 +19,1 @@
__version__ = '1.0'
import ctypes
from typing import List, Tuple, NoReturn
import ovms.ast
from ovms.itemList import itemList
......@@ -404,7 +405,7 @@
value_string: str | bytes | None = None,
table: str | bytes | None = None,
attributes: int | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> int:
"""status = set_logical(logical_name [,value_string] [,table] \
[,attributes] [,itmlst])"""
......
......@@ -687,6 +687,9 @@
iattributes = attributes
iattributesPtr = &iattributes
if itmlst is not None:
if isinstance(itmlst, ctypes.Array):
ile3Ptr = <ILE3 *><int>ctypes.addressof(itmlst)
else:
ile3 = pyvms_itmlst2ile3(itmlst, 0)
ile3Ptr = <ILE3 *>PyByteArray_AsString(ile3)
s = lib_set_logical(&logicalNameD, valueStringPtr, tablePtr,
......
......@@ -3,6 +3,7 @@
from ovms import efndef
from ovms import iosbdef
import ovms.ast
import ctypes
class Xlksb(object):
b_valblk: bytes
......@@ -75,7 +76,7 @@
def audit_event(
efn: int = efndef.EFN_C_ENF,
flags: int = 0,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> Tuple[int, int, Dict[int, Any]]:
"""status, audsts, resdic = audit_event(efn=EFN_C_ENF, flags=0, itmlst=None)"""
...
......@@ -83,7 +84,7 @@
def audit_eventw(
efn: int = efndef.EFN_C_ENF,
flags: int = 0,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> Tuple[int, int, Dict[int, Any]]:
"""status, audsts, resdic = audit_eventw(efn=EFN_C_ENF, flags=0, itmlst=None)"""
...
......@@ -122,7 +123,7 @@
objtyp: int = 0,
objnam: bytes | str | None = None,
usrnam: bytes | str | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
contxt: int | None = None,
clsnam: bytes | str | None = None,
objpro: bytes | str | None = None,
......@@ -137,10 +138,10 @@
prv: int,
altprv: int | None = None,
flags: int | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> Tuple[int, int, Dict[int, Any]]:
"""status, audsts, resdic = check_privilegew(efn, prv, altprv=None, \
flags=None, itmlst=None)"""
...
def chkpro(
......@@ -141,10 +142,10 @@
) -> Tuple[int, int, Dict[int, Any]]:
"""status, audsts, resdic = check_privilegew(efn, prv, altprv=None, \
flags=None, itmlst=None)"""
...
def chkpro(
itmlst: Tuple[itemList, ...] | List[itemList],
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array,
objpro: bytes | None = None,
usrpro: bytes | None = None,
) -> Tuple[int, Dict[int, Any]]:
......@@ -195,7 +196,7 @@
def create_user_profile(
usrnam: bytes | str,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
flags: int = 0,
contxt: int | None = None,
) -> Tuple[int, bytes, dict, int | None]:
......@@ -208,7 +209,7 @@
tabnam: bytes | str,
lognam: bytes | str,
acmode: int | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> Tuple[int, Dict[int, int | bytes]]:
"""status, resdic = crelnm(attr, tabnam, lognam, acmode=None, itmlst=None)"""
...
......@@ -250,7 +251,7 @@
uic: int = 0,
mbxunt: int = 0,
stsflg: int = 0,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
node: bytes | str | None = None,
home_rad: int = 0,
) -> Tuple[int, int, Dict[int, Any]]:
......@@ -328,7 +329,7 @@
def device_path_scan(
chan: int = 0,
devnam: bytes | str | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
contxt: int = 0,
) -> Tuple[int, int, Dict[int, bytes | int]]:
"""status, contxt,resdic = device_path_scan(chan=0, devnam=None, \
......@@ -337,7 +338,7 @@
def device_scan(
search_devnam: bytes | str | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
contxt: int = 0,
) -> Tuple[int, bytes, int, Dict[int, bytes | int]]:
"""status, devnam, contxt, resdic = device_scan(search_devnam=None, \
......@@ -524,7 +525,7 @@
objnam: bytes | str | None = None,
objhan: int | None = None,
flags: int = 0,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
contxt: int | None = None,
acmode: Tuple[
bytes,
......@@ -590,5 +591,5 @@
efn: int = efndef.EFN_C_ENF,
pid: int | None = None,
prcnam: bytes | str | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
flags: int = 0,
......@@ -594,5 +595,5 @@
flags: int = 0,
) -> Tuple[int, int | None, Dict[int, Any] | None]:
) -> Tuple[int, int, Dict[int, Any]]:
"""status, pid, resdic = getjpiw(efn=EFN_C_ENF, pid=None, \
prcnam=None, itmlst=None, flags=0)
If flags is 1 then status code SS$_SUSPENDED is returned instead of been signaled"""
......@@ -601,7 +602,7 @@
def getlki(
efn: int = efndef.EFN_C_ENF,
lkid: int | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
iosb: iosbdef.IOSB_r_get_64 | None = None,
astadr: int = 0,
astctx: ovms.ast.AstContext | None = None,
......@@ -613,7 +614,7 @@
def getlkiw(
efn: int = efndef.EFN_C_ENF,
lkid: int | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> Tuple[int, int, Dict[int, int | bytes]]:
"""status, lkid, resdic = getlkiw(efn=EFN_C_ENF, lkid=None, itmlst=None)"""
...
......@@ -626,7 +627,7 @@
efn: int = efndef.EFN_C_ENF,
func: int | None = None,
contxt: int | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> Tuple[int, int | None, Dict[int, Any]]:
"""status, contxt,resdic = getquiw(efn=EFN_C_ENF, func=None, \
contxt=None, itmlst=None)"""
......@@ -636,7 +637,7 @@
efn: int = efndef.EFN_C_ENF,
nullarg1: None = None,
nullarg2: None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> Tuple[int, Dict[int, Any]]:
"""status, resdic = getrmiw(efn=EFN_C_ENF, nullarg1=None, \
nullarg2=None, itmlst=None)"""
......@@ -646,7 +647,7 @@
efn: int = efndef.EFN_C_ENF,
csid: int | None = None,
nodename: bytes | str | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> Tuple[int, int | None, Dict[int, Any]]:
"""status, csid, resdic = getsyiw(efn=EFN_C_ENF, csid=None, \
nodename=None, itmlst=None)"""
......@@ -660,7 +661,7 @@
efn: int = efndef.EFN_C_ENF,
contxt: int | None = None,
usrnam: bytes | str | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> Tuple[int, int | None, Dict[int, Any]]:
"""status,contxt, ,resdic = getuai(efn=EFN_C_ENF, contxt=None, \
usrnam=None, itmlst=None)"""
......@@ -741,7 +742,7 @@
def init_vol(
devnam: bytes | str,
volnam: bytes | str,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> int:
"""status = init_vol(devnam, volnam, itmlst=None)"""
...
......@@ -781,7 +782,7 @@
...
def mount(
itmlst: Tuple[itemList, ...] | List[itemList] | None = None
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None
) -> Tuple[int, Dict[int, Any]]:
"""status, resdic = mount(itmlst)"""
...
......@@ -863,7 +864,7 @@
usrnam: bytes | str | None = None,
flags: int = 0,
usrpro: bytes | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> Tuple[int, int]:
"""status, persona = persona_create(usrnam=None, flags=0, \
usrpro=None, itmlst=None)"""
......@@ -900,9 +901,9 @@
...
def persona_modify(
persona: int, itmlst: Tuple[itemList, ...] | List[itemList]
persona: int, itmlst: Tuple[itemList, ...] | ctypes.Array | List[itemList]
) -> int:
"""status = persona_modify(persona, itmlst)"""
...
def persona_query(
......@@ -904,9 +905,9 @@
) -> int:
"""status = persona_modify(persona, itmlst)"""
...
def persona_query(
persona: int, itmlst: Tuple[itemList, ...] | List[itemList]
persona: int, itmlst: Tuple[itemList, ...] | ctypes.Array | List[itemList]
) -> Tuple[int, Dict[int, Any]]:
"""status, resdic = persona_query(persona, itmlst)"""
...
......@@ -938,7 +939,7 @@
...
def process_scan(
pidctx: int, itmlst: Tuple[itemList, ...] | List[itemList] | None = None
pidctx: int, itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None
) -> Tuple[int, int, Dict[int, Any]]:
"""status, pidctx, resdic = process_scan(pidctx, itmlst=None)"""
...
......@@ -952,7 +953,7 @@
...
def registryw(
efn: int, itmlst: Tuple[itemList, ...] | List[itemList], timeout: int = 0
efn: int, itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array, timeout: int = 0
) -> Tuple[int, Dict[int, Any]]:
"""status, resdic = registryw(efn, func, itmlst, timeout=0)"""
...
......@@ -1021,7 +1022,7 @@
efn: int = efndef.EFN_C_ENF,
chan: int = 0,
devnam: bytes | str | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
nullarg: None = None,
) -> Tuple[int, Dict[int, Any]]:
"""status, resdic = set_devicew(efn=EFN_C_ENF, chan=0, devnam=None, \
......@@ -1066,7 +1067,7 @@
objnam: bytes | str | None = None,
objhan: int | None = None,
flags: int = 0,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
contxt: int | None = None,
acmode: int | None = None,
) -> Tuple[int, int | None, Dict[int, Any]]:
......@@ -1123,7 +1124,7 @@
efn: int = efndef.EFN_C_ENF,
contxt: int | None = None,
usrnam: bytes | str | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> Tuple[int, int | None, Dict[int, Any]]:
"""status, contxt, resdic = setuai(efn=EFN_C_ENF, contxt=None, \
usrnam=None, itmlst=None)"""
......@@ -1141,7 +1142,7 @@
efn: int = efndef.EFN_C_ENF,
func: int = 0,
nularg: int = 0,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> Tuple[int, Dict[int, Any]]:
"""status, resdic = sndjbcw(efn=EFN_C_ENF, func=0, nullarg=0, itmlst=0)"""
...
......@@ -1179,7 +1180,7 @@
tabnam: bytes | str,
lognam: bytes | str,
accmode: int | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | None = None,
itmlst: Tuple[itemList, ...] | List[itemList] | ctypes.Array | None = None,
) -> Tuple[int, Dict[int, bytes]]:
"""status, resdic = trnlnm(attr, tabnam, lognam, acmode=None, itmlst=None)"""
...
......
This diff is collapsed.
......@@ -409,7 +409,7 @@
assert s == 1 and isinstance(usrpro, bytes) and d == {} and ctxt is None
def test_check_access():
def test_check_access_a():
privnam = {
0: '',
chpdef.CHP_M_SYSPRV: 'SYSPRV',
......@@ -432,7 +432,8 @@
dtype=itemList.il_unsignedLong,
),
itemList.itemList(
chpdef.CHP__PRIVUSED, dtype=itemList.il_unsignedLong
chpdef.CHP__PRIVUSED,
dtype=itemList.il_unsignedLong,
),
itemList.itemList(chpdef.CHP__MATCHEDACE),
)
......@@ -461,7 +462,8 @@
dtype=itemList.il_unsignedLong,
),
itemList.itemList(
chpdef.CHP__PRIVUSED, dtype=itemList.il_unsignedLong
chpdef.CHP__PRIVUSED,
dtype=itemList.il_unsignedLong,
),
itemList.itemList(chpdef.CHP__MATCHEDACE),
)
......@@ -480,7 +482,7 @@
)
def test_ile3():
def test_ile3_a():
from ctypes import (
c_ushort,
create_string_buffer,
......@@ -984,3 +986,147 @@
flags=lckdef.LCK_M_VALBLK | lckdef.LCK_M_XVALBLK,
)
assert s == 1 and len(valblk) == 64 and valblk == b'\x00' * 64
def test_ile3_b():
from ovms.starlet import getjpiw
from ovms import efndef
from ovms.iledef import ILE3, set_ile3_item, get_ile3_string_item
from ovms import jpidef
import ctypes
from ovms.descrip import bydesc
VMSSYS = '/SYS$LIBRARY/SYS$PUBLIC_VECTORS.EXE'
SYS = ctypes.CDLL(VMSSYS)
vms_getjpiw = getattr(SYS, 'SYS$GETJPIW')
nodename = ctypes.create_string_buffer(32 + 1)
pid = ctypes.c_uint32()
retlen = ctypes.c_ushort()
it = (ILE3 * 3)()
set_ile3_item(
it[0],
code=jpidef.JPI__NODENAME,
buffer=nodename,
retlen=retlen,
)
set_ile3_item(
it[1],
code=jpidef.JPI__PID,
length=4,
buffer=ctypes.pointer(pid),
)
set_ile3_item(it[2])
r = vms_getjpiw(
ctypes.c_uint(efndef.EFN_C_ENF),
None,
bydesc(b'SWAPPER'),
it,
None,
None,
None,
)
r1 = (nodename.raw[: retlen.value], pid.value)
retlen.value = 0
nodename.value = b'\0'
pid.value = 0
r = getjpiw(prcnam=b'SWAPPER', itmlst=it)
r2 = (get_ile3_string_item(it[0]).value, pid.value)
r = getjpiw(
prcnam=b'SWAPPER',
itmlst=[
itemList.itemList(code=jpidef.JPI__NODENAME),
itemList.itemList(
code=jpidef.JPI__PID,
dtype=itemList.il_unsignedLong,
),
],
)[2]
r3 = (r[jpidef.JPI__NODENAME], r[jpidef.JPI__PID])
assert r1 == r2
assert r2 == r3
def test_check_access_ile3():
from ovms.iledef import ILE3, set_ile3_item, get_ile3_string_item
import ctypes
privnam = {
0: '',
chpdef.CHP_M_SYSPRV: 'SYSPRV',
chpdef.CHP_M_BYPASS: 'BYPASS',
chpdef.CHP_M_UPGRADE: 'UPGRADE',
chpdef.CHP_M_DOWNGRADE: 'DOWNGRADE',
chpdef.CHP_M_GRPPRV: 'GRPPRV',
chpdef.CHP_M_READALL: 'READALL',
chpdef.CHP_M_OPER: 'OPER',
chpdef.CHP_M_GRPNAM: 'GRPNAM',
chpdef.CHP_M_SYSNAM: 'SYSNAM',
chpdef.CHP_M_GROUP: 'GROUP',
chpdef.CHP_M_WORLD: 'WORLD',
chpdef.CHP_M_PRMCEB: 'PRMCEB',
}
armread = ctypes.c_uint32(armdef.ARM_M_READ)
privused = ctypes.c_uint32()
matchace = ctypes.create_string_buffer(1024 + 1)
retlen = ctypes.c_ushort()
it = (ILE3 * 4)()
set_ile3_item(
it[0],
length=4,
code=chpdef.CHP__ACCESS,
buffer=ctypes.pointer(armread),
)
set_ile3_item(
it[1],
length=4,
code=chpdef.CHP__PRIVUSED,
buffer=ctypes.pointer(privused),
)
set_ile3_item(
it[2],
code=chpdef.CHP__MATCHEDACE,
buffer=matchace,
retlen=retlen,
)
set_ile3_item(it[3])
contxt = 0xFFFFFFFF
s, contxt, d = starlet.check_access(
objtyp=acldef.ACL_C_FILE,
objnam='SYS$SYSTEM:SYSUAF.DAT',
usrnam='SYSTEM',
itmlst=it,
contxt=contxt,
)
assert (
s == 1
and isinstance(contxt, int)
and (privused.value == chpdef.CHP_M_GRPPRV)
and retlen.value == 0
)
armdelete = ctypes.c_uint32(armdef.ARM_M_DELETE)
set_ile3_item(
it[0],
length=4,
code=chpdef.CHP__ACCESS,
buffer=ctypes.pointer(armdelete),
)
s, contxt, d = starlet.check_access(
objnam='SYS$SYSDEVICE:[VMS$COMMON]SYSEXE.DIR',
usrnam='SYSTEM',
itmlst=it,
contxt=contxt,
clsnam=b'FILE',
)
assert (
s == 1
and isinstance(contxt, int)
and privnam[privused.value] == 'BYPASS'
and retlen.value == 0
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment