This commit is contained in:
Clebert Suconic 2018-09-26 09:23:57 -04:00
commit a5b19ad1e7
6 changed files with 109 additions and 34 deletions

View File

@ -2602,15 +2602,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
// Deploy the rest of the stuff // Deploy the rest of the stuff
deployReloadableConfigFromConfiguration(configuration);
// Deploy predefined addresses
deployAddressesFromConfiguration();
// Deploy any predefined queues
deployQueuesFromConfiguration();
// Undeploy any addresses and queues not in config
undeployAddressesAndQueueNotInConfiguration();
// We need to call this here, this gives any dependent server a chance to deploy its own addresses // We need to call this here, this gives any dependent server a chance to deploy its own addresses
// this needs to be done before clustering is fully activated // this needs to be done before clustering is fully activated
@ -2830,6 +2822,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
} }
private JournalLoadInformation[] loadJournals() throws Exception { private JournalLoadInformation[] loadJournals() throws Exception {
JournalLoader journalLoader = activation.createJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer); JournalLoader journalLoader = activation.createJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer);
@ -3480,18 +3474,26 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override @Override
public void reload(URL uri) throws Exception { public void reload(URL uri) throws Exception {
if (isActive()) {
Configuration config = new FileConfigurationParser().parseMainConfig(uri.openStream()); Configuration config = new FileConfigurationParser().parseMainConfig(uri.openStream());
LegacyJMSConfiguration legacyJMSConfiguration = new LegacyJMSConfiguration(config); LegacyJMSConfiguration legacyJMSConfiguration = new LegacyJMSConfiguration(config);
legacyJMSConfiguration.parseConfiguration(uri.openStream()); legacyJMSConfiguration.parseConfiguration(uri.openStream());
configuration.setSecurityRoles(config.getSecurityRoles());
configuration.setAddressesSettings(config.getAddressesSettings());
configuration.setDivertConfigurations(config.getDivertConfigurations());
configuration.setAddressConfigurations(config.getAddressConfigurations());
configuration.setQueueConfigurations(config.getQueueConfigurations());
if (isActive()) {
deployReloadableConfigFromConfiguration(configuration);
}
}
}
private void deployReloadableConfigFromConfiguration(Configuration config) throws Exception {
ActiveMQServerLogger.LOGGER.reloadingConfiguration("security"); ActiveMQServerLogger.LOGGER.reloadingConfiguration("security");
securityRepository.swap(config.getSecurityRoles().entrySet()); securityRepository.swap(config.getSecurityRoles().entrySet());
configuration.setSecurityRoles(config.getSecurityRoles());
ActiveMQServerLogger.LOGGER.reloadingConfiguration("address settings"); ActiveMQServerLogger.LOGGER.reloadingConfiguration("address settings");
addressSettingsRepository.swap(config.getAddressesSettings().entrySet()); addressSettingsRepository.swap(config.getAddressesSettings().entrySet());
configuration.setAddressesSettings(config.getAddressesSettings());
ActiveMQServerLogger.LOGGER.reloadingConfiguration("diverts"); ActiveMQServerLogger.LOGGER.reloadingConfiguration("diverts");
for (DivertConfiguration divertConfig : config.getDivertConfigurations()) { for (DivertConfiguration divertConfig : config.getDivertConfigurations()) {
@ -3503,10 +3505,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses"); ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses");
undeployAddressesAndQueueNotInConfiguration(config); undeployAddressesAndQueueNotInConfiguration(config);
deployAddressesFromConfiguration(config); deployAddressesFromConfiguration(config);
configuration.setAddressConfigurations(config.getAddressConfigurations()); deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
configuration.setQueueConfigurations(config.getQueueConfigurations());
}
}
} }
public Set<ActivateCallback> getActivateCallbacks() { public Set<ActivateCallback> getActivateCallbacks() {

View File

@ -21,6 +21,7 @@ import java.net.URL;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -43,6 +44,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.junit.Wait; import org.apache.activemq.artemis.junit.Wait;
@ -205,6 +207,13 @@ public class RedeployTest extends ActiveMQTestBase {
@Test @Test
public void testRedeployWithFailover() throws Exception { public void testRedeployWithFailover() throws Exception {
Set<Role> original = new HashSet<>();
original.add(new Role("a", false, true, false, false, false, false, false, false, false, false));
Set<Role> changed = new HashSet<>();
changed.add(new Role("b", false, true, false, false, false, false, false, false, false, false));
EmbeddedActiveMQ live = new EmbeddedActiveMQ(); EmbeddedActiveMQ live = new EmbeddedActiveMQ();
EmbeddedActiveMQ backup = new EmbeddedActiveMQ(); EmbeddedActiveMQ backup = new EmbeddedActiveMQ();
@ -232,6 +241,11 @@ public class RedeployTest extends ActiveMQTestBase {
assertTrue(Wait.waitFor(() -> backup.getActiveMQServer().isReplicaSync(), 15000, 200)); assertTrue(Wait.waitFor(() -> backup.getActiveMQServer().isReplicaSync(), 15000, 200));
assertEquals("Test address settings original - live", AddressFullMessagePolicy.BLOCK, live.getActiveMQServer().getAddressSettingsRepository().getMatch("myQueue").getAddressFullMessagePolicy());
assertEquals("Test address settings original - backup", AddressFullMessagePolicy.BLOCK, backup.getActiveMQServer().getAddressSettingsRepository().getMatch("myQueue").getAddressFullMessagePolicy());
assertEquals("Test security settings original - live", original, live.getActiveMQServer().getSecurityRepository().getMatch("myQueue"));
assertEquals("Test security settings original - backup", original, backup.getActiveMQServer().getSecurityRepository().getMatch("myQueue"));
final ReusableLatch liveReloadLatch = new ReusableLatch(1); final ReusableLatch liveReloadLatch = new ReusableLatch(1);
Runnable liveTick = () -> liveReloadLatch.countDown(); Runnable liveTick = () -> liveReloadLatch.countDown();
live.getActiveMQServer().getReloadManager().setTick(liveTick); live.getActiveMQServer().getReloadManager().setTick(liveTick);
@ -259,9 +273,13 @@ public class RedeployTest extends ActiveMQTestBase {
Session session = connection.createSession(); Session session = connection.createSession();
Queue queue = session.createQueue("myQueue2"); Queue queue = session.createQueue("myQueue2");
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("text")); producer.send(session.createTextMessage("text1"));
} }
assertFalse(backup.getActiveMQServer().isActive());
assertEquals("Test address settings redeploy - live", AddressFullMessagePolicy.PAGE, live.getActiveMQServer().getAddressSettingsRepository().getMatch("myQueue").getAddressFullMessagePolicy());
assertEquals("Test security settings redeploy - live", changed, live.getActiveMQServer().getSecurityRepository().getMatch("myQueue"));
live.stop(); live.stop();
assertTrue(Wait.waitFor(() -> (backup.getActiveMQServer().isActive()), 5000, 100)); assertTrue(Wait.waitFor(() -> (backup.getActiveMQServer().isActive()), 5000, 100));
@ -277,6 +295,8 @@ public class RedeployTest extends ActiveMQTestBase {
Assert.assertNotNull("Queue wasn't deployed accordingly", consumer.receive(5000)); Assert.assertNotNull("Queue wasn't deployed accordingly", consumer.receive(5000));
Assert.assertNotNull(consumer.receive(5000)); Assert.assertNotNull(consumer.receive(5000));
} }
assertEquals("Test address settings redeploy - backup", changed, backup.getActiveMQServer().getSecurityRepository().getMatch("myQueue"));
assertEquals("Test security settings redeploy - backup", AddressFullMessagePolicy.PAGE, backup.getActiveMQServer().getAddressSettingsRepository().getMatch("myQueue").getAddressFullMessagePolicy());
} finally { } finally {
live.stop(); live.stop();
backup.stop(); backup.stop();

View File

@ -78,5 +78,19 @@ under the License.
</anycast> </anycast>
</address> </address>
</addresses> </addresses>
<address-settings>
<address-setting match="#" >
<address-full-policy>PAGE</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>
<security-settings>
<security-setting match="#" >
<permission type="consume" roles="b" />
</security-setting>
</security-settings>
</core> </core>
</configuration> </configuration>

View File

@ -73,5 +73,19 @@ under the License.
</anycast> </anycast>
</address> </address>
</addresses> </addresses>
<address-settings>
<address-setting match="#" >
<address-full-policy>BLOCK</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>
<security-settings>
<security-setting match="#" >
<permission type="consume" roles="a" />
</security-setting>
</security-settings>
</core> </core>
</configuration> </configuration>

View File

@ -78,5 +78,19 @@ under the License.
</anycast> </anycast>
</address> </address>
</addresses> </addresses>
<address-settings>
<address-setting match="#" >
<address-full-policy>PAGE</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>
<security-settings>
<security-setting match="#" >
<permission type="consume" roles="b" />
</security-setting>
</security-settings>
</core> </core>
</configuration> </configuration>

View File

@ -73,5 +73,19 @@ under the License.
</anycast> </anycast>
</address> </address>
</addresses> </addresses>
<address-settings>
<address-setting match="#" >
<address-full-policy>BLOCK</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>
<security-settings>
<security-setting match="#" >
<permission type="consume" roles="a" />
</security-setting>
</security-settings>
</core> </core>
</configuration> </configuration>