mirror of https://github.com/apache/activemq.git
Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations
(cherry picked from commit6b1e87410d
) (cherry picked from commitaa8b64420b
)
This commit is contained in:
parent
b21ad1a0f9
commit
5c8939741a
|
@ -5,9 +5,9 @@
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
* (the "License"); you may not use this file except in compliance with
|
* (the "License"); you may not use this file except in compliance with
|
||||||
* the License. You may obtain a copy of the License at
|
* the License. You may obtain a copy of the License at
|
||||||
*
|
* <p/>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p/>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
@ -16,12 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.network;
|
package org.apache.activemq.network;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
import javax.management.ObjectName;
|
|
||||||
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.jmx.AnnotatedMBean;
|
import org.apache.activemq.broker.jmx.AnnotatedMBean;
|
||||||
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
|
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
|
||||||
|
@ -33,6 +27,11 @@ import org.apache.activemq.thread.Scheduler;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
public class MBeanBridgeDestination {
|
public class MBeanBridgeDestination {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
|
private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
|
||||||
private final BrokerService brokerService;
|
private final BrokerService brokerService;
|
||||||
|
@ -41,9 +40,8 @@ public class MBeanBridgeDestination {
|
||||||
private final NetworkBridgeConfiguration networkBridgeConfiguration;
|
private final NetworkBridgeConfiguration networkBridgeConfiguration;
|
||||||
private final Scheduler scheduler;
|
private final Scheduler scheduler;
|
||||||
private final Runnable purgeInactiveDestinationViewTask;
|
private final Runnable purgeInactiveDestinationViewTask;
|
||||||
private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = new ConcurrentHashMap<ActiveMQDestination, ObjectName>();
|
private final Map<ActiveMQDestination, NetworkDestinationContainer> outboundDestinationViewMap = new ConcurrentHashMap<>();
|
||||||
private Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
|
private final Map<ActiveMQDestination, NetworkDestinationContainer> inboundDestinationViewMap = new ConcurrentHashMap<>();
|
||||||
private Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
|
|
||||||
|
|
||||||
public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) {
|
public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) {
|
||||||
this.brokerService = brokerService;
|
this.brokerService = brokerService;
|
||||||
|
@ -61,49 +59,48 @@ public class MBeanBridgeDestination {
|
||||||
|
|
||||||
public void onOutboundMessage(Message message) {
|
public void onOutboundMessage(Message message) {
|
||||||
ActiveMQDestination destination = message.getDestination();
|
ActiveMQDestination destination = message.getDestination();
|
||||||
NetworkDestinationView networkDestinationView = outboundDestinationViewMap.get(destination);
|
NetworkDestinationContainer networkDestinationContainer;
|
||||||
if (networkDestinationView == null) {
|
|
||||||
synchronized (destinationObjectNameMap) {
|
|
||||||
if ((networkDestinationView = outboundDestinationViewMap.get(destination)) == null) {
|
|
||||||
ObjectName bridgeObjectName = bridge.getMbeanObjectName();
|
|
||||||
try {
|
|
||||||
ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
|
|
||||||
networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
|
|
||||||
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
|
|
||||||
destinationObjectNameMap.put(destination, objectName);
|
|
||||||
outboundDestinationViewMap.put(destination, networkDestinationView);
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
if ((networkDestinationContainer = outboundDestinationViewMap.get(destination)) == null) {
|
||||||
LOG.warn("Failed to register " + destination, e);
|
ObjectName bridgeObjectName = bridge.getMbeanObjectName();
|
||||||
}
|
try {
|
||||||
}
|
ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
|
||||||
|
NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
|
||||||
|
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
|
||||||
|
|
||||||
|
networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
|
||||||
|
outboundDestinationViewMap.put(destination, networkDestinationContainer);
|
||||||
|
networkDestinationView.messageSent();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Failed to register " + destination, e);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
networkDestinationContainer.view.messageSent();
|
||||||
}
|
}
|
||||||
networkDestinationView.messageSent();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void onInboundMessage(Message message) {
|
public void onInboundMessage(Message message) {
|
||||||
ActiveMQDestination destination = message.getDestination();
|
ActiveMQDestination destination = message.getDestination();
|
||||||
NetworkDestinationView networkDestinationView = inboundDestinationViewMap.get(destination);
|
NetworkDestinationContainer networkDestinationContainer;
|
||||||
if (networkDestinationView == null) {
|
|
||||||
synchronized (destinationObjectNameMap) {
|
if ((networkDestinationContainer = inboundDestinationViewMap.get(destination)) == null) {
|
||||||
if ((networkDestinationView = inboundDestinationViewMap.get(destination)) == null) {
|
ObjectName bridgeObjectName = bridge.getMbeanObjectName();
|
||||||
ObjectName bridgeObjectName = bridge.getMbeanObjectName();
|
try {
|
||||||
try {
|
ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
|
||||||
ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
|
NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
|
||||||
networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
|
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
|
||||||
networkBridgeView.addNetworkDestinationView(networkDestinationView);
|
|
||||||
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
|
networkBridgeView.addNetworkDestinationView(networkDestinationView);
|
||||||
destinationObjectNameMap.put(destination, objectName);
|
networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
|
||||||
inboundDestinationViewMap.put(destination, networkDestinationView);
|
inboundDestinationViewMap.put(destination, networkDestinationContainer);
|
||||||
} catch (Exception e) {
|
networkDestinationView.messageSent();
|
||||||
LOG.warn("Failed to register " + destination, e);
|
} catch (Exception e) {
|
||||||
}
|
LOG.warn("Failed to register " + destination, e);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
networkDestinationContainer.view.messageSent();
|
||||||
}
|
}
|
||||||
networkDestinationView.messageSent();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() {
|
public void start() {
|
||||||
|
@ -121,18 +118,22 @@ public class MBeanBridgeDestination {
|
||||||
}
|
}
|
||||||
|
|
||||||
scheduler.cancel(purgeInactiveDestinationViewTask);
|
scheduler.cancel(purgeInactiveDestinationViewTask);
|
||||||
for (ObjectName objectName : destinationObjectNameMap.values()) {
|
for (NetworkDestinationContainer networkDestinationContainer : inboundDestinationViewMap.values()) {
|
||||||
try {
|
try {
|
||||||
if (objectName != null) {
|
brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
|
||||||
brokerService.getManagementContext().unregisterMBean(objectName);
|
} catch (Exception e) {
|
||||||
}
|
LOG.error("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
|
||||||
} catch (Throwable e) {
|
}
|
||||||
|
}
|
||||||
|
for (NetworkDestinationContainer networkDestinationContainer : outboundDestinationViewMap.values()) {
|
||||||
|
try {
|
||||||
|
brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
|
||||||
|
} catch (Exception e) {
|
||||||
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
|
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
destinationObjectNameMap.clear();
|
|
||||||
outboundDestinationViewMap.clear();
|
|
||||||
inboundDestinationViewMap.clear();
|
inboundDestinationViewMap.clear();
|
||||||
|
outboundDestinationViewMap.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void purgeInactiveDestinationViews() {
|
private void purgeInactiveDestinationViews() {
|
||||||
|
@ -143,25 +144,32 @@ public class MBeanBridgeDestination {
|
||||||
purgeInactiveDestinationView(outboundDestinationViewMap);
|
purgeInactiveDestinationView(outboundDestinationViewMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationView> map) {
|
private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationContainer> map) {
|
||||||
long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime();
|
long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime();
|
||||||
for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : map.entrySet()) {
|
for (Iterator<Map.Entry<ActiveMQDestination, NetworkDestinationContainer>> it = map.entrySet().iterator(); it.hasNext(); ) {
|
||||||
if (entry.getValue().getLastAccessTime() <= time) {
|
Map.Entry<ActiveMQDestination, NetworkDestinationContainer> entry = it.next();
|
||||||
synchronized (destinationObjectNameMap) {
|
if (entry.getValue().view.getLastAccessTime() <= time) {
|
||||||
map.remove(entry.getKey());
|
ObjectName objectName = entry.getValue().objectName;
|
||||||
ObjectName objectName = destinationObjectNameMap.remove(entry.getKey());
|
if (objectName != null) {
|
||||||
if (objectName != null) {
|
try {
|
||||||
try {
|
brokerService.getManagementContext().unregisterMBean(entry.getValue().objectName);
|
||||||
if (objectName != null) {
|
} catch (Throwable e) {
|
||||||
brokerService.getManagementContext().unregisterMBean(objectName);
|
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
|
||||||
}
|
|
||||||
} catch (Throwable e) {
|
|
||||||
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
entry.getValue().close();
|
|
||||||
}
|
}
|
||||||
|
entry.getValue().view.close();
|
||||||
|
it.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class NetworkDestinationContainer {
|
||||||
|
private final NetworkDestinationView view;
|
||||||
|
private final ObjectName objectName;
|
||||||
|
|
||||||
|
private NetworkDestinationContainer(NetworkDestinationView view, ObjectName objectName) {
|
||||||
|
this.view = view;
|
||||||
|
this.objectName = objectName;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,9 +5,9 @@
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
* (the "License"); you may not use this file except in compliance with
|
* (the "License"); you may not use this file except in compliance with
|
||||||
* the License. You may obtain a copy of the License at
|
* the License. You may obtain a copy of the License at
|
||||||
*
|
* <p/>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p/>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
@ -16,16 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.network;
|
package org.apache.activemq.network;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import static org.junit.Assume.assumeNotNull;
|
|
||||||
|
|
||||||
import java.net.MalformedURLException;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import javax.management.ObjectInstance;
|
|
||||||
import javax.management.ObjectName;
|
|
||||||
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.util.TestUtils;
|
import org.apache.activemq.util.TestUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -33,6 +24,20 @@ import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.management.MBeanServer;
|
||||||
|
import javax.management.ObjectInstance;
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assume.assumeNotNull;
|
||||||
|
|
||||||
public class DuplexNetworkMBeanTest {
|
public class DuplexNetworkMBeanTest {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(DuplexNetworkMBeanTest.class);
|
protected static final Logger LOG = LoggerFactory.getLogger(DuplexNetworkMBeanTest.class);
|
||||||
|
@ -134,6 +139,74 @@ public class DuplexNetworkMBeanTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMBeansNotOverwrittenOnCleanup() throws Exception {
|
||||||
|
BrokerService broker = createBroker();
|
||||||
|
|
||||||
|
BrokerService networkedBroker = createNetworkedBroker();
|
||||||
|
MessageProducer producerBroker = null;
|
||||||
|
MessageConsumer consumerBroker = null;
|
||||||
|
Session sessionNetworkBroker = null;
|
||||||
|
Session sessionBroker = null;
|
||||||
|
MessageProducer producerNetworkBroker = null;
|
||||||
|
MessageConsumer consumerNetworkBroker = null;
|
||||||
|
try {
|
||||||
|
broker.start();
|
||||||
|
broker.waitUntilStarted();
|
||||||
|
networkedBroker.start();
|
||||||
|
try {
|
||||||
|
assertEquals(2, countMbeans(networkedBroker, "connector=networkConnectors", 10000));
|
||||||
|
assertEquals(1, countMbeans(broker, "connector=duplexNetworkConnectors", 10000));
|
||||||
|
|
||||||
|
Connection brokerConnection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection();
|
||||||
|
brokerConnection.start();
|
||||||
|
|
||||||
|
sessionBroker = brokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
producerBroker = sessionBroker.createProducer(sessionBroker.createTopic("testTopic"));
|
||||||
|
consumerBroker = sessionBroker.createConsumer(sessionBroker.createTopic("testTopic"));
|
||||||
|
Connection netWorkBrokerConnection = new ActiveMQConnectionFactory(networkedBroker.getVmConnectorURI()).createConnection();
|
||||||
|
netWorkBrokerConnection.start();
|
||||||
|
sessionNetworkBroker = netWorkBrokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
producerNetworkBroker = sessionNetworkBroker.createProducer(sessionBroker.createTopic("testTopic"));
|
||||||
|
consumerNetworkBroker = sessionNetworkBroker.createConsumer(sessionBroker.createTopic("testTopic"));
|
||||||
|
|
||||||
|
assertEquals(4, countMbeans(broker, "destinationType=Topic,destinationName=testTopic", 15000));
|
||||||
|
assertEquals(4, countMbeans(networkedBroker, "destinationType=Topic,destinationName=testTopic", 15000));
|
||||||
|
|
||||||
|
producerBroker.send(sessionBroker.createTextMessage("test1"));
|
||||||
|
producerNetworkBroker.send(sessionNetworkBroker.createTextMessage("test2"));
|
||||||
|
|
||||||
|
assertEquals(2, countMbeans(networkedBroker, "destinationName=testTopic,direction=*", 10000));
|
||||||
|
assertEquals(2, countMbeans(broker, "destinationName=testTopic,direction=*", 10000));
|
||||||
|
} finally {
|
||||||
|
if (producerBroker != null) {
|
||||||
|
producerBroker.close();
|
||||||
|
}
|
||||||
|
if (consumerBroker != null) {
|
||||||
|
consumerBroker.close();
|
||||||
|
}
|
||||||
|
if (sessionBroker != null) {
|
||||||
|
sessionBroker.close();
|
||||||
|
}
|
||||||
|
if (sessionNetworkBroker != null) {
|
||||||
|
sessionNetworkBroker.close();
|
||||||
|
}
|
||||||
|
if (producerNetworkBroker != null) {
|
||||||
|
producerNetworkBroker.close();
|
||||||
|
}
|
||||||
|
if (consumerNetworkBroker != null) {
|
||||||
|
consumerNetworkBroker.close();
|
||||||
|
}
|
||||||
|
networkedBroker.stop();
|
||||||
|
networkedBroker.waitUntilStopped();
|
||||||
|
}
|
||||||
|
assertEquals(0, countMbeans(broker, "destinationName=testTopic,direction=*", 1500));
|
||||||
|
} finally {
|
||||||
|
broker.stop();
|
||||||
|
broker.waitUntilStopped();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private int countMbeans(BrokerService broker, String type) throws Exception {
|
private int countMbeans(BrokerService broker, String type) throws Exception {
|
||||||
return countMbeans(broker, type, 0);
|
return countMbeans(broker, type, 0);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue