Adding a flag to be able to specificy when to apply changes immediately
after updating the virtual destinations using the Java runtime plugin.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-11-05 13:34:08 +00:00
parent b3d35472f9
commit 5db9af8b24
3 changed files with 72 additions and 0 deletions

View File

@ -122,6 +122,27 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
}
}
/**
* Apply the destination work immediately instead of waiting for
* a connection add or destination add
*
* @throws Exception
*/
protected void applyDestinationWork() throws Exception {
Runnable work = addDestinationWork.poll();
if (work != null) {
try {
addDestinationBarrier.writeLock().lockInterruptibly();
do {
work.run();
work = addDestinationWork.poll();
} while (work != null);
} finally {
addDestinationBarrier.writeLock().unlock();
}
}
}
protected void debug(String s) {
LOG.debug(s);
}

View File

@ -61,6 +61,21 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration
});
}
/**
* Set the virtual destinations and apply immediately, instead of waiting for a new
* destination or connection to trigger the work.
*
* @param virtualDestinations
* @param applyImmediately
* @throws Exception
*/
public void setVirtualDestinations(final VirtualDestination[] virtualDestinations, boolean applyImmediately) throws Exception {
setVirtualDestinations(virtualDestinations);
if (applyImmediately) {
this.applyDestinationWork();
}
}
//New Destinations
public void setDestinations(final ActiveMQDestination[] destinations) {
for (ActiveMQDestination destination : destinations) {

View File

@ -90,6 +90,7 @@ public class JavaVirtualDestTest extends AbstractVirtualDestTest {
TimeUnit.SECONDS.sleep(SLEEP);
assertSame("same instance", newValue, brokerService.getDestinationInterceptors()[0]);
}
@Test
public void testNewComposite() throws Exception {
startBroker(new BrokerService());
@ -105,6 +106,21 @@ public class JavaVirtualDestTest extends AbstractVirtualDestTest {
exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer");
}
@Test
public void testNewCompositeApplyImmediately() throws Exception {
startBroker(new BrokerService());
assertTrue("broker alive", brokerService.isStarted());
CompositeQueue queue = buildCompositeQueue("VirtualDestination.CompositeQueue",
Arrays.asList(new ActiveMQQueue("VirtualDestination.QueueConsumer"),
new ActiveMQTopic("VirtualDestination.TopicConsumer")));
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{queue}, true);
TimeUnit.SECONDS.sleep(SLEEP);
exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer");
}
@Test
public void testModComposite() throws Exception {
@ -272,6 +288,26 @@ public class JavaVirtualDestTest extends AbstractVirtualDestTest {
assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length);
}
@Test
public void testModApplyImmediately() throws Exception {
final BrokerService brokerService = new BrokerService();
brokerService.setDestinationInterceptors(new DestinationInterceptor[] {
buildInterceptor(new VirtualDestination[]{buildVirtualTopic("A.>", false)})});
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
assertEquals("one interceptor", 1, brokerService.getDestinationInterceptors().length);
exerciseVirtualTopic("A.Default");
//apply new config
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("B.>", false)}, true);
TimeUnit.SECONDS.sleep(SLEEP);
exerciseVirtualTopic("B.Default");
assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length);
}
@Test
public void testModWithMirroredQueue() throws Exception {