diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java index 70594ca5dc..1b92b90e84 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java @@ -21,6 +21,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; @@ -43,6 +44,11 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { private static final Logger logger = Logger.getLogger(PageTransactionInfoImpl.class); + private static final AtomicIntegerFieldUpdater numberOfMessagesUpdater = + AtomicIntegerFieldUpdater.newUpdater(PageTransactionInfoImpl.class, "numberOfMessages"); + private static final AtomicIntegerFieldUpdater numberOfPersistentMessagesUpdater = + AtomicIntegerFieldUpdater.newUpdater(PageTransactionInfoImpl.class, "numberOfPersistentMessages"); + private long transactionID; private volatile long recordID = -1; @@ -53,9 +59,9 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { private volatile boolean rolledback = false; - private final AtomicInteger numberOfMessages = new AtomicInteger(0); + private volatile int numberOfMessages = 0; - private final AtomicInteger numberOfPersistentMessages = new AtomicInteger(0); + private volatile int numberOfPersistentMessages = 0; private List lateDeliveries; @@ -90,13 +96,13 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { @Override public boolean onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager) { - int afterUpdate = numberOfMessages.addAndGet(-update); + int afterUpdate = numberOfMessagesUpdater.addAndGet(this, -update); return internalCheckSize(storageManager, pagingManager, afterUpdate); } @Override public boolean checkSize(StorageManager storageManager, PagingManager pagingManager) { - return internalCheckSize(storageManager, pagingManager, numberOfMessages.get()); + return internalCheckSize(storageManager, pagingManager, numberOfMessagesUpdater.get(this)); } public boolean internalCheckSize(StorageManager storageManager, PagingManager pagingManager, int size) { @@ -118,13 +124,13 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { @Override public void increment(final int durableSize, final int nonDurableSize) { - numberOfPersistentMessages.addAndGet(durableSize); - numberOfMessages.addAndGet(durableSize + nonDurableSize); + numberOfPersistentMessagesUpdater.addAndGet(this, durableSize); + numberOfMessagesUpdater.addAndGet(this, durableSize + nonDurableSize); } @Override public int getNumberOfMessages() { - return numberOfMessages.get(); + return numberOfMessagesUpdater.get(this); } // EncodingSupport implementation @@ -132,15 +138,15 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { @Override public synchronized void decode(final ActiveMQBuffer buffer) { transactionID = buffer.readLong(); - numberOfMessages.set(buffer.readInt()); - numberOfPersistentMessages.set(numberOfMessages.get()); + numberOfMessagesUpdater.set(this, buffer.readInt()); + numberOfPersistentMessagesUpdater.set(this, numberOfMessagesUpdater.get(this)); committed = true; } @Override public synchronized void encode(final ActiveMQBuffer buffer) { buffer.writeLong(transactionID); - buffer.writeInt(numberOfPersistentMessages.get()); + buffer.writeInt(numberOfPersistentMessagesUpdater.get(this)); } @Override