[AMQ-8515] FailoverTransport should handle MaxFrameSizeExceededException (#785)

This commit is contained in:
Matt Pavlovich 2022-02-25 19:46:45 -06:00 committed by GitHub
parent f03ed01d66
commit 094dbc89f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 92 additions and 40 deletions

View File

@ -39,6 +39,7 @@ import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.MaxFrameSizeExceededException;
import org.apache.activemq.broker.SslContext; import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionControl;
@ -697,6 +698,9 @@ public class FailoverTransport implements CompositeTransport {
} }
return; return;
} catch (MaxFrameSizeExceededException e) {
LOG.debug("MaxFrameSizeExceededException for command: {}", command);
throw e;
} catch (IOException e) { } catch (IOException e) {
LOG.debug("Send oneway attempt: {} failed for command: {}", i, command); LOG.debug("Send oneway attempt: {} failed for command: {}", i, command);
handleTransportFailure(e); handleTransportFailure(e);

View File

@ -56,56 +56,86 @@ public class MaxFrameSizeEnabledTest {
private BrokerService broker; private BrokerService broker;
private final String transportType; private final String transportType;
private final boolean clientSideEnabled; private final boolean clientSideEnabled;
private final boolean clientSideFailoverEnabled;
private final boolean serverSideEnabled; private final boolean serverSideEnabled;
public MaxFrameSizeEnabledTest(String transportType, boolean clientSideEnabled, boolean serverSideEnabled) { public MaxFrameSizeEnabledTest(String transportType, boolean clientSideEnabled, boolean clientSideFailoverEnabled, boolean serverSideEnabled) {
this.transportType = transportType; this.transportType = transportType;
this.clientSideEnabled = clientSideEnabled; this.clientSideEnabled = clientSideEnabled;
this.clientSideFailoverEnabled = clientSideFailoverEnabled;
this.serverSideEnabled = serverSideEnabled; this.serverSideEnabled = serverSideEnabled;
} }
@Parameterized.Parameters(name="transportType={0},clientSideEnable={1},serverSideEnabled={2}") @Parameterized.Parameters(name="transportType={0},clientSideEnable={1},clientSideFailoverEnable={2},serverSideEnabled={3}")
public static Collection<Object[]> data() { public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { return Arrays.asList(new Object[][] {
//Both client and server side max frame check enabled //Both client and server side max frame check enabled
{"tcp", true, true}, {"tcp", true, false, true},
{"ssl", true, true}, {"tcp", true, true, true},
{"nio", true, true}, {"ssl", true, false, true},
{"nio+ssl", true, true}, {"ssl", true, true, true},
{"auto", true, true}, {"nio", true, false, true},
{"auto+ssl", true, true}, {"nio", true, true, true},
{"auto+nio", true, true}, {"nio+ssl", true, false, true},
{"auto+nio+ssl", true, true}, {"nio+ssl", true, true, true},
{"auto", true, false, true},
{"auto", true, true, true},
{"auto+ssl", true, false, true},
{"auto+ssl", true, true, true},
{"auto+nio", true, false, true},
{"auto+nio", true, true, true},
{"auto+nio+ssl", true, false, true},
{"auto+nio+ssl", true, true, true},
//Client side enabled but server side disabled //Client side enabled but server side disabled
{"tcp", true, false}, {"tcp", true, false, false},
{"ssl", true, false}, {"tcp", true, true, false},
{"nio", true, false}, {"ssl", true, false, false},
{"nio+ssl", true, false}, {"ssl", true, true, false},
{"auto", true, false}, {"nio", true, false, false},
{"auto+ssl", true, false}, {"nio", true, true, false},
{"auto+nio", true, false}, {"nio+ssl", true, false, false},
{"auto+nio+ssl", true, false}, {"nio+ssl", true, true, false},
{"auto", true, false, false},
{"auto", true, true, false},
{"auto+ssl", true, false, false},
{"auto+ssl", true, true, false},
{"auto+nio", true, false, false},
{"auto+nio", true, true, false},
{"auto+nio+ssl", true, false, false},
{"auto+nio+ssl", true, true, false},
//Client side disabled but server side enabled //Client side disabled but server side enabled
{"tcp", false, true}, //
{"ssl", false, true}, // AMQ-8515 client=false, failover=true, server=true
{"nio", false, true}, // results in infinite retries since broker closes
{"nio+ssl", false, true}, // socket, so we don't test that combo
{"auto", false, true}, {"tcp", false, false, true},
{"auto+ssl", false, true}, {"ssl", false, false, true},
{"auto+nio", false, true}, {"nio", false, false, true},
{"auto+nio+ssl", false, true}, {"nio+ssl", false, false, true},
{"auto", false, false, true},
{"auto+ssl", false, false, true},
{"auto+nio", false, false, true},
{"auto+nio+ssl", false, false, true},
//Client side and server side disabled //Client side and server side disabled
{"tcp", false, false}, {"tcp", false, false, false},
{"ssl", false, false}, {"tcp", false, true, false},
{"nio", false, false}, {"ssl", false, false, false},
{"nio+ssl", false, false}, {"ssl", false, true, false},
{"auto", false, false}, {"nio", false, false, false},
{"auto+ssl", false, false}, {"nio", false, true, false},
{"auto+nio", false, false}, {"nio+ssl", false, false, false},
{"auto+nio+ssl", false, false}, {"nio+ssl", false, true, false},
{"auto", false, false, false},
{"auto", false, true, false},
{"auto+ssl", false, false, false},
{"auto+ssl", false, true, false},
{"auto+nio", false, false, false},
{"auto+nio", false, true, false},
{"auto+nio+ssl", false, false, false},
{"auto+nio+ssl", false, true, false},
}); });
} }
@ -145,16 +175,14 @@ public class MaxFrameSizeEnabledTest {
@Test @Test
public void testMaxFrameSize() throws Exception { public void testMaxFrameSize() throws Exception {
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams()); broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams());
testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") + "://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() + testMaxFrameSize(transportType, getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()), false);
getClientParams(), false);
} }
@Test @Test
public void testMaxFrameSizeCompression() throws Exception { public void testMaxFrameSizeCompression() throws Exception {
// Test message body length is 99841 bytes. Compresses to ~ 48000 // Test message body length is 99841 bytes. Compresses to ~ 48000
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams()); broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams());
testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") + "://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() testMaxFrameSize(transportType, getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()), true);
+ getClientParams(), true);
} }
protected void testMaxFrameSize(String transportType, String clientUri, boolean useCompression) throws Exception { protected void testMaxFrameSize(String transportType, String clientUri, boolean useCompression) throws Exception {
@ -276,6 +304,10 @@ public class MaxFrameSizeEnabledTest {
return !maxFrameSizeEnabled() || clientSideEnabled || useCompression; return !maxFrameSizeEnabled() || clientSideEnabled || useCompression;
} }
private boolean isFailover() {
return clientSideFailoverEnabled;
}
private boolean isSsl() { private boolean isSsl() {
return transportType.contains("ssl"); return transportType.contains("ssl");
} }
@ -294,9 +326,25 @@ public class MaxFrameSizeEnabledTest {
private String getClientParams() { private String getClientParams() {
if (clientSideEnabled) { if (clientSideEnabled) {
return isSsl() ? "?socket.verifyHostName=false" : ""; if(clientSideFailoverEnabled) {
return isSsl() ? "?nested.socket.verifyHostName=false" : "";
} else {
return isSsl() ? "?socket.verifyHostName=false" : "";
}
} else { } else {
return isSsl() ? "?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false" : "?wireFormat.maxFrameSizeEnabled=false"; if(clientSideFailoverEnabled) {
return isSsl() ? "?nested.socket.verifyHostName=false&nested.wireFormat.maxFrameSizeEnabled=false" : "?nested.wireFormat.maxFrameSizeEnabled=false";
} else {
return isSsl() ? "?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false" : "?wireFormat.maxFrameSizeEnabled=false";
}
}
}
private String getClientUri(int port) {
if(isFailover()) {
return "failover:(" + (isSsl() ? "ssl" : "tcp") + "://localhost:" + port + ")" + getClientParams() + "&maxReconnectAttempts=1&startupMaxReconnectAttempts=1";
} else {
return (isSsl() ? "ssl" : "tcp") + "://localhost:" + port + getClientParams();
} }
} }
} }