[AMQ-8412] Client-side management of max frame size

- Store maxFrameSize on client-side
 - Check messages on outbound send to send an actionable exception to the caller
 - Add a reason code for max message size exceeded (client-side)

(cherry picked from commit 48d0fb1171)
This commit is contained in:
Matt Pavlovich 2022-01-24 13:10:09 -06:00 committed by Jean-Baptiste Onofré
parent f1b37e0a4f
commit afc50ee727
1 changed files with 19 additions and 1 deletions

View File

@ -35,6 +35,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionConsumer; import javax.jms.ConnectionConsumer;
@ -192,6 +193,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// Assume that protocol is the latest. Change to the actual protocol // Assume that protocol is the latest. Change to the actual protocol
// version when a WireFormatInfo is received. // version when a WireFormatInfo is received.
private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private final AtomicLong maxFrameSize = new AtomicLong(Long.MAX_VALUE);
private final long timeCreated; private final long timeCreated;
private final ConnectionAudit connectionAudit = new ConnectionAudit(); private final ConnectionAudit connectionAudit = new ConnectionAudit();
private DestinationSource destinationSource; private DestinationSource destinationSource;
@ -1386,7 +1388,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
if (isClosed()) { if (isClosed()) {
throw new ConnectionClosedException(); throw new ConnectionClosedException();
} else { } else {
if(command.isMessage()) {
int tmpMsgSize = Message.class.cast(command).getSize();
if(maxFrameSize.get() < tmpMsgSize) {
throw new JMSException("Message size: " + tmpMsgSize + " exceeds maximum allowed by broker: " + maxFrameSize.get(), "41300");
}
}
try { try {
Response response = (Response)(timeout > 0 Response response = (Response)(timeout > 0
? this.transport.request(command, timeout) ? this.transport.request(command, timeout)
@ -1920,6 +1927,17 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected void onWireFormatInfo(WireFormatInfo info) { protected void onWireFormatInfo(WireFormatInfo info) {
protocolVersion.set(info.getVersion()); protocolVersion.set(info.getVersion());
long tmpMaxFrameSize = 0;
try {
tmpMaxFrameSize = info.getMaxFrameSize();
} catch (IOException e) {
// unmarshal error on property map
}
if(tmpMaxFrameSize > 0) {
maxFrameSize.set(tmpMaxFrameSize);
}
} }
/** /**