mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3576 - fix regression in network connector tests, the sequence id check is not valud for bridges that concentrate messages from multiple consumers. duplicate suppression for network connectors needs to be based on message ids
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1197072 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5cf33b52c1
commit
2711ad1f3f
|
@ -39,6 +39,7 @@ public class ProducerBrokerExchange {
|
||||||
private ProducerState producerState;
|
private ProducerState producerState;
|
||||||
private boolean mutable = true;
|
private boolean mutable = true;
|
||||||
private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
|
private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
|
||||||
|
private boolean auditProducerSequenceIds;
|
||||||
|
|
||||||
public ProducerBrokerExchange() {
|
public ProducerBrokerExchange() {
|
||||||
}
|
}
|
||||||
|
@ -131,20 +132,23 @@ public class ProducerBrokerExchange {
|
||||||
*/
|
*/
|
||||||
public boolean canDispatch(Message messageSend) {
|
public boolean canDispatch(Message messageSend) {
|
||||||
boolean canDispatch = true;
|
boolean canDispatch = true;
|
||||||
if (lastSendSequenceNumber.get() > 0) {
|
if (auditProducerSequenceIds) {
|
||||||
if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber.get()) {
|
if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber.get()) {
|
||||||
canDispatch = false;
|
canDispatch = false;
|
||||||
LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId ["
|
LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId ["
|
||||||
+ messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: " + lastSendSequenceNumber);
|
+ messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: " + lastSendSequenceNumber);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (canDispatch) {
|
if (canDispatch) {
|
||||||
|
// track current so we can suppress duplicates later in the stream
|
||||||
lastSendSequenceNumber.set(messageSend.getMessageId().getProducerSequenceId());
|
lastSendSequenceNumber.set(messageSend.getMessageId().getProducerSequenceId());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return canDispatch;
|
return canDispatch;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setLastStoredSequenceId(long l) {
|
public void setLastStoredSequenceId(long l) {
|
||||||
|
auditProducerSequenceIds = true;
|
||||||
lastSendSequenceNumber.set(l);
|
lastSendSequenceNumber.set(l);
|
||||||
LOG.debug("last stored sequence id set: " + l);
|
LOG.debug("last stored sequence id set: " + l);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1313,7 +1313,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
result = new ProducerBrokerExchange();
|
result = new ProducerBrokerExchange();
|
||||||
TransportConnectionState state = lookupConnectionState(id);
|
TransportConnectionState state = lookupConnectionState(id);
|
||||||
context = state.getContext();
|
context = state.getContext();
|
||||||
if (context.isReconnect()) {
|
if (context.isReconnect() && !context.isNetworkConnection()) {
|
||||||
result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
|
result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
|
||||||
}
|
}
|
||||||
result.setConnectionContext(context);
|
result.setConnectionContext(context);
|
||||||
|
|
Loading…
Reference in New Issue