ARTEMIS-4510 Add auto-create-destination logic to diverts

This commit is contained in:
AntonRoskvist 2024-04-08 11:42:34 +02:00 committed by Clebert Suconic
parent 4d6fc39560
commit 6c02950db3
2 changed files with 143 additions and 1 deletions

View File

@ -140,7 +140,7 @@ public class DivertImpl implements Divert {
copy = message;
}
postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()), false);
postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()).setServerSession(context.getServerSession()), false);
}
}

View File

@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -51,6 +52,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -58,6 +60,7 @@ import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Assert;
import org.junit.Test;
@ -1775,4 +1778,143 @@ public class DivertTest extends ActiveMQTestBase {
Assert.assertEquals("testAddress" + COUNT, message.getAddress());
Assert.assertEquals("testAddress" + (COUNT - 1), message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
}
@Test
public void testDivertToNewAddress() throws Exception {
final String queueName = "queue";
final String dummyQueueName = "dummy";
final String noDivertAutoCreateQName = "notAllowed";
final String propKey = "newQueue";
final String DIVERT = "myDivert";
final int numMessages = 10;
Transformer transformer = message -> message.setAddress(message.getStringProperty(propKey));
ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl();
serviceRegistry.addDivertTransformer(DIVERT, transformer);
AddressSettings autoCreateDestinationsAS = new AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(true);
AddressSettings noAutoCreateDestinationsAS = new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false);
ActiveMQServer server = addServer(new ActiveMQServerImpl(createDefaultInVMConfig(), null, null, null, serviceRegistry));
server.getConfiguration().addAddressSetting("#", autoCreateDestinationsAS);
server.getConfiguration().addAddressSetting(noDivertAutoCreateQName, noAutoCreateDestinationsAS);
server.start();
server.createQueue(new QueueConfiguration(queueName));
server.deployDivert(new DivertConfiguration()
.setName(DIVERT)
.setAddress(queueName)
.setRoutingType(ComponentConfigurationRoutingType.ANYCAST)
.setForwardingAddress(dummyQueueName)
.setExclusive(true));
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true);
session.start();
ClientMessage message;
ClientProducer producer = session.createProducer(queueName);
for (int i = 0; i < numMessages; i++) {
message = session.createMessage(true);
message.putStringProperty(propKey, queueName + "." + i);
producer.send(message);
}
for (int i = 0; i < numMessages; i++) {
ClientConsumer consumer = session.createConsumer(queueName + "." + i);
message = consumer.receive(DivertTest.TIMEOUT);
Assert.assertNotNull(message);
message.acknowledge();
consumer.close();
}
ClientMessage failMessage = session.createMessage(true);
assertThrows(ActiveMQAddressDoesNotExistException.class, () -> {
failMessage.putStringProperty(propKey, noDivertAutoCreateQName);
producer.send(failMessage);
});
producer.close();
Assert.assertNull(server.locateQueue(noDivertAutoCreateQName));
Assert.assertNull(server.locateQueue(dummyQueueName));
}
@Test
public void testHandleAutoDeleteDestination() throws Exception {
final String testAddress = "testAddress";
final String forwardAddress = "forwardAddress";
DivertConfiguration divertConf = new DivertConfiguration()
.setName("divert")
.setRoutingType(ComponentConfigurationRoutingType.ANYCAST)
.setExclusive(true)
.setAddress(testAddress)
.setForwardingAddress(forwardAddress);
AddressSettings addressSettings = new AddressSettings()
.setAutoCreateAddresses(true)
.setAutoCreateQueues(true)
.setAutoDeleteAddresses(true)
.setAutoDeleteQueues(true);
Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf).addAddressSetting("#", addressSettings);
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
server.start();
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true);
session.createQueue(new QueueConfiguration(testAddress).setAddress(testAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true));
session.createQueue(new QueueConfiguration(forwardAddress).setAddress(forwardAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true));
session.start();
ClientProducer producer = session.createProducer(testAddress);
ClientConsumer consumer = session.createConsumer(forwardAddress);
final int numMessages = 5;
for (int i = 0; i < numMessages; i++) {
ClientMessage message = session.createMessage(true);
message.setRoutingType(RoutingType.ANYCAST);
producer.send(message);
}
for (int i = 0; i < numMessages; i++) {
ClientMessage message = consumer.receive(DivertTest.TIMEOUT);
Assert.assertNotNull(message);
message.acknowledge();
}
Assert.assertNull(consumer.receiveImmediate());
consumer.close();
//Trigger autoDelete instead of waiting
QueueManagerImpl.performAutoDeleteQueue(server, server.locateQueue(forwardAddress));
Wait.assertTrue(() -> server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(forwardAddress))
.getBindingRemovedTimestamp() != -1, DivertTest.TIMEOUT, 100);
for (int i = 0; i < numMessages; i++) {
ClientMessage message = session.createMessage(true);
producer.send(message);
}
consumer = session.createConsumer(forwardAddress);
for (int i = 0; i < numMessages; i++) {
ClientMessage message = consumer.receive(DivertTest.TIMEOUT);
Assert.assertNotNull(message);
message.acknowledge();
}
Assert.assertNull(consumer.receiveImmediate());
}
}