diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java index ce3c7820de..fff7e288b7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java @@ -213,22 +213,24 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { tx.setContainsPersistent(); } + if (logger.isTraceEnabled()) { + logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx); + } + + if (instantAdd) { - addToCacheInMemory(duplID, recordID); + tx.addOperation(new AddDuplicateIDOperation(duplID, recordID, false)); } else { - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx); - } // For a tx, it's important that the entry is not added to the cache until commit // since if the client fails then resends them tx we don't want it to get rejected - tx.afterStore(new AddDuplicateIDOperation(duplID, recordID)); + tx.afterStore(new AddDuplicateIDOperation(duplID, recordID, true)); } } } @Override public void load(final Transaction tx, final byte[] duplID) { - tx.addOperation(new AddDuplicateIDOperation(duplID, tx.getID())); + tx.addOperation(new AddDuplicateIDOperation(duplID, tx.getID(), true)); } private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID) { @@ -330,9 +332,12 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { volatile boolean done; - AddDuplicateIDOperation(final byte[] duplID, final long recordID) { + private final boolean afterCommit; + + AddDuplicateIDOperation(final byte[] duplID, final long recordID, boolean afterCommit) { this.duplID = duplID; this.recordID = recordID; + this.afterCommit = afterCommit; } private void process() { @@ -345,7 +350,16 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { @Override public void afterCommit(final Transaction tx) { - process(); + if (afterCommit) { + process(); + } + } + + @Override + public void beforeCommit(Transaction tx) throws Exception { + if (!afterCommit) { + process(); + } } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index ad839c100f..b0d8063645 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1505,7 +1505,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding startedTX.set(true); } - cache.addToCache(duplicateIDBytes, context.getTransaction(), false); + cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get()); } }