ARTEMIS-3396 Convert bytes properties to String for OpenWire

This commit is contained in:
Domenico Francesco Bruscino 2021-07-22 07:39:10 +02:00 committed by clebertsuconic
parent 8a88c5f913
commit 51801d978e
3 changed files with 61 additions and 6 deletions

View File

@ -37,6 +37,7 @@ import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import java.util.zip.InflaterOutputStream;
import com.google.common.io.BaseEncoding;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
@ -963,13 +964,13 @@ public final class OpenWireMessageConverter {
try {
if (prop instanceof SimpleString) {
amqMsg.setObjectProperty(keyStr, prop.toString());
} else if (prop instanceof byte[]) {
amqMsg.setObjectProperty(keyStr, BaseEncoding.base16().encode((byte[])prop));
} else if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) {
Long l = (Long) prop;
amqMsg.setObjectProperty(keyStr, l.intValue());
} else {
if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) {
Long l = (Long) prop;
amqMsg.setObjectProperty(keyStr, l.intValue());
} else {
amqMsg.setObjectProperty(keyStr, prop);
}
amqMsg.setObjectProperty(keyStr, prop);
}
} catch (JMSException e) {
throw new IOException("exception setting property " + s + " : " + prop, e);

View File

@ -36,6 +36,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class OpenWireMessageConverterTest {
@ -91,4 +92,20 @@ public class OpenWireMessageConverterTest {
}
}
@Test
public void testBytesPropertyConversionToString() throws Exception {
final String bytesPropertyKey = "bytesProperty";
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.putBytesProperty(bytesPropertyKey, "TEST".getBytes());
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID);
assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey) instanceof String);
}
}

View File

@ -25,6 +25,7 @@ import javax.jms.Queue;
import javax.jms.Session;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
@ -42,7 +43,9 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.junit.After;
import org.junit.Before;
@ -145,4 +148,38 @@ public class AMQPToOpenwireTest extends ActiveMQTestBase {
}
}
}
@Test
public void testBinaryPropertyConversionToString() throws Exception {
final String binaryPropertyName = "binaryProperty";
AmqpClient client = new AmqpClient(new URI("tcp://127.0.0.1:61616"), null, null);
AmqpConnection amqpconnection = client.connect();
try {
AmqpSession session = amqpconnection.createSession();
AmqpSender sender = session.createSender(queueName);
AmqpMessage message = new AmqpMessage();
message.getWrappedMessage().setHeader(new Header());
message.getWrappedMessage().setApplicationProperties(new ApplicationProperties(Collections.singletonMap(binaryPropertyName, new Binary("TEST".getBytes()))));
sender.send(message);
} finally {
amqpconnection.close();
}
Connection connection = null;
try {
connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
Message receive = consumer.receive(5000);
assertNotNull(receive);
assertTrue(receive.getObjectProperty(binaryPropertyName) instanceof String);
} finally {
if (connection != null) {
connection.close();
}
}
}
}