ARTEMIS-2390 JMSMessageID header can be null when messages are cross-protocol

If a jms client (be it openwire, amqp, or core jms) receives a message that
is from a different protocol, the JMSMessageID maybe null when the
jms client expects it.
This commit is contained in:
Howard Gao 2019-06-25 20:21:40 +08:00 committed by Clebert Suconic
parent a1e4f41183
commit c66d62e4b0
8 changed files with 195 additions and 3 deletions

View File

@ -107,6 +107,10 @@ public final class UUID {
mId[UUID.INDEX_VARIATION] |= (byte) 0x80; mId[UUID.INDEX_VARIATION] |= (byte) 0x80;
} }
public UUID(final byte[] data) {
mId = data;
}
public byte[] asBytes() { public byte[] asBytes() {
return mId; return mId;
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.utils;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import java.net.SocketException; import java.net.SocketException;
import java.nio.ByteBuffer;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -109,6 +110,18 @@ public final class UUIDGenerator {
return new UUID(UUID.TYPE_TIME_BASED, contents); return new UUID(UUID.TYPE_TIME_BASED, contents);
} }
public UUID fromJavaUUID(java.util.UUID uuid) {
long msb = uuid.getMostSignificantBits();
long lsb = uuid.getLeastSignificantBits();
ByteBuffer buffer = ByteBuffer.allocate(16);
buffer.putLong(msb);
buffer.putLong(lsb);
byte[] contents = buffer.array();
return new UUID(contents);
}
public byte[] generateDummyAddress() { public byte[] generateDummyAddress() {
Random rnd = getRandomNumberGenerator(); Random rnd = getRandomNumberGenerator();
byte[] dummy = new byte[6]; byte[] dummy = new byte[6];

View File

@ -55,6 +55,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -67,6 +68,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Decimal128; import org.apache.qpid.proton.amqp.Decimal128;
@ -311,7 +313,15 @@ public class AmqpCoreConverter {
if (properties != null) { if (properties != null) {
if (properties.getMessageId() != null) { if (properties.getMessageId() != null) {
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId())); jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
//core jms clients get JMSMessageID from UserID which is a UUID object
if (properties.getMessageId() instanceof UUID) {
//AMQP's message ID can be a UUID, keep it
jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().fromJavaUUID((UUID) properties.getMessageId()));
} else {
jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().generateUUID());
} }
}
Binary userId = properties.getUserId(); Binary userId = properties.getUserId();
if (userId != null) { if (userId != null) {
jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8)); jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
@ -345,6 +355,7 @@ public class AmqpCoreConverter {
} }
} }
Object correlationID = properties.getCorrelationId(); Object correlationID = properties.getCorrelationId();
if (correlationID != null) { if (correlationID != null) {
try { try {
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID)); jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID));
@ -374,7 +385,6 @@ public class AmqpCoreConverter {
jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime()); jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
} }
} }
return jms; return jms;
} }

View File

@ -159,6 +159,10 @@ public class CoreAmqpConverter {
} catch (ActiveMQAMQPIllegalStateException e) { } catch (ActiveMQAMQPIllegalStateException e) {
properties.setMessageId(messageId); properties.setMessageId(messageId);
} }
} else {
if (message.getInnerMessage().getUserID() != null) {
properties.setMessageId("ID:" + message.getInnerMessage().getUserID().toString());
}
} }
Destination destination = message.getJMSDestination(); Destination destination = message.getJMSDestination();
if (destination != null) { if (destination != null) {

View File

@ -187,6 +187,8 @@ public final class OpenWireMessageConverter {
midBytes.compact(); midBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data); coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
coreMessage.setUserID(UUIDGenerator.getInstance().generateUUID());
final ProducerId producerId = messageSend.getProducerId(); final ProducerId producerId = messageSend.getProducerId();
if (producerId != null) { if (producerId != null) {
final ByteSequence producerIdBytes = marshaller.marshal(producerId); final ByteSequence producerIdBytes = marshaller.marshal(producerId);
@ -629,7 +631,9 @@ public final class OpenWireMessageConverter {
ByteSequence midSeq = new ByteSequence(midBytes); ByteSequence midSeq = new ByteSequence(midBytes);
mid = (MessageId) marshaller.unmarshal(midSeq); mid = (MessageId) marshaller.unmarshal(midSeq);
} else { } else {
mid = new MessageId(UUIDGenerator.getInstance().generateStringUUID() + ":-1"); //JMSMessageID should be started with "ID:"
String midd = "ID:" + UUIDGenerator.getInstance().generateStringUUID() + ":-1:-1:-1:-1";
mid = new MessageId(midd);
} }
amqMsg.setMessageId(mid); amqMsg.setMessageId(mid);

View File

@ -801,9 +801,10 @@ public class ManagementServiceImpl implements ManagementService {
// CoreMessage#getUserId returns UUID, so to implement this part a alternative API that returned object. This part of the // CoreMessage#getUserId returns UUID, so to implement this part a alternative API that returned object. This part of the
// change is a nice to have for my point of view. I suggested it for completeness. The application could // change is a nice to have for my point of view. I suggested it for completeness. The application could
// always supply unique correl ids on the request and achieve the same effect. I'd be happy to drop this part. // always supply unique correl ids on the request and achieve the same effect. I'd be happy to drop this part.
Object underlying = request.getUserID() != null ? request.getUserID() : request.getStringProperty(NATIVE_MESSAGE_ID); Object underlying = request.getStringProperty(NATIVE_MESSAGE_ID) != null ? request.getStringProperty(NATIVE_MESSAGE_ID) : request.getUserID();
correlationId = underlying == null ? null : String.valueOf(underlying); correlationId = underlying == null ? null : String.valueOf(underlying);
} }
return correlationId; return correlationId;
} }

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.crossprotocol;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.Arrays;
import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
@RunWith(Parameterized.class)
public class MessageIDMultiProtocolTest extends OpenWireTestBase {
String protocolSender;
String protocolConsumer;
ConnectionFactory senderCF;
ConnectionFactory consumerCF;
private static final SimpleString queueName = SimpleString.toSimpleString("MessageIDueueTest");
public MessageIDMultiProtocolTest(String protocolSender, String protocolConsumer) {
this.protocolSender = protocolSender;
this.protocolConsumer = protocolConsumer;
}
@Parameterized.Parameters(name = "sender={0},consumer={1}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][]{
{"OPENWIRE", "OPENWIRE"},
{"OPENWIRE", "CORE"},
{"OPENWIRE", "AMQP"},
{"CORE", "OPENWIRE"},
{"CORE", "CORE"},
{"CORE", "AMQP"},
{"AMQP", "OPENWIRE"},
{"AMQP", "CORE"},
{"AMQP", "AMQP"},
});
}
@Before
public void setupCF() {
senderCF = createConnectionFactory(protocolSender, urlString);
consumerCF = createConnectionFactory(protocolConsumer, urlString);
}
@Before
public void setupQueue() throws Exception {
Wait.assertTrue(server::isStarted);
Wait.assertTrue(server::isActive);
this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true);
}
@Test
public void testMessageIDNotNullCorrelationIDPreserved() throws Throwable {
Connection senderConn = senderCF.createConnection();
Connection consumerConn = consumerCF.createConnection();
consumerConn.setClientID("consumer");
try (Session senderSession = senderConn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Queue senderDestination = senderSession.createQueue(queueName.toString());
MessageProducer senderProducer = senderSession.createProducer(senderDestination);
Message sentMessage = senderSession.createMessage();
sentMessage.setJMSCorrelationID("ID:MessageIDCorrelationId");
senderProducer.send(sentMessage);
senderConn.start();
String sentMid = sentMessage.getJMSMessageID();
try (Session consumerSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Destination consumerDestination = consumerSess.createQueue(queueName.toString());
MessageConsumer consumer = consumerSess.createConsumer(consumerDestination);
consumerConn.start();
Message receivedMessage = consumer.receive(3000);
Assert.assertNotNull(receivedMessage);
Assert.assertEquals(sentMessage.getJMSCorrelationID(), receivedMessage.getJMSCorrelationID());
String messageId = receivedMessage.getJMSMessageID();
Assert.assertNotNull(messageId);
Assert.assertTrue(messageId.startsWith("ID:"));
System.out.println("[" + protocolSender + "][" + protocolConsumer + "] " + messageId);
System.out.println("[" + protocolSender + "][" + protocolConsumer + "] " + sentMid);
if (protocolConsumer.equals(protocolSender)) {
//only same protocol we guarantee the same JMSMessageID
assertEquals(sentMid, messageId);
}
//specific case [CORE]->[AMQP]
if ("CORE".equals(protocolSender) && "AMQP".equals(protocolConsumer)) {
assertEquals(sentMid, messageId);
}
}
} catch (Throwable e) {
e.printStackTrace();
throw e;
} finally {
try {
senderConn.close();
} catch (Throwable e) {
e.printStackTrace();
}
try {
consumerConn.close();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}

View File

@ -21,6 +21,8 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.UUID;
public class UUIDGeneratorTest extends ActiveMQTestBase { public class UUIDGeneratorTest extends ActiveMQTestBase {
// Constants ----------------------------------------------------- // Constants -----------------------------------------------------
@ -32,6 +34,14 @@ public class UUIDGeneratorTest extends ActiveMQTestBase {
// Public -------------------------------------------------------- // Public --------------------------------------------------------
@Test
public void testFromJavaUUID() throws Exception {
UUID javaId = UUID.randomUUID();
UUIDGenerator gen = UUIDGenerator.getInstance();
org.apache.activemq.artemis.utils.UUID nativeId = gen.fromJavaUUID(javaId);
assertEquals(javaId.toString(), nativeId.toString());
}
@Test @Test
public void testGetHardwareAddress() throws Exception { public void testGetHardwareAddress() throws Exception {
byte[] bytes = UUIDGenerator.getHardwareAddress(); byte[] bytes = UUIDGenerator.getHardwareAddress();