diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java b/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java index 425f493c01..68b24e3574 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java @@ -39,6 +39,7 @@ public class ProducerBrokerExchange { private ProducerState producerState; private boolean mutable = true; private AtomicLong lastSendSequenceNumber = new AtomicLong(-1); + private boolean auditProducerSequenceIds; public ProducerBrokerExchange() { } @@ -131,20 +132,23 @@ public class ProducerBrokerExchange { */ public boolean canDispatch(Message messageSend) { boolean canDispatch = true; - if (lastSendSequenceNumber.get() > 0) { + if (auditProducerSequenceIds) { if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber.get()) { canDispatch = false; LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId [" + messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: " + lastSendSequenceNumber); } - } - if (canDispatch) { - lastSendSequenceNumber.set(messageSend.getMessageId().getProducerSequenceId()); + + if (canDispatch) { + // track current so we can suppress duplicates later in the stream + lastSendSequenceNumber.set(messageSend.getMessageId().getProducerSequenceId()); + } } return canDispatch; } public void setLastStoredSequenceId(long l) { + auditProducerSequenceIds = true; lastSendSequenceNumber.set(l); LOG.debug("last stored sequence id set: " + l); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 3e468f3780..4cbc8512cd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -1313,7 +1313,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { result = new ProducerBrokerExchange(); TransportConnectionState state = lookupConnectionState(id); context = state.getContext(); - if (context.isReconnect()) { + if (context.isReconnect() && !context.isNetworkConnection()) { result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id)); } result.setConnectionContext(context);