ARTEMIS-2922 artemis-cli consumer on large message results in a ClassCastException

This commit is contained in:
Justin Bertram 2021-10-25 12:01:20 -05:00 committed by clebertsuconic
parent 3f9de5fa30
commit f1004c8b94
2 changed files with 36 additions and 3 deletions

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl;
import org.apache.activemq.artemis.core.message.LargeBodyReader; import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.reader.TextMessageUtil; import org.apache.activemq.artemis.reader.TextMessageUtil;
@ -58,7 +59,7 @@ public class XMLMessageExporter {
xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY); xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
if (message.isLargeMessage()) { if (message.isLargeMessage()) {
printLargeMessageBody((LargeServerMessage) message); printLargeMessageBody(message);
} else { } else {
if (encodeTextMessageUTF8 && message.toCore().getType() == Message.TEXT_TYPE) { if (encodeTextMessageUTF8 && message.toCore().getType() == Message.TEXT_TYPE) {
xmlWriter.writeCData(TextMessageUtil.readBodyText(message.toCore().getReadOnlyBodyBuffer()).toString()); xmlWriter.writeCData(TextMessageUtil.readBodyText(message.toCore().getReadOnlyBodyBuffer()).toString());
@ -78,12 +79,18 @@ public class XMLMessageExporter {
return chunkBytes; return chunkBytes;
} }
public void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException { public void printLargeMessageBody(Message message) throws XMLStreamException {
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString()); xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
LargeBodyReader encoder = null; LargeBodyReader encoder = null;
try { try {
encoder = message.toMessage().toCore().getLargeBodyReader(); if (message instanceof LargeServerMessage) {
encoder = ((LargeServerMessage)message).toMessage().toCore().getLargeBodyReader();
} else if (message instanceof ClientLargeMessageImpl) {
encoder = ((ClientLargeMessageImpl)message).getLargeBodyReader();
} else {
throw new RuntimeException("Unrecognized message implementation: " + message.getClass().getName());
}
encoder.open(); encoder.open();
long totalBytesWritten = 0; long totalBytesWritten = 0;
int bufferSize; int bufferSize;

View File

@ -34,6 +34,7 @@ import java.util.List;
import java.util.UUID; import java.util.UUID;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.cli.commands.address.CreateAddress; import org.apache.activemq.artemis.cli.commands.address.CreateAddress;
import org.apache.activemq.artemis.cli.commands.messages.Consumer; import org.apache.activemq.artemis.cli.commands.messages.Consumer;
import org.apache.activemq.artemis.cli.commands.messages.Producer; import org.apache.activemq.artemis.cli.commands.messages.Producer;
@ -93,6 +94,17 @@ public class MessageSerializerTest extends CliTestBase {
return messages; return messages;
} }
private List<Message> generateLargeTextMessages(Session session, Destination destination) throws Exception {
List<Message> messages = new ArrayList<>(TEST_MESSAGE_COUNT);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
messages.add(session.createTextMessage(new String(RandomUtil.randomBytes(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 2))));
}
sendMessages(session, destination, messages);
return messages;
}
private void checkSentMessages(Session session, List<Message> messages, String address) throws Exception { private void checkSentMessages(Session session, List<Message> messages, String address) throws Exception {
checkSentMessages(session, messages, address, null); checkSentMessages(session, messages, address, null);
} }
@ -188,6 +200,20 @@ public class MessageSerializerTest extends CliTestBase {
checkSentMessages(session, sent, address); checkSentMessages(session, sent, address);
} }
@Test
public void testLargeMessageExport() throws Exception {
String address = "test";
File file = createMessageFile();
Session session = createSession(connection);
generateLargeTextMessages(session, getDestination(address));
exportMessages(address, file);
Wait.assertTrue(() -> verifyMessageCount(address, 0), 2000, 100);
}
@Test @Test
public void testObjectMessageImportExport() throws Exception { public void testObjectMessageImportExport() throws Exception {
String address = "test"; String address = "test";