From 5c8939741ac3dfc5943911e5aa486d32e41bca5f Mon Sep 17 00:00:00 2001 From: Altaflux Date: Mon, 4 Jan 2016 15:00:00 -0600 Subject: [PATCH] Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations (cherry picked from commit 6b1e87410da4a2033c286fcaa758371e48da62ec) (cherry picked from commit aa8b64420be5734a8b70736dab4d037bf84af927) --- .../network/MBeanBridgeDestination.java | 142 +++++++++--------- .../network/DuplexNetworkMBeanTest.java | 99 ++++++++++-- 2 files changed, 161 insertions(+), 80 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java index bab5574e18..888d2952c1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,12 +16,6 @@ */ package org.apache.activemq.network; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import javax.management.ObjectName; - import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.AnnotatedMBean; import org.apache.activemq.broker.jmx.BrokerMBeanSupport; @@ -33,6 +27,11 @@ import org.apache.activemq.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.management.ObjectName; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + public class MBeanBridgeDestination { private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class); private final BrokerService brokerService; @@ -41,9 +40,8 @@ public class MBeanBridgeDestination { private final NetworkBridgeConfiguration networkBridgeConfiguration; private final Scheduler scheduler; private final Runnable purgeInactiveDestinationViewTask; - private Map destinationObjectNameMap = new ConcurrentHashMap(); - private Map outboundDestinationViewMap = new ConcurrentHashMap(); - private Map inboundDestinationViewMap = new ConcurrentHashMap(); + private final Map outboundDestinationViewMap = new ConcurrentHashMap<>(); + private final Map inboundDestinationViewMap = new ConcurrentHashMap<>(); public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) { this.brokerService = brokerService; @@ -61,49 +59,48 @@ public class MBeanBridgeDestination { public void onOutboundMessage(Message message) { ActiveMQDestination destination = message.getDestination(); - NetworkDestinationView networkDestinationView = outboundDestinationViewMap.get(destination); - if (networkDestinationView == null) { - synchronized (destinationObjectNameMap) { - if ((networkDestinationView = outboundDestinationViewMap.get(destination)) == null) { - ObjectName bridgeObjectName = bridge.getMbeanObjectName(); - try { - ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination); - networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName()); - AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); - destinationObjectNameMap.put(destination, objectName); - outboundDestinationViewMap.put(destination, networkDestinationView); + NetworkDestinationContainer networkDestinationContainer; - } catch (Exception e) { - LOG.warn("Failed to register " + destination, e); - } - } + if ((networkDestinationContainer = outboundDestinationViewMap.get(destination)) == null) { + ObjectName bridgeObjectName = bridge.getMbeanObjectName(); + try { + ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination); + NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName()); + AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); + + networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName); + outboundDestinationViewMap.put(destination, networkDestinationContainer); + networkDestinationView.messageSent(); + } catch (Exception e) { + LOG.warn("Failed to register " + destination, e); } + } else { + networkDestinationContainer.view.messageSent(); } - networkDestinationView.messageSent(); } public void onInboundMessage(Message message) { ActiveMQDestination destination = message.getDestination(); - NetworkDestinationView networkDestinationView = inboundDestinationViewMap.get(destination); - if (networkDestinationView == null) { - synchronized (destinationObjectNameMap) { - if ((networkDestinationView = inboundDestinationViewMap.get(destination)) == null) { - ObjectName bridgeObjectName = bridge.getMbeanObjectName(); - try { - ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination); - networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName()); - networkBridgeView.addNetworkDestinationView(networkDestinationView); - AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); - destinationObjectNameMap.put(destination, objectName); - inboundDestinationViewMap.put(destination, networkDestinationView); - } catch (Exception e) { - LOG.warn("Failed to register " + destination, e); - } - } + NetworkDestinationContainer networkDestinationContainer; + + if ((networkDestinationContainer = inboundDestinationViewMap.get(destination)) == null) { + ObjectName bridgeObjectName = bridge.getMbeanObjectName(); + try { + ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination); + NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName()); + AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); + + networkBridgeView.addNetworkDestinationView(networkDestinationView); + networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName); + inboundDestinationViewMap.put(destination, networkDestinationContainer); + networkDestinationView.messageSent(); + } catch (Exception e) { + LOG.warn("Failed to register " + destination, e); } + } else { + networkDestinationContainer.view.messageSent(); } - networkDestinationView.messageSent(); } public void start() { @@ -121,18 +118,22 @@ public class MBeanBridgeDestination { } scheduler.cancel(purgeInactiveDestinationViewTask); - for (ObjectName objectName : destinationObjectNameMap.values()) { + for (NetworkDestinationContainer networkDestinationContainer : inboundDestinationViewMap.values()) { try { - if (objectName != null) { - brokerService.getManagementContext().unregisterMBean(objectName); - } - } catch (Throwable e) { + brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName); + } catch (Exception e) { + LOG.error("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); + } + } + for (NetworkDestinationContainer networkDestinationContainer : outboundDestinationViewMap.values()) { + try { + brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName); + } catch (Exception e) { LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); } } - destinationObjectNameMap.clear(); - outboundDestinationViewMap.clear(); inboundDestinationViewMap.clear(); + outboundDestinationViewMap.clear(); } private void purgeInactiveDestinationViews() { @@ -143,25 +144,32 @@ public class MBeanBridgeDestination { purgeInactiveDestinationView(outboundDestinationViewMap); } - private void purgeInactiveDestinationView(Map map) { + private void purgeInactiveDestinationView(Map map) { long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime(); - for (Map.Entry entry : map.entrySet()) { - if (entry.getValue().getLastAccessTime() <= time) { - synchronized (destinationObjectNameMap) { - map.remove(entry.getKey()); - ObjectName objectName = destinationObjectNameMap.remove(entry.getKey()); - if (objectName != null) { - try { - if (objectName != null) { - brokerService.getManagementContext().unregisterMBean(objectName); - } - } catch (Throwable e) { - LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); - } + for (Iterator> it = map.entrySet().iterator(); it.hasNext(); ) { + Map.Entry entry = it.next(); + if (entry.getValue().view.getLastAccessTime() <= time) { + ObjectName objectName = entry.getValue().objectName; + if (objectName != null) { + try { + brokerService.getManagementContext().unregisterMBean(entry.getValue().objectName); + } catch (Throwable e) { + LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); } - entry.getValue().close(); } + entry.getValue().view.close(); + it.remove(); } } } + + private static class NetworkDestinationContainer { + private final NetworkDestinationView view; + private final ObjectName objectName; + + private NetworkDestinationContainer(NetworkDestinationView view, ObjectName objectName) { + this.view = view; + this.objectName = objectName; + } + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java index 13a94cbfb0..cd5fa16020 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,16 +16,7 @@ */ package org.apache.activemq.network; -import static org.junit.Assert.assertEquals; -import static org.junit.Assume.assumeNotNull; - -import java.net.MalformedURLException; -import java.util.List; -import java.util.Set; - -import javax.management.ObjectInstance; -import javax.management.ObjectName; - +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.util.TestUtils; import org.junit.Before; @@ -33,6 +24,20 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.MBeanServer; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import java.net.MalformedURLException; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeNotNull; + public class DuplexNetworkMBeanTest { protected static final Logger LOG = LoggerFactory.getLogger(DuplexNetworkMBeanTest.class); @@ -134,6 +139,74 @@ public class DuplexNetworkMBeanTest { } } + @Test + public void testMBeansNotOverwrittenOnCleanup() throws Exception { + BrokerService broker = createBroker(); + + BrokerService networkedBroker = createNetworkedBroker(); + MessageProducer producerBroker = null; + MessageConsumer consumerBroker = null; + Session sessionNetworkBroker = null; + Session sessionBroker = null; + MessageProducer producerNetworkBroker = null; + MessageConsumer consumerNetworkBroker = null; + try { + broker.start(); + broker.waitUntilStarted(); + networkedBroker.start(); + try { + assertEquals(2, countMbeans(networkedBroker, "connector=networkConnectors", 10000)); + assertEquals(1, countMbeans(broker, "connector=duplexNetworkConnectors", 10000)); + + Connection brokerConnection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection(); + brokerConnection.start(); + + sessionBroker = brokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producerBroker = sessionBroker.createProducer(sessionBroker.createTopic("testTopic")); + consumerBroker = sessionBroker.createConsumer(sessionBroker.createTopic("testTopic")); + Connection netWorkBrokerConnection = new ActiveMQConnectionFactory(networkedBroker.getVmConnectorURI()).createConnection(); + netWorkBrokerConnection.start(); + sessionNetworkBroker = netWorkBrokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producerNetworkBroker = sessionNetworkBroker.createProducer(sessionBroker.createTopic("testTopic")); + consumerNetworkBroker = sessionNetworkBroker.createConsumer(sessionBroker.createTopic("testTopic")); + + assertEquals(4, countMbeans(broker, "destinationType=Topic,destinationName=testTopic", 15000)); + assertEquals(4, countMbeans(networkedBroker, "destinationType=Topic,destinationName=testTopic", 15000)); + + producerBroker.send(sessionBroker.createTextMessage("test1")); + producerNetworkBroker.send(sessionNetworkBroker.createTextMessage("test2")); + + assertEquals(2, countMbeans(networkedBroker, "destinationName=testTopic,direction=*", 10000)); + assertEquals(2, countMbeans(broker, "destinationName=testTopic,direction=*", 10000)); + } finally { + if (producerBroker != null) { + producerBroker.close(); + } + if (consumerBroker != null) { + consumerBroker.close(); + } + if (sessionBroker != null) { + sessionBroker.close(); + } + if (sessionNetworkBroker != null) { + sessionNetworkBroker.close(); + } + if (producerNetworkBroker != null) { + producerNetworkBroker.close(); + } + if (consumerNetworkBroker != null) { + consumerNetworkBroker.close(); + } + networkedBroker.stop(); + networkedBroker.waitUntilStopped(); + } + assertEquals(0, countMbeans(broker, "destinationName=testTopic,direction=*", 1500)); + } finally { + broker.stop(); + broker.waitUntilStopped(); + } + } + private int countMbeans(BrokerService broker, String type) throws Exception { return countMbeans(broker, type, 0); }