ARTEMIS-2655 support auto-creation w/FQQN & STOMP
This commit is contained in:
parent
2af6f3d5a2
commit
ede2051960
|
@ -54,6 +54,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||||
|
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.VersionLoader;
|
import org.apache.activemq.artemis.utils.VersionLoader;
|
||||||
|
@ -274,20 +275,20 @@ public final class StompConnection implements RemotingConnection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void autoCreateDestinationIfPossible(String queue, RoutingType routingType) throws ActiveMQStompException {
|
public void autoCreateDestinationIfPossible(String destination, RoutingType routingType) throws ActiveMQStompException {
|
||||||
try {
|
try {
|
||||||
ServerSession session = getSession().getCoreSession();
|
SimpleString simpleDestination = SimpleString.toSimpleString(destination);
|
||||||
SimpleString simpleQueue = SimpleString.toSimpleString(queue);
|
AddressInfo addressInfo = manager.getServer().getAddressInfo(simpleDestination);
|
||||||
AddressInfo addressInfo = manager.getServer().getAddressInfo(simpleQueue);
|
AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(destination);
|
||||||
AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
|
|
||||||
RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType;
|
RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType;
|
||||||
|
ServerSession session = getSession().getCoreSession();
|
||||||
/**
|
/**
|
||||||
* If the address doesn't exist then it is created if possible.
|
* If the address doesn't exist then it is created if possible.
|
||||||
* If the address does exist but doesn't support the routing-type then the address is updated if possible.
|
* If the address does exist but doesn't support the routing-type then the address is updated if possible.
|
||||||
*/
|
*/
|
||||||
if (addressInfo == null) {
|
if (addressInfo == null) {
|
||||||
if (addressSettings.isAutoCreateAddresses()) {
|
if (addressSettings.isAutoCreateAddresses()) {
|
||||||
session.createAddress(simpleQueue, effectiveAddressRoutingType, true);
|
session.createAddress(simpleDestination, effectiveAddressRoutingType, true);
|
||||||
}
|
}
|
||||||
} else if (!addressInfo.getRoutingTypes().contains(effectiveAddressRoutingType)) {
|
} else if (!addressInfo.getRoutingTypes().contains(effectiveAddressRoutingType)) {
|
||||||
if (addressSettings.isAutoCreateAddresses()) {
|
if (addressSettings.isAutoCreateAddresses()) {
|
||||||
|
@ -296,13 +297,13 @@ public final class StompConnection implements RemotingConnection {
|
||||||
routingTypes.add(existingRoutingType);
|
routingTypes.add(existingRoutingType);
|
||||||
}
|
}
|
||||||
routingTypes.add(effectiveAddressRoutingType);
|
routingTypes.add(effectiveAddressRoutingType);
|
||||||
manager.getServer().updateAddressInfo(simpleQueue, routingTypes);
|
manager.getServer().updateAddressInfo(simpleDestination, routingTypes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// only auto create the queue if the address is ANYCAST
|
// auto create the queue if the address is ANYCAST or FQQN
|
||||||
if (effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues() && manager.getServer().locateQueue(simpleQueue) == null) {
|
if ((CompositeAddress.isFullyQualified(destination) || effectiveAddressRoutingType == RoutingType.ANYCAST) && addressSettings.isAutoCreateQueues() && manager.getServer().locateQueue(simpleDestination) == null) {
|
||||||
session.createQueue(new QueueConfiguration(simpleQueue).setRoutingType(effectiveAddressRoutingType).setAutoCreated(true));
|
session.createQueue(new QueueConfiguration(destination).setRoutingType(effectiveAddressRoutingType).setAutoCreated(true));
|
||||||
}
|
}
|
||||||
} catch (ActiveMQQueueExistsException e) {
|
} catch (ActiveMQQueueExistsException e) {
|
||||||
// ignore
|
// ignore
|
||||||
|
|
|
@ -203,4 +203,57 @@ public class FQQNStompTest extends StompTestBase {
|
||||||
assertEquals(Stomp.Responses.ERROR, frame.getCommand());
|
assertEquals(Stomp.Responses.ERROR, frame.getCommand());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendFQQNAutoCreateOnSend() throws Exception {
|
||||||
|
final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
|
||||||
|
final SimpleString q1Name = SimpleString.toSimpleString("q1");
|
||||||
|
|
||||||
|
conn.connect(defUser, defPass);
|
||||||
|
send(conn, myAddress + "\\c\\c" + q1Name, null, "Hello World!");
|
||||||
|
|
||||||
|
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000, 100));
|
||||||
|
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessageCount() == 1, 2000, 100));
|
||||||
|
|
||||||
|
subscribeQueue(conn, "sub-01", myAddress + "\\c\\c" + q1Name);
|
||||||
|
ClientStompFrame frame = conn.receiveFrame(2000);
|
||||||
|
assertNotNull(frame);
|
||||||
|
assertEquals("Hello World!", frame.getBody());
|
||||||
|
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessageCount() == 0, 2000, 100));
|
||||||
|
|
||||||
|
unsubscribe(conn, "sub-01");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendFQQNAutoCreateOnSubscribe() throws Exception {
|
||||||
|
final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
|
||||||
|
final SimpleString q1Name = SimpleString.toSimpleString("q1");
|
||||||
|
final SimpleString q2Name = SimpleString.toSimpleString("q2");
|
||||||
|
|
||||||
|
StompClientConnection consumer1Connection = StompClientConnectionFactory.createClientConnection(uri);
|
||||||
|
consumer1Connection.connect(defUser, defPass);
|
||||||
|
subscribeQueue(consumer1Connection, "sub-01", myAddress + "\\c\\c" + q1Name);
|
||||||
|
|
||||||
|
StompClientConnection consumer2Connection = StompClientConnectionFactory.createClientConnection(uri);
|
||||||
|
consumer2Connection.connect(defUser, defPass);
|
||||||
|
subscribeQueue(consumer2Connection, "sub-02", myAddress + "\\c\\c" + q2Name);
|
||||||
|
|
||||||
|
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000, 100));
|
||||||
|
assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name) != null, 2000, 100));
|
||||||
|
|
||||||
|
StompClientConnection senderConnection = StompClientConnectionFactory.createClientConnection(uri);
|
||||||
|
senderConnection.connect(defUser, defPass);
|
||||||
|
send(senderConnection, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, RoutingType.ANYCAST);
|
||||||
|
|
||||||
|
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessagesAdded() == 1, 2000, 100));
|
||||||
|
assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name).getMessagesAdded() == 0, 2000, 100));
|
||||||
|
|
||||||
|
ClientStompFrame frame = consumer1Connection.receiveFrame(2000);
|
||||||
|
assertNotNull(frame);
|
||||||
|
assertEquals("Hello World!", frame.getBody());
|
||||||
|
assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessageCount() == 0, 4000, 100));
|
||||||
|
|
||||||
|
unsubscribe(consumer1Connection, "sub-01");
|
||||||
|
unsubscribe(consumer2Connection, "sub-02");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue