This commit is contained in:
Clebert Suconic 2019-05-13 12:20:23 -04:00
commit 56e33bcacf
10 changed files with 202 additions and 22 deletions

View File

@ -189,6 +189,11 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>

View File

@ -19,9 +19,12 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
@ -71,8 +74,19 @@ public class PageCursorProviderImpl implements PageCursorProvider {
private final SoftValueLongObjectHashMap<PageCache> softCache;
private final LongObjectHashMap<CompletableFuture<PageCache>> inProgressReadPages;
private final ConcurrentLongHashMap<PageSubscription> activeCursors = new ConcurrentLongHashMap<>();
private static final long PAGE_READ_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
//Any concurrent read page request will wait in a loop the original Page::read to complete while
//printing at intervals a warn message
private static final long CONCURRENT_PAGE_READ_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(10);
//storageManager.beforePageRead will be attempted in a loop, printing at intervals a warn message
private static final long PAGE_READ_PERMISSION_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(10);
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@ -85,6 +99,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
this.storageManager = storageManager;
this.executor = executor;
this.softCache = new SoftValueLongObjectHashMap<>(maxCacheSize);
this.inProgressReadPages = new LongObjectHashMap<>();
}
// Public --------------------------------------------------------
@ -131,43 +146,82 @@ public class PageCursorProviderImpl implements PageCursorProvider {
@Override
public PageCache getPageCache(final long pageId) {
try {
if (pageId > pagingStore.getCurrentWritingPage()) {
return null;
}
boolean createPage = false;
CompletableFuture<PageCache> inProgressReadPage;
PageCache cache;
Page page = null;
synchronized (softCache) {
if (pageId > pagingStore.getCurrentWritingPage()) {
cache = softCache.get(pageId);
if (cache != null) {
return cache;
}
if (!pagingStore.checkPageFileExists((int) pageId)) {
return null;
}
cache = softCache.get(pageId);
if (cache == null) {
if (!pagingStore.checkPageFileExists((int) pageId)) {
return null;
}
inProgressReadPage = inProgressReadPages.get(pageId);
if (inProgressReadPage == null) {
final CompletableFuture<PageCache> readPage = new CompletableFuture<>();
cache = createPageCache(pageId);
// anyone reading from this cache will have to wait reading to finish first
// we also want only one thread reading this cache
logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress());
readPage((int) pageId, cache);
softCache.put(pageId, cache);
page = pagingStore.createPage((int) pageId);
createPage = true;
inProgressReadPage = readPage;
inProgressReadPages.put(pageId, readPage);
}
}
if (createPage) {
return readPage(pageId, page, cache, inProgressReadPage);
} else {
final long startedWait = System.nanoTime();
while (true) {
try {
return inProgressReadPage.get(CONCURRENT_PAGE_READ_TIMEOUT_NS, TimeUnit.NANOSECONDS);
} catch (TimeoutException e) {
final long elapsed = System.nanoTime() - startedWait;
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsed);
logger.warnf("Waiting a concurrent Page::read for pageNr=%d on cursor %s by %d ms",
pageId, pagingStore.getAddress(), elapsedMillis);
}
}
}
return cache;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
private void readPage(int pageId, PageCache cache) throws Exception {
Page page = null;
private PageCache readPage(long pageId,
Page page,
PageCache cache,
CompletableFuture<PageCache> inProgressReadPage) throws Exception {
logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress());
boolean acquiredPageReadPermission = false;
try {
page = pagingStore.createPage(pageId);
storageManager.beforePageRead();
final long startedRequest = System.nanoTime();
while (!acquiredPageReadPermission) {
acquiredPageReadPermission = storageManager.beforePageRead(PAGE_READ_PERMISSION_TIMEOUT_NS, TimeUnit.NANOSECONDS);
if (!acquiredPageReadPermission) {
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedRequest);
logger.warnf("Cannot acquire page read permission of pageNr=%d on cursor %s after %d ms: consider increasing page-max-concurrent-io or use a faster disk",
pageId, pagingStore.getAddress(), elapsedMillis);
}
}
page.open();
final long startedReadPage = System.nanoTime();
List<PagedMessage> pgdMessages = page.read(storageManager);
final long elapsedReadPage = System.nanoTime() - startedReadPage;
if (elapsedReadPage > PAGE_READ_TIMEOUT_NS) {
logger.warnf("Page::read for pageNr=%d on cursor %s tooks %d ms to read %d bytes", pageId,
pagingStore.getAddress(), TimeUnit.NANOSECONDS.toMillis(elapsedReadPage), page.getSize());
}
cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
} catch (Throwable t) {
inProgressReadPage.completeExceptionally(t);
synchronized (softCache) {
inProgressReadPages.remove(pageId);
}
throw t;
} finally {
try {
if (page != null) {
@ -175,8 +229,16 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
} catch (Throwable ignored) {
}
storageManager.afterPageRead();
if (acquiredPageReadPermission) {
storageManager.afterPageRead();
}
}
inProgressReadPage.complete(cache);
synchronized (softCache) {
inProgressReadPages.remove(pageId);
softCache.put(pageId, cache);
}
return cache;
}
@Override

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
@ -146,6 +147,12 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
*/
void beforePageRead() throws Exception;
/**
* Like {@link #beforePageRead()} but return {@code true} if acquired within {@code timeout},
* {@code false} otherwise.
*/
boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException;
/**
* We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise
* the system may become unresponsive if too many destinations are reading all the same time.

View File

@ -1632,6 +1632,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
}
@Override
public boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException {
final Semaphore pageMaxConcurrentIO = this.pageMaxConcurrentIO;
if (pageMaxConcurrentIO == null) {
return true;
}
return pageMaxConcurrentIO.tryAcquire(timeout, unit);
}
@Override
public void afterPageRead() throws Exception {
if (pageMaxConcurrentIO != null) {

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.Message;
@ -577,6 +578,11 @@ public class NullStorageManager implements StorageManager {
public void beforePageRead() throws Exception {
}
@Override
public boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException {
return true;
}
@Override
public void afterPageRead() throws Exception {
}

View File

@ -0,0 +1,77 @@
/*
* Copyright The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.paging.cursor.impl;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCache;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.junit.Assert;
import org.junit.Test;
import static java.util.Collections.emptyList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class PageCursorProviderImplTest {
@Test(timeout = 30_000)
public void shouldAllowConcurrentPageReads() throws Exception {
final PagingStore pagingStore = mock(PagingStore.class);
final StorageManager storageManager = mock(StorageManager.class);
when(storageManager.beforePageRead(anyLong(), any(TimeUnit.class))).thenReturn(true);
final int pages = 2;
final ArtemisExecutor artemisExecutor = mock(ArtemisExecutor.class);
final PageCursorProviderImpl pageCursorProvider = new PageCursorProviderImpl(pagingStore, storageManager, artemisExecutor, 2);
when(pagingStore.getCurrentWritingPage()).thenReturn(pages);
when(pagingStore.checkPageFileExists(anyInt())).thenReturn(true);
final Page firstPage = mock(Page.class);
when(firstPage.getPageId()).thenReturn(1);
when(pagingStore.createPage(1)).thenReturn(firstPage);
final Page secondPage = mock(Page.class);
when(secondPage.getPageId()).thenReturn(2);
when(pagingStore.createPage(2)).thenReturn(secondPage);
final CountDownLatch finishFirstPageRead = new CountDownLatch(1);
final Thread concurrentRead = new Thread(() -> {
try {
final PageCache cache = pageCursorProvider.getPageCache(2);
Assert.assertNotNull(cache);
} finally {
finishFirstPageRead.countDown();
}
});
try {
when(firstPage.read(storageManager)).then(invocationOnMock -> {
concurrentRead.start();
finishFirstPageRead.await();
return emptyList();
});
Assert.assertNotNull(pageCursorProvider.getPageCache(1));
} finally {
pageCursorProvider.stop();
concurrentRead.interrupt();
concurrentRead.join();
}
}
}

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -289,6 +290,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
public boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException {
return true;
}
@Override
public void afterPageRead() throws Exception {

View File

@ -0,0 +1 @@
mock-maker-inline

View File

@ -1631,6 +1631,8 @@
<exclude>activemq-artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
<!-- Mockito -->
<exclude>**/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker</exclude>
</excludes>
</configuration>
<executions>

View File

@ -365,6 +365,11 @@ public class SendAckFailTest extends SpawnedTestBase {
manager.beforePageRead();
}
@Override
public boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException {
return manager.beforePageRead(timeout, unit);
}
@Override
public void afterPageRead() throws Exception {
manager.afterPageRead();