mirror of https://github.com/apache/activemq.git
[AMQ-8515] FailoverTransport should handle MaxFrameSizeExceededException (#785)
(cherry picked from commit 094dbc89f3
)
This commit is contained in:
parent
bd3175281c
commit
9db5f436cf
|
@ -39,6 +39,7 @@ import java.util.StringTokenizer;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.apache.activemq.MaxFrameSizeExceededException;
|
||||||
import org.apache.activemq.broker.SslContext;
|
import org.apache.activemq.broker.SslContext;
|
||||||
import org.apache.activemq.command.Command;
|
import org.apache.activemq.command.Command;
|
||||||
import org.apache.activemq.command.ConnectionControl;
|
import org.apache.activemq.command.ConnectionControl;
|
||||||
|
@ -697,6 +698,9 @@ public class FailoverTransport implements CompositeTransport {
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
} catch (MaxFrameSizeExceededException e) {
|
||||||
|
LOG.debug("MaxFrameSizeExceededException for command: {}", command);
|
||||||
|
throw e;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.debug("Send oneway attempt: {} failed for command: {}", i, command);
|
LOG.debug("Send oneway attempt: {} failed for command: {}", i, command);
|
||||||
handleTransportFailure(e);
|
handleTransportFailure(e);
|
||||||
|
|
|
@ -56,56 +56,86 @@ public class MaxFrameSizeEnabledTest {
|
||||||
private BrokerService broker;
|
private BrokerService broker;
|
||||||
private final String transportType;
|
private final String transportType;
|
||||||
private final boolean clientSideEnabled;
|
private final boolean clientSideEnabled;
|
||||||
|
private final boolean clientSideFailoverEnabled;
|
||||||
private final boolean serverSideEnabled;
|
private final boolean serverSideEnabled;
|
||||||
|
|
||||||
public MaxFrameSizeEnabledTest(String transportType, boolean clientSideEnabled, boolean serverSideEnabled) {
|
public MaxFrameSizeEnabledTest(String transportType, boolean clientSideEnabled, boolean clientSideFailoverEnabled, boolean serverSideEnabled) {
|
||||||
this.transportType = transportType;
|
this.transportType = transportType;
|
||||||
this.clientSideEnabled = clientSideEnabled;
|
this.clientSideEnabled = clientSideEnabled;
|
||||||
|
this.clientSideFailoverEnabled = clientSideFailoverEnabled;
|
||||||
this.serverSideEnabled = serverSideEnabled;
|
this.serverSideEnabled = serverSideEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Parameterized.Parameters(name="transportType={0},clientSideEnable={1},serverSideEnabled={2}")
|
@Parameterized.Parameters(name="transportType={0},clientSideEnable={1},clientSideFailoverEnable={2},serverSideEnabled={3}")
|
||||||
public static Collection<Object[]> data() {
|
public static Collection<Object[]> data() {
|
||||||
return Arrays.asList(new Object[][] {
|
return Arrays.asList(new Object[][] {
|
||||||
//Both client and server side max frame check enabled
|
//Both client and server side max frame check enabled
|
||||||
{"tcp", true, true},
|
{"tcp", true, false, true},
|
||||||
{"ssl", true, true},
|
{"tcp", true, true, true},
|
||||||
{"nio", true, true},
|
{"ssl", true, false, true},
|
||||||
{"nio+ssl", true, true},
|
{"ssl", true, true, true},
|
||||||
{"auto", true, true},
|
{"nio", true, false, true},
|
||||||
{"auto+ssl", true, true},
|
{"nio", true, true, true},
|
||||||
{"auto+nio", true, true},
|
{"nio+ssl", true, false, true},
|
||||||
{"auto+nio+ssl", true, true},
|
{"nio+ssl", true, true, true},
|
||||||
|
{"auto", true, false, true},
|
||||||
|
{"auto", true, true, true},
|
||||||
|
{"auto+ssl", true, false, true},
|
||||||
|
{"auto+ssl", true, true, true},
|
||||||
|
{"auto+nio", true, false, true},
|
||||||
|
{"auto+nio", true, true, true},
|
||||||
|
{"auto+nio+ssl", true, false, true},
|
||||||
|
{"auto+nio+ssl", true, true, true},
|
||||||
|
|
||||||
//Client side enabled but server side disabled
|
//Client side enabled but server side disabled
|
||||||
{"tcp", true, false},
|
{"tcp", true, false, false},
|
||||||
{"ssl", true, false},
|
{"tcp", true, true, false},
|
||||||
{"nio", true, false},
|
{"ssl", true, false, false},
|
||||||
{"nio+ssl", true, false},
|
{"ssl", true, true, false},
|
||||||
{"auto", true, false},
|
{"nio", true, false, false},
|
||||||
{"auto+ssl", true, false},
|
{"nio", true, true, false},
|
||||||
{"auto+nio", true, false},
|
{"nio+ssl", true, false, false},
|
||||||
{"auto+nio+ssl", true, false},
|
{"nio+ssl", true, true, false},
|
||||||
|
{"auto", true, false, false},
|
||||||
|
{"auto", true, true, false},
|
||||||
|
{"auto+ssl", true, false, false},
|
||||||
|
{"auto+ssl", true, true, false},
|
||||||
|
{"auto+nio", true, false, false},
|
||||||
|
{"auto+nio", true, true, false},
|
||||||
|
{"auto+nio+ssl", true, false, false},
|
||||||
|
{"auto+nio+ssl", true, true, false},
|
||||||
|
|
||||||
//Client side disabled but server side enabled
|
//Client side disabled but server side enabled
|
||||||
{"tcp", false, true},
|
//
|
||||||
{"ssl", false, true},
|
// AMQ-8515 client=false, failover=true, server=true
|
||||||
{"nio", false, true},
|
// results in infinite retries since broker closes
|
||||||
{"nio+ssl", false, true},
|
// socket, so we don't test that combo
|
||||||
{"auto", false, true},
|
{"tcp", false, false, true},
|
||||||
{"auto+ssl", false, true},
|
{"ssl", false, false, true},
|
||||||
{"auto+nio", false, true},
|
{"nio", false, false, true},
|
||||||
{"auto+nio+ssl", false, true},
|
{"nio+ssl", false, false, true},
|
||||||
|
{"auto", false, false, true},
|
||||||
|
{"auto+ssl", false, false, true},
|
||||||
|
{"auto+nio", false, false, true},
|
||||||
|
{"auto+nio+ssl", false, false, true},
|
||||||
|
|
||||||
//Client side and server side disabled
|
//Client side and server side disabled
|
||||||
{"tcp", false, false},
|
{"tcp", false, false, false},
|
||||||
{"ssl", false, false},
|
{"tcp", false, true, false},
|
||||||
{"nio", false, false},
|
{"ssl", false, false, false},
|
||||||
{"nio+ssl", false, false},
|
{"ssl", false, true, false},
|
||||||
{"auto", false, false},
|
{"nio", false, false, false},
|
||||||
{"auto+ssl", false, false},
|
{"nio", false, true, false},
|
||||||
{"auto+nio", false, false},
|
{"nio+ssl", false, false, false},
|
||||||
{"auto+nio+ssl", false, false},
|
{"nio+ssl", false, true, false},
|
||||||
|
{"auto", false, false, false},
|
||||||
|
{"auto", false, true, false},
|
||||||
|
{"auto+ssl", false, false, false},
|
||||||
|
{"auto+ssl", false, true, false},
|
||||||
|
{"auto+nio", false, false, false},
|
||||||
|
{"auto+nio", false, true, false},
|
||||||
|
{"auto+nio+ssl", false, false, false},
|
||||||
|
{"auto+nio+ssl", false, true, false},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,16 +175,14 @@ public class MaxFrameSizeEnabledTest {
|
||||||
@Test
|
@Test
|
||||||
public void testMaxFrameSize() throws Exception {
|
public void testMaxFrameSize() throws Exception {
|
||||||
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams());
|
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams());
|
||||||
testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") + "://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() +
|
testMaxFrameSize(transportType, getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()), false);
|
||||||
getClientParams(), false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMaxFrameSizeCompression() throws Exception {
|
public void testMaxFrameSizeCompression() throws Exception {
|
||||||
// Test message body length is 99841 bytes. Compresses to ~ 48000
|
// Test message body length is 99841 bytes. Compresses to ~ 48000
|
||||||
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams());
|
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams());
|
||||||
testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") + "://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort()
|
testMaxFrameSize(transportType, getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()), true);
|
||||||
+ getClientParams(), true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void testMaxFrameSize(String transportType, String clientUri, boolean useCompression) throws Exception {
|
protected void testMaxFrameSize(String transportType, String clientUri, boolean useCompression) throws Exception {
|
||||||
|
@ -276,6 +304,10 @@ public class MaxFrameSizeEnabledTest {
|
||||||
return !maxFrameSizeEnabled() || clientSideEnabled || useCompression;
|
return !maxFrameSizeEnabled() || clientSideEnabled || useCompression;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isFailover() {
|
||||||
|
return clientSideFailoverEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
private boolean isSsl() {
|
private boolean isSsl() {
|
||||||
return transportType.contains("ssl");
|
return transportType.contains("ssl");
|
||||||
}
|
}
|
||||||
|
@ -294,9 +326,25 @@ public class MaxFrameSizeEnabledTest {
|
||||||
|
|
||||||
private String getClientParams() {
|
private String getClientParams() {
|
||||||
if (clientSideEnabled) {
|
if (clientSideEnabled) {
|
||||||
return isSsl() ? "?socket.verifyHostName=false" : "";
|
if(clientSideFailoverEnabled) {
|
||||||
|
return isSsl() ? "?nested.socket.verifyHostName=false" : "";
|
||||||
|
} else {
|
||||||
|
return isSsl() ? "?socket.verifyHostName=false" : "";
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
return isSsl() ? "?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false" : "?wireFormat.maxFrameSizeEnabled=false";
|
if(clientSideFailoverEnabled) {
|
||||||
|
return isSsl() ? "?nested.socket.verifyHostName=false&nested.wireFormat.maxFrameSizeEnabled=false" : "?nested.wireFormat.maxFrameSizeEnabled=false";
|
||||||
|
} else {
|
||||||
|
return isSsl() ? "?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false" : "?wireFormat.maxFrameSizeEnabled=false";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getClientUri(int port) {
|
||||||
|
if(isFailover()) {
|
||||||
|
return "failover:(" + (isSsl() ? "ssl" : "tcp") + "://localhost:" + port + ")" + getClientParams() + "&maxReconnectAttempts=1&startupMaxReconnectAttempts=1";
|
||||||
|
} else {
|
||||||
|
return (isSsl() ? "ssl" : "tcp") + "://localhost:" + port + getClientParams();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue