diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 90518ec652..717ca8e193 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -71,7 +71,6 @@ public class OpenWireMessageConverter implements MessageConverter { private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER"; private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID"; private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE"; - private static final String AMQ_MSG_DESTINATION = AMQ_PREFIX + "DESTINATION"; private static final String AMQ_MSG_GROUP_ID = AMQ_PREFIX + "GROUP_ID"; private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE"; private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID"; @@ -253,10 +252,6 @@ public class OpenWireMessageConverter implements MessageConverter { dsBytes.compact(); coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data); } - ActiveMQDestination dest = messageSend.getDestination(); - ByteSequence destBytes = marshaller.marshal(dest); - destBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_DESTINATION, destBytes.data); String groupId = messageSend.getGroupID(); if (groupId != null) { coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId); @@ -269,18 +264,6 @@ public class OpenWireMessageConverter implements MessageConverter { midBytes.compact(); coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data); - ActiveMQDestination origDest = messageSend.getOriginalDestination(); - if (origDest != null) { - ByteSequence origDestBytes = marshaller.marshal(origDest); - origDestBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data); - } - TransactionId origTxId = messageSend.getOriginalTransactionId(); - if (origTxId != null) { - ByteSequence origTxBytes = marshaller.marshal(origTxId); - origTxBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_ORIG_TXID, origTxBytes.data); - } ProducerId producerId = messageSend.getProducerId(); if (producerId != null) { ByteSequence producerIdBytes = marshaller.marshal(producerId); @@ -375,7 +358,7 @@ public class OpenWireMessageConverter implements MessageConverter { public static MessageDispatch createMessageDispatch(ServerMessage message, int deliveryCount, AMQConsumer consumer) throws IOException { - ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller()); + ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination()); MessageDispatch md = new MessageDispatch(); md.setConsumerId(consumer.getId()); @@ -387,7 +370,7 @@ public class OpenWireMessageConverter implements MessageConverter { return md; } - private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller) throws IOException { + private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException { ActiveMQMessage amqMsg = null; byte coreType = coreMessage.getType(); switch (coreType) { @@ -582,12 +565,7 @@ public class OpenWireMessageConverter implements MessageConverter { amqMsg.setDataStructure(ds); } - byte[] destBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DESTINATION); - if (destBytes != null) { - ByteSequence seq = new ByteSequence(destBytes); - ActiveMQDestination dest = (ActiveMQDestination) marshaller.unmarshal(seq); - amqMsg.setDestination(dest); - } + amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination)); Object value = coreMessage.getObjectProperty(AMQ_MSG_GROUP_ID); if (value != null) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java index fbd3aecafc..d68476165e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java @@ -18,11 +18,14 @@ package org.apache.activemq.artemis.core.protocol.openwire; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.BindingQueryResult; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.artemis.api.core.SimpleString; @@ -44,6 +47,23 @@ public class OpenWireUtil { } } + /** + * We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the + * destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was + * set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the + * consumer + */ + public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) { + String address = message.getAddress().toString(); + String strippedAddress = address.replace("jms.queue.", "").replace("jms.topic.", ""); + if (actualDestination.isQueue()) { + return new ActiveMQQueue(strippedAddress); + } + else { + return new ActiveMQTopic(strippedAddress); + } + } + /** * Checks to see if this destination exists. If it does not throw an invalid destination exception. * diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 789e527f12..40d253e5dd 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -370,6 +370,10 @@ public class AMQConsumer implements BrowserListener { session.removeConsumer(nativeId); } + public org.apache.activemq.command.ActiveMQDestination getActualDestination() { + return actualDest; + } + private class MessagePullHandler { private long next = -1; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java index b88d9ea7c3..3f2b7b602d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java @@ -69,11 +69,15 @@ public class GeneralInteropTest extends BasicOpenWireTest { assertEquals(text, textMessage.getText()); + assertEquals(destination, textMessage.getJMSDestination()); + //map messages sendMapMessageUsingCoreJms(queueName); MapMessage mapMessage = (MapMessage) consumer.receive(5000); + assertEquals(destination, mapMessage.getJMSDestination()); + assertTrue(mapMessage.getBoolean("aboolean")); assertEquals((byte) 4, mapMessage.getByte("abyte")); byte[] bytes = mapMessage.getBytes("abytes"); @@ -105,6 +109,9 @@ public class GeneralInteropTest extends BasicOpenWireTest { sendStreamMessageUsingCoreJms(queueName); StreamMessage streamMessage = (StreamMessage) consumer.receive(5000); + + assertEquals(destination, streamMessage.getJMSDestination()); + assertTrue(streamMessage.readBoolean()); assertEquals((byte) 2, streamMessage.readByte()); @@ -146,6 +153,8 @@ public class GeneralInteropTest extends BasicOpenWireTest { sendMessageUsingCoreJms(queueName); javax.jms.Message genericMessage = consumer.receive(5000); + + assertEquals(destination, genericMessage.getJMSDestination()); String value = genericMessage.getStringProperty("stringProperty"); assertEquals("HelloMessage", value); assertFalse(genericMessage.getBooleanProperty("booleanProperty")); @@ -171,6 +180,8 @@ public class GeneralInteropTest extends BasicOpenWireTest { for (int i = 0; i < num; i++) { TextMessage textMessage = (TextMessage) consumer.receive(5000); assertEquals(text + i, textMessage.getText()); + + assertEquals(destination, textMessage.getJMSDestination()); } } @@ -365,12 +376,15 @@ public class GeneralInteropTest extends BasicOpenWireTest { TextMessage txtMessage = (TextMessage) coreConsumer.receive(5000); assertEquals(text, txtMessage.getText()); + assertEquals(txtMessage.getJMSDestination(), queue); // map messages sendMapMessageUsingOpenWire(); MapMessage mapMessage = (MapMessage) coreConsumer.receive(5000); + assertEquals(mapMessage.getJMSDestination(), queue); + assertTrue(mapMessage.getBoolean("aboolean")); assertEquals((byte) 4, mapMessage.getByte("abyte")); byte[] bytes = mapMessage.getBytes("abytes"); @@ -392,6 +406,9 @@ public class GeneralInteropTest extends BasicOpenWireTest { sendObjectMessageUsingOpenWire(obj); ObjectMessage objectMessage = (ObjectMessage) coreConsumer.receive(5000); + + assertEquals(objectMessage.getJMSDestination(), queue); + SimpleSerializable data = (SimpleSerializable) objectMessage.getObject(); assertEquals(obj.objName, data.objName); @@ -402,6 +419,8 @@ public class GeneralInteropTest extends BasicOpenWireTest { sendStreamMessageUsingOpenWire(queueName); StreamMessage streamMessage = (StreamMessage) coreConsumer.receive(5000); + + assertEquals(streamMessage.getJMSDestination(), queue); assertTrue(streamMessage.readBoolean()); assertEquals((byte) 2, streamMessage.readByte()); @@ -426,6 +445,8 @@ public class GeneralInteropTest extends BasicOpenWireTest { sendBytesMessageUsingOpenWire(bytesData); BytesMessage bytesMessage = (BytesMessage) coreConsumer.receive(5000); + + assertEquals(bytesMessage.getJMSDestination(), queue); byte[] rawBytes = new byte[bytesData.length]; bytesMessage.readBytes(rawBytes); @@ -437,6 +458,7 @@ public class GeneralInteropTest extends BasicOpenWireTest { sendMessageUsingOpenWire(queueName); javax.jms.Message genericMessage = coreConsumer.receive(5000); + assertEquals(genericMessage.getJMSDestination(), queue); String value = genericMessage.getStringProperty("stringProperty"); assertEquals("HelloMessage", value); assertFalse(genericMessage.getBooleanProperty("booleanProperty")); @@ -471,6 +493,7 @@ public class GeneralInteropTest extends BasicOpenWireTest { for (int i = 0; i < num; i++) { TextMessage txtMessage = (TextMessage) coreConsumer.receive(5000); + assertEquals(txtMessage.getJMSDestination(), queue); assertEquals(text + i, txtMessage.getText()); } }