mirror of https://github.com/apache/activemq.git
[AMQ-8412] Client-side management of max frame size
- Store maxFrameSize on client-side - Check messages on outbound send to send an actionable exception to the caller - Add a reason code for max message size exceeded (client-side)
This commit is contained in:
parent
d5441530a9
commit
48d0fb1171
|
@ -35,6 +35,7 @@ import java.util.concurrent.ThreadPoolExecutor;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionConsumer;
|
||||
|
@ -192,6 +193,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
// Assume that protocol is the latest. Change to the actual protocol
|
||||
// version when a WireFormatInfo is received.
|
||||
private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
|
||||
private final AtomicLong maxFrameSize = new AtomicLong(Long.MAX_VALUE);
|
||||
private final long timeCreated;
|
||||
private final ConnectionAudit connectionAudit = new ConnectionAudit();
|
||||
private DestinationSource destinationSource;
|
||||
|
@ -1446,7 +1448,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
if (isClosed()) {
|
||||
throw new ConnectionClosedException();
|
||||
} else {
|
||||
|
||||
if(command.isMessage()) {
|
||||
int tmpMsgSize = Message.class.cast(command).getSize();
|
||||
if(maxFrameSize.get() < tmpMsgSize) {
|
||||
throw new JMSException("Message size: " + tmpMsgSize + " exceeds maximum allowed by broker: " + maxFrameSize.get(), "41300");
|
||||
}
|
||||
}
|
||||
try {
|
||||
Response response = (Response)(timeout > 0
|
||||
? this.transport.request(command, timeout)
|
||||
|
@ -1980,6 +1987,17 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
|
||||
protected void onWireFormatInfo(WireFormatInfo info) {
|
||||
protocolVersion.set(info.getVersion());
|
||||
|
||||
long tmpMaxFrameSize = 0;
|
||||
try {
|
||||
tmpMaxFrameSize = info.getMaxFrameSize();
|
||||
} catch (IOException e) {
|
||||
// unmarshal error on property map
|
||||
}
|
||||
|
||||
if(tmpMaxFrameSize > 0) {
|
||||
maxFrameSize.set(tmpMaxFrameSize);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue