Fixed Network Connection failure recovery.

http://issues.apache.org/activemq/browse/AMQ-802
http://issues.apache.org/activemq/browse/AMQ-805



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@420723 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-07-11 04:46:04 +00:00
parent 75fde7165e
commit ad2546fa8d
3 changed files with 252 additions and 225 deletions

View File

@ -53,9 +53,6 @@ public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
ServiceSupport.dispose(this);
}
}
if (!disposed){
triggerLocalStartBridge();
}
}
}

View File

@ -70,14 +70,14 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
protected static final Log log = LogFactory.getLog(DemandForwardingBridge.class);
protected final Transport localBroker;
protected final Transport remoteBroker;
protected IdGenerator idGenerator = new IdGenerator();
protected LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
protected final IdGenerator idGenerator = new IdGenerator();
protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
protected ConnectionInfo localConnectionInfo;
protected ConnectionInfo remoteConnectionInfo;
protected SessionInfo localSessionInfo;
protected ProducerInfo producerInfo;
protected String localBrokerName;
protected String remoteBrokerName;
protected String localBrokerName = "Unknown";
protected String remoteBrokerName = "Unknown";
protected String localClientId;
protected String userName;
protected String password;
@ -87,22 +87,22 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
protected String name = "bridge";
protected ConsumerInfo demandConsumerInfo;
protected int demandConsumerDispatched;
protected AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
protected AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
protected boolean disposed = false;
protected BrokerId localBrokerId;
protected ActiveMQDestination[] excludedDestinations;
protected ActiveMQDestination[] dynamicallyIncludedDestinations;
protected ActiveMQDestination[] staticallyIncludedDestinations;
protected ActiveMQDestination[] durableDestinations;
protected ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap();
protected ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap();
protected final ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap();
protected final ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap();
protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
protected CountDownLatch startedLatch = new CountDownLatch(2);
protected boolean decreaseNetworkConsumerPriority;
protected boolean shutDown;
protected int networkTTL = 1;
protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) {
@ -111,7 +111,6 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}
public void start() throws Exception {
log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established.");
localBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command){
serviceLocalCommand(command);
@ -130,16 +129,23 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
serviceRemoteException(error);
}
public synchronized void transportInterupted(){
public void transportInterupted(){
//clear any subscriptions - to try and prevent the bridge from stalling the broker
if( remoteInterupted.compareAndSet(false, true) ) {
log.warn("Outbound transport to " + remoteBrokerName + " interrupted ...");
clearDownSubscriptions();
try{
localBroker.oneway(localConnectionInfo.createRemoveCommand());
}catch(IOException e){
log.warn("Caught exception from local start",e);
}
log.debug("Outbound transport to " + remoteBrokerName + " interrupted.");
if( localBridgeStarted.get() ) {
clearDownSubscriptions();
synchronized( DemandForwardingBridgeSupport.this ) {
try{
localBroker.oneway(localConnectionInfo.createRemoveCommand());
}catch(IOException e){
log.warn("Caught exception from local start",e);
}
}
}
localBridgeStarted.set(false);
remoteBridgeStarted.set(false);
startedLatch = new CountDownLatch(2);
@ -147,35 +153,33 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}
public synchronized void transportResumed(){
public void transportResumed(){
if( remoteInterupted.compareAndSet(true, false) ) {
//restart and static subscriptions - the consumer advisories will be replayed
log.info("Outbound transport to " + remoteBrokerName + " resumed");
// try{
// triggerLocalStartBridge();
// }catch(IOException e){
// log.warn("Caught exception from local start",e);
// }
try{
// clear out the previous connection as it may have missed some consumer advisories.
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
triggerRemoteStartBridge();
}catch(IOException e){
log.warn("Caught exception from remote start",e);
}
// We want to slow down false connects so that we don't get in a busy loop.
// False connects can occurr if you using SSH tunnels.
if( !lastConnectSucceeded.get() ) {
try {
log.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
lastConnectSucceeded.set(false);
log.debug("Outbound transport to " + remoteBrokerName + " resumed");
}
}
});
localBroker.start();
remoteBroker.start();
// triggerLocalStartBridge();
triggerRemoteStartBridge();
try{
triggerRemoteStartBridge();
}catch(IOException e){
log.warn("Caught exception from remote start",e);
}
}
protected void triggerLocalStartBridge() throws IOException {
@ -184,7 +188,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
try{
startLocalBridge();
}catch(Exception e){
log.error("Failed to start network bridge: "+e,e);
serviceLocalException(e);
}
}
};
@ -197,7 +201,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
try{
startRemoteBridge();
}catch(Exception e){
log.error("Failed to start network bridge: "+e,e);
serviceRemoteException(e);
}
}
};
@ -206,121 +210,109 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
protected void startLocalBridge() throws Exception {
if(localBridgeStarted.compareAndSet(false,true)){
localConnectionInfo=new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId="NC_"+remoteBrokerName+"_inbound"+name;
localConnectionInfo.setClientId(localClientId);
localConnectionInfo.setUserName(userName);
localConnectionInfo.setPassword(password);
localBroker.oneway(localConnectionInfo);
localSessionInfo=new SessionInfo(localConnectionInfo,1);
localBroker.oneway(localSessionInfo);
log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
+") has been established.");
startedLatch.countDown();
setupStaticDestinations();
synchronized( this ) {
localConnectionInfo=new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId="NC_"+remoteBrokerName+"_inbound"+name;
localConnectionInfo.setClientId(localClientId);
localConnectionInfo.setUserName(userName);
localConnectionInfo.setPassword(password);
localBroker.oneway(localConnectionInfo);
localSessionInfo=new SessionInfo(localConnectionInfo,1);
localBroker.oneway(localSessionInfo);
log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
+") has been established.");
startedLatch.countDown();
setupStaticDestinations();
}
}
}
protected void startRemoteBridge() throws Exception {
if(remoteBridgeStarted.compareAndSet(false,true)){
if(remoteBridgeStarted.compareAndSet(false,true)) {
synchronized (this) {
if( remoteConnectionInfo!=null ) {
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
}
remoteConnectionInfo=new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name);
remoteConnectionInfo.setUserName(userName);
remoteConnectionInfo.setPassword(password);
remoteBroker.oneway(remoteConnectionInfo);
remoteConnectionInfo=new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name);
remoteConnectionInfo.setUserName(userName);
remoteConnectionInfo.setPassword(password);
remoteBroker.oneway(remoteConnectionInfo);
BrokerInfo brokerInfo=new BrokerInfo();
brokerInfo.setBrokerName(localBrokerName);
remoteBroker.oneway(brokerInfo);
BrokerInfo brokerInfo=new BrokerInfo();
brokerInfo.setBrokerName(localBrokerName);
remoteBroker.oneway(brokerInfo);
SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
remoteBroker.oneway(remoteSessionInfo);
SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
remoteBroker.oneway(remoteSessionInfo);
producerInfo=new ProducerInfo(remoteSessionInfo,1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
producerInfo=new ProducerInfo(remoteSessionInfo,1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
// Listen to consumer advisory messages on the remote broker to determine demand.
demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
demandConsumerInfo.setDispatchAsync(dispatchAsync);
demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX
+destinationFilter));
demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo);
//we want information about Destinations as well
ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2);
destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
destinationInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(destinationInfo);
startedLatch.countDown();
// Listen to consumer advisory messages on the remote broker to determine demand.
demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
demandConsumerInfo.setDispatchAsync(dispatchAsync);
demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX
+destinationFilter));
demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo);
//we want information about Destinations as well
ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2);
destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
destinationInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(destinationInfo);
startedLatch.countDown();
if (!disposed){
triggerLocalStartBridge();
}
}
}
}
public void stop() throws Exception {
shutDown = true;
doStop();
}
/**
* stop the bridge
* @throws Exception
*/
protected void doStop() throws Exception {
log.debug(" stopping "+localBrokerName+ " bridge to " + remoteBrokerName + " is disposed already ? "+disposed);
if(!disposed){
try{
disposed=true;
remoteBridgeStarted.set(false);
if(!shutDown){
remoteBroker.oneway(new ShutdownInfo());
if(localConnectionInfo!=null){
localBroker.oneway(localConnectionInfo.createRemoveCommand());
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
}
localBroker.oneway(new ShutdownInfo());
}
localBroker.setTransportListener(null);
remoteBroker.setTransportListener(null);
}catch(IOException e){
log.debug("Caught exception stopping",e);
}finally{
ServiceStopper ss=new ServiceStopper();
ss.stop(localBroker);
ss.stop(remoteBroker);
ss.throwFirstException();
}
}
if (!disposed) {
try {
disposed = true;
remoteBridgeStarted.set(false);
localBroker.oneway(new ShutdownInfo());
remoteBroker.oneway(new ShutdownInfo());
} catch (IOException e) {
log.debug("Caught exception stopping", e);
} finally {
ServiceStopper ss = new ServiceStopper();
ss.stop(localBroker);
ss.stop(remoteBroker);
ss.throwFirstException();
}
}
log.debug(localBrokerName+ " bridge to " + remoteBrokerName + " stopped");
}
protected void doStopLocal(){
try{
if(!shutDown){
if(localConnectionInfo!=null){
localBroker.oneway(localConnectionInfo.createRemoveCommand());
}
localBroker.oneway(new ShutdownInfo());
}
localBroker.setTransportListener(null);
}catch(IOException e){
log.debug("Caught exception stopping",e);
}finally{
ServiceStopper ss=new ServiceStopper();
ss.stop(localBroker);
localBridgeStarted.set(false);
}
}
protected void serviceRemoteException(Throwable error) {
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
ServiceSupport.dispose(this);
if( !disposed ) {
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a remote error: "+error);
log.debug("The remote Exception was: "+error, error);
new Thread() {
public void run() {
ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
}
}.start();
}
}
protected void serviceRemoteCommand(Command command) {
@ -336,7 +328,10 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
demandConsumerDispatched=0;
}
}else if(command.isBrokerInfo()){
serviceRemoteBrokerInfo(command);
lastConnectSucceeded.set(true);
serviceRemoteBrokerInfo(command);
}else if(command.getClass() == ConnectionError.class ) {
ConnectionError ce = (ConnectionError) command;
serviceRemoteException(ce.getException());
@ -344,6 +339,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
switch(command.getDataStructureType()){
case KeepAliveInfo.DATA_STRUCTURE_TYPE:
case WireFormatInfo.DATA_STRUCTURE_TYPE:
case ShutdownInfo.DATA_STRUCTURE_TYPE:
break;
default:
log.warn("Unexpected remote command: "+command);
@ -413,7 +409,10 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
}
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath()));
log.debug("Replying destination control command: "+destInfo);
localBroker.oneway(destInfo);
}
@ -424,8 +423,15 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}
protected void serviceLocalException(Throwable error) {
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
ServiceSupport.dispose(this);
if( !disposed ) {
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "+error);
log.debug("The local Exception was:"+error,error);
new Thread() {
public void run() {
ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
}
}.start();
}
}
protected void addSubscription(DemandSubscription sub) throws IOException {
@ -502,16 +508,6 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
remoteBroker.asyncRequest(message, callback);
}
// Ack on every message since we don't know if the broker is blocked due to memory
// usage and is waiting for an Ack to un-block him.
// Acking a range is more efficient, but also more prone to locking up a server
// Perhaps doing something like the following should be policy based.
// int dispatched = sub.incrementDispatched();
// if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
// localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
// sub.setDispatched(0);
// }
}
}else if(command.isBrokerInfo()){
serviceLocalBrokerInfo(command);
@ -521,8 +517,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
// the local transport is just shutting down temporarily until the remote side
// is restored.
if( !remoteInterupted.get() ) {
shutDown = true;
doStop();
stop();
}
}else if(command.getClass() == ConnectionError.class ) {
ConnectionError ce = (ConnectionError) command;
@ -695,21 +690,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
public void setNetworkTTL(int networkTTL) {
this.networkTTL=networkTTL;
}
/**
* @return Returns the shutDown.
*/
public boolean isShutDown() {
return shutDown;
}
/**
* @param shutDown The shutDown to set.
*/
public void setShutDown(boolean shutDown) {
this.shutDown=shutDown;
}
public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
if(brokerPath!=null){
for(int i=0;i<brokerPath.length;i++){
@ -843,7 +824,8 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}
protected void clearDownSubscriptions() {
subscriptionMapByLocalId.clear();
subscriptionMapByRemoteId.clear();
}
protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException;

View File

@ -16,9 +16,11 @@
*/
package org.apache.activemq.network;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
@ -28,10 +30,7 @@ import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* A network connector which uses a discovery agent to detect the remote brokers
@ -58,6 +57,11 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
}
public void onServiceAdd(DiscoveryEvent event) {
// Ignore events once we start stopping.
if( isStopped() || isStopping() )
return;
String url = event.getServiceName();
if (url != null) {
@ -80,7 +84,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
URI connectUri = uri;
if (failover) {
try {
connectUri = new URI("failover:" + connectUri);
connectUri = new URI("failover:(" + connectUri+")?maxReconnectDelay=1000");
}
catch (URISyntaxException e) {
log.warn("Could not create failover URI: " + connectUri);
@ -90,22 +94,24 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
log.info("Establishing network connection between from " + localURI + " to " + connectUri);
Transport localTransport;
try {
localTransport = createLocalTransport();
}
catch (Exception e) {
log.warn("Could not connect to local URI: " + localURI + ": " + e, e);
return;
}
Transport remoteTransport;
try {
remoteTransport = TransportFactory.connect(connectUri);
}
catch (Exception e) {
ServiceSupport.dispose(localTransport);
log.warn("Could not connect to remote URI: " + connectUri + ": " + e, e);
log.warn("Could not connect to remote URI: " + localURI + ": " + e.getMessage());
log.debug("Connection failure exception: "+ e, e);
return;
}
Transport localTransport;
try {
localTransport = createLocalTransport();
}
catch (Exception e) {
ServiceSupport.dispose(remoteTransport);
log.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage());
log.debug("Connection failure exception: "+ e, e);
return;
}
@ -117,7 +123,13 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
catch (Exception e) {
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);
log.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e, e);
log.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e);
log.debug("Start failure exception: "+ e, e);
try {
discoveryAgent.serviceFailed(event);
} catch (IOException e1) {
}
return;
}
}
@ -196,45 +208,81 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
if (conduitSubscriptions) {
if (dynamicOnly) {
result = new ConduitBridge(localTransport, remoteTransport) {
protected void serviceRemoteException(Exception error) {
super.serviceRemoteException(error);
try {
// Notify the discovery agent that the remote broker
// failed.
discoveryAgent.serviceFailed(event);
}
catch (IOException e) {
}
}
protected void serviceLocalException(Throwable error) {
try {
super.serviceLocalException(error);
} finally {
fireServiceFailed();
}
}
protected void serviceRemoteException(Throwable error) {
try {
super.serviceRemoteException(error);
} finally {
fireServiceFailed();
}
}
public void fireServiceFailed() {
if( !isStopped() ) {
try {
discoveryAgent.serviceFailed(event);
} catch (IOException e) {
}
}
}
};
}
else {
result = new DurableConduitBridge(localTransport, remoteTransport) {
protected void serviceRemoteException(Exception error) {
super.serviceRemoteException(error);
try {
// Notify the discovery agent that the remote broker
// failed.
discoveryAgent.serviceFailed(event);
}
catch (IOException e) {
}
}
protected void serviceLocalException(Throwable error) {
try {
super.serviceLocalException(error);
} finally {
fireServiceFailed();
}
}
protected void serviceRemoteException(Throwable error) {
try {
super.serviceRemoteException(error);
} finally {
fireServiceFailed();
}
}
public void fireServiceFailed() {
if( !isStopped() ) {
try {
discoveryAgent.serviceFailed(event);
} catch (IOException e) {
}
}
}
};
}
}
else {
result = new DemandForwardingBridge(localTransport, remoteTransport) {
protected void serviceRemoteException(Exception error) {
super.serviceRemoteException(error);
try {
// Notify the discovery agent that the remote broker
// failed.
discoveryAgent.serviceFailed(event);
}
catch (IOException e) {
}
}
result = new DemandForwardingBridge(localTransport, remoteTransport) {
protected void serviceLocalException(Throwable error) {
try {
super.serviceLocalException(error);
} finally {
fireServiceFailed();
}
}
protected void serviceRemoteException(Throwable error) {
try {
super.serviceRemoteException(error);
} finally {
fireServiceFailed();
}
}
public void fireServiceFailed() {
if( !isStopped() ) {
try {
discoveryAgent.serviceFailed(event);
} catch (IOException e) {
}
}
}
};
}
return configureBridge(result);