This commit is contained in:
Clebert Suconic 2019-06-27 10:42:34 -04:00
commit 71b2671a61
10 changed files with 243 additions and 3 deletions

View File

@ -163,6 +163,30 @@ public class ByteUtil {
| ((int) b[0] & 0xff) << 24;
}
public static byte[] longToBytes(long value) {
byte[] output = new byte[8];
longToBytes(value, output, 0);
return output;
}
public static void longToBytes(long x, byte[] output, int offset) {
output[offset] = (byte)(x >> 56);
output[offset + 1] = (byte)(x >> 48);
output[offset + 2] = (byte)(x >> 40);
output[offset + 3] = (byte)(x >> 32);
output[offset + 4] = (byte)(x >> 24);
output[offset + 5] = (byte)(x >> 16);
output[offset + 6] = (byte)(x >> 8);
output[offset + 7] = (byte)(x);
}
public static byte[] doubleLongToBytes(long value1, long value2) {
byte[] output = new byte[16];
longToBytes(value1, output, 0);
longToBytes(value2, output, 8);
return output;
}
public static byte[] hexToBytes(String hexStr) {
byte[] bytes = new byte[hexStr.length() / 2];
for (int i = 0; i < bytes.length; i++) {

View File

@ -107,6 +107,15 @@ public final class UUID {
mId[UUID.INDEX_VARIATION] |= (byte) 0x80;
}
private UUID(final byte[] data) {
mId = data;
}
/** This is for conversions between two types of UUID */
public UUID(java.util.UUID uuid) {
this(ByteUtil.doubleLongToBytes(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
}
public byte[] asBytes() {
return mId;
}

View File

@ -109,6 +109,10 @@ public final class UUIDGenerator {
return new UUID(UUID.TYPE_TIME_BASED, contents);
}
public UUID fromJavaUUID(java.util.UUID uuid) {
return new UUID(uuid);
}
public byte[] generateDummyAddress() {
Random rnd = getRandomNumberGenerator();
byte[] dummy = new byte[6];

View File

@ -318,4 +318,32 @@ public class ByteUtilTest {
assertEquals(randomInt, ByteUtil.bytesToInt(actual));
}
}
@Test
public void testLongToBytes() {
ByteBuffer buffer = ByteBuffer.allocate(8);
long randomLong = RandomUtil.randomLong();
buffer.putLong(randomLong);
byte[] longArrayAssert = buffer.array();
byte[] convertedArray = ByteUtil.longToBytes(randomLong);
assertArrayEquals(longArrayAssert, convertedArray);
}
@Test
public void testDoubleLongToBytes() {
long randomLong1 = RandomUtil.randomLong();
long randomLong2 = RandomUtil.randomLong();
ByteBuffer buffer = ByteBuffer.allocate(16);
buffer.putLong(randomLong1);
buffer.putLong(randomLong2);
byte[] assertContent = buffer.array();
byte[] convertedContent = ByteUtil.doubleLongToBytes(randomLong1, randomLong2);
assertArrayEquals(assertContent, convertedContent);
}
}

View File

@ -55,6 +55,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
@ -67,6 +68,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Decimal128;
@ -311,7 +313,15 @@ public class AmqpCoreConverter {
if (properties != null) {
if (properties.getMessageId() != null) {
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
//core jms clients get JMSMessageID from UserID which is a UUID object
if (properties.getMessageId() instanceof UUID) {
//AMQP's message ID can be a UUID, keep it
jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().fromJavaUUID((UUID) properties.getMessageId()));
} else {
jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().generateUUID());
}
}
Binary userId = properties.getUserId();
if (userId != null) {
jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
@ -345,6 +355,7 @@ public class AmqpCoreConverter {
}
}
Object correlationID = properties.getCorrelationId();
if (correlationID != null) {
try {
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID));
@ -374,7 +385,6 @@ public class AmqpCoreConverter {
jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
}
}
return jms;
}

View File

@ -159,6 +159,10 @@ public class CoreAmqpConverter {
} catch (ActiveMQAMQPIllegalStateException e) {
properties.setMessageId(messageId);
}
} else {
if (message.getInnerMessage().getUserID() != null) {
properties.setMessageId("ID:" + message.getInnerMessage().getUserID().toString());
}
}
Destination destination = message.getJMSDestination();
if (destination != null) {

View File

@ -187,6 +187,8 @@ public final class OpenWireMessageConverter {
midBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
coreMessage.setUserID(UUIDGenerator.getInstance().generateUUID());
final ProducerId producerId = messageSend.getProducerId();
if (producerId != null) {
final ByteSequence producerIdBytes = marshaller.marshal(producerId);
@ -629,7 +631,9 @@ public final class OpenWireMessageConverter {
ByteSequence midSeq = new ByteSequence(midBytes);
mid = (MessageId) marshaller.unmarshal(midSeq);
} else {
mid = new MessageId(UUIDGenerator.getInstance().generateStringUUID() + ":-1");
//JMSMessageID should be started with "ID:"
String midd = "ID:" + UUIDGenerator.getInstance().generateStringUUID() + ":-1:-1:-1:-1";
mid = new MessageId(midd);
}
amqMsg.setMessageId(mid);

View File

@ -801,9 +801,10 @@ public class ManagementServiceImpl implements ManagementService {
// CoreMessage#getUserId returns UUID, so to implement this part a alternative API that returned object. This part of the
// change is a nice to have for my point of view. I suggested it for completeness. The application could
// always supply unique correl ids on the request and achieve the same effect. I'd be happy to drop this part.
Object underlying = request.getUserID() != null ? request.getUserID() : request.getStringProperty(NATIVE_MESSAGE_ID);
Object underlying = request.getStringProperty(NATIVE_MESSAGE_ID) != null ? request.getStringProperty(NATIVE_MESSAGE_ID) : request.getUserID();
correlationId = underlying == null ? null : String.valueOf(underlying);
}
return correlationId;
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.crossprotocol;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.Arrays;
import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
@RunWith(Parameterized.class)
public class MessageIDMultiProtocolTest extends OpenWireTestBase {
String protocolSender;
String protocolConsumer;
ConnectionFactory senderCF;
ConnectionFactory consumerCF;
private static final SimpleString queueName = SimpleString.toSimpleString("MessageIDueueTest");
public MessageIDMultiProtocolTest(String protocolSender, String protocolConsumer) {
this.protocolSender = protocolSender;
this.protocolConsumer = protocolConsumer;
}
@Parameterized.Parameters(name = "sender={0},consumer={1}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][]{
{"OPENWIRE", "OPENWIRE"},
{"OPENWIRE", "CORE"},
{"OPENWIRE", "AMQP"},
{"CORE", "OPENWIRE"},
{"CORE", "CORE"},
{"CORE", "AMQP"},
{"AMQP", "OPENWIRE"},
{"AMQP", "CORE"},
{"AMQP", "AMQP"},
});
}
@Before
public void setupCF() {
senderCF = createConnectionFactory(protocolSender, urlString);
consumerCF = createConnectionFactory(protocolConsumer, urlString);
}
@Before
public void setupQueue() throws Exception {
Wait.assertTrue(server::isStarted);
Wait.assertTrue(server::isActive);
this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true);
}
@Test
public void testMessageIDNotNullCorrelationIDPreserved() throws Throwable {
Connection senderConn = senderCF.createConnection();
Connection consumerConn = consumerCF.createConnection();
consumerConn.setClientID("consumer");
try (Session senderSession = senderConn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Queue senderDestination = senderSession.createQueue(queueName.toString());
MessageProducer senderProducer = senderSession.createProducer(senderDestination);
Message sentMessage = senderSession.createMessage();
sentMessage.setJMSCorrelationID("ID:MessageIDCorrelationId");
senderProducer.send(sentMessage);
senderConn.start();
String sentMid = sentMessage.getJMSMessageID();
try (Session consumerSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Destination consumerDestination = consumerSess.createQueue(queueName.toString());
MessageConsumer consumer = consumerSess.createConsumer(consumerDestination);
consumerConn.start();
Message receivedMessage = consumer.receive(3000);
Assert.assertNotNull(receivedMessage);
Assert.assertEquals(sentMessage.getJMSCorrelationID(), receivedMessage.getJMSCorrelationID());
String messageId = receivedMessage.getJMSMessageID();
Assert.assertNotNull(messageId);
Assert.assertTrue(messageId.startsWith("ID:"));
System.out.println("[" + protocolSender + "][" + protocolConsumer + "] " + messageId);
System.out.println("[" + protocolSender + "][" + protocolConsumer + "] " + sentMid);
if (protocolConsumer.equals(protocolSender)) {
//only same protocol we guarantee the same JMSMessageID
assertEquals(sentMid, messageId);
}
//specific case [CORE]->[AMQP]
if ("CORE".equals(protocolSender) && "AMQP".equals(protocolConsumer)) {
assertEquals(sentMid, messageId);
}
}
} catch (Throwable e) {
e.printStackTrace();
throw e;
} finally {
try {
senderConn.close();
} catch (Throwable e) {
e.printStackTrace();
}
try {
consumerConn.close();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}

View File

@ -21,6 +21,8 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert;
import org.junit.Test;
import java.util.UUID;
public class UUIDGeneratorTest extends ActiveMQTestBase {
// Constants -----------------------------------------------------
@ -32,6 +34,14 @@ public class UUIDGeneratorTest extends ActiveMQTestBase {
// Public --------------------------------------------------------
@Test
public void testFromJavaUUID() throws Exception {
UUID javaId = UUID.randomUUID();
UUIDGenerator gen = UUIDGenerator.getInstance();
org.apache.activemq.artemis.utils.UUID nativeId = gen.fromJavaUUID(javaId);
assertEquals(javaId.toString(), nativeId.toString());
}
@Test
public void testGetHardwareAddress() throws Exception {
byte[] bytes = UUIDGenerator.getHardwareAddress();