mirror of https://github.com/apache/activemq.git
Merge pull request #748 from mattrpav/AMQ-8412b
[AMQ-8412] Client-side handling of max frame size received from broker
This commit is contained in:
commit
c5d07ac016
|
@ -35,6 +35,7 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionConsumer;
|
import javax.jms.ConnectionConsumer;
|
||||||
|
@ -192,6 +193,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
// Assume that protocol is the latest. Change to the actual protocol
|
// Assume that protocol is the latest. Change to the actual protocol
|
||||||
// version when a WireFormatInfo is received.
|
// version when a WireFormatInfo is received.
|
||||||
private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
|
private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
|
||||||
|
private final AtomicLong maxFrameSize = new AtomicLong(Long.MAX_VALUE);
|
||||||
private final long timeCreated;
|
private final long timeCreated;
|
||||||
private final ConnectionAudit connectionAudit = new ConnectionAudit();
|
private final ConnectionAudit connectionAudit = new ConnectionAudit();
|
||||||
private DestinationSource destinationSource;
|
private DestinationSource destinationSource;
|
||||||
|
@ -1446,7 +1448,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
if (isClosed()) {
|
if (isClosed()) {
|
||||||
throw new ConnectionClosedException();
|
throw new ConnectionClosedException();
|
||||||
} else {
|
} else {
|
||||||
|
if(command.isMessage()) {
|
||||||
|
int tmpMsgSize = Message.class.cast(command).getSize();
|
||||||
|
if(maxFrameSize.get() < tmpMsgSize) {
|
||||||
|
throw new JMSException("Message size: " + tmpMsgSize + " exceeds maximum allowed by broker: " + maxFrameSize.get(), "41300");
|
||||||
|
}
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
Response response = (Response)(timeout > 0
|
Response response = (Response)(timeout > 0
|
||||||
? this.transport.request(command, timeout)
|
? this.transport.request(command, timeout)
|
||||||
|
@ -1980,6 +1987,17 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
|
|
||||||
protected void onWireFormatInfo(WireFormatInfo info) {
|
protected void onWireFormatInfo(WireFormatInfo info) {
|
||||||
protocolVersion.set(info.getVersion());
|
protocolVersion.set(info.getVersion());
|
||||||
|
|
||||||
|
long tmpMaxFrameSize = 0;
|
||||||
|
try {
|
||||||
|
tmpMaxFrameSize = info.getMaxFrameSize();
|
||||||
|
} catch (IOException e) {
|
||||||
|
// unmarshal error on property map
|
||||||
|
}
|
||||||
|
|
||||||
|
if(tmpMaxFrameSize > 0) {
|
||||||
|
maxFrameSize.set(tmpMaxFrameSize);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue