ARTEMIS-3201 - configured diverts arent persisted

https://issues.apache.org/jira/browse/ARTEMIS-3201
This commit is contained in:
Andy Taylor 2021-03-23 12:07:42 +00:00 committed by clebertsuconic
parent e45c1d98d4
commit 69bea6756c
4 changed files with 118 additions and 11 deletions

View File

@ -82,7 +82,6 @@ import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -3590,7 +3589,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
TransformerConfiguration transformerConfiguration = transformerClassName == null || transformerClassName.isEmpty() ? null : new TransformerConfiguration(transformerClassName).setProperties(transformerProperties);
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(ComponentConfigurationRoutingType.valueOf(routingType));
server.deployDivert(config);
storageManager.storeDivertConfiguration(new PersistedDivertConfiguration(config));
} finally {
blockOnIO();
}

View File

@ -2752,7 +2752,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return federationManager;
}
@Override
public Divert deployDivert(DivertConfiguration config) throws Exception {
if (config.getName() == null) {
@ -2791,6 +2790,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Binding binding = new DivertBinding(storageManager.generateID(), sAddress, divert);
storageManager.storeDivertConfiguration(new PersistedDivertConfiguration(config));
postOffice.addBinding(binding);
managementService.registerDivert(divert);
@ -2831,6 +2832,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
divert.setRoutingType(config.getRoutingType());
}
storageManager.storeDivertConfiguration(new PersistedDivertConfiguration(config));
return divert;
}
@ -3594,14 +3597,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
securityRepository.addMatch(roleItem.getAddressMatch().toString(), setRoles);
}
List<PersistedDivertConfiguration> persistedDivertConfigurations = storageManager.recoverDivertConfigurations();
if (persistedDivertConfigurations != null) {
for (PersistedDivertConfiguration persistedDivertConfiguration : persistedDivertConfigurations) {
configuration.getDivertConfigurations().add(persistedDivertConfiguration.getDivertConfiguration());
}
}
}
@Override
@ -4060,6 +4055,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
private void deployDiverts() throws Exception {
if (storageManager.recoverDivertConfigurations() != null) {
for (PersistedDivertConfiguration persistedDivertConfiguration : storageManager.recoverDivertConfigurations()) {
boolean deleted = true;
for (DivertConfiguration config : configuration.getDivertConfigurations()) {
if (persistedDivertConfiguration.getName().equals(config.getName())) {
deleted = false;
}
}
if (deleted) {
//todo add a flag to specify whether to delete or not
deployDivert(persistedDivertConfiguration.getDivertConfiguration());
}
}
}
for (DivertConfiguration config : configuration.getDivertConfigurations()) {
deployDivert(config);
}

View File

@ -741,6 +741,101 @@ public class DivertTest extends ActiveMQTestBase {
Assert.assertNull(consumer4.receiveImmediate());
}
@Test
public void testSinglePersistedDivert() throws Exception {
final String testAddress = "testAddress";
final String forwardAddress = "forwardAddress";
DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).setExclusive(true);
QueueConfiguration q1 = new QueueConfiguration("forwardAddress1").setDurable(true).setRoutingType(RoutingType.ANYCAST);
Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf).addQueueConfiguration(q1);
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, true));
server.start();
server.stop();
divertConf.setRoutingName("divert2");
server.start();
Binding divert1 = server.getPostOffice().getBinding(new SimpleString("divert1"));
Assert.assertNotNull(divert1);
Assert.assertEquals(divert1.getRoutingName(), new SimpleString("divert2"));
}
@Test
public void testSinglePersistedNewDivert() throws Exception {
final String testAddress = "testAddress";
final String forwardAddress = "forwardAddress";
DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).setExclusive(true);
QueueConfiguration q1 = new QueueConfiguration("forwardAddress1").setDurable(true).setRoutingType(RoutingType.ANYCAST);
Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf).addQueueConfiguration(q1);
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, true));
server.start();
server.stop();
divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert2").setAddress(testAddress).setForwardingAddress(forwardAddress).setExclusive(true);
config.getDivertConfigurations().clear();
config.getDivertConfigurations().add(divertConf);
server.start();
Binding divert1 = server.getPostOffice().getBinding(new SimpleString("divert1"));
Assert.assertNotNull(divert1);
Assert.assertEquals(divert1.getRoutingName(), new SimpleString("divert2"));
}
@Test
public void testSinglePersistedNoDeleteDivert() throws Exception {
final String testAddress = "testAddress";
final String forwardAddress = "forwardAddress";
DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).setExclusive(true);
QueueConfiguration q1 = new QueueConfiguration("forwardAddress1").setDurable(true).setRoutingType(RoutingType.ANYCAST);
Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf).addQueueConfiguration(q1);
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, true));
server.start();
server.stop();
config.getDivertConfigurations().clear();
server.start();
Binding divert1 = server.getPostOffice().getBinding(new SimpleString("divert1"));
Assert.assertNotNull(divert1);
Assert.assertEquals(divert1.getRoutingName(), new SimpleString("divert1"));
}
@Test
public void testMultipleNonExclusiveDivert() throws Exception {
final String testAddress = "testAddress";

View File

@ -66,6 +66,7 @@ import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
@ -1770,6 +1771,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
divertNames = serverControl.getDivertNames();
assertEquals(1, divertNames.length);
assertEquals(name, divertNames[0]);
//now check its been persisted
PersistedDivertConfiguration pdc = server.getStorageManager().recoverDivertConfigurations().get(0);
assertEquals(pdc.getDivertConfiguration().getForwardingAddress(), updatedForwardingAddress);
// check that a message is no longer exclusively diverted
message = session.createMessage(false);