From 51801d978e871056ccf9d752f8215b82756dee2e Mon Sep 17 00:00:00 2001 From: Domenico Francesco Bruscino Date: Thu, 22 Jul 2021 07:39:10 +0200 Subject: [PATCH] ARTEMIS-3396 Convert bytes properties to String for OpenWire --- .../openwire/OpenWireMessageConverter.java | 13 ++++--- .../OpenWireMessageConverterTest.java | 17 +++++++++ .../crossprotocol/AMQPToOpenwireTest.java | 37 +++++++++++++++++++ 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 337cefdc01..3cc1a1635e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -37,6 +37,7 @@ import java.util.zip.Inflater; import java.util.zip.InflaterInputStream; import java.util.zip.InflaterOutputStream; +import com.google.common.io.BaseEncoding; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; @@ -963,13 +964,13 @@ public final class OpenWireMessageConverter { try { if (prop instanceof SimpleString) { amqMsg.setObjectProperty(keyStr, prop.toString()); + } else if (prop instanceof byte[]) { + amqMsg.setObjectProperty(keyStr, BaseEncoding.base16().encode((byte[])prop)); + } else if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) { + Long l = (Long) prop; + amqMsg.setObjectProperty(keyStr, l.intValue()); } else { - if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) { - Long l = (Long) prop; - amqMsg.setObjectProperty(keyStr, l.intValue()); - } else { - amqMsg.setObjectProperty(keyStr, prop); - } + amqMsg.setObjectProperty(keyStr, prop); } } catch (JMSException e) { throw new IOException("exception setting property " + s + " : " + prop, e); diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java index d29676f0de..e1ecfd4790 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java +++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java @@ -36,6 +36,7 @@ import org.junit.Test; import org.mockito.Mockito; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class OpenWireMessageConverterTest { @@ -91,4 +92,20 @@ public class OpenWireMessageConverterTest { } } + + @Test + public void testBytesPropertyConversionToString() throws Exception { + final String bytesPropertyKey = "bytesProperty"; + + ICoreMessage coreMessage = new CoreMessage().initBuffer(8); + coreMessage.putBytesProperty(bytesPropertyKey, "TEST".getBytes()); + + MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); + AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); + Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); + + MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID); + + assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey) instanceof String); + } } \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java index f2fa7916bc..2b84152967 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java @@ -25,6 +25,7 @@ import javax.jms.Queue; import javax.jms.Session; import java.net.URI; import java.util.ArrayList; +import java.util.Collections; import java.util.concurrent.TimeUnit; import org.apache.activemq.ActiveMQConnectionFactory; @@ -42,7 +43,9 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.Header; import org.junit.After; import org.junit.Before; @@ -145,4 +148,38 @@ public class AMQPToOpenwireTest extends ActiveMQTestBase { } } } + + @Test + public void testBinaryPropertyConversionToString() throws Exception { + final String binaryPropertyName = "binaryProperty"; + + AmqpClient client = new AmqpClient(new URI("tcp://127.0.0.1:61616"), null, null); + AmqpConnection amqpconnection = client.connect(); + try { + AmqpSession session = amqpconnection.createSession(); + AmqpSender sender = session.createSender(queueName); + AmqpMessage message = new AmqpMessage(); + message.getWrappedMessage().setHeader(new Header()); + message.getWrappedMessage().setApplicationProperties(new ApplicationProperties(Collections.singletonMap(binaryPropertyName, new Binary("TEST".getBytes())))); + sender.send(message); + } finally { + amqpconnection.close(); + } + + Connection connection = null; + try { + connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + Message receive = consumer.receive(5000); + assertNotNull(receive); + assertTrue(receive.getObjectProperty(binaryPropertyName) instanceof String); + } finally { + if (connection != null) { + connection.close(); + } + } + } }