This commit is contained in:
Justin Bertram 2020-05-01 13:40:24 -05:00
commit b81c595e3f
2 changed files with 64 additions and 10 deletions

View File

@ -54,6 +54,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.artemis.utils.VersionLoader;
@ -274,20 +275,20 @@ public final class StompConnection implements RemotingConnection {
} }
} }
public void autoCreateDestinationIfPossible(String queue, RoutingType routingType) throws ActiveMQStompException { public void autoCreateDestinationIfPossible(String destination, RoutingType routingType) throws ActiveMQStompException {
try { try {
ServerSession session = getSession().getCoreSession(); SimpleString simpleDestination = SimpleString.toSimpleString(destination);
SimpleString simpleQueue = SimpleString.toSimpleString(queue); AddressInfo addressInfo = manager.getServer().getAddressInfo(simpleDestination);
AddressInfo addressInfo = manager.getServer().getAddressInfo(simpleQueue); AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(destination);
AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType; RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType;
ServerSession session = getSession().getCoreSession();
/** /**
* If the address doesn't exist then it is created if possible. * If the address doesn't exist then it is created if possible.
* If the address does exist but doesn't support the routing-type then the address is updated if possible. * If the address does exist but doesn't support the routing-type then the address is updated if possible.
*/ */
if (addressInfo == null) { if (addressInfo == null) {
if (addressSettings.isAutoCreateAddresses()) { if (addressSettings.isAutoCreateAddresses()) {
session.createAddress(simpleQueue, effectiveAddressRoutingType, true); session.createAddress(simpleDestination, effectiveAddressRoutingType, true);
} }
} else if (!addressInfo.getRoutingTypes().contains(effectiveAddressRoutingType)) { } else if (!addressInfo.getRoutingTypes().contains(effectiveAddressRoutingType)) {
if (addressSettings.isAutoCreateAddresses()) { if (addressSettings.isAutoCreateAddresses()) {
@ -296,13 +297,13 @@ public final class StompConnection implements RemotingConnection {
routingTypes.add(existingRoutingType); routingTypes.add(existingRoutingType);
} }
routingTypes.add(effectiveAddressRoutingType); routingTypes.add(effectiveAddressRoutingType);
manager.getServer().updateAddressInfo(simpleQueue, routingTypes); manager.getServer().updateAddressInfo(simpleDestination, routingTypes);
} }
} }
// only auto create the queue if the address is ANYCAST // auto create the queue if the address is ANYCAST or FQQN
if (effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues() && manager.getServer().locateQueue(simpleQueue) == null) { if ((CompositeAddress.isFullyQualified(destination) || effectiveAddressRoutingType == RoutingType.ANYCAST) && addressSettings.isAutoCreateQueues() && manager.getServer().locateQueue(simpleDestination) == null) {
session.createQueue(new QueueConfiguration(simpleQueue).setRoutingType(effectiveAddressRoutingType).setAutoCreated(true)); session.createQueue(new QueueConfiguration(destination).setRoutingType(effectiveAddressRoutingType).setAutoCreated(true));
} }
} catch (ActiveMQQueueExistsException e) { } catch (ActiveMQQueueExistsException e) {
// ignore // ignore

View File

@ -203,4 +203,57 @@ public class FQQNStompTest extends StompTestBase {
assertEquals(Stomp.Responses.ERROR, frame.getCommand()); assertEquals(Stomp.Responses.ERROR, frame.getCommand());
} }
@Test
public void testSendFQQNAutoCreateOnSend() throws Exception {
final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
final SimpleString q1Name = SimpleString.toSimpleString("q1");
conn.connect(defUser, defPass);
send(conn, myAddress + "\\c\\c" + q1Name, null, "Hello World!");
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000, 100));
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessageCount() == 1, 2000, 100));
subscribeQueue(conn, "sub-01", myAddress + "\\c\\c" + q1Name);
ClientStompFrame frame = conn.receiveFrame(2000);
assertNotNull(frame);
assertEquals("Hello World!", frame.getBody());
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessageCount() == 0, 2000, 100));
unsubscribe(conn, "sub-01");
}
@Test
public void testSendFQQNAutoCreateOnSubscribe() throws Exception {
final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
final SimpleString q1Name = SimpleString.toSimpleString("q1");
final SimpleString q2Name = SimpleString.toSimpleString("q2");
StompClientConnection consumer1Connection = StompClientConnectionFactory.createClientConnection(uri);
consumer1Connection.connect(defUser, defPass);
subscribeQueue(consumer1Connection, "sub-01", myAddress + "\\c\\c" + q1Name);
StompClientConnection consumer2Connection = StompClientConnectionFactory.createClientConnection(uri);
consumer2Connection.connect(defUser, defPass);
subscribeQueue(consumer2Connection, "sub-02", myAddress + "\\c\\c" + q2Name);
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000, 100));
assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name) != null, 2000, 100));
StompClientConnection senderConnection = StompClientConnectionFactory.createClientConnection(uri);
senderConnection.connect(defUser, defPass);
send(senderConnection, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, RoutingType.ANYCAST);
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessagesAdded() == 1, 2000, 100));
assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name).getMessagesAdded() == 0, 2000, 100));
ClientStompFrame frame = consumer1Connection.receiveFrame(2000);
assertNotNull(frame);
assertEquals("Hello World!", frame.getBody());
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessageCount() == 0, 4000, 100));
unsubscribe(consumer1Connection, "sub-01");
unsubscribe(consumer2Connection, "sub-02");
}
} }