ARTEMIS-2817 Support Stomp subscription with FQQN+multicast
This commit is contained in:
parent
2217b96f6a
commit
20daf2354c
|
@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
|
|||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||
import org.apache.activemq.artemis.utils.PendingTask;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
|
@ -259,7 +260,8 @@ public class StompSession implements SessionCallback {
|
|||
|
||||
Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
|
||||
boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
|
||||
if (multicast) {
|
||||
// if the destination is FQQN then the queue will have already been created
|
||||
if (multicast && !CompositeAddress.isFullyQualified(destination)) {
|
||||
// subscribes to a topic
|
||||
if (durableSubscriptionName != null) {
|
||||
if (clientID == null) {
|
||||
|
|
|
@ -204,7 +204,7 @@ public class FQQNStompTest extends StompTestBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSendFQQNAutoCreateOnSend() throws Exception {
|
||||
public void testAutoCreateOnSendFQQN() throws Exception {
|
||||
final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
|
||||
final SimpleString q1Name = SimpleString.toSimpleString("q1");
|
||||
|
||||
|
@ -224,30 +224,68 @@ public class FQQNStompTest extends StompTestBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSendFQQNAutoCreateOnSubscribe() throws Exception {
|
||||
public void testAutoCreateOnSubscribeFQQNAnycast() throws Exception {
|
||||
internalTestAutoCreateOnSubscribeFQQN(RoutingType.ANYCAST);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoCreateOnSubscribeFQQNMulticast() throws Exception {
|
||||
internalTestAutoCreateOnSubscribeFQQN(RoutingType.MULTICAST);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoCreateOnSubscribeFQQNNoRoutingType() throws Exception {
|
||||
internalTestAutoCreateOnSubscribeFQQN(null);
|
||||
}
|
||||
|
||||
private void internalTestAutoCreateOnSubscribeFQQN(RoutingType routingType) throws Exception {
|
||||
final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
|
||||
final SimpleString q1Name = SimpleString.toSimpleString("q1");
|
||||
final SimpleString q2Name = SimpleString.toSimpleString("q2");
|
||||
|
||||
StompClientConnection consumer1Connection = StompClientConnectionFactory.createClientConnection(uri);
|
||||
consumer1Connection.connect(defUser, defPass);
|
||||
subscribeQueue(consumer1Connection, "sub-01", myAddress + "\\c\\c" + q1Name);
|
||||
|
||||
ClientStompFrame frame = consumer1Connection
|
||||
.createFrame(Stomp.Commands.SUBSCRIBE)
|
||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, myAddress + "\\c\\c" + q1Name)
|
||||
.addHeader(Stomp.Headers.Subscribe.ID, "sub-01")
|
||||
.addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||
|
||||
if (routingType != null) {
|
||||
frame.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, routingType.toString());
|
||||
}
|
||||
|
||||
consumer1Connection.sendFrame(frame);
|
||||
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000, 100));
|
||||
|
||||
StompClientConnection consumer2Connection = StompClientConnectionFactory.createClientConnection(uri);
|
||||
consumer2Connection.connect(defUser, defPass);
|
||||
subscribeQueue(consumer2Connection, "sub-02", myAddress + "\\c\\c" + q2Name);
|
||||
|
||||
frame = consumer2Connection
|
||||
.createFrame(Stomp.Commands.SUBSCRIBE)
|
||||
.addHeader(Stomp.Headers.Subscribe.DESTINATION, myAddress + "\\c\\c" + q2Name)
|
||||
.addHeader(Stomp.Headers.Subscribe.ID, "sub-02")
|
||||
.addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||
|
||||
if (routingType != null) {
|
||||
frame.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, routingType.toString());
|
||||
}
|
||||
|
||||
consumer2Connection.sendFrame(frame);
|
||||
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000, 100));
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name) != null, 2000, 100));
|
||||
|
||||
StompClientConnection senderConnection = StompClientConnectionFactory.createClientConnection(uri);
|
||||
senderConnection.connect(defUser, defPass);
|
||||
send(senderConnection, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, RoutingType.ANYCAST);
|
||||
send(senderConnection, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, routingType);
|
||||
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessagesAdded() == 1, 2000, 100));
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name).getMessagesAdded() == 0, 2000, 100));
|
||||
|
||||
ClientStompFrame frame = consumer1Connection.receiveFrame(2000);
|
||||
frame = consumer1Connection.receiveFrame(2000);
|
||||
assertNotNull(frame);
|
||||
assertEquals("Hello World!", frame.getBody());
|
||||
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessageCount() == 0, 4000, 100));
|
||||
|
|
Loading…
Reference in New Issue