diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index d8b00c1c85..da74a43f35 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -68,7 +68,6 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.TransactionId; import org.apache.activemq.util.ByteArrayInputStream; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequenceData; @@ -77,6 +76,7 @@ import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtbuf.UTF8Buffer; import static org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP; +import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE; public final class OpenWireMessageConverter { @@ -92,11 +92,9 @@ public final class OpenWireMessageConverter { private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER"); private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID"); private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE"); - private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID"); + public static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID"); private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION"); - private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID"); - private static final SimpleString AMQ_MSG_PRODUCER_ID = new SimpleString(AMQ_PREFIX + "PRODUCER_ID"); - private static final SimpleString AMQ_MSG_MARSHALL_PROP = new SimpleString(AMQ_PREFIX + "MARSHALL_PROP"); + public static final SimpleString AMQ_MSG_PRODUCER_ID = new SimpleString(AMQ_PREFIX + "PRODUCER_ID"); private static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO"); private static final SimpleString AMQ_MSG_USER_ID = new SimpleString(AMQ_PREFIX + "USER_ID"); @@ -186,24 +184,19 @@ public final class OpenWireMessageConverter { coreMessage.setGroupSequence(messageSend.getGroupSequence()); final MessageId messageId = messageSend.getMessageId(); - - final ByteSequence midBytes = marshaller.marshal(messageId); - midBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data); + if (messageId != null) { + coreMessage.putStringProperty(AMQ_MSG_MESSAGE_ID, SimpleString.toSimpleString(messageId.toString())); + } coreMessage.setUserID(UUIDGenerator.getInstance().generateUUID()); final ProducerId producerId = messageSend.getProducerId(); if (producerId != null) { - final ByteSequence producerIdBytes = marshaller.marshal(producerId); - producerIdBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data); - } - final ByteSequence propBytes = messageSend.getMarshalledProperties(); - if (propBytes != null) { - putMsgMarshalledProperties(propBytes, messageSend, coreMessage); + coreMessage.putStringProperty(AMQ_MSG_PRODUCER_ID, SimpleString.toSimpleString(producerId.toString())); } + putMsgProperties(messageSend, coreMessage); + final ActiveMQDestination replyTo = messageSend.getReplyTo(); if (replyTo != null) { if (replyTo instanceof TemporaryQueue) { @@ -228,7 +221,7 @@ public final class OpenWireMessageConverter { final ActiveMQDestination origDest = messageSend.getOriginalDestination(); if (origDest != null) { - putMsgOriginalDestination(origDest, marshaller, coreMessage); + coreMessage.putStringProperty(AMQ_MSG_ORIG_DESTINATION, origDest.getQualifiedName()); } return coreMessage; @@ -440,12 +433,8 @@ public final class OpenWireMessageConverter { coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data); } - private static void putMsgMarshalledProperties(final ByteSequence propBytes, - final Message messageSend, - final CoreMessage coreMessage) throws IOException { - propBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data); - //unmarshall properties to core so selector will work + private static void putMsgProperties(final Message messageSend, + final CoreMessage coreMessage) throws IOException { final Map props = messageSend.getProperties(); if (!props.isEmpty()) { props.forEach((key, value) -> { @@ -458,14 +447,6 @@ public final class OpenWireMessageConverter { } } - private static void putMsgOriginalDestination(final ActiveMQDestination origDest, - final WireFormat marshaller, - final CoreMessage coreMessage) throws IOException { - final ByteSequence origDestBytes = marshaller.marshal(origDest); - origDestBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data); - } - private static void loadMapIntoProperties(TypedProperties props, Map map) { for (Entry entry : map.entrySet()) { SimpleString key = new SimpleString(entry.getKey()); @@ -629,11 +610,10 @@ public final class OpenWireMessageConverter { amqMsg.setGroupSequence(coreMessage.getGroupSequence()); - final byte[] midBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_MESSAGE_ID); + final SimpleString midString = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_MESSAGE_ID); final MessageId mid; - if (midBytes != null) { - ByteSequence midSeq = new ByteSequence(midBytes); - mid = (MessageId) marshaller.unmarshal(midSeq); + if (midString != null) { + mid = new MessageId(midString.toString()); } else { //JMSMessageID should be started with "ID:" and needs to be globally unique (node + journal id) String midd = "ID:" + serverNodeUUID + ":-1:-1:-1"; @@ -642,32 +622,21 @@ public final class OpenWireMessageConverter { amqMsg.setMessageId(mid); - final byte[] origDestBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_ORIG_DESTINATION); + final SimpleString origDestBytes = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_ORIG_DESTINATION); if (origDestBytes != null) { - setAMQMsgOriginalDestination(amqMsg, marshaller, origDestBytes); + amqMsg.setOriginalDestination(ActiveMQDestination.createDestination(origDestBytes.toString(), QUEUE_TYPE)); } - final byte[] origTxIdBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_ORIG_TXID); - if (origTxIdBytes != null) { - setAMQMsgOriginalTransactionId(amqMsg, marshaller, origTxIdBytes); - } - - final byte[] producerIdBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_PRODUCER_ID); - if (producerIdBytes != null) { - ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes)); - amqMsg.setProducerId(producerId); - } - - final byte[] marshalledBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_MARSHALL_PROP); - if (marshalledBytes != null) { - amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes)); + final SimpleString producerId = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_PRODUCER_ID); + if (producerId != null && producerId.length() > 0) { + amqMsg.setProducerId(new ProducerId(producerId.toString())); } amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1); - final byte[] replyToBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_REPLY_TO); + final SimpleString replyToBytes = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_REPLY_TO); if (replyToBytes != null) { - setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes); + amqMsg.setReplyTo(ActiveMQDestination.createDestination(replyToBytes.toString(), QUEUE_TYPE)); } final SimpleString userId = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_USER_ID); @@ -923,27 +892,6 @@ public final class OpenWireMessageConverter { amqMsg.setDataStructure(ds); } - private static void setAMQMsgOriginalDestination(final ActiveMQMessage amqMsg, - final WireFormat marshaller, - final byte[] origDestBytes) throws IOException { - ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes)); - amqMsg.setOriginalDestination(origDest); - } - - private static void setAMQMsgOriginalTransactionId(final ActiveMQMessage amqMsg, - final WireFormat marshaller, - final byte[] origTxIdBytes) throws IOException { - TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes)); - amqMsg.setOriginalTransactionId(origTxId); - } - - private static void setAMQMsgReplyTo(final ActiveMQMessage amqMsg, - final WireFormat marshaller, - final byte[] replyToBytes) throws IOException { - ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes)); - amqMsg.setReplyTo(replyTo); - } - private static void setAMQMsgDlqDeliveryFailureCause(final ActiveMQMessage amqMsg, final SimpleString dlqCause) throws IOException { try { diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java index 6231eefe31..ce70a1d3f2 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java +++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire; import org.apache.activemq.ActiveMQMessageAuditNoSync; import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.server.MessageReference; @@ -27,14 +28,17 @@ import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.wireformat.WireFormat; import org.junit.Test; import org.mockito.Mockito; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -110,6 +114,58 @@ public class OpenWireMessageConverterTest { assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey) instanceof String); } + @Test + public void testProperties() throws Exception { + ICoreMessage coreMessage = new CoreMessage().initBuffer(8); + for (int i = 0; i < 5; i++) { + coreMessage.putIntProperty(i + "", i); + } + + MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); + AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); + Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); + + MessageDispatch marshalled = (MessageDispatch) openWireFormat.unmarshal(openWireFormat.marshal(OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID))); + assertEquals(5, marshalled.getMessage().getProperties().keySet().size()); + Message converted = OpenWireMessageConverter.inbound(marshalled.getMessage(), openWireFormat, null); + for (int i = 0; i < 5; i++) { + assertTrue(converted.containsProperty(i + "")); + } + } + + @Test + public void testProducerId() throws Exception { + final String PRODUCER_ID = "123:456:789"; + + ActiveMQMessage classicMessage = new ActiveMQMessage(); + classicMessage.setProducerId(new ProducerId(PRODUCER_ID)); + classicMessage.setMessageId(new MessageId("1:1:1")); + Message artemisMessage = OpenWireMessageConverter.inbound(classicMessage.getMessage(), openWireFormat, null); + assertEquals(PRODUCER_ID, artemisMessage.getStringProperty(OpenWireMessageConverter.AMQ_MSG_PRODUCER_ID)); + + MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class)); + AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); + Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); + MessageDispatch classicMessageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) artemisMessage, openWireFormat, amqConsumer, nodeUUID); + assertEquals(PRODUCER_ID, classicMessageDispatch.getMessage().getProducerId().toString()); + } + + @Test + public void testMessageId() throws Exception { + final String MESSAGE_ID = "ID:123:456:789"; + + ActiveMQMessage classicMessage = new ActiveMQMessage(); + classicMessage.setMessageId(new MessageId(MESSAGE_ID)); + Message artemisMessage = OpenWireMessageConverter.inbound(classicMessage.getMessage(), openWireFormat, null); + assertEquals(MESSAGE_ID, artemisMessage.getStringProperty(OpenWireMessageConverter.AMQ_MSG_MESSAGE_ID)); + + MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class)); + AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); + Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); + MessageDispatch classicMessageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) artemisMessage, openWireFormat, amqConsumer, nodeUUID); + assertEquals(MESSAGE_ID, classicMessageDispatch.getMessage().getMessageId().toString()); + } + @Test public void testBadPropertyConversion() throws Exception { final String hdrArrival = "__HDR_ARRIVAL"; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenWireToAMQPTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenWireToAMQPTest.java index 394de4c362..d66d7c413e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenWireToAMQPTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenWireToAMQPTest.java @@ -17,9 +17,11 @@ package org.apache.activemq.artemis.tests.integration.crossprotocol; import java.util.ArrayList; +import java.util.Enumeration; import javax.jms.Connection; import javax.jms.MessageConsumer; +import javax.jms.MessageFormatException; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Queue; @@ -118,4 +120,54 @@ public class OpenWireToAMQPTest extends ActiveMQTestBase { } } } + + @SuppressWarnings("unchecked") + @Test(timeout = 60000) + public void testByteArrayProperties() throws Exception { + Connection connection = null; + try { + connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + ArrayList list = new ArrayList<>(); + list.add("aString"); + ObjectMessage objectMessage = session.createObjectMessage(list); + producer.send(objectMessage); + connection.close(); + } catch (Exception e) { + e.printStackTrace(); + fail("Failed to send message via OpenWire: " + e.getMessage()); + } finally { + if (connection != null) { + connection.close(); + } + } + + try { + connection = qpidfactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + ObjectMessage receive = (ObjectMessage) consumer.receive(5000); + assertNotNull(receive); + + /* + * As noted in section 3.5.4 of the JMS 2 specification all properties can be converted to String + */ + Enumeration propertyNames = receive.getPropertyNames(); + while (propertyNames.hasMoreElements()) { + receive.getStringProperty(propertyNames.nextElement()); + } + connection.close(); + } catch (MessageFormatException e) { + e.printStackTrace(); + fail("Failed to receive message via AMQP: " + e.getMessage()); + } finally { + if (connection != null) { + connection.close(); + } + } + } }