mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@493696 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e76feb6b8e
commit
e349089a8d
|
@ -59,6 +59,7 @@ import org.apache.activemq.util.ServiceSupport;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import java.security.GeneralSecurityException;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -139,7 +140,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
||||||
//clear any subscriptions - to try and prevent the bridge from stalling the broker
|
//clear any subscriptions - to try and prevent the bridge from stalling the broker
|
||||||
if( remoteInterupted.compareAndSet(false, true) ) {
|
if( remoteInterupted.compareAndSet(false, true) ) {
|
||||||
|
|
||||||
log.debug("Outbound transport to " + remoteBrokerName + " interrupted.");
|
log.info("Outbound transport to " + remoteBrokerName + " interrupted.");
|
||||||
|
|
||||||
if( localBridgeStarted.get() ) {
|
if( localBridgeStarted.get() ) {
|
||||||
clearDownSubscriptions();
|
clearDownSubscriptions();
|
||||||
|
@ -180,7 +181,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
||||||
startLocalBridge();
|
startLocalBridge();
|
||||||
remoteBridgeStarted.set(true);
|
remoteBridgeStarted.set(true);
|
||||||
startedLatch.countDown();
|
startedLatch.countDown();
|
||||||
log.debug("Outbound transport to " + remoteBrokerName + " resumed");
|
log.info("Outbound transport to " + remoteBrokerName + " resumed");
|
||||||
}catch(Exception e) {
|
}catch(Exception e) {
|
||||||
log.error("Caught exception from local start in resume transport",e );
|
log.error("Caught exception from local start in resume transport",e );
|
||||||
}
|
}
|
||||||
|
@ -297,39 +298,48 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception{
|
||||||
log.debug(" stopping "+localBrokerName+ " bridge to " + remoteBrokerName + " is disposed already ? "+disposed);
|
log.debug(" stopping "+localBrokerName+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
|
||||||
if (!disposed) {
|
boolean wasDisposedAlready=disposed;
|
||||||
try {
|
if(!disposed){
|
||||||
disposed = true;
|
try{
|
||||||
|
disposed=true;
|
||||||
remoteBridgeStarted.set(false);
|
remoteBridgeStarted.set(false);
|
||||||
|
localBroker.oneway(new ShutdownInfo());
|
||||||
localBroker.oneway(new ShutdownInfo());
|
remoteBroker.oneway(new ShutdownInfo());
|
||||||
remoteBroker.oneway(new ShutdownInfo());
|
}catch(IOException e){
|
||||||
|
log.info("Caught exception stopping",e);
|
||||||
} catch (IOException e) {
|
}finally{
|
||||||
log.debug("Caught exception stopping", e);
|
ServiceStopper ss=new ServiceStopper();
|
||||||
} finally {
|
ss.stop(localBroker);
|
||||||
ServiceStopper ss = new ServiceStopper();
|
ss.stop(remoteBroker);
|
||||||
ss.stop(localBroker);
|
ss.throwFirstException();
|
||||||
ss.stop(remoteBroker);
|
}
|
||||||
ss.throwFirstException();
|
}
|
||||||
}
|
if(wasDisposedAlready){
|
||||||
}
|
log.debug(localBrokerName+" bridge to "+remoteBrokerName+" stopped");
|
||||||
log.debug(localBrokerName+ " bridge to " + remoteBrokerName + " stopped");
|
}else{
|
||||||
|
log.info(localBrokerName+" bridge to "+remoteBrokerName+" stopped");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void serviceRemoteException(Throwable error) {
|
protected void serviceRemoteException(Throwable error){
|
||||||
if( !disposed ) {
|
if(!disposed){
|
||||||
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a remote error: "+error);
|
if(error instanceof SecurityException||error instanceof GeneralSecurityException){
|
||||||
log.debug("The remote Exception was: "+error, error);
|
log.error("Network connection between "+localBroker+" and "+remoteBroker
|
||||||
new Thread() {
|
+" shutdown due to a remote error: "+error);
|
||||||
public void run() {
|
}else{
|
||||||
ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
|
log.warn("Network connection between "+localBroker+" and "+remoteBroker
|
||||||
}
|
+" shutdown due to a remote error: "+error);
|
||||||
}.start();
|
}
|
||||||
}
|
log.debug("The remote Exception was: "+error,error);
|
||||||
|
new Thread(){
|
||||||
|
|
||||||
|
public void run(){
|
||||||
|
ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
|
||||||
|
}
|
||||||
|
}.start();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void serviceRemoteCommand(Command command) {
|
protected void serviceRemoteCommand(Command command) {
|
||||||
|
|
Loading…
Reference in New Issue