send back responses

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@371910 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-01-24 13:34:59 +00:00
parent 4b8822f154
commit fd156f833a
1 changed files with 22 additions and 13 deletions

View File

@ -26,8 +26,10 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
@ -56,7 +58,7 @@ public class MasterConnector implements Service{
private Transport localBroker; private Transport localBroker;
private Transport remoteBroker; private Transport remoteBroker;
private TransportConnector connector; private TransportConnector connector;
private AtomicBoolean masterActive = new AtomicBoolean(false); private AtomicBoolean masterActive=new AtomicBoolean(false);
IdGenerator idGenerator=new IdGenerator(); IdGenerator idGenerator=new IdGenerator();
ConnectionInfo connectionInfo; ConnectionInfo connectionInfo;
@ -64,8 +66,8 @@ public class MasterConnector implements Service{
ProducerInfo producerInfo; ProducerInfo producerInfo;
public MasterConnector(BrokerService broker,TransportConnector connector){ public MasterConnector(BrokerService broker,TransportConnector connector){
this.broker = broker; this.broker=broker;
this.connector = connector; this.connector=connector;
} }
public boolean isSlave(){ public boolean isSlave(){
@ -127,13 +129,13 @@ public class MasterConnector implements Service{
producerInfo=new ProducerInfo(sessionInfo,1); producerInfo=new ProducerInfo(sessionInfo,1);
producerInfo.setResponseRequired(false); producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo); remoteBroker.oneway(producerInfo);
BrokerInfo brokerInfo = null; BrokerInfo brokerInfo=null;
if (connector != null){ if (connector!=null){
brokerInfo = connector.getBrokerInfo(); brokerInfo=connector.getBrokerInfo();
}else{ }else{
brokerInfo = new BrokerInfo(); brokerInfo=new BrokerInfo();
} }
brokerInfo.setBrokerName(broker.getBrokerName()); brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos()); brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
@ -177,9 +179,16 @@ public class MasterConnector implements Service{
if (command.getDataStructureType()==CommandTypes.SHUTDOWN_INFO){ if (command.getDataStructureType()==CommandTypes.SHUTDOWN_INFO){
log.warn("The Master has shutdown"); log.warn("The Master has shutdown");
shutDown(); shutDown();
}else { }else{
boolean responseRequired = command.isResponseRequired();
short commandId = command.getCommandId();
localBroker.oneway(command); localBroker.oneway(command);
if (responseRequired){
Response response=new Response();
response.setCorrelationId(commandId);
remoteBroker.oneway(response);
}
} }
}catch(IOException e){ }catch(IOException e){
serviceRemoteException(e); serviceRemoteException(e);
@ -220,10 +229,10 @@ public class MasterConnector implements Service{
public void setRemoteURI(URI remoteURI){ public void setRemoteURI(URI remoteURI){
this.remoteURI=remoteURI; this.remoteURI=remoteURI;
} }
private void shutDown(){ private void shutDown(){
masterActive.set(false); masterActive.set(false);
broker.masterFailed(); broker.masterFailed();
ServiceSupport.dispose(this); //ServiceSupport.dispose(this);
} }
} }