https://issues.apache.org/jira/browse/AMQ-4852 - ensure clientId view connector mbean visible for duplex network connectors

This commit is contained in:
gtully 2013-11-07 11:34:56 +00:00
parent ec5b15cc27
commit 50ec158e25
3 changed files with 47 additions and 2 deletions

View File

@ -119,6 +119,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
private final AtomicBoolean started = new AtomicBoolean();
private TransportConnection duplexInitiatingConnection;
private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
protected BrokerService brokerService = null;
private ObjectName mbeanObjectName;
private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
@ -610,6 +611,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else {
switch (command.getDataStructureType()) {
case ConnectionInfo.DATA_STRUCTURE_TYPE:
if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
// end of initiating connection setup - propogate to initial connection to get mbean by clientid
duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
} else {
localBroker.oneway(command);
}
break;
case SessionInfo.DATA_STRUCTURE_TYPE:
localBroker.oneway(command);
break;

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.jmx;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import java.net.Socket;
@ -26,7 +27,9 @@ import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
@ -47,6 +50,40 @@ public class TransportConnectorMBeanTest {
doVerifyRemoteAddressInMbeanName(false);
}
@Test
public void verifyClientIdNetwork() throws Exception {
doVerifyClientIdNetwork(false);
}
@Test
public void verifyClientIdDuplexNetwork() throws Exception {
doVerifyClientIdNetwork(true);
}
private void doVerifyClientIdNetwork(boolean duplex) throws Exception {
createBroker(true);
BrokerService networked = new BrokerService();
networked.setBrokerName("networked");
networked.setPersistent(false);
NetworkConnector nc = networked.addNetworkConnector("static:" + broker.getTransportConnectors().get(0).getPublishableConnectString());
nc.setDuplex(duplex);
networked.start();
try {
assertTrue("presence of mbean with clientId", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Set<ObjectName> registeredMbeans = getRegisteredMbeans();
return match("_outbound", registeredMbeans);
}
}));
} finally {
networked.stop();
}
}
private void doVerifyRemoteAddressInMbeanName(boolean allowRemoteAddress) throws Exception {
createBroker(allowRemoteAddress);
ActiveMQConnection connection = createConnection();

View File

@ -66,7 +66,7 @@ public class DuplexNetworkMBeanTest {
networkedBroker.start();
assertEquals(1, countMbeans(networkedBroker, "networkBridge", 2000));
assertEquals(1, countMbeans(broker, "networkBridge", 2000));
assertEquals(1, countMbeans(broker, "connectionName"));
assertEquals(2, countMbeans(broker, "connectionName"));
} finally {
networkedBroker.stop();
networkedBroker.waitUntilStopped();
@ -100,7 +100,7 @@ public class DuplexNetworkMBeanTest {
try {
broker.start();
assertEquals(1, countMbeans(networkedBroker, "networkBridge", 5000));
assertEquals("restart number: " + i, 1, countMbeans(broker, "connectionName", 10000));
assertEquals("restart number: " + i, 2, countMbeans(broker, "connectionName", 10000));
} finally {
broker.stop();
broker.waitUntilStopped();