ARTEMIS-1834 don't alter STOMP 'destination' header when using prefix
This commit is contained in:
parent
a66b7dda27
commit
90a604da20
|
@ -153,6 +153,12 @@ public interface Message {
|
||||||
*/
|
*/
|
||||||
SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
|
SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The prefix used (if any) when sending this message. For protocols (e.g. STOMP) that need to track this and restore
|
||||||
|
* the prefix when the message is consumed.
|
||||||
|
*/
|
||||||
|
SimpleString HDR_PREFIX = new SimpleString("_AMQ_PREFIX");
|
||||||
|
|
||||||
byte DEFAULT_TYPE = 0;
|
byte DEFAULT_TYPE = 0;
|
||||||
|
|
||||||
byte OBJECT_TYPE = 2;
|
byte OBJECT_TYPE = 2;
|
||||||
|
|
|
@ -45,7 +45,20 @@ public class PrefixUtil {
|
||||||
return address;
|
return address;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static SimpleString getPrefix(SimpleString address, Map<SimpleString, RoutingType> prefixes) {
|
||||||
|
for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) {
|
||||||
|
if (address.startsWith(entry.getKey())) {
|
||||||
|
return removeAddress(address, entry.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
public static SimpleString removePrefix(SimpleString string, SimpleString prefix) {
|
public static SimpleString removePrefix(SimpleString string, SimpleString prefix) {
|
||||||
return string.subSeq(prefix.length(), string.length());
|
return string.subSeq(prefix.length(), string.length());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static SimpleString removeAddress(SimpleString string, SimpleString prefix) {
|
||||||
|
return string.subSeq(0, prefix.length());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,7 @@ public class StompUtils {
|
||||||
|
|
||||||
// Static --------------------------------------------------------
|
// Static --------------------------------------------------------
|
||||||
|
|
||||||
public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, Message msg) throws Exception {
|
public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, Message msg, String prefix) throws Exception {
|
||||||
Map<String, String> headers = new HashMap<>(frame.getHeadersMap());
|
Map<String, String> headers = new HashMap<>(frame.getHeadersMap());
|
||||||
|
|
||||||
String priority = headers.remove(Stomp.Headers.Send.PRIORITY);
|
String priority = headers.remove(Stomp.Headers.Send.PRIORITY);
|
||||||
|
@ -89,6 +89,10 @@ public class StompUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (prefix != null) {
|
||||||
|
msg.putStringProperty(Message.HDR_PREFIX, prefix);
|
||||||
|
}
|
||||||
|
|
||||||
// now the general headers
|
// now the general headers
|
||||||
for (Entry<String, String> entry : headers.entrySet()) {
|
for (Entry<String, String> entry : headers.entrySet()) {
|
||||||
String name = entry.getKey();
|
String name = entry.getKey();
|
||||||
|
@ -101,7 +105,8 @@ public class StompUtils {
|
||||||
StompFrame command,
|
StompFrame command,
|
||||||
int deliveryCount) throws Exception {
|
int deliveryCount) throws Exception {
|
||||||
command.addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(message.getMessageID()));
|
command.addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(message.getMessageID()));
|
||||||
command.addHeader(Stomp.Headers.Message.DESTINATION, message.getAddress().toString());
|
SimpleString prefix = message.getSimpleStringProperty(Message.HDR_PREFIX);
|
||||||
|
command.addHeader(Stomp.Headers.Message.DESTINATION, (prefix == null ? "" : prefix) + message.getAddress());
|
||||||
|
|
||||||
if (message.getObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME) != null) {
|
if (message.getObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME) != null) {
|
||||||
command.addHeader(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME).toString());
|
command.addHeader(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty(MessageUtil.CORRELATIONID_HEADER_NAME).toString());
|
||||||
|
@ -135,6 +140,7 @@ public class StompUtils {
|
||||||
name.equals(Message.HDR_CONTENT_TYPE) ||
|
name.equals(Message.HDR_CONTENT_TYPE) ||
|
||||||
name.equals(Message.HDR_VALIDATED_USER) ||
|
name.equals(Message.HDR_VALIDATED_USER) ||
|
||||||
name.equals(Message.HDR_ROUTING_TYPE) ||
|
name.equals(Message.HDR_ROUTING_TYPE) ||
|
||||||
|
name.equals(Message.HDR_PREFIX) ||
|
||||||
name.equals(MessageUtil.TYPE_HEADER_NAME) ||
|
name.equals(MessageUtil.TYPE_HEADER_NAME) ||
|
||||||
name.equals(MessageUtil.CORRELATIONID_HEADER_NAME) ||
|
name.equals(MessageUtil.CORRELATIONID_HEADER_NAME) ||
|
||||||
name.toString().equals(Stomp.Headers.Message.DESTINATION)) {
|
name.toString().equals(Stomp.Headers.Message.DESTINATION)) {
|
||||||
|
|
|
@ -200,7 +200,7 @@ public abstract class VersionedStompFrameHandler {
|
||||||
}
|
}
|
||||||
message.setTimestamp(timestamp);
|
message.setTimestamp(timestamp);
|
||||||
message.setAddress(SimpleString.toSimpleString(destination));
|
message.setAddress(SimpleString.toSimpleString(destination));
|
||||||
StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
|
StompUtils.copyStandardHeadersFromFrameToMessage(frame, message, getPrefix(frame));
|
||||||
if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH)) {
|
if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH)) {
|
||||||
message.setType(Message.BYTES_TYPE);
|
message.setType(Message.BYTES_TYPE);
|
||||||
message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
|
message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
|
||||||
|
@ -291,6 +291,15 @@ public abstract class VersionedStompFrameHandler {
|
||||||
return connection.getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString();
|
return connection.getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getPrefix(StompFrame request) throws ActiveMQStompException {
|
||||||
|
String destination = request.getHeader(Headers.Send.DESTINATION);
|
||||||
|
if (destination == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
SimpleString prefix = connection.getSession().getCoreSession().getPrefix(SimpleString.toSimpleString(destination));
|
||||||
|
return prefix == null ? null : prefix.toString();
|
||||||
|
}
|
||||||
|
|
||||||
public StompFrame postprocess(StompFrame request) {
|
public StompFrame postprocess(StompFrame request) {
|
||||||
StompFrame response = null;
|
StompFrame response = null;
|
||||||
if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED)) {
|
if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED)) {
|
||||||
|
|
|
@ -315,6 +315,14 @@ public interface ServerSession extends SecurityAuth {
|
||||||
*/
|
*/
|
||||||
SimpleString removePrefix(SimpleString address);
|
SimpleString removePrefix(SimpleString address);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the prefix (if it exists) from the address based on the prefixes provided to the ServerSession constructor.
|
||||||
|
*
|
||||||
|
* @param address the address to inspect
|
||||||
|
* @return the canonical (i.e. non-prefixed) address name
|
||||||
|
*/
|
||||||
|
SimpleString getPrefix(SimpleString address);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the canonical (i.e. non-prefixed) address and the corresponding routing-type.
|
* Get the canonical (i.e. non-prefixed) address and the corresponding routing-type.
|
||||||
*
|
*
|
||||||
|
|
|
@ -1834,6 +1834,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
return address;
|
return address;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SimpleString getPrefix(SimpleString address) {
|
||||||
|
if (prefixEnabled && address != null) {
|
||||||
|
return PrefixUtil.getPrefix(address, prefixes);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AddressInfo getAddressAndRoutingType(AddressInfo addressInfo) {
|
public AddressInfo getAddressAndRoutingType(AddressInfo addressInfo) {
|
||||||
if (prefixEnabled) {
|
if (prefixEnabled) {
|
||||||
|
|
|
@ -1621,6 +1621,7 @@ public class StompTest extends StompTestBase {
|
||||||
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
|
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
|
||||||
Assert.assertEquals("Hello World 2", frame.getBody());
|
Assert.assertEquals("Hello World 2", frame.getBody());
|
||||||
Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
||||||
|
Assert.assertEquals("/queue/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION));
|
||||||
frame = conn.receiveFrame(1000);
|
frame = conn.receiveFrame(1000);
|
||||||
Assert.assertNull(frame);
|
Assert.assertNull(frame);
|
||||||
|
|
||||||
|
@ -1643,6 +1644,7 @@ public class StompTest extends StompTestBase {
|
||||||
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
|
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
|
||||||
Assert.assertEquals("Hello World 3", frame.getBody());
|
Assert.assertEquals("Hello World 3", frame.getBody());
|
||||||
Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
||||||
|
Assert.assertEquals("/topic/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION));
|
||||||
frame = conn.receiveFrame(1000);
|
frame = conn.receiveFrame(1000);
|
||||||
Assert.assertNull(frame);
|
Assert.assertNull(frame);
|
||||||
|
|
||||||
|
@ -1699,6 +1701,7 @@ public class StompTest extends StompTestBase {
|
||||||
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
|
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
|
||||||
Assert.assertEquals("Hello World 2", frame.getBody());
|
Assert.assertEquals("Hello World 2", frame.getBody());
|
||||||
Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
||||||
|
Assert.assertEquals("/topic/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION));
|
||||||
frame = conn.receiveFrame(1000);
|
frame = conn.receiveFrame(1000);
|
||||||
Assert.assertNull(frame);
|
Assert.assertNull(frame);
|
||||||
|
|
||||||
|
@ -1718,6 +1721,7 @@ public class StompTest extends StompTestBase {
|
||||||
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
|
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
|
||||||
Assert.assertEquals("Hello World 1", frame.getBody());
|
Assert.assertEquals("Hello World 1", frame.getBody());
|
||||||
Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
||||||
|
Assert.assertEquals("/queue/" + ADDRESS, frame.getHeader(Stomp.Headers.Send.DESTINATION));
|
||||||
frame = conn.receiveFrame(2000);
|
frame = conn.receiveFrame(2000);
|
||||||
Assert.assertNull(frame);
|
Assert.assertNull(frame);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue