From c66d62e4b09f7a529c4067f83a333a55d11667d7 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Tue, 25 Jun 2019 20:21:40 +0800 Subject: [PATCH 1/2] ARTEMIS-2390 JMSMessageID header can be null when messages are cross-protocol If a jms client (be it openwire, amqp, or core jms) receives a message that is from a different protocol, the JMSMessageID maybe null when the jms client expects it. --- .../apache/activemq/artemis/utils/UUID.java | 4 + .../activemq/artemis/utils/UUIDGenerator.java | 13 ++ .../amqp/converter/AmqpCoreConverter.java | 12 +- .../amqp/converter/CoreAmqpConverter.java | 4 + .../openwire/OpenWireMessageConverter.java | 6 +- .../impl/ManagementServiceImpl.java | 3 +- .../MessageIDMultiProtocolTest.java | 146 ++++++++++++++++++ .../tests/unit/util/UUIDGeneratorTest.java | 10 ++ 8 files changed, 195 insertions(+), 3 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/MessageIDMultiProtocolTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java index 1d84fea906..c3f99a423b 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java @@ -107,6 +107,10 @@ public final class UUID { mId[UUID.INDEX_VARIATION] |= (byte) 0x80; } + public UUID(final byte[] data) { + mId = data; + } + public byte[] asBytes() { return mId; } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java index c111617471..3d4f10ecab 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.utils; import java.net.NetworkInterface; import java.net.SocketException; +import java.nio.ByteBuffer; import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; @@ -109,6 +110,18 @@ public final class UUIDGenerator { return new UUID(UUID.TYPE_TIME_BASED, contents); } + public UUID fromJavaUUID(java.util.UUID uuid) { + long msb = uuid.getMostSignificantBits(); + long lsb = uuid.getLeastSignificantBits(); + + ByteBuffer buffer = ByteBuffer.allocate(16); + buffer.putLong(msb); + buffer.putLong(lsb); + byte[] contents = buffer.array(); + + return new UUID(contents); + } + public byte[] generateDummyAddress() { Random rnd = getRandomNumberGenerator(); byte[] dummy = new byte[6]; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index 32d35966ae..739d4373da 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -55,6 +55,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import javax.jms.DeliveryMode; import javax.jms.JMSException; @@ -67,6 +68,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; +import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Decimal128; @@ -311,7 +313,15 @@ public class AmqpCoreConverter { if (properties != null) { if (properties.getMessageId() != null) { jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId())); + //core jms clients get JMSMessageID from UserID which is a UUID object + if (properties.getMessageId() instanceof UUID) { + //AMQP's message ID can be a UUID, keep it + jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().fromJavaUUID((UUID) properties.getMessageId())); + } else { + jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().generateUUID()); + } } + Binary userId = properties.getUserId(); if (userId != null) { jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8)); @@ -345,6 +355,7 @@ public class AmqpCoreConverter { } } Object correlationID = properties.getCorrelationId(); + if (correlationID != null) { try { jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID)); @@ -374,7 +385,6 @@ public class AmqpCoreConverter { jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime()); } } - return jms; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java index 1099d51c4c..453b7ec41c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java @@ -159,6 +159,10 @@ public class CoreAmqpConverter { } catch (ActiveMQAMQPIllegalStateException e) { properties.setMessageId(messageId); } + } else { + if (message.getInnerMessage().getUserID() != null) { + properties.setMessageId("ID:" + message.getInnerMessage().getUserID().toString()); + } } Destination destination = message.getJMSDestination(); if (destination != null) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index c6c91f303c..63082d916d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -187,6 +187,8 @@ public final class OpenWireMessageConverter { midBytes.compact(); coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data); + coreMessage.setUserID(UUIDGenerator.getInstance().generateUUID()); + final ProducerId producerId = messageSend.getProducerId(); if (producerId != null) { final ByteSequence producerIdBytes = marshaller.marshal(producerId); @@ -629,7 +631,9 @@ public final class OpenWireMessageConverter { ByteSequence midSeq = new ByteSequence(midBytes); mid = (MessageId) marshaller.unmarshal(midSeq); } else { - mid = new MessageId(UUIDGenerator.getInstance().generateStringUUID() + ":-1"); + //JMSMessageID should be started with "ID:" + String midd = "ID:" + UUIDGenerator.getInstance().generateStringUUID() + ":-1:-1:-1:-1"; + mid = new MessageId(midd); } amqMsg.setMessageId(mid); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index 0cb0f195bc..2ba8e4e7fb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -801,9 +801,10 @@ public class ManagementServiceImpl implements ManagementService { // CoreMessage#getUserId returns UUID, so to implement this part a alternative API that returned object. This part of the // change is a nice to have for my point of view. I suggested it for completeness. The application could // always supply unique correl ids on the request and achieve the same effect. I'd be happy to drop this part. - Object underlying = request.getUserID() != null ? request.getUserID() : request.getStringProperty(NATIVE_MESSAGE_ID); + Object underlying = request.getStringProperty(NATIVE_MESSAGE_ID) != null ? request.getStringProperty(NATIVE_MESSAGE_ID) : request.getUserID(); correlationId = underlying == null ? null : String.valueOf(underlying); } + return correlationId; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/MessageIDMultiProtocolTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/MessageIDMultiProtocolTest.java new file mode 100644 index 0000000000..e652f79283 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/MessageIDMultiProtocolTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.crossprotocol; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.Arrays; + +import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory; + +@RunWith(Parameterized.class) +public class MessageIDMultiProtocolTest extends OpenWireTestBase { + + String protocolSender; + String protocolConsumer; + ConnectionFactory senderCF; + ConnectionFactory consumerCF; + private static final SimpleString queueName = SimpleString.toSimpleString("MessageIDueueTest"); + + public MessageIDMultiProtocolTest(String protocolSender, String protocolConsumer) { + this.protocolSender = protocolSender; + this.protocolConsumer = protocolConsumer; + } + + @Parameterized.Parameters(name = "sender={0},consumer={1}") + public static Iterable data() { + return Arrays.asList(new Object[][]{ + {"OPENWIRE", "OPENWIRE"}, + {"OPENWIRE", "CORE"}, + {"OPENWIRE", "AMQP"}, + {"CORE", "OPENWIRE"}, + {"CORE", "CORE"}, + {"CORE", "AMQP"}, + {"AMQP", "OPENWIRE"}, + {"AMQP", "CORE"}, + {"AMQP", "AMQP"}, + }); + } + + + @Before + public void setupCF() { + senderCF = createConnectionFactory(protocolSender, urlString); + consumerCF = createConnectionFactory(protocolConsumer, urlString); + } + + @Before + public void setupQueue() throws Exception { + Wait.assertTrue(server::isStarted); + Wait.assertTrue(server::isActive); + this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true); + } + + + @Test + public void testMessageIDNotNullCorrelationIDPreserved() throws Throwable { + Connection senderConn = senderCF.createConnection(); + Connection consumerConn = consumerCF.createConnection(); + consumerConn.setClientID("consumer"); + + try (Session senderSession = senderConn.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + Queue senderDestination = senderSession.createQueue(queueName.toString()); + MessageProducer senderProducer = senderSession.createProducer(senderDestination); + Message sentMessage = senderSession.createMessage(); + sentMessage.setJMSCorrelationID("ID:MessageIDCorrelationId"); + senderProducer.send(sentMessage); + senderConn.start(); + + String sentMid = sentMessage.getJMSMessageID(); + + try (Session consumerSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + Destination consumerDestination = consumerSess.createQueue(queueName.toString()); + MessageConsumer consumer = consumerSess.createConsumer(consumerDestination); + consumerConn.start(); + + Message receivedMessage = consumer.receive(3000); + Assert.assertNotNull(receivedMessage); + + Assert.assertEquals(sentMessage.getJMSCorrelationID(), receivedMessage.getJMSCorrelationID()); + + String messageId = receivedMessage.getJMSMessageID(); + Assert.assertNotNull(messageId); + + Assert.assertTrue(messageId.startsWith("ID:")); + + System.out.println("[" + protocolSender + "][" + protocolConsumer + "] " + messageId); + System.out.println("[" + protocolSender + "][" + protocolConsumer + "] " + sentMid); + + if (protocolConsumer.equals(protocolSender)) { + //only same protocol we guarantee the same JMSMessageID + assertEquals(sentMid, messageId); + } + + //specific case [CORE]->[AMQP] + if ("CORE".equals(protocolSender) && "AMQP".equals(protocolConsumer)) { + assertEquals(sentMid, messageId); + } + } + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } finally { + try { + senderConn.close(); + } catch (Throwable e) { + e.printStackTrace(); + } + try { + consumerConn.close(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + } +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java index 46a906506f..2eaa41962f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java @@ -21,6 +21,8 @@ import org.apache.activemq.artemis.utils.UUIDGenerator; import org.junit.Assert; import org.junit.Test; +import java.util.UUID; + public class UUIDGeneratorTest extends ActiveMQTestBase { // Constants ----------------------------------------------------- @@ -32,6 +34,14 @@ public class UUIDGeneratorTest extends ActiveMQTestBase { // Public -------------------------------------------------------- + @Test + public void testFromJavaUUID() throws Exception { + UUID javaId = UUID.randomUUID(); + UUIDGenerator gen = UUIDGenerator.getInstance(); + org.apache.activemq.artemis.utils.UUID nativeId = gen.fromJavaUUID(javaId); + assertEquals(javaId.toString(), nativeId.toString()); + } + @Test public void testGetHardwareAddress() throws Exception { byte[] bytes = UUIDGenerator.getHardwareAddress(); From 2a84a6fd22dbfde96ac475be47aee30a3020ed77 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 27 Jun 2019 10:26:30 -0400 Subject: [PATCH 2/2] ARTEMIS-2390 Small improvement on UUID Conversion --- .../activemq/artemis/utils/ByteUtil.java | 24 ++++++++++++++++ .../apache/activemq/artemis/utils/UUID.java | 7 ++++- .../activemq/artemis/utils/UUIDGenerator.java | 11 +------- .../activemq/artemis/utils/ByteUtilTest.java | 28 +++++++++++++++++++ 4 files changed, 59 insertions(+), 11 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java index 88c6d49583..f34ce1a2cd 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java @@ -163,6 +163,30 @@ public class ByteUtil { | ((int) b[0] & 0xff) << 24; } + public static byte[] longToBytes(long value) { + byte[] output = new byte[8]; + longToBytes(value, output, 0); + return output; + } + + public static void longToBytes(long x, byte[] output, int offset) { + output[offset] = (byte)(x >> 56); + output[offset + 1] = (byte)(x >> 48); + output[offset + 2] = (byte)(x >> 40); + output[offset + 3] = (byte)(x >> 32); + output[offset + 4] = (byte)(x >> 24); + output[offset + 5] = (byte)(x >> 16); + output[offset + 6] = (byte)(x >> 8); + output[offset + 7] = (byte)(x); + } + + public static byte[] doubleLongToBytes(long value1, long value2) { + byte[] output = new byte[16]; + longToBytes(value1, output, 0); + longToBytes(value2, output, 8); + return output; + } + public static byte[] hexToBytes(String hexStr) { byte[] bytes = new byte[hexStr.length() / 2]; for (int i = 0; i < bytes.length; i++) { diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java index c3f99a423b..7d8e984c0d 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java @@ -107,10 +107,15 @@ public final class UUID { mId[UUID.INDEX_VARIATION] |= (byte) 0x80; } - public UUID(final byte[] data) { + private UUID(final byte[] data) { mId = data; } + /** This is for conversions between two types of UUID */ + public UUID(java.util.UUID uuid) { + this(ByteUtil.doubleLongToBytes(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); + } + public byte[] asBytes() { return mId; } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java index 3d4f10ecab..64475810cd 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.utils; import java.net.NetworkInterface; import java.net.SocketException; -import java.nio.ByteBuffer; import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; @@ -111,15 +110,7 @@ public final class UUIDGenerator { } public UUID fromJavaUUID(java.util.UUID uuid) { - long msb = uuid.getMostSignificantBits(); - long lsb = uuid.getLeastSignificantBits(); - - ByteBuffer buffer = ByteBuffer.allocate(16); - buffer.putLong(msb); - buffer.putLong(lsb); - byte[] contents = buffer.array(); - - return new UUID(contents); + return new UUID(uuid); } public byte[] generateDummyAddress() { diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java index f13e153d0e..929ccf8947 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java @@ -318,4 +318,32 @@ public class ByteUtilTest { assertEquals(randomInt, ByteUtil.bytesToInt(actual)); } } + + @Test + public void testLongToBytes() { + ByteBuffer buffer = ByteBuffer.allocate(8); + long randomLong = RandomUtil.randomLong(); + buffer.putLong(randomLong); + byte[] longArrayAssert = buffer.array(); + + byte[] convertedArray = ByteUtil.longToBytes(randomLong); + + assertArrayEquals(longArrayAssert, convertedArray); + } + + @Test + public void testDoubleLongToBytes() { + long randomLong1 = RandomUtil.randomLong(); + long randomLong2 = RandomUtil.randomLong(); + ByteBuffer buffer = ByteBuffer.allocate(16); + buffer.putLong(randomLong1); + buffer.putLong(randomLong2); + byte[] assertContent = buffer.array(); + + byte[] convertedContent = ByteUtil.doubleLongToBytes(randomLong1, randomLong2); + + assertArrayEquals(assertContent, convertedContent); + } + + }