This commit is contained in:
Justin Bertram 2018-01-12 15:12:45 -06:00
commit adc0f02ae0
2 changed files with 57 additions and 17 deletions

View File

@ -260,25 +260,22 @@ public final class StompConnection implements RemotingConnection {
} }
} }
public boolean autoCreateDestinationIfPossible(String queue, RoutingType routingType) throws ActiveMQStompException { public void autoCreateDestinationIfPossible(String queue, RoutingType routingType) throws ActiveMQStompException {
boolean result = false;
ServerSession session = getSession().getCoreSession(); ServerSession session = getSession().getCoreSession();
try { try {
if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(queue)) == null) { SimpleString simpleQueue = SimpleString.toSimpleString(queue);
if (manager.getServer().getAddressInfo(simpleQueue) == null) {
AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue); AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
if (routingType != null && routingType == RoutingType.MULTICAST && addressSettings.isAutoCreateAddresses()) {
session.createAddress(SimpleString.toSimpleString(queue), RoutingType.MULTICAST, true); RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType;
result = true;
} else {
if (addressSettings.isAutoCreateAddresses()) { if (addressSettings.isAutoCreateAddresses()) {
session.createAddress(SimpleString.toSimpleString(queue), RoutingType.ANYCAST, true); session.createAddress(simpleQueue, effectiveAddressRoutingType, true);
result = true;
}
if (addressSettings.isAutoCreateQueues()) {
session.createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), RoutingType.ANYCAST, null, false, true, true);
result = true;
} }
// only auto create the queue if the address is ANYCAST
if (effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues()) {
session.createQueue(simpleQueue, simpleQueue, routingType == null ? addressSettings.getDefaultQueueRoutingType() : routingType, null, false, true, true);
} }
} }
} catch (ActiveMQQueueExistsException e) { } catch (ActiveMQQueueExistsException e) {
@ -286,8 +283,6 @@ public final class StompConnection implements RemotingConnection {
} catch (Exception e) { } catch (Exception e) {
throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler); throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler);
} }
return result;
} }
public void checkRoutingSemantics(String destination, RoutingType routingType) throws ActiveMQStompException { public void checkRoutingSemantics(String destination, RoutingType routingType) throws ActiveMQStompException {

View File

@ -50,7 +50,9 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -1665,4 +1667,47 @@ public class StompTest extends StompTestBase {
assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 1, 2000, 100)); assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 1, 2000, 100));
assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount() == 2, 2000, 100)); assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount() == 2, 2000, 100));
} }
@Test
public void testAutoCreatedAnycastAddress() throws Exception {
conn.connect(defUser, defPass);
String queueName = UUID.randomUUID().toString();
SimpleString simpleQueueName = SimpleString.toSimpleString(queueName);
ActiveMQServer activeMQServer = server.getActiveMQServer();
Assert.assertNull(activeMQServer.getAddressInfo(simpleQueueName));
Assert.assertNull(activeMQServer.locateQueue(simpleQueueName));
activeMQServer.getAddressSettingsRepository().addMatch(queueName, new AddressSettings()
.setDefaultAddressRoutingType(RoutingType.ANYCAST)
.setDefaultQueueRoutingType(RoutingType.ANYCAST)
);
send(conn, queueName, null, "Hello ANYCAST");
assertTrue("Address and queue should be created now", Wait.waitFor(() -> (activeMQServer.getAddressInfo(simpleQueueName) != null) && (activeMQServer.locateQueue(simpleQueueName) != null), 2000, 200));
assertTrue(activeMQServer.getAddressInfo(simpleQueueName).getRoutingTypes().contains(RoutingType.ANYCAST));
assertEquals(RoutingType.ANYCAST, activeMQServer.locateQueue(simpleQueueName).getRoutingType());
}
@Test
public void testAutoCreatedMulticastAddress() throws Exception {
conn.connect(defUser, defPass);
String queueName = UUID.randomUUID().toString();
SimpleString simpleQueueName = SimpleString.toSimpleString(queueName);
ActiveMQServer activeMQServer = server.getActiveMQServer();
Assert.assertNull(activeMQServer.getAddressInfo(simpleQueueName));
Assert.assertNull(activeMQServer.locateQueue(simpleQueueName));
send(conn, queueName, null, "Hello MULTICAST");
assertTrue("Address should be created now", Wait.waitFor(() -> (activeMQServer.getAddressInfo(simpleQueueName) != null), 2000, 200));
assertTrue(activeMQServer.getAddressInfo(simpleQueueName).getRoutingTypes().contains(RoutingType.MULTICAST));
Assert.assertNull(activeMQServer.locateQueue(simpleQueueName));
}
} }