From afc50ee7275b5417d5768db84a9fe9054d526e7a Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Mon, 24 Jan 2022 13:10:09 -0600 Subject: [PATCH] [AMQ-8412] Client-side management of max frame size - Store maxFrameSize on client-side - Check messages on outbound send to send an actionable exception to the caller - Add a reason code for max message size exceeded (client-side) (cherry picked from commit 48d0fb11716d3490b1d76f7e8860b61324d4e82f) --- .../apache/activemq/ActiveMQConnection.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index 5d11fb6d08..2be8203622 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -35,6 +35,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.Connection; import javax.jms.ConnectionConsumer; @@ -192,6 +193,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon // Assume that protocol is the latest. Change to the actual protocol // version when a WireFormatInfo is received. private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); + private final AtomicLong maxFrameSize = new AtomicLong(Long.MAX_VALUE); private final long timeCreated; private final ConnectionAudit connectionAudit = new ConnectionAudit(); private DestinationSource destinationSource; @@ -1386,7 +1388,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon if (isClosed()) { throw new ConnectionClosedException(); } else { - + if(command.isMessage()) { + int tmpMsgSize = Message.class.cast(command).getSize(); + if(maxFrameSize.get() < tmpMsgSize) { + throw new JMSException("Message size: " + tmpMsgSize + " exceeds maximum allowed by broker: " + maxFrameSize.get(), "41300"); + } + } try { Response response = (Response)(timeout > 0 ? this.transport.request(command, timeout) @@ -1920,6 +1927,17 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon protected void onWireFormatInfo(WireFormatInfo info) { protocolVersion.set(info.getVersion()); + + long tmpMaxFrameSize = 0; + try { + tmpMaxFrameSize = info.getMaxFrameSize(); + } catch (IOException e) { + // unmarshal error on property map + } + + if(tmpMaxFrameSize > 0) { + maxFrameSize.set(tmpMaxFrameSize); + } } /**