From 50ec158e25d1ee81e788e29e15366e3955c8295f Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 7 Nov 2013 11:34:56 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4852 - ensure clientId view connector mbean visible for duplex network connectors --- .../DemandForwardingBridgeSupport.java | 8 ++++ .../jmx/TransportConnectorMBeanTest.java | 37 +++++++++++++++++++ .../network/DuplexNetworkMBeanTest.java | 4 +- 3 files changed, 47 insertions(+), 2 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index ea614728c0..af10a945d3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -119,6 +119,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br private final AtomicBoolean started = new AtomicBoolean(); private TransportConnection duplexInitiatingConnection; + private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean(); protected BrokerService brokerService = null; private ObjectName mbeanObjectName; private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor(); @@ -610,6 +611,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else { switch (command.getDataStructureType()) { case ConnectionInfo.DATA_STRUCTURE_TYPE: + if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) { + // end of initiating connection setup - propogate to initial connection to get mbean by clientid + duplexInitiatingConnection.processAddConnection((ConnectionInfo) command); + } else { + localBroker.oneway(command); + } + break; case SessionInfo.DATA_STRUCTURE_TYPE: localBroker.oneway(command); break; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java index 310f112c77..6f55e3de38 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.jmx; +import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; import java.net.Socket; @@ -26,7 +27,9 @@ import javax.management.ObjectName; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.util.JMXSupport; +import org.apache.activemq.util.Wait; import org.junit.After; import org.junit.Test; import org.slf4j.Logger; @@ -47,6 +50,40 @@ public class TransportConnectorMBeanTest { doVerifyRemoteAddressInMbeanName(false); } + @Test + public void verifyClientIdNetwork() throws Exception { + doVerifyClientIdNetwork(false); + } + + @Test + public void verifyClientIdDuplexNetwork() throws Exception { + doVerifyClientIdNetwork(true); + } + + private void doVerifyClientIdNetwork(boolean duplex) throws Exception { + createBroker(true); + + BrokerService networked = new BrokerService(); + networked.setBrokerName("networked"); + networked.setPersistent(false); + NetworkConnector nc = networked.addNetworkConnector("static:" + broker.getTransportConnectors().get(0).getPublishableConnectString()); + nc.setDuplex(duplex); + networked.start(); + + try { + assertTrue("presence of mbean with clientId", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Set registeredMbeans = getRegisteredMbeans(); + return match("_outbound", registeredMbeans); + } + })); + + } finally { + networked.stop(); + } + } + private void doVerifyRemoteAddressInMbeanName(boolean allowRemoteAddress) throws Exception { createBroker(allowRemoteAddress); ActiveMQConnection connection = createConnection(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java index cf02bb25e2..213d4aeb63 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java @@ -66,7 +66,7 @@ public class DuplexNetworkMBeanTest { networkedBroker.start(); assertEquals(1, countMbeans(networkedBroker, "networkBridge", 2000)); assertEquals(1, countMbeans(broker, "networkBridge", 2000)); - assertEquals(1, countMbeans(broker, "connectionName")); + assertEquals(2, countMbeans(broker, "connectionName")); } finally { networkedBroker.stop(); networkedBroker.waitUntilStopped(); @@ -100,7 +100,7 @@ public class DuplexNetworkMBeanTest { try { broker.start(); assertEquals(1, countMbeans(networkedBroker, "networkBridge", 5000)); - assertEquals("restart number: " + i, 1, countMbeans(broker, "connectionName", 10000)); + assertEquals("restart number: " + i, 2, countMbeans(broker, "connectionName", 10000)); } finally { broker.stop(); broker.waitUntilStopped();