mirror of https://github.com/apache/activemq.git
The QueueMasterSlaveTest and TopicMasterSlaveTest were hanging on on teardown of the test case due us issuing a transport request() that never returned. Converterted to just issuing a oneway and it seems to have fixed the issue.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@376332 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
28ca7515c9
commit
4f6e90108f
|
@ -26,7 +26,6 @@ import org.apache.activemq.command.Command;
|
||||||
import org.apache.activemq.command.CommandTypes;
|
import org.apache.activemq.command.CommandTypes;
|
||||||
import org.apache.activemq.command.ConnectionId;
|
import org.apache.activemq.command.ConnectionId;
|
||||||
import org.apache.activemq.command.ConnectionInfo;
|
import org.apache.activemq.command.ConnectionInfo;
|
||||||
import org.apache.activemq.command.ExceptionResponse;
|
|
||||||
import org.apache.activemq.command.MessageDispatch;
|
import org.apache.activemq.command.MessageDispatch;
|
||||||
import org.apache.activemq.command.ProducerInfo;
|
import org.apache.activemq.command.ProducerInfo;
|
||||||
import org.apache.activemq.command.Response;
|
import org.apache.activemq.command.Response;
|
||||||
|
@ -59,6 +58,7 @@ public class MasterConnector implements Service{
|
||||||
private Transport remoteBroker;
|
private Transport remoteBroker;
|
||||||
private TransportConnector connector;
|
private TransportConnector connector;
|
||||||
private AtomicBoolean masterActive=new AtomicBoolean(false);
|
private AtomicBoolean masterActive=new AtomicBoolean(false);
|
||||||
|
private AtomicBoolean started=new AtomicBoolean(false);
|
||||||
IdGenerator idGenerator=new IdGenerator();
|
IdGenerator idGenerator=new IdGenerator();
|
||||||
|
|
||||||
ConnectionInfo connectionInfo;
|
ConnectionInfo connectionInfo;
|
||||||
|
@ -75,7 +75,10 @@ public class MasterConnector implements Service{
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() throws Exception{
|
public void start() throws Exception{
|
||||||
|
if( !started.compareAndSet(false, true) ) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
localBroker=TransportFactory.connect(localURI);
|
localBroker=TransportFactory.connect(localURI);
|
||||||
remoteBroker=TransportFactory.connect(remoteURI);
|
remoteBroker=TransportFactory.connect(remoteURI);
|
||||||
log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established.");
|
log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established.");
|
||||||
|
@ -84,16 +87,22 @@ public class MasterConnector implements Service{
|
||||||
public void onCommand(Command command){
|
public void onCommand(Command command){
|
||||||
}
|
}
|
||||||
public void onException(IOException error){
|
public void onException(IOException error){
|
||||||
serviceLocalException(error);
|
if( started.get() ) {
|
||||||
|
serviceLocalException(error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
remoteBroker.setTransportListener(new TransportListener(){
|
remoteBroker.setTransportListener(new TransportListener(){
|
||||||
public void onCommand(Command command){
|
public void onCommand(Command command){
|
||||||
serviceRemoteCommand(command);
|
if( started.get() ) {
|
||||||
|
serviceRemoteCommand(command);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
public void onException(IOException error){
|
public void onException(IOException error){
|
||||||
serviceRemoteException(error);
|
if( started.get() ) {
|
||||||
|
serviceRemoteException(error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -145,14 +154,18 @@ public class MasterConnector implements Service{
|
||||||
log.info("Slave connection between "+localBroker+" and "+remoteBroker+" has been established.");
|
log.info("Slave connection between "+localBroker+" and "+remoteBroker+" has been established.");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop() throws Exception{
|
public void stop() throws Exception{
|
||||||
|
if( !started.compareAndSet(true, false) ) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
masterActive.set(false);
|
masterActive.set(false);
|
||||||
try{
|
try{
|
||||||
if (connectionInfo!=null){
|
// if (connectionInfo!=null){
|
||||||
localBroker.request(connectionInfo.createRemoveCommand());
|
// localBroker.request(connectionInfo.createRemoveCommand());
|
||||||
}
|
// }
|
||||||
localBroker.setTransportListener(null);
|
// localBroker.setTransportListener(null);
|
||||||
remoteBroker.setTransportListener(null);
|
// remoteBroker.setTransportListener(null);
|
||||||
remoteBroker.oneway(new ShutdownInfo());
|
remoteBroker.oneway(new ShutdownInfo());
|
||||||
localBroker.oneway(new ShutdownInfo());
|
localBroker.oneway(new ShutdownInfo());
|
||||||
}catch(IOException e){
|
}catch(IOException e){
|
||||||
|
|
|
@ -296,7 +296,7 @@ public class RegionBroker implements Broker {
|
||||||
|
|
||||||
public void send(ConnectionContext context, Message message) throws Throwable {
|
public void send(ConnectionContext context, Message message) throws Throwable {
|
||||||
message.getMessageId().setBrokerSequenceId(sequenceGenerator.getNextSequenceId());
|
message.getMessageId().setBrokerSequenceId(sequenceGenerator.getNextSequenceId());
|
||||||
if (message.getTimestamp() > 0 && (message.getBrokerPath() == null | message.getBrokerPath().length == 0)) {
|
if (message.getTimestamp() > 0 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) {
|
||||||
//timestamp not been disabled and has not passed through a network
|
//timestamp not been disabled and has not passed through a network
|
||||||
message.setTimestamp(System.currentTimeMillis());
|
message.setTimestamp(System.currentTimeMillis());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue