From 0ee9420587043fb4f27be2315a09a85de66044c4 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Fri, 24 Feb 2017 14:05:27 -0500 Subject: [PATCH] AMQ-6610 - Network connector mbean not registered on reload On network connector add or update after broker start (such as the runtime plugins) the mbean was not created. There was also a couple of other properties not set. Fixed the network connector start to be the same for normal broker start and runtime reload. (cherry picked from commit bab0887ed60907d4ccac1824d83371a592f94925) --- .../apache/activemq/broker/BrokerService.java | 64 ++++++++++++------- .../plugin/NetworkConnectorProcessor.java | 6 +- .../java/JavaRuntimeConfigurationBroker.java | 2 +- .../apache/activemq/NetworkConnectorTest.java | 26 ++++++++ .../java/JavaNetworkConnectorTest.java | 30 ++++++++- .../activemq/network/SimpleNetworkTest.java | 3 + 6 files changed, 103 insertions(+), 28 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 19910a5485..1df077efe1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import javax.management.InstanceNotFoundException; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; @@ -2257,7 +2258,7 @@ public class BrokerService implements Service { } } - protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { + public ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName()); } @@ -2590,7 +2591,6 @@ public class BrokerService implements Service { * @throws Exception */ public void startAllConnectors() throws Exception { - Set durableDestinations = getBroker().getDurableDestinations(); List al = new ArrayList(); for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) { TransportConnector connector = iter.next(); @@ -2629,26 +2629,7 @@ public class BrokerService implements Service { for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) { final NetworkConnector connector = iter.next(); connector.setLocalUri(uri); - connector.setBrokerName(getBrokerName()); - connector.setDurableDestinations(durableDestinations); - if (getDefaultSocketURIString() != null) { - connector.setBrokerURL(getDefaultSocketURIString()); - } - if (networkConnectorStartExecutor != null) { - networkConnectorStartExecutor.execute(new Runnable() { - @Override - public void run() { - try { - LOG.info("Async start of {}", connector); - connector.start(); - } catch(Exception e) { - LOG.error("Async start of network connector: {} failed", connector, e); - } - } - }); - } else { - connector.start(); - } + startNetworkConnector(connector, networkConnectorStartExecutor); } if (networkConnectorStartExecutor != null) { // executor done when enqueued tasks are complete @@ -2670,6 +2651,45 @@ public class BrokerService implements Service { } } + public void startNetworkConnector(final NetworkConnector connector, + final ThreadPoolExecutor networkConnectorStartExecutor) throws Exception { + connector.setBrokerName(getBrokerName()); + //set the durable destinations to match the broker if not set on the connector + if (connector.getDurableDestinations() == null) { + connector.setDurableDestinations(getBroker().getDurableDestinations()); + } + String defaultSocketURI = getDefaultSocketURIString(); + if (defaultSocketURI != null) { + connector.setBrokerURL(defaultSocketURI); + } + //If using the runtime plugin to start a network connector then the mbean needs + //to be added, under normal start it will already exist so check for InstanceNotFoundException + if (isUseJmx()) { + ObjectName networkMbean = createNetworkConnectorObjectName(connector); + try { + getManagementContext().getObjectInstance(networkMbean); + } catch (InstanceNotFoundException e) { + LOG.debug("Network connector MBean {} not found, registering", networkMbean); + registerNetworkConnectorMBean(connector); + } + } + if (networkConnectorStartExecutor != null) { + networkConnectorStartExecutor.execute(new Runnable() { + @Override + public void run() { + try { + LOG.info("Async start of {}", connector); + connector.start(); + } catch(Exception e) { + LOG.error("Async start of network connector: {} failed", connector, e); + } + } + }); + } else { + connector.start(); + } + } + public TransportConnector startTransportConnector(TransportConnector connector) throws Exception { connector.setBrokerService(this); connector.setTaskRunnerFactory(getTaskRunnerFactory()); diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/NetworkConnectorProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/NetworkConnectorProcessor.java index a761d5b0fd..6ea145423b 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/NetworkConnectorProcessor.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/NetworkConnectorProcessor.java @@ -16,13 +16,13 @@ */ package org.apache.activemq.plugin; +import java.util.TreeMap; + import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.schema.core.DtoNetworkConnector; import org.apache.activemq.util.IntrospectionSupport; -import java.util.TreeMap; - public class NetworkConnectorProcessor extends DefaultConfigurationProcessor { public NetworkConnectorProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) { @@ -36,7 +36,7 @@ public class NetworkConnectorProcessor extends DefaultConfigurationProcessor { try { DiscoveryNetworkConnector nc = fromDto(networkConnector, new DiscoveryNetworkConnector()); plugin.getBrokerService().addNetworkConnector(nc); - nc.start(); + plugin.getBrokerService().startNetworkConnector(nc, null); plugin.info("started new network connector: " + nc); } catch (Exception e) { plugin.info("Failed to add new networkConnector " + networkConnector, e); diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java index 66f85605a1..23faeecbd0 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java @@ -104,7 +104,7 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration try { if (!getBrokerService().getNetworkConnectors().contains(nc)) { getBrokerService().addNetworkConnector(nc); - nc.start(); + getBrokerService().startNetworkConnector(nc, null); info("started new network connector: " + nc); } else { info("skipping network connector add, already exists: " + nc); diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/NetworkConnectorTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/NetworkConnectorTest.java index 97ced17029..400fbde0b7 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/NetworkConnectorTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/NetworkConnectorTest.java @@ -17,8 +17,13 @@ package org.apache.activemq; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import javax.management.InstanceNotFoundException; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.util.Wait; @@ -57,6 +62,10 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport { assertEquals("one statically included", 1, networkConnector.getStaticallyIncludedDestinations().size()); assertEquals("one dynamically included", 1, networkConnector.getDynamicallyIncludedDestinations().size()); assertEquals("one durable", 1, networkConnector.getDurableDestinations().size()); + assertFalse(networkConnector.getBrokerName().isEmpty()); + + assertNotNull(brokerService.getManagementContext().getObjectInstance( + brokerService.createNetworkConnectorObjectName(networkConnector))); } @@ -84,6 +93,10 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport { applyNewConfig(brokerConfig, configurationSeed + "-mod-one-nc", SLEEP); assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size()); assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0)); + assertFalse(modNetworkConnector.getBrokerName().isEmpty()); + + assertNotNull(brokerService.getManagementContext().getObjectInstance( + brokerService.createNetworkConnectorObjectName(modNetworkConnector))); } @Test @@ -95,6 +108,8 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport { assertTrue("broker alive", brokerService.isStarted()); assertEquals("correct network connectors", 2, brokerService.getNetworkConnectors().size()); + NetworkConnector two = brokerService.getNetworkConnectors().get(1); + applyNewConfig(brokerConfig, configurationSeed + "-one-nc", SLEEP); assertTrue("expected mod on time", Wait.waitFor(new Wait.Condition() { @@ -106,5 +121,16 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport { NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0); assertEquals("name match", "one", remainingNetworkConnector.getName()); + + try { + brokerService.getManagementContext().getObjectInstance( + brokerService.createNetworkConnectorObjectName(two)); + fail("mbean for nc2 should not exist"); + } catch (InstanceNotFoundException e) { + //should throw exception + } + + assertNotNull(brokerService.getManagementContext().getObjectInstance( + brokerService.createNetworkConnectorObjectName(remainingNetworkConnector))); } } diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaNetworkConnectorTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaNetworkConnectorTest.java index db0b71590b..254414df1b 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaNetworkConnectorTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaNetworkConnectorTest.java @@ -17,14 +17,19 @@ package org.apache.activemq.java; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.net.URI; import java.util.Arrays; import java.util.HashSet; import java.util.concurrent.TimeUnit; +import javax.management.InstanceNotFoundException; + import org.apache.activemq.RuntimeConfigTestSupport; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; @@ -61,7 +66,6 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport { startBroker(brokerService); assertTrue("broker alive", brokerService.isStarted()); assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size()); - DiscoveryNetworkConnector nc = createNetworkConnector(); javaConfigBroker.addNetworkConnector(nc); @@ -85,6 +89,10 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport { assertEquals("one statically included", 1, networkConnector.getStaticallyIncludedDestinations().size()); assertEquals("one dynamically included", 1, networkConnector.getDynamicallyIncludedDestinations().size()); assertEquals("one durable", 1, networkConnector.getDurableDestinations().size()); + assertFalse(networkConnector.getBrokerName().isEmpty()); + + assertNotNull(brokerService.getManagementContext().getObjectInstance( + brokerService.createNetworkConnectorObjectName(networkConnector))); } @@ -105,6 +113,8 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport { // track the original NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0); assertEquals("network ttl is default", 1, networkConnector.getNetworkTTL()); + assertNotNull(networkConnector.getBrokerName()); + assertNotNull(networkConnector.getBrokerURL()); nc.setNetworkTTL(2); javaConfigBroker.updateNetworkConnector(nc); @@ -118,6 +128,10 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport { javaConfigBroker.updateNetworkConnector(nc); assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size()); assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0)); + assertFalse(modNetworkConnector.getBrokerName().isEmpty()); + + assertNotNull(brokerService.getManagementContext().getObjectInstance( + brokerService.createNetworkConnectorObjectName(modNetworkConnector))); } @Test @@ -135,7 +149,7 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport { DiscoveryNetworkConnector nc2 = new DiscoveryNetworkConnector(); nc2.setUri(new URI("static:(tcp://localhost:5555)")); nc2.setNetworkTTL(1); - nc2.setName("one"); + nc2.setName("two"); javaConfigBroker.addNetworkConnector(nc1); javaConfigBroker.addNetworkConnector(nc2); @@ -154,6 +168,18 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport { NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0); assertEquals("name match", "one", remainingNetworkConnector.getName()); + + try { + brokerService.getManagementContext().getObjectInstance( + brokerService.createNetworkConnectorObjectName(nc2)); + fail("mbean for nc2 should not exist"); + } catch (InstanceNotFoundException e) { + //should throw exception + } + + assertNotNull(brokerService.getManagementContext().getObjectInstance( + brokerService.createNetworkConnectorObjectName(nc1))); + } private DiscoveryNetworkConnector createNetworkConnector() throws Exception { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java index 5c6b35ae07..73d8a4fb1a 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java @@ -174,6 +174,9 @@ public class SimpleNetworkTest { assertNull(consumer2.receive(1000)); assertNetworkBridgeStatistics(MESSAGE_COUNT, 0); + + assertNotNull(localBroker.getManagementContext().getObjectInstance( + localBroker.createNetworkConnectorObjectName(localBroker.getNetworkConnectors().get(0)))); } private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {