mirror of https://github.com/apache/activemq.git
[AMQ-8412] Update client-side maxFrameSize handling to be more symetrical with server-side
- Handle in the OpenWireFormat class - Add unit tests to confirm - Verify compression is accounted for - Verify the ability to disable using wireFormat.maxFrameSizeEnabled=false - [cshannon] Reworked max frame size test case to add in all transports and all client/server cases
This commit is contained in:
parent
c36ca15d5d
commit
67a2edbf0d
|
@ -1448,12 +1448,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
if (isClosed()) {
|
if (isClosed()) {
|
||||||
throw new ConnectionClosedException();
|
throw new ConnectionClosedException();
|
||||||
} else {
|
} else {
|
||||||
if(command.isMessage()) {
|
|
||||||
int tmpMsgSize = Message.class.cast(command).getSize();
|
|
||||||
if(maxFrameSize.get() < tmpMsgSize) {
|
|
||||||
throw new JMSException("Message size: " + tmpMsgSize + " exceeds maximum allowed by broker: " + maxFrameSize.get(), "41300");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
Response response = (Response)(timeout > 0
|
Response response = (Response)(timeout > 0
|
||||||
? this.transport.request(command, timeout)
|
? this.transport.request(command, timeout)
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
/**
|
||||||
|
* An exception thrown when max frame size is exceeded.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class MaxFrameSizeExceededException extends IOException {
|
||||||
|
private static final long serialVersionUID = -7681404582227153308L;
|
||||||
|
|
||||||
|
public MaxFrameSizeExceededException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
}
|
|
@ -55,6 +55,7 @@ public final class OpenWireFormat implements WireFormat {
|
||||||
private boolean cacheEnabled;
|
private boolean cacheEnabled;
|
||||||
private boolean tightEncodingEnabled;
|
private boolean tightEncodingEnabled;
|
||||||
private boolean sizePrefixDisabled;
|
private boolean sizePrefixDisabled;
|
||||||
|
private boolean maxFrameSizeEnabled = true;
|
||||||
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
|
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
|
||||||
|
|
||||||
// The following fields are used for value caching
|
// The following fields are used for value caching
|
||||||
|
@ -80,7 +81,8 @@ public final class OpenWireFormat implements WireFormat {
|
||||||
return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
|
return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
|
||||||
^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
|
^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
|
||||||
^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
|
^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
|
||||||
^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
|
^ (sizePrefixDisabled ? 0x00010000 : 0x00020000)
|
||||||
|
^ (maxFrameSizeEnabled ? 0x00010000 : 0x00020000);
|
||||||
}
|
}
|
||||||
|
|
||||||
public OpenWireFormat copy() {
|
public OpenWireFormat copy() {
|
||||||
|
@ -91,6 +93,7 @@ public final class OpenWireFormat implements WireFormat {
|
||||||
answer.tightEncodingEnabled = tightEncodingEnabled;
|
answer.tightEncodingEnabled = tightEncodingEnabled;
|
||||||
answer.sizePrefixDisabled = sizePrefixDisabled;
|
answer.sizePrefixDisabled = sizePrefixDisabled;
|
||||||
answer.preferedWireFormatInfo = preferedWireFormatInfo;
|
answer.preferedWireFormatInfo = preferedWireFormatInfo;
|
||||||
|
answer.maxFrameSizeEnabled = maxFrameSizeEnabled;
|
||||||
return answer;
|
return answer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,14 +105,15 @@ public final class OpenWireFormat implements WireFormat {
|
||||||
OpenWireFormat o = (OpenWireFormat)object;
|
OpenWireFormat o = (OpenWireFormat)object;
|
||||||
return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
|
return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
|
||||||
&& o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
|
&& o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
|
||||||
&& o.sizePrefixDisabled == sizePrefixDisabled;
|
&& o.sizePrefixDisabled == sizePrefixDisabled
|
||||||
|
&& o.maxFrameSizeEnabled == maxFrameSizeEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
|
return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
|
||||||
+ tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}";
|
+ tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + ", maxFrameSizeEnabled=" + maxFrameSizeEnabled + "}";
|
||||||
// return "OpenWireFormat{id="+id+",
|
// return "OpenWireFormat{id="+id+",
|
||||||
// tightEncodingEnabled="+tightEncodingEnabled+"}";
|
// tightEncodingEnabled="+tightEncodingEnabled+"}";
|
||||||
}
|
}
|
||||||
|
@ -142,6 +146,10 @@ public final class OpenWireFormat implements WireFormat {
|
||||||
size += dsm.tightMarshal1(this, c, bs);
|
size += dsm.tightMarshal1(this, c, bs);
|
||||||
size += bs.marshalledSize();
|
size += bs.marshalledSize();
|
||||||
|
|
||||||
|
if(maxFrameSizeEnabled && size > maxFrameSize) {
|
||||||
|
throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
|
||||||
|
}
|
||||||
|
|
||||||
bytesOut.restart(size);
|
bytesOut.restart(size);
|
||||||
if (!sizePrefixDisabled) {
|
if (!sizePrefixDisabled) {
|
||||||
bytesOut.writeInt(size);
|
bytesOut.writeInt(size);
|
||||||
|
@ -193,7 +201,7 @@ public final class OpenWireFormat implements WireFormat {
|
||||||
// size");
|
// size");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (size > maxFrameSize) {
|
if (maxFrameSizeEnabled && size > maxFrameSize) {
|
||||||
throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
|
throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -226,6 +234,10 @@ public final class OpenWireFormat implements WireFormat {
|
||||||
size += dsm.tightMarshal1(this, c, bs);
|
size += dsm.tightMarshal1(this, c, bs);
|
||||||
size += bs.marshalledSize();
|
size += bs.marshalledSize();
|
||||||
|
|
||||||
|
if(maxFrameSizeEnabled && size > maxFrameSize) {
|
||||||
|
throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
|
||||||
|
}
|
||||||
|
|
||||||
if (!sizePrefixDisabled) {
|
if (!sizePrefixDisabled) {
|
||||||
dataOut.writeInt(size);
|
dataOut.writeInt(size);
|
||||||
}
|
}
|
||||||
|
@ -266,7 +278,7 @@ public final class OpenWireFormat implements WireFormat {
|
||||||
DataInput dataIn = dis;
|
DataInput dataIn = dis;
|
||||||
if (!sizePrefixDisabled) {
|
if (!sizePrefixDisabled) {
|
||||||
int size = dis.readInt();
|
int size = dis.readInt();
|
||||||
if (size > maxFrameSize) {
|
if (maxFrameSizeEnabled && size > maxFrameSize) {
|
||||||
throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
|
throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
|
||||||
}
|
}
|
||||||
// int size = dis.readInt();
|
// int size = dis.readInt();
|
||||||
|
@ -605,6 +617,14 @@ public final class OpenWireFormat implements WireFormat {
|
||||||
this.maxFrameSize = maxFrameSize;
|
this.maxFrameSize = maxFrameSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isMaxFrameSizeEnabled() {
|
||||||
|
return maxFrameSizeEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxFrameSizeEnabled(boolean maxFrameSizeEnabled) {
|
||||||
|
this.maxFrameSizeEnabled = maxFrameSizeEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
|
public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
|
||||||
|
|
||||||
if (preferedWireFormatInfo == null) {
|
if (preferedWireFormatInfo == null) {
|
||||||
|
|
|
@ -41,6 +41,7 @@ public class OpenWireFormatFactory implements WireFormatFactory {
|
||||||
private long maxInactivityDurationInitalDelay = 10*1000;
|
private long maxInactivityDurationInitalDelay = 10*1000;
|
||||||
private int cacheSize = 1024;
|
private int cacheSize = 1024;
|
||||||
private long maxFrameSize = OpenWireFormat.DEFAULT_MAX_FRAME_SIZE;
|
private long maxFrameSize = OpenWireFormat.DEFAULT_MAX_FRAME_SIZE;
|
||||||
|
private boolean maxFrameSizeEnabled = true;
|
||||||
private String host=null;
|
private String host=null;
|
||||||
private String providerName = ActiveMQConnectionMetaData.PROVIDER_NAME;
|
private String providerName = ActiveMQConnectionMetaData.PROVIDER_NAME;
|
||||||
private String providerVersion = ActiveMQConnectionMetaData.PROVIDER_VERSION;
|
private String providerVersion = ActiveMQConnectionMetaData.PROVIDER_VERSION;
|
||||||
|
@ -80,6 +81,7 @@ public class OpenWireFormatFactory implements WireFormatFactory {
|
||||||
OpenWireFormat f = new OpenWireFormat(version);
|
OpenWireFormat f = new OpenWireFormat(version);
|
||||||
f.setMaxFrameSize(maxFrameSize);
|
f.setMaxFrameSize(maxFrameSize);
|
||||||
f.setPreferedWireFormatInfo(info);
|
f.setPreferedWireFormatInfo(info);
|
||||||
|
f.setMaxFrameSizeEnabled(maxFrameSizeEnabled);
|
||||||
return f;
|
return f;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -203,4 +205,12 @@ public class OpenWireFormatFactory implements WireFormatFactory {
|
||||||
public void setIncludePlatformDetails(boolean includePlatformDetails) {
|
public void setIncludePlatformDetails(boolean includePlatformDetails) {
|
||||||
this.includePlatformDetails = includePlatformDetails;
|
this.includePlatformDetails = includePlatformDetails;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setMaxFrameSizeEnabled(boolean maxFrameSizeEnabled) {
|
||||||
|
this.maxFrameSizeEnabled = maxFrameSizeEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isMaxFrameSizeEnabled() {
|
||||||
|
return this.maxFrameSizeEnabled;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ import javax.net.ssl.SSLParameters;
|
||||||
import javax.net.ssl.SSLPeerUnverifiedException;
|
import javax.net.ssl.SSLPeerUnverifiedException;
|
||||||
import javax.net.ssl.SSLSession;
|
import javax.net.ssl.SSLSession;
|
||||||
|
|
||||||
|
import org.apache.activemq.MaxFrameSizeExceededException;
|
||||||
import org.apache.activemq.command.ConnectionInfo;
|
import org.apache.activemq.command.ConnectionInfo;
|
||||||
import org.apache.activemq.openwire.OpenWireFormat;
|
import org.apache.activemq.openwire.OpenWireFormat;
|
||||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||||
|
@ -335,9 +336,11 @@ public class NIOSSLTransport extends NIOTransport {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (wireFormat instanceof OpenWireFormat) {
|
if (wireFormat instanceof OpenWireFormat) {
|
||||||
long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize();
|
OpenWireFormat openWireFormat = (OpenWireFormat) wireFormat;
|
||||||
if (nextFrameSize > maxFrameSize) {
|
long maxFrameSize = openWireFormat.getMaxFrameSize();
|
||||||
throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) +
|
|
||||||
|
if (openWireFormat.isMaxFrameSizeEnabled() && nextFrameSize > maxFrameSize) {
|
||||||
|
throw new MaxFrameSizeExceededException("Frame size of " + (nextFrameSize / (1024 * 1024)) +
|
||||||
" MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
|
" MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import java.nio.channels.SocketChannel;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
|
|
||||||
|
import org.apache.activemq.MaxFrameSizeExceededException;
|
||||||
import org.apache.activemq.openwire.OpenWireFormat;
|
import org.apache.activemq.openwire.OpenWireFormat;
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
import org.apache.activemq.transport.tcp.TcpTransport;
|
import org.apache.activemq.transport.tcp.TcpTransport;
|
||||||
|
@ -139,9 +140,11 @@ public class NIOTransport extends TcpTransport {
|
||||||
nextFrameSize = inputBuffer.getInt() + 4;
|
nextFrameSize = inputBuffer.getInt() + 4;
|
||||||
|
|
||||||
if (wireFormat instanceof OpenWireFormat) {
|
if (wireFormat instanceof OpenWireFormat) {
|
||||||
long maxFrameSize = ((OpenWireFormat)wireFormat).getMaxFrameSize();
|
OpenWireFormat openWireFormat = (OpenWireFormat)wireFormat;
|
||||||
if (nextFrameSize > maxFrameSize) {
|
long maxFrameSize = openWireFormat.getMaxFrameSize();
|
||||||
throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
|
|
||||||
|
if (openWireFormat.isMaxFrameSizeEnabled() && nextFrameSize > maxFrameSize) {
|
||||||
|
throw new MaxFrameSizeExceededException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.activemq.util;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.math.BigInteger;
|
import java.math.BigInteger;
|
||||||
|
|
||||||
|
import org.apache.activemq.MaxFrameSizeExceededException;
|
||||||
|
|
||||||
public final class IOExceptionSupport {
|
public final class IOExceptionSupport {
|
||||||
|
|
||||||
private IOExceptionSupport() {
|
private IOExceptionSupport() {
|
||||||
|
@ -49,7 +51,7 @@ public final class IOExceptionSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static IOException createFrameSizeException(int size, long maxSize) {
|
public static IOException createFrameSizeException(int size, long maxSize) {
|
||||||
return new IOException("Frame size of " + toHumanReadableSizeString(size) +
|
return new MaxFrameSizeExceededException("Frame size of " + toHumanReadableSizeString(size) +
|
||||||
" larger than max allowed " + toHumanReadableSizeString(maxSize));
|
" larger than max allowed " + toHumanReadableSizeString(maxSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,8 @@ import javax.jms.JMSSecurityException;
|
||||||
import javax.jms.MessageEOFException;
|
import javax.jms.MessageEOFException;
|
||||||
import javax.jms.MessageFormatException;
|
import javax.jms.MessageFormatException;
|
||||||
|
|
||||||
|
import org.apache.activemq.MaxFrameSizeExceededException;
|
||||||
|
|
||||||
public final class JMSExceptionSupport {
|
public final class JMSExceptionSupport {
|
||||||
|
|
||||||
private JMSExceptionSupport() {
|
private JMSExceptionSupport() {
|
||||||
|
@ -61,6 +63,12 @@ public final class JMSExceptionSupport {
|
||||||
if (cause instanceof JMSException) {
|
if (cause instanceof JMSException) {
|
||||||
return (JMSException)cause;
|
return (JMSException)cause;
|
||||||
}
|
}
|
||||||
|
if (cause instanceof MaxFrameSizeExceededException) {
|
||||||
|
JMSException jmsException = new JMSException(cause.getMessage(), "41300");
|
||||||
|
jmsException.setLinkedException(cause);
|
||||||
|
jmsException.initCause(cause);
|
||||||
|
return jmsException;
|
||||||
|
}
|
||||||
String msg = cause.getMessage();
|
String msg = cause.getMessage();
|
||||||
if (msg == null || msg.length() == 0) {
|
if (msg == null || msg.length() == 0) {
|
||||||
msg = cause.toString();
|
msg = cause.toString();
|
||||||
|
|
|
@ -0,0 +1,302 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.MaxFrameSizeExceededException;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.transport.nio.SelectorManager;
|
||||||
|
import org.apache.activemq.util.Wait;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
import static junit.framework.TestCase.assertTrue;
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
//Test for AMQ-8142
|
||||||
|
@RunWith(value = Parameterized.class)
|
||||||
|
public class MaxFrameSizeEnabledTest {
|
||||||
|
|
||||||
|
public static final String KEYSTORE_TYPE = "jks";
|
||||||
|
public static final String PASSWORD = "password";
|
||||||
|
public static final String SERVER_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
|
||||||
|
public static final String TRUST_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
|
||||||
|
|
||||||
|
private BrokerService broker;
|
||||||
|
private final String transportType;
|
||||||
|
private final boolean clientSideEnabled;
|
||||||
|
private final boolean serverSideEnabled;
|
||||||
|
|
||||||
|
public MaxFrameSizeEnabledTest(String transportType, boolean clientSideEnabled, boolean serverSideEnabled) {
|
||||||
|
this.transportType = transportType;
|
||||||
|
this.clientSideEnabled = clientSideEnabled;
|
||||||
|
this.serverSideEnabled = serverSideEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name="transportType={0},clientSideEnable={1},serverSideEnabled={2}")
|
||||||
|
public static Collection<Object[]> data() {
|
||||||
|
return Arrays.asList(new Object[][] {
|
||||||
|
//Both client and server side max frame check enabled
|
||||||
|
{"tcp", true, true},
|
||||||
|
{"ssl", true, true},
|
||||||
|
{"nio", true, true},
|
||||||
|
{"nio+ssl", true, true},
|
||||||
|
{"auto", true, true},
|
||||||
|
{"auto+ssl", true, true},
|
||||||
|
{"auto+nio", true, true},
|
||||||
|
{"auto+nio+ssl", true, true},
|
||||||
|
|
||||||
|
//Client side enabled but server side disabled
|
||||||
|
{"tcp", true, false},
|
||||||
|
{"ssl", true, false},
|
||||||
|
{"nio", true, false},
|
||||||
|
{"nio+ssl", true, false},
|
||||||
|
{"auto", true, false},
|
||||||
|
{"auto+ssl", true, false},
|
||||||
|
{"auto+nio", true, false},
|
||||||
|
{"auto+nio+ssl", true, false},
|
||||||
|
|
||||||
|
//Client side disabled but server side enabled
|
||||||
|
{"tcp", false, true},
|
||||||
|
{"ssl", false, true},
|
||||||
|
{"nio", false, true},
|
||||||
|
{"nio+ssl", false, true},
|
||||||
|
{"auto", false, true},
|
||||||
|
{"auto+ssl", false, true},
|
||||||
|
{"auto+nio", false, true},
|
||||||
|
{"auto+nio+ssl", false, true},
|
||||||
|
|
||||||
|
//Client side and server side disabled
|
||||||
|
{"tcp", false, false},
|
||||||
|
{"ssl", false, false},
|
||||||
|
{"nio", false, false},
|
||||||
|
{"nio+ssl", false, false},
|
||||||
|
{"auto", false, false},
|
||||||
|
{"auto+ssl", false, false},
|
||||||
|
{"auto+nio", false, false},
|
||||||
|
{"auto+nio+ssl", false, false},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void beforeClass() throws Exception {
|
||||||
|
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
|
||||||
|
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
|
||||||
|
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
|
||||||
|
System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
|
||||||
|
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
|
||||||
|
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void after() throws Exception {
|
||||||
|
stopBroker(broker);
|
||||||
|
}
|
||||||
|
|
||||||
|
public BrokerService createBroker(String connectorName, String connectorString) throws Exception {
|
||||||
|
BrokerService broker = new BrokerService();
|
||||||
|
broker.setPersistent(false);
|
||||||
|
broker.setUseJmx(false);
|
||||||
|
TransportConnector connector = broker.addConnector(connectorString);
|
||||||
|
connector.setName(connectorName);
|
||||||
|
broker.start();
|
||||||
|
broker.waitUntilStarted();
|
||||||
|
return broker;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stopBroker(BrokerService broker) throws Exception {
|
||||||
|
if (broker != null) {
|
||||||
|
broker.stop();
|
||||||
|
broker.waitUntilStopped();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxFrameSize() throws Exception {
|
||||||
|
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams());
|
||||||
|
testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") + "://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() +
|
||||||
|
getClientParams(), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxFrameSizeCompression() throws Exception {
|
||||||
|
// Test message body length is 99841 bytes. Compresses to ~ 48000
|
||||||
|
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams());
|
||||||
|
testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") + "://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort()
|
||||||
|
+ getClientParams(), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void testMaxFrameSize(String transportType, String clientUri, boolean useCompression) throws Exception {
|
||||||
|
final List<Connection> connections = new ArrayList<>();
|
||||||
|
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUri);
|
||||||
|
factory.setUseCompression(useCompression);
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
Connection connection = factory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
connections.add(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
//Generate a body that is too large
|
||||||
|
StringBuffer body = new StringBuffer();
|
||||||
|
Random r = new Random();
|
||||||
|
for (int i = 0; i < 10000; i++) {
|
||||||
|
body.append(r.nextInt());
|
||||||
|
}
|
||||||
|
|
||||||
|
//Try sending 10 large messages rapidly in a loop to make sure all
|
||||||
|
//nio threads are allowed to send again and do not close server-side
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
boolean maxFrameSizeException = false;
|
||||||
|
boolean otherException = false;
|
||||||
|
|
||||||
|
Connection connection = null;
|
||||||
|
Session session = null;
|
||||||
|
Queue destination = null;
|
||||||
|
MessageConsumer messageConsumer = null;
|
||||||
|
MessageProducer producer = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection = connections.get(i);
|
||||||
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
destination = session.createQueue("TEST");
|
||||||
|
producer = session.createProducer(destination);
|
||||||
|
producer.send(session.createTextMessage(body.toString()));
|
||||||
|
} catch (JMSException e) {
|
||||||
|
if (clientSideEnabled) {
|
||||||
|
assertNotNull(e.getErrorCode());
|
||||||
|
assertEquals("41300", e.getErrorCode());
|
||||||
|
assertTrue(e.getCause() instanceof MaxFrameSizeExceededException);
|
||||||
|
} else {
|
||||||
|
assertTrue(e.getCause() instanceof IOException);
|
||||||
|
}
|
||||||
|
assertNotNull(e.getCause());
|
||||||
|
maxFrameSizeException = true;
|
||||||
|
} catch (Exception e) {
|
||||||
|
otherException = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxFrameSizeEnabled() && !useCompression) {
|
||||||
|
assertTrue("Should have gotten a jms maxframesize exception", maxFrameSizeException);
|
||||||
|
assertFalse("Should not have gotten a transport exception", otherException);
|
||||||
|
} else {
|
||||||
|
assertFalse("Should not have gotten a jms maxframesize exception", maxFrameSizeException);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!maxFrameSizeEnabled() && otherException) {
|
||||||
|
fail("Should not have gotten exception");
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNotNull(connection);
|
||||||
|
assertNotNull(session);
|
||||||
|
assertNotNull(destination);
|
||||||
|
assertNotNull(producer);
|
||||||
|
|
||||||
|
if (connectionsShouldBeOpen(useCompression)) {
|
||||||
|
// Validate we can send a valid sized message after sending a too-large of message
|
||||||
|
boolean nextException = false;
|
||||||
|
try {
|
||||||
|
|
||||||
|
messageConsumer = session.createConsumer(destination);
|
||||||
|
producer.send(session.createTextMessage("Hello"));
|
||||||
|
|
||||||
|
int maxLoops = 50;
|
||||||
|
boolean found = false;
|
||||||
|
do {
|
||||||
|
Message message = messageConsumer.receive(200l);
|
||||||
|
if (message != null) {
|
||||||
|
assertTrue(TextMessage.class.isAssignableFrom(message.getClass()));
|
||||||
|
TextMessage.class.cast(message).getText().equals("Hello");
|
||||||
|
found = true;
|
||||||
|
}
|
||||||
|
maxLoops++;
|
||||||
|
} while (!found && maxLoops <= 50);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
nextException = true;
|
||||||
|
}
|
||||||
|
assertFalse("Should not have gotten an exception for the next message", nextException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (connectionsShouldBeOpen(useCompression)) {
|
||||||
|
//Verify that all connections are active
|
||||||
|
assertTrue(Wait.waitFor(() -> broker.getConnectorByName(transportType).getConnections().size() == 10,
|
||||||
|
3000, 500));
|
||||||
|
} else {
|
||||||
|
//Verify that all connections are closed
|
||||||
|
assertTrue(Wait.waitFor(() -> broker.getConnectorByName(transportType).getConnections().size() == 0,
|
||||||
|
3000, 500));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isNio() && connectionsShouldBeOpen(useCompression)) {
|
||||||
|
//Verify one active transport connections in the selector thread pool.
|
||||||
|
final ThreadPoolExecutor e = (ThreadPoolExecutor) SelectorManager.getInstance().getSelectorExecutor();
|
||||||
|
assertTrue(Wait.waitFor(() -> e.getActiveCount() == 1, 3000, 500));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean maxFrameSizeEnabled() {
|
||||||
|
return clientSideEnabled || serverSideEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean connectionsShouldBeOpen(boolean useCompression) {
|
||||||
|
return !maxFrameSizeEnabled() || clientSideEnabled || useCompression;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isSsl() {
|
||||||
|
return transportType.contains("ssl");
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isNio() {
|
||||||
|
return transportType.contains("nio");
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getServerParams() {
|
||||||
|
if (serverSideEnabled) {
|
||||||
|
return isSsl() ? "&transport.needClientAuth=true" : "";
|
||||||
|
} else {
|
||||||
|
return isSsl() ? "&transport.needClientAuth=true&wireFormat.maxFrameSizeEnabled=false" : "&wireFormat.maxFrameSizeEnabled=false";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getClientParams() {
|
||||||
|
if (clientSideEnabled) {
|
||||||
|
return isSsl() ? "?socket.verifyHostName=false" : "";
|
||||||
|
} else {
|
||||||
|
return isSsl() ? "?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false" : "?wireFormat.maxFrameSizeEnabled=false";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -82,14 +82,14 @@ public class NIOMaxFrameSizeCleanupTest {
|
||||||
public void testMaxFrameSizeCleanupNio() throws Exception {
|
public void testMaxFrameSizeCleanupNio() throws Exception {
|
||||||
String transportType = "nio";
|
String transportType = "nio";
|
||||||
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=1024");
|
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=1024");
|
||||||
testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort());
|
testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() + "?wireFormat.maxFrameSizeEnabled=false");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMaxFrameSizeCleanupAutoNio() throws Exception {
|
public void testMaxFrameSizeCleanupAutoNio() throws Exception {
|
||||||
String transportType = "auto+nio";
|
String transportType = "auto+nio";
|
||||||
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=1024");
|
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=1024");
|
||||||
testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort());
|
testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() + "?wireFormat.maxFrameSizeEnabled=false");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -98,7 +98,7 @@ public class NIOMaxFrameSizeCleanupTest {
|
||||||
broker = createBroker(transportType, transportType +
|
broker = createBroker(transportType, transportType +
|
||||||
"://localhost:0?transport.needClientAuth=true&wireFormat.maxFrameSize=1024");
|
"://localhost:0?transport.needClientAuth=true&wireFormat.maxFrameSize=1024");
|
||||||
testMaxFrameSizeCleanup(transportType, "ssl://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort()
|
testMaxFrameSizeCleanup(transportType, "ssl://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort()
|
||||||
+ "?socket.verifyHostName=false");
|
+ "?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -107,7 +107,7 @@ public class NIOMaxFrameSizeCleanupTest {
|
||||||
broker = createBroker(transportType, transportType +
|
broker = createBroker(transportType, transportType +
|
||||||
"://localhost:0?transport.needClientAuth=true&wireFormat.maxFrameSize=1024");
|
"://localhost:0?transport.needClientAuth=true&wireFormat.maxFrameSize=1024");
|
||||||
testMaxFrameSizeCleanup(transportType, "ssl://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort()
|
testMaxFrameSizeCleanup(transportType, "ssl://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort()
|
||||||
+ "?socket.verifyHostName=false");
|
+ "?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false");
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void testMaxFrameSizeCleanup(String transportType, String clientUri) throws Exception {
|
protected void testMaxFrameSizeCleanup(String transportType, String clientUri) throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue