https://issues.apache.org/jira/browse/AMQ-498 - make max frame size part of wire format negotiation

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1133915 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2011-06-09 15:09:55 +00:00
parent 6ad39a3615
commit e5e99d72f3
7 changed files with 69 additions and 18 deletions

View File

@ -270,6 +270,15 @@ public class WireFormatInfo implements Command, MarshallAware {
public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) throws IOException { public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) throws IOException {
setProperty("MaxInactivityDurationInitalDelay", new Long(maxInactivityDurationInitalDelay)); setProperty("MaxInactivityDurationInitalDelay", new Long(maxInactivityDurationInitalDelay));
} }
public long getMaxFrameSize() throws IOException {
Long l = (Long)getProperty("MaxFrameSize");
return l == null ? 0 : l.longValue();
}
public void setMaxFrameSize(long maxFrameSize) throws IOException {
setProperty("MaxFrameSize", new Long(maxFrameSize));
}

View File

@ -40,6 +40,7 @@ public final class OpenWireFormat implements WireFormat {
public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION; public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION; public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION;
public static final int DEFAULT_MAX_FRAME_SIZE = 100 * 1024 * 1024; //100 MB
static final byte NULL_TYPE = CommandTypes.NULL; static final byte NULL_TYPE = CommandTypes.NULL;
private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2; private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
@ -52,6 +53,7 @@ public final class OpenWireFormat implements WireFormat {
private boolean cacheEnabled; private boolean cacheEnabled;
private boolean tightEncodingEnabled; private boolean tightEncodingEnabled;
private boolean sizePrefixDisabled; private boolean sizePrefixDisabled;
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
// The following fields are used for value caching // The following fields are used for value caching
private short nextMarshallCacheIndex; private short nextMarshallCacheIndex;
@ -103,7 +105,7 @@ public final class OpenWireFormat implements WireFormat {
public String toString() { public String toString() {
return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled=" return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
+ tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + "}"; + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}";
// return "OpenWireFormat{id="+id+", // return "OpenWireFormat{id="+id+",
// tightEncodingEnabled="+tightEncodingEnabled+"}"; // tightEncodingEnabled="+tightEncodingEnabled+"}";
} }
@ -591,6 +593,14 @@ public final class OpenWireFormat implements WireFormat {
return preferedWireFormatInfo; return preferedWireFormatInfo;
} }
public long getMaxFrameSize() {
return maxFrameSize;
}
public void setMaxFrameSize(long maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
public void renegotiateWireFormat(WireFormatInfo info) throws IOException { public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
if (preferedWireFormatInfo == null) { if (preferedWireFormatInfo == null) {
@ -600,6 +610,9 @@ public final class OpenWireFormat implements WireFormat {
this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion())); this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
info.setVersion(this.getVersion()); info.setVersion(this.getVersion());
this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize()));
info.setMaxFrameSize(this.getMaxFrameSize());
this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled(); this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
info.setStackTraceEnabled(this.stackTraceEnabled); info.setStackTraceEnabled(this.stackTraceEnabled);
@ -647,4 +660,11 @@ public final class OpenWireFormat implements WireFormat {
} }
return version2; return version2;
} }
protected long min(long version1, long version2) {
if (version1 < version2 && version1 > 0 || version2 <= 0) {
return version1;
}
return version2;
}
} }

View File

@ -39,6 +39,7 @@ public class OpenWireFormatFactory implements WireFormatFactory {
private long maxInactivityDuration = 30*1000; private long maxInactivityDuration = 30*1000;
private long maxInactivityDurationInitalDelay = 10*1000; private long maxInactivityDurationInitalDelay = 10*1000;
private int cacheSize = 1024; private int cacheSize = 1024;
private long maxFrameSize = OpenWireFormat.DEFAULT_MAX_FRAME_SIZE;
public WireFormat createWireFormat() { public WireFormat createWireFormat() {
WireFormatInfo info = new WireFormatInfo(); WireFormatInfo info = new WireFormatInfo();
@ -53,6 +54,7 @@ public class OpenWireFormatFactory implements WireFormatFactory {
info.setMaxInactivityDuration(maxInactivityDuration); info.setMaxInactivityDuration(maxInactivityDuration);
info.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay); info.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay);
info.setCacheSize(cacheSize); info.setCacheSize(cacheSize);
info.setMaxFrameSize(maxFrameSize);
} catch (Exception e) { } catch (Exception e) {
IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo"); IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo");
ise.initCause(e); ise.initCause(e);
@ -60,6 +62,7 @@ public class OpenWireFormatFactory implements WireFormatFactory {
} }
OpenWireFormat f = new OpenWireFormat(version); OpenWireFormat f = new OpenWireFormat(version);
f.setMaxFrameSize(maxFrameSize);
f.setPreferedWireFormatInfo(info); f.setPreferedWireFormatInfo(info);
return f; return f;
} }
@ -136,4 +139,12 @@ public class OpenWireFormatFactory implements WireFormatFactory {
long maxInactivityDurationInitalDelay) { long maxInactivityDurationInitalDelay) {
this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay; this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
} }
public long getMaxFrameSize() {
return maxFrameSize;
}
public void setMaxFrameSize(long maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
} }

View File

@ -30,6 +30,7 @@ import java.nio.channels.SocketChannel;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
@ -115,9 +116,14 @@ public class NIOTransport extends TcpTransport {
// for it. // for it.
inputBuffer.flip(); inputBuffer.flip();
nextFrameSize = inputBuffer.getInt() + 4; nextFrameSize = inputBuffer.getInt() + 4;
if (nextFrameSize > maxFrameSize) {
throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); if (wireFormat instanceof OpenWireFormat) {
long maxFrameSize = ((OpenWireFormat)wireFormat).getMaxFrameSize();
if (nextFrameSize > maxFrameSize) {
throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
}
} }
if (nextFrameSize > inputBuffer.capacity()) { if (nextFrameSize > inputBuffer.capacity()) {
currentBuffer = ByteBuffer.allocate(nextFrameSize); currentBuffer = ByteBuffer.allocate(nextFrameSize);
currentBuffer.putInt(nextFrameSize); currentBuffer.putInt(nextFrameSize);

View File

@ -70,8 +70,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected DataInputStream dataIn; protected DataInputStream dataIn;
protected TimeStampStream buffOut = null; protected TimeStampStream buffOut = null;
protected int maxFrameSize = 104857600; //100MB
/** /**
* The Traffic Class to be set on the socket. * The Traffic Class to be set on the socket.
@ -323,14 +321,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
return socketBufferSize; return socketBufferSize;
} }
public int getMaxFrameSize() {
return maxFrameSize;
}
public void setMaxFrameSize(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
/** /**
* Sets the buffer size to use on the socket * Sets the buffer size to use on the socket
*/ */

View File

@ -152,7 +152,7 @@ public class WireformatNegociationTest extends CombinationTestSupport {
/** /**
* @throws Exception * @throws Exception
*/ */
public void testWireFomatInfoSeverVersion1() throws Exception { public void testWireFormatInfoSeverVersion1() throws Exception {
startServer("tcp://localhost:61616?wireFormat.version=1"); startServer("tcp://localhost:61616?wireFormat.version=1");
startClient("tcp://localhost:61616"); startClient("tcp://localhost:61616");
@ -170,7 +170,7 @@ public class WireformatNegociationTest extends CombinationTestSupport {
/** /**
* @throws Exception * @throws Exception
*/ */
public void testWireFomatInfoClientVersion1() throws Exception { public void testWireFormatInfoClientVersion1() throws Exception {
startServer("tcp://localhost:61616"); startServer("tcp://localhost:61616");
startClient("tcp://localhost:61616?wireFormat.version=1"); startClient("tcp://localhost:61616?wireFormat.version=1");
@ -188,7 +188,7 @@ public class WireformatNegociationTest extends CombinationTestSupport {
/** /**
* @throws Exception * @throws Exception
*/ */
public void testWireFomatInfoCurrentVersion() throws Exception { public void testWireFormatInfoCurrentVersion() throws Exception {
startServer("tcp://localhost:61616"); startServer("tcp://localhost:61616");
startClient("tcp://localhost:61616"); startClient("tcp://localhost:61616");
@ -203,7 +203,7 @@ public class WireformatNegociationTest extends CombinationTestSupport {
assertEquals(CommandTypes.PROTOCOL_VERSION, serverWF.get().getVersion()); assertEquals(CommandTypes.PROTOCOL_VERSION, serverWF.get().getVersion());
} }
public void testWireFomatInactivityDurationInitalDelay() throws Exception { public void testWireFormatInactivityDurationInitialDelay() throws Exception {
startServer("tcp://localhost:61616"); startServer("tcp://localhost:61616");
startClient("tcp://localhost:61616?wireFormat.maxInactivityDurationInitalDelay=60000"); startClient("tcp://localhost:61616?wireFormat.maxInactivityDurationInitalDelay=60000");
@ -218,4 +218,19 @@ public class WireformatNegociationTest extends CombinationTestSupport {
assertEquals(CommandTypes.PROTOCOL_VERSION, serverWF.get().getVersion()); assertEquals(CommandTypes.PROTOCOL_VERSION, serverWF.get().getVersion());
} }
public void testWireFormatMaxFrameSize() throws Exception {
startServer("tcp://localhost:61616");
startClient("tcp://localhost:61616?wireFormat.maxFrameSize=1048576");
assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS));
assertNull("Async error: " + asyncError, asyncError.get());
assertNotNull(clientWF.get());
assertEquals(1048576, clientWF.get().getMaxFrameSize());
assertNotNull(serverWF.get());
assertEquals(1048576, serverWF.get().getMaxFrameSize());
}
} }

View File

@ -28,7 +28,7 @@
<broker useJmx="false" xmlns="http://activemq.apache.org/schema/core" persistent="false"> <broker useJmx="false" xmlns="http://activemq.apache.org/schema/core" persistent="false">
<transportConnectors> <transportConnectors>
<transportConnector uri="nio://localhost:61616?transport.maxFrameSize=10485760" /> <transportConnector uri="nio://localhost:61616?wireFormat.maxFrameSize=1048576" />
</transportConnectors> </transportConnectors>
</broker> </broker>