Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations

This commit is contained in:
Altaflux 2016-01-04 15:00:00 -06:00 committed by gtully
parent 8f6baf8188
commit 6b1e87410d
2 changed files with 161 additions and 81 deletions

View File

@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -16,12 +16,6 @@
*/
package org.apache.activemq.network;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
@ -33,6 +27,11 @@ import org.apache.activemq.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class MBeanBridgeDestination {
private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
private final BrokerService brokerService;
@ -41,9 +40,8 @@ public class MBeanBridgeDestination {
private final NetworkBridgeConfiguration networkBridgeConfiguration;
private final Scheduler scheduler;
private final Runnable purgeInactiveDestinationViewTask;
private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = new ConcurrentHashMap<ActiveMQDestination, ObjectName>();
private Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
private Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
private final Map<ActiveMQDestination, NetworkDestinationContainer> outboundDestinationViewMap = new ConcurrentHashMap<>();
private final Map<ActiveMQDestination, NetworkDestinationContainer> inboundDestinationViewMap = new ConcurrentHashMap<>();
public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) {
this.brokerService = brokerService;
@ -61,49 +59,48 @@ public class MBeanBridgeDestination {
public void onOutboundMessage(Message message) {
ActiveMQDestination destination = message.getDestination();
NetworkDestinationView networkDestinationView = outboundDestinationViewMap.get(destination);
if (networkDestinationView == null) {
synchronized (destinationObjectNameMap) {
if ((networkDestinationView = outboundDestinationViewMap.get(destination)) == null) {
ObjectName bridgeObjectName = bridge.getMbeanObjectName();
try {
ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
destinationObjectNameMap.put(destination, objectName);
outboundDestinationViewMap.put(destination, networkDestinationView);
NetworkDestinationContainer networkDestinationContainer;
} catch (Exception e) {
LOG.warn("Failed to register " + destination, e);
}
}
if ((networkDestinationContainer = outboundDestinationViewMap.get(destination)) == null) {
ObjectName bridgeObjectName = bridge.getMbeanObjectName();
try {
ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
outboundDestinationViewMap.put(destination, networkDestinationContainer);
networkDestinationView.messageSent();
} catch (Exception e) {
LOG.warn("Failed to register " + destination, e);
}
} else {
networkDestinationContainer.view.messageSent();
}
networkDestinationView.messageSent();
}
public void onInboundMessage(Message message) {
ActiveMQDestination destination = message.getDestination();
NetworkDestinationView networkDestinationView = inboundDestinationViewMap.get(destination);
if (networkDestinationView == null) {
synchronized (destinationObjectNameMap) {
if ((networkDestinationView = inboundDestinationViewMap.get(destination)) == null) {
ObjectName bridgeObjectName = bridge.getMbeanObjectName();
try {
ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
networkBridgeView.addNetworkDestinationView(networkDestinationView);
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
destinationObjectNameMap.put(destination, objectName);
inboundDestinationViewMap.put(destination, networkDestinationView);
} catch (Exception e) {
LOG.warn("Failed to register " + destination, e);
}
}
NetworkDestinationContainer networkDestinationContainer;
if ((networkDestinationContainer = inboundDestinationViewMap.get(destination)) == null) {
ObjectName bridgeObjectName = bridge.getMbeanObjectName();
try {
ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
networkBridgeView.addNetworkDestinationView(networkDestinationView);
networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
inboundDestinationViewMap.put(destination, networkDestinationContainer);
networkDestinationView.messageSent();
} catch (Exception e) {
LOG.warn("Failed to register " + destination, e);
}
} else {
networkDestinationContainer.view.messageSent();
}
networkDestinationView.messageSent();
}
public void start() {
@ -121,18 +118,22 @@ public class MBeanBridgeDestination {
}
scheduler.cancel(purgeInactiveDestinationViewTask);
for (ObjectName objectName : destinationObjectNameMap.values()) {
for (NetworkDestinationContainer networkDestinationContainer : inboundDestinationViewMap.values()) {
try {
if (objectName != null) {
brokerService.getManagementContext().unregisterMBean(objectName);
}
} catch (Throwable e) {
brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
} catch (Exception e) {
LOG.error("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
}
}
for (NetworkDestinationContainer networkDestinationContainer : outboundDestinationViewMap.values()) {
try {
brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
} catch (Exception e) {
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
}
}
destinationObjectNameMap.clear();
outboundDestinationViewMap.clear();
inboundDestinationViewMap.clear();
outboundDestinationViewMap.clear();
}
private void purgeInactiveDestinationViews() {
@ -143,25 +144,32 @@ public class MBeanBridgeDestination {
purgeInactiveDestinationView(outboundDestinationViewMap);
}
private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationView> map) {
private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationContainer> map) {
long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime();
for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : map.entrySet()) {
if (entry.getValue().getLastAccessTime() <= time) {
synchronized (destinationObjectNameMap) {
map.remove(entry.getKey());
ObjectName objectName = destinationObjectNameMap.remove(entry.getKey());
if (objectName != null) {
try {
if (objectName != null) {
brokerService.getManagementContext().unregisterMBean(objectName);
}
} catch (Throwable e) {
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
}
for (Iterator<Map.Entry<ActiveMQDestination, NetworkDestinationContainer>> it = map.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<ActiveMQDestination, NetworkDestinationContainer> entry = it.next();
if (entry.getValue().view.getLastAccessTime() <= time) {
ObjectName objectName = entry.getValue().objectName;
if (objectName != null) {
try {
brokerService.getManagementContext().unregisterMBean(entry.getValue().objectName);
} catch (Throwable e) {
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
}
entry.getValue().close();
}
entry.getValue().view.close();
it.remove();
}
}
}
private static class NetworkDestinationContainer {
private final NetworkDestinationView view;
private final ObjectName objectName;
private NetworkDestinationContainer(NetworkDestinationView view, ObjectName objectName) {
this.view = view;
this.objectName = objectName;
}
}
}

View File

@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -16,17 +16,7 @@
*/
package org.apache.activemq.network;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeNotNull;
import java.net.MalformedURLException;
import java.util.List;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.util.TestUtils;
@ -35,6 +25,20 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import java.net.MalformedURLException;
import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeNotNull;
public class DuplexNetworkMBeanTest {
protected static final Logger LOG = LoggerFactory.getLogger(DuplexNetworkMBeanTest.class);
@ -137,6 +141,74 @@ public class DuplexNetworkMBeanTest {
}
}
@Test
public void testMBeansNotOverwrittenOnCleanup() throws Exception {
BrokerService broker = createBroker();
BrokerService networkedBroker = createNetworkedBroker();
MessageProducer producerBroker = null;
MessageConsumer consumerBroker = null;
Session sessionNetworkBroker = null;
Session sessionBroker = null;
MessageProducer producerNetworkBroker = null;
MessageConsumer consumerNetworkBroker = null;
try {
broker.start();
broker.waitUntilStarted();
networkedBroker.start();
try {
assertEquals(2, countMbeans(networkedBroker, "connector=networkConnectors", 10000));
assertEquals(1, countMbeans(broker, "connector=duplexNetworkConnectors", 10000));
Connection brokerConnection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection();
brokerConnection.start();
sessionBroker = brokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producerBroker = sessionBroker.createProducer(sessionBroker.createTopic("testTopic"));
consumerBroker = sessionBroker.createConsumer(sessionBroker.createTopic("testTopic"));
Connection netWorkBrokerConnection = new ActiveMQConnectionFactory(networkedBroker.getVmConnectorURI()).createConnection();
netWorkBrokerConnection.start();
sessionNetworkBroker = netWorkBrokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producerNetworkBroker = sessionNetworkBroker.createProducer(sessionBroker.createTopic("testTopic"));
consumerNetworkBroker = sessionNetworkBroker.createConsumer(sessionBroker.createTopic("testTopic"));
assertEquals(4, countMbeans(broker, "destinationType=Topic,destinationName=testTopic", 15000));
assertEquals(4, countMbeans(networkedBroker, "destinationType=Topic,destinationName=testTopic", 15000));
producerBroker.send(sessionBroker.createTextMessage("test1"));
producerNetworkBroker.send(sessionNetworkBroker.createTextMessage("test2"));
assertEquals(2, countMbeans(networkedBroker, "destinationName=testTopic,direction=*", 10000));
assertEquals(2, countMbeans(broker, "destinationName=testTopic,direction=*", 10000));
} finally {
if (producerBroker != null) {
producerBroker.close();
}
if (consumerBroker != null) {
consumerBroker.close();
}
if (sessionBroker != null) {
sessionBroker.close();
}
if (sessionNetworkBroker != null) {
sessionNetworkBroker.close();
}
if (producerNetworkBroker != null) {
producerNetworkBroker.close();
}
if (consumerNetworkBroker != null) {
consumerNetworkBroker.close();
}
networkedBroker.stop();
networkedBroker.waitUntilStopped();
}
assertEquals(0, countMbeans(broker, "destinationName=testTopic,direction=*", 1500));
} finally {
broker.stop();
broker.waitUntilStopped();
}
}
private int countMbeans(BrokerService broker, String type) throws Exception {
return countMbeans(broker, type, 0);
}