diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 031c426312..6ca37ea202 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -153,6 +153,12 @@ public interface Message { */ SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE"); + /** + * The prefix used (if any) when sending this message. For protocols (e.g. STOMP) that need to track this and restore + * the prefix when the message is consumed. + */ + SimpleString HDR_PREFIX = new SimpleString("_AMQ_PREFIX"); + byte DEFAULT_TYPE = 0; byte OBJECT_TYPE = 2; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java index 9c6e92a099..4066986bbd 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java @@ -45,7 +45,20 @@ public class PrefixUtil { return address; } + public static SimpleString getPrefix(SimpleString address, Map prefixes) { + for (Map.Entry entry : prefixes.entrySet()) { + if (address.startsWith(entry.getKey())) { + return removeAddress(address, entry.getKey()); + } + } + return null; + } + public static SimpleString removePrefix(SimpleString string, SimpleString prefix) { return string.subSeq(prefix.length(), string.length()); } + + public static SimpleString removeAddress(SimpleString string, SimpleString prefix) { + return string.subSeq(0, prefix.length()); + } } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java index cd17982912..07dcd8fd15 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java @@ -38,7 +38,7 @@ public class StompUtils { // Static -------------------------------------------------------- - public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, Message msg) throws Exception { + public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, Message msg, String prefix) throws Exception { Map headers = new HashMap<>(frame.getHeadersMap()); String priority = headers.remove(Stomp.Headers.Send.PRIORITY); @@ -89,6 +89,10 @@ public class StompUtils { } } + if (prefix != null) { + msg.putStringProperty(Message.HDR_PREFIX, prefix); + } + // now the general headers for (Entry entry : headers.entrySet()) { String name = entry.getKey(); @@ -101,7 +105,8 @@ public class StompUtils { StompFrame command, int deliveryCount) throws Exception { command.addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(message.getMessageID())); - command.addHeader(Stomp.Headers.Message.DESTINATION, message.getAddress().toString()); + SimpleString prefix = message.getSimpleStringProperty(Message.HDR_PREFIX); + command.addHeader(Stomp.Headers.Message.DESTINATION, (prefix == null ? "" : prefix) + message.getAddress()); if (message.getObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME) != null) { command.addHeader(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME).toString()); @@ -135,6 +140,7 @@ public class StompUtils { name.equals(Message.HDR_CONTENT_TYPE) || name.equals(Message.HDR_VALIDATED_USER) || name.equals(Message.HDR_ROUTING_TYPE) || + name.equals(Message.HDR_PREFIX) || name.equals(MessageUtil.TYPE_HEADER_NAME) || name.equals(MessageUtil.CORRELATIONID_HEADER_NAME) || name.toString().equals(Stomp.Headers.Message.DESTINATION)) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 941cee6ab8..023d885378 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -200,7 +200,7 @@ public abstract class VersionedStompFrameHandler { } message.setTimestamp(timestamp); message.setAddress(SimpleString.toSimpleString(destination)); - StompUtils.copyStandardHeadersFromFrameToMessage(frame, message); + StompUtils.copyStandardHeadersFromFrameToMessage(frame, message, getPrefix(frame)); if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH)) { message.setType(Message.BYTES_TYPE); message.getBodyBuffer().writeBytes(frame.getBodyAsBytes()); @@ -291,6 +291,15 @@ public abstract class VersionedStompFrameHandler { return connection.getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString(); } + public String getPrefix(StompFrame request) throws ActiveMQStompException { + String destination = request.getHeader(Headers.Send.DESTINATION); + if (destination == null) { + return null; + } + SimpleString prefix = connection.getSession().getCoreSession().getPrefix(SimpleString.toSimpleString(destination)); + return prefix == null ? null : prefix.toString(); + } + public StompFrame postprocess(StompFrame request) { StompFrame response = null; if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 59a400bb72..f5cca75ac3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -315,6 +315,14 @@ public interface ServerSession extends SecurityAuth { */ SimpleString removePrefix(SimpleString address); + /** + * Get the prefix (if it exists) from the address based on the prefixes provided to the ServerSession constructor. + * + * @param address the address to inspect + * @return the canonical (i.e. non-prefixed) address name + */ + SimpleString getPrefix(SimpleString address); + /** * Get the canonical (i.e. non-prefixed) address and the corresponding routing-type. * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index f92b45a1c7..0c6838daa3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1834,6 +1834,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener { return address; } + @Override + public SimpleString getPrefix(SimpleString address) { + if (prefixEnabled && address != null) { + return PrefixUtil.getPrefix(address, prefixes); + } + return null; + } + @Override public AddressInfo getAddressAndRoutingType(AddressInfo addressInfo) { if (prefixEnabled) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index de6a11dc96..bc363f21ef 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -1621,6 +1621,7 @@ public class StompTest extends StompTestBase { Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); Assert.assertEquals("Hello World 2", frame.getBody()); Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE)); + Assert.assertEquals("/queue/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION)); frame = conn.receiveFrame(1000); Assert.assertNull(frame); @@ -1643,6 +1644,7 @@ public class StompTest extends StompTestBase { Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); Assert.assertEquals("Hello World 3", frame.getBody()); Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE)); + Assert.assertEquals("/topic/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION)); frame = conn.receiveFrame(1000); Assert.assertNull(frame); @@ -1699,6 +1701,7 @@ public class StompTest extends StompTestBase { Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); Assert.assertEquals("Hello World 2", frame.getBody()); Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE)); + Assert.assertEquals("/topic/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION)); frame = conn.receiveFrame(1000); Assert.assertNull(frame); @@ -1718,6 +1721,7 @@ public class StompTest extends StompTestBase { Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); Assert.assertEquals("Hello World 1", frame.getBody()); Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE)); + Assert.assertEquals("/queue/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION)); frame = conn.receiveFrame(2000); Assert.assertNull(frame);