diff --git a/.gitignore b/.gitignore index 9d067b717e..2f6e1516a0 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ **/bin **/obj .idea/ +.vscode/ ratReport.txt **/server.lock **/data diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index ad079e9fbc..564291f88d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -54,6 +54,7 @@ import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; @@ -477,7 +478,8 @@ public class AmqpCoreConverter { } else if (value instanceof Decimal32) { msg.setFloatProperty(key, ((Decimal32) value).floatValue()); } else if (value instanceof Binary) { - msg.setStringProperty(key, value.toString()); + Binary bin = (Binary) value; + msg.setObjectProperty(key, Arrays.copyOfRange(bin.getArray(), bin.getArrayOffset(), bin.getLength())); } else { msg.setObjectProperty(key, value); } diff --git a/pom.xml b/pom.xml index 466c001361..a3e752dc6b 100644 --- a/pom.xml +++ b/pom.xml @@ -1663,6 +1663,7 @@ **/org.apache.activemq.artemis.cfg **/nb-configuration.xml **/nbactions-tests.xml + **/.vscode/settings.json **/*.data **/*.bin diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenwireAmqpResenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenwireAmqpResenderTest.java new file mode 100644 index 0000000000..beeaad5566 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenwireAmqpResenderTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.crossprotocol; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class OpenwireAmqpResenderTest extends ActiveMQTestBase { + + private static final String OPENWIRE_URL = "tcp://localhost:61616"; + private static final String AMQP_URL = "amqp://localhost:61616"; + private static final String QUEUE_ZERO_NAME = "queue.zero"; + private static final String QUEUE_ONE_NAME = "queue.one"; + + private ActiveMQServer server; + private ActiveMQConnectionFactory factory; + private JmsConnectionFactory qpidFactory; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server = createServer(true, true); + + factory = new ActiveMQConnectionFactory(OPENWIRE_URL); + qpidFactory = new JmsConnectionFactory(AMQP_URL); + + Configuration serverConfig = server.getConfiguration(); + serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true) + .setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ"))); + serverConfig.setSecurityEnabled(false); + server.start(); + + SimpleString coreQueueZero = new SimpleString(QUEUE_ZERO_NAME); + server.createQueue(coreQueueZero, RoutingType.ANYCAST, coreQueueZero, null, false, false); + + SimpleString coreQueueOne = new SimpleString(QUEUE_ONE_NAME); + server.createQueue(coreQueueOne, RoutingType.ANYCAST, coreQueueOne, null, false, false); + } + + @Override + @After + public void tearDown() throws Exception { + if (server != null) { + server.stop(); + server = null; + } + } + + @Test(timeout = 5_000) + public void internalOpenwireBinaryPropShouldBeConvertedAsByteArrays() throws Exception { + openwireSender(factory); + amqpResender(qpidFactory); + openwireReceiver(factory); + } + + private void openwireSender(ConnectionFactory cf) throws Exception { + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queueZero = session.createQueue(QUEUE_ZERO_NAME); + + MessageProducer producer = session.createProducer(queueZero); + Message testMessage = session.createTextMessage("test"); + producer.send(testMessage); + + connection.close(); + } + + private void amqpResender(ConnectionFactory cf) throws Exception { + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queueZero = session.createQueue(QUEUE_ZERO_NAME); + Queue queueOne = session.createQueue(QUEUE_ONE_NAME); + + MessageConsumer consumer = session.createConsumer(queueZero); + connection.start(); + Message message = consumer.receive(); + assertNotNull(message); + + MessageProducer producer = session.createProducer(queueOne); + producer.send(message); + + connection.close(); + } + + private void openwireReceiver(ConnectionFactory cf) throws Exception { + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queueOne = session.createQueue(QUEUE_ONE_NAME); + + MessageConsumer consumer = session.createConsumer(queueOne); + connection.start(); + Message receivedMessage = consumer.receive(); + assertNotNull(receivedMessage); + + connection.close(); + } + +}