From 094dbc89f3630691f1bd66d0d07fe879842fa51b Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Fri, 25 Feb 2022 19:46:45 -0600 Subject: [PATCH] [AMQ-8515] FailoverTransport should handle MaxFrameSizeExceededException (#785) --- .../transport/failover/FailoverTransport.java | 4 + .../transport/MaxFrameSizeEnabledTest.java | 128 ++++++++++++------ 2 files changed, 92 insertions(+), 40 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index dcb4cb8fdf..76062e4178 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -39,6 +39,7 @@ import java.util.StringTokenizer; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReference; +import org.apache.activemq.MaxFrameSizeExceededException; import org.apache.activemq.broker.SslContext; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionControl; @@ -697,6 +698,9 @@ public class FailoverTransport implements CompositeTransport { } return; + } catch (MaxFrameSizeExceededException e) { + LOG.debug("MaxFrameSizeExceededException for command: {}", command); + throw e; } catch (IOException e) { LOG.debug("Send oneway attempt: {} failed for command: {}", i, command); handleTransportFailure(e); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java index d2ce82e17b..8f98b8fc58 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java @@ -56,56 +56,86 @@ public class MaxFrameSizeEnabledTest { private BrokerService broker; private final String transportType; private final boolean clientSideEnabled; + private final boolean clientSideFailoverEnabled; private final boolean serverSideEnabled; - public MaxFrameSizeEnabledTest(String transportType, boolean clientSideEnabled, boolean serverSideEnabled) { + public MaxFrameSizeEnabledTest(String transportType, boolean clientSideEnabled, boolean clientSideFailoverEnabled, boolean serverSideEnabled) { this.transportType = transportType; this.clientSideEnabled = clientSideEnabled; + this.clientSideFailoverEnabled = clientSideFailoverEnabled; this.serverSideEnabled = serverSideEnabled; } - @Parameterized.Parameters(name="transportType={0},clientSideEnable={1},serverSideEnabled={2}") + @Parameterized.Parameters(name="transportType={0},clientSideEnable={1},clientSideFailoverEnable={2},serverSideEnabled={3}") public static Collection data() { return Arrays.asList(new Object[][] { //Both client and server side max frame check enabled - {"tcp", true, true}, - {"ssl", true, true}, - {"nio", true, true}, - {"nio+ssl", true, true}, - {"auto", true, true}, - {"auto+ssl", true, true}, - {"auto+nio", true, true}, - {"auto+nio+ssl", true, true}, + {"tcp", true, false, true}, + {"tcp", true, true, true}, + {"ssl", true, false, true}, + {"ssl", true, true, true}, + {"nio", true, false, true}, + {"nio", true, true, true}, + {"nio+ssl", true, false, true}, + {"nio+ssl", true, true, true}, + {"auto", true, false, true}, + {"auto", true, true, true}, + {"auto+ssl", true, false, true}, + {"auto+ssl", true, true, true}, + {"auto+nio", true, false, true}, + {"auto+nio", true, true, true}, + {"auto+nio+ssl", true, false, true}, + {"auto+nio+ssl", true, true, true}, //Client side enabled but server side disabled - {"tcp", true, false}, - {"ssl", true, false}, - {"nio", true, false}, - {"nio+ssl", true, false}, - {"auto", true, false}, - {"auto+ssl", true, false}, - {"auto+nio", true, false}, - {"auto+nio+ssl", true, false}, + {"tcp", true, false, false}, + {"tcp", true, true, false}, + {"ssl", true, false, false}, + {"ssl", true, true, false}, + {"nio", true, false, false}, + {"nio", true, true, false}, + {"nio+ssl", true, false, false}, + {"nio+ssl", true, true, false}, + {"auto", true, false, false}, + {"auto", true, true, false}, + {"auto+ssl", true, false, false}, + {"auto+ssl", true, true, false}, + {"auto+nio", true, false, false}, + {"auto+nio", true, true, false}, + {"auto+nio+ssl", true, false, false}, + {"auto+nio+ssl", true, true, false}, //Client side disabled but server side enabled - {"tcp", false, true}, - {"ssl", false, true}, - {"nio", false, true}, - {"nio+ssl", false, true}, - {"auto", false, true}, - {"auto+ssl", false, true}, - {"auto+nio", false, true}, - {"auto+nio+ssl", false, true}, + // + // AMQ-8515 client=false, failover=true, server=true + // results in infinite retries since broker closes + // socket, so we don't test that combo + {"tcp", false, false, true}, + {"ssl", false, false, true}, + {"nio", false, false, true}, + {"nio+ssl", false, false, true}, + {"auto", false, false, true}, + {"auto+ssl", false, false, true}, + {"auto+nio", false, false, true}, + {"auto+nio+ssl", false, false, true}, //Client side and server side disabled - {"tcp", false, false}, - {"ssl", false, false}, - {"nio", false, false}, - {"nio+ssl", false, false}, - {"auto", false, false}, - {"auto+ssl", false, false}, - {"auto+nio", false, false}, - {"auto+nio+ssl", false, false}, + {"tcp", false, false, false}, + {"tcp", false, true, false}, + {"ssl", false, false, false}, + {"ssl", false, true, false}, + {"nio", false, false, false}, + {"nio", false, true, false}, + {"nio+ssl", false, false, false}, + {"nio+ssl", false, true, false}, + {"auto", false, false, false}, + {"auto", false, true, false}, + {"auto+ssl", false, false, false}, + {"auto+ssl", false, true, false}, + {"auto+nio", false, false, false}, + {"auto+nio", false, true, false}, + {"auto+nio+ssl", false, false, false}, + {"auto+nio+ssl", false, true, false}, }); } @@ -145,16 +175,14 @@ public class MaxFrameSizeEnabledTest { @Test public void testMaxFrameSize() throws Exception { broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams()); - testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") + "://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() + - getClientParams(), false); + testMaxFrameSize(transportType, getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()), false); } @Test public void testMaxFrameSizeCompression() throws Exception { // Test message body length is 99841 bytes. Compresses to ~ 48000 broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams()); - testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") + "://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() - + getClientParams(), true); + testMaxFrameSize(transportType, getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()), true); } protected void testMaxFrameSize(String transportType, String clientUri, boolean useCompression) throws Exception { @@ -276,6 +304,10 @@ public class MaxFrameSizeEnabledTest { return !maxFrameSizeEnabled() || clientSideEnabled || useCompression; } + private boolean isFailover() { + return clientSideFailoverEnabled; + } + private boolean isSsl() { return transportType.contains("ssl"); } @@ -294,9 +326,25 @@ public class MaxFrameSizeEnabledTest { private String getClientParams() { if (clientSideEnabled) { - return isSsl() ? "?socket.verifyHostName=false" : ""; + if(clientSideFailoverEnabled) { + return isSsl() ? "?nested.socket.verifyHostName=false" : ""; + } else { + return isSsl() ? "?socket.verifyHostName=false" : ""; + } } else { - return isSsl() ? "?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false" : "?wireFormat.maxFrameSizeEnabled=false"; + if(clientSideFailoverEnabled) { + return isSsl() ? "?nested.socket.verifyHostName=false&nested.wireFormat.maxFrameSizeEnabled=false" : "?nested.wireFormat.maxFrameSizeEnabled=false"; + } else { + return isSsl() ? "?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false" : "?wireFormat.maxFrameSizeEnabled=false"; + } + } + } + + private String getClientUri(int port) { + if(isFailover()) { + return "failover:(" + (isSsl() ? "ssl" : "tcp") + "://localhost:" + port + ")" + getClientParams() + "&maxReconnectAttempts=1&startupMaxReconnectAttempts=1"; + } else { + return (isSsl() ? "ssl" : "tcp") + "://localhost:" + port + getClientParams(); } } }