ARTEMIS-2845 ConcurrentAppendOnlyChunkedList cannot be queried while resizing

This commit is contained in:
Francesco Nigro 2020-07-14 19:05:47 +02:00 committed by Clebert Suconic
parent f46527b2e0
commit 41efeb2669
1 changed files with 27 additions and 32 deletions

View File

@ -50,13 +50,12 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
private final int chunkSizeLog2;
private static final long RESIZING = -1;
private AtomicChunk<T> firstBuffer = null;
private AtomicChunk<T> lastBuffer = null;
//it is both the current index of the next element to be claimed and the current size of the collection
//it's using a parity bit to mark the rotation state ie size === lastIndex >> 1
private volatile long lastIndex = 0;
//cached view of lastIndex used to avoid invalidating lastIndex while being updated by the appends
@ -80,14 +79,7 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
}
private long getValidLastIndex() {
while (true) {
final long lastIndex = this.lastIndex;
if (lastIndex == RESIZING) {
Thread.yield();
continue;
}
return lastIndex;
}
return this.lastIndex >> 1;
}
/**
@ -185,36 +177,39 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
Objects.requireNonNull(e);
while (true) {
final long lastIndex = this.lastIndex;
if (lastIndex != RESIZING) {
if (lastIndex == Integer.MAX_VALUE) {
throw new IllegalStateException("can't add more then " + Integer.MAX_VALUE + " elements");
}
//load acquire the current lastBuffer
final AtomicChunk<T> lastBuffer = this.lastBuffer;
final int offset = (int) (lastIndex & chunkMask);
//only the first attempt to add an element to a chunk can attempt to resize
if (offset == 0) {
if (addChunkAndElement(lastBuffer, lastIndex, e)) {
return;
}
} else if (LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, lastIndex + 1)) {
//this.lastBuffer is the correct buffer to append a element: it is guarded by the lastIndex logic
//NOTE: lastIndex is being updated before setting a new value
lastBuffer.lazySet(offset, e);
// lower bit is indicative of appending
if ((lastIndex & 1) == 1) {
continue;
}
final long validLastIndex = lastIndex >> 1;
if (validLastIndex == Integer.MAX_VALUE) {
throw new IllegalStateException("can't add more then " + Integer.MAX_VALUE + " elements");
}
//load acquire the current lastBuffer
final AtomicChunk<T> lastBuffer = this.lastBuffer;
final int offset = (int) (validLastIndex & chunkMask);
//only the first attempt to add an element to a chunk can attempt to resize
if (offset == 0) {
if (addChunkAndElement(lastBuffer, lastIndex, validLastIndex, e)) {
return;
}
} else if (LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, lastIndex + 2)) {
//this.lastBuffer is the correct buffer to append a element: it is guarded by the lastIndex logic
//NOTE: lastIndex is being updated before setting a new value
lastBuffer.lazySet(offset, e);
return;
}
Thread.yield();
}
}
private boolean addChunkAndElement(AtomicChunk<T> lastBuffer, long lastIndex, T element) {
if (!LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, RESIZING)) {
private boolean addChunkAndElement(AtomicChunk<T> lastBuffer, long lastIndex, long validLastIndex, T element) {
// adding 1 will set the lower bit
if (!LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, lastIndex + 1)) {
return false;
}
final AtomicChunk<T> newChunk;
try {
final int index = (int) (lastIndex >> chunkSizeLog2);
final int index = (int) (validLastIndex >> chunkSizeLog2);
newChunk = new AtomicChunk<>(index, lastBuffer, chunkSize);
} catch (OutOfMemoryError oom) {
//unblock lastIndex without updating it
@ -234,7 +229,8 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
//making it the current produced one
this.lastBuffer = newChunk;
//store release any previous write and unblock anyone waiting resizing to finish
LAST_INDEX_UPDATER.lazySet(this, lastIndex + 1);
//and would clean the lower bit
LAST_INDEX_UPDATER.lazySet(this, lastIndex + 2);
return true;
}
@ -268,7 +264,6 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
private static <T> T pollElement(AtomicChunk<T> buffer, int i) {
T e;
while ((e = buffer.get(i)) == null) {
Thread.yield();
}
return e;
}