ARTEMIS-3698 avoid byte[] prop values when converting from OpenWire

Avoid storing the following values as byte[] for OpenWire:
 - Marshalled properties. We already store the unmarshalled properties
   so this is altogether redundant.
 - Producer ID.
 - Message ID.
 - Various destination values.

Also, eliminate the "original transaction ID" conversion code as it's
never actually set from the incoming message.
This commit is contained in:
Justin Bertram 2022-02-24 15:05:13 -06:00 committed by clebertsuconic
parent e74eeb8268
commit 40acb18f4e
3 changed files with 130 additions and 74 deletions

View File

@ -68,7 +68,6 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
@ -77,6 +76,7 @@ import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.UTF8Buffer;
import static org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP;
import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE;
public final class OpenWireMessageConverter {
@ -92,11 +92,9 @@ public final class OpenWireMessageConverter {
private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER");
private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
public static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID");
private static final SimpleString AMQ_MSG_PRODUCER_ID = new SimpleString(AMQ_PREFIX + "PRODUCER_ID");
private static final SimpleString AMQ_MSG_MARSHALL_PROP = new SimpleString(AMQ_PREFIX + "MARSHALL_PROP");
public static final SimpleString AMQ_MSG_PRODUCER_ID = new SimpleString(AMQ_PREFIX + "PRODUCER_ID");
private static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO");
private static final SimpleString AMQ_MSG_USER_ID = new SimpleString(AMQ_PREFIX + "USER_ID");
@ -186,24 +184,19 @@ public final class OpenWireMessageConverter {
coreMessage.setGroupSequence(messageSend.getGroupSequence());
final MessageId messageId = messageSend.getMessageId();
final ByteSequence midBytes = marshaller.marshal(messageId);
midBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
if (messageId != null) {
coreMessage.putStringProperty(AMQ_MSG_MESSAGE_ID, SimpleString.toSimpleString(messageId.toString()));
}
coreMessage.setUserID(UUIDGenerator.getInstance().generateUUID());
final ProducerId producerId = messageSend.getProducerId();
if (producerId != null) {
final ByteSequence producerIdBytes = marshaller.marshal(producerId);
producerIdBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
}
final ByteSequence propBytes = messageSend.getMarshalledProperties();
if (propBytes != null) {
putMsgMarshalledProperties(propBytes, messageSend, coreMessage);
coreMessage.putStringProperty(AMQ_MSG_PRODUCER_ID, SimpleString.toSimpleString(producerId.toString()));
}
putMsgProperties(messageSend, coreMessage);
final ActiveMQDestination replyTo = messageSend.getReplyTo();
if (replyTo != null) {
if (replyTo instanceof TemporaryQueue) {
@ -228,7 +221,7 @@ public final class OpenWireMessageConverter {
final ActiveMQDestination origDest = messageSend.getOriginalDestination();
if (origDest != null) {
putMsgOriginalDestination(origDest, marshaller, coreMessage);
coreMessage.putStringProperty(AMQ_MSG_ORIG_DESTINATION, origDest.getQualifiedName());
}
return coreMessage;
@ -440,12 +433,8 @@ public final class OpenWireMessageConverter {
coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
}
private static void putMsgMarshalledProperties(final ByteSequence propBytes,
final Message messageSend,
private static void putMsgProperties(final Message messageSend,
final CoreMessage coreMessage) throws IOException {
propBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data);
//unmarshall properties to core so selector will work
final Map<String, Object> props = messageSend.getProperties();
if (!props.isEmpty()) {
props.forEach((key, value) -> {
@ -458,14 +447,6 @@ public final class OpenWireMessageConverter {
}
}
private static void putMsgOriginalDestination(final ActiveMQDestination origDest,
final WireFormat marshaller,
final CoreMessage coreMessage) throws IOException {
final ByteSequence origDestBytes = marshaller.marshal(origDest);
origDestBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
}
private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) {
for (Entry<String, Object> entry : map.entrySet()) {
SimpleString key = new SimpleString(entry.getKey());
@ -629,11 +610,10 @@ public final class OpenWireMessageConverter {
amqMsg.setGroupSequence(coreMessage.getGroupSequence());
final byte[] midBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_MESSAGE_ID);
final SimpleString midString = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_MESSAGE_ID);
final MessageId mid;
if (midBytes != null) {
ByteSequence midSeq = new ByteSequence(midBytes);
mid = (MessageId) marshaller.unmarshal(midSeq);
if (midString != null) {
mid = new MessageId(midString.toString());
} else {
//JMSMessageID should be started with "ID:" and needs to be globally unique (node + journal id)
String midd = "ID:" + serverNodeUUID + ":-1:-1:-1";
@ -642,32 +622,21 @@ public final class OpenWireMessageConverter {
amqMsg.setMessageId(mid);
final byte[] origDestBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_ORIG_DESTINATION);
final SimpleString origDestBytes = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_ORIG_DESTINATION);
if (origDestBytes != null) {
setAMQMsgOriginalDestination(amqMsg, marshaller, origDestBytes);
amqMsg.setOriginalDestination(ActiveMQDestination.createDestination(origDestBytes.toString(), QUEUE_TYPE));
}
final byte[] origTxIdBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_ORIG_TXID);
if (origTxIdBytes != null) {
setAMQMsgOriginalTransactionId(amqMsg, marshaller, origTxIdBytes);
}
final byte[] producerIdBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_PRODUCER_ID);
if (producerIdBytes != null) {
ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes));
amqMsg.setProducerId(producerId);
}
final byte[] marshalledBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_MARSHALL_PROP);
if (marshalledBytes != null) {
amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
final SimpleString producerId = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_PRODUCER_ID);
if (producerId != null && producerId.length() > 0) {
amqMsg.setProducerId(new ProducerId(producerId.toString()));
}
amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);
final byte[] replyToBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_REPLY_TO);
final SimpleString replyToBytes = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_REPLY_TO);
if (replyToBytes != null) {
setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes);
amqMsg.setReplyTo(ActiveMQDestination.createDestination(replyToBytes.toString(), QUEUE_TYPE));
}
final SimpleString userId = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_USER_ID);
@ -923,27 +892,6 @@ public final class OpenWireMessageConverter {
amqMsg.setDataStructure(ds);
}
private static void setAMQMsgOriginalDestination(final ActiveMQMessage amqMsg,
final WireFormat marshaller,
final byte[] origDestBytes) throws IOException {
ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes));
amqMsg.setOriginalDestination(origDest);
}
private static void setAMQMsgOriginalTransactionId(final ActiveMQMessage amqMsg,
final WireFormat marshaller,
final byte[] origTxIdBytes) throws IOException {
TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes));
amqMsg.setOriginalTransactionId(origTxId);
}
private static void setAMQMsgReplyTo(final ActiveMQMessage amqMsg,
final WireFormat marshaller,
final byte[] replyToBytes) throws IOException {
ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes));
amqMsg.setReplyTo(replyTo);
}
private static void setAMQMsgDlqDeliveryFailureCause(final ActiveMQMessage amqMsg,
final SimpleString dlqCause) throws IOException {
try {

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -27,14 +28,17 @@ import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.wireformat.WireFormat;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -110,6 +114,58 @@ public class OpenWireMessageConverterTest {
assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey) instanceof String);
}
@Test
public void testProperties() throws Exception {
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
for (int i = 0; i < 5; i++) {
coreMessage.putIntProperty(i + "", i);
}
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch marshalled = (MessageDispatch) openWireFormat.unmarshal(openWireFormat.marshal(OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID)));
assertEquals(5, marshalled.getMessage().getProperties().keySet().size());
Message converted = OpenWireMessageConverter.inbound(marshalled.getMessage(), openWireFormat, null);
for (int i = 0; i < 5; i++) {
assertTrue(converted.containsProperty(i + ""));
}
}
@Test
public void testProducerId() throws Exception {
final String PRODUCER_ID = "123:456:789";
ActiveMQMessage classicMessage = new ActiveMQMessage();
classicMessage.setProducerId(new ProducerId(PRODUCER_ID));
classicMessage.setMessageId(new MessageId("1:1:1"));
Message artemisMessage = OpenWireMessageConverter.inbound(classicMessage.getMessage(), openWireFormat, null);
assertEquals(PRODUCER_ID, artemisMessage.getStringProperty(OpenWireMessageConverter.AMQ_MSG_PRODUCER_ID));
MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch classicMessageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) artemisMessage, openWireFormat, amqConsumer, nodeUUID);
assertEquals(PRODUCER_ID, classicMessageDispatch.getMessage().getProducerId().toString());
}
@Test
public void testMessageId() throws Exception {
final String MESSAGE_ID = "ID:123:456:789";
ActiveMQMessage classicMessage = new ActiveMQMessage();
classicMessage.setMessageId(new MessageId(MESSAGE_ID));
Message artemisMessage = OpenWireMessageConverter.inbound(classicMessage.getMessage(), openWireFormat, null);
assertEquals(MESSAGE_ID, artemisMessage.getStringProperty(OpenWireMessageConverter.AMQ_MSG_MESSAGE_ID));
MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch classicMessageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) artemisMessage, openWireFormat, amqConsumer, nodeUUID);
assertEquals(MESSAGE_ID, classicMessageDispatch.getMessage().getMessageId().toString());
}
@Test
public void testBadPropertyConversion() throws Exception {
final String hdrArrival = "__HDR_ARRIVAL";

View File

@ -17,9 +17,11 @@
package org.apache.activemq.artemis.tests.integration.crossprotocol;
import java.util.ArrayList;
import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageFormatException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
@ -118,4 +120,54 @@ public class OpenWireToAMQPTest extends ActiveMQTestBase {
}
}
}
@SuppressWarnings("unchecked")
@Test(timeout = 60000)
public void testByteArrayProperties() throws Exception {
Connection connection = null;
try {
connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
ArrayList<String> list = new ArrayList<>();
list.add("aString");
ObjectMessage objectMessage = session.createObjectMessage(list);
producer.send(objectMessage);
connection.close();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to send message via OpenWire: " + e.getMessage());
} finally {
if (connection != null) {
connection.close();
}
}
try {
connection = qpidfactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
ObjectMessage receive = (ObjectMessage) consumer.receive(5000);
assertNotNull(receive);
/*
* As noted in section 3.5.4 of the JMS 2 specification all properties can be converted to String
*/
Enumeration<String> propertyNames = receive.getPropertyNames();
while (propertyNames.hasMoreElements()) {
receive.getStringProperty(propertyNames.nextElement());
}
connection.close();
} catch (MessageFormatException e) {
e.printStackTrace();
fail("Failed to receive message via AMQP: " + e.getMessage());
} finally {
if (connection != null) {
connection.close();
}
}
}
}