This commit is contained in:
Clebert Suconic 2021-01-12 17:28:13 -05:00
commit 65d6efa2ed
15 changed files with 251 additions and 137 deletions

View File

@ -42,8 +42,6 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
private static final AtomicLongFieldUpdater<ConcurrentAppendOnlyChunkedList> LAST_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, "lastIndex"); private static final AtomicLongFieldUpdater<ConcurrentAppendOnlyChunkedList> LAST_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, "lastIndex");
private static final AtomicLongFieldUpdater<ConcurrentAppendOnlyChunkedList> CACHED_LAST_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, "cachedLastIndex");
private final int chunkSize; private final int chunkSize;
private final int chunkMask; private final int chunkMask;
@ -58,10 +56,6 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
//it's using a parity bit to mark the rotation state ie size === lastIndex >> 1 //it's using a parity bit to mark the rotation state ie size === lastIndex >> 1
private volatile long lastIndex = 0; private volatile long lastIndex = 0;
//cached view of lastIndex used to avoid invalidating lastIndex while being updated by the appends
private volatile long cachedLastIndex = 0;
/** /**
* @throws IllegalArgumentException if {@code chunkSize} is &lt;0 or not a power of 2 * @throws IllegalArgumentException if {@code chunkSize} is &lt;0 or not a power of 2
*/ */
@ -105,17 +99,11 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
if (index < 0) { if (index < 0) {
return null; return null;
} }
//it allow to perform less cache invalidations vs lastIndex if there are bursts of appends final long lastIndex = getValidLastIndex();
long lastIndex = cachedLastIndex;
if (index >= lastIndex) {
lastIndex = getValidLastIndex();
//it is a element over the current size? //it is a element over the current size?
if (index >= lastIndex) { if (index >= lastIndex) {
return null; return null;
} }
//publish it for others readers
CACHED_LAST_INDEX_UPDATER.lazySet(this, lastIndex);
}
final AtomicChunk<T> buffer; final AtomicChunk<T> buffer;
final int offset; final int offset;
if (index >= chunkSize) { if (index >= chunkSize) {
@ -139,20 +127,20 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
final int chunkIndex = index >> chunkSizeLog2; final int chunkIndex = index >> chunkSizeLog2;
//size is never allowed to be > Integer.MAX_VALUE //size is never allowed to be > Integer.MAX_VALUE
final int lastChunkIndex = (int) lastIndex >> chunkSizeLog2; final int lastChunkIndex = (int) lastIndex >> chunkSizeLog2;
int chunkIndexes = chunkIndex; int distance = chunkIndex;
AtomicChunk<T> buffer = null; AtomicChunk<T> buffer = null;
boolean forward = true; boolean forward = true;
int distanceFromLastChunkIndex = lastChunkIndex - chunkIndex; int distanceFromLast = lastChunkIndex - chunkIndex;
//it's worth to go backward from lastChunkIndex? //it's worth to go backward from lastChunkIndex?
//trying first to check against the value we already have: if it won't worth, won't make sense to load the lastBuffer //trying first to check against the value we already have: if it won't worth, won't make sense to load the lastBuffer
if (distanceFromLastChunkIndex < chunkIndex) { if (distanceFromLast < distance) {
final AtomicChunk<T> lastBuffer = this.lastBuffer; final AtomicChunk<T> lastBuffer = this.lastBuffer;
//lastBuffer is a potential moving, always increasing, target ie better to re-check the distance //lastBuffer is a potential moving, always increasing, target ie better to re-check the distance
distanceFromLastChunkIndex = lastBuffer.index - chunkIndex; distanceFromLast = lastBuffer.index - chunkIndex;
if (distanceFromLastChunkIndex < chunkIndex) { if (distanceFromLast < distance) {
//we're saving some jumps ie is fine to go backward from here //we're saving some jumps ie is fine to go backward from here
buffer = lastBuffer; buffer = lastBuffer;
chunkIndexes = distanceFromLastChunkIndex; distance = distanceFromLast;
forward = false; forward = false;
} }
} }
@ -160,7 +148,7 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
if (buffer == null) { if (buffer == null) {
buffer = firstBuffer; buffer = firstBuffer;
} }
for (int i = 0; i < chunkIndexes; i++) { for (int i = 0; i < distance; i++) {
//next chunk is always set if below a read lastIndex value //next chunk is always set if below a read lastIndex value
//previous chunk is final and can be safely read //previous chunk is final and can be safely read
buffer = forward ? buffer.next : buffer.prev; buffer = forward ? buffer.next : buffer.prev;
@ -234,21 +222,31 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
return true; return true;
} }
public T[] toArray(IntFunction<T[]> arrayAllocator) {
return toArray(arrayAllocator, 0);
}
/** /**
* Returns an array containing all of the elements in this collection in proper * Returns an array containing all of the elements in this collection in proper
* sequence (from first to last element).<br> * sequence (from first to last element).<br>
* {@code arrayAllocator} will be used to instantiate the array of the correct size with the right runtime type. * {@code arrayAllocator} will be used to instantiate the array of the correct size with the right runtime type.
*/ */
public T[] toArray(IntFunction<T[]> arrayAllocator) { public T[] toArray(IntFunction<T[]> arrayAllocator, int startIndex) {
if (startIndex < 0) {
throw new ArrayIndexOutOfBoundsException("startIndex must be >= 0");
}
final long lastIndex = getValidLastIndex(); final long lastIndex = getValidLastIndex();
assert lastIndex <= Integer.MAX_VALUE; assert lastIndex <= Integer.MAX_VALUE;
final int size = (int) lastIndex; final int size = (int) lastIndex;
final T[] elements = arrayAllocator.apply(size); final T[] elements = arrayAllocator.apply(size);
if (startIndex + size > elements.length) {
throw new ArrayIndexOutOfBoundsException();
}
//fast division by a power of 2 //fast division by a power of 2
final int chunkSize = this.chunkSize; final int chunkSize = this.chunkSize;
final int chunks = size > chunkSize ? size >> chunkSizeLog2 : 0; final int chunks = size > chunkSize ? size >> chunkSizeLog2 : 0;
AtomicChunk<T> buffer = firstBuffer; AtomicChunk<T> buffer = firstBuffer;
int elementIndex = 0; int elementIndex = startIndex;
for (int i = 0; i < chunks; i++) { for (int i = 0; i < chunks; i++) {
drain(buffer, elements, elementIndex, chunkSize); drain(buffer, elements, elementIndex, chunkSize);
elementIndex += chunkSize; elementIndex += chunkSize;

View File

@ -20,13 +20,15 @@
*/ */
package org.apache.activemq.artemis.utils.collections; package org.apache.activemq.artemis.utils.collections;
import java.util.Arrays;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class ConcurrentAppendOnlyChunkedListTest { public class ConcurrentAppendOnlyChunkedListTest {
private static final int CHUNK_SIZE = 32; private static final int CHUNK_SIZE = 16;
private static final int ELEMENTS = (CHUNK_SIZE * 3) + 1; private static final int ELEMENTS = (CHUNK_SIZE * 4) + 1;
private final ConcurrentAppendOnlyChunkedList<Integer> chunkedList; private final ConcurrentAppendOnlyChunkedList<Integer> chunkedList;
@ -101,7 +103,12 @@ public class ConcurrentAppendOnlyChunkedListTest {
for (int i = 0; i < messages; i++) { for (int i = 0; i < messages; i++) {
cachedElements[i] = chunkedList.get(i); cachedElements[i] = chunkedList.get(i);
} }
Assert.assertArrayEquals(cachedElements, elements); Assert.assertArrayEquals(elements, cachedElements);
Arrays.fill(cachedElements, null);
for (int i = messages - 1; i >= 0; i--) {
cachedElements[i] = chunkedList.get(i);
}
Assert.assertArrayEquals(elements, cachedElements);
} }
@Test @Test
@ -117,7 +124,7 @@ public class ConcurrentAppendOnlyChunkedListTest {
for (int i = 0; i < messages; i++) { for (int i = 0; i < messages; i++) {
cachedElements[i] = chunkedList.get(i); cachedElements[i] = chunkedList.get(i);
} }
Assert.assertArrayEquals(cachedElements, elements); Assert.assertArrayEquals(elements, cachedElements);
} }
@Test @Test
@ -133,6 +140,44 @@ public class ConcurrentAppendOnlyChunkedListTest {
Assert.assertArrayEquals(elements, cachedElements); Assert.assertArrayEquals(elements, cachedElements);
} }
@Test
public void shouldToArrayWithIndexReturnElementsAccordingToAddOrder() {
final int messages = ELEMENTS;
final Integer[] elements = new Integer[messages];
for (int i = 0; i < messages; i++) {
final Integer element = i;
elements[i] = element;
chunkedList.add(element);
}
final int offset = 10;
final Integer[] cachedElements = chunkedList.toArray(size -> new Integer[offset + size], offset);
Assert.assertArrayEquals(elements, Arrays.copyOfRange(cachedElements, offset, cachedElements.length));
Assert.assertArrayEquals(new Integer[offset], Arrays.copyOfRange(cachedElements, 0, offset));
}
@Test(expected = ArrayIndexOutOfBoundsException.class)
public void shouldFailToArrayWithInsufficientArrayCapacity() {
final int messages = ELEMENTS;
final Integer[] elements = new Integer[messages];
for (int i = 0; i < messages; i++) {
final Integer element = i;
elements[i] = element;
chunkedList.add(element);
}
final int offset = 10;
chunkedList.toArray(size -> new Integer[offset + size - 1], offset);
}
@Test(expected = ArrayIndexOutOfBoundsException.class)
public void shouldFailToArrayWithNegativeStartIndex() {
chunkedList.toArray(Integer[]::new, -1);
}
@Test(expected = NullPointerException.class)
public void shouldFailToArrayWithNullArray() {
chunkedList.toArray(size -> null);
}
@Test @Test
public void shouldToArrayReturnElementsAccordingToAddAllOrder() { public void shouldToArrayReturnElementsAccordingToAddAllOrder() {
final int messages = ELEMENTS; final int messages = ELEMENTS;

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.paging.cursor;
import org.apache.activemq.artemis.core.paging.PagedMessage;
public interface BulkPageCache extends PageCache {
PagedMessage[] getMessages();
}

View File

@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
public interface LivePageCache extends PageCache { public interface LivePageCache extends BulkPageCache {
void addLiveMessage(PagedMessage message); void addLiveMessage(PagedMessage message);
} }

View File

@ -19,28 +19,19 @@ package org.apache.activemq.artemis.core.paging.cursor;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.utils.SoftValueLongObjectHashMap; import org.apache.activemq.artemis.utils.SoftValueLongObjectHashMap;
public interface PageCache extends SoftValueLongObjectHashMap.ValueCache { public interface PageCache extends SoftValueLongObjectHashMap.ValueCache, AutoCloseable {
long getPageId(); long getPageId();
int getNumberOfMessages(); int getNumberOfMessages();
void setMessages(PagedMessage[] messages);
PagedMessage[] getMessages();
/**
* @return whether this cache is still being updated
*/
@Override
boolean isLive();
/** /**
* @param pagePosition page position * @param pagePosition page position
* @return * @return
*/ */
PagedMessage getMessage(PagePosition pagePosition); PagedMessage getMessage(PagePosition pagePosition);
@Override
void close(); void close();
} }

View File

@ -33,7 +33,7 @@ public interface PageCursorProvider {
PagedReference newReference(PagePosition pos, PagedMessage msg, PageSubscription sub); PagedReference newReference(PagePosition pos, PagedMessage msg, PageSubscription sub);
void addPageCache(PageCache cache); void addLivePageCache(LivePageCache cache);
/** /**
* @param queueId The cursorID should be the same as the queueId associated for persistence * @param queueId The cursorID should be the same as the queueId associated for persistence

View File

@ -34,8 +34,6 @@ public interface PagePosition extends Comparable<PagePosition> {
void setPersistentSize(long persistentSize); void setPersistentSize(long persistentSize);
PagePosition nextMessage();
PagePosition nextPage(); PagePosition nextPage();
} }

View File

@ -31,7 +31,9 @@ public final class LivePageCacheImpl implements LivePageCache {
private static final int CHUNK_SIZE = 32; private static final int CHUNK_SIZE = 32;
private final ConcurrentAppendOnlyChunkedList<PagedMessage> messages; private final PagedMessage[] initialMessages;
private final ConcurrentAppendOnlyChunkedList<PagedMessage> liveMessages;
private final long pageId; private final long pageId;
@ -39,7 +41,19 @@ public final class LivePageCacheImpl implements LivePageCache {
public LivePageCacheImpl(final long pageId) { public LivePageCacheImpl(final long pageId) {
this.pageId = pageId; this.pageId = pageId;
this.messages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE); this.liveMessages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE);
this.initialMessages = null;
}
public LivePageCacheImpl(final long pageId, PagedMessage[] initialMessages) {
this.pageId = pageId;
this.liveMessages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE);
this.initialMessages = initialMessages;
}
private int initialMessagesSize() {
final PagedMessage[] initialMessages = this.initialMessages;
return initialMessages == null ? 0 : initialMessages.length;
} }
@Override @Override
@ -49,20 +63,21 @@ public final class LivePageCacheImpl implements LivePageCache {
@Override @Override
public int getNumberOfMessages() { public int getNumberOfMessages() {
return messages.size(); return initialMessagesSize() + liveMessages.size();
}
@Override
public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
for (PagedMessage message : messages) {
addLiveMessage(message);
}
} }
@Override @Override
public PagedMessage getMessage(PagePosition pagePosition) { public PagedMessage getMessage(PagePosition pagePosition) {
return messages.get(pagePosition.getMessageNr()); final int messageNr = pagePosition.getMessageNr();
if (messageNr < 0) {
return null;
}
final int initialOffset = initialMessagesSize();
if (messageNr < initialOffset) {
return initialMessages[messageNr];
}
final int index = messageNr - initialOffset;
return liveMessages.get(index);
} }
@Override @Override
@ -73,7 +88,7 @@ public final class LivePageCacheImpl implements LivePageCache {
@Override @Override
public void addLiveMessage(PagedMessage message) { public void addLiveMessage(PagedMessage message) {
message.getMessage().usageUp(); message.getMessage().usageUp();
messages.add(message); liveMessages.add(message);
} }
@Override @Override
@ -84,7 +99,12 @@ public final class LivePageCacheImpl implements LivePageCache {
@Override @Override
public PagedMessage[] getMessages() { public PagedMessage[] getMessages() {
return messages.toArray(PagedMessage[]::new); final PagedMessage[] pagedMessages = liveMessages.toArray(size -> new PagedMessage[initialMessagesSize() + size], initialMessagesSize());
final PagedMessage[] initialMessages = this.initialMessages;
if (initialMessages != null) {
System.arraycopy(initialMessages, 0, pagedMessages, 0, initialMessages.length);
}
return pagedMessages;
} }
@Override @Override

View File

@ -17,32 +17,23 @@
package org.apache.activemq.artemis.core.paging.cursor.impl; package org.apache.activemq.artemis.core.paging.cursor.impl;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.cursor.PageCache; import org.apache.activemq.artemis.core.paging.cursor.BulkPageCache;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
/** /**
* The caching associated to a single page. * The caching associated to a single page.
*/ */
class PageCacheImpl implements PageCache { class PageCacheImpl implements BulkPageCache {
// Constants ----------------------------------------------------- private final PagedMessage[] messages;
// Attributes ----------------------------------------------------
private PagedMessage[] messages;
private final long pageId; private final long pageId;
// Static -------------------------------------------------------- PageCacheImpl(final long pageId, PagedMessage[] messages) {
// Constructors --------------------------------------------------
PageCacheImpl(final long pageId) {
this.pageId = pageId; this.pageId = pageId;
this.messages = messages;
} }
// Public --------------------------------------------------------
@Override @Override
public PagedMessage getMessage(PagePosition pagePosition) { public PagedMessage getMessage(PagePosition pagePosition) {
if (pagePosition.getMessageNr() < messages.length) { if (pagePosition.getMessageNr() < messages.length) {
@ -57,11 +48,6 @@ class PageCacheImpl implements PageCache {
return pageId; return pageId;
} }
@Override
public void setMessages(final PagedMessage[] messages) {
this.messages = messages;
}
@Override @Override
public int getNumberOfMessages() { public int getNumberOfMessages() {
return messages.length; return messages.length;

View File

@ -28,8 +28,10 @@ import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage; import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
import org.apache.activemq.artemis.core.paging.cursor.PageCache; import org.apache.activemq.artemis.core.paging.cursor.PageCache;
import org.apache.activemq.artemis.core.paging.cursor.BulkPageCache;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
@ -72,11 +74,11 @@ public class PageCursorProviderImpl implements PageCursorProvider {
// This is the same executor used at the PageStoreImpl. One Executor per pageStore // This is the same executor used at the PageStoreImpl. One Executor per pageStore
private final ArtemisExecutor executor; private final ArtemisExecutor executor;
private final SoftValueLongObjectHashMap<PageCache> softCache; private final SoftValueLongObjectHashMap<BulkPageCache> softCache;
private LongObjectHashMap<Integer> numberOfMessages = null; private LongObjectHashMap<Integer> numberOfMessages = null;
private final LongObjectHashMap<CompletableFuture<PageCache>> inProgressReadPages; private final LongObjectHashMap<CompletableFuture<BulkPageCache>> inProgressReadPages;
private final ConcurrentLongHashMap<PageSubscription> activeCursors = new ConcurrentLongHashMap<>(); private final ConcurrentLongHashMap<PageSubscription> activeCursors = new ConcurrentLongHashMap<>();
@ -162,8 +164,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
return null; return null;
} }
boolean createPage = false; boolean createPage = false;
CompletableFuture<PageCache> inProgressReadPage; CompletableFuture<BulkPageCache> inProgressReadPage;
PageCache cache; BulkPageCache cache;
Page page = null; Page page = null;
synchronized (softCache) { synchronized (softCache) {
cache = softCache.get(pageId); cache = softCache.get(pageId);
@ -184,8 +186,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
if (numberOfMessages != null && numberOfMessages.containsKey(pageId)) { if (numberOfMessages != null && numberOfMessages.containsKey(pageId)) {
return new PageReader(pagingStore.createPage((int) pageId), numberOfMessages.get(pageId)); return new PageReader(pagingStore.createPage((int) pageId), numberOfMessages.get(pageId));
} }
final CompletableFuture<PageCache> readPage = new CompletableFuture<>(); final CompletableFuture<BulkPageCache> readPage = new CompletableFuture<>();
cache = createPageCache(pageId);
page = pagingStore.createPage((int) pageId); page = pagingStore.createPage((int) pageId);
createPage = true; createPage = true;
inProgressReadPage = readPage; inProgressReadPage = readPage;
@ -193,7 +194,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
} }
} }
if (createPage) { if (createPage) {
return readPage(pageId, page, cache, inProgressReadPage); return readPage(pageId, page, inProgressReadPage);
} else { } else {
final long startedWait = System.nanoTime(); final long startedWait = System.nanoTime();
while (true) { while (true) {
@ -214,11 +215,11 @@ public class PageCursorProviderImpl implements PageCursorProvider {
private PageCache readPage(long pageId, private PageCache readPage(long pageId,
Page page, Page page,
PageCache cache, CompletableFuture<BulkPageCache> inProgressReadPage) throws Exception {
CompletableFuture<PageCache> inProgressReadPage) throws Exception {
logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress()); logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress());
boolean acquiredPageReadPermission = false; boolean acquiredPageReadPermission = false;
int num = -1; int num = -1;
final PageCacheImpl cache;
try { try {
final long startedRequest = System.nanoTime(); final long startedRequest = System.nanoTime();
while (!acquiredPageReadPermission) { while (!acquiredPageReadPermission) {
@ -238,7 +239,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
pagingStore.getAddress(), TimeUnit.NANOSECONDS.toMillis(elapsedReadPage), page.getSize()); pagingStore.getAddress(), TimeUnit.NANOSECONDS.toMillis(elapsedReadPage), page.getSize());
} }
num = pgdMessages.size(); num = pgdMessages.size();
cache.setMessages(pgdMessages.toArray(new PagedMessage[num])); cache = new PageCacheImpl(pageId, pgdMessages.toArray(new PagedMessage[num]));
} catch (Throwable t) { } catch (Throwable t) {
inProgressReadPage.completeExceptionally(t); inProgressReadPage.completeExceptionally(t);
synchronized (softCache) { synchronized (softCache) {
@ -268,8 +269,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
} }
@Override @Override
public void addPageCache(PageCache cache) { public void addLivePageCache(LivePageCache cache) {
logger.tracef("Add page cache %s", cache); logger.tracef("Add live page cache %s", cache);
synchronized (softCache) { synchronized (softCache) {
softCache.put(cache.getPageId(), cache); softCache.put(cache.getPageId(), cache);
} }
@ -537,7 +538,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
logger.tracef("this(%s) finishing cleanup on %s", this, depagedPages); logger.tracef("this(%s) finishing cleanup on %s", this, depagedPages);
try { try {
for (Page depagedPage : depagedPages) { for (Page depagedPage : depagedPages) {
PageCache cache; BulkPageCache cache;
PagedMessage[] pgdMessages; PagedMessage[] pgdMessages;
synchronized (softCache) { synchronized (softCache) {
cache = softCache.get((long) depagedPage.getPageId()); cache = softCache.get((long) depagedPage.getPageId());
@ -659,17 +660,6 @@ public class PageCursorProviderImpl implements PageCursorProvider {
'}'; '}';
} }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
/* Protected as we may let test cases to instrument the test */
protected PageCacheImpl createPageCache(final long pageId) {
return new PageCacheImpl(pageId);
}
// Private -------------------------------------------------------
/** /**
* This method is synchronized because we want it to be atomic with the cursors being used * This method is synchronized because we want it to be atomic with the cursors being used
*/ */

View File

@ -126,11 +126,6 @@ public class PagePositionImpl implements PagePosition {
} else return Long.compare(recordID, o.getRecordID()); } else return Long.compare(recordID, o.getRecordID());
} }
@Override
public PagePosition nextMessage() {
return new PagePositionImpl(this.pageNr, this.messageNr + 1);
}
@Override @Override
public PagePosition nextPage() { public PagePosition nextPage() {
return new PagePositionImpl(this.pageNr + 1, 0, 0); return new PagePositionImpl(this.pageNr + 1, 0, 0);

View File

@ -29,7 +29,6 @@ public class PageReader implements PageCache {
private final Page page; private final Page page;
private final int numberOfMessages; private final int numberOfMessages;
private PagedMessage[] pagedMessages = null;
public PageReader(Page page, int numberOfMessages) { public PageReader(Page page, int numberOfMessages) {
this.page = page; this.page = page;
@ -46,16 +45,10 @@ public class PageReader implements PageCache {
return numberOfMessages; return numberOfMessages;
} }
@Override /**
public void setMessages(PagedMessage[] messages) { * Used just for testing purposes.
this.pagedMessages = messages; */
} protected synchronized PagedMessage[] readMessages() {
@Override
public synchronized PagedMessage[] getMessages() {
if (pagedMessages != null) {
return pagedMessages;
} else {
try { try {
openPage(); openPage();
return page.read().toArray(new PagedMessage[numberOfMessages]); return page.read().toArray(new PagedMessage[numberOfMessages]);
@ -65,7 +58,6 @@ public class PageReader implements PageCache {
close(); close();
} }
} }
}
@Override @Override
public boolean isLive() { public boolean isLive() {

View File

@ -478,16 +478,11 @@ public class PagingStoreImpl implements PagingStore {
Page page = createPage(pageId); Page page = createPage(pageId);
page.open(); page.open();
List<PagedMessage> messages = page.read(storageManager); final List<PagedMessage> messages = page.read(storageManager);
LivePageCache pageCache = new LivePageCacheImpl(pageId); final PagedMessage[] initialMessages = messages.toArray(new PagedMessage[messages.size()]);
for (PagedMessage msg : messages) { final LivePageCache pageCache = new LivePageCacheImpl(pageId, initialMessages);
pageCache.addLiveMessage(msg);
// As we add back to the live page,
// we have to discount one when we read the page
msg.getMessage().usageDown();
}
page.setLiveCache(pageCache); page.setLiveCache(pageCache);
@ -495,7 +490,7 @@ public class PagingStoreImpl implements PagingStore {
currentPage = page; currentPage = page;
cursorProvider.addPageCache(pageCache); cursorProvider.addLivePageCache(pageCache);
/** /**
* The page file might be incomplete in the cases: 1) last message incomplete 2) disk damaged. * The page file might be incomplete in the cases: 1) last message incomplete 2) disk damaged.
@ -1125,7 +1120,7 @@ public class PagingStoreImpl implements PagingStore {
newPage.setLiveCache(pageCache); newPage.setLiveCache(pageCache);
cursorProvider.addPageCache(pageCache); cursorProvider.addLivePageCache(pageCache);
currentPageSize = 0; currentPageSize = 0;

View File

@ -50,7 +50,7 @@ public class PageReaderTest extends ActiveMQTestBase {
int[] offsets = createPage(num); int[] offsets = createPage(num);
PageReader pageReader = getPageReader(); PageReader pageReader = getPageReader();
PagedMessage[] pagedMessages = pageReader.getMessages(); PagedMessage[] pagedMessages = pageReader.readMessages();
assertEquals(pagedMessages.length, num); assertEquals(pagedMessages.length, num);
PagedMessage pagedMessage = null; PagedMessage pagedMessage = null;
@ -80,7 +80,7 @@ public class PageReaderTest extends ActiveMQTestBase {
int[] offsets = createPage(num); int[] offsets = createPage(num);
PageReader pageReader = getPageReader(); PageReader pageReader = getPageReader();
PagedMessage[] pagedMessages = pageReader.getMessages(); PagedMessage[] pagedMessages = pageReader.readMessages();
assertEquals(pagedMessages.length, num); assertEquals(pagedMessages.length, num);
PagePosition pagePosition = new PagePositionImpl(10, 0); PagePosition pagePosition = new PagePositionImpl(10, 0);
@ -108,7 +108,7 @@ public class PageReaderTest extends ActiveMQTestBase {
int[] offsets = createPage(num); int[] offsets = createPage(num);
PageReader pageReader = getPageReader(); PageReader pageReader = getPageReader();
PagedMessage[] pagedMessages = pageReader.getMessages(); PagedMessage[] pagedMessages = pageReader.readMessages();
assertEquals(pagedMessages.length, num); assertEquals(pagedMessages.length, num);
PagePosition pagePosition = new PagePositionImpl(10, 0, 50); PagePosition pagePosition = new PagePositionImpl(10, 0, 50);

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.performance.jmh;
import org.apache.activemq.artemis.utils.collections.ConcurrentAppendOnlyChunkedList;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
@State(Scope.Benchmark)
@Fork(2)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 8, time = 1)
public class ConcurrentAppendOnlyChunkListLookupBenchmark {
private static final Integer MSG = 0;
@Param({"1000", "10000"})
int size;
@Param({"32"})
int chunkSize;
private ConcurrentAppendOnlyChunkedList<Integer> list;
@Setup
public void init() {
list = new ConcurrentAppendOnlyChunkedList<>(chunkSize);
for (int i = 0; i < size; i++) {
list.add(MSG);
}
}
@State(Scope.Thread)
public static class Index {
private int size;
private int index;
@Setup
public void init(ConcurrentAppendOnlyChunkListLookupBenchmark benchmark) {
index = 0;
size = benchmark.size;
}
public int next() {
index++;
if (index == size) {
index = 0;
return 0;
}
return index;
}
}
@Benchmark
public Integer lookup(Index index) {
return list.get(index.next());
}
}