https://issues.apache.org/jira/browse/AMQ-3109 - mbeans for bridges created by duplex connector

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1053942 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-12-30 17:03:35 +00:00
parent ebcab0512c
commit 2a2dcd5ec4
8 changed files with 129 additions and 14 deletions

View File

@ -1702,6 +1702,14 @@ public class BrokerService implements Service {
+ "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
} }
public ObjectName createDuplexNetworkConnectorObjectName(String transport)
throws MalformedObjectNameException {
return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
+ "NetworkConnectorName=duplex" + JMXSupport.encodeObjectNamePart(transport));
}
protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
if (isUseJmx()) { if (isUseJmx()) {
try { try {

View File

@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import org.apache.activemq.broker.ft.MasterBroker; import org.apache.activemq.broker.ft.MasterBroker;
@ -72,9 +73,7 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.network.DemandForwardingBridge; import org.apache.activemq.network.*;
import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.network.NetworkBridgeFactory;
import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConnectionState; import org.apache.activemq.state.ConnectionState;
@ -1209,7 +1208,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
Transport localTransport = TransportFactory.connect(uri); Transport localTransport = TransportFactory.connect(uri);
Transport remoteBridgeTransport = new ResponseCorrelator(transport); Transport remoteBridgeTransport = new ResponseCorrelator(transport);
duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport); String duplexName = localTransport.toString();
if (duplexName.contains("#")) {
duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
}
MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
listener.setCreatedByDuplex(true);
duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
duplexBridge.setBrokerService(broker.getBrokerService()); duplexBridge.setBrokerService(broker.getBrokerService());
// now turn duplex off this side // now turn duplex off this side
info.setDuplexConnection(false); info.setDuplexConnection(false);

View File

@ -21,6 +21,7 @@ import org.apache.activemq.network.NetworkBridge;
public class NetworkBridgeView implements NetworkBridgeViewMBean { public class NetworkBridgeView implements NetworkBridgeViewMBean {
private final NetworkBridge bridge; private final NetworkBridge bridge;
private boolean createByDuplex = false;
public NetworkBridgeView(NetworkBridge bridge) { public NetworkBridgeView(NetworkBridge bridge) {
this.bridge = bridge; this.bridge = bridge;
@ -58,4 +59,11 @@ public class NetworkBridgeView implements NetworkBridgeViewMBean {
return bridge.getDequeueCounter(); return bridge.getDequeueCounter();
} }
public boolean isCreatedByDuplex() {
return createByDuplex;
}
public void setCreateByDuplex(boolean createByDuplex) {
this.createByDuplex = createByDuplex;
}
} }

View File

@ -32,4 +32,6 @@ public interface NetworkBridgeViewMBean extends Service {
long getDequeueCounter(); long getDequeueCounter();
boolean isCreatedByDuplex();
} }

View File

@ -148,6 +148,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
this.duplexInitiatingConnection = connection; this.duplexInitiatingConnection = connection;
start(); start();
serviceRemoteCommand(remoteBrokerInfo); serviceRemoteCommand(remoteBrokerInfo);
brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo);
} }
public void start() throws Exception { public void start() throws Exception {

View File

@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SslContext; import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
@ -39,6 +40,8 @@ import org.apache.activemq.util.URISupport.CompositeData;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.management.ObjectName;
/** /**
* A network connector which uses a discovery agent to detect the remote brokers * A network connector which uses a discovery agent to detect the remote brokers
* available and setup a connection to each available remote broker * available and setup a connection to each available remote broker
@ -206,7 +209,11 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
} }
protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) { protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
NetworkBridgeListener listener = new NetworkBridgeListener() { class DiscoverNetworkBridgeListener extends MBeanNetworkListener {
public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) {
super(brokerService, connectorName);
}
public void bridgeFailed() { public void bridgeFailed() {
if (!serviceSupport.isStopped()) { if (!serviceSupport.isStopped()) {
@ -217,16 +224,9 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
} }
} }
}
NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName());
public void onStart(NetworkBridge bridge) {
registerNetworkBridgeMBean(bridge);
}
public void onStop(NetworkBridge bridge) {
unregisterNetworkBridgeMBean(bridge);
}
};
DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener); DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener);
result.setBrokerService(getBrokerService()); result.setBrokerService(getBrokerService());
return configureBridge(result); return configureBridge(result);

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.NetworkBridgeView;
import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
import org.apache.activemq.util.JMXSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.util.HashMap;
import java.util.Map;
public class MBeanNetworkListener implements NetworkBridgeListener {
private static final Log LOG = LogFactory.getLog(MBeanNetworkListener.class);
BrokerService brokerService;
ObjectName connectorName;
boolean createdByDuplex = false;
public MBeanNetworkListener(BrokerService brokerService, ObjectName connectorName) {
this.brokerService = brokerService;
this.connectorName = connectorName;
}
@Override
public void bridgeFailed() {
}
@Override
public void onStart(NetworkBridge bridge) {
if (!brokerService.isUseJmx()) {
return;
}
NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
((NetworkBridgeView)view).setCreateByDuplex(createdByDuplex);
try {
ObjectName objectName = createNetworkBridgeObjectName(bridge);
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, objectName);
} catch (Throwable e) {
LOG.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e);
}
}
@Override
public void onStop(NetworkBridge bridge) {
if (!brokerService.isUseJmx()) {
return;
}
try {
ObjectName objectName = createNetworkBridgeObjectName(bridge);
brokerService.getManagementContext().unregisterMBean(objectName);
} catch (Throwable e) {
LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
}
}
protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList());
return new ObjectName(connectorName.getDomain() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart((String) map.get("BrokerName")) + "," + "Type=NetworkBridge,"
+ "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart((String)map.get("NetworkConnectorName")) + "," + "Name="
+ JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress())));
}
public void setCreatedByDuplex(boolean createdByDuplex) {
this.createdByDuplex = createdByDuplex;
}
}

View File

@ -68,10 +68,12 @@ public class DuplexNetworkMBeanTest {
networkedBroker = createNetworkedBroker(); networkedBroker = createNetworkedBroker();
networkedBroker.start(); networkedBroker.start();
assertEquals(1, countMbeans(networkedBroker, "NetworkBridge", 2000)); assertEquals(1, countMbeans(networkedBroker, "NetworkBridge", 2000));
assertEquals(1, countMbeans(broker, "NetworkBridge", 2000));
assertEquals(1, countMbeans(broker, "Connection")); assertEquals(1, countMbeans(broker, "Connection"));
networkedBroker.stop(); networkedBroker.stop();
networkedBroker.waitUntilStopped(); networkedBroker.waitUntilStopped();
assertEquals(0, countMbeans(networkedBroker, "stopped")); assertEquals(0, countMbeans(networkedBroker, "stopped"));
assertEquals(0, countMbeans(broker, "NetworkBridge"));
} }
assertEquals(0, countMbeans(networkedBroker, "NetworkBridge")); assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));