AMQ-8183 - prevent infinite loop when maxFrameSize is exceeded

This makes sure the nio transport thread properly terminates if
maxFrameSize is exceeded with OpenWire to prevent an infinite loop that
uses up all the cpu
This commit is contained in:
Christopher L. Shannon (cshannon) 2021-03-11 09:59:36 -05:00
parent fc5cd100fe
commit 944ca6c7e1
3 changed files with 163 additions and 0 deletions

View File

@ -257,6 +257,11 @@ public class NIOSSLTransport extends NIOTransport {
plain.position(plain.limit());
while (true) {
//If the transport was already stopped then break
if (this.isStopped()) {
return;
}
if (!plain.hasRemaining()) {
int readCount = secureRead(plain);

View File

@ -108,6 +108,10 @@ public class NIOTransport extends TcpTransport {
protected void serviceRead() {
try {
while (true) {
//If the transport was already stopped then break
if (this.isStopped()) {
return;
}
int readSize = readFromBuffer();
if (readSize == -1) {

View File

@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.nio;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadPoolExecutor;
import static junit.framework.TestCase.assertTrue;
//Test for AMQ-8183
public class NIOMaxFrameSizeCleanupTest {
public static final String KEYSTORE_TYPE = "jks";
public static final String PASSWORD = "password";
public static final String SERVER_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
public static final String TRUST_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
private BrokerService broker;
@BeforeClass
public static void beforeClass() throws Exception {
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
}
@After
public void after() throws Exception {
stopBroker(broker);
}
public BrokerService createBroker(String connectorName, String connectorString) throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
TransportConnector connector = broker.addConnector(connectorString);
connector.setName(connectorName);
broker.start();
broker.waitUntilStarted();
return broker;
}
public void stopBroker(BrokerService broker) throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}
@Test
public void testMaxFrameSizeCleanupNio() throws Exception {
String transportType = "nio";
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=1024");
testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort());
}
@Test
public void testMaxFrameSizeCleanupAutoNio() throws Exception {
String transportType = "auto+nio";
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=1024");
testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort());
}
@Test
public void testMaxFrameSizeCleanupNioSsl() throws Exception {
String transportType = "nio+ssl";
broker = createBroker(transportType, transportType +
"://localhost:0?transport.needClientAuth=true&wireFormat.maxFrameSize=1024");
testMaxFrameSizeCleanup(transportType, "ssl://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort()
+ "?socket.verifyHostName=false");
}
@Test
public void testMaxFrameSizeCleanupAutoNioSsl() throws Exception {
String transportType = "auto+nio+ssl";
broker = createBroker(transportType, transportType +
"://localhost:0?transport.needClientAuth=true&wireFormat.maxFrameSize=1024");
testMaxFrameSizeCleanup(transportType, "ssl://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort()
+ "?socket.verifyHostName=false");
}
protected void testMaxFrameSizeCleanup(String transportType, String clientUri) throws Exception {
final List<Connection> connections = new ArrayList<>();
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUri);
for (int i = 0; i < 10; i++) {
Connection connection = factory.createConnection();
connection.start();
connections.add(connection);
}
//Generate a body that is too large
StringBuffer body = new StringBuffer();
Random r = new Random();
for (int i = 0; i < 10000; i++) {
body.append(r.nextInt());
}
//Try sending 10 large messages rapidly in a loop to make sure all
//nio threads are properly terminated
for (int i = 0; i < 10; i++) {
boolean exception = false;
try {
Connection connection = connections.get(i);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue("TEST");
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage(body.toString()));
} catch (Exception e) {
//expected
exception = true;
}
assertTrue("Should have gotten a transport exception", exception);
}
final ThreadPoolExecutor e = (ThreadPoolExecutor) SelectorManager.getInstance().getSelectorExecutor();
//Verify that all connections are removed
assertTrue(Wait.waitFor(() -> broker.getConnectorByName(transportType).getConnections().size() == 0,
5000, 500));
//Verify no more active transport connections in the selector thread pool. This was broken
//due to AMQ-7106 before the fix in AMQ-8183
assertTrue(Wait.waitFor(() -> e.getActiveCount() == 0, 5000, 500));
}
}