diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java index c288e16b55..4a91fe0175 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java @@ -42,8 +42,6 @@ public final class ConcurrentAppendOnlyChunkedList { private static final AtomicLongFieldUpdater LAST_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, "lastIndex"); - private static final AtomicLongFieldUpdater CACHED_LAST_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, "cachedLastIndex"); - private final int chunkSize; private final int chunkMask; @@ -58,10 +56,6 @@ public final class ConcurrentAppendOnlyChunkedList { //it's using a parity bit to mark the rotation state ie size === lastIndex >> 1 private volatile long lastIndex = 0; - //cached view of lastIndex used to avoid invalidating lastIndex while being updated by the appends - - private volatile long cachedLastIndex = 0; - /** * @throws IllegalArgumentException if {@code chunkSize} is <0 or not a power of 2 */ @@ -105,16 +99,10 @@ public final class ConcurrentAppendOnlyChunkedList { if (index < 0) { return null; } - //it allow to perform less cache invalidations vs lastIndex if there are bursts of appends - long lastIndex = cachedLastIndex; + final long lastIndex = getValidLastIndex(); + //it is a element over the current size? if (index >= lastIndex) { - lastIndex = getValidLastIndex(); - //it is a element over the current size? - if (index >= lastIndex) { - return null; - } - //publish it for others readers - CACHED_LAST_INDEX_UPDATER.lazySet(this, lastIndex); + return null; } final AtomicChunk buffer; final int offset; @@ -139,20 +127,20 @@ public final class ConcurrentAppendOnlyChunkedList { final int chunkIndex = index >> chunkSizeLog2; //size is never allowed to be > Integer.MAX_VALUE final int lastChunkIndex = (int) lastIndex >> chunkSizeLog2; - int chunkIndexes = chunkIndex; + int distance = chunkIndex; AtomicChunk buffer = null; boolean forward = true; - int distanceFromLastChunkIndex = lastChunkIndex - chunkIndex; + int distanceFromLast = lastChunkIndex - chunkIndex; //it's worth to go backward from lastChunkIndex? //trying first to check against the value we already have: if it won't worth, won't make sense to load the lastBuffer - if (distanceFromLastChunkIndex < chunkIndex) { + if (distanceFromLast < distance) { final AtomicChunk lastBuffer = this.lastBuffer; //lastBuffer is a potential moving, always increasing, target ie better to re-check the distance - distanceFromLastChunkIndex = lastBuffer.index - chunkIndex; - if (distanceFromLastChunkIndex < chunkIndex) { + distanceFromLast = lastBuffer.index - chunkIndex; + if (distanceFromLast < distance) { //we're saving some jumps ie is fine to go backward from here buffer = lastBuffer; - chunkIndexes = distanceFromLastChunkIndex; + distance = distanceFromLast; forward = false; } } @@ -160,7 +148,7 @@ public final class ConcurrentAppendOnlyChunkedList { if (buffer == null) { buffer = firstBuffer; } - for (int i = 0; i < chunkIndexes; i++) { + for (int i = 0; i < distance; i++) { //next chunk is always set if below a read lastIndex value //previous chunk is final and can be safely read buffer = forward ? buffer.next : buffer.prev; @@ -234,21 +222,31 @@ public final class ConcurrentAppendOnlyChunkedList { return true; } + public T[] toArray(IntFunction arrayAllocator) { + return toArray(arrayAllocator, 0); + } + /** * Returns an array containing all of the elements in this collection in proper * sequence (from first to last element).
* {@code arrayAllocator} will be used to instantiate the array of the correct size with the right runtime type. */ - public T[] toArray(IntFunction arrayAllocator) { + public T[] toArray(IntFunction arrayAllocator, int startIndex) { + if (startIndex < 0) { + throw new ArrayIndexOutOfBoundsException("startIndex must be >= 0"); + } final long lastIndex = getValidLastIndex(); assert lastIndex <= Integer.MAX_VALUE; final int size = (int) lastIndex; final T[] elements = arrayAllocator.apply(size); + if (startIndex + size > elements.length) { + throw new ArrayIndexOutOfBoundsException(); + } //fast division by a power of 2 final int chunkSize = this.chunkSize; final int chunks = size > chunkSize ? size >> chunkSizeLog2 : 0; AtomicChunk buffer = firstBuffer; - int elementIndex = 0; + int elementIndex = startIndex; for (int i = 0; i < chunks; i++) { drain(buffer, elements, elementIndex, chunkSize); elementIndex += chunkSize; diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java index 922c78024b..2803d60dbb 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java @@ -20,13 +20,15 @@ */ package org.apache.activemq.artemis.utils.collections; +import java.util.Arrays; + import org.junit.Assert; import org.junit.Test; public class ConcurrentAppendOnlyChunkedListTest { - private static final int CHUNK_SIZE = 32; - private static final int ELEMENTS = (CHUNK_SIZE * 3) + 1; + private static final int CHUNK_SIZE = 16; + private static final int ELEMENTS = (CHUNK_SIZE * 4) + 1; private final ConcurrentAppendOnlyChunkedList chunkedList; @@ -101,7 +103,12 @@ public class ConcurrentAppendOnlyChunkedListTest { for (int i = 0; i < messages; i++) { cachedElements[i] = chunkedList.get(i); } - Assert.assertArrayEquals(cachedElements, elements); + Assert.assertArrayEquals(elements, cachedElements); + Arrays.fill(cachedElements, null); + for (int i = messages - 1; i >= 0; i--) { + cachedElements[i] = chunkedList.get(i); + } + Assert.assertArrayEquals(elements, cachedElements); } @Test @@ -117,7 +124,7 @@ public class ConcurrentAppendOnlyChunkedListTest { for (int i = 0; i < messages; i++) { cachedElements[i] = chunkedList.get(i); } - Assert.assertArrayEquals(cachedElements, elements); + Assert.assertArrayEquals(elements, cachedElements); } @Test @@ -133,6 +140,44 @@ public class ConcurrentAppendOnlyChunkedListTest { Assert.assertArrayEquals(elements, cachedElements); } + @Test + public void shouldToArrayWithIndexReturnElementsAccordingToAddOrder() { + final int messages = ELEMENTS; + final Integer[] elements = new Integer[messages]; + for (int i = 0; i < messages; i++) { + final Integer element = i; + elements[i] = element; + chunkedList.add(element); + } + final int offset = 10; + final Integer[] cachedElements = chunkedList.toArray(size -> new Integer[offset + size], offset); + Assert.assertArrayEquals(elements, Arrays.copyOfRange(cachedElements, offset, cachedElements.length)); + Assert.assertArrayEquals(new Integer[offset], Arrays.copyOfRange(cachedElements, 0, offset)); + } + + @Test(expected = ArrayIndexOutOfBoundsException.class) + public void shouldFailToArrayWithInsufficientArrayCapacity() { + final int messages = ELEMENTS; + final Integer[] elements = new Integer[messages]; + for (int i = 0; i < messages; i++) { + final Integer element = i; + elements[i] = element; + chunkedList.add(element); + } + final int offset = 10; + chunkedList.toArray(size -> new Integer[offset + size - 1], offset); + } + + @Test(expected = ArrayIndexOutOfBoundsException.class) + public void shouldFailToArrayWithNegativeStartIndex() { + chunkedList.toArray(Integer[]::new, -1); + } + + @Test(expected = NullPointerException.class) + public void shouldFailToArrayWithNullArray() { + chunkedList.toArray(size -> null); + } + @Test public void shouldToArrayReturnElementsAccordingToAddAllOrder() { final int messages = ELEMENTS; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/BulkPageCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/BulkPageCache.java new file mode 100644 index 0000000000..98ef786764 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/BulkPageCache.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.paging.cursor; + +import org.apache.activemq.artemis.core.paging.PagedMessage; + +public interface BulkPageCache extends PageCache { + + PagedMessage[] getMessages(); +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java index 0d09bc4bd9..01f09a8d24 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java @@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor; import org.apache.activemq.artemis.core.paging.PagedMessage; -public interface LivePageCache extends PageCache { +public interface LivePageCache extends BulkPageCache { void addLiveMessage(PagedMessage message); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java index 646b568c10..a79dc948fb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java @@ -19,28 +19,19 @@ package org.apache.activemq.artemis.core.paging.cursor; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.utils.SoftValueLongObjectHashMap; -public interface PageCache extends SoftValueLongObjectHashMap.ValueCache { +public interface PageCache extends SoftValueLongObjectHashMap.ValueCache, AutoCloseable { long getPageId(); int getNumberOfMessages(); - void setMessages(PagedMessage[] messages); - - PagedMessage[] getMessages(); - - /** - * @return whether this cache is still being updated - */ - @Override - boolean isLive(); - /** * @param pagePosition page position * @return */ PagedMessage getMessage(PagePosition pagePosition); + @Override void close(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java index 29ce8a0ae4..15d78b0a58 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java @@ -33,7 +33,7 @@ public interface PageCursorProvider { PagedReference newReference(PagePosition pos, PagedMessage msg, PageSubscription sub); - void addPageCache(PageCache cache); + void addLivePageCache(LivePageCache cache); /** * @param queueId The cursorID should be the same as the queueId associated for persistence diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java index e794f909e2..8e73670bd6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java @@ -34,8 +34,6 @@ public interface PagePosition extends Comparable { void setPersistentSize(long persistentSize); - PagePosition nextMessage(); - PagePosition nextPage(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java index 6fb42c1a58..260b097ceb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java @@ -31,7 +31,9 @@ public final class LivePageCacheImpl implements LivePageCache { private static final int CHUNK_SIZE = 32; - private final ConcurrentAppendOnlyChunkedList messages; + private final PagedMessage[] initialMessages; + + private final ConcurrentAppendOnlyChunkedList liveMessages; private final long pageId; @@ -39,7 +41,19 @@ public final class LivePageCacheImpl implements LivePageCache { public LivePageCacheImpl(final long pageId) { this.pageId = pageId; - this.messages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE); + this.liveMessages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE); + this.initialMessages = null; + } + + public LivePageCacheImpl(final long pageId, PagedMessage[] initialMessages) { + this.pageId = pageId; + this.liveMessages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE); + this.initialMessages = initialMessages; + } + + private int initialMessagesSize() { + final PagedMessage[] initialMessages = this.initialMessages; + return initialMessages == null ? 0 : initialMessages.length; } @Override @@ -49,20 +63,21 @@ public final class LivePageCacheImpl implements LivePageCache { @Override public int getNumberOfMessages() { - return messages.size(); - } - - @Override - public void setMessages(PagedMessage[] messages) { - // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway - for (PagedMessage message : messages) { - addLiveMessage(message); - } + return initialMessagesSize() + liveMessages.size(); } @Override public PagedMessage getMessage(PagePosition pagePosition) { - return messages.get(pagePosition.getMessageNr()); + final int messageNr = pagePosition.getMessageNr(); + if (messageNr < 0) { + return null; + } + final int initialOffset = initialMessagesSize(); + if (messageNr < initialOffset) { + return initialMessages[messageNr]; + } + final int index = messageNr - initialOffset; + return liveMessages.get(index); } @Override @@ -73,7 +88,7 @@ public final class LivePageCacheImpl implements LivePageCache { @Override public void addLiveMessage(PagedMessage message) { message.getMessage().usageUp(); - messages.add(message); + liveMessages.add(message); } @Override @@ -84,7 +99,12 @@ public final class LivePageCacheImpl implements LivePageCache { @Override public PagedMessage[] getMessages() { - return messages.toArray(PagedMessage[]::new); + final PagedMessage[] pagedMessages = liveMessages.toArray(size -> new PagedMessage[initialMessagesSize() + size], initialMessagesSize()); + final PagedMessage[] initialMessages = this.initialMessages; + if (initialMessages != null) { + System.arraycopy(initialMessages, 0, pagedMessages, 0, initialMessages.length); + } + return pagedMessages; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java index a350ceb0de..84c0cac4b8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java @@ -17,32 +17,23 @@ package org.apache.activemq.artemis.core.paging.cursor.impl; import org.apache.activemq.artemis.core.paging.PagedMessage; -import org.apache.activemq.artemis.core.paging.cursor.PageCache; +import org.apache.activemq.artemis.core.paging.cursor.BulkPageCache; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; /** * The caching associated to a single page. */ -class PageCacheImpl implements PageCache { +class PageCacheImpl implements BulkPageCache { - // Constants ----------------------------------------------------- - - // Attributes ---------------------------------------------------- - - private PagedMessage[] messages; + private final PagedMessage[] messages; private final long pageId; - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - PageCacheImpl(final long pageId) { + PageCacheImpl(final long pageId, PagedMessage[] messages) { this.pageId = pageId; + this.messages = messages; } - // Public -------------------------------------------------------- - @Override public PagedMessage getMessage(PagePosition pagePosition) { if (pagePosition.getMessageNr() < messages.length) { @@ -57,11 +48,6 @@ class PageCacheImpl implements PageCache { return pageId; } - @Override - public void setMessages(final PagedMessage[] messages) { - this.messages = messages; - } - @Override public int getNumberOfMessages() { return messages.length; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index 7ace24b4d1..9acffb3449 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -28,8 +28,10 @@ import io.netty.util.collection.LongObjectHashMap; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.cursor.LivePageCache; import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage; import org.apache.activemq.artemis.core.paging.cursor.PageCache; +import org.apache.activemq.artemis.core.paging.cursor.BulkPageCache; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; @@ -72,11 +74,11 @@ public class PageCursorProviderImpl implements PageCursorProvider { // This is the same executor used at the PageStoreImpl. One Executor per pageStore private final ArtemisExecutor executor; - private final SoftValueLongObjectHashMap softCache; + private final SoftValueLongObjectHashMap softCache; private LongObjectHashMap numberOfMessages = null; - private final LongObjectHashMap> inProgressReadPages; + private final LongObjectHashMap> inProgressReadPages; private final ConcurrentLongHashMap activeCursors = new ConcurrentLongHashMap<>(); @@ -162,8 +164,8 @@ public class PageCursorProviderImpl implements PageCursorProvider { return null; } boolean createPage = false; - CompletableFuture inProgressReadPage; - PageCache cache; + CompletableFuture inProgressReadPage; + BulkPageCache cache; Page page = null; synchronized (softCache) { cache = softCache.get(pageId); @@ -184,8 +186,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { if (numberOfMessages != null && numberOfMessages.containsKey(pageId)) { return new PageReader(pagingStore.createPage((int) pageId), numberOfMessages.get(pageId)); } - final CompletableFuture readPage = new CompletableFuture<>(); - cache = createPageCache(pageId); + final CompletableFuture readPage = new CompletableFuture<>(); page = pagingStore.createPage((int) pageId); createPage = true; inProgressReadPage = readPage; @@ -193,7 +194,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { } } if (createPage) { - return readPage(pageId, page, cache, inProgressReadPage); + return readPage(pageId, page, inProgressReadPage); } else { final long startedWait = System.nanoTime(); while (true) { @@ -214,11 +215,11 @@ public class PageCursorProviderImpl implements PageCursorProvider { private PageCache readPage(long pageId, Page page, - PageCache cache, - CompletableFuture inProgressReadPage) throws Exception { + CompletableFuture inProgressReadPage) throws Exception { logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress()); boolean acquiredPageReadPermission = false; int num = -1; + final PageCacheImpl cache; try { final long startedRequest = System.nanoTime(); while (!acquiredPageReadPermission) { @@ -238,7 +239,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { pagingStore.getAddress(), TimeUnit.NANOSECONDS.toMillis(elapsedReadPage), page.getSize()); } num = pgdMessages.size(); - cache.setMessages(pgdMessages.toArray(new PagedMessage[num])); + cache = new PageCacheImpl(pageId, pgdMessages.toArray(new PagedMessage[num])); } catch (Throwable t) { inProgressReadPage.completeExceptionally(t); synchronized (softCache) { @@ -268,8 +269,8 @@ public class PageCursorProviderImpl implements PageCursorProvider { } @Override - public void addPageCache(PageCache cache) { - logger.tracef("Add page cache %s", cache); + public void addLivePageCache(LivePageCache cache) { + logger.tracef("Add live page cache %s", cache); synchronized (softCache) { softCache.put(cache.getPageId(), cache); } @@ -537,7 +538,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { logger.tracef("this(%s) finishing cleanup on %s", this, depagedPages); try { for (Page depagedPage : depagedPages) { - PageCache cache; + BulkPageCache cache; PagedMessage[] pgdMessages; synchronized (softCache) { cache = softCache.get((long) depagedPage.getPageId()); @@ -659,17 +660,6 @@ public class PageCursorProviderImpl implements PageCursorProvider { '}'; } - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - /* Protected as we may let test cases to instrument the test */ - protected PageCacheImpl createPageCache(final long pageId) { - return new PageCacheImpl(pageId); - } - - // Private ------------------------------------------------------- - /** * This method is synchronized because we want it to be atomic with the cursors being used */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java index 50907dbe9d..6447903e3b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java @@ -126,11 +126,6 @@ public class PagePositionImpl implements PagePosition { } else return Long.compare(recordID, o.getRecordID()); } - @Override - public PagePosition nextMessage() { - return new PagePositionImpl(this.pageNr, this.messageNr + 1); - } - @Override public PagePosition nextPage() { return new PagePositionImpl(this.pageNr + 1, 0, 0); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java index f518f75892..83ad21e177 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java @@ -29,7 +29,6 @@ public class PageReader implements PageCache { private final Page page; private final int numberOfMessages; - private PagedMessage[] pagedMessages = null; public PageReader(Page page, int numberOfMessages) { this.page = page; @@ -46,24 +45,17 @@ public class PageReader implements PageCache { return numberOfMessages; } - @Override - public void setMessages(PagedMessage[] messages) { - this.pagedMessages = messages; - } - - @Override - public synchronized PagedMessage[] getMessages() { - if (pagedMessages != null) { - return pagedMessages; - } else { - try { - openPage(); - return page.read().toArray(new PagedMessage[numberOfMessages]); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } finally { - close(); - } + /** + * Used just for testing purposes. + */ + protected synchronized PagedMessage[] readMessages() { + try { + openPage(); + return page.read().toArray(new PagedMessage[numberOfMessages]); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } finally { + close(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 3c248af7b5..688448614b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -478,16 +478,11 @@ public class PagingStoreImpl implements PagingStore { Page page = createPage(pageId); page.open(); - List messages = page.read(storageManager); + final List messages = page.read(storageManager); - LivePageCache pageCache = new LivePageCacheImpl(pageId); + final PagedMessage[] initialMessages = messages.toArray(new PagedMessage[messages.size()]); - for (PagedMessage msg : messages) { - pageCache.addLiveMessage(msg); - // As we add back to the live page, - // we have to discount one when we read the page - msg.getMessage().usageDown(); - } + final LivePageCache pageCache = new LivePageCacheImpl(pageId, initialMessages); page.setLiveCache(pageCache); @@ -495,7 +490,7 @@ public class PagingStoreImpl implements PagingStore { currentPage = page; - cursorProvider.addPageCache(pageCache); + cursorProvider.addLivePageCache(pageCache); /** * The page file might be incomplete in the cases: 1) last message incomplete 2) disk damaged. @@ -1125,7 +1120,7 @@ public class PagingStoreImpl implements PagingStore { newPage.setLiveCache(pageCache); - cursorProvider.addPageCache(pageCache); + cursorProvider.addLivePageCache(pageCache); currentPageSize = 0; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java index 2ad18e54aa..eba884f547 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java @@ -50,7 +50,7 @@ public class PageReaderTest extends ActiveMQTestBase { int[] offsets = createPage(num); PageReader pageReader = getPageReader(); - PagedMessage[] pagedMessages = pageReader.getMessages(); + PagedMessage[] pagedMessages = pageReader.readMessages(); assertEquals(pagedMessages.length, num); PagedMessage pagedMessage = null; @@ -80,7 +80,7 @@ public class PageReaderTest extends ActiveMQTestBase { int[] offsets = createPage(num); PageReader pageReader = getPageReader(); - PagedMessage[] pagedMessages = pageReader.getMessages(); + PagedMessage[] pagedMessages = pageReader.readMessages(); assertEquals(pagedMessages.length, num); PagePosition pagePosition = new PagePositionImpl(10, 0); @@ -108,7 +108,7 @@ public class PageReaderTest extends ActiveMQTestBase { int[] offsets = createPage(num); PageReader pageReader = getPageReader(); - PagedMessage[] pagedMessages = pageReader.getMessages(); + PagedMessage[] pagedMessages = pageReader.readMessages(); assertEquals(pagedMessages.length, num); PagePosition pagePosition = new PagePositionImpl(10, 0, 50); diff --git a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/ConcurrentAppendOnlyChunkListLookupBenchmark.java b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/ConcurrentAppendOnlyChunkListLookupBenchmark.java new file mode 100644 index 0000000000..8d36fb49e8 --- /dev/null +++ b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/ConcurrentAppendOnlyChunkListLookupBenchmark.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.performance.jmh; + +import org.apache.activemq.artemis.utils.collections.ConcurrentAppendOnlyChunkedList; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +@State(Scope.Benchmark) +@Fork(2) +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 8, time = 1) +public class ConcurrentAppendOnlyChunkListLookupBenchmark { + + private static final Integer MSG = 0; + + @Param({"1000", "10000"}) + int size; + + @Param({"32"}) + int chunkSize; + + private ConcurrentAppendOnlyChunkedList list; + + @Setup + public void init() { + list = new ConcurrentAppendOnlyChunkedList<>(chunkSize); + for (int i = 0; i < size; i++) { + list.add(MSG); + } + } + + @State(Scope.Thread) + public static class Index { + + private int size; + private int index; + + @Setup + public void init(ConcurrentAppendOnlyChunkListLookupBenchmark benchmark) { + index = 0; + size = benchmark.size; + } + + public int next() { + index++; + if (index == size) { + index = 0; + return 0; + } + return index; + } + } + + @Benchmark + public Integer lookup(Index index) { + return list.get(index.next()); + } + +}