somehow, somewhere, the ResponseCorrelator got dropped from the transport used by the

MasterBroker - so I've added it back here

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@490452 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-12-27 07:40:13 +00:00
parent ef0b33eeb6
commit de64939643
1 changed files with 121 additions and 133 deletions

View File

@ -1,24 +1,20 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.ft;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.InsertableMutableBrokerFilter;
@ -40,6 +36,8 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -50,23 +48,28 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.8 $
*/
public class MasterBroker extends InsertableMutableBrokerFilter{
private static final Log log=LogFactory.getLog(MasterBroker.class);
private Transport slave;
private AtomicBoolean started=new AtomicBoolean(false);
/**
* Constructor
*
* @param parent
* @param slave
* @param transport
*/
public MasterBroker(MutableBrokerFilter parent,Transport slave){
public MasterBroker(MutableBrokerFilter parent,Transport transport){
super(parent);
this.slave=slave;
this.slave=transport;
this.slave=new MutexTransport(slave);
this.slave=new ResponseCorrelator(slave);
this.slave.setTransportListener(transport.getTransportListener());
}
/**
* start processing this broker
*
*
*/
public void startProcessing(){
started.set(true);
@ -95,251 +98,240 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
super.stop();
stopProcessing();
}
/**
* stop processing this broker
*
*
*/
public void stopProcessing(){
if (started.compareAndSet(true,false)){
if(started.compareAndSet(true,false)){
remove();
}
}
/**
* A client is establishing a connection with the broker.
*
* @param context
* @param info
* @throws Exception
* @param info
* @throws Exception
*/
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception{
public void addConnection(ConnectionContext context,ConnectionInfo info) throws Exception{
super.addConnection(context,info);
sendAsyncToSlave(info);
}
/**
* A client is disconnecting from the broker.
*
* @param context the environment the operation is being executed under.
* @param info
* @param info
* @param error null if the client requested the disconnect or the error that caused the client to disconnect.
* @throws Exception
* @throws Exception
*/
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception{
public void removeConnection(ConnectionContext context,ConnectionInfo info,Throwable error) throws Exception{
super.removeConnection(context,info,error);
sendAsyncToSlave(new RemoveInfo(info.getConnectionId()));
}
/**
* Adds a session.
*
* @param context
* @param info
* @throws Exception
* @throws Exception
*/
public void addSession(ConnectionContext context, SessionInfo info) throws Exception{
super.addSession(context, info);
public void addSession(ConnectionContext context,SessionInfo info) throws Exception{
super.addSession(context,info);
sendAsyncToSlave(info);
}
/**
* Removes a session.
*
* @param context
* @param info
* @throws Exception
* @throws Exception
*/
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception{
super.removeSession(context, info);
public void removeSession(ConnectionContext context,SessionInfo info) throws Exception{
super.removeSession(context,info);
sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
}
/**
* Adds a producer.
*
* @param context the enviorment the operation is being executed under.
* @param info
* @throws Exception
* @param info
* @throws Exception
*/
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
public void addProducer(ConnectionContext context,ProducerInfo info) throws Exception{
super.addProducer(context,info);
sendAsyncToSlave(info);
}
/**
* Removes a producer.
*
* @param context the enviorment the operation is being executed under.
* @param info
* @throws Exception
* @param info
* @throws Exception
*/
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
super.removeProducer(context, info);
public void removeProducer(ConnectionContext context,ProducerInfo info) throws Exception{
super.removeProducer(context,info);
sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
}
/**
* add a consumer
* @param context
* @param info
*
* @param context
* @param info
* @return the assocated subscription
* @throws Exception
* @throws Exception
*/
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{
sendAsyncToSlave(info);
Subscription answer = super.addConsumer(context, info);
Subscription answer=super.addConsumer(context,info);
return answer;
}
/**
* remove a subscription
* @param context
* @param info
* @throws Exception
*
* @param context
* @param info
* @throws Exception
*/
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
super.removeSubscription(context, info);
public void removeSubscription(ConnectionContext context,RemoveSubscriptionInfo info) throws Exception{
super.removeSubscription(context,info);
sendAsyncToSlave(info);
}
/**
* begin a transaction
* @param context
* @param xid
* @throws Exception
*
* @param context
* @param xid
* @throws Exception
*/
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception{
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN);
public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception{
TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN);
sendAsyncToSlave(info);
super.beginTransaction(context, xid);
super.beginTransaction(context,xid);
}
/**
* Prepares a transaction. Only valid for xa transactions.
* @param context
*
* @param context
* @param xid
* @return the state
* @throws Exception
* @throws Exception
*/
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception{
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE);
public int prepareTransaction(ConnectionContext context,TransactionId xid) throws Exception{
TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE);
sendAsyncToSlave(info);
int result = super.prepareTransaction(context, xid);
int result=super.prepareTransaction(context,xid);
return result;
}
/**
* Rollsback a transaction.
* @param context
*
* @param context
* @param xid
* @throws Exception
* @throws Exception
*/
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception{
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
public void rollbackTransaction(ConnectionContext context,TransactionId xid) throws Exception{
TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
sendAsyncToSlave(info);
super.rollbackTransaction(context, xid);
super.rollbackTransaction(context,xid);
}
/**
* Commits a transaction.
* @param context
*
* @param context
* @param xid
* @param onePhase
* @throws Exception
* @throws Exception
*/
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception{
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
public void commitTransaction(ConnectionContext context,TransactionId xid,boolean onePhase) throws Exception{
TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
sendSyncToSlave(info);
super.commitTransaction(context, xid,onePhase);
super.commitTransaction(context,xid,onePhase);
}
/**
* Forgets a transaction.
* @param context
* @param xid
* @throws Exception
*
* @param context
* @param xid
* @throws Exception
*/
public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception{
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET);
public void forgetTransaction(ConnectionContext context,TransactionId xid) throws Exception{
TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET);
sendAsyncToSlave(info);
super.forgetTransaction(context, xid);
super.forgetTransaction(context,xid);
}
/**
* Notifiy the Broker that a dispatch has happened
*
* @param messageDispatch
*/
public void processDispatch(MessageDispatch messageDispatch){
MessageDispatchNotification mdn = new MessageDispatchNotification();
MessageDispatchNotification mdn=new MessageDispatchNotification();
mdn.setConsumerId(messageDispatch.getConsumerId());
mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
mdn.setDestination(messageDispatch.getDestination());
if( messageDispatch.getMessage() != null )
if(messageDispatch.getMessage()!=null)
mdn.setMessageId(messageDispatch.getMessage().getMessageId());
sendAsyncToSlave(mdn);
super.processDispatch(messageDispatch);
}
/**
* @param context
* @param message
* @throws Exception
* @param context
* @param message
* @throws Exception
*
*/
public void send(ConnectionContext context, Message message) throws Exception{
public void send(ConnectionContext context,Message message) throws Exception{
/**
* A message can be dispatched before the super.send() method returns
* so - here the order is switched to avoid problems on the slave
* with receiving acks for messages not received yey
* A message can be dispatched before the super.send() method returns so - here the order is switched to avoid
* problems on the slave with receiving acks for messages not received yey
*/
sendToSlave(message);
super.send(context,message);
}
/**
* @param context
* @param ack
* @throws Exception
* @param context
* @param ack
* @throws Exception
*
*/
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception{
public void acknowledge(ConnectionContext context,MessageAck ack) throws Exception{
sendToSlave(ack);
super.acknowledge(context, ack);
super.acknowledge(context,ack);
}
public boolean isFaultTolerantConfiguration(){
return true;
}
protected void sendToSlave(Message message){
if ( message.isResponseRequired() ){
if(message.isResponseRequired()){
sendSyncToSlave(message);
}else{
sendAsyncToSlave(message);
}
}
protected void sendToSlave(MessageAck ack){
if ( ack.isResponseRequired() ){
if(ack.isResponseRequired()){
sendAsyncToSlave(ack);
}else{
sendSyncToSlave(ack);
@ -348,9 +340,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
protected void sendAsyncToSlave(Command command){
try{
slave.oneway(command);
}catch(Throwable e){
log.error("Slave Failed",e);
stopProcessing();
@ -359,17 +349,15 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
protected void sendSyncToSlave(Command command){
try{
Response response=(Response) slave.request(command);
if (response.isException()){
System.err.println("SEMNDING SYNC "+command);
Response response=(Response)slave.request(command);
System.out.println("GOT RESPONSE "+response);
if(response.isException()){
ExceptionResponse er=(ExceptionResponse)response;
log.error("Slave Failed",er.getException());
}
}catch(Throwable e){
log.error("Slave Failed",e);
}
}
}