Skip to content
Snippets Groups Projects
Commit fb857f4e8959 authored by Luke Bakken's avatar Luke Bakken
Browse files

Use ack_message from README

parent e0c244dd32b7
Branches
No related tags found
No related merge requests found
......@@ -10,9 +10,20 @@
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
def ack_message(channel, delivery_tag):
"""Note that `channel` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(delivery_tag)
else:
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
pass
def do_work(connection, channel, delivery_tag, body):
thread_id = threading.get_ident()
fmt1 = 'Thread id: {} Delivery tag: {} Message body: {}'
LOGGER.info(fmt1.format(thread_id, delivery_tag, body))
# Sleeping to simulate 10 seconds of work
time.sleep(10)
......@@ -13,11 +24,8 @@
def do_work(connection, channel, delivery_tag, body):
thread_id = threading.get_ident()
fmt1 = 'Thread id: {} Delivery tag: {} Message body: {}'
LOGGER.info(fmt1.format(thread_id, delivery_tag, body))
# Sleeping to simulate 10 seconds of work
time.sleep(10)
if channel.is_open:
fmt2 = 'Thread id: {} Delivery tag: {} sending ack'
LOGGER.info(fmt2.format(thread_id, delivery_tag))
cb = functools.partial(channel.basic_ack, delivery_tag)
cb = functools.partial(ack_message, channel, delivery_tag)
connection.add_callback_threadsafe(cb)
......@@ -23,7 +31,4 @@
connection.add_callback_threadsafe(cb)
else:
fmt2 = 'Thread id: {} Delivery tag: {} not sending ack, channel closed'
LOGGER.info(fmt2.format(thread_id, delivery_tag))
def on_message(channel, method_frame, header_frame, body, args):
(connection, threads) = args
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment