AMQ-6610 - Network connector mbean not registered on reload

On network connector add or update after broker start (such as the
runtime plugins) the mbean was not created.  There was also a couple of
other properties not set. Fixed the network connector start to be the
same for normal broker start and runtime reload.

(cherry picked from commit bab0887ed6)
This commit is contained in:
Christopher L. Shannon (cshannon) 2017-02-24 14:05:27 -05:00
parent e575157093
commit 0ee9420587
6 changed files with 103 additions and 28 deletions

View File

@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException; import javax.management.MalformedObjectNameException;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -2257,7 +2258,7 @@ public class BrokerService implements Service {
} }
} }
protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { public ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName()); return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
} }
@ -2590,7 +2591,6 @@ public class BrokerService implements Service {
* @throws Exception * @throws Exception
*/ */
public void startAllConnectors() throws Exception { public void startAllConnectors() throws Exception {
Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
List<TransportConnector> al = new ArrayList<TransportConnector>(); List<TransportConnector> al = new ArrayList<TransportConnector>();
for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = iter.next(); TransportConnector connector = iter.next();
@ -2629,26 +2629,7 @@ public class BrokerService implements Service {
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
final NetworkConnector connector = iter.next(); final NetworkConnector connector = iter.next();
connector.setLocalUri(uri); connector.setLocalUri(uri);
connector.setBrokerName(getBrokerName()); startNetworkConnector(connector, networkConnectorStartExecutor);
connector.setDurableDestinations(durableDestinations);
if (getDefaultSocketURIString() != null) {
connector.setBrokerURL(getDefaultSocketURIString());
}
if (networkConnectorStartExecutor != null) {
networkConnectorStartExecutor.execute(new Runnable() {
@Override
public void run() {
try {
LOG.info("Async start of {}", connector);
connector.start();
} catch(Exception e) {
LOG.error("Async start of network connector: {} failed", connector, e);
}
}
});
} else {
connector.start();
}
} }
if (networkConnectorStartExecutor != null) { if (networkConnectorStartExecutor != null) {
// executor done when enqueued tasks are complete // executor done when enqueued tasks are complete
@ -2670,6 +2651,45 @@ public class BrokerService implements Service {
} }
} }
public void startNetworkConnector(final NetworkConnector connector,
final ThreadPoolExecutor networkConnectorStartExecutor) throws Exception {
connector.setBrokerName(getBrokerName());
//set the durable destinations to match the broker if not set on the connector
if (connector.getDurableDestinations() == null) {
connector.setDurableDestinations(getBroker().getDurableDestinations());
}
String defaultSocketURI = getDefaultSocketURIString();
if (defaultSocketURI != null) {
connector.setBrokerURL(defaultSocketURI);
}
//If using the runtime plugin to start a network connector then the mbean needs
//to be added, under normal start it will already exist so check for InstanceNotFoundException
if (isUseJmx()) {
ObjectName networkMbean = createNetworkConnectorObjectName(connector);
try {
getManagementContext().getObjectInstance(networkMbean);
} catch (InstanceNotFoundException e) {
LOG.debug("Network connector MBean {} not found, registering", networkMbean);
registerNetworkConnectorMBean(connector);
}
}
if (networkConnectorStartExecutor != null) {
networkConnectorStartExecutor.execute(new Runnable() {
@Override
public void run() {
try {
LOG.info("Async start of {}", connector);
connector.start();
} catch(Exception e) {
LOG.error("Async start of network connector: {} failed", connector, e);
}
}
});
} else {
connector.start();
}
}
public TransportConnector startTransportConnector(TransportConnector connector) throws Exception { public TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
connector.setBrokerService(this); connector.setBrokerService(this);
connector.setTaskRunnerFactory(getTaskRunnerFactory()); connector.setTaskRunnerFactory(getTaskRunnerFactory());

View File

@ -16,13 +16,13 @@
*/ */
package org.apache.activemq.plugin; package org.apache.activemq.plugin;
import java.util.TreeMap;
import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.schema.core.DtoNetworkConnector; import org.apache.activemq.schema.core.DtoNetworkConnector;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import java.util.TreeMap;
public class NetworkConnectorProcessor extends DefaultConfigurationProcessor { public class NetworkConnectorProcessor extends DefaultConfigurationProcessor {
public NetworkConnectorProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) { public NetworkConnectorProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
@ -36,7 +36,7 @@ public class NetworkConnectorProcessor extends DefaultConfigurationProcessor {
try { try {
DiscoveryNetworkConnector nc = fromDto(networkConnector, new DiscoveryNetworkConnector()); DiscoveryNetworkConnector nc = fromDto(networkConnector, new DiscoveryNetworkConnector());
plugin.getBrokerService().addNetworkConnector(nc); plugin.getBrokerService().addNetworkConnector(nc);
nc.start(); plugin.getBrokerService().startNetworkConnector(nc, null);
plugin.info("started new network connector: " + nc); plugin.info("started new network connector: " + nc);
} catch (Exception e) { } catch (Exception e) {
plugin.info("Failed to add new networkConnector " + networkConnector, e); plugin.info("Failed to add new networkConnector " + networkConnector, e);

View File

@ -104,7 +104,7 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration
try { try {
if (!getBrokerService().getNetworkConnectors().contains(nc)) { if (!getBrokerService().getNetworkConnectors().contains(nc)) {
getBrokerService().addNetworkConnector(nc); getBrokerService().addNetworkConnector(nc);
nc.start(); getBrokerService().startNetworkConnector(nc, null);
info("started new network connector: " + nc); info("started new network connector: " + nc);
} else { } else {
info("skipping network connector add, already exists: " + nc); info("skipping network connector add, already exists: " + nc);

View File

@ -17,8 +17,13 @@
package org.apache.activemq; package org.apache.activemq;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import javax.management.InstanceNotFoundException;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
@ -57,6 +62,10 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport {
assertEquals("one statically included", 1, networkConnector.getStaticallyIncludedDestinations().size()); assertEquals("one statically included", 1, networkConnector.getStaticallyIncludedDestinations().size());
assertEquals("one dynamically included", 1, networkConnector.getDynamicallyIncludedDestinations().size()); assertEquals("one dynamically included", 1, networkConnector.getDynamicallyIncludedDestinations().size());
assertEquals("one durable", 1, networkConnector.getDurableDestinations().size()); assertEquals("one durable", 1, networkConnector.getDurableDestinations().size());
assertFalse(networkConnector.getBrokerName().isEmpty());
assertNotNull(brokerService.getManagementContext().getObjectInstance(
brokerService.createNetworkConnectorObjectName(networkConnector)));
} }
@ -84,6 +93,10 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport {
applyNewConfig(brokerConfig, configurationSeed + "-mod-one-nc", SLEEP); applyNewConfig(brokerConfig, configurationSeed + "-mod-one-nc", SLEEP);
assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size()); assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size());
assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0)); assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0));
assertFalse(modNetworkConnector.getBrokerName().isEmpty());
assertNotNull(brokerService.getManagementContext().getObjectInstance(
brokerService.createNetworkConnectorObjectName(modNetworkConnector)));
} }
@Test @Test
@ -95,6 +108,8 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport {
assertTrue("broker alive", brokerService.isStarted()); assertTrue("broker alive", brokerService.isStarted());
assertEquals("correct network connectors", 2, brokerService.getNetworkConnectors().size()); assertEquals("correct network connectors", 2, brokerService.getNetworkConnectors().size());
NetworkConnector two = brokerService.getNetworkConnectors().get(1);
applyNewConfig(brokerConfig, configurationSeed + "-one-nc", SLEEP); applyNewConfig(brokerConfig, configurationSeed + "-one-nc", SLEEP);
assertTrue("expected mod on time", Wait.waitFor(new Wait.Condition() { assertTrue("expected mod on time", Wait.waitFor(new Wait.Condition() {
@ -106,5 +121,16 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport {
NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0); NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0);
assertEquals("name match", "one", remainingNetworkConnector.getName()); assertEquals("name match", "one", remainingNetworkConnector.getName());
try {
brokerService.getManagementContext().getObjectInstance(
brokerService.createNetworkConnectorObjectName(two));
fail("mbean for nc2 should not exist");
} catch (InstanceNotFoundException e) {
//should throw exception
}
assertNotNull(brokerService.getManagementContext().getObjectInstance(
brokerService.createNetworkConnectorObjectName(remainingNetworkConnector)));
} }
} }

View File

@ -17,14 +17,19 @@
package org.apache.activemq.java; package org.apache.activemq.java;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.net.URI; import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import org.apache.activemq.RuntimeConfigTestSupport; import org.apache.activemq.RuntimeConfigTestSupport;
import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -61,7 +66,6 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
startBroker(brokerService); startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted()); assertTrue("broker alive", brokerService.isStarted());
assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size()); assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size());
DiscoveryNetworkConnector nc = createNetworkConnector(); DiscoveryNetworkConnector nc = createNetworkConnector();
javaConfigBroker.addNetworkConnector(nc); javaConfigBroker.addNetworkConnector(nc);
@ -85,6 +89,10 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
assertEquals("one statically included", 1, networkConnector.getStaticallyIncludedDestinations().size()); assertEquals("one statically included", 1, networkConnector.getStaticallyIncludedDestinations().size());
assertEquals("one dynamically included", 1, networkConnector.getDynamicallyIncludedDestinations().size()); assertEquals("one dynamically included", 1, networkConnector.getDynamicallyIncludedDestinations().size());
assertEquals("one durable", 1, networkConnector.getDurableDestinations().size()); assertEquals("one durable", 1, networkConnector.getDurableDestinations().size());
assertFalse(networkConnector.getBrokerName().isEmpty());
assertNotNull(brokerService.getManagementContext().getObjectInstance(
brokerService.createNetworkConnectorObjectName(networkConnector)));
} }
@ -105,6 +113,8 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
// track the original // track the original
NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0); NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0);
assertEquals("network ttl is default", 1, networkConnector.getNetworkTTL()); assertEquals("network ttl is default", 1, networkConnector.getNetworkTTL());
assertNotNull(networkConnector.getBrokerName());
assertNotNull(networkConnector.getBrokerURL());
nc.setNetworkTTL(2); nc.setNetworkTTL(2);
javaConfigBroker.updateNetworkConnector(nc); javaConfigBroker.updateNetworkConnector(nc);
@ -118,6 +128,10 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
javaConfigBroker.updateNetworkConnector(nc); javaConfigBroker.updateNetworkConnector(nc);
assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size()); assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size());
assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0)); assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0));
assertFalse(modNetworkConnector.getBrokerName().isEmpty());
assertNotNull(brokerService.getManagementContext().getObjectInstance(
brokerService.createNetworkConnectorObjectName(modNetworkConnector)));
} }
@Test @Test
@ -135,7 +149,7 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
DiscoveryNetworkConnector nc2 = new DiscoveryNetworkConnector(); DiscoveryNetworkConnector nc2 = new DiscoveryNetworkConnector();
nc2.setUri(new URI("static:(tcp://localhost:5555)")); nc2.setUri(new URI("static:(tcp://localhost:5555)"));
nc2.setNetworkTTL(1); nc2.setNetworkTTL(1);
nc2.setName("one"); nc2.setName("two");
javaConfigBroker.addNetworkConnector(nc1); javaConfigBroker.addNetworkConnector(nc1);
javaConfigBroker.addNetworkConnector(nc2); javaConfigBroker.addNetworkConnector(nc2);
@ -154,6 +168,18 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0); NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0);
assertEquals("name match", "one", remainingNetworkConnector.getName()); assertEquals("name match", "one", remainingNetworkConnector.getName());
try {
brokerService.getManagementContext().getObjectInstance(
brokerService.createNetworkConnectorObjectName(nc2));
fail("mbean for nc2 should not exist");
} catch (InstanceNotFoundException e) {
//should throw exception
}
assertNotNull(brokerService.getManagementContext().getObjectInstance(
brokerService.createNetworkConnectorObjectName(nc1)));
} }
private DiscoveryNetworkConnector createNetworkConnector() throws Exception { private DiscoveryNetworkConnector createNetworkConnector() throws Exception {

View File

@ -174,6 +174,9 @@ public class SimpleNetworkTest {
assertNull(consumer2.receive(1000)); assertNull(consumer2.receive(1000));
assertNetworkBridgeStatistics(MESSAGE_COUNT, 0); assertNetworkBridgeStatistics(MESSAGE_COUNT, 0);
assertNotNull(localBroker.getManagementContext().getObjectInstance(
localBroker.createNetworkConnectorObjectName(localBroker.getNetworkConnectors().get(0))));
} }
private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception { private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {