ARTEMIS-2608 Fix ClassCastException on binary properties conversion

This commit is contained in:
Federico Valeri 2020-03-23 15:58:53 +01:00 committed by Clebert Suconic
parent ddb67ccdb1
commit 8cf35385a4
4 changed files with 135 additions and 1 deletions

1
.gitignore vendored
View File

@ -4,6 +4,7 @@
**/bin **/bin
**/obj **/obj
.idea/ .idea/
.vscode/
ratReport.txt ratReport.txt
**/server.lock **/server.lock
**/data **/data

View File

@ -54,6 +54,7 @@ import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException; import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -477,7 +478,8 @@ public class AmqpCoreConverter {
} else if (value instanceof Decimal32) { } else if (value instanceof Decimal32) {
msg.setFloatProperty(key, ((Decimal32) value).floatValue()); msg.setFloatProperty(key, ((Decimal32) value).floatValue());
} else if (value instanceof Binary) { } else if (value instanceof Binary) {
msg.setStringProperty(key, value.toString()); Binary bin = (Binary) value;
msg.setObjectProperty(key, Arrays.copyOfRange(bin.getArray(), bin.getArrayOffset(), bin.getLength()));
} else { } else {
msg.setObjectProperty(key, value); msg.setObjectProperty(key, value);
} }

View File

@ -1663,6 +1663,7 @@
<exclude>**/org.apache.activemq.artemis.cfg</exclude> <exclude>**/org.apache.activemq.artemis.cfg</exclude>
<exclude>**/nb-configuration.xml</exclude> <exclude>**/nb-configuration.xml</exclude>
<exclude>**/nbactions-tests.xml</exclude> <exclude>**/nbactions-tests.xml</exclude>
<exclude>**/.vscode/settings.json</exclude>
<!-- activemq5 unit tests exclude --> <!-- activemq5 unit tests exclude -->
<exclude>**/*.data</exclude> <exclude>**/*.data</exclude>
<exclude>**/*.bin</exclude> <exclude>**/*.bin</exclude>

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.crossprotocol;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class OpenwireAmqpResenderTest extends ActiveMQTestBase {
private static final String OPENWIRE_URL = "tcp://localhost:61616";
private static final String AMQP_URL = "amqp://localhost:61616";
private static final String QUEUE_ZERO_NAME = "queue.zero";
private static final String QUEUE_ONE_NAME = "queue.one";
private ActiveMQServer server;
private ActiveMQConnectionFactory factory;
private JmsConnectionFactory qpidFactory;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server = createServer(true, true);
factory = new ActiveMQConnectionFactory(OPENWIRE_URL);
qpidFactory = new JmsConnectionFactory(AMQP_URL);
Configuration serverConfig = server.getConfiguration();
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true)
.setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(false);
server.start();
SimpleString coreQueueZero = new SimpleString(QUEUE_ZERO_NAME);
server.createQueue(coreQueueZero, RoutingType.ANYCAST, coreQueueZero, null, false, false);
SimpleString coreQueueOne = new SimpleString(QUEUE_ONE_NAME);
server.createQueue(coreQueueOne, RoutingType.ANYCAST, coreQueueOne, null, false, false);
}
@Override
@After
public void tearDown() throws Exception {
if (server != null) {
server.stop();
server = null;
}
}
@Test(timeout = 5_000)
public void internalOpenwireBinaryPropShouldBeConvertedAsByteArrays() throws Exception {
openwireSender(factory);
amqpResender(qpidFactory);
openwireReceiver(factory);
}
private void openwireSender(ConnectionFactory cf) throws Exception {
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queueZero = session.createQueue(QUEUE_ZERO_NAME);
MessageProducer producer = session.createProducer(queueZero);
Message testMessage = session.createTextMessage("test");
producer.send(testMessage);
connection.close();
}
private void amqpResender(ConnectionFactory cf) throws Exception {
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queueZero = session.createQueue(QUEUE_ZERO_NAME);
Queue queueOne = session.createQueue(QUEUE_ONE_NAME);
MessageConsumer consumer = session.createConsumer(queueZero);
connection.start();
Message message = consumer.receive();
assertNotNull(message);
MessageProducer producer = session.createProducer(queueOne);
producer.send(message);
connection.close();
}
private void openwireReceiver(ConnectionFactory cf) throws Exception {
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queueOne = session.createQueue(QUEUE_ONE_NAME);
MessageConsumer consumer = session.createConsumer(queueOne);
connection.start();
Message receivedMessage = consumer.receive();
assertNotNull(receivedMessage);
connection.close();
}
}