Skip to content
Snippets Groups Projects
Commit 02ad93304e14 authored by Vyacheslav Savchenko's avatar Vyacheslav Savchenko
Browse files

GH #11 Allow to pub and sub for messages with rfh2 (PR #213)

parent c8eee3da654e
No related branches found
No related tags found
No related merge requests found
......@@ -3,7 +3,6 @@
push:
branches:
- master
- gh159-actions-unit-testing
pull_request:
jobs:
......
......@@ -3,7 +3,6 @@
push:
branches:
- master
- gh159-actions-unit-testing
pull_request:
jobs:
......@@ -17,7 +16,7 @@
# macos-latest: has some issues in SeyfSV/setup-mqclient@master action
#environment: ['ubuntu-latest', 'macos-latest', 'windows-latest']
python-version: [2.7, 3.5, 3.6, 3.7, 3.8]
mq-client-version: [9.1.4.0]
mq-client-version: [9.1.5.0]
services:
mq:
......
......@@ -2283,6 +2283,34 @@
msg_desc.unpack(rv[0])
put_opts.unpack(rv[1])
def pub_rfh2(self, msg, *opts):
# type: (bytes, *MQOpts) -> None
"""pub_rfh2(msg[, mDesc ,putOpts, [rfh2_header, ]])
Put a RFH2 message. opts[2] is a list of RFH2 headers.
MQMD and RFH2's must be correct.
"""
ensure_not_unicode(msg) # Python 3 bytes check
rfh2_buff = b''
if len(opts) >= 3:
if opts[2] is not None:
if not isinstance(opts[2], list):
raise TypeError('Third item of opts should be a list.')
encoding = CMQC.MQENC_NATIVE
if opts[0] is not None:
mqmd = opts[0]
encoding = mqmd['Encoding']
for rfh2_header in opts[2]:
if rfh2_header is not None:
rfh2_buff = rfh2_buff + rfh2_header.pack(encoding)
encoding = rfh2_header['Encoding']
msg = rfh2_buff + msg
self.pub(msg, *opts[0:2])
else:
self.pub(msg, *opts)
def sub(self, *opts):
""" Subscribe to the topic and return a Subscription object.
A subscription to a topic can be made using an existing queue, either
......@@ -2350,6 +2378,12 @@
"""
return self.sub_queue.get(max_length, *opts)
def get_rfh2(self, max_length=None, *opts):
# type: (int, *MQOpts) -> bytes
""" Get a publication from the Queue.
"""
return self.sub_queue.get_rfh2(max_length, *opts)
def sub(self, sub_desc=None, sub_queue=None, sub_name=None, sub_opts=None,
topic_name=None, topic_string=None):
""" Subscribe to a topic, alter or resume a subscription.
......
......@@ -106,6 +106,14 @@
topic.pub(msg, *opts)
topic.close()
def pub_rfh2(self, msg, topic_string, *opts):
topic = pymqi.Topic(self.qmgr, topic_string=topic_string)
topic.open(open_opts=pymqi.CMQC.MQOO_OUTPUT)
if isinstance(msg, str) and not isinstance(msg, bytes):
raise AttributeError('msg must be bytes (not str) to publish to topic.') # py3
topic.pub_rfh2(msg, *opts)
topic.close()
def create_api_subscription(self):
return pymqi.Subscription(self.qmgr)
......@@ -208,6 +216,48 @@
sub.close(sub_close_options=0, close_sub_queue=True)
self.assertEqual(data, msg)
def test_pubsub_api_managed_non_durable_rfh2(self):
topic_string = self.topic_string_template.format(type="API_RFH2", destination="MANAGED", durable="NON DURABLE")
subname = self.subname_template.format(type="Api_rfh2", destination="Managed", durable="Non Durable")
msg = self.msg_format(topic_string=topic_string)
sub_desc = self.get_subscription_descriptor(subname, topic_string,
pymqi.CMQC.MQSO_CREATE + pymqi.CMQC.MQSO_MANAGED)
# register Subscription
sub = self.create_api_subscription()
self.sub_desc_list = [(sub, sub_desc, None)]
sub.sub(sub_desc=sub_desc)
# publish (put)
put_mqmd = pymqi.md(
Format=pymqi.CMQC.MQFMT_RF_HEADER_2,
Encoding=273,
CodedCharSetId=1208)
put_opts = pymqi.pmo()
put_rfh2 = pymqi.RFH2(StrucId=pymqi.CMQC.MQRFH_STRUC_ID,
Version=pymqi.CMQC.MQRFH_VERSION_2,
StrucLength=188,
Encoding=273,
CodedCharSetId=1208,
Format=pymqi.CMQC.MQFMT_STRING,
Flags=0,
NameValueCCSID=1208)
put_rfh2.add_folder(b"<psc><Command>RegSub</Command><Topic>$topictree/topiccat/topic</Topic><QMgrName>DebugQM</QMgrName><QName>PUBOUT</QName><RegOpt>PersAsPub</RegOpt></psc>")
put_rfh2.add_folder(b"<testFolder><testVar>testValue</testVar></testFolder>")
put_rfh2.add_folder(b"<mcd><Msd>xmlnsc</Msd></mcd>")
self.pub_rfh2(msg, topic_string, put_mqmd, put_opts, [put_rfh2])
get_opts = pymqi.GMO(Version=pymqi.CMQC.MQGMO_VERSION_4,
WaitInterval=15000,
Options=pymqi.CMQC.MQGMO_NO_SYNCPOINT + \
pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING + \
pymqi.CMQC.MQGMO_WAIT)
get_rfh2_list = []
data = sub.get_rfh2(None, pymqi.md(Version=pymqi.CMQC.MQMD_VERSION_2), get_opts, get_rfh2_list)
sub.close(sub_close_options=0, close_sub_queue=True)
self.assertEqual(data, msg)
def test_pubsub_admin_managed(self):
topic_string = self.topic_string_template.format(type="ADMIN", destination="MANAGED", durable="DURABLE")
subname = self.subname_template.format(type="Admin", destination="Managed", durable="Durable")
......@@ -334,7 +384,7 @@
"""
topic_string = self.topic_string_template.format(type="API", destination="MANAGED", durable="NON DURABLE")
subname = self.subname_template.format(type="Api", destination="Managed", durable="Non Durable")
messages = ["ascii", unicode("Euro sign: ", "iso-8859-15"), unicode("Umlut", "iso-8859-15"), unicodedata.lookup("INFINITY")]
messages = ["ascii", unicode("Euro sign: ", "iso-8859-15"), unicode("Umlut", "iso-8859-15"), unicodedata.lookup("INFINITY")]
md = pymqi.md()
# setting this means the message is entirely character data
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment