resolve some timing issues

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@391049 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-04-03 14:12:11 +00:00
parent 05d271a719
commit e9b1ff51ee
1 changed files with 88 additions and 18 deletions

View File

@ -51,19 +51,37 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
private Transport slave; private Transport slave;
private AtomicBoolean started=new AtomicBoolean(false); private AtomicBoolean started=new AtomicBoolean(false);
/**
* Constructor
* @param parent
* @param slave
*/
public MasterBroker(MutableBrokerFilter parent,Transport slave){ public MasterBroker(MutableBrokerFilter parent,Transport slave){
super(parent); super(parent);
this.slave=slave; this.slave=slave;
} }
/**
* start processing this broker
*
*/
public void startProcessing(){ public void startProcessing(){
started.set(true); started.set(true);
} }
/**
* stop the broker
* @throws Exception
*/
public void stop() throws Exception{ public void stop() throws Exception{
super.stop(); super.stop();
stopProcessing(); stopProcessing();
} }
/**
* stop processing this broker
*
*/
public void stopProcessing(){ public void stopProcessing(){
if (started.compareAndSet(true,false)){ if (started.compareAndSet(true,false)){
remove(); remove();
@ -76,7 +94,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
* A client is establishing a connection with the broker. * A client is establishing a connection with the broker.
* @param context * @param context
* @param info * @param info
* @param client * @throws Exception
*/ */
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception{ public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception{
super.addConnection(context,info); super.addConnection(context,info);
@ -87,8 +105,8 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
* A client is disconnecting from the broker. * A client is disconnecting from the broker.
* @param context the environment the operation is being executed under. * @param context the environment the operation is being executed under.
* @param info * @param info
* @param client
* @param error null if the client requested the disconnect or the error that caused the client to disconnect. * @param error null if the client requested the disconnect or the error that caused the client to disconnect.
* @throws Exception
*/ */
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception{ public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception{
super.removeConnection(context,info,error); super.removeConnection(context,info,error);
@ -99,8 +117,10 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
* Adds a session. * Adds a session.
* @param context * @param context
* @param info * @param info
* @throws Exception
*/ */
public void addSession(ConnectionContext context, SessionInfo info) throws Exception{ public void addSession(ConnectionContext context, SessionInfo info) throws Exception{
super.addSession(context, info); super.addSession(context, info);
sendAsyncToSlave(info); sendAsyncToSlave(info);
} }
@ -109,15 +129,20 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
* Removes a session. * Removes a session.
* @param context * @param context
* @param info * @param info
* @throws Exception
*/ */
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception{ public void removeSession(ConnectionContext context, SessionInfo info) throws Exception{
super.removeSession(context, info); super.removeSession(context, info);
sendAsyncToSlave(new RemoveInfo(info.getSessionId())); sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
} }
/** /**
* Adds a producer. * Adds a producer.
* @param context the enviorment the operation is being executed under. * @param context the enviorment the operation is being executed under.
* @param info
* @throws Exception
*/ */
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{ public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
super.addProducer(context,info); super.addProducer(context,info);
@ -127,19 +152,34 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
/** /**
* Removes a producer. * Removes a producer.
* @param context the enviorment the operation is being executed under. * @param context the enviorment the operation is being executed under.
* @param info
* @throws Exception
*/ */
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{ public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
super.removeProducer(context, info); super.removeProducer(context, info);
sendAsyncToSlave(new RemoveInfo(info.getProducerId())); sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
} }
/**
* add a consumer
* @param context
* @param info
* @return the assocated subscription
* @throws Exception
*/
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription answer = super.addConsumer(context, info);
sendAsyncToSlave(info); sendAsyncToSlave(info);
Subscription answer = super.addConsumer(context, info);
return answer; return answer;
} }
/**
* remove a subscription
* @param context
* @param info
* @throws Exception
*/
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
super.removeSubscription(context, info); super.removeSubscription(context, info);
sendAsyncToSlave(info); sendAsyncToSlave(info);
@ -147,60 +187,75 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
/**
* begin a transaction
* @param context
* @param xid
* @throws Exception
*/
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception{ public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception{
super.beginTransaction(context, xid);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN); TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN);
sendAsyncToSlave(info); sendAsyncToSlave(info);
super.beginTransaction(context, xid);
} }
/** /**
* Prepares a transaction. Only valid for xa transactions. * Prepares a transaction. Only valid for xa transactions.
* @param client * @param context
* @param xid * @param xid
* @return * @return the state
* @throws Exception
*/ */
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception{ public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception{
int result = super.prepareTransaction(context, xid);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE); TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE);
sendAsyncToSlave(info); sendAsyncToSlave(info);
int result = super.prepareTransaction(context, xid);
return result; return result;
} }
/** /**
* Rollsback a transaction. * Rollsback a transaction.
* @param client * @param context
* @param xid * @param xid
* @throws Exception
*/ */
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception{ public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception{
super.rollbackTransaction(context, xid);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK); TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
sendAsyncToSlave(info); sendAsyncToSlave(info);
super.rollbackTransaction(context, xid);
} }
/** /**
* Commits a transaction. * Commits a transaction.
* @param client * @param context
* @param xid * @param xid
* @param onePhase * @param onePhase
* @throws Exception
*/ */
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception{ public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception{
super.commitTransaction(context, xid,onePhase);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE); TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
sendSyncToSlave(info); sendSyncToSlave(info);
super.commitTransaction(context, xid,onePhase);
} }
/** /**
* Forgets a transaction. * Forgets a transaction.
* @param client * @param context
* @param xid * @param xid
* @param onePhase * @throws Exception
*/ */
public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception{ public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception{
super.forgetTransaction(context, xid);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET); TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET);
sendAsyncToSlave(info); sendAsyncToSlave(info);
super.forgetTransaction(context, xid);
} }
/** /**
@ -208,15 +263,22 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
* @param messageDispatch * @param messageDispatch
*/ */
public void processDispatch(MessageDispatch messageDispatch){ public void processDispatch(MessageDispatch messageDispatch){
super.processDispatch(messageDispatch);
MessageDispatchNotification mdn = new MessageDispatchNotification(); MessageDispatchNotification mdn = new MessageDispatchNotification();
mdn.setConsumerId(messageDispatch.getConsumerId()); mdn.setConsumerId(messageDispatch.getConsumerId());
mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId()); mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
mdn.setDestination(messageDispatch.getDestination()); mdn.setDestination(messageDispatch.getDestination());
mdn.setMessageId(messageDispatch.getMessage().getMessageId()); mdn.setMessageId(messageDispatch.getMessage().getMessageId());
sendAsyncToSlave(mdn); sendAsyncToSlave(mdn);
super.processDispatch(messageDispatch);
} }
/**
* @param context
* @param message
* @throws Exception
*
*/
public void send(ConnectionContext context, Message message) throws Exception{ public void send(ConnectionContext context, Message message) throws Exception{
/** /**
* A message can be dispatched before the super.send() method returns * A message can be dispatched before the super.send() method returns
@ -225,12 +287,20 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
*/ */
sendToSlave(message); sendToSlave(message);
super.send(context,message); super.send(context,message);
} }
/**
* @param context
* @param ack
* @throws Exception
*
*/
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception{ public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception{
super.acknowledge(context, ack);
sendToSlave(ack); sendToSlave(ack);
super.acknowledge(context, ack);
} }