ARTEMIS-2324 Fixing possible race on DuplciateDetection

This is fixing AmqpSenderTest.testDuplicateDetection
This commit is contained in:
Clebert Suconic 2019-04-26 10:34:09 -04:00
parent edbf3575d0
commit 00a6b933bb
2 changed files with 23 additions and 9 deletions

View File

@ -213,22 +213,24 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
tx.setContainsPersistent();
}
if (logger.isTraceEnabled()) {
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx);
}
if (instantAdd) {
addToCacheInMemory(duplID, recordID);
tx.addOperation(new AddDuplicateIDOperation(duplID, recordID, false));
} else {
if (logger.isTraceEnabled()) {
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx);
}
// For a tx, it's important that the entry is not added to the cache until commit
// since if the client fails then resends them tx we don't want it to get rejected
tx.afterStore(new AddDuplicateIDOperation(duplID, recordID));
tx.afterStore(new AddDuplicateIDOperation(duplID, recordID, true));
}
}
}
@Override
public void load(final Transaction tx, final byte[] duplID) {
tx.addOperation(new AddDuplicateIDOperation(duplID, tx.getID()));
tx.addOperation(new AddDuplicateIDOperation(duplID, tx.getID(), true));
}
private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID) {
@ -330,9 +332,12 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
volatile boolean done;
AddDuplicateIDOperation(final byte[] duplID, final long recordID) {
private final boolean afterCommit;
AddDuplicateIDOperation(final byte[] duplID, final long recordID, boolean afterCommit) {
this.duplID = duplID;
this.recordID = recordID;
this.afterCommit = afterCommit;
}
private void process() {
@ -345,7 +350,16 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
@Override
public void afterCommit(final Transaction tx) {
process();
if (afterCommit) {
process();
}
}
@Override
public void beforeCommit(Transaction tx) throws Exception {
if (!afterCommit) {
process();
}
}
@Override

View File

@ -1505,7 +1505,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
startedTX.set(true);
}
cache.addToCache(duplicateIDBytes, context.getTransaction(), false);
cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get());
}
}