ARTEMIS-4529 NPE with empty core message and STOMP consumer

This commit is contained in:
Justin Bertram 2023-12-06 15:26:07 -06:00 committed by Gary Tully
parent ad5cf4c28b
commit b0545d9c45
5 changed files with 38 additions and 37 deletions

View File

@ -622,11 +622,10 @@ public final class StompConnection extends AbstractRemotingConnection {
return SERVER_NAME;
}
public StompFrame createStompMessage(ICoreMessage serverMessage,
ActiveMQBuffer bodyBuffer,
public StompFrame createStompMessage(ICoreMessage message,
StompSubscription subscription,
int deliveryCount) throws Exception {
return frameHandler.createMessageFrame(serverMessage, bodyBuffer, subscription, deliveryCount);
int deliveryCount) {
return frameHandler.createMessageFrame(message, subscription, deliveryCount);
}
public void addStompEventListener(FrameEventListener listener) {

View File

@ -153,29 +153,22 @@ public class StompSession implements SessionCallback {
}
@Override
public int sendMessage(MessageReference ref,
Message serverMessage,
final ServerConsumer consumer,
int deliveryCount) {
ICoreMessage coreMessage = serverMessage.toCore();
ICoreMessage newServerMessage = serverMessage.toCore();
public int sendMessage(MessageReference ref, Message serverMessage, final ServerConsumer consumer, int deliveryCount) {
ICoreMessage message = ref.getMessage().toCore();
try {
StompSubscription subscription = subscriptions.get(consumer.getID());
// subscription might be null if the consumer was closed
if (subscription == null)
return 0;
StompFrame frame;
ActiveMQBuffer buffer = coreMessage.getDataBuffer();
frame = connection.createStompMessage(newServerMessage, buffer, subscription, deliveryCount);
frame = connection.createStompMessage(message, subscription, deliveryCount);
int length = frame.getEncodedSize();
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
if (manager.send(connection, frame)) {
final long messageID = newServerMessage.getMessageID();
final long messageID = message.getMessageID();
final long consumerID = consumer.getID();
// this will be called after the delivery is complete
@ -191,14 +184,14 @@ public class StompSession implements SessionCallback {
});
}
} else {
messagesToAck.put(newServerMessage.getMessageID(), new Pair<>(consumer.getID(), length));
messagesToAck.put(message.getMessageID(), new Pair<>(consumer.getID(), length));
// Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added!
manager.send(connection, frame);
}
return length;
} catch (Exception e) {
ActiveMQStompProtocolLogger.LOGGER.unableToSendMessageToClient(coreMessage, e);
ActiveMQStompProtocolLogger.LOGGER.unableToSendMessageToClient(message, e);
return 0;
}
}

View File

@ -320,31 +320,26 @@ public abstract class VersionedStompFrameHandler {
return response;
}
public StompFrame createMessageFrame(ICoreMessage serverMessage,
ActiveMQBuffer bodyBuffer,
StompSubscription subscription,
int deliveryCount) throws Exception {
public StompFrame createMessageFrame(ICoreMessage serverMessage, StompSubscription subscription, int deliveryCount) {
StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
if (subscription.getID() != null) {
frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
}
ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer : serverMessage.getReadOnlyBodyBuffer();
ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer();
int size = buffer.writerIndex();
byte[] data = new byte[buffer.writerIndex()];
byte[] data = new byte[size];
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE) {
frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
buffer.readBytes(data);
} else {
SimpleString text = buffer.readNullableSimpleString();
if (text != null) {
data = text.toString().getBytes(StandardCharsets.UTF_8);
if (data.length > 0) {
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE) {
frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
buffer.readBytes(data);
} else {
data = new byte[0];
SimpleString text = buffer.readNullableSimpleString();
if (text != null) {
data = text.toString().getBytes(StandardCharsets.UTF_8);
}
}
}
frame.setByteBody(data);

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
@ -50,10 +49,9 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 {
@Override
public StompFrame createMessageFrame(ICoreMessage serverMessage,
ActiveMQBuffer bodyBuffer,
StompSubscription subscription,
int deliveryCount) throws Exception {
StompFrame frame = super.createMessageFrame(serverMessage, bodyBuffer, subscription, deliveryCount);
int deliveryCount) {
StompFrame frame = super.createMessageFrame(serverMessage, subscription, deliveryCount);
if (!subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
frame.addHeader(Stomp.Headers.Message.ACK, String.valueOf(serverMessage.getMessageID()));

View File

@ -344,6 +344,22 @@ public class StompTest extends StompTestBase {
}
@Test
public void sendEmptyCoreMessage() throws Exception {
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
// send core JMS message
MessageProducer mp = session.createProducer(session.createQueue(getQueuePrefix() + getQueueName()));
Message m = session.createMessage();
mp.send(m);
// Receive STOMP Message
ClientStompFrame frame = conn.receiveFrame();
assertNotNull(frame);
assertNull(frame.getBody());
}
public void sendMessageToNonExistentQueue(String queuePrefix,
String queue,
RoutingType routingType) throws Exception {