This commit is contained in:
Clebert Suconic 2022-04-25 09:44:36 -04:00
commit 2f72b6068d
2 changed files with 104 additions and 19 deletions

View File

@ -94,9 +94,9 @@ public final class OpenWireMessageConverter {
private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
public static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
public static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
public static final SimpleString AMQ_MSG_PRODUCER_ID = new SimpleString(AMQ_PREFIX + "PRODUCER_ID");
private static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO");
public static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO");
private static final SimpleString AMQ_MSG_USER_ID = new SimpleString(AMQ_PREFIX + "USER_ID");
@ -622,35 +622,46 @@ public final class OpenWireMessageConverter {
amqMsg.setGroupSequence(coreMessage.getGroupSequence());
final SimpleString midString = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_MESSAGE_ID);
final MessageId mid;
if (midString != null) {
mid = new MessageId(midString.toString());
final Object messageIdValue = getObjectProperty(coreMessage, Object.class, AMQ_MSG_MESSAGE_ID);
final MessageId messageId;
if (messageIdValue instanceof SimpleString) {
messageId = new MessageId(messageIdValue.toString());
} else if (messageIdValue instanceof byte[]) {
ByteSequence midSeq = new ByteSequence((byte[]) messageIdValue);
messageId = (MessageId) marshaller.unmarshal(midSeq);
} else {
//JMSMessageID should be started with "ID:" and needs to be globally unique (node + journal id)
// ARTEMIS-3776 due to AMQ-6431 some older clients will not be able to receive messages
// if using a failover schema due to the messageID overFlowing Integer.MAX_VALUE
String midd = "ID:" + serverNodeUUID + ":-1:-1:" + (coreMessage.getMessageID() / Integer.MAX_VALUE);
mid = new MessageId(midd, coreMessage.getMessageID() % Integer.MAX_VALUE);
messageId = new MessageId(midd, coreMessage.getMessageID() % Integer.MAX_VALUE);
}
amqMsg.setMessageId(mid);
amqMsg.setMessageId(messageId);
final SimpleString origDestBytes = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_ORIG_DESTINATION);
if (origDestBytes != null) {
amqMsg.setOriginalDestination(ActiveMQDestination.createDestination(origDestBytes.toString(), QUEUE_TYPE));
final Object origDestValue = getObjectProperty(coreMessage, Object.class, AMQ_MSG_ORIG_DESTINATION);
if (origDestValue instanceof SimpleString) {
amqMsg.setOriginalDestination(ActiveMQDestination.createDestination(origDestValue.toString(), QUEUE_TYPE));
} else if (origDestValue instanceof byte[]) {
ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence((byte[]) origDestValue));
amqMsg.setOriginalDestination(origDest);
}
final SimpleString producerId = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_PRODUCER_ID);
if (producerId != null && producerId.length() > 0) {
amqMsg.setProducerId(new ProducerId(producerId.toString()));
final Object producerIdValue = getObjectProperty(coreMessage, Object.class, AMQ_MSG_PRODUCER_ID);
if (producerIdValue instanceof SimpleString && ((SimpleString) producerIdValue).length() > 0) {
amqMsg.setProducerId(new ProducerId(producerIdValue.toString()));
} else if (producerIdValue instanceof byte[]) {
ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence((byte[]) producerIdValue));
amqMsg.setProducerId(producerId);
}
amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);
final SimpleString replyToBytes = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_REPLY_TO);
if (replyToBytes != null) {
amqMsg.setReplyTo(ActiveMQDestination.createDestination(replyToBytes.toString(), QUEUE_TYPE));
final Object replyToValue = getObjectProperty(coreMessage, Object.class, AMQ_MSG_REPLY_TO);
if (replyToValue instanceof SimpleString) {
amqMsg.setReplyTo(ActiveMQDestination.createDestination(replyToValue.toString(), QUEUE_TYPE));
} else if (replyToValue instanceof byte[]) {
ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence((byte[]) replyToValue));
amqMsg.setReplyTo(replyTo);
}
final SimpleString userId = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_USER_ID);

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
@ -34,10 +35,15 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.junit.Test;
import org.mockito.Mockito;
import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.AMQ_MSG_MESSAGE_ID;
import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.AMQ_MSG_ORIG_DESTINATION;
import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.AMQ_MSG_PRODUCER_ID;
import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.AMQ_MSG_REPLY_TO;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@ -150,6 +156,23 @@ public class OpenWireMessageConverterTest {
assertEquals(PRODUCER_ID, classicMessageDispatch.getMessage().getProducerId().toString());
}
@Test
public void testLegacyProducerId() throws Exception {
final String PRODUCER_ID = "123:456:789";
ActiveMQMessage classicMessage = new ActiveMQMessage();
classicMessage.setProducerId(new ProducerId(PRODUCER_ID));
final ByteSequence pidBytes = openWireFormat.marshal(classicMessage.getProducerId());
pidBytes.compact();
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, pidBytes.data);
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID);
assertEquals(PRODUCER_ID, messageDispatch.getMessage().getProducerId().toString());
}
@Test
public void testMessageId() throws Exception {
final String MESSAGE_ID = "ID:123:456:789";
@ -157,7 +180,7 @@ public class OpenWireMessageConverterTest {
ActiveMQMessage classicMessage = new ActiveMQMessage();
classicMessage.setMessageId(new MessageId(MESSAGE_ID));
Message artemisMessage = OpenWireMessageConverter.inbound(classicMessage.getMessage(), openWireFormat, null);
assertEquals(MESSAGE_ID, artemisMessage.getStringProperty(OpenWireMessageConverter.AMQ_MSG_MESSAGE_ID));
assertEquals(MESSAGE_ID, artemisMessage.getStringProperty(AMQ_MSG_MESSAGE_ID));
MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
@ -166,6 +189,57 @@ public class OpenWireMessageConverterTest {
assertEquals(MESSAGE_ID, classicMessageDispatch.getMessage().getMessageId().toString());
}
@Test
public void testLegacyMessageId() throws Exception {
final String MESSAGE_ID = "ID:123:456:789";
ActiveMQMessage classicMessage = new ActiveMQMessage();
classicMessage.setMessageId(new MessageId(MESSAGE_ID));
final ByteSequence midBytes = openWireFormat.marshal(classicMessage.getMessageId());
midBytes.compact();
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID);
assertEquals(MESSAGE_ID, messageDispatch.getMessage().getMessageId().toString());
}
@Test
public void testLegacyOriginalDestination() throws Exception {
final String DESTINATION = RandomUtil.randomString().replace("-", "");
ActiveMQMessage classicMessage = new ActiveMQMessage();
classicMessage.setOriginalDestination(new ActiveMQQueue(DESTINATION));
final ByteSequence destBytes = openWireFormat.marshal(classicMessage.getOriginalDestination());
destBytes.compact();
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, destBytes.data);
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID);
assertEquals("queue://" + DESTINATION, messageDispatch.getMessage().getOriginalDestination().toString());
}
@Test
public void testLegacyReplyTo() throws Exception {
final String DESTINATION = RandomUtil.randomString().replace("-", "");
ActiveMQMessage classicMessage = new ActiveMQMessage();
classicMessage.setJMSReplyTo(new ActiveMQQueue(DESTINATION));
final ByteSequence destBytes = openWireFormat.marshal(classicMessage.getJMSReplyTo());
destBytes.compact();
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, destBytes.data);
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID);
assertEquals("queue://" + DESTINATION, messageDispatch.getMessage().getReplyTo().toString());
}
@Test
public void testBadPropertyConversion() throws Exception {
final String hdrArrival = "__HDR_ARRIVAL";