ARTEMIS-5054 Fix concurrent access issue of large message to Stomp Frame

When converting a large server message to an outgoing STOMP frame the converter
is allowing unsafe concurrent access to the large message internals which leads
to failures on message deliver as the state is out of sync amongst the dispatch
threads.
This commit is contained in:
Timothy Bish 2024-09-25 08:24:03 -04:00 committed by Robbie Gemmell
parent 9bb63b656f
commit e7ed4700e1
4 changed files with 121 additions and 20 deletions

View File

@ -626,7 +626,7 @@ public final class StompConnection extends AbstractRemotingConnection {
public StompFrame createStompMessage(ICoreMessage message,
StompSubscription subscription,
ServerConsumer consumer,
int deliveryCount) {
int deliveryCount) throws ActiveMQException {
return frameHandler.createMessageFrame(message, subscription, consumer, deliveryCount);
}

View File

@ -16,14 +16,18 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers;
import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
@ -36,6 +40,8 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
public abstract class VersionedStompFrameHandler {
protected static byte[] EMPTY_BODY = new byte[0];
protected StompConnection connection;
protected StompDecoder decoder;
@ -324,29 +330,18 @@ public abstract class VersionedStompFrameHandler {
public StompFrame createMessageFrame(ICoreMessage serverMessage,
StompSubscription subscription,
ServerConsumer consumer,
int deliveryCount) {
StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
int deliveryCount) throws ActiveMQException {
final StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
if (subscription.getID() != null) {
frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
}
ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer();
byte[] data = new byte[buffer.writerIndex()];
if (data.length > 0) {
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE) {
frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
buffer.readBytes(data);
} else {
SimpleString text = buffer.readNullableSimpleString();
if (text != null) {
data = text.toString().getBytes(StandardCharsets.UTF_8);
}
}
if (serverMessage.isLargeMessage()) {
populateFrameBodyFromLargeMessage(frame, serverMessage);
} else {
populateFrameBodyFromMessage(frame, serverMessage);
}
frame.setByteBody(data);
frame.addHeader(Stomp.Headers.Message.MESSAGE_ID, new StringBuilder(41).append(consumer.getID()).append(StompSession.MESSAGE_ID_SEPARATOR).append(serverMessage.getMessageID()).toString());
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
@ -354,6 +349,62 @@ public abstract class VersionedStompFrameHandler {
return frame;
}
private void populateFrameBodyFromMessage(StompFrame frame, ICoreMessage serverMessage) {
final ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer();
final int bodyLength = buffer.readableBytes();
if (bodyLength > 0) {
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE) {
final byte[] data = new byte[bodyLength];
buffer.readBytes(data);
frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(bodyLength));
frame.setByteBody(data);
} else {
final SimpleString text = buffer.readNullableSimpleString();
if (text != null) {
frame.setByteBody(text.toString().getBytes(StandardCharsets.UTF_8));
}
}
} else {
frame.setByteBody(EMPTY_BODY);
}
}
private void populateFrameBodyFromLargeMessage(StompFrame frame, ICoreMessage serverMessage) throws ActiveMQException {
try (LargeBodyReader reader = serverMessage.getLargeBodyReader()) {
reader.open();
final int bodyLength = (int) reader.getSize();
if (bodyLength > 0) {
final byte[] bodyBytes = new byte[bodyLength];
final ByteBuffer bodyBuffer = ByteBuffer.wrap(bodyBytes);
reader.readInto(bodyBuffer);
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE) {
frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(bodyLength));
frame.setByteBody(bodyBytes);
} else {
final ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bodyBuffer);
buffer.writerIndex(bodyLength);
final SimpleString text = buffer.readNullableSimpleString();
if (text != null) {
frame.setByteBody(text.toString().getBytes(StandardCharsets.UTF_8));
}
}
} else {
frame.setByteBody(EMPTY_BODY);
}
}
}
/**
* this method is called when a newer version of handler is created. It should
* take over the state of the decoder of the existingHandler so that
@ -391,5 +442,4 @@ public abstract class VersionedStompFrameHandler {
response.addHeader(Stomp.Headers.Error.MESSAGE, responseText);
return response;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
@ -52,7 +53,7 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 {
public StompFrame createMessageFrame(ICoreMessage serverMessage,
StompSubscription subscription,
ServerConsumer consumer,
int deliveryCount) {
int deliveryCount) throws ActiveMQException {
StompFrame frame = super.createMessageFrame(serverMessage, subscription, consumer, deliveryCount);
if (!subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {

View File

@ -2120,4 +2120,54 @@ public class StompTest extends StompTestBase {
conn.disconnect();
}
@Test
public void testMultipleSubscriptionsOnMulticastAddressReadSameMessage() throws Exception {
doTestMultipleSubscriptionsOnMulticastAddressReadSameMessage(10);
}
@Test
public void testMultipleSubscriptionsOnMulticastAddressReadSameLargeMessage() throws Exception {
doTestMultipleSubscriptionsOnMulticastAddressReadSameMessage(120_000);
}
private void doTestMultipleSubscriptionsOnMulticastAddressReadSameMessage(int size) throws Exception {
final String body = "A".repeat(size);
final StompClientConnection conn_r1 = StompClientConnectionFactory.createClientConnection(uri);
final StompClientConnection conn_r2 = StompClientConnectionFactory.createClientConnection(uri);
try {
conn_r1.connect(defUser, defPass);
subscribeTopic(conn_r1, null, null, null);
conn_r2.connect(defUser, defPass);
subscribeTopic(conn_r2, null, null, null);
// Sender
conn.connect(defUser, defPass);
send(conn, getTopicPrefix() + getTopicName(), null, body, true, RoutingType.MULTICAST);
ClientStompFrame frame1 = conn_r1.receiveFrame(10000);
ClientStompFrame frame2 = conn_r2.receiveFrame(10000);
assertEquals(Stomp.Responses.MESSAGE, frame2.getCommand());
assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand());
assertEquals(getTopicPrefix() + getTopicName(), frame2.getHeader(Stomp.Headers.Send.DESTINATION));
assertEquals(getTopicPrefix() + getTopicName(), frame1.getHeader(Stomp.Headers.Send.DESTINATION));
assertEquals(RoutingType.MULTICAST.toString(), frame2.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
assertEquals(RoutingType.MULTICAST.toString(), frame1.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
assertNull(frame2.getHeader(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE.toString()));
assertNull(frame1.getHeader(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE.toString()));
assertEquals(body, frame2.getBody());
assertEquals(body, frame1.getBody());
} finally {
conn.disconnect();
conn_r1.disconnect();
conn_r2.disconnect();
}
}
}