applied patch from John Heitmann for AMQ-932 - many thanks!

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@449654 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-09-25 11:26:53 +00:00
parent ef5fd15b84
commit 4b595ea6dc
2 changed files with 128 additions and 70 deletions

View File

@ -17,8 +17,6 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import java.io.IOException;
import org.apache.activemq.broker.ft.MasterBroker; import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
@ -30,8 +28,9 @@ import org.apache.activemq.transport.Transport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.io.IOException;
/** /**
*
* @version $Revision: 1.8 $ * @version $Revision: 1.8 $
*/ */
public class TransportConnection extends AbstractConnection { public class TransportConnection extends AbstractConnection {
@ -43,7 +42,9 @@ public class TransportConnection extends AbstractConnection {
private boolean blocked; private boolean blocked;
private boolean connected; private boolean connected;
private boolean active; private boolean active;
private long timeStamp=0; private boolean starting;
private boolean pendingStop;
private long timeStamp = 0;
private MasterBroker masterBroker; //used if this connection is used by a Slave private MasterBroker masterBroker; //used if this connection is used by a Slave
/** /**
@ -59,10 +60,11 @@ public class TransportConnection extends AbstractConnection {
this.transport.setTransportListener(new DefaultTransportListener() { this.transport.setTransportListener(new DefaultTransportListener() {
public void onCommand(Command command) { public void onCommand(Command command) {
Response response = service(command); Response response = service(command);
if( response!=null ) { if (response != null) {
dispatch(response); dispatch(response);
} }
} }
public void onException(IOException exception) { public void onException(IOException exception) {
serviceTransportException(exception); serviceTransportException(exception);
} }
@ -70,26 +72,48 @@ public class TransportConnection extends AbstractConnection {
connected = true; connected = true;
} }
public void start() throws Exception { public synchronized void start() throws Exception {
starting = true;
try {
transport.start(); transport.start();
active = true; active = true;
super.start(); super.start();
connector.onStarted(this); connector.onStarted(this);
} }
finally {
// stop() can be called from within the above block,
// but we want to be sure start() completes before
// stop() runs, so queue the stop until right now:
starting = false;
if (pendingStop) {
log.debug("Calling the delayed stop()");
stop();
}
}
}
public synchronized void stop() throws Exception {
// If we're in the middle of starting
// then go no further... for now.
pendingStop = true;
if (starting) {
log.debug("stop() called in the middle of start(). Delaying...");
return;
}
public void stop() throws Exception {
connector.onStopped(this); connector.onStopped(this);
try { try {
if (masterBroker != null){ if (masterBroker != null) {
masterBroker.stop(); masterBroker.stop();
} }
// If the transport has not failed yet, // If the transport has not failed yet,
// notify the peer that we are doing a normal shutdown. // notify the peer that we are doing a normal shutdown.
if( transportException == null ) { if (transportException == null) {
transport.oneway(new ShutdownInfo()); transport.oneway(new ShutdownInfo());
} }
} catch (Exception ignore) { }
catch (Exception ignore) {
//ignore.printStackTrace(); //ignore.printStackTrace();
} }
@ -102,121 +126,151 @@ public class TransportConnection extends AbstractConnection {
/** /**
* @return Returns the blockedCandidate. * @return Returns the blockedCandidate.
*/ */
public boolean isBlockedCandidate(){ public boolean isBlockedCandidate() {
return blockedCandidate; return blockedCandidate;
} }
/** /**
* @param blockedCandidate * @param blockedCandidate The blockedCandidate to set.
* The blockedCandidate to set.
*/ */
public void setBlockedCandidate(boolean blockedCandidate){ public void setBlockedCandidate(boolean blockedCandidate) {
this.blockedCandidate=blockedCandidate; this.blockedCandidate = blockedCandidate;
} }
/** /**
* @return Returns the markedCandidate. * @return Returns the markedCandidate.
*/ */
public boolean isMarkedCandidate(){ public boolean isMarkedCandidate() {
return markedCandidate; return markedCandidate;
} }
/** /**
* @param markedCandidate * @param markedCandidate The markedCandidate to set.
* The markedCandidate to set.
*/ */
public void setMarkedCandidate(boolean markedCandidate){ public void setMarkedCandidate(boolean markedCandidate) {
this.markedCandidate=markedCandidate; this.markedCandidate = markedCandidate;
if(!markedCandidate){ if (!markedCandidate) {
timeStamp=0; timeStamp = 0;
blockedCandidate=false; blockedCandidate = false;
} }
} }
/** /**
* @param slow * @param slow The slow to set.
* The slow to set.
*/ */
public void setSlow(boolean slow){ public void setSlow(boolean slow) {
this.slow=slow; this.slow = slow;
} }
/** /**
* @return true if the Connection is slow * @return true if the Connection is slow
*/ */
public boolean isSlow(){ public boolean isSlow() {
return slow; return slow;
} }
/** /**
* @return true if the Connection is potentially blocked * @return true if the Connection is potentially blocked
*/ */
public boolean isMarkedBlockedCandidate(){ public boolean isMarkedBlockedCandidate() {
return markedCandidate; return markedCandidate;
} }
/** /**
* Mark the Connection, so we can deem if it's collectable on the next sweep * Mark the Connection, so we can deem if it's collectable on the next sweep
*/ */
public void doMark(){ public void doMark() {
if(timeStamp==0){ if (timeStamp == 0) {
timeStamp=System.currentTimeMillis(); timeStamp = System.currentTimeMillis();
} }
} }
/** /**
* @return if after being marked, the Connection is still writing * @return if after being marked, the Connection is still writing
*/ */
public boolean isBlocked(){ public boolean isBlocked() {
return blocked; return blocked;
} }
/** /**
* @return true if the Connection is connected * @return true if the Connection is connected
*/ */
public boolean isConnected(){ public boolean isConnected() {
return connected; return connected;
} }
/** /**
* @param blocked * @param blocked The blocked to set.
* The blocked to set.
*/ */
public void setBlocked(boolean blocked){ public void setBlocked(boolean blocked) {
this.blocked=blocked; this.blocked = blocked;
} }
/** /**
* @param connected * @param connected The connected to set.
* The connected to set.
*/ */
public void setConnected(boolean connected){ public void setConnected(boolean connected) {
this.connected=connected; this.connected = connected;
} }
/** /**
* @return true if the Connection is active * @return true if the Connection is active
*/ */
public boolean isActive(){ public boolean isActive() {
return active; return active;
} }
/** /**
* @param active * @param active The active to set.
* The active to set.
*/ */
public void setActive(boolean active){ public void setActive(boolean active) {
this.active=active; this.active = active;
} }
public Response processBrokerInfo(BrokerInfo info){ /**
if(info.isSlaveBroker()){ * @return true if the Connection is starting
*/
public synchronized boolean isStarting() {
return starting;
}
synchronized protected void setStarting(boolean starting) {
this.starting = starting;
}
/**
* @return true if the Connection needs to stop
*/
public synchronized boolean isPendingStop() {
return pendingStop;
}
protected synchronized void setPendingStop(boolean pendingStop) {
this.pendingStop = pendingStop;
}
public Response processBrokerInfo(BrokerInfo info) {
if (info.isSlaveBroker()) {
//stream messages from this broker (the master) to //stream messages from this broker (the master) to
//the slave //the slave
MutableBrokerFilter parent=(MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class); MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
masterBroker=new MasterBroker(parent,transport); masterBroker = new MasterBroker(parent, transport);
masterBroker.startProcessing(); masterBroker.startProcessing();
log.info("Slave Broker "+info.getBrokerName()+" is attached"); log.info("Slave Broker " + info.getBrokerName() + " is attached");
} }
return super.processBrokerInfo(info); return super.processBrokerInfo(info);
} }
protected void dispatch(Command command){ protected void dispatch(Command command) {
try{ try {
setMarkedCandidate(true); setMarkedCandidate(true);
transport.oneway(command); transport.oneway(command);
getStatistics().onCommand(command); getStatistics().onCommand(command);
}catch(IOException e){ }
catch (IOException e) {
serviceException(e); serviceException(e);
}finally{ }
finally {
setMarkedCandidate(false); setMarkedCandidate(false);
} }
} }

View File

@ -58,7 +58,11 @@ public class ManagedTransportConnection extends TransportConnection {
setConnectionId(connectionId); setConnectionId(connectionId);
} }
public void stop() throws Exception { public synchronized void stop() throws Exception {
if (isStarting()) {
setPendingStop(true);
return;
}
unregisterMBean(); unregisterMBean();
super.stop(); super.stop();
} }