mirror of https://github.com/apache/activemq.git
Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations
(cherry picked from commit 6b1e87410d
)
This commit is contained in:
parent
543851ba54
commit
aa8b64420b
|
@ -5,9 +5,9 @@
|
|||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -16,12 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq.network;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.AnnotatedMBean;
|
||||
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
|
||||
|
@ -33,6 +27,11 @@ import org.apache.activemq.thread.Scheduler;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class MBeanBridgeDestination {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
|
||||
private final BrokerService brokerService;
|
||||
|
@ -41,9 +40,8 @@ public class MBeanBridgeDestination {
|
|||
private final NetworkBridgeConfiguration networkBridgeConfiguration;
|
||||
private final Scheduler scheduler;
|
||||
private final Runnable purgeInactiveDestinationViewTask;
|
||||
private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = new ConcurrentHashMap<ActiveMQDestination, ObjectName>();
|
||||
private Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
|
||||
private Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
|
||||
private final Map<ActiveMQDestination, NetworkDestinationContainer> outboundDestinationViewMap = new ConcurrentHashMap<>();
|
||||
private final Map<ActiveMQDestination, NetworkDestinationContainer> inboundDestinationViewMap = new ConcurrentHashMap<>();
|
||||
|
||||
public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) {
|
||||
this.brokerService = brokerService;
|
||||
|
@ -61,50 +59,49 @@ public class MBeanBridgeDestination {
|
|||
|
||||
public void onOutboundMessage(Message message) {
|
||||
ActiveMQDestination destination = message.getDestination();
|
||||
NetworkDestinationView networkDestinationView = outboundDestinationViewMap.get(destination);
|
||||
if (networkDestinationView == null) {
|
||||
synchronized (destinationObjectNameMap) {
|
||||
if ((networkDestinationView = outboundDestinationViewMap.get(destination)) == null) {
|
||||
NetworkDestinationContainer networkDestinationContainer;
|
||||
|
||||
if ((networkDestinationContainer = outboundDestinationViewMap.get(destination)) == null) {
|
||||
ObjectName bridgeObjectName = bridge.getMbeanObjectName();
|
||||
try {
|
||||
ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
|
||||
networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
|
||||
NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
|
||||
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
|
||||
destinationObjectNameMap.put(destination, objectName);
|
||||
outboundDestinationViewMap.put(destination, networkDestinationView);
|
||||
|
||||
networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
|
||||
outboundDestinationViewMap.put(destination, networkDestinationContainer);
|
||||
networkDestinationView.messageSent();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to register " + destination, e);
|
||||
}
|
||||
} else {
|
||||
networkDestinationContainer.view.messageSent();
|
||||
}
|
||||
}
|
||||
}
|
||||
networkDestinationView.messageSent();
|
||||
}
|
||||
|
||||
|
||||
public void onInboundMessage(Message message) {
|
||||
ActiveMQDestination destination = message.getDestination();
|
||||
NetworkDestinationView networkDestinationView = inboundDestinationViewMap.get(destination);
|
||||
if (networkDestinationView == null) {
|
||||
synchronized (destinationObjectNameMap) {
|
||||
if ((networkDestinationView = inboundDestinationViewMap.get(destination)) == null) {
|
||||
NetworkDestinationContainer networkDestinationContainer;
|
||||
|
||||
if ((networkDestinationContainer = inboundDestinationViewMap.get(destination)) == null) {
|
||||
ObjectName bridgeObjectName = bridge.getMbeanObjectName();
|
||||
try {
|
||||
ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
|
||||
networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
|
||||
networkBridgeView.addNetworkDestinationView(networkDestinationView);
|
||||
NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
|
||||
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
|
||||
destinationObjectNameMap.put(destination, objectName);
|
||||
inboundDestinationViewMap.put(destination, networkDestinationView);
|
||||
|
||||
networkBridgeView.addNetworkDestinationView(networkDestinationView);
|
||||
networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
|
||||
inboundDestinationViewMap.put(destination, networkDestinationContainer);
|
||||
networkDestinationView.messageSent();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to register " + destination, e);
|
||||
}
|
||||
} else {
|
||||
networkDestinationContainer.view.messageSent();
|
||||
}
|
||||
}
|
||||
}
|
||||
networkDestinationView.messageSent();
|
||||
}
|
||||
|
||||
public void start() {
|
||||
if (networkBridgeConfiguration.isGcDestinationViews()) {
|
||||
|
@ -121,18 +118,22 @@ public class MBeanBridgeDestination {
|
|||
}
|
||||
|
||||
scheduler.cancel(purgeInactiveDestinationViewTask);
|
||||
for (ObjectName objectName : destinationObjectNameMap.values()) {
|
||||
for (NetworkDestinationContainer networkDestinationContainer : inboundDestinationViewMap.values()) {
|
||||
try {
|
||||
if (objectName != null) {
|
||||
brokerService.getManagementContext().unregisterMBean(objectName);
|
||||
brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
}
|
||||
for (NetworkDestinationContainer networkDestinationContainer : outboundDestinationViewMap.values()) {
|
||||
try {
|
||||
brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
destinationObjectNameMap.clear();
|
||||
outboundDestinationViewMap.clear();
|
||||
inboundDestinationViewMap.clear();
|
||||
outboundDestinationViewMap.clear();
|
||||
}
|
||||
|
||||
private void purgeInactiveDestinationViews() {
|
||||
|
@ -143,25 +144,32 @@ public class MBeanBridgeDestination {
|
|||
purgeInactiveDestinationView(outboundDestinationViewMap);
|
||||
}
|
||||
|
||||
private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationView> map) {
|
||||
private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationContainer> map) {
|
||||
long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime();
|
||||
for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : map.entrySet()) {
|
||||
if (entry.getValue().getLastAccessTime() <= time) {
|
||||
synchronized (destinationObjectNameMap) {
|
||||
map.remove(entry.getKey());
|
||||
ObjectName objectName = destinationObjectNameMap.remove(entry.getKey());
|
||||
for (Iterator<Map.Entry<ActiveMQDestination, NetworkDestinationContainer>> it = map.entrySet().iterator(); it.hasNext(); ) {
|
||||
Map.Entry<ActiveMQDestination, NetworkDestinationContainer> entry = it.next();
|
||||
if (entry.getValue().view.getLastAccessTime() <= time) {
|
||||
ObjectName objectName = entry.getValue().objectName;
|
||||
if (objectName != null) {
|
||||
try {
|
||||
if (objectName != null) {
|
||||
brokerService.getManagementContext().unregisterMBean(objectName);
|
||||
}
|
||||
brokerService.getManagementContext().unregisterMBean(entry.getValue().objectName);
|
||||
} catch (Throwable e) {
|
||||
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
entry.getValue().close();
|
||||
entry.getValue().view.close();
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class NetworkDestinationContainer {
|
||||
private final NetworkDestinationView view;
|
||||
private final ObjectName objectName;
|
||||
|
||||
private NetworkDestinationContainer(NetworkDestinationView view, ObjectName objectName) {
|
||||
this.view = view;
|
||||
this.objectName = objectName;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,9 +5,9 @@
|
|||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -16,17 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.network;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assume.assumeNotNull;
|
||||
|
||||
import java.net.MalformedURLException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.ObjectInstance;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.ManagementContext;
|
||||
import org.apache.activemq.util.TestUtils;
|
||||
|
@ -35,6 +25,20 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.ObjectInstance;
|
||||
import javax.management.ObjectName;
|
||||
import java.net.MalformedURLException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assume.assumeNotNull;
|
||||
|
||||
public class DuplexNetworkMBeanTest {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(DuplexNetworkMBeanTest.class);
|
||||
|
@ -137,6 +141,74 @@ public class DuplexNetworkMBeanTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMBeansNotOverwrittenOnCleanup() throws Exception {
|
||||
BrokerService broker = createBroker();
|
||||
|
||||
BrokerService networkedBroker = createNetworkedBroker();
|
||||
MessageProducer producerBroker = null;
|
||||
MessageConsumer consumerBroker = null;
|
||||
Session sessionNetworkBroker = null;
|
||||
Session sessionBroker = null;
|
||||
MessageProducer producerNetworkBroker = null;
|
||||
MessageConsumer consumerNetworkBroker = null;
|
||||
try {
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
networkedBroker.start();
|
||||
try {
|
||||
assertEquals(2, countMbeans(networkedBroker, "connector=networkConnectors", 10000));
|
||||
assertEquals(1, countMbeans(broker, "connector=duplexNetworkConnectors", 10000));
|
||||
|
||||
Connection brokerConnection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection();
|
||||
brokerConnection.start();
|
||||
|
||||
sessionBroker = brokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
producerBroker = sessionBroker.createProducer(sessionBroker.createTopic("testTopic"));
|
||||
consumerBroker = sessionBroker.createConsumer(sessionBroker.createTopic("testTopic"));
|
||||
Connection netWorkBrokerConnection = new ActiveMQConnectionFactory(networkedBroker.getVmConnectorURI()).createConnection();
|
||||
netWorkBrokerConnection.start();
|
||||
sessionNetworkBroker = netWorkBrokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
producerNetworkBroker = sessionNetworkBroker.createProducer(sessionBroker.createTopic("testTopic"));
|
||||
consumerNetworkBroker = sessionNetworkBroker.createConsumer(sessionBroker.createTopic("testTopic"));
|
||||
|
||||
assertEquals(4, countMbeans(broker, "destinationType=Topic,destinationName=testTopic", 15000));
|
||||
assertEquals(4, countMbeans(networkedBroker, "destinationType=Topic,destinationName=testTopic", 15000));
|
||||
|
||||
producerBroker.send(sessionBroker.createTextMessage("test1"));
|
||||
producerNetworkBroker.send(sessionNetworkBroker.createTextMessage("test2"));
|
||||
|
||||
assertEquals(2, countMbeans(networkedBroker, "destinationName=testTopic,direction=*", 10000));
|
||||
assertEquals(2, countMbeans(broker, "destinationName=testTopic,direction=*", 10000));
|
||||
} finally {
|
||||
if (producerBroker != null) {
|
||||
producerBroker.close();
|
||||
}
|
||||
if (consumerBroker != null) {
|
||||
consumerBroker.close();
|
||||
}
|
||||
if (sessionBroker != null) {
|
||||
sessionBroker.close();
|
||||
}
|
||||
if (sessionNetworkBroker != null) {
|
||||
sessionNetworkBroker.close();
|
||||
}
|
||||
if (producerNetworkBroker != null) {
|
||||
producerNetworkBroker.close();
|
||||
}
|
||||
if (consumerNetworkBroker != null) {
|
||||
consumerNetworkBroker.close();
|
||||
}
|
||||
networkedBroker.stop();
|
||||
networkedBroker.waitUntilStopped();
|
||||
}
|
||||
assertEquals(0, countMbeans(broker, "destinationName=testTopic,direction=*", 1500));
|
||||
} finally {
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
}
|
||||
}
|
||||
|
||||
private int countMbeans(BrokerService broker, String type) throws Exception {
|
||||
return countMbeans(broker, type, 0);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue