# HG changeset patch # User Alexandre Yang <alexandre.yang@datadoghq.com> # Date 1592224642 0 # Mon Jun 15 12:37:22 2020 +0000 # Node ID 989215e3380e5ab5d19efbc571adb636a1b511c1 # Parent a7cfa3da02779b6b18aa3942e964238c8700288a GH #204 Add int64 (CFIN64), int64 list (CFIL64) and group (CFGR) support diff --git a/code/pymqi/__init__.py b/code/pymqi/__init__.py --- a/code/pymqi/__init__.py +++ b/code/pymqi/__init__.py @@ -204,6 +204,8 @@ else: MQLONG_TYPE = 'l' # 32 bit +INTEGER64_TYPE = 'q' + ####################################################################### # @@ -1229,6 +1231,22 @@ super(CFBS, self).__init__(tuple(opts), **kw) +class CFGR(MQOpts): + """ Construct an MQCFGR Structure with default values as per MQI. + The default values may be overridden by the optional keyword arguments 'kw'. + """ + + def __init__(self, **kw): + # types: (Dict[str, Any]) -> None + count = kw.pop('ParameterCount', 0) + + opts = [['Type', CMQCFC.MQCFT_GROUP, MQLONG_TYPE], + ['StrucLength', CMQCFC.MQCFGR_STRUC_LENGTH, MQLONG_TYPE], + ['Parameter', 0, MQLONG_TYPE], + ['ParameterCount', count, MQLONG_TYPE], + ] + super(CFGR, self).__init__(tuple(opts), **kw) + class CFIF(MQOpts): """ Construct an MQCFIF Structure with default values as per MQI. The default values may be overridden by the optional keyword arguments 'kw'. @@ -1262,6 +1280,23 @@ ] super(CFIL, self).__init__(tuple(opts), **kw) +class CFIL64(MQOpts): + """ Construct an MQCFIL64 Structure with default values as per MQI. + The default values may be overridden by the optional keyword arguments 'kw'. + """ + def __init__(self, **kw): + # types: (Dict[str, Any]) -> None + values = kw.pop('Values', []) + count = kw.pop('Count', len(values)) + + opts = [['Type', CMQCFC.MQCFT_INTEGER64_LIST, MQLONG_TYPE], + ['StrucLength', CMQCFC.MQCFIL64_STRUC_LENGTH_FIXED + 8 * count, MQLONG_TYPE], + ['Parameter', 0, MQLONG_TYPE], + ['Count', count, MQLONG_TYPE], + ['Values', values, INTEGER64_TYPE, count], + ] + super(CFIL64, self).__init__(tuple(opts), **kw) + class CFIN(MQOpts): """ Construct an MQCFIN Structure with default values as per MQI. The default values may be overridden by the optional keyword arguments 'kw'. @@ -1276,6 +1311,20 @@ ] super(CFIN, self).__init__(tuple(opts), **kw) +class CFIN64(MQOpts): + """ Construct an MQCFIN64 Structure with default values as per MQI. + The default values may be overridden by the optional keyword arguments 'kw'. + """ + def __init__(self, **kw): + # types: (Dict[str, Any]) -> None -> None + + opts = [['Type', CMQCFC.MQCFT_INTEGER64, MQLONG_TYPE], + ['StrucLength', CMQCFC.MQCFIN64_STRUC_LENGTH, MQLONG_TYPE], + ['Parameter', 0, MQLONG_TYPE], + ['Value', 0, INTEGER64_TYPE], + ] + super(CFIN64, self).__init__(tuple(opts), **kw) + class CFSF(MQOpts): """ Construct an MQCFSF Structure with default values as per MQI. The default values may be overridden by the optional keyword arguments 'kw'. @@ -2824,7 +2873,13 @@ index = mqcfh.ParameterCount cursor = CMQCFC.MQCFH_STRUC_LENGTH parameter = None # type: Optional[MQOpts] + group = None + group_count = 0 while (index > 0): + if group_count == 0: + group = None + if group is not None: + group_count -= 1 if message[cursor] == CMQCFC.MQCFT_STRING: parameter = CFST() parameter.unpack(message[cursor:cursor + CMQCFC.MQCFST_STRUC_LENGTH_FIXED]) @@ -2845,6 +2900,10 @@ parameter = CFIN() parameter.unpack(message[cursor:cursor + CMQCFC.MQCFIN_STRUC_LENGTH]) value = parameter.Value + elif message[cursor] == CMQCFC.MQCFT_INTEGER64: + parameter = CFIN64() + parameter.unpack(message[cursor:cursor + CMQCFC.MQCFIN64_STRUC_LENGTH]) + value = parameter.Value elif message[cursor] == CMQCFC.MQCFT_INTEGER_LIST: parameter = CFIL() parameter.unpack(message[cursor:cursor + CMQCFC.MQCFIL_STRUC_LENGTH_FIXED]) @@ -2853,6 +2912,22 @@ StrucLength=parameter.StrucLength) parameter.unpack(message[cursor:cursor + parameter.StrucLength]) value = parameter.Values + elif message[cursor] == CMQCFC.MQCFT_INTEGER64_LIST: + parameter = CFIL64() + parameter.unpack(message[cursor:cursor + CMQCFC.MQCFIL64_STRUC_LENGTH_FIXED]) + if parameter.Count > 0: + parameter = CFIL64(Count=parameter.Count, + StrucLength=parameter.StrucLength) + parameter.unpack(message[cursor:cursor + parameter.StrucLength]) + value = parameter.Values + elif message[cursor] == CMQCFC.MQCFT_GROUP: + parameter = CFGR() + parameter.unpack(message[cursor:cursor + parameter.StrucLength]) + group_count = parameter.ParameterCount + index += group_count + group = {} + res[parameter.Parameter] = res.get(parameter.Parameter, []) + res[parameter.Parameter].append(group) elif message[cursor] == CMQCFC.MQCFT_BYTE_STRING: parameter = CFBS() parameter.unpack(message[cursor:cursor + CMQCFC.MQCFBS_STRUC_LENGTH_FIXED]) @@ -2865,7 +2940,12 @@ raise NotImplementedError('Unpack for type ({}) not implemented'.format(pcf_type)) index -= 1 cursor += parameter.StrucLength - res[parameter.Parameter] = value + if parameter.Type == CMQCFC.MQCFT_GROUP: + continue + if group is not None: + group[parameter.Parameter] = value + else: + res[parameter.Parameter] = value return res, mqcfh.Control diff --git a/code/tests/messages/statistics_q.dat b/code/tests/messages/statistics_q.dat new file mode 100644 index 0000000000000000000000000000000000000000..9de9c9dcde7c9e43f1c18186fee1fc5cc3f5a851 GIT binary patch literal 8960 zc%1FoO>5LZ7zglwvb9@ES3GQM6`?(N5;{z>NxD6zyCW8x+IG8#p1cU2D%8*5xgHek z7bx^w6rqQnOM5FS=tb*EK?N@oGXu?#mzpMFc1>UjPbSa2?EILWVJ89H1yBR92SDQ= zaoq$k0-y`veLZ+a0c>qQ-g^4nn7!=f*D(v=&3I6Ue>15`=cqGh*g+e20POPeF#rZ7 zmO<>c$bFTP`&!C1iP`qYbpyZ*fS0vkoPK<!2lpnAAI<<%7smSG!dQEoy$$9Ox9csf zkj^8|b62`#t<U-_d8{YAj)_ozgL1#Wr}a>O=>lmvqJK?34<fEh@h3gV74b(N-{Nf_ z=m38dx~>73_UDq9Uq`r}Z+iH>cTzp#z3)<cU-^4~2N%)avDBW(JCDf|?VL*ec`Nlt z)blxJyrP~%>HGSr2YnIo52^eYkN?3(XSv_oVC(5Z2FV9<_7T_Kam>7)r_8JGLo3J! za`NG=xl3f7tuAj8o3VMT&8XR8u464mFCZCxSSF!%l8;)7kI@Jp_ePR^G;;Q_x|8gq z5#vM7D`Wl8iuTc=dHZm#;zNt@(MPfmEoUD`DZejTj1M`l3?C@mhpChgJ;KND6d!ud zJ{}<1haTfY&KnLNmh$Vz?Gzu?`cb40M=2j!_NRIKcru#JhnzPY`;T_zL-wI%^q~!- z54Z9m`PfG`A9CJs__&|557W4ck4A)#XXD8}{^#>8iuTbpl=2aIzKvP$tNm#iK3Yoo zh&tbXN%2vgZ;SL{E5Ck({!FXZkCJ`NEAL~XWFLcczAfIzg7Q9Il<i}H&bP(;SXACe Qvt%Fp()_4=zP*f(KZ%}J(EtDd diff --git a/code/tests/test_pcf.py b/code/tests/test_pcf.py --- a/code/tests/test_pcf.py +++ b/code/tests/test_pcf.py @@ -1,4 +1,5 @@ """Test PCF usage.""" +import os from unittest import skip from ddt import data from ddt import ddt @@ -13,6 +14,7 @@ """Class for MQ PCF testing.""" pcf = None + messages_dir = os.path.join(os.path.dirname(__file__), "messages") @classmethod def setUpClass(cls): @@ -197,6 +199,80 @@ self.assertTrue(item in value, '{} value not in values list'.format(item)) value.remove(item) + def test_mqcfgr_mqcfin64_mqcfil64(self): + """Test arbitrary message with MQCFIL.""" + message = pymqi.CFH(Version=pymqi.CMQCFC.MQCFH_VERSION_1, + Type=pymqi.CMQCFC.MQCFT_USER, + ParameterCount=4).pack() + message += pymqi.CFST(Parameter=pymqi.CMQC.MQCA_Q_MGR_NAME, + String=b'QM1').pack() + # group1 + message += pymqi.CFGR(Parameter=pymqi.CMQCFC.MQGACF_Q_STATISTICS_DATA, + ParameterCount=3).pack() + message += pymqi.CFST(Parameter=pymqi.CMQC.MQCA_Q_NAME, + String=b'SYSTEM.ADMIN.COMMAND.QUEUE').pack() + message += pymqi.CFIN64(Parameter=pymqi.CMQCFC.MQIAMO_Q_MIN_DEPTH, + Value=10).pack() + message += pymqi.CFIL64(Parameter=pymqi.CMQCFC.MQIAMO64_AVG_Q_TIME, + Values=[1, 2, 3]).pack() + # group2 + message += pymqi.CFGR(Parameter=pymqi.CMQCFC.MQGACF_Q_STATISTICS_DATA, + ParameterCount=3).pack() + message += pymqi.CFST(Parameter=pymqi.CMQC.MQCA_Q_NAME, + String=b'SYSTEM.ADMIN.COMMAND.QUEUE2').pack() + message += pymqi.CFIN64(Parameter=pymqi.CMQCFC.MQIAMO_Q_MIN_DEPTH, + Value=20).pack() + message += pymqi.CFIL64(Parameter=pymqi.CMQCFC.MQIAMO64_AVG_Q_TIME, + Values=[111, 222]).pack() + + message += pymqi.CFST(Parameter=pymqi.CMQCFC.MQCAMO_START_TIME, + String=b'10.41.58').pack() + + queue = pymqi.Queue(self.qmgr, self.queue_name, + pymqi.CMQC.MQOO_INPUT_AS_Q_DEF + pymqi.CMQC.MQOO_OUTPUT) + + put_md = pymqi.MD(Format=pymqi.CMQC.MQFMT_PCF) + queue.put(message, put_md) + + get_opts = pymqi.GMO( + Options=pymqi.CMQC.MQGMO_NO_SYNCPOINT + pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING, + Version=pymqi.CMQC.MQGMO_VERSION_2, + MatchOptions=pymqi.CMQC.MQMO_MATCH_CORREL_ID) + get_md = pymqi.MD(MsgId=put_md.MsgId) # pylint: disable=no-member + message = queue.get(None, get_md, get_opts) + queue.close() + message, _ = pymqi.PCFExecute.unpack(message) + + self.assertEqual({ + pymqi.CMQC.MQCA_Q_MGR_NAME: b'QM1\x00', + pymqi.CMQCFC.MQCAMO_START_TIME: b'10.41.58', + pymqi.CMQCFC.MQGACF_Q_STATISTICS_DATA: [ + { + pymqi.CMQC.MQCA_Q_NAME: b'SYSTEM.ADMIN.COMMAND.QUEUE\x00\x00', + pymqi.CMQCFC.MQIAMO_Q_MIN_DEPTH: 10, + pymqi.CMQCFC.MQIAMO64_AVG_Q_TIME: [1, 2, 3], + }, + { + pymqi.CMQC.MQCA_Q_NAME: b'SYSTEM.ADMIN.COMMAND.QUEUE2\x00', + pymqi.CMQCFC.MQIAMO_Q_MIN_DEPTH: 20, + pymqi.CMQCFC.MQIAMO64_AVG_Q_TIME: [111, 222], + }, + ] + }, message) + + def test_unpack_group(self): + binary_message = open(os.path.join(self.messages_dir, "statistics_q.dat"), "rb").read() + + message, _ = pymqi.PCFExecute.unpack(binary_message) + + self.assertEqual(message[pymqi.CMQC.MQCA_Q_MGR_NAME].strip(), b'mq_mgr1') + self.assertEqual(message[pymqi.CMQCFC.MQCAMO_START_DATE], b'2020-06-15\x00\x00') + self.assertEqual(len(message[pymqi.CMQCFC.MQGACF_Q_STATISTICS_DATA]), 16) + + item = message[pymqi.CMQCFC.MQGACF_Q_STATISTICS_DATA][0] + self.assertEqual(item[pymqi.CMQC.MQCA_Q_NAME].strip(), b'SYSTEM.ADMIN.COMMAND.QUEUE') + self.assertEqual(item[pymqi.CMQCFC.MQIAMO_PUTS], [14, 0]) + def test_mqcfbs_old(self): """Test byte string MQCFBS with old style.""" attrs = {