mirror of https://github.com/apache/activemq.git
Display the establised Neteowork Connector Bridges via JMX - https://issues.apache.org/activemq/browse/AMQ-1299
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@551443 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b56197408a
commit
ec6e6a55a7
|
@ -278,6 +278,7 @@ public class BrokerService implements Service {
|
||||||
* network
|
* network
|
||||||
*/
|
*/
|
||||||
public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
|
public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
|
||||||
|
connector.setBrokerService(this);
|
||||||
URI uri = getVmConnectorURI();
|
URI uri = getVmConnectorURI();
|
||||||
HashMap map = new HashMap(URISupport.parseParamters(uri));
|
HashMap map = new HashMap(URISupport.parseParamters(uri));
|
||||||
map.put("network", "true");
|
map.put("network", "true");
|
||||||
|
@ -1217,6 +1218,7 @@ public class BrokerService implements Service {
|
||||||
NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
|
NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
|
||||||
try {
|
try {
|
||||||
ObjectName objectName = createNetworkConnectorObjectName(connector);
|
ObjectName objectName = createNetworkConnectorObjectName(connector);
|
||||||
|
connector.setObjectName(objectName);
|
||||||
mbeanServer.registerMBean(view, objectName);
|
mbeanServer.registerMBean(view, objectName);
|
||||||
registeredMBeanNames.add(objectName);
|
registeredMBeanNames.add(objectName);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,61 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker.jmx;
|
||||||
|
|
||||||
|
import org.apache.activemq.network.NetworkBridge;
|
||||||
|
|
||||||
|
public class NetworkBridgeView implements NetworkBridgeViewMBean {
|
||||||
|
|
||||||
|
private final NetworkBridge bridge;
|
||||||
|
|
||||||
|
public NetworkBridgeView(NetworkBridge bridge) {
|
||||||
|
this.bridge = bridge;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() throws Exception {
|
||||||
|
bridge.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() throws Exception {
|
||||||
|
bridge.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLocalAddress() {
|
||||||
|
return bridge.getLocalAddress();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRemoteAddress() {
|
||||||
|
return bridge.getRemoteAddress();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRemoteBrokerName() {
|
||||||
|
return bridge.getRemoteBrokerName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLocalBrokerName() {
|
||||||
|
return bridge.getLocalBrokerName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getEnqueueCounter() {
|
||||||
|
return bridge.getEnqueueCounter();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getDequeueCounter() {
|
||||||
|
return bridge.getDequeueCounter();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker.jmx;
|
||||||
|
|
||||||
|
import org.apache.activemq.Service;
|
||||||
|
|
||||||
|
public interface NetworkBridgeViewMBean extends Service {
|
||||||
|
|
||||||
|
public String getLocalAddress();
|
||||||
|
public String getRemoteAddress();
|
||||||
|
public String getRemoteBrokerName();
|
||||||
|
public String getLocalBrokerName();
|
||||||
|
public long getEnqueueCounter();
|
||||||
|
public long getDequeueCounter();
|
||||||
|
|
||||||
|
}
|
|
@ -34,4 +34,84 @@ public class NetworkConnectorView implements NetworkConnectorViewMBean {
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
connector.stop();
|
connector.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return connector.getName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNetworkTTL() {
|
||||||
|
return connector.getNetworkTTL();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPrefetchSize() {
|
||||||
|
return connector.getPrefetchSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getUserName() {
|
||||||
|
return connector.getUserName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isBridgeTempDestinations() {
|
||||||
|
return connector.isBridgeTempDestinations();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isConduitSubscriptions() {
|
||||||
|
return connector.isConduitSubscriptions();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isDecreaseNetworkConsumerPriority() {
|
||||||
|
return connector.isDecreaseNetworkConsumerPriority();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isDispatchAsync() {
|
||||||
|
return connector.isDispatchAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isDynamicOnly() {
|
||||||
|
return connector.isDynamicOnly();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
|
||||||
|
connector.setBridgeTempDestinations(bridgeTempDestinations);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConduitSubscriptions(boolean conduitSubscriptions) {
|
||||||
|
connector.setConduitSubscriptions(conduitSubscriptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDispatchAsync(boolean dispatchAsync) {
|
||||||
|
connector.setDispatchAsync(dispatchAsync);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDynamicOnly(boolean dynamicOnly) {
|
||||||
|
connector.setDynamicOnly(dynamicOnly);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNetworkTTL(int networkTTL) {
|
||||||
|
connector.setNetworkTTL(networkTTL);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPassword(String password) {
|
||||||
|
connector.setPassword(password);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPrefetchSize(int prefetchSize) {
|
||||||
|
connector.setPrefetchSize(prefetchSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUserName(String userName) {
|
||||||
|
connector.setUserName(userName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPassword() {
|
||||||
|
String pw = connector.getPassword();
|
||||||
|
// Hide the password for security reasons.
|
||||||
|
if( pw!= null )
|
||||||
|
pw = pw.replaceAll(".", "*");
|
||||||
|
return pw;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
|
||||||
|
connector.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,4 +21,24 @@ import org.apache.activemq.Service;
|
||||||
|
|
||||||
public interface NetworkConnectorViewMBean extends Service {
|
public interface NetworkConnectorViewMBean extends Service {
|
||||||
|
|
||||||
|
public String getName();
|
||||||
|
public int getNetworkTTL();
|
||||||
|
public int getPrefetchSize();
|
||||||
|
public String getUserName();
|
||||||
|
public boolean isBridgeTempDestinations();
|
||||||
|
public boolean isConduitSubscriptions();
|
||||||
|
public boolean isDecreaseNetworkConsumerPriority();
|
||||||
|
public boolean isDispatchAsync();
|
||||||
|
public boolean isDynamicOnly();
|
||||||
|
public void setBridgeTempDestinations(boolean bridgeTempDestinations);
|
||||||
|
public void setConduitSubscriptions(boolean conduitSubscriptions);
|
||||||
|
public void setDispatchAsync(boolean dispatchAsync);
|
||||||
|
public void setDynamicOnly(boolean dynamicOnly);
|
||||||
|
public void setNetworkTTL(int networkTTL);
|
||||||
|
public void setPassword(String password);
|
||||||
|
public void setPrefetchSize(int prefetchSize);
|
||||||
|
public void setUserName(String userName);
|
||||||
|
public String getPassword();
|
||||||
|
public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,12 @@
|
||||||
package org.apache.activemq.network;
|
package org.apache.activemq.network;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.security.GeneralSecurityException;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.activemq.advisory.AdvisorySupport;
|
import org.apache.activemq.advisory.AdvisorySupport;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
@ -61,12 +67,6 @@ import org.apache.activemq.util.ServiceSupport;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import java.security.GeneralSecurityException;
|
|
||||||
import java.util.Properties;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A useful base class for implementing demand forwarding bridges.
|
* A useful base class for implementing demand forwarding bridges.
|
||||||
*
|
*
|
||||||
|
@ -102,9 +102,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
|
protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
|
||||||
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
|
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
|
||||||
protected NetworkBridgeConfiguration configuration;
|
protected NetworkBridgeConfiguration configuration;
|
||||||
private NetworkBridgeFailedListener bridgeFailedListener;
|
private NetworkBridgeListener networkBridgeListener;
|
||||||
private boolean createdByDuplex;
|
private boolean createdByDuplex;
|
||||||
|
|
||||||
|
private BrokerInfo localBrokerInfo;
|
||||||
|
private BrokerInfo remoteBrokerInfo;
|
||||||
|
|
||||||
|
final AtomicLong enqueueCounter = new AtomicLong();
|
||||||
|
final AtomicLong dequeueCounter = new AtomicLong();
|
||||||
|
|
||||||
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
|
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
|
||||||
this.configuration=configuration;
|
this.configuration=configuration;
|
||||||
|
@ -194,6 +199,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
}catch(IOException e){
|
}catch(IOException e){
|
||||||
log.warn("Caught exception from remote start",e);
|
log.warn("Caught exception from remote start",e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NetworkBridgeListener l = this.networkBridgeListener;
|
||||||
|
if (l!=null) {
|
||||||
|
l.onStart(this);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void triggerLocalStartBridge() throws IOException {
|
protected void triggerLocalStartBridge() throws IOException {
|
||||||
|
@ -308,6 +319,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
log.debug(" stopping "+configuration.getBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
|
log.debug(" stopping "+configuration.getBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
|
||||||
boolean wasDisposedAlready=disposed;
|
boolean wasDisposedAlready=disposed;
|
||||||
if(!disposed){
|
if(!disposed){
|
||||||
|
NetworkBridgeListener l = this.networkBridgeListener;
|
||||||
|
if (l!=null) {
|
||||||
|
l.onStop(this);
|
||||||
|
}
|
||||||
|
|
||||||
try{
|
try{
|
||||||
disposed=true;
|
disposed=true;
|
||||||
remoteBridgeStarted.set(false);
|
remoteBridgeStarted.set(false);
|
||||||
|
@ -364,6 +380,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
}else if(command.isBrokerInfo()){
|
}else if(command.isBrokerInfo()){
|
||||||
|
|
||||||
lastConnectSucceeded.set(true);
|
lastConnectSucceeded.set(true);
|
||||||
|
remoteBrokerInfo = ((BrokerInfo)command);
|
||||||
|
|
||||||
serviceRemoteBrokerInfo(command);
|
serviceRemoteBrokerInfo(command);
|
||||||
// Let the local broker know the remote broker's ID.
|
// Let the local broker know the remote broker's ID.
|
||||||
localBroker.oneway(command);
|
localBroker.oneway(command);
|
||||||
|
@ -507,6 +525,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
final boolean trace=log.isTraceEnabled();
|
final boolean trace=log.isTraceEnabled();
|
||||||
try{
|
try{
|
||||||
if(command.isMessageDispatch()){
|
if(command.isMessageDispatch()){
|
||||||
|
enqueueCounter.incrementAndGet();
|
||||||
waitStarted();
|
waitStarted();
|
||||||
final MessageDispatch md=(MessageDispatch) command;
|
final MessageDispatch md=(MessageDispatch) command;
|
||||||
DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
|
DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
|
||||||
|
@ -523,6 +542,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
// by bridging it using an async send (small chance of message loss).
|
// by bridging it using an async send (small chance of message loss).
|
||||||
remoteBroker.oneway(message);
|
remoteBroker.oneway(message);
|
||||||
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
|
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
|
||||||
|
dequeueCounter.incrementAndGet();
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
|
@ -537,6 +557,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
serviceLocalException(er.getException());
|
serviceLocalException(er.getException());
|
||||||
} else {
|
} else {
|
||||||
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
|
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
|
||||||
|
dequeueCounter.incrementAndGet();
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
serviceLocalException(e);
|
serviceLocalException(e);
|
||||||
|
@ -551,6 +572,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
if (trace)log.trace("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
|
if (trace)log.trace("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
|
||||||
}
|
}
|
||||||
}else if(command.isBrokerInfo()){
|
}else if(command.isBrokerInfo()){
|
||||||
|
localBrokerInfo = ((BrokerInfo)command);
|
||||||
serviceLocalBrokerInfo(command);
|
serviceLocalBrokerInfo(command);
|
||||||
}else if(command.isShutdownInfo()){
|
}else if(command.isShutdownInfo()){
|
||||||
log.info(configuration.getBrokerName()+" Shutting down");
|
log.info(configuration.getBrokerName()+" Shutting down");
|
||||||
|
@ -812,14 +834,39 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
|
|
||||||
protected abstract BrokerId[] getRemoteBrokerPath();
|
protected abstract BrokerId[] getRemoteBrokerPath();
|
||||||
|
|
||||||
public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
|
public void setNetworkBridgeListener(NetworkBridgeListener listener){
|
||||||
this.bridgeFailedListener=listener;
|
this.networkBridgeListener=listener;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fireBridgeFailed() {
|
private void fireBridgeFailed() {
|
||||||
NetworkBridgeFailedListener l = this.bridgeFailedListener;
|
NetworkBridgeListener l = this.networkBridgeListener;
|
||||||
if (l!=null) {
|
if (l!=null) {
|
||||||
l.bridgeFailed();
|
l.bridgeFailed();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getRemoteAddress() {
|
||||||
|
return remoteBroker.getRemoteAddress();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLocalAddress() {
|
||||||
|
return localBroker.getRemoteAddress();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRemoteBrokerName() {
|
||||||
|
return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLocalBrokerName() {
|
||||||
|
return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getDequeueCounter() {
|
||||||
|
return dequeueCounter.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getEnqueueCounter() {
|
||||||
|
return enqueueCounter.get();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -186,7 +186,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
|
||||||
}
|
}
|
||||||
|
|
||||||
protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
|
protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
|
||||||
NetworkBridgeFailedListener listener = new NetworkBridgeFailedListener() {
|
NetworkBridgeListener listener = new NetworkBridgeListener() {
|
||||||
|
|
||||||
public void bridgeFailed(){
|
public void bridgeFailed(){
|
||||||
if( !serviceSupport.isStopped() ) {
|
if( !serviceSupport.isStopped() ) {
|
||||||
|
@ -198,6 +198,15 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void onStart(NetworkBridge bridge) {
|
||||||
|
registerNetworkBridgeMBean(bridge);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onStop(NetworkBridge bridge) {
|
||||||
|
unregisterNetworkBridgeMBean(bridge);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
};
|
};
|
||||||
DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this,localTransport,remoteTransport,listener);
|
DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this,localTransport,remoteTransport,listener);
|
||||||
return configureBridge(result);
|
return configureBridge(result);
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.activemq.network;
|
package org.apache.activemq.network;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.activemq.Service;
|
import org.apache.activemq.Service;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
@ -46,6 +47,7 @@ import org.apache.activemq.util.ServiceSupport;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Forwards all messages from the local broker to the remote broker.
|
* Forwards all messages from the local broker to the remote broker.
|
||||||
*
|
*
|
||||||
|
@ -74,7 +76,13 @@ public class ForwardingBridge implements Service{
|
||||||
|
|
||||||
BrokerId localBrokerId;
|
BrokerId localBrokerId;
|
||||||
BrokerId remoteBrokerId;
|
BrokerId remoteBrokerId;
|
||||||
private NetworkBridgeFailedListener bridgeFailedListener;
|
private NetworkBridgeListener bridgeFailedListener;
|
||||||
|
|
||||||
|
BrokerInfo localBrokerInfo;
|
||||||
|
BrokerInfo remoteBrokerInfo;
|
||||||
|
|
||||||
|
final AtomicLong enqueueCounter = new AtomicLong();
|
||||||
|
final AtomicLong dequeueCounter = new AtomicLong();
|
||||||
|
|
||||||
public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
|
public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
|
||||||
this.localBroker = localBroker;
|
this.localBroker = localBroker;
|
||||||
|
@ -187,7 +195,8 @@ public class ForwardingBridge implements Service{
|
||||||
try {
|
try {
|
||||||
if(command.isBrokerInfo() ) {
|
if(command.isBrokerInfo() ) {
|
||||||
synchronized( this ) {
|
synchronized( this ) {
|
||||||
remoteBrokerId = ((BrokerInfo)command).getBrokerId();
|
remoteBrokerInfo = ((BrokerInfo)command);
|
||||||
|
remoteBrokerId = remoteBrokerInfo.getBrokerId();
|
||||||
if( localBrokerId !=null) {
|
if( localBrokerId !=null) {
|
||||||
if( localBrokerId.equals(remoteBrokerId) ) {
|
if( localBrokerId.equals(remoteBrokerId) ) {
|
||||||
log.info("Disconnecting loop back connection.");
|
log.info("Disconnecting loop back connection.");
|
||||||
|
@ -213,6 +222,9 @@ public class ForwardingBridge implements Service{
|
||||||
protected void serviceLocalCommand(Command command) {
|
protected void serviceLocalCommand(Command command) {
|
||||||
try {
|
try {
|
||||||
if( command.isMessageDispatch() ) {
|
if( command.isMessageDispatch() ) {
|
||||||
|
|
||||||
|
enqueueCounter.incrementAndGet();
|
||||||
|
|
||||||
final MessageDispatch md = (MessageDispatch) command;
|
final MessageDispatch md = (MessageDispatch) command;
|
||||||
Message message = md.getMessage();
|
Message message = md.getMessage();
|
||||||
message.setProducerId(producerInfo.getProducerId());
|
message.setProducerId(producerInfo.getProducerId());
|
||||||
|
@ -223,10 +235,10 @@ public class ForwardingBridge implements Service{
|
||||||
|
|
||||||
|
|
||||||
if( !message.isResponseRequired() ) {
|
if( !message.isResponseRequired() ) {
|
||||||
|
|
||||||
// If the message was originally sent using async send, we will preserve that QOS
|
// If the message was originally sent using async send, we will preserve that QOS
|
||||||
// by bridging it using an async send (small chance of message loss).
|
// by bridging it using an async send (small chance of message loss).
|
||||||
remoteBroker.oneway(message);
|
remoteBroker.oneway(message);
|
||||||
|
dequeueCounter.incrementAndGet();
|
||||||
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
|
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
@ -241,6 +253,7 @@ public class ForwardingBridge implements Service{
|
||||||
ExceptionResponse er=(ExceptionResponse) response;
|
ExceptionResponse er=(ExceptionResponse) response;
|
||||||
serviceLocalException(er.getException());
|
serviceLocalException(er.getException());
|
||||||
} else {
|
} else {
|
||||||
|
dequeueCounter.incrementAndGet();
|
||||||
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
|
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -273,7 +286,8 @@ public class ForwardingBridge implements Service{
|
||||||
// }
|
// }
|
||||||
} else if(command.isBrokerInfo() ) {
|
} else if(command.isBrokerInfo() ) {
|
||||||
synchronized( this ) {
|
synchronized( this ) {
|
||||||
localBrokerId = ((BrokerInfo)command).getBrokerId();
|
localBrokerInfo = ((BrokerInfo)command);
|
||||||
|
localBrokerId = localBrokerInfo.getBrokerId();
|
||||||
if( remoteBrokerId !=null) {
|
if( remoteBrokerId !=null) {
|
||||||
if( remoteBrokerId.equals(localBrokerId) ) {
|
if( remoteBrokerId.equals(localBrokerId) ) {
|
||||||
log.info("Disconnecting loop back connection.");
|
log.info("Disconnecting loop back connection.");
|
||||||
|
@ -320,14 +334,39 @@ public class ForwardingBridge implements Service{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
|
public void setNetworkBridgeFailedListener(NetworkBridgeListener listener){
|
||||||
this.bridgeFailedListener=listener;
|
this.bridgeFailedListener=listener;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fireBridgeFailed() {
|
private void fireBridgeFailed() {
|
||||||
NetworkBridgeFailedListener l = this.bridgeFailedListener;
|
NetworkBridgeListener l = this.bridgeFailedListener;
|
||||||
if (l!=null) {
|
if (l!=null) {
|
||||||
l.bridgeFailed();
|
l.bridgeFailed();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getRemoteAddress() {
|
||||||
|
return remoteBroker.getRemoteAddress();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLocalAddress() {
|
||||||
|
return localBroker.getRemoteAddress();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLocalBrokerName() {
|
||||||
|
return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRemoteBrokerName() {
|
||||||
|
return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getDequeueCounter() {
|
||||||
|
return dequeueCounter.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getEnqueueCounter() {
|
||||||
|
return enqueueCounter.get();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,5 +43,18 @@ public interface NetworkBridge extends Service {
|
||||||
* Set the NetworkBridgeFailedListener
|
* Set the NetworkBridgeFailedListener
|
||||||
* @param listener
|
* @param listener
|
||||||
*/
|
*/
|
||||||
public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener);
|
public void setNetworkBridgeListener(NetworkBridgeListener listener);
|
||||||
|
|
||||||
|
|
||||||
|
public String getRemoteAddress();
|
||||||
|
|
||||||
|
public String getRemoteBrokerName();
|
||||||
|
|
||||||
|
public String getLocalAddress();
|
||||||
|
|
||||||
|
public String getLocalBrokerName();
|
||||||
|
|
||||||
|
public long getEnqueueCounter();
|
||||||
|
|
||||||
|
public long getDequeueCounter();
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,8 @@ public class NetworkBridgeFactory{
|
||||||
* @param remoteTransport
|
* @param remoteTransport
|
||||||
* @return the NetworkBridge
|
* @return the NetworkBridge
|
||||||
*/
|
*/
|
||||||
public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration config,Transport localTransport,
|
public static DemandForwardingBridge createBridge(
|
||||||
|
NetworkBridgeConfiguration config, Transport localTransport,
|
||||||
Transport remoteTransport) {
|
Transport remoteTransport) {
|
||||||
return createBridge(config, localTransport, remoteTransport, null);
|
return createBridge(config, localTransport, remoteTransport, null);
|
||||||
}
|
}
|
||||||
|
@ -45,20 +46,24 @@ public class NetworkBridgeFactory{
|
||||||
* @param listener
|
* @param listener
|
||||||
* @return the NetworkBridge
|
* @return the NetworkBridge
|
||||||
*/
|
*/
|
||||||
public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,Transport localTransport,
|
public static DemandForwardingBridge createBridge(
|
||||||
Transport remoteTransport,NetworkBridgeFailedListener listener){
|
NetworkBridgeConfiguration configuration, Transport localTransport,
|
||||||
|
Transport remoteTransport, final NetworkBridgeListener listener) {
|
||||||
DemandForwardingBridge result = null;
|
DemandForwardingBridge result = null;
|
||||||
if (configuration.isConduitSubscriptions()) {
|
if (configuration.isConduitSubscriptions()) {
|
||||||
if (configuration.isDynamicOnly()) {
|
if (configuration.isDynamicOnly()) {
|
||||||
result=new ConduitBridge(configuration,localTransport,remoteTransport);
|
result = new ConduitBridge(configuration, localTransport,
|
||||||
|
remoteTransport);
|
||||||
} else {
|
} else {
|
||||||
result=new DurableConduitBridge(configuration,localTransport,remoteTransport);
|
result = new DurableConduitBridge(configuration,
|
||||||
|
localTransport, remoteTransport);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
result=new DemandForwardingBridge(configuration,localTransport,remoteTransport);
|
result = new DemandForwardingBridge(configuration, localTransport,
|
||||||
|
remoteTransport);
|
||||||
}
|
}
|
||||||
if (listener != null) {
|
if (listener != null) {
|
||||||
result.setNetworkBridgeFailedListener(listener);
|
result.setNetworkBridgeListener(listener);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,11 +24,24 @@ package org.apache.activemq.network;
|
||||||
*
|
*
|
||||||
* @version $Revision: 1.1 $
|
* @version $Revision: 1.1 $
|
||||||
*/
|
*/
|
||||||
public interface NetworkBridgeFailedListener{
|
public interface NetworkBridgeListener{
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* called when the transport fails
|
* called when the transport fails
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public void bridgeFailed();
|
public void bridgeFailed();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* called after the bridge is started.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public void onStart(NetworkBridge bridge);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* called before the bridge is stopped.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public void onStop(NetworkBridge bridge);
|
||||||
|
|
||||||
}
|
}
|
|
@ -16,13 +16,23 @@ package org.apache.activemq.network;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.Hashtable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
|
import javax.management.MBeanServer;
|
||||||
|
import javax.management.MalformedObjectNameException;
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import org.apache.activemq.Service;
|
import org.apache.activemq.Service;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.jmx.NetworkBridgeView;
|
||||||
|
import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
import org.apache.activemq.transport.TransportFactory;
|
import org.apache.activemq.transport.TransportFactory;
|
||||||
|
import org.apache.activemq.util.JMXSupport;
|
||||||
import org.apache.activemq.util.ServiceStopper;
|
import org.apache.activemq.util.ServiceStopper;
|
||||||
import org.apache.activemq.util.ServiceSupport;
|
import org.apache.activemq.util.ServiceSupport;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -40,6 +50,9 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
|
||||||
private List dynamicallyIncludedDestinations=new CopyOnWriteArrayList();
|
private List dynamicallyIncludedDestinations=new CopyOnWriteArrayList();
|
||||||
private List staticallyIncludedDestinations=new CopyOnWriteArrayList();
|
private List staticallyIncludedDestinations=new CopyOnWriteArrayList();
|
||||||
protected ConnectionFilter connectionFilter;
|
protected ConnectionFilter connectionFilter;
|
||||||
|
private BrokerService brokerService;
|
||||||
|
private ObjectName objectName;
|
||||||
|
|
||||||
protected ServiceSupport serviceSupport=new ServiceSupport(){
|
protected ServiceSupport serviceSupport=new ServiceSupport(){
|
||||||
|
|
||||||
protected void doStart() throws Exception{
|
protected void doStart() throws Exception{
|
||||||
|
@ -188,4 +201,76 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
|
||||||
protected void handleStop(ServiceStopper stopper) throws Exception{
|
protected void handleStop(ServiceStopper stopper) throws Exception{
|
||||||
log.info("Network Connector "+getName()+" Stopped");
|
log.info("Network Connector "+getName()+" Stopped");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ObjectName getObjectName() {
|
||||||
|
return objectName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setObjectName(ObjectName objectName) {
|
||||||
|
this.objectName = objectName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public BrokerService getBrokerService() {
|
||||||
|
return brokerService;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBrokerService(BrokerService brokerService) {
|
||||||
|
this.brokerService = brokerService;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
|
||||||
|
if (!getBrokerService().isUseJmx())
|
||||||
|
return;
|
||||||
|
|
||||||
|
MBeanServer mbeanServer = getBrokerService().getManagementContext()
|
||||||
|
.getMBeanServer();
|
||||||
|
if (mbeanServer != null) {
|
||||||
|
NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
|
||||||
|
try {
|
||||||
|
ObjectName objectName = createNetworkBridgeObjectName(bridge);
|
||||||
|
mbeanServer.registerMBean(view, objectName);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
log.debug("Network bridge could not be registered in JMX: "
|
||||||
|
+ e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
|
||||||
|
if (!getBrokerService().isUseJmx())
|
||||||
|
return;
|
||||||
|
|
||||||
|
MBeanServer mbeanServer = getBrokerService().getManagementContext()
|
||||||
|
.getMBeanServer();
|
||||||
|
if (mbeanServer != null) {
|
||||||
|
try {
|
||||||
|
ObjectName objectName = createNetworkBridgeObjectName(bridge);
|
||||||
|
mbeanServer.unregisterMBean(objectName);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
log.debug("Network bridge could not be unregistered in JMX: "
|
||||||
|
+ e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge)
|
||||||
|
throws MalformedObjectNameException {
|
||||||
|
ObjectName connectorName = getObjectName();
|
||||||
|
Hashtable map = connectorName.getKeyPropertyList();
|
||||||
|
return new ObjectName(connectorName.getDomain()
|
||||||
|
+ ":"
|
||||||
|
+ "BrokerName="
|
||||||
|
+ JMXSupport.encodeObjectNamePart((String) map
|
||||||
|
.get("BrokerName"))
|
||||||
|
+ ","
|
||||||
|
+ "Type=NetworkBridge,"
|
||||||
|
+ "NetworkConnectorName="
|
||||||
|
+ JMXSupport.encodeObjectNamePart((String) map
|
||||||
|
.get("NetworkConnectorName"))
|
||||||
|
+ ","
|
||||||
|
+ "Name="
|
||||||
|
+ JMXSupport.encodeObjectNamePart(JMXSupport
|
||||||
|
.encodeObjectNamePart(bridge.getRemoteAddress())));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue