From f2bac5ad08dc2b31628b622e68056d0e813e989e Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Thu, 25 Apr 2019 22:30:53 +0200 Subject: [PATCH] ARTEMIS-2321 Non-blocking Page::read on page cache --- artemis-server/pom.xml | 5 + .../cursor/impl/PageCursorProviderImpl.java | 106 ++++++++++++++---- .../core/persistence/StorageManager.java | 7 ++ .../AbstractJournalStorageManager.java | 9 ++ .../impl/nullpm/NullStorageManager.java | 6 + .../impl/PageCursorProviderImplTest.java | 77 +++++++++++++ .../transaction/impl/TransactionImplTest.java | 6 + .../org.mockito.plugins.MockMaker | 1 + pom.xml | 2 + .../integration/client/SendAckFailTest.java | 5 + 10 files changed, 202 insertions(+), 22 deletions(-) create mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java create mode 100644 artemis-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml index 6ab2967a14..e20179f492 100644 --- a/artemis-server/pom.xml +++ b/artemis-server/pom.xml @@ -189,6 +189,11 @@ derby test + + org.mockito + mockito-core + test + diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index ddca7f2b74..4e03b3b4a5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -19,9 +19,12 @@ package org.apache.activemq.artemis.core.paging.cursor.impl; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import io.netty.util.collection.LongObjectHashMap; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -71,8 +74,19 @@ public class PageCursorProviderImpl implements PageCursorProvider { private final SoftValueLongObjectHashMap softCache; + private final LongObjectHashMap> inProgressReadPages; + private final ConcurrentLongHashMap activeCursors = new ConcurrentLongHashMap<>(); + private static final long PAGE_READ_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30); + + //Any concurrent read page request will wait in a loop the original Page::read to complete while + //printing at intervals a warn message + private static final long CONCURRENT_PAGE_READ_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(10); + + //storageManager.beforePageRead will be attempted in a loop, printing at intervals a warn message + private static final long PAGE_READ_PERMISSION_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(10); + // Static -------------------------------------------------------- // Constructors -------------------------------------------------- @@ -85,6 +99,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { this.storageManager = storageManager; this.executor = executor; this.softCache = new SoftValueLongObjectHashMap<>(maxCacheSize); + this.inProgressReadPages = new LongObjectHashMap<>(); } // Public -------------------------------------------------------- @@ -131,43 +146,82 @@ public class PageCursorProviderImpl implements PageCursorProvider { @Override public PageCache getPageCache(final long pageId) { try { + if (pageId > pagingStore.getCurrentWritingPage()) { + return null; + } + boolean createPage = false; + CompletableFuture inProgressReadPage; PageCache cache; + Page page = null; synchronized (softCache) { - if (pageId > pagingStore.getCurrentWritingPage()) { + cache = softCache.get(pageId); + if (cache != null) { + return cache; + } + if (!pagingStore.checkPageFileExists((int) pageId)) { return null; } - - cache = softCache.get(pageId); - if (cache == null) { - if (!pagingStore.checkPageFileExists((int) pageId)) { - return null; - } - + inProgressReadPage = inProgressReadPages.get(pageId); + if (inProgressReadPage == null) { + final CompletableFuture readPage = new CompletableFuture<>(); cache = createPageCache(pageId); - // anyone reading from this cache will have to wait reading to finish first - // we also want only one thread reading this cache - logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress()); - readPage((int) pageId, cache); - softCache.put(pageId, cache); + page = pagingStore.createPage((int) pageId); + createPage = true; + inProgressReadPage = readPage; + inProgressReadPages.put(pageId, readPage); + } + } + if (createPage) { + return readPage(pageId, page, cache, inProgressReadPage); + } else { + final long startedWait = System.nanoTime(); + while (true) { + try { + return inProgressReadPage.get(CONCURRENT_PAGE_READ_TIMEOUT_NS, TimeUnit.NANOSECONDS); + } catch (TimeoutException e) { + final long elapsed = System.nanoTime() - startedWait; + final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsed); + logger.warnf("Waiting a concurrent Page::read for pageNr=%d on cursor %s by %d ms", + pageId, pagingStore.getAddress(), elapsedMillis); + } } } - - return cache; } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } } - private void readPage(int pageId, PageCache cache) throws Exception { - Page page = null; + private PageCache readPage(long pageId, + Page page, + PageCache cache, + CompletableFuture inProgressReadPage) throws Exception { + logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress()); + boolean acquiredPageReadPermission = false; try { - page = pagingStore.createPage(pageId); - - storageManager.beforePageRead(); + final long startedRequest = System.nanoTime(); + while (!acquiredPageReadPermission) { + acquiredPageReadPermission = storageManager.beforePageRead(PAGE_READ_PERMISSION_TIMEOUT_NS, TimeUnit.NANOSECONDS); + if (!acquiredPageReadPermission) { + final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedRequest); + logger.warnf("Cannot acquire page read permission of pageNr=%d on cursor %s after %d ms: consider increasing page-max-concurrent-io or use a faster disk", + pageId, pagingStore.getAddress(), elapsedMillis); + } + } page.open(); - + final long startedReadPage = System.nanoTime(); List pgdMessages = page.read(storageManager); + final long elapsedReadPage = System.nanoTime() - startedReadPage; + if (elapsedReadPage > PAGE_READ_TIMEOUT_NS) { + logger.warnf("Page::read for pageNr=%d on cursor %s tooks %d ms to read %d bytes", pageId, + pagingStore.getAddress(), TimeUnit.NANOSECONDS.toMillis(elapsedReadPage), page.getSize()); + } cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()])); + } catch (Throwable t) { + inProgressReadPage.completeExceptionally(t); + synchronized (softCache) { + inProgressReadPages.remove(pageId); + } + throw t; } finally { try { if (page != null) { @@ -175,8 +229,16 @@ public class PageCursorProviderImpl implements PageCursorProvider { } } catch (Throwable ignored) { } - storageManager.afterPageRead(); + if (acquiredPageReadPermission) { + storageManager.afterPageRead(); + } } + inProgressReadPage.complete(cache); + synchronized (softCache) { + inProgressReadPages.remove(pageId); + softCache.put(pageId, cache); + } + return cache; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index d025d5e991..73c43febaf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; @@ -146,6 +147,12 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { */ void beforePageRead() throws Exception; + /** + * Like {@link #beforePageRead()} but return {@code true} if acquired within {@code timeout}, + * {@code false} otherwise. + */ + boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException; + /** * We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise * the system may become unresponsive if too many destinations are reading all the same time. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 1b92e862e0..fd14d552d6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1632,6 +1632,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } } + @Override + public boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException { + final Semaphore pageMaxConcurrentIO = this.pageMaxConcurrentIO; + if (pageMaxConcurrentIO == null) { + return true; + } + return pageMaxConcurrentIO.tryAcquire(timeout, unit); + } + @Override public void afterPageRead() throws Exception { if (pageMaxConcurrentIO != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 995e57b007..577ce8b6fe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.Message; @@ -577,6 +578,11 @@ public class NullStorageManager implements StorageManager { public void beforePageRead() throws Exception { } + @Override + public boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException { + return true; + } + @Override public void afterPageRead() throws Exception { } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java new file mode 100644 index 0000000000..4ed38e87f2 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java @@ -0,0 +1,77 @@ +/* + * Copyright The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.paging.cursor.impl; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.cursor.PageCache; +import org.apache.activemq.artemis.core.paging.impl.Page; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.junit.Assert; +import org.junit.Test; + +import static java.util.Collections.emptyList; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class PageCursorProviderImplTest { + + @Test(timeout = 30_000) + public void shouldAllowConcurrentPageReads() throws Exception { + final PagingStore pagingStore = mock(PagingStore.class); + final StorageManager storageManager = mock(StorageManager.class); + when(storageManager.beforePageRead(anyLong(), any(TimeUnit.class))).thenReturn(true); + final int pages = 2; + final ArtemisExecutor artemisExecutor = mock(ArtemisExecutor.class); + final PageCursorProviderImpl pageCursorProvider = new PageCursorProviderImpl(pagingStore, storageManager, artemisExecutor, 2); + when(pagingStore.getCurrentWritingPage()).thenReturn(pages); + when(pagingStore.checkPageFileExists(anyInt())).thenReturn(true); + final Page firstPage = mock(Page.class); + when(firstPage.getPageId()).thenReturn(1); + when(pagingStore.createPage(1)).thenReturn(firstPage); + final Page secondPage = mock(Page.class); + when(secondPage.getPageId()).thenReturn(2); + when(pagingStore.createPage(2)).thenReturn(secondPage); + final CountDownLatch finishFirstPageRead = new CountDownLatch(1); + final Thread concurrentRead = new Thread(() -> { + try { + final PageCache cache = pageCursorProvider.getPageCache(2); + Assert.assertNotNull(cache); + } finally { + finishFirstPageRead.countDown(); + } + }); + try { + when(firstPage.read(storageManager)).then(invocationOnMock -> { + concurrentRead.start(); + finishFirstPageRead.await(); + return emptyList(); + }); + Assert.assertNotNull(pageCursorProvider.getPageCache(1)); + } finally { + pageCursorProvider.stop(); + concurrentRead.interrupt(); + concurrentRead.join(); + } + } + +} diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index b51be9a138..0a15022a60 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -289,6 +290,11 @@ public class TransactionImplTest extends ActiveMQTestBase { } + @Override + public boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException { + return true; + } + @Override public void afterPageRead() throws Exception { diff --git a/artemis-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/artemis-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000..ca6ee9cea8 --- /dev/null +++ b/artemis-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/pom.xml b/pom.xml index b32a6b0a21..3c7b1a5203 100644 --- a/pom.xml +++ b/pom.xml @@ -1631,6 +1631,8 @@ activemq-artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h **/dependency-reduced-pom.xml + + **/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index b9b686c405..9364aa0b57 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -365,6 +365,11 @@ public class SendAckFailTest extends SpawnedTestBase { manager.beforePageRead(); } + @Override + public boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException { + return manager.beforePageRead(timeout, unit); + } + @Override public void afterPageRead() throws Exception { manager.afterPageRead();