mirror of https://github.com/apache/activemq.git
AMQ-6610 - Network connector mbean not registered on reload
On network connector add or update after broker start (such as the runtime plugins) the mbean was not created. There was also a couple of other properties not set. Fixed the network connector start to be the same for normal broker start and runtime reload.
This commit is contained in:
parent
0cf64783d2
commit
bab0887ed6
|
@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import javax.annotation.PreDestroy;
|
import javax.annotation.PreDestroy;
|
||||||
|
import javax.management.InstanceNotFoundException;
|
||||||
import javax.management.MalformedObjectNameException;
|
import javax.management.MalformedObjectNameException;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
|
@ -2270,7 +2271,7 @@ public class BrokerService implements Service {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
|
public ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
|
||||||
return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
|
return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2603,7 +2604,6 @@ public class BrokerService implements Service {
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
public void startAllConnectors() throws Exception {
|
public void startAllConnectors() throws Exception {
|
||||||
Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
|
|
||||||
List<TransportConnector> al = new ArrayList<>();
|
List<TransportConnector> al = new ArrayList<>();
|
||||||
for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
|
for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
|
||||||
TransportConnector connector = iter.next();
|
TransportConnector connector = iter.next();
|
||||||
|
@ -2642,26 +2642,7 @@ public class BrokerService implements Service {
|
||||||
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
|
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
|
||||||
final NetworkConnector connector = iter.next();
|
final NetworkConnector connector = iter.next();
|
||||||
connector.setLocalUri(uri);
|
connector.setLocalUri(uri);
|
||||||
connector.setBrokerName(getBrokerName());
|
startNetworkConnector(connector, networkConnectorStartExecutor);
|
||||||
connector.setDurableDestinations(durableDestinations);
|
|
||||||
if (getDefaultSocketURIString() != null) {
|
|
||||||
connector.setBrokerURL(getDefaultSocketURIString());
|
|
||||||
}
|
|
||||||
if (networkConnectorStartExecutor != null) {
|
|
||||||
networkConnectorStartExecutor.execute(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
LOG.info("Async start of {}", connector);
|
|
||||||
connector.start();
|
|
||||||
} catch(Exception e) {
|
|
||||||
LOG.error("Async start of network connector: {} failed", connector, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
connector.start();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (networkConnectorStartExecutor != null) {
|
if (networkConnectorStartExecutor != null) {
|
||||||
// executor done when enqueued tasks are complete
|
// executor done when enqueued tasks are complete
|
||||||
|
@ -2683,6 +2664,45 @@ public class BrokerService implements Service {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void startNetworkConnector(final NetworkConnector connector,
|
||||||
|
final ThreadPoolExecutor networkConnectorStartExecutor) throws Exception {
|
||||||
|
connector.setBrokerName(getBrokerName());
|
||||||
|
//set the durable destinations to match the broker if not set on the connector
|
||||||
|
if (connector.getDurableDestinations() == null) {
|
||||||
|
connector.setDurableDestinations(getBroker().getDurableDestinations());
|
||||||
|
}
|
||||||
|
String defaultSocketURI = getDefaultSocketURIString();
|
||||||
|
if (defaultSocketURI != null) {
|
||||||
|
connector.setBrokerURL(defaultSocketURI);
|
||||||
|
}
|
||||||
|
//If using the runtime plugin to start a network connector then the mbean needs
|
||||||
|
//to be added, under normal start it will already exist so check for InstanceNotFoundException
|
||||||
|
if (isUseJmx()) {
|
||||||
|
ObjectName networkMbean = createNetworkConnectorObjectName(connector);
|
||||||
|
try {
|
||||||
|
getManagementContext().getObjectInstance(networkMbean);
|
||||||
|
} catch (InstanceNotFoundException e) {
|
||||||
|
LOG.debug("Network connector MBean {} not found, registering", networkMbean);
|
||||||
|
registerNetworkConnectorMBean(connector);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (networkConnectorStartExecutor != null) {
|
||||||
|
networkConnectorStartExecutor.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
LOG.info("Async start of {}", connector);
|
||||||
|
connector.start();
|
||||||
|
} catch(Exception e) {
|
||||||
|
LOG.error("Async start of network connector: {} failed", connector, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
connector.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
|
public TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
|
||||||
connector.setBrokerService(this);
|
connector.setBrokerService(this);
|
||||||
connector.setTaskRunnerFactory(getTaskRunnerFactory());
|
connector.setTaskRunnerFactory(getTaskRunnerFactory());
|
||||||
|
|
|
@ -16,13 +16,13 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.plugin;
|
package org.apache.activemq.plugin;
|
||||||
|
|
||||||
|
import java.util.TreeMap;
|
||||||
|
|
||||||
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
||||||
import org.apache.activemq.network.NetworkConnector;
|
import org.apache.activemq.network.NetworkConnector;
|
||||||
import org.apache.activemq.schema.core.DtoNetworkConnector;
|
import org.apache.activemq.schema.core.DtoNetworkConnector;
|
||||||
import org.apache.activemq.util.IntrospectionSupport;
|
import org.apache.activemq.util.IntrospectionSupport;
|
||||||
|
|
||||||
import java.util.TreeMap;
|
|
||||||
|
|
||||||
public class NetworkConnectorProcessor extends DefaultConfigurationProcessor {
|
public class NetworkConnectorProcessor extends DefaultConfigurationProcessor {
|
||||||
|
|
||||||
public NetworkConnectorProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
|
public NetworkConnectorProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
|
||||||
|
@ -36,7 +36,7 @@ public class NetworkConnectorProcessor extends DefaultConfigurationProcessor {
|
||||||
try {
|
try {
|
||||||
DiscoveryNetworkConnector nc = fromDto(networkConnector, new DiscoveryNetworkConnector());
|
DiscoveryNetworkConnector nc = fromDto(networkConnector, new DiscoveryNetworkConnector());
|
||||||
plugin.getBrokerService().addNetworkConnector(nc);
|
plugin.getBrokerService().addNetworkConnector(nc);
|
||||||
nc.start();
|
plugin.getBrokerService().startNetworkConnector(nc, null);
|
||||||
plugin.info("started new network connector: " + nc);
|
plugin.info("started new network connector: " + nc);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
plugin.info("Failed to add new networkConnector " + networkConnector, e);
|
plugin.info("Failed to add new networkConnector " + networkConnector, e);
|
||||||
|
|
|
@ -104,7 +104,7 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration
|
||||||
try {
|
try {
|
||||||
if (!getBrokerService().getNetworkConnectors().contains(nc)) {
|
if (!getBrokerService().getNetworkConnectors().contains(nc)) {
|
||||||
getBrokerService().addNetworkConnector(nc);
|
getBrokerService().addNetworkConnector(nc);
|
||||||
nc.start();
|
getBrokerService().startNetworkConnector(nc, null);
|
||||||
info("started new network connector: " + nc);
|
info("started new network connector: " + nc);
|
||||||
} else {
|
} else {
|
||||||
info("skipping network connector add, already exists: " + nc);
|
info("skipping network connector add, already exists: " + nc);
|
||||||
|
|
|
@ -17,8 +17,13 @@
|
||||||
package org.apache.activemq;
|
package org.apache.activemq;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertSame;
|
import static org.junit.Assert.assertSame;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import javax.management.InstanceNotFoundException;
|
||||||
|
|
||||||
import org.apache.activemq.network.NetworkConnector;
|
import org.apache.activemq.network.NetworkConnector;
|
||||||
import org.apache.activemq.util.Wait;
|
import org.apache.activemq.util.Wait;
|
||||||
|
@ -57,6 +62,10 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport {
|
||||||
assertEquals("one statically included", 1, networkConnector.getStaticallyIncludedDestinations().size());
|
assertEquals("one statically included", 1, networkConnector.getStaticallyIncludedDestinations().size());
|
||||||
assertEquals("one dynamically included", 1, networkConnector.getDynamicallyIncludedDestinations().size());
|
assertEquals("one dynamically included", 1, networkConnector.getDynamicallyIncludedDestinations().size());
|
||||||
assertEquals("one durable", 1, networkConnector.getDurableDestinations().size());
|
assertEquals("one durable", 1, networkConnector.getDurableDestinations().size());
|
||||||
|
assertFalse(networkConnector.getBrokerName().isEmpty());
|
||||||
|
|
||||||
|
assertNotNull(brokerService.getManagementContext().getObjectInstance(
|
||||||
|
brokerService.createNetworkConnectorObjectName(networkConnector)));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,6 +93,10 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport {
|
||||||
applyNewConfig(brokerConfig, configurationSeed + "-mod-one-nc", SLEEP);
|
applyNewConfig(brokerConfig, configurationSeed + "-mod-one-nc", SLEEP);
|
||||||
assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size());
|
assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size());
|
||||||
assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0));
|
assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0));
|
||||||
|
assertFalse(modNetworkConnector.getBrokerName().isEmpty());
|
||||||
|
|
||||||
|
assertNotNull(brokerService.getManagementContext().getObjectInstance(
|
||||||
|
brokerService.createNetworkConnectorObjectName(modNetworkConnector)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -95,6 +108,8 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport {
|
||||||
assertTrue("broker alive", brokerService.isStarted());
|
assertTrue("broker alive", brokerService.isStarted());
|
||||||
assertEquals("correct network connectors", 2, brokerService.getNetworkConnectors().size());
|
assertEquals("correct network connectors", 2, brokerService.getNetworkConnectors().size());
|
||||||
|
|
||||||
|
NetworkConnector two = brokerService.getNetworkConnectors().get(1);
|
||||||
|
|
||||||
applyNewConfig(brokerConfig, configurationSeed + "-one-nc", SLEEP);
|
applyNewConfig(brokerConfig, configurationSeed + "-one-nc", SLEEP);
|
||||||
|
|
||||||
assertTrue("expected mod on time", Wait.waitFor(new Wait.Condition() {
|
assertTrue("expected mod on time", Wait.waitFor(new Wait.Condition() {
|
||||||
|
@ -106,5 +121,16 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport {
|
||||||
|
|
||||||
NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0);
|
NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0);
|
||||||
assertEquals("name match", "one", remainingNetworkConnector.getName());
|
assertEquals("name match", "one", remainingNetworkConnector.getName());
|
||||||
|
|
||||||
|
try {
|
||||||
|
brokerService.getManagementContext().getObjectInstance(
|
||||||
|
brokerService.createNetworkConnectorObjectName(two));
|
||||||
|
fail("mbean for nc2 should not exist");
|
||||||
|
} catch (InstanceNotFoundException e) {
|
||||||
|
//should throw exception
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNotNull(brokerService.getManagementContext().getObjectInstance(
|
||||||
|
brokerService.createNetworkConnectorObjectName(remainingNetworkConnector)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,14 +17,19 @@
|
||||||
package org.apache.activemq.java;
|
package org.apache.activemq.java;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertSame;
|
import static org.junit.Assert.assertSame;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.management.InstanceNotFoundException;
|
||||||
|
|
||||||
import org.apache.activemq.RuntimeConfigTestSupport;
|
import org.apache.activemq.RuntimeConfigTestSupport;
|
||||||
import org.apache.activemq.broker.BrokerPlugin;
|
import org.apache.activemq.broker.BrokerPlugin;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
@ -61,7 +66,6 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
|
||||||
startBroker(brokerService);
|
startBroker(brokerService);
|
||||||
assertTrue("broker alive", brokerService.isStarted());
|
assertTrue("broker alive", brokerService.isStarted());
|
||||||
assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size());
|
assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size());
|
||||||
|
|
||||||
DiscoveryNetworkConnector nc = createNetworkConnector();
|
DiscoveryNetworkConnector nc = createNetworkConnector();
|
||||||
|
|
||||||
javaConfigBroker.addNetworkConnector(nc);
|
javaConfigBroker.addNetworkConnector(nc);
|
||||||
|
@ -85,6 +89,10 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
|
||||||
assertEquals("one statically included", 1, networkConnector.getStaticallyIncludedDestinations().size());
|
assertEquals("one statically included", 1, networkConnector.getStaticallyIncludedDestinations().size());
|
||||||
assertEquals("one dynamically included", 1, networkConnector.getDynamicallyIncludedDestinations().size());
|
assertEquals("one dynamically included", 1, networkConnector.getDynamicallyIncludedDestinations().size());
|
||||||
assertEquals("one durable", 1, networkConnector.getDurableDestinations().size());
|
assertEquals("one durable", 1, networkConnector.getDurableDestinations().size());
|
||||||
|
assertFalse(networkConnector.getBrokerName().isEmpty());
|
||||||
|
|
||||||
|
assertNotNull(brokerService.getManagementContext().getObjectInstance(
|
||||||
|
brokerService.createNetworkConnectorObjectName(networkConnector)));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,6 +113,8 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
|
||||||
// track the original
|
// track the original
|
||||||
NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0);
|
NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0);
|
||||||
assertEquals("network ttl is default", 1, networkConnector.getNetworkTTL());
|
assertEquals("network ttl is default", 1, networkConnector.getNetworkTTL());
|
||||||
|
assertNotNull(networkConnector.getBrokerName());
|
||||||
|
assertNotNull(networkConnector.getBrokerURL());
|
||||||
|
|
||||||
nc.setNetworkTTL(2);
|
nc.setNetworkTTL(2);
|
||||||
javaConfigBroker.updateNetworkConnector(nc);
|
javaConfigBroker.updateNetworkConnector(nc);
|
||||||
|
@ -118,6 +128,10 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
|
||||||
javaConfigBroker.updateNetworkConnector(nc);
|
javaConfigBroker.updateNetworkConnector(nc);
|
||||||
assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size());
|
assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size());
|
||||||
assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0));
|
assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0));
|
||||||
|
assertFalse(modNetworkConnector.getBrokerName().isEmpty());
|
||||||
|
|
||||||
|
assertNotNull(brokerService.getManagementContext().getObjectInstance(
|
||||||
|
brokerService.createNetworkConnectorObjectName(modNetworkConnector)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -135,7 +149,7 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
|
||||||
DiscoveryNetworkConnector nc2 = new DiscoveryNetworkConnector();
|
DiscoveryNetworkConnector nc2 = new DiscoveryNetworkConnector();
|
||||||
nc2.setUri(new URI("static:(tcp://localhost:5555)"));
|
nc2.setUri(new URI("static:(tcp://localhost:5555)"));
|
||||||
nc2.setNetworkTTL(1);
|
nc2.setNetworkTTL(1);
|
||||||
nc2.setName("one");
|
nc2.setName("two");
|
||||||
|
|
||||||
javaConfigBroker.addNetworkConnector(nc1);
|
javaConfigBroker.addNetworkConnector(nc1);
|
||||||
javaConfigBroker.addNetworkConnector(nc2);
|
javaConfigBroker.addNetworkConnector(nc2);
|
||||||
|
@ -154,6 +168,18 @@ public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
|
||||||
|
|
||||||
NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0);
|
NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0);
|
||||||
assertEquals("name match", "one", remainingNetworkConnector.getName());
|
assertEquals("name match", "one", remainingNetworkConnector.getName());
|
||||||
|
|
||||||
|
try {
|
||||||
|
brokerService.getManagementContext().getObjectInstance(
|
||||||
|
brokerService.createNetworkConnectorObjectName(nc2));
|
||||||
|
fail("mbean for nc2 should not exist");
|
||||||
|
} catch (InstanceNotFoundException e) {
|
||||||
|
//should throw exception
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNotNull(brokerService.getManagementContext().getObjectInstance(
|
||||||
|
brokerService.createNetworkConnectorObjectName(nc1)));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private DiscoveryNetworkConnector createNetworkConnector() throws Exception {
|
private DiscoveryNetworkConnector createNetworkConnector() throws Exception {
|
||||||
|
|
|
@ -174,6 +174,9 @@ public class SimpleNetworkTest {
|
||||||
assertNull(consumer2.receive(1000));
|
assertNull(consumer2.receive(1000));
|
||||||
|
|
||||||
assertNetworkBridgeStatistics(MESSAGE_COUNT, 0);
|
assertNetworkBridgeStatistics(MESSAGE_COUNT, 0);
|
||||||
|
|
||||||
|
assertNotNull(localBroker.getManagementContext().getObjectInstance(
|
||||||
|
localBroker.createNetworkConnectorObjectName(localBroker.getNetworkConnectors().get(0))));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {
|
private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue