diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java index ca0be58aa2..6f1b886315 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java @@ -51,19 +51,37 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ private Transport slave; private AtomicBoolean started=new AtomicBoolean(false); + /** + * Constructor + * @param parent + * @param slave + */ public MasterBroker(MutableBrokerFilter parent,Transport slave){ super(parent); this.slave=slave; } + /** + * start processing this broker + * + */ public void startProcessing(){ started.set(true); } + /** + * stop the broker + * @throws Exception + */ public void stop() throws Exception{ super.stop(); stopProcessing(); } + + /** + * stop processing this broker + * + */ public void stopProcessing(){ if (started.compareAndSet(true,false)){ remove(); @@ -76,7 +94,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ * A client is establishing a connection with the broker. * @param context * @param info - * @param client + * @throws Exception */ public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception{ super.addConnection(context,info); @@ -87,8 +105,8 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ * A client is disconnecting from the broker. * @param context the environment the operation is being executed under. * @param info - * @param client * @param error null if the client requested the disconnect or the error that caused the client to disconnect. + * @throws Exception */ public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception{ super.removeConnection(context,info,error); @@ -99,8 +117,10 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ * Adds a session. * @param context * @param info + * @throws Exception */ public void addSession(ConnectionContext context, SessionInfo info) throws Exception{ + super.addSession(context, info); sendAsyncToSlave(info); } @@ -109,15 +129,20 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ * Removes a session. * @param context * @param info + * @throws Exception */ public void removeSession(ConnectionContext context, SessionInfo info) throws Exception{ super.removeSession(context, info); sendAsyncToSlave(new RemoveInfo(info.getSessionId())); + + } /** * Adds a producer. * @param context the enviorment the operation is being executed under. + * @param info + * @throws Exception */ public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{ super.addProducer(context,info); @@ -127,19 +152,34 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ /** * Removes a producer. * @param context the enviorment the operation is being executed under. + * @param info + * @throws Exception */ public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{ super.removeProducer(context, info); sendAsyncToSlave(new RemoveInfo(info.getProducerId())); } + /** + * add a consumer + * @param context + * @param info + * @return the assocated subscription + * @throws Exception + */ public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - Subscription answer = super.addConsumer(context, info); sendAsyncToSlave(info); + Subscription answer = super.addConsumer(context, info); + return answer; } - + /** + * remove a subscription + * @param context + * @param info + * @throws Exception + */ public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { super.removeSubscription(context, info); sendAsyncToSlave(info); @@ -147,60 +187,75 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ + /** + * begin a transaction + * @param context + * @param xid + * @throws Exception + */ public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception{ - super.beginTransaction(context, xid); TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN); sendAsyncToSlave(info); + super.beginTransaction(context, xid); + + } /** * Prepares a transaction. Only valid for xa transactions. - * @param client + * @param context * @param xid - * @return + * @return the state + * @throws Exception */ public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception{ - int result = super.prepareTransaction(context, xid); TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE); sendAsyncToSlave(info); + int result = super.prepareTransaction(context, xid); + return result; } /** * Rollsback a transaction. - * @param client + * @param context * @param xid + * @throws Exception */ public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception{ - super.rollbackTransaction(context, xid); TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK); sendAsyncToSlave(info); + super.rollbackTransaction(context, xid); + } /** * Commits a transaction. - * @param client + * @param context * @param xid * @param onePhase + * @throws Exception */ public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception{ - super.commitTransaction(context, xid,onePhase); + TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE); sendSyncToSlave(info); + super.commitTransaction(context, xid,onePhase); } /** * Forgets a transaction. - * @param client - * @param xid - * @param onePhase + * @param context + * @param xid + * @throws Exception */ public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception{ - super.forgetTransaction(context, xid); + TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET); sendAsyncToSlave(info); + super.forgetTransaction(context, xid); } /** @@ -208,15 +263,22 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ * @param messageDispatch */ public void processDispatch(MessageDispatch messageDispatch){ - super.processDispatch(messageDispatch); + MessageDispatchNotification mdn = new MessageDispatchNotification(); mdn.setConsumerId(messageDispatch.getConsumerId()); mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId()); mdn.setDestination(messageDispatch.getDestination()); mdn.setMessageId(messageDispatch.getMessage().getMessageId()); sendAsyncToSlave(mdn); + super.processDispatch(messageDispatch); } + /** + * @param context + * @param message + * @throws Exception + * + */ public void send(ConnectionContext context, Message message) throws Exception{ /** * A message can be dispatched before the super.send() method returns @@ -225,12 +287,20 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ */ sendToSlave(message); super.send(context,message); + } + /** + * @param context + * @param ack + * @throws Exception + * + */ public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception{ - super.acknowledge(context, ack); sendToSlave(ack); + super.acknowledge(context, ack); + }