ARTEMIS-4235 fix map msg conversion from OpenWire->core

This commit is contained in:
Justin Bertram 2023-04-04 12:30:26 -05:00 committed by Robbie Gemmell
parent e368dacc78
commit 8abdee29e9
3 changed files with 164 additions and 3 deletions

View File

@ -111,9 +111,13 @@ public final class OpenWireMessageConverter {
final ActiveMQBuffer body = coreMessage.getBodyBuffer();
final ByteSequence contents = messageSend.getContent();
if (contents == null && coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
body.writeNullableString(null);
} else if (contents != null) {
if (contents == null) {
if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
body.writeNullableString(null);
} else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
body.writeByte(DataConstants.NULL);
}
} else {
final boolean messageCompressed = messageSend.isCompressed();
if (messageCompressed) {
coreMessage.putBooleanProperty(OpenWireConstants.AMQ_MSG_COMPRESSED, true);

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
@ -28,7 +29,9 @@ import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.MessageDispatch;
@ -135,6 +138,16 @@ public class OpenWireMessageConverterTest {
}
}
@Test
public void testEmptyMapMessage() throws Exception {
CoreMessage artemisMessage = (CoreMessage) OpenWireMessageConverter.inbound(new ActiveMQMapMessage().getMessage(), openWireFormat, null);
assertEquals(Message.MAP_TYPE, artemisMessage.getType());
ActiveMQBuffer buffer = artemisMessage.getDataBuffer();
TypedProperties map = new TypedProperties();
buffer.resetReaderIndex();
map.decode(buffer.byteBuf());
}
@Test
public void testProducerId() throws Exception {
final String PRODUCER_ID = "123:456:789";

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -31,6 +32,7 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.utils.DestinationUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -193,4 +195,146 @@ public class JMSMessageConsumerTest extends MultiprotocolJMSClientTestSupport {
assertEquals("color = 'BLUE'", queue.getFilter().getFilterString().toString());
}
}
@Test(timeout = 30000)
public void testEmptyMapMessageConversionBetweenOpenWireAndAMQP() throws Exception {
testEmptyMapMessageConversion(createOpenWireConnection(), createConnection());
}
@Test(timeout = 30000)
public void testEmptyMapMessageConversionBetweenAMQPAndOpenWire() throws Exception {
testEmptyMapMessageConversion(createConnection(), createOpenWireConnection());
}
@Test(timeout = 30000)
public void testEmptyMapMessageConversionBetweenCoreAndAMQP() throws Exception {
testEmptyMapMessageConversion(createCoreConnection(), createConnection());
}
@Test(timeout = 30000)
public void testEmptyMapMessageConversionBetweenAMQPAndCore() throws Exception {
testEmptyMapMessageConversion(createConnection(), createCoreConnection());
}
@Test(timeout = 30000)
public void testEmptyMapMessageConversionBetweenCoreAndOpenWire() throws Exception {
testEmptyMapMessageConversion(createCoreConnection(), createOpenWireConnection());
}
@Test(timeout = 30000)
public void testEmptyMapMessageConversionBetweenOpenWireAndCore() throws Exception {
testEmptyMapMessageConversion(createOpenWireConnection(), createCoreConnection());
}
private void testEmptyMapMessageConversion(Connection senderConnection, Connection consumerConnection) throws Exception {
try {
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));
Session senderSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = senderSession.createProducer(senderSession.createQueue(getQueueName()));
MapMessage message = senderSession.createMapMessage();
producer.send(message);
Message received = consumer.receive(1000);
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of MapMessage", received instanceof MapMessage);
} finally {
senderConnection.close();
consumerConnection.close();
}
}
@Test(timeout = 30000)
public void testMapMessageConversionBetweenAMQPAndOpenWire() throws Exception {
testMapMessageConversion(createConnection(), createOpenWireConnection());
}
@Test(timeout = 30000)
public void testMapMessageConversionBetweenCoreAndAMQP() throws Exception {
testMapMessageConversion(createCoreConnection(), createConnection());
}
@Test(timeout = 30000)
public void testMapMessageConversionBetweenAMQPAndCore() throws Exception {
testMapMessageConversion(createConnection(), createCoreConnection());
}
@Test(timeout = 30000)
public void testMapMessageConversionBetweenCoreAndOpenWire() throws Exception {
testMapMessageConversion(createCoreConnection(), createOpenWireConnection());
}
@Test(timeout = 30000)
public void testMapMessageConversionBetweenOpenWireAndCore() throws Exception {
testMapMessageConversion(createOpenWireConnection(), createCoreConnection());
}
private void testMapMessageConversion(Connection senderConnection, Connection consumerConnection) throws Exception {
final boolean BOOLEAN_VALUE = RandomUtil.randomBoolean();
final String BOOLEAN_KEY = "myBoolean";
final byte BYTE_VALUE = RandomUtil.randomByte();
final String BYTE_KEY = "myByte";
final byte[] BYTES_VALUE = RandomUtil.randomBytes();
final String BYTES_KEY = "myBytes";
final char CHAR_VALUE = RandomUtil.randomChar();
final String CHAR_KEY = "myChar";
final double DOUBLE_VALUE = RandomUtil.randomDouble();
final String DOUBLE_KEY = "myDouble";
final float FLOAT_VALUE = RandomUtil.randomFloat();
final String FLOAT_KEY = "myFloat";
final int INT_VALUE = RandomUtil.randomInt();
final String INT_KEY = "myInt";
final long LONG_VALUE = RandomUtil.randomLong();
final String LONG_KEY = "myLong";
final Boolean OBJECT_VALUE = RandomUtil.randomBoolean();
final String OBJECT_KEY = "myObject";
final short SHORT_VALUE = RandomUtil.randomShort();
final String SHORT_KEY = "myShort";
final String STRING_VALUE = RandomUtil.randomString();
final String STRING_KEY = "myString";
try {
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));
Session senderSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = senderSession.createProducer(senderSession.createQueue(getQueueName()));
MapMessage message = senderSession.createMapMessage();message.setBoolean(BOOLEAN_KEY, BOOLEAN_VALUE);
message.setByte(BYTE_KEY, BYTE_VALUE);
message.setBytes(BYTES_KEY, BYTES_VALUE);
message.setChar(CHAR_KEY, CHAR_VALUE);
message.setDouble(DOUBLE_KEY, DOUBLE_VALUE);
message.setFloat(FLOAT_KEY, FLOAT_VALUE);
message.setInt(INT_KEY, INT_VALUE);
message.setLong(LONG_KEY, LONG_VALUE);
message.setObject(OBJECT_KEY, OBJECT_VALUE);
message.setShort(SHORT_KEY, SHORT_VALUE);
message.setString(STRING_KEY, STRING_VALUE);
producer.send(message);
Message received = consumer.receive(1000);
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of MapMessage", received instanceof MapMessage);
MapMessage receivedMapMessage = (MapMessage) received;
assertEquals(BOOLEAN_VALUE, receivedMapMessage.getBoolean(BOOLEAN_KEY));
assertEquals(BYTE_VALUE, receivedMapMessage.getByte(BYTE_KEY));
assertEqualsByteArrays(BYTES_VALUE, receivedMapMessage.getBytes(BYTES_KEY));
assertEquals(CHAR_VALUE, receivedMapMessage.getChar(CHAR_KEY));
assertEquals(DOUBLE_VALUE, receivedMapMessage.getDouble(DOUBLE_KEY), 0);
assertEquals(FLOAT_VALUE, receivedMapMessage.getFloat(FLOAT_KEY), 0);
assertEquals(INT_VALUE, receivedMapMessage.getInt(INT_KEY));
assertEquals(LONG_VALUE, receivedMapMessage.getLong(LONG_KEY));
assertTrue(receivedMapMessage.getObject(OBJECT_KEY) instanceof Boolean);
assertEquals(OBJECT_VALUE, receivedMapMessage.getObject(OBJECT_KEY));
assertEquals(SHORT_VALUE, receivedMapMessage.getShort(SHORT_KEY));
assertEquals(STRING_VALUE, receivedMapMessage.getString(STRING_KEY));
} finally {
senderConnection.close();
consumerConnection.close();
}
}
}