ARTEMIS-3049 Reduce live page lookup cost
parent 884336f08c
commit 19b04531c6
ConcurrentAppendOnlyChunkedList.java

@@ -42,8 +42,6 @@ public final class ConcurrentAppendOnlyChunkedList<T> {

   private static final AtomicLongFieldUpdater<ConcurrentAppendOnlyChunkedList> LAST_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, "lastIndex");

-  private static final AtomicLongFieldUpdater<ConcurrentAppendOnlyChunkedList> CACHED_LAST_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, "cachedLastIndex");
-
   private final int chunkSize;

   private final int chunkMask;
@@ -58,10 +56,6 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
   //it's using a parity bit to mark the rotation state ie size === lastIndex >> 1
   private volatile long lastIndex = 0;

-  //cached view of lastIndex used to avoid invalidating lastIndex while being updated by the appends
-  private volatile long cachedLastIndex = 0;

   /**
    * @throws IllegalArgumentException if {@code chunkSize} is <0 or not a power of 2
    */
@@ -105,17 +99,11 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
      if (index < 0) {
         return null;
      }
-     //it allow to perform less cache invalidations vs lastIndex if there are bursts of appends
-     long lastIndex = cachedLastIndex;
-     if (index >= lastIndex) {
-        lastIndex = getValidLastIndex();
+     final long lastIndex = getValidLastIndex();
      //it is a element over the current size?
      if (index >= lastIndex) {
         return null;
      }
-     //publish it for others readers
-     CACHED_LAST_INDEX_UPDATER.lazySet(this, lastIndex);
-     }
      final AtomicChunk<T> buffer;
      final int offset;
      if (index >= chunkSize) {
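Note on the hunk above: get() now guards every lookup with a single volatile read of the published index instead of refreshing a separately cached copy. A minimal single-appender/multi-reader sketch of that pattern (illustrative names and a fixed backing array, not the Artemis class):

import java.util.concurrent.atomic.AtomicLongFieldUpdater;

final class AppendOnlyLookupSketch<T> {

   private static final AtomicLongFieldUpdater<AppendOnlyLookupSketch> SIZE_UPDATER =
      AtomicLongFieldUpdater.newUpdater(AppendOnlyLookupSketch.class, "publishedSize");

   private volatile long publishedSize = 0;
   private final Object[] elements;

   AppendOnlyLookupSketch(int capacity) {
      this.elements = new Object[capacity];
   }

   // single appender: store the element first, then publish the new size with an ordered store
   void append(T element) {
      final int index = (int) publishedSize;
      if (index == elements.length) {
         throw new IllegalStateException("full");
      }
      elements[index] = element;
      SIZE_UPDATER.lazySet(this, index + 1);
   }

   T get(int index) {
      if (index < 0) {
         return null;
      }
      final long size = publishedSize;   // one volatile read, as in the new get() above
      if (index >= size) {
         return null;
      }
      return (T) elements[index];
   }
}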
@@ -139,20 +127,20 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
      final int chunkIndex = index >> chunkSizeLog2;
      //size is never allowed to be > Integer.MAX_VALUE
      final int lastChunkIndex = (int) lastIndex >> chunkSizeLog2;
-     int chunkIndexes = chunkIndex;
+     int distance = chunkIndex;
      AtomicChunk<T> buffer = null;
      boolean forward = true;
-     int distanceFromLastChunkIndex = lastChunkIndex - chunkIndex;
+     int distanceFromLast = lastChunkIndex - chunkIndex;
      //it's worth to go backward from lastChunkIndex?
      //trying first to check against the value we already have: if it won't worth, won't make sense to load the lastBuffer
-     if (distanceFromLastChunkIndex < chunkIndex) {
+     if (distanceFromLast < distance) {
         final AtomicChunk<T> lastBuffer = this.lastBuffer;
         //lastBuffer is a potential moving, always increasing, target ie better to re-check the distance
-        distanceFromLastChunkIndex = lastBuffer.index - chunkIndex;
-        if (distanceFromLastChunkIndex < chunkIndex) {
+        distanceFromLast = lastBuffer.index - chunkIndex;
+        if (distanceFromLast < distance) {
           //we're saving some jumps ie is fine to go backward from here
           buffer = lastBuffer;
-          chunkIndexes = distanceFromLastChunkIndex;
+          distance = distanceFromLast;
           forward = false;
        }
     }
@@ -160,7 +148,7 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
      if (buffer == null) {
         buffer = firstBuffer;
      }
-     for (int i = 0; i < chunkIndexes; i++) {
+     for (int i = 0; i < distance; i++) {
        //next chunk is always set if below a read lastIndex value
        //previous chunk is final and can be safely read
        buffer = forward ? buffer.next : buffer.prev;
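Note on the two hunks above: the renamed distance/distanceFromLast variables express the chunk-walk strategy — chunks form a doubly linked list, so a lookup starts from whichever end needs fewer hops. A standalone sketch of that choice (simplified names, without the concurrent re-check of a moving lastBuffer that the real code performs):

final class ChunkWalkSketch {

   static final class Chunk {
      final int index;
      Chunk prev;
      Chunk next;

      Chunk(int index) {
         this.index = index;
      }
   }

   static Chunk findChunk(Chunk first, Chunk last, int chunkIndex) {
      int distance = chunkIndex;                             // hops when starting from the first chunk
      final int distanceFromLast = last.index - chunkIndex;  // hops when starting from the last chunk
      Chunk buffer = first;
      boolean forward = true;
      if (distanceFromLast < distance) {
         buffer = last;
         distance = distanceFromLast;
         forward = false;
      }
      for (int i = 0; i < distance; i++) {
         buffer = forward ? buffer.next : buffer.prev;
      }
      return buffer;
   }
}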
@@ -234,21 +222,31 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
      return true;
   }

+  public T[] toArray(IntFunction<T[]> arrayAllocator) {
+     return toArray(arrayAllocator, 0);
+  }
+
   /**
    * Returns an array containing all of the elements in this collection in proper
    * sequence (from first to last element).<br>
    * {@code arrayAllocator} will be used to instantiate the array of the correct size with the right runtime type.
    */
-  public T[] toArray(IntFunction<T[]> arrayAllocator) {
+  public T[] toArray(IntFunction<T[]> arrayAllocator, int startIndex) {
+     if (startIndex < 0) {
+        throw new ArrayIndexOutOfBoundsException("startIndex must be >= 0");
+     }
      final long lastIndex = getValidLastIndex();
      assert lastIndex <= Integer.MAX_VALUE;
      final int size = (int) lastIndex;
      final T[] elements = arrayAllocator.apply(size);
+     if (startIndex + size > elements.length) {
+        throw new ArrayIndexOutOfBoundsException();
+     }
      //fast division by a power of 2
      final int chunkSize = this.chunkSize;
      final int chunks = size > chunkSize ? size >> chunkSizeLog2 : 0;
      AtomicChunk<T> buffer = firstBuffer;
-     int elementIndex = 0;
+     int elementIndex = startIndex;
      for (int i = 0; i < chunks; i++) {
         drain(buffer, elements, elementIndex, chunkSize);
         elementIndex += chunkSize;
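Note on the toArray change above: the new startIndex overload lets the allocator reserve leading slots that the caller fills afterwards, which is how LivePageCacheImpl.getMessages() and the tests below use it. A minimal usage sketch (values are illustrative):

import java.util.Arrays;

import org.apache.activemq.artemis.utils.collections.ConcurrentAppendOnlyChunkedList;

final class ToArrayOffsetExample {

   public static void main(String[] args) {
      final ConcurrentAppendOnlyChunkedList<Integer> list = new ConcurrentAppendOnlyChunkedList<>(16);
      for (int i = 0; i < 10; i++) {
         list.add(i);
      }
      final int offset = 4;
      // the allocator is handed the list size (10) and reserves 4 extra leading slots
      final Integer[] array = list.toArray(size -> new Integer[offset + size], offset);
      // array[0..3] stay null for the caller to fill; array[4..13] hold the elements in add order
      System.out.println(Arrays.toString(array));
   }
}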
ConcurrentAppendOnlyChunkedListTest.java
@@ -20,13 +20,15 @@
 */
package org.apache.activemq.artemis.utils.collections;

+import java.util.Arrays;
+
import org.junit.Assert;
import org.junit.Test;

public class ConcurrentAppendOnlyChunkedListTest {

-  private static final int CHUNK_SIZE = 32;
-  private static final int ELEMENTS = (CHUNK_SIZE * 3) + 1;
+  private static final int CHUNK_SIZE = 16;
+  private static final int ELEMENTS = (CHUNK_SIZE * 4) + 1;

   private final ConcurrentAppendOnlyChunkedList<Integer> chunkedList;
@@ -101,7 +103,12 @@ public class ConcurrentAppendOnlyChunkedListTest {
      for (int i = 0; i < messages; i++) {
         cachedElements[i] = chunkedList.get(i);
      }
-     Assert.assertArrayEquals(cachedElements, elements);
+     Assert.assertArrayEquals(elements, cachedElements);
+     Arrays.fill(cachedElements, null);
+     for (int i = messages - 1; i >= 0; i--) {
+        cachedElements[i] = chunkedList.get(i);
+     }
+     Assert.assertArrayEquals(elements, cachedElements);
   }

   @Test
@@ -117,7 +124,7 @@ public class ConcurrentAppendOnlyChunkedListTest {
      for (int i = 0; i < messages; i++) {
         cachedElements[i] = chunkedList.get(i);
      }
-     Assert.assertArrayEquals(cachedElements, elements);
+     Assert.assertArrayEquals(elements, cachedElements);
   }

   @Test
@@ -133,6 +140,44 @@ public class ConcurrentAppendOnlyChunkedListTest {
      Assert.assertArrayEquals(elements, cachedElements);
   }

+  @Test
+  public void shouldToArrayWithIndexReturnElementsAccordingToAddOrder() {
+     final int messages = ELEMENTS;
+     final Integer[] elements = new Integer[messages];
+     for (int i = 0; i < messages; i++) {
+        final Integer element = i;
+        elements[i] = element;
+        chunkedList.add(element);
+     }
+     final int offset = 10;
+     final Integer[] cachedElements = chunkedList.toArray(size -> new Integer[offset + size], offset);
+     Assert.assertArrayEquals(elements, Arrays.copyOfRange(cachedElements, offset, cachedElements.length));
+     Assert.assertArrayEquals(new Integer[offset], Arrays.copyOfRange(cachedElements, 0, offset));
+  }
+
+  @Test(expected = ArrayIndexOutOfBoundsException.class)
+  public void shouldFailToArrayWithInsufficientArrayCapacity() {
+     final int messages = ELEMENTS;
+     final Integer[] elements = new Integer[messages];
+     for (int i = 0; i < messages; i++) {
+        final Integer element = i;
+        elements[i] = element;
+        chunkedList.add(element);
+     }
+     final int offset = 10;
+     chunkedList.toArray(size -> new Integer[offset + size - 1], offset);
+  }
+
+  @Test(expected = ArrayIndexOutOfBoundsException.class)
+  public void shouldFailToArrayWithNegativeStartIndex() {
+     chunkedList.toArray(Integer[]::new, -1);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void shouldFailToArrayWithNullArray() {
+     chunkedList.toArray(size -> null);
+  }
+
   @Test
   public void shouldToArrayReturnElementsAccordingToAddAllOrder() {
      final int messages = ELEMENTS;
LivePageCacheImpl.java
@@ -31,7 +31,9 @@ public final class LivePageCacheImpl implements LivePageCache {

   private static final int CHUNK_SIZE = 32;

-  private final ConcurrentAppendOnlyChunkedList<PagedMessage> messages;
+  private final PagedMessage[] initialMessages;
+
+  private final ConcurrentAppendOnlyChunkedList<PagedMessage> liveMessages;

   private final long pageId;
@@ -39,7 +41,19 @@ public final class LivePageCacheImpl implements LivePageCache {

   public LivePageCacheImpl(final long pageId) {
      this.pageId = pageId;
-     this.messages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE);
+     this.liveMessages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE);
+     this.initialMessages = null;
   }

+  public LivePageCacheImpl(final long pageId, PagedMessage[] initialMessages) {
+     this.pageId = pageId;
+     this.liveMessages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE);
+     this.initialMessages = initialMessages;
+  }
+
+  private int initialMessagesSize() {
+     final PagedMessage[] initialMessages = this.initialMessages;
+     return initialMessages == null ? 0 : initialMessages.length;
+  }
+
   @Override
@@ -49,7 +63,7 @@ public final class LivePageCacheImpl implements LivePageCache {

   @Override
   public int getNumberOfMessages() {
-     return messages.size();
+     return initialMessagesSize() + liveMessages.size();
   }

   @Override
@@ -62,7 +76,16 @@ public final class LivePageCacheImpl implements LivePageCache {

   @Override
   public PagedMessage getMessage(PagePosition pagePosition) {
-     return messages.get(pagePosition.getMessageNr());
+     final int messageNr = pagePosition.getMessageNr();
+     if (messageNr < 0) {
+        return null;
+     }
+     final int initialOffset = initialMessagesSize();
+     if (messageNr < initialOffset) {
+        return initialMessages[messageNr];
+     }
+     final int index = messageNr - initialOffset;
+     return liveMessages.get(index);
   }

   @Override
@@ -73,7 +96,7 @@ public final class LivePageCacheImpl implements LivePageCache {
   @Override
   public void addLiveMessage(PagedMessage message) {
      message.getMessage().usageUp();
-     messages.add(message);
+     liveMessages.add(message);
   }

   @Override
@@ -84,7 +107,12 @@ public final class LivePageCacheImpl implements LivePageCache {

   @Override
   public PagedMessage[] getMessages() {
-     return messages.toArray(PagedMessage[]::new);
+     final PagedMessage[] pagedMessages = liveMessages.toArray(size -> new PagedMessage[initialMessagesSize() + size], initialMessagesSize());
+     final PagedMessage[] initialMessages = this.initialMessages;
+     if (initialMessages != null) {
+        System.arraycopy(initialMessages, 0, pagedMessages, 0, initialMessages.length);
+     }
+     return pagedMessages;
   }

   @Override
PagingStoreImpl.java
@@ -478,16 +478,11 @@ public class PagingStoreImpl implements PagingStore {
         Page page = createPage(pageId);
         page.open();

-        List<PagedMessage> messages = page.read(storageManager);
+        final List<PagedMessage> messages = page.read(storageManager);

-        LivePageCache pageCache = new LivePageCacheImpl(pageId);
+        final PagedMessage[] initialMessages = messages.toArray(new PagedMessage[messages.size()]);

-        for (PagedMessage msg : messages) {
-           pageCache.addLiveMessage(msg);
-           // As we add back to the live page,
-           // we have to discount one when we read the page
-           msg.getMessage().usageDown();
-        }
+        final LivePageCache pageCache = new LivePageCacheImpl(pageId, initialMessages);

         page.setLiveCache(pageCache);
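Taken together, the change replaces per-message re-insertion during page recovery with a fixed initial array plus an append-only list for messages added while the page is live. A self-contained stand-in sketch of the resulting two-tier lookup (simplified types, with CopyOnWriteArrayList standing in for ConcurrentAppendOnlyChunkedList — not the Artemis classes):

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class TwoTierLivePageCacheSketch<M> {

   private final M[] initialMessages;                        // messages recovered from the page file; may be null for a new page
   private final List<M> liveMessages = new CopyOnWriteArrayList<>();

   TwoTierLivePageCacheSketch(M[] initialMessages) {
      this.initialMessages = initialMessages;
   }

   private int initialMessagesSize() {
      return initialMessages == null ? 0 : initialMessages.length;
   }

   int getNumberOfMessages() {
      return initialMessagesSize() + liveMessages.size();
   }

   void addLiveMessage(M message) {
      liveMessages.add(message);
   }

   M getMessage(int messageNr) {
      if (messageNr < 0) {
         return null;
      }
      final int initialOffset = initialMessagesSize();
      if (messageNr < initialOffset) {
         return initialMessages[messageNr];                  // plain array read for recovered messages
      }
      final int index = messageNr - initialOffset;
      return index < liveMessages.size() ? liveMessages.get(index) : null;
   }
}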