This commit is contained in:
Clebert Suconic 2020-01-08 12:39:14 -05:00
commit 52d4193efd
1 changed files with 16 additions and 10 deletions

View File

@ -21,6 +21,7 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
@ -43,6 +44,11 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
private static final Logger logger = Logger.getLogger(PageTransactionInfoImpl.class); private static final Logger logger = Logger.getLogger(PageTransactionInfoImpl.class);
private static final AtomicIntegerFieldUpdater<PageTransactionInfoImpl> numberOfMessagesUpdater =
AtomicIntegerFieldUpdater.newUpdater(PageTransactionInfoImpl.class, "numberOfMessages");
private static final AtomicIntegerFieldUpdater<PageTransactionInfoImpl> numberOfPersistentMessagesUpdater =
AtomicIntegerFieldUpdater.newUpdater(PageTransactionInfoImpl.class, "numberOfPersistentMessages");
private long transactionID; private long transactionID;
private volatile long recordID = -1; private volatile long recordID = -1;
@ -53,9 +59,9 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
private volatile boolean rolledback = false; private volatile boolean rolledback = false;
private final AtomicInteger numberOfMessages = new AtomicInteger(0); private volatile int numberOfMessages = 0;
private final AtomicInteger numberOfPersistentMessages = new AtomicInteger(0); private volatile int numberOfPersistentMessages = 0;
private List<LateDelivery> lateDeliveries; private List<LateDelivery> lateDeliveries;
@ -90,13 +96,13 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
@Override @Override
public boolean onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager) { public boolean onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager) {
int afterUpdate = numberOfMessages.addAndGet(-update); int afterUpdate = numberOfMessagesUpdater.addAndGet(this, -update);
return internalCheckSize(storageManager, pagingManager, afterUpdate); return internalCheckSize(storageManager, pagingManager, afterUpdate);
} }
@Override @Override
public boolean checkSize(StorageManager storageManager, PagingManager pagingManager) { public boolean checkSize(StorageManager storageManager, PagingManager pagingManager) {
return internalCheckSize(storageManager, pagingManager, numberOfMessages.get()); return internalCheckSize(storageManager, pagingManager, numberOfMessagesUpdater.get(this));
} }
public boolean internalCheckSize(StorageManager storageManager, PagingManager pagingManager, int size) { public boolean internalCheckSize(StorageManager storageManager, PagingManager pagingManager, int size) {
@ -118,13 +124,13 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
@Override @Override
public void increment(final int durableSize, final int nonDurableSize) { public void increment(final int durableSize, final int nonDurableSize) {
numberOfPersistentMessages.addAndGet(durableSize); numberOfPersistentMessagesUpdater.addAndGet(this, durableSize);
numberOfMessages.addAndGet(durableSize + nonDurableSize); numberOfMessagesUpdater.addAndGet(this, durableSize + nonDurableSize);
} }
@Override @Override
public int getNumberOfMessages() { public int getNumberOfMessages() {
return numberOfMessages.get(); return numberOfMessagesUpdater.get(this);
} }
// EncodingSupport implementation // EncodingSupport implementation
@ -132,15 +138,15 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
@Override @Override
public synchronized void decode(final ActiveMQBuffer buffer) { public synchronized void decode(final ActiveMQBuffer buffer) {
transactionID = buffer.readLong(); transactionID = buffer.readLong();
numberOfMessages.set(buffer.readInt()); numberOfMessagesUpdater.set(this, buffer.readInt());
numberOfPersistentMessages.set(numberOfMessages.get()); numberOfPersistentMessagesUpdater.set(this, numberOfMessagesUpdater.get(this));
committed = true; committed = true;
} }
@Override @Override
public synchronized void encode(final ActiveMQBuffer buffer) { public synchronized void encode(final ActiveMQBuffer buffer) {
buffer.writeLong(transactionID); buffer.writeLong(transactionID);
buffer.writeInt(numberOfPersistentMessages.get()); buffer.writeInt(numberOfPersistentMessagesUpdater.get(this));
} }
@Override @Override