diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index 2be8203622..751a10e35f 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -1388,12 +1388,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon if (isClosed()) { throw new ConnectionClosedException(); } else { - if(command.isMessage()) { - int tmpMsgSize = Message.class.cast(command).getSize(); - if(maxFrameSize.get() < tmpMsgSize) { - throw new JMSException("Message size: " + tmpMsgSize + " exceeds maximum allowed by broker: " + maxFrameSize.get(), "41300"); - } - } try { Response response = (Response)(timeout > 0 ? this.transport.request(command, timeout) diff --git a/activemq-client/src/main/java/org/apache/activemq/MaxFrameSizeExceededException.java b/activemq-client/src/main/java/org/apache/activemq/MaxFrameSizeExceededException.java new file mode 100644 index 0000000000..a9d0705802 --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/MaxFrameSizeExceededException.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.io.IOException; +/** + * An exception thrown when max frame size is exceeded. + * + * + */ +public class MaxFrameSizeExceededException extends IOException { + private static final long serialVersionUID = -7681404582227153308L; + + public MaxFrameSizeExceededException(String message) { + super(message); + } +} diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java index c1297a24d1..e4f6e38c94 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java @@ -55,6 +55,7 @@ public final class OpenWireFormat implements WireFormat { private boolean cacheEnabled; private boolean tightEncodingEnabled; private boolean sizePrefixDisabled; + private boolean maxFrameSizeEnabled = true; private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; // The following fields are used for value caching @@ -80,7 +81,8 @@ public final class OpenWireFormat implements WireFormat { return version ^ (cacheEnabled ? 0x10000000 : 0x20000000) ^ (stackTraceEnabled ? 0x01000000 : 0x02000000) ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000) - ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000); + ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000) + ^ (maxFrameSizeEnabled ? 0x00010000 : 0x00020000); } public OpenWireFormat copy() { @@ -91,6 +93,7 @@ public final class OpenWireFormat implements WireFormat { answer.tightEncodingEnabled = tightEncodingEnabled; answer.sizePrefixDisabled = sizePrefixDisabled; answer.preferedWireFormatInfo = preferedWireFormatInfo; + answer.maxFrameSizeEnabled = maxFrameSizeEnabled; return answer; } @@ -102,14 +105,15 @@ public final class OpenWireFormat implements WireFormat { OpenWireFormat o = (OpenWireFormat)object; return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled - && o.sizePrefixDisabled == sizePrefixDisabled; + && o.sizePrefixDisabled == sizePrefixDisabled + && o.maxFrameSizeEnabled == maxFrameSizeEnabled; } @Override public String toString() { return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled=" - + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}"; + + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + ", maxFrameSizeEnabled=" + maxFrameSizeEnabled + "}"; // return "OpenWireFormat{id="+id+", // tightEncodingEnabled="+tightEncodingEnabled+"}"; } @@ -142,6 +146,10 @@ public final class OpenWireFormat implements WireFormat { size += dsm.tightMarshal1(this, c, bs); size += bs.marshalledSize(); + if(maxFrameSizeEnabled && size > maxFrameSize) { + throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize); + } + bytesOut.restart(size); if (!sizePrefixDisabled) { bytesOut.writeInt(size); @@ -193,7 +201,7 @@ public final class OpenWireFormat implements WireFormat { // size"); } - if (size > maxFrameSize) { + if (maxFrameSizeEnabled && size > maxFrameSize) { throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize); } } @@ -226,6 +234,10 @@ public final class OpenWireFormat implements WireFormat { size += dsm.tightMarshal1(this, c, bs); size += bs.marshalledSize(); + if(maxFrameSizeEnabled && size > maxFrameSize) { + throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize); + } + if (!sizePrefixDisabled) { dataOut.writeInt(size); } @@ -266,7 +278,7 @@ public final class OpenWireFormat implements WireFormat { DataInput dataIn = dis; if (!sizePrefixDisabled) { int size = dis.readInt(); - if (size > maxFrameSize) { + if (maxFrameSizeEnabled && size > maxFrameSize) { throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize); } // int size = dis.readInt(); @@ -605,6 +617,14 @@ public final class OpenWireFormat implements WireFormat { this.maxFrameSize = maxFrameSize; } + public boolean isMaxFrameSizeEnabled() { + return maxFrameSizeEnabled; + } + + public void setMaxFrameSizeEnabled(boolean maxFrameSizeEnabled) { + this.maxFrameSizeEnabled = maxFrameSizeEnabled; + } + public void renegotiateWireFormat(WireFormatInfo info) throws IOException { if (preferedWireFormatInfo == null) { diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java index 2614ad780c..20836390b5 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java @@ -41,6 +41,7 @@ public class OpenWireFormatFactory implements WireFormatFactory { private long maxInactivityDurationInitalDelay = 10*1000; private int cacheSize = 1024; private long maxFrameSize = OpenWireFormat.DEFAULT_MAX_FRAME_SIZE; + private boolean maxFrameSizeEnabled = true; private String host=null; private String providerName = ActiveMQConnectionMetaData.PROVIDER_NAME; private String providerVersion = ActiveMQConnectionMetaData.PROVIDER_VERSION; @@ -80,6 +81,7 @@ public class OpenWireFormatFactory implements WireFormatFactory { OpenWireFormat f = new OpenWireFormat(version); f.setMaxFrameSize(maxFrameSize); f.setPreferedWireFormatInfo(info); + f.setMaxFrameSizeEnabled(maxFrameSizeEnabled); return f; } @@ -203,4 +205,12 @@ public class OpenWireFormatFactory implements WireFormatFactory { public void setIncludePlatformDetails(boolean includePlatformDetails) { this.includePlatformDetails = includePlatformDetails; } + + public void setMaxFrameSizeEnabled(boolean maxFrameSizeEnabled) { + this.maxFrameSizeEnabled = maxFrameSizeEnabled; + } + + public boolean isMaxFrameSizeEnabled() { + return this.maxFrameSizeEnabled; + } } diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java index d0e2fc8e09..13b6c54a11 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java @@ -40,6 +40,7 @@ import javax.net.ssl.SSLParameters; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; +import org.apache.activemq.MaxFrameSizeExceededException; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.thread.TaskRunnerFactory; @@ -335,9 +336,11 @@ public class NIOSSLTransport extends NIOTransport { } if (wireFormat instanceof OpenWireFormat) { - long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize(); - if (nextFrameSize > maxFrameSize) { - throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + + OpenWireFormat openWireFormat = (OpenWireFormat) wireFormat; + long maxFrameSize = openWireFormat.getMaxFrameSize(); + + if (openWireFormat.isMaxFrameSizeEnabled() && nextFrameSize > maxFrameSize) { + throw new MaxFrameSizeExceededException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); } } diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java index 7fe5badc1a..6109949d51 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java @@ -29,6 +29,7 @@ import java.nio.channels.SocketChannel; import javax.net.SocketFactory; +import org.apache.activemq.MaxFrameSizeExceededException; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.tcp.TcpTransport; @@ -139,9 +140,11 @@ public class NIOTransport extends TcpTransport { nextFrameSize = inputBuffer.getInt() + 4; if (wireFormat instanceof OpenWireFormat) { - long maxFrameSize = ((OpenWireFormat)wireFormat).getMaxFrameSize(); - if (nextFrameSize > maxFrameSize) { - throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); + OpenWireFormat openWireFormat = (OpenWireFormat)wireFormat; + long maxFrameSize = openWireFormat.getMaxFrameSize(); + + if (openWireFormat.isMaxFrameSizeEnabled() && nextFrameSize > maxFrameSize) { + throw new MaxFrameSizeExceededException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); } } diff --git a/activemq-client/src/main/java/org/apache/activemq/util/IOExceptionSupport.java b/activemq-client/src/main/java/org/apache/activemq/util/IOExceptionSupport.java index 7aa2fc47b8..0db3ce4fc4 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/IOExceptionSupport.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/IOExceptionSupport.java @@ -19,6 +19,8 @@ package org.apache.activemq.util; import java.io.IOException; import java.math.BigInteger; +import org.apache.activemq.MaxFrameSizeExceededException; + public final class IOExceptionSupport { private IOExceptionSupport() { @@ -49,7 +51,7 @@ public final class IOExceptionSupport { } public static IOException createFrameSizeException(int size, long maxSize) { - return new IOException("Frame size of " + toHumanReadableSizeString(size) + + return new MaxFrameSizeExceededException("Frame size of " + toHumanReadableSizeString(size) + " larger than max allowed " + toHumanReadableSizeString(maxSize)); } diff --git a/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java b/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java index 5d55273422..a73f01f5d0 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java @@ -21,6 +21,8 @@ import javax.jms.JMSSecurityException; import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; +import org.apache.activemq.MaxFrameSizeExceededException; + public final class JMSExceptionSupport { private JMSExceptionSupport() { @@ -61,6 +63,12 @@ public final class JMSExceptionSupport { if (cause instanceof JMSException) { return (JMSException)cause; } + if (cause instanceof MaxFrameSizeExceededException) { + JMSException jmsException = new JMSException(cause.getMessage(), "41300"); + jmsException.setLinkedException(cause); + jmsException.initCause(cause); + return jmsException; + } String msg = cause.getMessage(); if (msg == null || msg.length() == 0) { msg = cause.toString(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java new file mode 100644 index 0000000000..d2ce82e17b --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.MaxFrameSizeExceededException; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.transport.nio.SelectorManager; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ThreadPoolExecutor; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.*; + +//Test for AMQ-8142 +@RunWith(value = Parameterized.class) +public class MaxFrameSizeEnabledTest { + + public static final String KEYSTORE_TYPE = "jks"; + public static final String PASSWORD = "password"; + public static final String SERVER_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks"; + public static final String TRUST_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks"; + + private BrokerService broker; + private final String transportType; + private final boolean clientSideEnabled; + private final boolean serverSideEnabled; + + public MaxFrameSizeEnabledTest(String transportType, boolean clientSideEnabled, boolean serverSideEnabled) { + this.transportType = transportType; + this.clientSideEnabled = clientSideEnabled; + this.serverSideEnabled = serverSideEnabled; + } + + @Parameterized.Parameters(name="transportType={0},clientSideEnable={1},serverSideEnabled={2}") + public static Collection data() { + return Arrays.asList(new Object[][] { + //Both client and server side max frame check enabled + {"tcp", true, true}, + {"ssl", true, true}, + {"nio", true, true}, + {"nio+ssl", true, true}, + {"auto", true, true}, + {"auto+ssl", true, true}, + {"auto+nio", true, true}, + {"auto+nio+ssl", true, true}, + + //Client side enabled but server side disabled + {"tcp", true, false}, + {"ssl", true, false}, + {"nio", true, false}, + {"nio+ssl", true, false}, + {"auto", true, false}, + {"auto+ssl", true, false}, + {"auto+nio", true, false}, + {"auto+nio+ssl", true, false}, + + //Client side disabled but server side enabled + {"tcp", false, true}, + {"ssl", false, true}, + {"nio", false, true}, + {"nio+ssl", false, true}, + {"auto", false, true}, + {"auto+ssl", false, true}, + {"auto+nio", false, true}, + {"auto+nio+ssl", false, true}, + + //Client side and server side disabled + {"tcp", false, false}, + {"ssl", false, false}, + {"nio", false, false}, + {"nio+ssl", false, false}, + {"auto", false, false}, + {"auto+ssl", false, false}, + {"auto+nio", false, false}, + {"auto+nio+ssl", false, false}, + }); + } + + @BeforeClass + public static void beforeClass() throws Exception { + System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE); + System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD); + System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE); + System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD); + } + + @After + public void after() throws Exception { + stopBroker(broker); + } + + public BrokerService createBroker(String connectorName, String connectorString) throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + TransportConnector connector = broker.addConnector(connectorString); + connector.setName(connectorName); + broker.start(); + broker.waitUntilStarted(); + return broker; + } + + public void stopBroker(BrokerService broker) throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test + public void testMaxFrameSize() throws Exception { + broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams()); + testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") + "://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() + + getClientParams(), false); + } + + @Test + public void testMaxFrameSizeCompression() throws Exception { + // Test message body length is 99841 bytes. Compresses to ~ 48000 + broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams()); + testMaxFrameSize(transportType, (isSsl() ? "ssl" : "tcp") + "://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() + + getClientParams(), true); + } + + protected void testMaxFrameSize(String transportType, String clientUri, boolean useCompression) throws Exception { + final List connections = new ArrayList<>(); + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUri); + factory.setUseCompression(useCompression); + + for (int i = 0; i < 10; i++) { + Connection connection = factory.createConnection(); + connection.start(); + connections.add(connection); + } + + //Generate a body that is too large + StringBuffer body = new StringBuffer(); + Random r = new Random(); + for (int i = 0; i < 10000; i++) { + body.append(r.nextInt()); + } + + //Try sending 10 large messages rapidly in a loop to make sure all + //nio threads are allowed to send again and do not close server-side + for (int i = 0; i < 10; i++) { + boolean maxFrameSizeException = false; + boolean otherException = false; + + Connection connection = null; + Session session = null; + Queue destination = null; + MessageConsumer messageConsumer = null; + MessageProducer producer = null; + + try { + connection = connections.get(i); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = session.createQueue("TEST"); + producer = session.createProducer(destination); + producer.send(session.createTextMessage(body.toString())); + } catch (JMSException e) { + if (clientSideEnabled) { + assertNotNull(e.getErrorCode()); + assertEquals("41300", e.getErrorCode()); + assertTrue(e.getCause() instanceof MaxFrameSizeExceededException); + } else { + assertTrue(e.getCause() instanceof IOException); + } + assertNotNull(e.getCause()); + maxFrameSizeException = true; + } catch (Exception e) { + otherException = true; + } + + if (maxFrameSizeEnabled() && !useCompression) { + assertTrue("Should have gotten a jms maxframesize exception", maxFrameSizeException); + assertFalse("Should not have gotten a transport exception", otherException); + } else { + assertFalse("Should not have gotten a jms maxframesize exception", maxFrameSizeException); + } + + if (!maxFrameSizeEnabled() && otherException) { + fail("Should not have gotten exception"); + } + + assertNotNull(connection); + assertNotNull(session); + assertNotNull(destination); + assertNotNull(producer); + + if (connectionsShouldBeOpen(useCompression)) { + // Validate we can send a valid sized message after sending a too-large of message + boolean nextException = false; + try { + + messageConsumer = session.createConsumer(destination); + producer.send(session.createTextMessage("Hello")); + + int maxLoops = 50; + boolean found = false; + do { + Message message = messageConsumer.receive(200l); + if (message != null) { + assertTrue(TextMessage.class.isAssignableFrom(message.getClass())); + TextMessage.class.cast(message).getText().equals("Hello"); + found = true; + } + maxLoops++; + } while (!found && maxLoops <= 50); + + } catch (Exception e) { + nextException = true; + } + assertFalse("Should not have gotten an exception for the next message", nextException); + } + } + + + if (connectionsShouldBeOpen(useCompression)) { + //Verify that all connections are active + assertTrue(Wait.waitFor(() -> broker.getConnectorByName(transportType).getConnections().size() == 10, + 3000, 500)); + } else { + //Verify that all connections are closed + assertTrue(Wait.waitFor(() -> broker.getConnectorByName(transportType).getConnections().size() == 0, + 3000, 500)); + } + + if (isNio() && connectionsShouldBeOpen(useCompression)) { + //Verify one active transport connections in the selector thread pool. + final ThreadPoolExecutor e = (ThreadPoolExecutor) SelectorManager.getInstance().getSelectorExecutor(); + assertTrue(Wait.waitFor(() -> e.getActiveCount() == 1, 3000, 500)); + } + } + + private boolean maxFrameSizeEnabled() { + return clientSideEnabled || serverSideEnabled; + } + + private boolean connectionsShouldBeOpen(boolean useCompression) { + return !maxFrameSizeEnabled() || clientSideEnabled || useCompression; + } + + private boolean isSsl() { + return transportType.contains("ssl"); + } + + private boolean isNio() { + return transportType.contains("nio"); + } + + private String getServerParams() { + if (serverSideEnabled) { + return isSsl() ? "&transport.needClientAuth=true" : ""; + } else { + return isSsl() ? "&transport.needClientAuth=true&wireFormat.maxFrameSizeEnabled=false" : "&wireFormat.maxFrameSizeEnabled=false"; + } + } + + private String getClientParams() { + if (clientSideEnabled) { + return isSsl() ? "?socket.verifyHostName=false" : ""; + } else { + return isSsl() ? "?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false" : "?wireFormat.maxFrameSizeEnabled=false"; + } + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOMaxFrameSizeCleanupTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOMaxFrameSizeCleanupTest.java index df2e691b2e..83a6939015 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOMaxFrameSizeCleanupTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOMaxFrameSizeCleanupTest.java @@ -82,14 +82,14 @@ public class NIOMaxFrameSizeCleanupTest { public void testMaxFrameSizeCleanupNio() throws Exception { String transportType = "nio"; broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=1024"); - testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort()); + testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() + "?wireFormat.maxFrameSizeEnabled=false"); } @Test public void testMaxFrameSizeCleanupAutoNio() throws Exception { String transportType = "auto+nio"; broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=1024"); - testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort()); + testMaxFrameSizeCleanup(transportType, "tcp://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() + "?wireFormat.maxFrameSizeEnabled=false"); } @Test @@ -98,7 +98,7 @@ public class NIOMaxFrameSizeCleanupTest { broker = createBroker(transportType, transportType + "://localhost:0?transport.needClientAuth=true&wireFormat.maxFrameSize=1024"); testMaxFrameSizeCleanup(transportType, "ssl://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() - + "?socket.verifyHostName=false"); + + "?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false"); } @Test @@ -107,7 +107,7 @@ public class NIOMaxFrameSizeCleanupTest { broker = createBroker(transportType, transportType + "://localhost:0?transport.needClientAuth=true&wireFormat.maxFrameSize=1024"); testMaxFrameSizeCleanup(transportType, "ssl://localhost:" + broker.getConnectorByName(transportType).getConnectUri().getPort() - + "?socket.verifyHostName=false"); + + "?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false"); } protected void testMaxFrameSizeCleanup(String transportType, String clientUri) throws Exception {