diff --git a/pika/heartbeat.py b/pika/heartbeat.py index b2f9dea1a755ae3e51c6dd4122f656189e0dc8ed_cGlrYS9oZWFydGJlYXQucHk=..31d2d01280b96d9e9cdec53b0e07c2a60852fcdd_cGlrYS9oZWFydGJlYXQucHk= 100644 --- a/pika/heartbeat.py +++ b/pika/heartbeat.py @@ -7,10 +7,10 @@ class HeartbeatChecker(object): - """Sends heartbeats to the broker twice during the provided interval - and checks at a different interval to make sure that the broker's - heartbeat is received as expected. See the parameter list for more + """Sends heartbeats to the broker. The provided timeout is used to + determine if the connection is stale - no received heartbeats or + other activity will close the connection. See the parameter list for more details. """ _CONNECTION_FORCED = 320 @@ -13,7 +13,6 @@ details. """ _CONNECTION_FORCED = 320 - _MAX_IDLE_COUNT = 2 - _STALE_CONNECTION = "Too Many Missed Heartbeats, No reply in %i seconds" + _STALE_CONNECTION = "No activity or too many missed meartbeats in the last %i seconds" @@ -19,7 +18,12 @@ - def __init__(self, connection, interval): - """Create a heartbeat on the connection that sends two heartbeat frames - within the specified interval. Also checks to ensure heartbeats are - received from the broker at a different interval explained below. + def __init__(self, connection, timeout): + """Create an object that will check for activity on the provided + connection as well as receive heartbeat frames from the broker. The + timeout parameter defines a window within which this activity must + happen. If not, the connection is considered dead and closed. + + The value passed for timeout is also used to calculate an interval + at which a heartbeat frame is sent to the broker. The interval is + equal to the timeout value divided by two. :param pika.connection.Connection: Connection object @@ -24,13 +28,10 @@ :param pika.connection.Connection: Connection object - :param int interval: Heartbeat check interval. Note: heartbeats will - actually be sent at interval / 2 frequency, and - heartbeat checks made at (interval * 3) / 2. Default - interval is 60 seconds so Pika will send heartbeats - at 30 second intervals and will check every 90 seconds. - If no heartbeat is received from the broker, nor data - activity on the connection after 180 seconds the - connection will be assumed dead and closed. + :param int timeout: Connection idle timeout. If no activity occurs on the + connection nor heartbeat frames received during the + timeout window the connection will be closed. The + interval used to send heartbeats is calculated from + this value by dividing it by two. """ self._connection = connection @@ -39,8 +40,14 @@ # https://www.rabbitmq.com/heartbeats.html#heartbeats-timeout # https://github.com/pika/pika/pull/1072 # https://groups.google.com/d/topic/rabbitmq-users/Fmfeqe5ocTY/discussion - self._send_interval = float(interval) / 2 - self._check_interval = (float(interval) * 3) / 2 + # There is a certain amount of confusion around how client developers + # interpret the spec. The spec talks about 2 missed heartbeats as a + # *timeout*, plus that any activity on the connection counts for a + # heartbeat. This is to avoid edge cases and not to depend on network + # latency. + self._timeout = timeout + self._send_interval = float(timeout) / 2 + self._check_interval = timeout # Initialize counters self._bytes_received = 0 @@ -51,18 +58,8 @@ self._send_timer = None self._check_timer = None - - self._setup_timers() - - @property - def active(self): - """Return True if the connection's heartbeat attribute is set to this - instance. - - :rtype True - - """ - return self._connection.heartbeat is self # pylint: disable=W0212 + self._start_send_timer() + self._start_check_timer() @property def bytes_received_on_connection(self): @@ -79,10 +76,10 @@ to trip the max idle threshold. """ - return self._idle_byte_intervals >= HeartbeatChecker._MAX_IDLE_COUNT + return self._idle_byte_intervals > 0 def received(self): """Called when a heartbeat is received""" LOGGER.debug('Received heartbeat frame') self._heartbeat_frames_received += 1 @@ -83,10 +80,10 @@ def received(self): """Called when a heartbeat is received""" LOGGER.debug('Received heartbeat frame') self._heartbeat_frames_received += 1 - def send_heartbeat(self): + def _send_heartbeat(self): """Invoked by a timer to send a heartbeat when we need to. """ @@ -94,9 +91,9 @@ self._send_heartbeat_frame() self._start_send_timer() - def check_heartbeat(self): + def _check_heartbeat(self): """Invoked by a timer to check for broker heartbeats. Checks to see if we've missed any heartbeats and disconnect our connection if it's been idle too long. """ @@ -98,6 +95,12 @@ """Invoked by a timer to check for broker heartbeats. Checks to see if we've missed any heartbeats and disconnect our connection if it's been idle too long. """ + if self._has_received_data: + self._idle_byte_intervals = 0 + else: + # Connection has not received any data, increment the counter + self._idle_byte_intervals += 1 + LOGGER.debug('Received %i heartbeat frames, sent %i, ' @@ -103,4 +106,4 @@ LOGGER.debug('Received %i heartbeat frames, sent %i, ' - 'idle intervals %i, max idle count %i', + 'idle intervals %i', self._heartbeat_frames_received, self._heartbeat_frames_sent, @@ -105,6 +108,5 @@ self._heartbeat_frames_received, self._heartbeat_frames_sent, - self._idle_byte_intervals, - HeartbeatChecker._MAX_IDLE_COUNT) + self._idle_byte_intervals) if self.connection_is_idle: @@ -109,4 +111,5 @@ if self.connection_is_idle: - return self._close_connection() + self._close_connection() + return @@ -112,11 +115,4 @@ - # Connection has not received any data, increment the counter - if not self._has_received_data: - self._idle_byte_intervals += 1 - else: - self._idle_byte_intervals = 0 - - self._update_counters() self._start_check_timer() def stop(self): @@ -134,8 +130,7 @@ """Close the connection with the AMQP Connection-Forced value.""" LOGGER.info('Connection is idle, %i stale byte intervals', self._idle_byte_intervals) - text = HeartbeatChecker._STALE_CONNECTION % ( - self._check_interval * HeartbeatChecker._MAX_IDLE_COUNT) + text = HeartbeatChecker._STALE_CONNECTION % self._timeout # NOTE: this won't achieve the perceived effect of sending # Connection.Close to broker, because the frame will only get buffered @@ -147,8 +142,8 @@ @property def _has_received_data(self): - """Returns True if the connection has received data on the connection. + """Returns True if the connection has received data. :rtype: bool """ @@ -151,8 +146,8 @@ :rtype: bool """ - return not self._bytes_received == self.bytes_received_on_connection + return self._bytes_received != self.bytes_received_on_connection @staticmethod def _new_heartbeat_frame(): @@ -168,6 +163,7 @@ """ LOGGER.debug('Sending heartbeat frame') - self._connection._send_frame(self._new_heartbeat_frame()) + self._connection._send_frame( # pylint: disable=W0212 + self._new_heartbeat_frame()) self._heartbeat_frames_sent += 1 @@ -172,14 +168,6 @@ self._heartbeat_frames_sent += 1 - def _setup_timers(self): - """Use the connection objects _adapter_add_timeout function which is - implemented by the Adapter for calling our send and check functions - after interval seconds. - - """ - self._setup_send_timer() - self._setup_check_timer() - - def _setup_send_timer(self): # pylint: disable=C0111 + def _start_send_timer(self): + """Start a new heartbeat send timer.""" self._send_timer = self._connection.add_timeout( # pylint: disable=W0212 self._send_interval, @@ -184,4 +172,4 @@ self._send_timer = self._connection.add_timeout( # pylint: disable=W0212 self._send_interval, - self.send_heartbeat) + self._send_heartbeat) @@ -187,4 +175,11 @@ - def _setup_check_timer(self): # pylint: disable=C0111 + def _start_check_timer(self): + """Start a new heartbeat check timer.""" + # Note: update counters now to get current values + # at the start of the timeout window. Values will be + # checked against the connection's byte count at the + # end of the window + self._update_counters() + self._check_timer = self._connection.add_timeout( # pylint: disable=W0212 self._check_interval, @@ -189,22 +184,6 @@ self._check_timer = self._connection.add_timeout( # pylint: disable=W0212 self._check_interval, - self.check_heartbeat) - - def _start_send_timer(self): - """If the connection still has this object set for heartbeats, add a - new send timer. - - """ - if self.active: - self._setup_send_timer() - - def _start_check_timer(self): - """If the connection still has this object set for heartbeats, add a - new check timer. - - """ - if self.active: - self._setup_check_timer() + self._check_heartbeat) def _update_counters(self): """Update the internal counters for bytes sent and received and the diff --git a/tests/unit/heartbeat_tests.py b/tests/unit/heartbeat_tests.py index b2f9dea1a755ae3e51c6dd4122f656189e0dc8ed_dGVzdHMvdW5pdC9oZWFydGJlYXRfdGVzdHMucHk=..31d2d01280b96d9e9cdec53b0e07c2a60852fcdd_dGVzdHMvdW5pdC9oZWFydGJlYXRfdGVzdHMucHk= 100644 --- a/tests/unit/heartbeat_tests.py +++ b/tests/unit/heartbeat_tests.py @@ -8,8 +8,7 @@ from pika import connection, frame, heartbeat - class HeartbeatTests(unittest.TestCase): INTERVAL = 60 SEND_INTERVAL = float(INTERVAL) / 2 @@ -12,8 +11,8 @@ class HeartbeatTests(unittest.TestCase): INTERVAL = 60 SEND_INTERVAL = float(INTERVAL) / 2 - CHECK_INTERVAL = (float(INTERVAL) * 3) / 2 + CHECK_INTERVAL = INTERVAL def setUp(self): self.mock_conn = mock.Mock(spec=connection.Connection) @@ -34,6 +33,10 @@ self.assertEqual(self.obj._check_interval, self.CHECK_INTERVAL) def test_constructor_initial_bytes_received(self): - self.assertEqual(self.obj._bytes_received, 0) + # Note: _bytes_received is initialized by calls + # to _start_check_timer which calls _update_counters + # which reads the initial values from the connection + self.assertEqual(self.obj._bytes_received, + self.mock_conn.bytes_received) def test_constructor_initial_bytes_sent(self): @@ -38,6 +41,10 @@ def test_constructor_initial_bytes_sent(self): - self.assertEqual(self.obj._bytes_received, 0) + # Note: _bytes_received is initialized by calls + # to _start_check_timer which calls _update_counters + # which reads the initial values from the connection + self.assertEqual(self.obj._bytes_sent, + self.mock_conn.bytes_sent) def test_constructor_initial_heartbeat_frames_received(self): self.assertEqual(self.obj._heartbeat_frames_received, 0) @@ -48,8 +55,8 @@ def test_constructor_initial_idle_byte_intervals(self): self.assertEqual(self.obj._idle_byte_intervals, 0) - @mock.patch('pika.heartbeat.HeartbeatChecker._setup_send_timer') - def test_constructor_called_setup_send_timer(self, timer): + @mock.patch('pika.heartbeat.HeartbeatChecker._start_send_timer') + def test_constructor_called_start_send_timer(self, timer): heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) timer.assert_called_once_with() @@ -53,8 +60,8 @@ heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) timer.assert_called_once_with() - @mock.patch('pika.heartbeat.HeartbeatChecker._setup_check_timer') - def test_constructor_called_setup_check_timer(self, timer): + @mock.patch('pika.heartbeat.HeartbeatChecker._start_check_timer') + def test_constructor_called_start_check_timer(self, timer): heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) timer.assert_called_once_with() @@ -58,14 +65,6 @@ heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) timer.assert_called_once_with() - def test_active_true(self): - self.mock_conn.heartbeat = self.obj - self.assertTrue(self.obj.active) - - def test_active_false(self): - self.mock_conn.heartbeat = mock.Mock() - self.assertFalse(self.obj.active) - def test_bytes_received_on_connection(self): self.mock_conn.bytes_received = 128 self.assertEqual(self.obj.bytes_received_on_connection, 128) @@ -84,9 +83,9 @@ @mock.patch('pika.heartbeat.HeartbeatChecker._close_connection') def test_send_heartbeat_not_closed(self, close_connection): obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) - obj.send_heartbeat() + obj._send_heartbeat() close_connection.assert_not_called() @mock.patch('pika.heartbeat.HeartbeatChecker._close_connection') def test_check_heartbeat_not_closed(self, close_connection): obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) @@ -88,12 +87,13 @@ close_connection.assert_not_called() @mock.patch('pika.heartbeat.HeartbeatChecker._close_connection') def test_check_heartbeat_not_closed(self, close_connection): obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) - obj.check_heartbeat() + self.mock_conn.bytes_received = 128 + obj._check_heartbeat() close_connection.assert_not_called() @mock.patch('pika.heartbeat.HeartbeatChecker._close_connection') def test_check_heartbeat_missed_bytes(self, close_connection): obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) obj._idle_byte_intervals = self.INTERVAL @@ -94,12 +94,12 @@ close_connection.assert_not_called() @mock.patch('pika.heartbeat.HeartbeatChecker._close_connection') def test_check_heartbeat_missed_bytes(self, close_connection): obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) obj._idle_byte_intervals = self.INTERVAL - obj.check_heartbeat() + obj._check_heartbeat() close_connection.assert_called_once_with() def test_check_heartbeat_increment_no_bytes(self): self.mock_conn.bytes_received = 100 self.obj._bytes_received = 100 @@ -101,11 +101,11 @@ close_connection.assert_called_once_with() def test_check_heartbeat_increment_no_bytes(self): self.mock_conn.bytes_received = 100 self.obj._bytes_received = 100 - self.obj.check_heartbeat() + self.obj._check_heartbeat() self.assertEqual(self.obj._idle_byte_intervals, 1) def test_check_heartbeat_increment_bytes(self): self.mock_conn.bytes_received = 100 self.obj._bytes_received = 128 @@ -107,10 +107,10 @@ self.assertEqual(self.obj._idle_byte_intervals, 1) def test_check_heartbeat_increment_bytes(self): self.mock_conn.bytes_received = 100 self.obj._bytes_received = 128 - self.obj.check_heartbeat() + self.obj._check_heartbeat() self.assertEqual(self.obj._idle_byte_intervals, 0) @mock.patch('pika.heartbeat.HeartbeatChecker._update_counters') def test_check_heartbeat_update_counters(self, update_counters): @@ -113,11 +113,10 @@ self.assertEqual(self.obj._idle_byte_intervals, 0) @mock.patch('pika.heartbeat.HeartbeatChecker._update_counters') def test_check_heartbeat_update_counters(self, update_counters): - obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) - obj.check_heartbeat() + heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) update_counters.assert_called_once_with() @mock.patch('pika.heartbeat.HeartbeatChecker._send_heartbeat_frame') def test_send_heartbeat_sends_heartbeat_frame(self, send_heartbeat_frame): obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) @@ -119,10 +118,10 @@ update_counters.assert_called_once_with() @mock.patch('pika.heartbeat.HeartbeatChecker._send_heartbeat_frame') def test_send_heartbeat_sends_heartbeat_frame(self, send_heartbeat_frame): obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) - obj.send_heartbeat() + obj._send_heartbeat() send_heartbeat_frame.assert_called_once_with() @mock.patch('pika.heartbeat.HeartbeatChecker._start_send_timer') def test_send_heartbeat_start_timer(self, start_send_timer): @@ -125,10 +124,9 @@ send_heartbeat_frame.assert_called_once_with() @mock.patch('pika.heartbeat.HeartbeatChecker._start_send_timer') def test_send_heartbeat_start_timer(self, start_send_timer): - obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) - obj.send_heartbeat() + heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) start_send_timer.assert_called_once_with() @mock.patch('pika.heartbeat.HeartbeatChecker._start_check_timer') def test_check_heartbeat_start_timer(self, start_check_timer): @@ -131,12 +129,11 @@ start_send_timer.assert_called_once_with() @mock.patch('pika.heartbeat.HeartbeatChecker._start_check_timer') def test_check_heartbeat_start_timer(self, start_check_timer): - obj = heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) - obj.check_heartbeat() + heartbeat.HeartbeatChecker(self.mock_conn, self.INTERVAL) start_check_timer.assert_called_once_with() def test_connection_close(self): self.obj._idle_byte_intervals = 3 self.obj._idle_heartbeat_intervals = 4 self.obj._close_connection() @@ -137,11 +134,10 @@ start_check_timer.assert_called_once_with() def test_connection_close(self): self.obj._idle_byte_intervals = 3 self.obj._idle_heartbeat_intervals = 4 self.obj._close_connection() - reason = self.obj._STALE_CONNECTION % ( - self.obj._check_interval * self.obj._MAX_IDLE_COUNT) + reason = self.obj._STALE_CONNECTION % self.obj._timeout self.mock_conn.close.assert_called_once_with( self.obj._CONNECTION_FORCED, reason) self.mock_conn._on_terminate.assert_called_once_with( @@ -170,11 +166,9 @@ self.obj._send_heartbeat_frame() self.assertEqual(self.obj._heartbeat_frames_sent, 1) - def test_setup_send_timer_called(self): - want = [mock.call(self.SEND_INTERVAL, self.obj.send_heartbeat), - mock.call(self.CHECK_INTERVAL, self.obj.check_heartbeat)] - self.obj.send_heartbeat() - self.obj.check_heartbeat() + def test_start_send_timer_called(self): + want = [mock.call(self.SEND_INTERVAL, self.obj._send_heartbeat), + mock.call(self.CHECK_INTERVAL, self.obj._check_heartbeat)] got = self.mock_conn.add_timeout.call_args_list self.assertEqual(got, want) @@ -178,17 +172,7 @@ got = self.mock_conn.add_timeout.call_args_list self.assertEqual(got, want) - @mock.patch('pika.heartbeat.HeartbeatChecker._setup_send_timer') - def test_start_send_timer_not_active(self, setup_send_timer): - self.obj._start_send_timer() - setup_send_timer.assert_not_called() - - @mock.patch('pika.heartbeat.HeartbeatChecker._setup_check_timer') - def test_start_check_timer_not_active(self, setup_check_timer): - self.obj._start_check_timer() - setup_check_timer.assert_not_called() - - @mock.patch('pika.heartbeat.HeartbeatChecker._setup_send_timer') + @mock.patch('pika.heartbeat.HeartbeatChecker._start_send_timer') def test_start_timer_active(self, setup_send_timer): self.mock_conn.heartbeat = self.obj self.obj._start_send_timer()