diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java index bdaf12494b..96286cd6c7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java @@ -26,7 +26,6 @@ import org.apache.activemq.command.Command; import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.Response; @@ -59,6 +58,7 @@ public class MasterConnector implements Service{ private Transport remoteBroker; private TransportConnector connector; private AtomicBoolean masterActive=new AtomicBoolean(false); + private AtomicBoolean started=new AtomicBoolean(false); IdGenerator idGenerator=new IdGenerator(); ConnectionInfo connectionInfo; @@ -75,7 +75,10 @@ public class MasterConnector implements Service{ } public void start() throws Exception{ - + if( !started.compareAndSet(false, true) ) { + return; + } + localBroker=TransportFactory.connect(localURI); remoteBroker=TransportFactory.connect(remoteURI); log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established."); @@ -84,16 +87,22 @@ public class MasterConnector implements Service{ public void onCommand(Command command){ } public void onException(IOException error){ - serviceLocalException(error); + if( started.get() ) { + serviceLocalException(error); + } } }); remoteBroker.setTransportListener(new TransportListener(){ public void onCommand(Command command){ - serviceRemoteCommand(command); + if( started.get() ) { + serviceRemoteCommand(command); + } } public void onException(IOException error){ - serviceRemoteException(error); + if( started.get() ) { + serviceRemoteException(error); + } } }); @@ -145,14 +154,18 @@ public class MasterConnector implements Service{ log.info("Slave connection between "+localBroker+" and "+remoteBroker+" has been established."); } - public void stop() throws Exception{ + public void stop() throws Exception{ + if( !started.compareAndSet(true, false) ) { + return; + } + masterActive.set(false); try{ - if (connectionInfo!=null){ - localBroker.request(connectionInfo.createRemoveCommand()); - } - localBroker.setTransportListener(null); - remoteBroker.setTransportListener(null); +// if (connectionInfo!=null){ +// localBroker.request(connectionInfo.createRemoveCommand()); +// } +// localBroker.setTransportListener(null); +// remoteBroker.setTransportListener(null); remoteBroker.oneway(new ShutdownInfo()); localBroker.oneway(new ShutdownInfo()); }catch(IOException e){ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 7b5b8c90cc..50a572f25d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -296,7 +296,7 @@ public class RegionBroker implements Broker { public void send(ConnectionContext context, Message message) throws Throwable { message.getMessageId().setBrokerSequenceId(sequenceGenerator.getNextSequenceId()); - if (message.getTimestamp() > 0 && (message.getBrokerPath() == null | message.getBrokerPath().length == 0)) { + if (message.getTimestamp() > 0 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) { //timestamp not been disabled and has not passed through a network message.setTimestamp(System.currentTimeMillis()); }