This closes #2004
This commit is contained in:
commit
a8c3e1b5fe
|
@ -24,6 +24,7 @@ import java.util.Map.Entry;
|
|||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
|
||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||
|
@ -123,6 +124,9 @@ public class StompUtils {
|
|||
if (message.getValidatedUserID() != null) {
|
||||
command.addHeader(Stomp.Headers.Message.VALIDATED_USER, message.getValidatedUserID());
|
||||
}
|
||||
if (message.getByteProperty(Message.HDR_ROUTING_TYPE.toString()) != null) {
|
||||
command.addHeader(Stomp.Headers.Send.DESTINATION_TYPE, RoutingType.getType(message.getByteProperty(Message.HDR_ROUTING_TYPE.toString())).toString());
|
||||
}
|
||||
|
||||
// now let's add all the rest of the message headers
|
||||
Set<SimpleString> names = message.getPropertyNames();
|
||||
|
@ -130,6 +134,7 @@ public class StompUtils {
|
|||
if (name.equals(ClientMessageImpl.REPLYTO_HEADER_NAME) ||
|
||||
name.equals(Message.HDR_CONTENT_TYPE) ||
|
||||
name.equals(Message.HDR_VALIDATED_USER) ||
|
||||
name.equals(Message.HDR_ROUTING_TYPE) ||
|
||||
name.equals(MessageUtil.TYPE_HEADER_NAME) ||
|
||||
name.equals(MessageUtil.CORRELATIONID_HEADER_NAME) ||
|
||||
name.toString().equals(Stomp.Headers.Message.DESTINATION)) {
|
||||
|
|
|
@ -400,6 +400,12 @@ If no indication of routing type is supplied then anycast semantics are used.
|
|||
The `destination` header maps to an address of the same name. If the `destination` header
|
||||
used a prefix then the prefix is stripped.
|
||||
|
||||
#### Receiving
|
||||
|
||||
When a client receives a message from the broker the message will have the `destination-type`
|
||||
header set to either `MULTICAST` or `ANYCAST` as determined when the message was originally
|
||||
sent/routed.
|
||||
|
||||
#### Subscribing
|
||||
|
||||
When a Stomp client subscribes to a destination (using a `SUBSCRIBE` frame), the protocol
|
||||
|
|
|
@ -704,6 +704,42 @@ public class StompTest extends StompTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAnycastDestinationTypeMessageProperty() throws Exception {
|
||||
conn.connect(defUser, defPass);
|
||||
|
||||
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||
|
||||
send(conn, getQueuePrefix() + getQueueName(), null, getName(), true, RoutingType.ANYCAST);
|
||||
|
||||
ClientStompFrame frame = conn.receiveFrame(10000);
|
||||
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
|
||||
Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
|
||||
Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
||||
Assert.assertTrue(frame.getHeader(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE.toString()) == null);
|
||||
Assert.assertEquals(getName(), frame.getBody());
|
||||
|
||||
conn.disconnect();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMulticastDestinationTypeMessageProperty() throws Exception {
|
||||
conn.connect(defUser, defPass);
|
||||
|
||||
subscribeTopic(conn, null, null, null);
|
||||
|
||||
send(conn, getTopicPrefix() + getTopicName(), null, getName(), true, RoutingType.MULTICAST);
|
||||
|
||||
ClientStompFrame frame = conn.receiveFrame(10000);
|
||||
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
|
||||
Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
|
||||
Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
|
||||
Assert.assertTrue(frame.getHeader(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE.toString()) == null);
|
||||
Assert.assertEquals(getName(), frame.getBody());
|
||||
|
||||
conn.disconnect();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
|
||||
conn.connect(defUser, defPass);
|
||||
|
|
Loading…
Reference in New Issue