This commit is contained in:
Clebert Suconic 2020-07-07 08:21:22 -04:00
commit c433f50957
2 changed files with 47 additions and 7 deletions

View File

@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.PendingTask;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -259,7 +260,8 @@ public class StompSession implements SessionCallback {
Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
if (multicast) {
// if the destination is FQQN then the queue will have already been created
if (multicast && !CompositeAddress.isFullyQualified(destination)) {
// subscribes to a topic
if (durableSubscriptionName != null) {
if (clientID == null) {

View File

@ -204,7 +204,7 @@ public class FQQNStompTest extends StompTestBase {
}
@Test
public void testSendFQQNAutoCreateOnSend() throws Exception {
public void testAutoCreateOnSendFQQN() throws Exception {
final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
final SimpleString q1Name = SimpleString.toSimpleString("q1");
@ -224,30 +224,68 @@ public class FQQNStompTest extends StompTestBase {
}
@Test
public void testSendFQQNAutoCreateOnSubscribe() throws Exception {
public void testAutoCreateOnSubscribeFQQNAnycast() throws Exception {
internalTestAutoCreateOnSubscribeFQQN(RoutingType.ANYCAST);
}
@Test
public void testAutoCreateOnSubscribeFQQNMulticast() throws Exception {
internalTestAutoCreateOnSubscribeFQQN(RoutingType.MULTICAST);
}
@Test
public void testAutoCreateOnSubscribeFQQNNoRoutingType() throws Exception {
internalTestAutoCreateOnSubscribeFQQN(null);
}
private void internalTestAutoCreateOnSubscribeFQQN(RoutingType routingType) throws Exception {
final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
final SimpleString q1Name = SimpleString.toSimpleString("q1");
final SimpleString q2Name = SimpleString.toSimpleString("q2");
StompClientConnection consumer1Connection = StompClientConnectionFactory.createClientConnection(uri);
consumer1Connection.connect(defUser, defPass);
subscribeQueue(consumer1Connection, "sub-01", myAddress + "\\c\\c" + q1Name);
ClientStompFrame frame = consumer1Connection
.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.DESTINATION, myAddress + "\\c\\c" + q1Name)
.addHeader(Stomp.Headers.Subscribe.ID, "sub-01")
.addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO);
if (routingType != null) {
frame.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, routingType.toString());
}
consumer1Connection.sendFrame(frame);
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000, 100));
StompClientConnection consumer2Connection = StompClientConnectionFactory.createClientConnection(uri);
consumer2Connection.connect(defUser, defPass);
subscribeQueue(consumer2Connection, "sub-02", myAddress + "\\c\\c" + q2Name);
frame = consumer2Connection
.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.DESTINATION, myAddress + "\\c\\c" + q2Name)
.addHeader(Stomp.Headers.Subscribe.ID, "sub-02")
.addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO);
if (routingType != null) {
frame.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, routingType.toString());
}
consumer2Connection.sendFrame(frame);
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000, 100));
assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name) != null, 2000, 100));
StompClientConnection senderConnection = StompClientConnectionFactory.createClientConnection(uri);
senderConnection.connect(defUser, defPass);
send(senderConnection, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, RoutingType.ANYCAST);
send(senderConnection, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, routingType);
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessagesAdded() == 1, 2000, 100));
assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name).getMessagesAdded() == 0, 2000, 100));
ClientStompFrame frame = consumer1Connection.receiveFrame(2000);
frame = consumer1Connection.receiveFrame(2000);
assertNotNull(frame);
assertEquals("Hello World!", frame.getBody());
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessageCount() == 0, 4000, 100));