From e541126ca61efe28c0157232b778c2748f259c1a Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Fri, 4 Jan 2019 19:17:55 +0100 Subject: [PATCH] ARTEMIS-2224 lock-free LivePageCache + tests LivePageCacheImpl has been reimplemented to be lock-free, multi-producer and multi-consumer in any of its operations. --- .../ConcurrentAppendOnlyChunkedList.java | 286 ++++++++++++++++++ .../ConcurrentAppendOnlyChunkedListTest.java | 155 ++++++++++ .../paging/cursor/impl/LivePageCacheImpl.java | 44 ++- 3 files changed, 461 insertions(+), 24 deletions(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java create mode 100644 artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java new file mode 100644 index 0000000000..f55d9e1a0c --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.IntFunction; + +/** + * This collection is a concurrent append-only list that grows in chunks.
+ * It's safe to be used by many threads concurrently and has a max capacity of {@link Integer#MAX_VALUE}. + */ +public final class ConcurrentAppendOnlyChunkedList { + + private static final class AtomicChunk extends AtomicReferenceArray { + + AtomicChunk next = null; + final AtomicChunk prev; + final int index; + + AtomicChunk(int index, AtomicChunk prev, int length) { + super(length); + this.index = index; + this.prev = prev; + } + } + + private static final AtomicLongFieldUpdater LAST_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, "lastIndex"); + + private static final AtomicLongFieldUpdater CACHED_LAST_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, "cachedLastIndex"); + + private final int chunkSize; + + private final int chunkMask; + + private final int chunkSizeLog2; + + private static final long RESIZING = -1; + + private AtomicChunk firstBuffer = null; + + private AtomicChunk lastBuffer = null; + + //it is both the current index of the next element to be claimed and the current size of the collection + private volatile long lastIndex = 0; + + //cached view of lastIndex used to avoid invalidating lastIndex while being updated by the appends + + private volatile long cachedLastIndex = 0; + + /** + * @throws IllegalArgumentException if {@code chunkSize} is <0 or not a power of 2 + */ + public ConcurrentAppendOnlyChunkedList(final int chunkSize) { + if (chunkSize <= 0) { + throw new IllegalArgumentException("chunkSize must be >0"); + } + //IMPORTANT: to enable some nice optimizations on / and %, chunk size MUST BE a power of 2 + if (Integer.bitCount(chunkSize) != 1) { + throw new IllegalArgumentException("chunkSize must be a power of 2"); + } + this.chunkSize = chunkSize; + this.chunkMask = chunkSize - 1; + this.chunkSizeLog2 = Integer.numberOfTrailingZeros(chunkSize); + } + + private long getValidLastIndex() { + while (true) { + final long lastIndex = this.lastIndex; + if (lastIndex == RESIZING) { + Thread.yield(); + continue; + } + return lastIndex; + } + } + + /** + * It returns the number of elements currently added. + */ + public int size() { + return (int) getValidLastIndex(); + } + + /** + * It appends {@code elements} to the collection. + */ + public void addAll(T[] elements) { + for (T e : elements) { + add(e); + } + } + + /** + * Returns the element at the specified position in this collection or {@code null} if not found. + */ + public T get(int index) { + if (index < 0) { + return null; + } + //it allow to perform less cache invalidations vs lastIndex if there are bursts of appends + long lastIndex = cachedLastIndex; + if (index >= lastIndex) { + lastIndex = getValidLastIndex(); + //it is a element over the current size? + if (index >= lastIndex) { + return null; + } + //publish it for others readers + CACHED_LAST_INDEX_UPDATER.lazySet(this, lastIndex); + } + final AtomicChunk buffer; + final int offset; + if (index >= chunkSize) { + offset = index & chunkMask; + //slow path is moved in a separate method + buffer = getChunkOf(index, lastIndex); + } else { + offset = index; + buffer = firstBuffer; + } + return pollElement(buffer, offset); + } + + /** + * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries + * ie backward search of a node if needed. + */ + private AtomicChunk getChunkOf(final int index, final long lastIndex) { + final int chunkSizeLog2 = this.chunkSizeLog2; + //fast division by a power of 2 + final int chunkIndex = index >> chunkSizeLog2; + //size is never allowed to be > Integer.MAX_VALUE + final int lastChunkIndex = (int) lastIndex >> chunkSizeLog2; + int chunkIndexes = chunkIndex; + AtomicChunk buffer = null; + boolean forward = true; + int distanceFromLastChunkIndex = lastChunkIndex - chunkIndex; + //it's worth to go backward from lastChunkIndex? + //trying first to check against the value we already have: if it won't worth, won't make sense to load the lastBuffer + if (distanceFromLastChunkIndex < chunkIndex) { + final AtomicChunk lastBuffer = this.lastBuffer; + //lastBuffer is a potential moving, always increasing, target ie better to re-check the distance + distanceFromLastChunkIndex = lastBuffer.index - chunkIndex; + if (distanceFromLastChunkIndex < chunkIndex) { + //we're saving some jumps ie is fine to go backward from here + buffer = lastBuffer; + chunkIndexes = distanceFromLastChunkIndex; + forward = false; + } + } + //start from the first buffer only is needed + if (buffer == null) { + buffer = firstBuffer; + } + for (int i = 0; i < chunkIndexes; i++) { + //next chunk is always set if below a read lastIndex value + //previous chunk is final and can be safely read + buffer = forward ? buffer.next : buffer.prev; + } + return buffer; + } + + /** + * Appends the specified element to the end of this collection. + * + * @throws NullPointerException if {@code e} is {@code null} + **/ + public void add(T e) { + Objects.requireNonNull(e); + while (true) { + final long lastIndex = this.lastIndex; + if (lastIndex != RESIZING) { + if (lastIndex == Integer.MAX_VALUE) { + throw new IllegalStateException("can't add more then " + Integer.MAX_VALUE + " elements"); + } + //load acquire the current lastBuffer + final AtomicChunk lastBuffer = this.lastBuffer; + final int offset = (int) (lastIndex & chunkMask); + //only the first attempt to add an element to a chunk can attempt to resize + if (offset == 0) { + if (addChunkAndElement(lastBuffer, lastIndex, e)) { + return; + } + } else if (LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, lastIndex + 1)) { + //this.lastBuffer is the correct buffer to append a element: it is guarded by the lastIndex logic + //NOTE: lastIndex is being updated before setting a new value + lastBuffer.lazySet(offset, e); + return; + } + } + Thread.yield(); + } + } + + private boolean addChunkAndElement(AtomicChunk lastBuffer, long lastIndex, T element) { + if (!LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, RESIZING)) { + return false; + } + final AtomicChunk newChunk; + try { + final int index = (int) (lastIndex >> chunkSizeLog2); + newChunk = new AtomicChunk<>(index, lastBuffer, chunkSize); + } catch (OutOfMemoryError oom) { + //unblock lastIndex without updating it + LAST_INDEX_UPDATER.lazySet(this, lastIndex); + throw oom; + } + //adding the element to it + newChunk.lazySet(0, element); + //linking it to the old one, if any + if (lastBuffer != null) { + //a plain store is enough, given that lastIndex prevents any reader/writer to access it + lastBuffer.next = newChunk; + } else { + //it's first one + this.firstBuffer = newChunk; + } + //making it the current produced one + this.lastBuffer = newChunk; + //store release any previous write and unblock anyone waiting resizing to finish + LAST_INDEX_UPDATER.lazySet(this, lastIndex + 1); + return true; + } + + /** + * Returns an array containing all of the elements in this collection in proper + * sequence (from first to last element).
+ * {@code arrayAllocator} will be used to instantiate the array of the correct size with the right runtime type. + */ + public T[] toArray(IntFunction arrayAllocator) { + final long lastIndex = getValidLastIndex(); + assert lastIndex <= Integer.MAX_VALUE; + final int size = (int) lastIndex; + final T[] elements = arrayAllocator.apply(size); + //fast division by a power of 2 + final int chunkSize = this.chunkSize; + final int chunks = size > chunkSize ? size >> chunkSizeLog2 : 0; + AtomicChunk buffer = firstBuffer; + int elementIndex = 0; + for (int i = 0; i < chunks; i++) { + drain(buffer, elements, elementIndex, chunkSize); + elementIndex += chunkSize; + //the next chunk is always set if we stay below a past size/lastIndex value + buffer = buffer.next; + } + final int remaining = chunks > 0 ? (size & chunkMask) : size; + drain(buffer, elements, elementIndex, remaining); + return elements; + } + + //NOTE: lastIndex is being updated BEFORE setting a new value ie on reader side need to spin until a not null value is set + private static T pollElement(AtomicChunk buffer, int i) { + T e; + while ((e = buffer.get(i)) == null) { + Thread.yield(); + } + return e; + } + + private static void drain(AtomicChunk buffer, T[] elements, int elementNumber, int length) { + for (int j = 0; j < length; j++) { + final T e = pollElement(buffer, j); + assert e != null; + elements[elementNumber] = e; + elementNumber++; + } + } + +} + diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java new file mode 100644 index 0000000000..a4f405971a --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java @@ -0,0 +1,155 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.utils.collections; + +import org.junit.Assert; +import org.junit.Test; + +public class ConcurrentAppendOnlyChunkedListTest { + + private static final int CHUNK_SIZE = 32; + private static final int ELEMENTS = (CHUNK_SIZE * 3) + 1; + + private final ConcurrentAppendOnlyChunkedList chunkedList; + + public ConcurrentAppendOnlyChunkedListTest() { + chunkedList = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldFailToCreateNotPowerOf2ChunkSizeCollection() { + new ConcurrentAppendOnlyChunkedList<>(3); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldFailToCreateNegativeChunkSizeCollection() { + new ConcurrentAppendOnlyChunkedList<>(-1); + } + + @Test + public void shouldNumberOfElementsBeTheSameOfTheAddedElements() { + final int messages = ELEMENTS; + for (int i = 0; i < messages; i++) { + Assert.assertEquals(i, chunkedList.size()); + chunkedList.add((i)); + } + Assert.assertEquals(messages, chunkedList.size()); + } + + @Test + public void shouldNumberOfElementsBeTheSameOfAddAllElements() { + final int messages = ELEMENTS; + final Integer[] elements = new Integer[messages]; + for (int i = 0; i < messages; i++) { + final Integer element = i; + elements[i] = element; + } + chunkedList.addAll(elements); + Assert.assertEquals(messages, chunkedList.size()); + } + + @Test + public void shouldGetReturnNullIfEmpty() { + Assert.assertNull(chunkedList.get(0)); + } + + @Test + public void shouldNegativeIndexedGetReturnNull() { + Assert.assertNull(chunkedList.get(-1)); + chunkedList.add(0); + Assert.assertNull(chunkedList.get(-1)); + } + + @Test + public void shouldGetReturnNullIfExceedSize() { + final int messages = ELEMENTS; + for (int i = 0; i < messages; i++) { + final Integer element = i; + chunkedList.add(element); + Assert.assertNull(chunkedList.get(i + 1)); + } + } + + @Test + public void shouldGetReturnElementsAccordingToAddOrder() { + final int messages = ELEMENTS; + final Integer[] elements = new Integer[messages]; + for (int i = 0; i < messages; i++) { + final Integer element = i; + elements[i] = element; + chunkedList.add(element); + } + final Integer[] cachedElements = new Integer[messages]; + for (int i = 0; i < messages; i++) { + cachedElements[i] = chunkedList.get(i); + } + Assert.assertArrayEquals(cachedElements, elements); + } + + @Test + public void shouldGetReturnElementsAccordingToAddAllOrder() { + final int messages = ELEMENTS; + final Integer[] elements = new Integer[messages]; + for (int i = 0; i < messages; i++) { + final Integer element = i; + elements[i] = element; + } + chunkedList.addAll(elements); + final Integer[] cachedElements = new Integer[messages]; + for (int i = 0; i < messages; i++) { + cachedElements[i] = chunkedList.get(i); + } + Assert.assertArrayEquals(cachedElements, elements); + } + + @Test + public void shouldToArrayReturnElementsAccordingToAddOrder() { + final int messages = ELEMENTS; + final Integer[] elements = new Integer[messages]; + for (int i = 0; i < messages; i++) { + final Integer element = i; + elements[i] = element; + chunkedList.add(element); + } + final Integer[] cachedElements = chunkedList.toArray(Integer[]::new); + Assert.assertArrayEquals(elements, cachedElements); + } + + @Test + public void shouldToArrayReturnElementsAccordingToAddAllOrder() { + final int messages = ELEMENTS; + final Integer[] elements = new Integer[messages]; + for (int i = 0; i < messages; i++) { + final Integer element = i; + elements[i] = element; + } + chunkedList.addAll(elements); + final Integer[] cachedElements = chunkedList.toArray(Integer[]::new); + Assert.assertArrayEquals(elements, cachedElements); + } + + @Test + public void shouldToArrayReturnEmptyArrayIfEmpty() { + final Integer[] array = chunkedList.toArray(Integer[]::new); + Assert.assertArrayEquals(new Integer[0], array); + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java index 57d2e27608..6b55439b05 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java @@ -16,30 +16,31 @@ */ package org.apache.activemq.artemis.core.paging.cursor.impl; -import java.util.LinkedList; -import java.util.List; - import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.cursor.LivePageCache; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.utils.collections.ConcurrentAppendOnlyChunkedList; import org.jboss.logging.Logger; /** * This is the same as PageCache, however this is for the page that's being currently written. */ -public class LivePageCacheImpl implements LivePageCache { +public final class LivePageCacheImpl implements LivePageCache { private static final Logger logger = Logger.getLogger(LivePageCacheImpl.class); - private final List messages = new LinkedList<>(); + private static final int CHUNK_SIZE = 32; + + private final ConcurrentAppendOnlyChunkedList messages; private final Page page; - private boolean isLive = true; + private volatile boolean isLive = true; public LivePageCacheImpl(final Page page) { this.page = page; + this.messages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE); } @Override @@ -48,54 +49,49 @@ public class LivePageCacheImpl implements LivePageCache { } @Override - public synchronized int getNumberOfMessages() { + public int getNumberOfMessages() { return messages.size(); } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway - for (PagedMessage msg : messages) { - addLiveMessage(msg); + for (PagedMessage message : messages) { + addLiveMessage(message); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { - return null; - } + public PagedMessage getMessage(int messageNumber) { + return messages.get(messageNumber); } @Override - public synchronized boolean isLive() { + public boolean isLive() { return isLive; } @Override - public synchronized void addLiveMessage(PagedMessage message) { + public void addLiveMessage(PagedMessage message) { if (message.getMessage().isLargeMessage()) { ((LargeServerMessage) message.getMessage()).incrementDelayDeletionCount(); } - this.messages.add(message); + messages.add(message); } @Override - public synchronized void close() { + public void close() { logger.tracef("Closing %s", this); this.isLive = false; } @Override - public synchronized PagedMessage[] getMessages() { - return messages.toArray(new PagedMessage[messages.size()]); + public PagedMessage[] getMessages() { + return messages.toArray(PagedMessage[]::new); } @Override public String toString() { - return "LivePacheCacheImpl::page=" + page.getPageId() + " number of messages=" + messages.size() + " isLive = " + - isLive; + return "LivePacheCacheImpl::page=" + page.getPageId() + " number of messages=" + getNumberOfMessages() + " isLive = " + isLive; } }