diff --git a/examples/consume.py b/examples/consume.py index fb857f4e89594d21b03b0c2a4214351637d578cb_ZXhhbXBsZXMvY29uc3VtZS5weQ==..093fdd8abeb19e1fd4bfc24242f9d8a761e02fa8_ZXhhbXBsZXMvY29uc3VtZS5weQ== 100644 --- a/examples/consume.py +++ b/examples/consume.py @@ -1,3 +1,4 @@ +"""Basic message consumer example""" import functools import logging import pika @@ -8,7 +9,8 @@ logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT) -def on_message(channel, method_frame, header_frame, body, userdata=None): - LOGGER.info('Userdata: {} Message body: {}'.format(userdata, body)) - channel.basic_ack(delivery_tag=method_frame.delivery_tag) +def on_message(chan, method_frame, _header_frame, body, userdata=None): + """Called when a message is received. Log message and ack it.""" + LOGGER.info('Userdata: %s Message body: %s', userdata, body) + chan.basic_ack(delivery_tag=method_frame.delivery_tag) @@ -14,5 +16,7 @@ -credentials = pika.PlainCredentials('guest', 'guest') -parameters = pika.ConnectionParameters('localhost', credentials=credentials) -connection = pika.BlockingConnection(parameters) +def main(): + """Main method.""" + credentials = pika.PlainCredentials('guest', 'guest') + parameters = pika.ConnectionParameters('localhost', credentials=credentials) + connection = pika.BlockingConnection(parameters) @@ -18,7 +22,11 @@ -channel = connection.channel() -channel.exchange_declare(exchange="test_exchange", exchange_type="direct", passive=False, durable=True, auto_delete=False) -channel.queue_declare(queue="standard", auto_delete=True) -channel.queue_bind(queue="standard", exchange="test_exchange", routing_key="standard_key") -channel.basic_qos(prefetch_count=1) + channel = connection.channel() + channel.exchange_declare(exchange="test_exchange", + exchange_type="direct", + passive=False, + durable=True, + auto_delete=False) + channel.queue_declare(queue="standard", auto_delete=True) + channel.queue_bind(queue="standard", exchange="test_exchange", routing_key="standard_key") + channel.basic_qos(prefetch_count=1) @@ -24,4 +32,4 @@ -on_message_callback = functools.partial(on_message, userdata='on_message_userdata') -channel.basic_consume(on_message_callback, 'standard') + on_message_callback = functools.partial(on_message, userdata='on_message_userdata') + channel.basic_consume(on_message_callback, 'standard') @@ -27,6 +35,6 @@ -try: - channel.start_consuming() -except KeyboardInterrupt: - channel.stop_consuming() + try: + channel.start_consuming() + except KeyboardInterrupt: + channel.stop_consuming() @@ -32,2 +40,5 @@ -connection.close() + connection.close() + +if __name__ == '__main__': + main() diff --git a/pika/connection.py b/pika/connection.py index fb857f4e89594d21b03b0c2a4214351637d578cb_cGlrYS9jb25uZWN0aW9uLnB5..093fdd8abeb19e1fd4bfc24242f9d8a761e02fa8_cGlrYS9jb25uZWN0aW9uLnB5 100644 --- a/pika/connection.py +++ b/pika/connection.py @@ -1301,7 +1301,7 @@ self._backpressure_multiplier = value # - # Connections state properties + # Connection state properties # @property diff --git a/pika/heartbeat.py b/pika/heartbeat.py index fb857f4e89594d21b03b0c2a4214351637d578cb_cGlrYS9oZWFydGJlYXQucHk=..093fdd8abeb19e1fd4bfc24242f9d8a761e02fa8_cGlrYS9oZWFydGJlYXQucHk= 100644 --- a/pika/heartbeat.py +++ b/pika/heartbeat.py @@ -7,7 +7,9 @@ class HeartbeatChecker(object): - """Checks to make sure that our heartbeat is received at the expected - intervals. + """Sends heartbeats to the broker. The provided timeout is used to + determine if the connection is stale - no received heartbeats or + other activity will close the connection. See the parameter list for more + details. """ @@ -12,5 +14,3 @@ """ - DEFAULT_INTERVAL = 60 - MAX_IDLE_COUNT = 2 _CONNECTION_FORCED = 320 @@ -16,3 +16,3 @@ _CONNECTION_FORCED = 320 - _STALE_CONNECTION = "Too Many Missed Heartbeats, No reply in %i seconds" + _STALE_CONNECTION = "No activity or too many missed meartbeats in the last %i seconds" @@ -18,6 +18,12 @@ - def __init__(self, connection, interval=DEFAULT_INTERVAL, idle_count=MAX_IDLE_COUNT): - """Create a heartbeat on connection sending a heartbeat frame every - interval seconds. + def __init__(self, connection, timeout): + """Create an object that will check for activity on the provided + connection as well as receive heartbeat frames from the broker. The + timeout parameter defines a window within which this activity must + happen. If not, the connection is considered dead and closed. + + The value passed for timeout is also used to calculate an interval + at which a heartbeat frame is sent to the broker. The interval is + equal to the timeout value divided by two. :param pika.connection.Connection: Connection object @@ -22,8 +28,9 @@ :param pika.connection.Connection: Connection object - :param int interval: Heartbeat check interval. Note: heartbeats will - be sent at interval / 2 frequency. - :param int idle_count: The number of heartbeat intervals without data - received that will close the current connection. + :param int timeout: Connection idle timeout. If no activity occurs on the + connection nor heartbeat frames received during the + timeout window the connection will be closed. The + interval used to send heartbeats is calculated from + this value by dividing it by two. """ @@ -28,4 +35,7 @@ """ + if timeout < 1: + raise ValueError('timeout must >= 0, but got %r' % (timeout,)) + self._connection = connection @@ -30,4 +40,4 @@ self._connection = connection - # Note: see the following document: + # Note: see the following documents: # https://www.rabbitmq.com/heartbeats.html#heartbeats-timeout @@ -33,3 +43,12 @@ # https://www.rabbitmq.com/heartbeats.html#heartbeats-timeout - self._interval = float(interval / 2) + # https://github.com/pika/pika/pull/1072 + # https://groups.google.com/d/topic/rabbitmq-users/Fmfeqe5ocTY/discussion + # There is a certain amount of confusion around how client developers + # interpret the spec. The spec talks about 2 missed heartbeats as a + # *timeout*, plus that any activity on the connection counts for a + # heartbeat. This is to avoid edge cases and not to depend on network + # latency. + self._timeout = timeout + + self._send_interval = float(timeout) / 2 @@ -35,10 +54,20 @@ - # Note: even though we're sending heartbeats in half the specified - # interval, the broker will be sending them to us at the specified - # interval. This means we'll be checking for an idle connection - # twice as many times as the broker will send heartbeats to us, - # so we need to double the max idle count here - self._max_idle_count = idle_count * 2 + # Note: Pika will calculate the heartbeat / connectivity check interval + # by adding 5 seconds to the negotiated timeout to leave a bit of room + # for broker heartbeats that may be right at the edge of the timeout + # window. This is different behavior from the RabbitMQ Java client and + # the spec that suggests a check interval equivalent to two times the + # heartbeat timeout value. But, one advantage of adding a small amount + # is that bad connections will be detected faster. + # https://github.com/pika/pika/pull/1072#issuecomment-397850795 + # https://github.com/rabbitmq/rabbitmq-java-client/blob/b55bd20a1a236fc2d1ea9369b579770fa0237615/src/main/java/com/rabbitmq/client/impl/AMQConnection.java#L773-L780 + # https://github.com/ruby-amqp/bunny/blob/3259f3af2e659a49c38c2470aa565c8fb825213c/lib/bunny/session.rb#L1187-L1192 + self._check_interval = timeout + 5 + + LOGGER.debug('timeout: %f send_interval: %f check_interval: %f', + self._timeout, + self._send_interval, + self._check_interval) # Initialize counters self._bytes_received = 0 @@ -47,21 +76,10 @@ self._heartbeat_frames_sent = 0 self._idle_byte_intervals = 0 - # The handle for the last timer - self._timer = None - - # Setup the timer to fire in _interval seconds - self._setup_timer() - - @property - def active(self): - """Return True if the connection's heartbeat attribute is set to this - instance. - - :rtype True - - """ - return self._connection.heartbeat is self + self._send_timer = None + self._check_timer = None + self._start_send_timer() + self._start_check_timer() @property def bytes_received_on_connection(self): @@ -78,10 +96,10 @@ to trip the max idle threshold. """ - return self._idle_byte_intervals >= self._max_idle_count + return self._idle_byte_intervals > 0 def received(self): """Called when a heartbeat is received""" LOGGER.debug('Received heartbeat frame') self._heartbeat_frames_received += 1 @@ -82,12 +100,20 @@ def received(self): """Called when a heartbeat is received""" LOGGER.debug('Received heartbeat frame') self._heartbeat_frames_received += 1 - def send_and_check(self): - """Invoked by a timer to send a heartbeat when we need to, check to see + def _send_heartbeat(self): + """Invoked by a timer to send a heartbeat when we need to. + + """ + LOGGER.debug('Sending heartbeat frame') + self._send_heartbeat_frame() + self._start_send_timer() + + def _check_heartbeat(self): + """Invoked by a timer to check for broker heartbeats. Checks to see if we've missed any heartbeats and disconnect our connection if it's been idle too long. """ @@ -90,5 +116,11 @@ if we've missed any heartbeats and disconnect our connection if it's been idle too long. """ + if self._has_received_data: + self._idle_byte_intervals = 0 + else: + # Connection has not received any data, increment the counter + self._idle_byte_intervals += 1 + LOGGER.debug('Received %i heartbeat frames, sent %i, ' @@ -94,4 +126,4 @@ LOGGER.debug('Received %i heartbeat frames, sent %i, ' - 'idle intervals %i, max idle count %i', + 'idle intervals %i', self._heartbeat_frames_received, self._heartbeat_frames_sent, @@ -96,6 +128,5 @@ self._heartbeat_frames_received, self._heartbeat_frames_sent, - self._idle_byte_intervals, - self._max_idle_count) + self._idle_byte_intervals) if self.connection_is_idle: @@ -100,10 +131,5 @@ if self.connection_is_idle: - return self._close_connection() - - # Connection has not received any data, increment the counter - if not self._has_received_data: - self._idle_byte_intervals += 1 - else: - self._idle_byte_intervals = 0 + self._close_connection() + return @@ -109,12 +135,5 @@ - # Update the counters of bytes sent/received and the frames received - self._update_counters() - - # Send a heartbeat frame - self._send_heartbeat_frame() - - # Update the timer to fire again - self._start_timer() + self._start_check_timer() def stop(self): """Stop the heartbeat checker""" @@ -118,12 +137,16 @@ def stop(self): """Stop the heartbeat checker""" - if self._timer: - LOGGER.debug('Removing timeout for next heartbeat interval') - self._connection.remove_timeout(self._timer) - self._timer = None + if self._send_timer: + LOGGER.debug('Removing timer for next heartbeat send interval') + self._connection.remove_timeout(self._send_timer) # pylint: disable=W0212 + self._send_timer = None + if self._check_timer: + LOGGER.debug('Removing timer for next heartbeat check interval') + self._connection.remove_timeout(self._check_timer) # pylint: disable=W0212 + self._check_timer = None def _close_connection(self): """Close the connection with the AMQP Connection-Forced value.""" LOGGER.info('Connection is idle, %i stale byte intervals', self._idle_byte_intervals) @@ -125,13 +148,12 @@ def _close_connection(self): """Close the connection with the AMQP Connection-Forced value.""" LOGGER.info('Connection is idle, %i stale byte intervals', self._idle_byte_intervals) - duration = self._max_idle_count * self._interval - text = HeartbeatChecker._STALE_CONNECTION % duration + text = HeartbeatChecker._STALE_CONNECTION % self._timeout # NOTE: this won't achieve the perceived effect of sending # Connection.Close to broker, because the frame will only get buffered # in memory before the next statement terminates the connection. self._connection.close(HeartbeatChecker._CONNECTION_FORCED, text) @@ -132,11 +154,11 @@ # NOTE: this won't achieve the perceived effect of sending # Connection.Close to broker, because the frame will only get buffered # in memory before the next statement terminates the connection. self._connection.close(HeartbeatChecker._CONNECTION_FORCED, text) - self._connection._on_terminate(HeartbeatChecker._CONNECTION_FORCED, + self._connection._on_terminate(HeartbeatChecker._CONNECTION_FORCED, # pylint: disable=W0212 text) @property def _has_received_data(self): @@ -139,9 +161,9 @@ text) @property def _has_received_data(self): - """Returns True if the connection has received data on the connection. + """Returns True if the connection has received data. :rtype: bool """ @@ -144,8 +166,8 @@ :rtype: bool """ - return not self._bytes_received == self.bytes_received_on_connection + return self._bytes_received != self.bytes_received_on_connection @staticmethod def _new_heartbeat_frame(): @@ -161,6 +183,7 @@ """ LOGGER.debug('Sending heartbeat frame') - self._connection._send_frame(self._new_heartbeat_frame()) + self._connection._send_frame( # pylint: disable=W0212 + self._new_heartbeat_frame()) self._heartbeat_frames_sent += 1 @@ -165,7 +188,8 @@ self._heartbeat_frames_sent += 1 - def _setup_timer(self): - """Use the connection objects delayed_call function which is - implemented by the Adapter for calling the check_heartbeats function - every interval seconds. + def _start_send_timer(self): + """Start a new heartbeat send timer.""" + self._send_timer = self._connection.add_timeout( # pylint: disable=W0212 + self._send_interval, + self._send_heartbeat) @@ -171,5 +195,9 @@ - """ - self._timer = self._connection.add_timeout(self._interval, - self.send_and_check) + def _start_check_timer(self): + """Start a new heartbeat check timer.""" + # Note: update counters now to get current values + # at the start of the timeout window. Values will be + # checked against the connection's byte count at the + # end of the window + self._update_counters() @@ -175,11 +203,7 @@ - def _start_timer(self): - """If the connection still has this object set for heartbeats, add a - new timer. - - """ - if self.active: - self._setup_timer() + self._check_timer = self._connection.add_timeout( # pylint: disable=W0212 + self._check_interval, + self._check_heartbeat) def _update_counters(self): """Update the internal counters for bytes sent and received and the diff --git a/tests/unit/heartbeat_tests.py b/tests/unit/heartbeat_tests.py index fb857f4e89594d21b03b0c2a4214351637d578cb_dGVzdHMvdW5pdC9oZWFydGJlYXRfdGVzdHMucHk=..093fdd8abeb19e1fd4bfc24242f9d8a761e02fa8_dGVzdHMvdW5pdC9oZWFydGJlYXRfdGVzdHMucHk= 100644 --- a/tests/unit/heartbeat_tests.py +++ b/tests/unit/heartbeat_tests.py @@ -8,7 +8,6 @@ from pika import connection, frame, heartbeat - class HeartbeatTests(unittest.TestCase): INTERVAL = 60 @@ -12,7 +11,8 @@ class HeartbeatTests(unittest.TestCase): INTERVAL = 60 - HALF_INTERVAL = INTERVAL / 2 + SEND_INTERVAL = float(INTERVAL) / 2 + CHECK_INTERVAL = INTERVAL + 5 def setUp(self): self.mock_conn = mock.Mock(spec=connection.Connection) @@ -25,12 +25,6 @@ del self.obj del self.mock_conn - def test_default_initialization_interval(self): - self.assertEqual(self.obj._interval, self.HALF_INTERVAL) - - def test_default_initialization_max_idle_count(self): - self.assertEqual(self.obj._max_idle_count, self.obj.MAX_IDLE_COUNT * 2) - def test_constructor_assignment_connection(self): self.assertIs(self.obj._connection, self.mock_conn) @@ -34,7 +28,8 @@ def test_constructor_assignment_connection(self): self.assertIs(self.obj._connection, self.mock_conn) - def test_constructor_assignment_heartbeat_interval(self): - self.assertEqual(self.obj._interval, self.HALF_INTERVAL) + def test_constructor_assignment_intervals(self): + self.assertEqual(self.obj._send_interval, self.SEND_INTERVAL) + self.assertEqual(self.obj._check_interval, self.CHECK_INTERVAL) def test_constructor_initial_bytes_received(self): @@ -39,5 +34,9 @@ def test_constructor_initial_bytes_received(self): - self.assertEqual(self.obj._bytes_received, 0) + # Note: _bytes_received is initialized by calls + # to _start_check_timer which calls _update_counters + # which reads the initial values from the connection + self.assertEqual(self.obj._bytes_received, + self.mock_conn.bytes_received) def test_constructor_initial_bytes_sent(self): @@ -42,6 +41,10 @@ def test_constructor_initial_bytes_sent(self): - self.assertEqual(self.obj._bytes_received, 0) + # Note: _bytes_received is initialized by calls + # to _start_check_timer which calls _update_counters + # which reads the initial values from the connection + self.assertEqual(self.obj._bytes_sent, + self.mock_conn.bytes_sent) def test_constructor_initial_heartbeat_frames_received(self): self.assertEqual(self.obj._heartbeat_frames_received, 0) @@ -52,8 +55,8 @@ def test_constructor_initial_idle_byte_intervals(self): self.assertEqual(self.obj._idle_byte_intervals, 0) - @mock.patch('pika.heartbeat.HeartbeatChecker._setup_timer') - def test_constructor_called_setup_timer(self, timer): - heartbeat.HeartbeatChecker(self.mock_conn) + @mock.patch('pika.heartbeat.HeartbeatChecker._start_send_timer') + def test_constructor_called_start_send_timer(self, timer): + heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) timer.assert_called_once_with() @@ -58,12 +61,9 @@ timer.assert_called_once_with() - def test_active_true(self): - self.mock_conn.heartbeat = self.obj - self.assertTrue(self.obj.active) - - def test_active_false(self): - self.mock_conn.heartbeat = mock.Mock() - self.assertFalse(self.obj.active) + @mock.patch('pika.heartbeat.HeartbeatChecker._start_check_timer') + def test_constructor_called_start_check_timer(self, timer): + heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) + timer.assert_called_once_with() def test_bytes_received_on_connection(self): self.mock_conn.bytes_received = 128 @@ -81,9 +81,16 @@ self.assertTrue(self.obj._heartbeat_frames_received, 1) @mock.patch('pika.heartbeat.HeartbeatChecker._close_connection') - def test_send_and_check_not_closed(self, close_connection): - obj = heartbeat.HeartbeatChecker(self.mock_conn) - obj.send_and_check() + def test_send_heartbeat_not_closed(self, close_connection): + obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) + obj._send_heartbeat() + close_connection.assert_not_called() + + @mock.patch('pika.heartbeat.HeartbeatChecker._close_connection') + def test_check_heartbeat_not_closed(self, close_connection): + obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) + self.mock_conn.bytes_received = 128 + obj._check_heartbeat() close_connection.assert_not_called() @mock.patch('pika.heartbeat.HeartbeatChecker._close_connection') @@ -87,6 +94,6 @@ close_connection.assert_not_called() @mock.patch('pika.heartbeat.HeartbeatChecker._close_connection') - def test_send_and_check_missed_bytes(self, close_connection): - obj = heartbeat.HeartbeatChecker(self.mock_conn) + def test_check_heartbeat_missed_bytes(self, close_connection): + obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) obj._idle_byte_intervals = self.INTERVAL @@ -92,4 +99,4 @@ obj._idle_byte_intervals = self.INTERVAL - obj.send_and_check() + obj._check_heartbeat() close_connection.assert_called_once_with() @@ -94,5 +101,5 @@ close_connection.assert_called_once_with() - def test_send_and_check_increment_no_bytes(self): + def test_check_heartbeat_increment_no_bytes(self): self.mock_conn.bytes_received = 100 self.obj._bytes_received = 100 @@ -97,5 +104,5 @@ self.mock_conn.bytes_received = 100 self.obj._bytes_received = 100 - self.obj.send_and_check() + self.obj._check_heartbeat() self.assertEqual(self.obj._idle_byte_intervals, 1) @@ -100,5 +107,5 @@ self.assertEqual(self.obj._idle_byte_intervals, 1) - def test_send_and_check_increment_bytes(self): + def test_check_heartbeat_increment_bytes(self): self.mock_conn.bytes_received = 100 self.obj._bytes_received = 128 @@ -103,6 +110,6 @@ self.mock_conn.bytes_received = 100 self.obj._bytes_received = 128 - self.obj.send_and_check() + self.obj._check_heartbeat() self.assertEqual(self.obj._idle_byte_intervals, 0) @mock.patch('pika.heartbeat.HeartbeatChecker._update_counters') @@ -106,9 +113,8 @@ self.assertEqual(self.obj._idle_byte_intervals, 0) @mock.patch('pika.heartbeat.HeartbeatChecker._update_counters') - def test_send_and_check_update_counters(self, update_counters): - obj = heartbeat.HeartbeatChecker(self.mock_conn) - obj.send_and_check() + def test_check_heartbeat_update_counters(self, update_counters): + heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) update_counters.assert_called_once_with() @mock.patch('pika.heartbeat.HeartbeatChecker._send_heartbeat_frame') @@ -112,8 +118,8 @@ update_counters.assert_called_once_with() @mock.patch('pika.heartbeat.HeartbeatChecker._send_heartbeat_frame') - def test_send_and_check_send_heartbeat_frame(self, send_heartbeat_frame): - obj = heartbeat.HeartbeatChecker(self.mock_conn) - obj.send_and_check() + def test_send_heartbeat_sends_heartbeat_frame(self, send_heartbeat_frame): + obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) + obj._send_heartbeat() send_heartbeat_frame.assert_called_once_with() @@ -118,12 +124,16 @@ send_heartbeat_frame.assert_called_once_with() - @mock.patch('pika.heartbeat.HeartbeatChecker._start_timer') - def test_send_and_check_start_timer(self, start_timer): - obj = heartbeat.HeartbeatChecker(self.mock_conn) - obj.send_and_check() - start_timer.assert_called_once_with() + @mock.patch('pika.heartbeat.HeartbeatChecker._start_send_timer') + def test_send_heartbeat_start_timer(self, start_send_timer): + heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) + start_send_timer.assert_called_once_with() + + @mock.patch('pika.heartbeat.HeartbeatChecker._start_check_timer') + def test_check_heartbeat_start_timer(self, start_check_timer): + heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) + start_check_timer.assert_called_once_with() def test_connection_close(self): self.obj._idle_byte_intervals = 3 self.obj._idle_heartbeat_intervals = 4 self.obj._close_connection() @@ -125,10 +135,9 @@ def test_connection_close(self): self.obj._idle_byte_intervals = 3 self.obj._idle_heartbeat_intervals = 4 self.obj._close_connection() - reason = self.obj._STALE_CONNECTION % ( - self.obj._max_idle_count * self.obj._interval) + reason = self.obj._STALE_CONNECTION % self.obj._timeout self.mock_conn.close.assert_called_once_with( self.obj._CONNECTION_FORCED, reason) self.mock_conn._on_terminate.assert_called_once_with( @@ -157,7 +166,9 @@ self.obj._send_heartbeat_frame() self.assertEqual(self.obj._heartbeat_frames_sent, 1) - def test_setup_timer_called(self): - self.mock_conn.add_timeout.assert_called_once_with( - self.HALF_INTERVAL, self.obj.send_and_check) + def test_start_send_timer_called(self): + want = [mock.call(self.SEND_INTERVAL, self.obj._send_heartbeat), + mock.call(self.CHECK_INTERVAL, self.obj._check_heartbeat)] + got = self.mock_conn.add_timeout.call_args_list + self.assertEqual(got, want) @@ -163,9 +174,4 @@ - @mock.patch('pika.heartbeat.HeartbeatChecker._setup_timer') - def test_start_timer_not_active(self, setup_timer): - self.obj._start_timer() - setup_timer.assert_not_called() - - @mock.patch('pika.heartbeat.HeartbeatChecker._setup_timer') - def test_start_timer_active(self, setup_timer): + @mock.patch('pika.heartbeat.HeartbeatChecker._start_send_timer') + def test_start_timer_active(self, setup_send_timer): self.mock_conn.heartbeat = self.obj @@ -171,6 +177,6 @@ self.mock_conn.heartbeat = self.obj - self.obj._start_timer() - self.assertTrue(setup_timer.called) + self.obj._start_send_timer() + self.assertTrue(setup_send_timer.called) def test_update_counters_bytes_received(self): self.mock_conn.bytes_received = 256