diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 742c2a8d79..bf0d09f955 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -626,7 +626,7 @@ public final class StompConnection extends AbstractRemotingConnection { public StompFrame createStompMessage(ICoreMessage message, StompSubscription subscription, ServerConsumer consumer, - int deliveryCount) { + int deliveryCount) throws ActiveMQException { return frameHandler.createMessageFrame(message, subscription, consumer, deliveryCount); } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 77b6c78110..b338456b9e 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -16,14 +16,18 @@ */ package org.apache.activemq.artemis.core.protocol.stomp; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.LargeBodyReader; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers; import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10; @@ -36,6 +40,8 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto public abstract class VersionedStompFrameHandler { + protected static byte[] EMPTY_BODY = new byte[0]; + protected StompConnection connection; protected StompDecoder decoder; @@ -324,29 +330,18 @@ public abstract class VersionedStompFrameHandler { public StompFrame createMessageFrame(ICoreMessage serverMessage, StompSubscription subscription, ServerConsumer consumer, - int deliveryCount) { - StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE); + int deliveryCount) throws ActiveMQException { + final StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE); if (subscription.getID() != null) { frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID()); } - ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer(); - - byte[] data = new byte[buffer.writerIndex()]; - - if (data.length > 0) { - if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE) { - frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length)); - buffer.readBytes(data); - } else { - SimpleString text = buffer.readNullableSimpleString(); - if (text != null) { - data = text.toString().getBytes(StandardCharsets.UTF_8); - } - } + if (serverMessage.isLargeMessage()) { + populateFrameBodyFromLargeMessage(frame, serverMessage); + } else { + populateFrameBodyFromMessage(frame, serverMessage); } - frame.setByteBody(data); frame.addHeader(Stomp.Headers.Message.MESSAGE_ID, new StringBuilder(41).append(consumer.getID()).append(StompSession.MESSAGE_ID_SEPARATOR).append(serverMessage.getMessageID()).toString()); StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount); @@ -354,6 +349,62 @@ public abstract class VersionedStompFrameHandler { return frame; } + private void populateFrameBodyFromMessage(StompFrame frame, ICoreMessage serverMessage) { + final ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer(); + final int bodyLength = buffer.readableBytes(); + + if (bodyLength > 0) { + if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE) { + final byte[] data = new byte[bodyLength]; + + buffer.readBytes(data); + + frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(bodyLength)); + frame.setByteBody(data); + } else { + final SimpleString text = buffer.readNullableSimpleString(); + + if (text != null) { + frame.setByteBody(text.toString().getBytes(StandardCharsets.UTF_8)); + } + } + } else { + frame.setByteBody(EMPTY_BODY); + } + } + + private void populateFrameBodyFromLargeMessage(StompFrame frame, ICoreMessage serverMessage) throws ActiveMQException { + try (LargeBodyReader reader = serverMessage.getLargeBodyReader()) { + reader.open(); + + final int bodyLength = (int) reader.getSize(); + + if (bodyLength > 0) { + final byte[] bodyBytes = new byte[bodyLength]; + final ByteBuffer bodyBuffer = ByteBuffer.wrap(bodyBytes); + + reader.readInto(bodyBuffer); + + if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE) { + frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(bodyLength)); + frame.setByteBody(bodyBytes); + } else { + final ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bodyBuffer); + + buffer.writerIndex(bodyLength); + + final SimpleString text = buffer.readNullableSimpleString(); + + if (text != null) { + frame.setByteBody(text.toString().getBytes(StandardCharsets.UTF_8)); + } + } + } else { + frame.setByteBody(EMPTY_BODY); + } + } + } + /** * this method is called when a newer version of handler is created. It should * take over the state of the decoder of the existingHandler so that @@ -391,5 +442,4 @@ public abstract class VersionedStompFrameHandler { response.addHeader(Stomp.Headers.Error.MESSAGE, responseText); return response; } - } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java index 6ee5fe382c..9c3d43ad41 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12; import java.util.concurrent.ScheduledExecutorService; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; @@ -52,7 +53,7 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 { public StompFrame createMessageFrame(ICoreMessage serverMessage, StompSubscription subscription, ServerConsumer consumer, - int deliveryCount) { + int deliveryCount) throws ActiveMQException { StompFrame frame = super.createMessageFrame(serverMessage, subscription, consumer, deliveryCount); if (!subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 02346976ef..c4c25dcb83 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -2120,4 +2120,54 @@ public class StompTest extends StompTestBase { conn.disconnect(); } + @Test + public void testMultipleSubscriptionsOnMulticastAddressReadSameMessage() throws Exception { + doTestMultipleSubscriptionsOnMulticastAddressReadSameMessage(10); + } + + @Test + public void testMultipleSubscriptionsOnMulticastAddressReadSameLargeMessage() throws Exception { + doTestMultipleSubscriptionsOnMulticastAddressReadSameMessage(120_000); + } + + private void doTestMultipleSubscriptionsOnMulticastAddressReadSameMessage(int size) throws Exception { + final String body = "A".repeat(size); + + final StompClientConnection conn_r1 = StompClientConnectionFactory.createClientConnection(uri); + final StompClientConnection conn_r2 = StompClientConnectionFactory.createClientConnection(uri); + + try { + conn_r1.connect(defUser, defPass); + subscribeTopic(conn_r1, null, null, null); + + conn_r2.connect(defUser, defPass); + subscribeTopic(conn_r2, null, null, null); + + // Sender + conn.connect(defUser, defPass); + send(conn, getTopicPrefix() + getTopicName(), null, body, true, RoutingType.MULTICAST); + + ClientStompFrame frame1 = conn_r1.receiveFrame(10000); + ClientStompFrame frame2 = conn_r2.receiveFrame(10000); + + assertEquals(Stomp.Responses.MESSAGE, frame2.getCommand()); + assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand()); + + assertEquals(getTopicPrefix() + getTopicName(), frame2.getHeader(Stomp.Headers.Send.DESTINATION)); + assertEquals(getTopicPrefix() + getTopicName(), frame1.getHeader(Stomp.Headers.Send.DESTINATION)); + + assertEquals(RoutingType.MULTICAST.toString(), frame2.getHeader(Stomp.Headers.Send.DESTINATION_TYPE)); + assertEquals(RoutingType.MULTICAST.toString(), frame1.getHeader(Stomp.Headers.Send.DESTINATION_TYPE)); + + assertNull(frame2.getHeader(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE.toString())); + assertNull(frame1.getHeader(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE.toString())); + + assertEquals(body, frame2.getBody()); + assertEquals(body, frame1.getBody()); + } finally { + conn.disconnect(); + conn_r1.disconnect(); + conn_r2.disconnect(); + } + } }