ARTEMIS-2399 Improve performance when there are a lot of subscribers
This commit is contained in:
parent
cfdec52719
commit
76d420590f
|
@ -251,6 +251,9 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
// The max number of concurrent reads allowed on paging
|
// The max number of concurrent reads allowed on paging
|
||||||
private static int DEFAULT_MAX_CONCURRENT_PAGE_IO = 5;
|
private static int DEFAULT_MAX_CONCURRENT_PAGE_IO = 5;
|
||||||
|
|
||||||
|
// If true the whole page would be read, otherwise just seek and read while getting message
|
||||||
|
private static boolean DEFAULT_READ_WHOLE_PAGE = false;
|
||||||
|
|
||||||
// the directory to store the journal files in
|
// the directory to store the journal files in
|
||||||
private static String DEFAULT_JOURNAL_DIR = "data/journal";
|
private static String DEFAULT_JOURNAL_DIR = "data/journal";
|
||||||
|
|
||||||
|
@ -843,6 +846,11 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
return DEFAULT_MAX_CONCURRENT_PAGE_IO;
|
return DEFAULT_MAX_CONCURRENT_PAGE_IO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static boolean isDefaultReadWholePage() {
|
||||||
|
return DEFAULT_READ_WHOLE_PAGE;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* the directory to store the journal files in
|
* the directory to store the journal files in
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -82,6 +82,7 @@
|
||||||
<include>org.jboss.logmanager:jboss-logmanager</include>
|
<include>org.jboss.logmanager:jboss-logmanager</include>
|
||||||
<include>org.jboss.logging:jboss-logging</include>
|
<include>org.jboss.logging:jboss-logging</include>
|
||||||
<include>org.jboss.slf4j:slf4j-jboss-logmanager</include>
|
<include>org.jboss.slf4j:slf4j-jboss-logmanager</include>
|
||||||
|
<include>org.jctools:jctools-core</include>
|
||||||
<include>io.netty:netty-all</include>
|
<include>io.netty:netty-all</include>
|
||||||
<include>io.netty:netty-tcnative-boringssl-static</include>
|
<include>io.netty:netty-tcnative-boringssl-static</include>
|
||||||
<include>org.apache.qpid:proton-j</include>
|
<include>org.apache.qpid:proton-j</include>
|
||||||
|
|
|
@ -64,6 +64,7 @@
|
||||||
<bundle dependency="true">mvn:org.apache.commons/commons-configuration2/${commons.config.version}</bundle>
|
<bundle dependency="true">mvn:org.apache.commons/commons-configuration2/${commons.config.version}</bundle>
|
||||||
<bundle dependency="true">mvn:org.apache.commons/commons-text/1.6</bundle>
|
<bundle dependency="true">mvn:org.apache.commons/commons-text/1.6</bundle>
|
||||||
<bundle dependency="true">mvn:org.apache.commons/commons-lang3/${commons.lang.version}</bundle>
|
<bundle dependency="true">mvn:org.apache.commons/commons-lang3/${commons.lang.version}</bundle>
|
||||||
|
<bundle dependency="true">mvn:org.jctools/jctools-core/${jctools.version}</bundle>
|
||||||
<!-- Micrometer can't be included until it supports OSGi. It is currently an "optional" Maven dependency. -->
|
<!-- Micrometer can't be included until it supports OSGi. It is currently an "optional" Maven dependency. -->
|
||||||
<!--bundle dependency="true">mvn:io.micrometer/micrometer-core/${version.micrometer}</bundle-->
|
<!--bundle dependency="true">mvn:io.micrometer/micrometer-core/${version.micrometer}</bundle-->
|
||||||
|
|
||||||
|
|
|
@ -89,6 +89,10 @@
|
||||||
<version>${project.version}</version>
|
<version>${project.version}</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.jctools</groupId>
|
||||||
|
<artifactId>jctools-core</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.netty</groupId>
|
<groupId>io.netty</groupId>
|
||||||
<artifactId>netty-buffer</artifactId>
|
<artifactId>netty-buffer</artifactId>
|
||||||
|
|
|
@ -587,6 +587,17 @@ public interface Configuration {
|
||||||
*/
|
*/
|
||||||
Configuration setPageMaxConcurrentIO(int maxIO);
|
Configuration setPageMaxConcurrentIO(int maxIO);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns whether the whole page is read while getting message after page cache is evicted. <br>
|
||||||
|
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_READ_WHOLE_PAGE}.
|
||||||
|
*/
|
||||||
|
boolean isReadWholePage();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets whether the whole page is read while getting message after page cache is evicted.
|
||||||
|
*/
|
||||||
|
Configuration setReadWholePage(boolean read);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the file system directory used to store journal log. <br>
|
* Returns the file system directory used to store journal log. <br>
|
||||||
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_DIR}.
|
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_DIR}.
|
||||||
|
|
|
@ -176,6 +176,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
|
|
||||||
private int maxConcurrentPageIO = ActiveMQDefaultConfiguration.getDefaultMaxConcurrentPageIo();
|
private int maxConcurrentPageIO = ActiveMQDefaultConfiguration.getDefaultMaxConcurrentPageIo();
|
||||||
|
|
||||||
|
private boolean readWholePage = ActiveMQDefaultConfiguration.isDefaultReadWholePage();
|
||||||
|
|
||||||
protected String largeMessagesDirectory = ActiveMQDefaultConfiguration.getDefaultLargeMessagesDir();
|
protected String largeMessagesDirectory = ActiveMQDefaultConfiguration.getDefaultLargeMessagesDir();
|
||||||
|
|
||||||
protected String bindingsDirectory = ActiveMQDefaultConfiguration.getDefaultBindingsDirectory();
|
protected String bindingsDirectory = ActiveMQDefaultConfiguration.getDefaultBindingsDirectory();
|
||||||
|
@ -811,6 +813,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isReadWholePage() {
|
||||||
|
return readWholePage;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ConfigurationImpl setReadWholePage(boolean read) {
|
||||||
|
readWholePage = read;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public File getJournalLocation() {
|
public File getJournalLocation() {
|
||||||
return subFolder(getJournalDirectory());
|
return subFolder(getJournalDirectory());
|
||||||
|
|
|
@ -568,6 +568,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
|
|
||||||
config.setPageMaxConcurrentIO(getInteger(e, "page-max-concurrent-io", config.getPageMaxConcurrentIO(), Validators.MINUS_ONE_OR_GT_ZERO));
|
config.setPageMaxConcurrentIO(getInteger(e, "page-max-concurrent-io", config.getPageMaxConcurrentIO(), Validators.MINUS_ONE_OR_GT_ZERO));
|
||||||
|
|
||||||
|
config.setReadWholePage(getBoolean(e, "read-whole-page", config.isReadWholePage()));
|
||||||
|
|
||||||
config.setPagingDirectory(getString(e, "paging-directory", config.getPagingDirectory(), Validators.NOT_NULL_OR_EMPTY));
|
config.setPagingDirectory(getString(e, "paging-directory", config.getPagingDirectory(), Validators.NOT_NULL_OR_EMPTY));
|
||||||
|
|
||||||
config.setCreateJournalDir(getBoolean(e, "create-journal-dir", config.isCreateJournalDir()));
|
config.setCreateJournalDir(getBoolean(e, "create-journal-dir", config.isCreateJournalDir()));
|
||||||
|
|
|
@ -36,10 +36,10 @@ public interface PageCache extends SoftValueLongObjectHashMap.ValueCache {
|
||||||
boolean isLive();
|
boolean isLive();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param messageNumber The order of the message on the page
|
* @param pagePosition page position
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
PagedMessage getMessage(int messageNumber);
|
PagedMessage getMessage(PagePosition pagePosition);
|
||||||
|
|
||||||
void close();
|
void close();
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,8 @@ public interface PagePosition extends Comparable<PagePosition> {
|
||||||
|
|
||||||
int getMessageNr();
|
int getMessageNr();
|
||||||
|
|
||||||
|
int getFileOffset();
|
||||||
|
|
||||||
long getPersistentSize();
|
long getPersistentSize();
|
||||||
|
|
||||||
void setPersistentSize(long persistentSize);
|
void setPersistentSize(long persistentSize);
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
|
import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
|
||||||
|
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
import org.apache.activemq.artemis.utils.collections.ConcurrentAppendOnlyChunkedList;
|
import org.apache.activemq.artemis.utils.collections.ConcurrentAppendOnlyChunkedList;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
@ -61,8 +62,8 @@ public final class LivePageCacheImpl implements LivePageCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PagedMessage getMessage(int messageNumber) {
|
public PagedMessage getMessage(PagePosition pagePosition) {
|
||||||
return messages.get(messageNumber);
|
return messages.get(pagePosition.getMessageNr());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PageCache;
|
import org.apache.activemq.artemis.core.paging.cursor.PageCache;
|
||||||
|
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The caching associated to a single page.
|
* The caching associated to a single page.
|
||||||
|
@ -43,9 +44,9 @@ class PageCacheImpl implements PageCache {
|
||||||
// Public --------------------------------------------------------
|
// Public --------------------------------------------------------
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PagedMessage getMessage(final int messageNumber) {
|
public PagedMessage getMessage(PagePosition pagePosition) {
|
||||||
if (messageNumber < messages.length) {
|
if (pagePosition.getMessageNr() < messages.length) {
|
||||||
return messages[messageNumber];
|
return messages[pagePosition.getMessageNr()];
|
||||||
} else {
|
} else {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,6 +74,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
||||||
|
|
||||||
private final SoftValueLongObjectHashMap<PageCache> softCache;
|
private final SoftValueLongObjectHashMap<PageCache> softCache;
|
||||||
|
|
||||||
|
private LongObjectHashMap<Integer> numberOfMessages = null;
|
||||||
|
|
||||||
private final LongObjectHashMap<CompletableFuture<PageCache>> inProgressReadPages;
|
private final LongObjectHashMap<CompletableFuture<PageCache>> inProgressReadPages;
|
||||||
|
|
||||||
private final ConcurrentLongHashMap<PageSubscription> activeCursors = new ConcurrentLongHashMap<>();
|
private final ConcurrentLongHashMap<PageSubscription> activeCursors = new ConcurrentLongHashMap<>();
|
||||||
|
@ -90,15 +92,25 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
||||||
// Static --------------------------------------------------------
|
// Static --------------------------------------------------------
|
||||||
|
|
||||||
// Constructors --------------------------------------------------
|
// Constructors --------------------------------------------------
|
||||||
|
|
||||||
public PageCursorProviderImpl(final PagingStore pagingStore,
|
public PageCursorProviderImpl(final PagingStore pagingStore,
|
||||||
final StorageManager storageManager,
|
final StorageManager storageManager,
|
||||||
final ArtemisExecutor executor,
|
final ArtemisExecutor executor,
|
||||||
final int maxCacheSize) {
|
final int maxCacheSize) {
|
||||||
|
this(pagingStore, storageManager, executor, maxCacheSize, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PageCursorProviderImpl(final PagingStore pagingStore,
|
||||||
|
final StorageManager storageManager,
|
||||||
|
final ArtemisExecutor executor,
|
||||||
|
final int maxCacheSize,
|
||||||
|
final boolean readWholePage) {
|
||||||
this.pagingStore = pagingStore;
|
this.pagingStore = pagingStore;
|
||||||
this.storageManager = storageManager;
|
this.storageManager = storageManager;
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
this.softCache = new SoftValueLongObjectHashMap<>(maxCacheSize);
|
this.softCache = new SoftValueLongObjectHashMap<>(maxCacheSize);
|
||||||
|
if (!readWholePage) {
|
||||||
|
this.numberOfMessages = new LongObjectHashMap<>();
|
||||||
|
}
|
||||||
this.inProgressReadPages = new LongObjectHashMap<>();
|
this.inProgressReadPages = new LongObjectHashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,7 +145,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
||||||
throw new NonExistentPage("Invalid messageNumber passed = " + pos + " on " + cache);
|
throw new NonExistentPage("Invalid messageNumber passed = " + pos + " on " + cache);
|
||||||
}
|
}
|
||||||
|
|
||||||
return cache.getMessage(pos.getMessageNr());
|
return cache.getMessage(pos);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -169,6 +181,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
||||||
}
|
}
|
||||||
inProgressReadPage = inProgressReadPages.get(pageId);
|
inProgressReadPage = inProgressReadPages.get(pageId);
|
||||||
if (inProgressReadPage == null) {
|
if (inProgressReadPage == null) {
|
||||||
|
if (numberOfMessages != null && numberOfMessages.containsKey(pageId)) {
|
||||||
|
return new PageReader(pagingStore.createPage((int) pageId), numberOfMessages.get(pageId));
|
||||||
|
}
|
||||||
final CompletableFuture<PageCache> readPage = new CompletableFuture<>();
|
final CompletableFuture<PageCache> readPage = new CompletableFuture<>();
|
||||||
cache = createPageCache(pageId);
|
cache = createPageCache(pageId);
|
||||||
page = pagingStore.createPage((int) pageId);
|
page = pagingStore.createPage((int) pageId);
|
||||||
|
@ -203,6 +218,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
||||||
CompletableFuture<PageCache> inProgressReadPage) throws Exception {
|
CompletableFuture<PageCache> inProgressReadPage) throws Exception {
|
||||||
logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress());
|
logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress());
|
||||||
boolean acquiredPageReadPermission = false;
|
boolean acquiredPageReadPermission = false;
|
||||||
|
int num = -1;
|
||||||
try {
|
try {
|
||||||
final long startedRequest = System.nanoTime();
|
final long startedRequest = System.nanoTime();
|
||||||
while (!acquiredPageReadPermission) {
|
while (!acquiredPageReadPermission) {
|
||||||
|
@ -221,7 +237,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
||||||
logger.warnf("Page::read for pageNr=%d on cursor %s tooks %d ms to read %d bytes", pageId,
|
logger.warnf("Page::read for pageNr=%d on cursor %s tooks %d ms to read %d bytes", pageId,
|
||||||
pagingStore.getAddress(), TimeUnit.NANOSECONDS.toMillis(elapsedReadPage), page.getSize());
|
pagingStore.getAddress(), TimeUnit.NANOSECONDS.toMillis(elapsedReadPage), page.getSize());
|
||||||
}
|
}
|
||||||
cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
|
num = pgdMessages.size();
|
||||||
|
cache.setMessages(pgdMessages.toArray(new PagedMessage[num]));
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
inProgressReadPage.completeExceptionally(t);
|
inProgressReadPage.completeExceptionally(t);
|
||||||
synchronized (softCache) {
|
synchronized (softCache) {
|
||||||
|
@ -243,6 +260,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
||||||
synchronized (softCache) {
|
synchronized (softCache) {
|
||||||
inProgressReadPages.remove(pageId);
|
inProgressReadPages.remove(pageId);
|
||||||
softCache.put(pageId, cache);
|
softCache.put(pageId, cache);
|
||||||
|
if (numberOfMessages != null && num != -1) {
|
||||||
|
numberOfMessages.put(pageId, Integer.valueOf(num));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return cache;
|
return cache;
|
||||||
}
|
}
|
||||||
|
@ -540,7 +560,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
||||||
|
|
||||||
depagedPage.delete(pgdMessages);
|
depagedPage.delete(pgdMessages);
|
||||||
synchronized (softCache) {
|
synchronized (softCache) {
|
||||||
softCache.remove((long) depagedPage.getPageId());
|
long pageId = (long) depagedPage.getPageId();
|
||||||
|
softCache.remove(pageId);
|
||||||
|
numberOfMessages.remove(pageId);
|
||||||
}
|
}
|
||||||
onDeletePage(depagedPage);
|
onDeletePage(depagedPage);
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,8 @@ public class PagePositionImpl implements PagePosition {
|
||||||
*/
|
*/
|
||||||
private int messageNr;
|
private int messageNr;
|
||||||
|
|
||||||
|
private int fileOffset = -1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ID used for storage
|
* ID used for storage
|
||||||
*/
|
*/
|
||||||
|
@ -45,11 +47,17 @@ public class PagePositionImpl implements PagePosition {
|
||||||
/**
|
/**
|
||||||
* @param pageNr
|
* @param pageNr
|
||||||
* @param messageNr
|
* @param messageNr
|
||||||
|
* @param fileOffset
|
||||||
*/
|
*/
|
||||||
public PagePositionImpl(long pageNr, int messageNr) {
|
public PagePositionImpl(long pageNr, int messageNr, int fileOffset) {
|
||||||
this();
|
this();
|
||||||
this.pageNr = pageNr;
|
this.pageNr = pageNr;
|
||||||
this.messageNr = messageNr;
|
this.messageNr = messageNr;
|
||||||
|
this.fileOffset = fileOffset;
|
||||||
|
}
|
||||||
|
|
||||||
|
public PagePositionImpl(long pageNr, int messageNr) {
|
||||||
|
this(pageNr, messageNr, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
public PagePositionImpl() {
|
public PagePositionImpl() {
|
||||||
|
@ -88,6 +96,11 @@ public class PagePositionImpl implements PagePosition {
|
||||||
return messageNr;
|
return messageNr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getFileOffset() {
|
||||||
|
return fileOffset;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the persistentSize
|
* @return the persistentSize
|
||||||
*/
|
*/
|
||||||
|
@ -120,7 +133,7 @@ public class PagePositionImpl implements PagePosition {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PagePosition nextPage() {
|
public PagePosition nextPage() {
|
||||||
return new PagePositionImpl(this.pageNr + 1, 0);
|
return new PagePositionImpl(this.pageNr + 1, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -150,7 +163,8 @@ public class PagePositionImpl implements PagePosition {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "PagePositionImpl [pageNr=" + pageNr + ", messageNr=" + messageNr + ", recordID=" + recordID + "]";
|
return "PagePositionImpl [pageNr=" + pageNr + ", messageNr=" + messageNr + ", recordID=" + recordID +
|
||||||
|
", fileOffset=" + fileOffset + "]";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,141 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.core.paging.cursor.impl;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
||||||
|
import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
|
||||||
|
import org.apache.activemq.artemis.core.paging.cursor.PageCache;
|
||||||
|
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||||
|
import org.apache.activemq.artemis.core.paging.impl.Page;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
public class PageReader implements PageCache {
|
||||||
|
private static final Logger logger = Logger.getLogger(PageReader.class);
|
||||||
|
|
||||||
|
private final Page page;
|
||||||
|
private final int numberOfMessages;
|
||||||
|
private PagedMessage[] pagedMessages = null;
|
||||||
|
|
||||||
|
public PageReader(Page page, int numberOfMessages) {
|
||||||
|
this.page = page;
|
||||||
|
this.numberOfMessages = numberOfMessages;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getPageId() {
|
||||||
|
return page.getPageId();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getNumberOfMessages() {
|
||||||
|
return numberOfMessages;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMessages(PagedMessage[] messages) {
|
||||||
|
this.pagedMessages = messages;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized PagedMessage[] getMessages() {
|
||||||
|
if (pagedMessages != null) {
|
||||||
|
return pagedMessages;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
openPage();
|
||||||
|
return page.read().toArray(new PagedMessage[numberOfMessages]);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e.getMessage(), e);
|
||||||
|
} finally {
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isLive() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param pagePosition page position
|
||||||
|
* @param throwException if {@code true} exception will be thrown when message number is beyond the page
|
||||||
|
* @param keepOpen if {@code true} page file would keep open after reading message
|
||||||
|
* @return the paged message
|
||||||
|
*/
|
||||||
|
public synchronized PagedMessage getMessage(PagePosition pagePosition, boolean throwException, boolean keepOpen) {
|
||||||
|
if (pagePosition.getMessageNr() >= getNumberOfMessages()) {
|
||||||
|
if (throwException) {
|
||||||
|
throw new NonExistentPage("Invalid messageNumber passed = " + pagePosition + " on " + this);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean previouslyClosed = true;
|
||||||
|
try {
|
||||||
|
previouslyClosed = openPage();
|
||||||
|
PagedMessage msg;
|
||||||
|
if (pagePosition.getFileOffset() != -1) {
|
||||||
|
msg = page.readMessage(pagePosition.getFileOffset(), pagePosition.getMessageNr(), pagePosition.getMessageNr());
|
||||||
|
} else {
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("get message from pos " + pagePosition, new Exception("trace get message without file offset"));
|
||||||
|
}
|
||||||
|
msg = page.readMessage(0, 0, pagePosition.getMessageNr());
|
||||||
|
}
|
||||||
|
return msg;
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e.getMessage(), e);
|
||||||
|
} finally {
|
||||||
|
if (!keepOpen && previouslyClosed) {
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized PagedMessage getMessage(PagePosition pagePosition) {
|
||||||
|
return getMessage(pagePosition, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if file was previously closed
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
boolean openPage() throws Exception {
|
||||||
|
if (!page.getFile().isOpen()) {
|
||||||
|
page.getFile().open();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void close() {
|
||||||
|
try {
|
||||||
|
page.close(false, false);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("Closing page " + page.getPageId() + " occurs exception:", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "PageReader::page=" + getPageId() + " numberOfMessages = " + numberOfMessages;
|
||||||
|
}
|
||||||
|
}
|
|
@ -58,6 +58,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
|
||||||
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
||||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||||
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||||
|
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
public final class PageSubscriptionImpl implements PageSubscription {
|
public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
|
@ -99,6 +100,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
|
|
||||||
private final AtomicLong deliveredSize = new AtomicLong(0);
|
private final AtomicLong deliveredSize = new AtomicLong(0);
|
||||||
|
|
||||||
|
// Each CursorIterator will record their current PageReader in this map
|
||||||
|
private final ConcurrentLongHashMap<PageReader> pageReaders = new ConcurrentLongHashMap<>();
|
||||||
|
|
||||||
PageSubscriptionImpl(final PageCursorProvider cursorProvider,
|
PageSubscriptionImpl(final PageCursorProvider cursorProvider,
|
||||||
final PagingStore pageStore,
|
final PagingStore pageStore,
|
||||||
final StorageManager store,
|
final StorageManager store,
|
||||||
|
@ -366,7 +370,15 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
}
|
}
|
||||||
|
|
||||||
private PagedReference getReference(PagePosition pos) {
|
private PagedReference getReference(PagePosition pos) {
|
||||||
return cursorProvider.newReference(pos, cursorProvider.getMessage(pos), this);
|
PagedMessage pagedMessage;
|
||||||
|
PageReader pageReader = pageReaders.get(pos.getPageNr());
|
||||||
|
if (pageReader != null) {
|
||||||
|
pagedMessage = pageReader.getMessage(pos, true, false);
|
||||||
|
} else {
|
||||||
|
pagedMessage = cursorProvider.getMessage(pos);
|
||||||
|
}
|
||||||
|
|
||||||
|
return cursorProvider.newReference(pos, pagedMessage, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -379,13 +391,19 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
return new CursorIterator(browsing);
|
return new CursorIterator(browsing);
|
||||||
}
|
}
|
||||||
|
|
||||||
private PagedReference internalGetNext(final PagePosition pos) {
|
private PagedReference internalGetNext(final PagePositionAndFileOffset pos) {
|
||||||
PagePosition retPos = pos.nextMessage();
|
PagePosition retPos = pos.nextPagePostion();
|
||||||
|
|
||||||
PageCache cache = null;
|
PageCache cache = null;
|
||||||
|
|
||||||
while (retPos.getPageNr() <= pageStore.getCurrentWritingPage()) {
|
while (retPos.getPageNr() <= pageStore.getCurrentWritingPage()) {
|
||||||
cache = cursorProvider.getPageCache(retPos.getPageNr());
|
PageReader pageReader = pageReaders.get(retPos.getPageNr());
|
||||||
|
if (pageReader == null) {
|
||||||
|
cache = cursorProvider.getPageCache(retPos.getPageNr());
|
||||||
|
} else {
|
||||||
|
cache = pageReader;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In following cases, we should move to the next page
|
* In following cases, we should move to the next page
|
||||||
* case 1: cache == null means file might be deleted unexpectedly.
|
* case 1: cache == null means file might be deleted unexpectedly.
|
||||||
|
@ -404,7 +422,17 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cache != null) {
|
if (cache != null) {
|
||||||
PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
|
PagedMessage serverMessage;
|
||||||
|
if (cache instanceof PageReader) {
|
||||||
|
serverMessage = ((PageReader) cache).getMessage(retPos, false, true);
|
||||||
|
PageCache previousPageCache = pageReaders.putIfAbsent(retPos.getPageNr(), (PageReader) cache);
|
||||||
|
if (previousPageCache != null && previousPageCache != cache) {
|
||||||
|
// Maybe other cursor iterators have added page reader, we have to close this one to avoid file leak
|
||||||
|
cache.close();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
serverMessage = cache.getMessage(retPos);
|
||||||
|
}
|
||||||
|
|
||||||
if (serverMessage != null) {
|
if (serverMessage != null) {
|
||||||
return cursorProvider.newReference(retPos, serverMessage, this);
|
return cursorProvider.newReference(retPos, serverMessage, this);
|
||||||
|
@ -415,6 +443,10 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
|
|
||||||
private PagePosition moveNextPage(final PagePosition pos) {
|
private PagePosition moveNextPage(final PagePosition pos) {
|
||||||
PagePosition retPos = pos;
|
PagePosition retPos = pos;
|
||||||
|
PageReader pageReader = pageReaders.remove(pos.getPageNr());
|
||||||
|
if (pageReader != null) {
|
||||||
|
pageReader.close();
|
||||||
|
}
|
||||||
while (true) {
|
while (true) {
|
||||||
retPos = retPos.nextPage();
|
retPos = retPos.nextPage();
|
||||||
synchronized (consumedPages) {
|
synchronized (consumedPages) {
|
||||||
|
@ -441,8 +473,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private synchronized PagePosition getStartPosition() {
|
private synchronized PagePositionAndFileOffset getStartPosition() {
|
||||||
return new PagePositionImpl(pageStore.getFirstPage(), -1);
|
return new PagePositionAndFileOffset(-1, new PagePositionImpl(pageStore.getFirstPage(), -1));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -585,7 +617,12 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PagedMessage queryMessage(PagePosition pos) {
|
public PagedMessage queryMessage(PagePosition pos) {
|
||||||
return cursorProvider.getMessage(pos);
|
PageReader pageReader = pageReaders.get(pos.getPageNr());
|
||||||
|
if (pageReader != null) {
|
||||||
|
return pageReader.getMessage(pos, true, false);
|
||||||
|
} else {
|
||||||
|
return cursorProvider.getMessage(pos);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -892,7 +929,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
if (persistentSize < 0) {
|
if (persistentSize < 0) {
|
||||||
//cache.getMessage is potentially expensive depending
|
//cache.getMessage is potentially expensive depending
|
||||||
//on the current cache size and which message is queried
|
//on the current cache size and which message is queried
|
||||||
size = getPersistentSize(cache.getMessage(position.getMessageNr()));
|
size = getPersistentSize(cache.getMessage(position));
|
||||||
} else {
|
} else {
|
||||||
size = persistentSize;
|
size = persistentSize;
|
||||||
}
|
}
|
||||||
|
@ -1209,9 +1246,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
|
|
||||||
private class CursorIterator implements PageIterator {
|
private class CursorIterator implements PageIterator {
|
||||||
|
|
||||||
private PagePosition position = null;
|
private PagePositionAndFileOffset position = null;
|
||||||
|
|
||||||
private PagePosition lastOperation = null;
|
private PagePositionAndFileOffset lastOperation = null;
|
||||||
|
|
||||||
private volatile boolean isredelivery = false;
|
private volatile boolean isredelivery = false;
|
||||||
|
|
||||||
|
@ -1234,7 +1271,6 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
this.browsing = browsing;
|
this.browsing = browsing;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private CursorIterator() {
|
private CursorIterator() {
|
||||||
this.browsing = false;
|
this.browsing = false;
|
||||||
}
|
}
|
||||||
|
@ -1284,8 +1320,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
|
|
||||||
PagedReference message;
|
PagedReference message;
|
||||||
|
|
||||||
PagePosition lastPosition = position;
|
PagePositionAndFileOffset lastPosition = position;
|
||||||
PagePosition tmpPosition = position;
|
PagePositionAndFileOffset tmpPosition = position;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
synchronized (redeliveries) {
|
synchronized (redeliveries) {
|
||||||
|
@ -1310,7 +1346,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmpPosition = message.getPosition();
|
int nextFileOffset = message.getPosition().getFileOffset() == -1 ? -1 : message.getPosition().getFileOffset() + message.getPagedMessage().getEncodeSize() + Page.SIZE_RECORD;
|
||||||
|
tmpPosition = new PagePositionAndFileOffset(nextFileOffset, message.getPosition());
|
||||||
|
|
||||||
boolean valid = true;
|
boolean valid = true;
|
||||||
boolean ignored = false;
|
boolean ignored = false;
|
||||||
|
@ -1361,7 +1398,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
position = message.getPosition();
|
position = tmpPosition;
|
||||||
|
|
||||||
if (valid) {
|
if (valid) {
|
||||||
match = match(message.getMessage());
|
match = match(message.getMessage());
|
||||||
|
@ -1417,6 +1454,13 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
// When the CursorIterator(especially browse one) is closed, we need to close page they opened
|
||||||
|
if (position != null) {
|
||||||
|
PageReader pageReader = pageReaders.remove(position.pagePosition.getPageNr());
|
||||||
|
if (pageReader != null) {
|
||||||
|
pageReader.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1458,4 +1502,20 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static class PagePositionAndFileOffset {
|
||||||
|
|
||||||
|
private final int nextFileOffset;
|
||||||
|
private final PagePosition pagePosition;
|
||||||
|
|
||||||
|
PagePositionAndFileOffset(int nextFileOffset, PagePosition pagePosition) {
|
||||||
|
this.nextFileOffset = nextFileOffset;
|
||||||
|
this.pagePosition = pagePosition;
|
||||||
|
}
|
||||||
|
|
||||||
|
PagePosition nextPagePostion() {
|
||||||
|
int messageNr = pagePosition.getMessageNr();
|
||||||
|
return new PagePositionImpl(pagePosition.getPageNr(), messageNr + 1, messageNr + 1 == 0 ? 0 : nextFileOffset);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,6 +80,12 @@ public final class Page implements Comparable<Page> {
|
||||||
*/
|
*/
|
||||||
private Set<PageSubscriptionCounter> pendingCounters;
|
private Set<PageSubscriptionCounter> pendingCounters;
|
||||||
|
|
||||||
|
private int lastReadMessageNumber;
|
||||||
|
private ByteBuffer readFileBuffer;
|
||||||
|
private final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
|
||||||
|
private ChannelBufferWrapper readFileBufferWrapper;
|
||||||
|
private int readProcessedBytes;
|
||||||
|
|
||||||
public Page(final SimpleString storeName,
|
public Page(final SimpleString storeName,
|
||||||
final StorageManager storageManager,
|
final StorageManager storageManager,
|
||||||
final SequentialFileFactory factory,
|
final SequentialFileFactory factory,
|
||||||
|
@ -90,6 +96,7 @@ public final class Page implements Comparable<Page> {
|
||||||
fileFactory = factory;
|
fileFactory = factory;
|
||||||
this.storageManager = storageManager;
|
this.storageManager = storageManager;
|
||||||
this.storeName = storeName;
|
this.storeName = storeName;
|
||||||
|
resetReadMessageStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getPageId() {
|
public int getPageId() {
|
||||||
|
@ -104,6 +111,129 @@ public final class Page implements Comparable<Page> {
|
||||||
return pageCache;
|
return pageCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private synchronized void resetReadMessageStatus() {
|
||||||
|
lastReadMessageNumber = -3;
|
||||||
|
readProcessedBytes = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized PagedMessage readMessage(int startOffset,
|
||||||
|
int startMessageNumber,
|
||||||
|
int targetMessageNumber) throws Exception {
|
||||||
|
assert startMessageNumber <= targetMessageNumber;
|
||||||
|
|
||||||
|
if (!file.isOpen()) {
|
||||||
|
throw ActiveMQMessageBundle.BUNDLE.invalidPageIO();
|
||||||
|
}
|
||||||
|
final int fileSize = (int) file.size();
|
||||||
|
try {
|
||||||
|
if (readFileBuffer == null) {
|
||||||
|
readProcessedBytes = startOffset;
|
||||||
|
file.position(readProcessedBytes);
|
||||||
|
readFileBuffer = fileFactory.allocateDirectBuffer(Math.min(fileSize - readProcessedBytes, MIN_CHUNK_SIZE));
|
||||||
|
//the wrapper is reused to avoid unnecessary allocations
|
||||||
|
readFileBufferWrapper = wrapWhole(readFileBuffer);
|
||||||
|
readFileBuffer.limit(0);
|
||||||
|
} else if (lastReadMessageNumber + 1 != targetMessageNumber) {
|
||||||
|
readProcessedBytes = startOffset;
|
||||||
|
file.position(readProcessedBytes);
|
||||||
|
readFileBuffer.limit(0);
|
||||||
|
} else {
|
||||||
|
startMessageNumber = targetMessageNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
int remainingBytes = fileSize - readProcessedBytes;
|
||||||
|
int currentMessageNumber = startMessageNumber;
|
||||||
|
// First we search forward for the file position of the target number message
|
||||||
|
while (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE && currentMessageNumber < targetMessageNumber) {
|
||||||
|
headerBuffer.clear();
|
||||||
|
file.read(headerBuffer);
|
||||||
|
headerBuffer.position(0);
|
||||||
|
|
||||||
|
if (headerBuffer.remaining() >= HEADER_SIZE && headerBuffer.get() == START_BYTE) {
|
||||||
|
final int encodedSize = headerBuffer.getInt();
|
||||||
|
final int nextPosition = readProcessedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
|
||||||
|
if (nextPosition <= fileSize) {
|
||||||
|
final int endPosition = nextPosition - 1;
|
||||||
|
file.position(endPosition);
|
||||||
|
headerBuffer.rewind();
|
||||||
|
headerBuffer.limit(1);
|
||||||
|
file.read(headerBuffer);
|
||||||
|
headerBuffer.position(0);
|
||||||
|
|
||||||
|
if (headerBuffer.remaining() >= 1 && headerBuffer.get() == END_BYTE) {
|
||||||
|
readProcessedBytes = nextPosition;
|
||||||
|
currentMessageNumber++;
|
||||||
|
} else {
|
||||||
|
markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
remainingBytes = fileSize - readProcessedBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then we read the target message
|
||||||
|
if (currentMessageNumber == targetMessageNumber && remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE) {
|
||||||
|
final ByteBuffer oldFileBuffer = readFileBuffer;
|
||||||
|
readFileBuffer = readIntoFileBufferIfNecessary(readFileBuffer, MINIMUM_MSG_PERSISTENT_SIZE, true);
|
||||||
|
//change wrapper if fileBuffer has changed
|
||||||
|
if (readFileBuffer != oldFileBuffer) {
|
||||||
|
readFileBufferWrapper = wrapWhole(readFileBuffer);
|
||||||
|
}
|
||||||
|
final byte startByte = readFileBuffer.get();
|
||||||
|
if (startByte == Page.START_BYTE) {
|
||||||
|
final int encodedSize = readFileBuffer.getInt();
|
||||||
|
final int nextPosition = readProcessedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
|
||||||
|
if (nextPosition <= fileSize) {
|
||||||
|
final ByteBuffer currentFileBuffer = readFileBuffer;
|
||||||
|
readFileBuffer = readIntoFileBufferIfNecessary(readFileBuffer, encodedSize + 1, true);
|
||||||
|
//change wrapper if fileBuffer has changed
|
||||||
|
if (readFileBuffer != currentFileBuffer) {
|
||||||
|
readFileBufferWrapper = wrapWhole(readFileBuffer);
|
||||||
|
}
|
||||||
|
final int endPosition = readFileBuffer.position() + encodedSize;
|
||||||
|
//this check must be performed upfront decoding
|
||||||
|
if (readFileBuffer.remaining() >= (encodedSize + 1) && readFileBuffer.get(endPosition) == Page.END_BYTE) {
|
||||||
|
final PagedMessageImpl msg = new PagedMessageImpl(storageManager);
|
||||||
|
readFileBufferWrapper.setIndex(readFileBuffer.position(), endPosition);
|
||||||
|
msg.decode(readFileBufferWrapper);
|
||||||
|
readFileBuffer.position(endPosition + 1);
|
||||||
|
assert readFileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte";
|
||||||
|
msg.initMessage(storageManager);
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName);
|
||||||
|
}
|
||||||
|
readProcessedBytes = nextPosition;
|
||||||
|
lastReadMessageNumber = targetMessageNumber;
|
||||||
|
return msg;
|
||||||
|
} else {
|
||||||
|
markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
resetReadMessageStatus();
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
resetReadMessageStatus();
|
||||||
|
throw new RuntimeException("target message no." + targetMessageNumber + " not found from start offset " + startOffset + " and start message number " + startMessageNumber);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized List<PagedMessage> read() throws Exception {
|
||||||
|
return read(storageManager);
|
||||||
|
}
|
||||||
|
|
||||||
public synchronized List<PagedMessage> read(StorageManager storage) throws Exception {
|
public synchronized List<PagedMessage> read(StorageManager storage) throws Exception {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("reading page " + this.pageId + " on address = " + storeName);
|
logger.debug("reading page " + this.pageId + " on address = " + storeName);
|
||||||
|
@ -122,10 +252,17 @@ public final class Page implements Comparable<Page> {
|
||||||
return messages;
|
return messages;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ByteBuffer allocateAndReadIntoFileBuffer(ByteBuffer fileBuffer, int requiredBytes) throws Exception {
|
private ByteBuffer allocateAndReadIntoFileBuffer(ByteBuffer fileBuffer, int requiredBytes, boolean direct) throws Exception {
|
||||||
final ByteBuffer newFileBuffer = fileFactory.newBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
|
ByteBuffer newFileBuffer;
|
||||||
newFileBuffer.put(fileBuffer);
|
if (direct) {
|
||||||
fileFactory.releaseBuffer(fileBuffer);
|
newFileBuffer = fileFactory.allocateDirectBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
|
||||||
|
newFileBuffer.put(fileBuffer);
|
||||||
|
fileFactory.releaseDirectBuffer(fileBuffer);
|
||||||
|
} else {
|
||||||
|
newFileBuffer = fileFactory.newBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
|
||||||
|
newFileBuffer.put(fileBuffer);
|
||||||
|
fileFactory.releaseBuffer(fileBuffer);
|
||||||
|
}
|
||||||
fileBuffer = newFileBuffer;
|
fileBuffer = newFileBuffer;
|
||||||
//move the limit to allow reading as much as possible from the file
|
//move the limit to allow reading as much as possible from the file
|
||||||
fileBuffer.limit(fileBuffer.capacity());
|
fileBuffer.limit(fileBuffer.capacity());
|
||||||
|
@ -138,7 +275,7 @@ public final class Page implements Comparable<Page> {
|
||||||
* It returns a {@link ByteBuffer} that has {@link ByteBuffer#remaining()} bytes >= {@code requiredBytes}
|
* It returns a {@link ByteBuffer} that has {@link ByteBuffer#remaining()} bytes >= {@code requiredBytes}
|
||||||
* of valid data from {@link #file}.
|
* of valid data from {@link #file}.
|
||||||
*/
|
*/
|
||||||
private ByteBuffer readIntoFileBufferIfNecessary(ByteBuffer fileBuffer, int requiredBytes) throws Exception {
|
private ByteBuffer readIntoFileBufferIfNecessary(ByteBuffer fileBuffer, int requiredBytes, boolean direct) throws Exception {
|
||||||
final int remaining = fileBuffer.remaining();
|
final int remaining = fileBuffer.remaining();
|
||||||
//fileBuffer::remaining is the current size of valid data
|
//fileBuffer::remaining is the current size of valid data
|
||||||
final int bytesToBeRead = requiredBytes - remaining;
|
final int bytesToBeRead = requiredBytes - remaining;
|
||||||
|
@ -162,7 +299,7 @@ public final class Page implements Comparable<Page> {
|
||||||
file.read(fileBuffer);
|
file.read(fileBuffer);
|
||||||
fileBuffer.position(0);
|
fileBuffer.position(0);
|
||||||
} else {
|
} else {
|
||||||
fileBuffer = allocateAndReadIntoFileBuffer(fileBuffer, requiredBytes);
|
fileBuffer = allocateAndReadIntoFileBuffer(fileBuffer, requiredBytes, direct);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fileBuffer;
|
return fileBuffer;
|
||||||
|
@ -189,6 +326,7 @@ public final class Page implements Comparable<Page> {
|
||||||
//sizeOf(START_BYTE) + sizeOf(MESSAGE LENGTH) + sizeOf(END_BYTE)
|
//sizeOf(START_BYTE) + sizeOf(MESSAGE LENGTH) + sizeOf(END_BYTE)
|
||||||
private static final int HEADER_AND_TRAILER_SIZE = DataConstants.SIZE_INT + 2;
|
private static final int HEADER_AND_TRAILER_SIZE = DataConstants.SIZE_INT + 2;
|
||||||
private static final int MINIMUM_MSG_PERSISTENT_SIZE = HEADER_AND_TRAILER_SIZE;
|
private static final int MINIMUM_MSG_PERSISTENT_SIZE = HEADER_AND_TRAILER_SIZE;
|
||||||
|
private static final int HEADER_SIZE = HEADER_AND_TRAILER_SIZE - 1;
|
||||||
private static final int MIN_CHUNK_SIZE = Env.osPageSize();
|
private static final int MIN_CHUNK_SIZE = Env.osPageSize();
|
||||||
|
|
||||||
private List<PagedMessage> readFromSequentialFile(StorageManager storage) throws Exception {
|
private List<PagedMessage> readFromSequentialFile(StorageManager storage) throws Exception {
|
||||||
|
@ -208,7 +346,7 @@ public final class Page implements Comparable<Page> {
|
||||||
fileBuffer.limit(0);
|
fileBuffer.limit(0);
|
||||||
do {
|
do {
|
||||||
final ByteBuffer oldFileBuffer = fileBuffer;
|
final ByteBuffer oldFileBuffer = fileBuffer;
|
||||||
fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, MINIMUM_MSG_PERSISTENT_SIZE);
|
fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, MINIMUM_MSG_PERSISTENT_SIZE, false);
|
||||||
//change wrapper if fileBuffer has changed
|
//change wrapper if fileBuffer has changed
|
||||||
if (fileBuffer != oldFileBuffer) {
|
if (fileBuffer != oldFileBuffer) {
|
||||||
fileBufferWrapper = wrapWhole(fileBuffer);
|
fileBufferWrapper = wrapWhole(fileBuffer);
|
||||||
|
@ -219,7 +357,7 @@ public final class Page implements Comparable<Page> {
|
||||||
final int nextPosition = processedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
|
final int nextPosition = processedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
|
||||||
if (nextPosition <= fileSize) {
|
if (nextPosition <= fileSize) {
|
||||||
final ByteBuffer currentFileBuffer = fileBuffer;
|
final ByteBuffer currentFileBuffer = fileBuffer;
|
||||||
fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, encodedSize + 1);
|
fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, encodedSize + 1, false);
|
||||||
//change wrapper if fileBuffer has changed
|
//change wrapper if fileBuffer has changed
|
||||||
if (fileBuffer != currentFileBuffer) {
|
if (fileBuffer != currentFileBuffer) {
|
||||||
fileBufferWrapper = wrapWhole(fileBuffer);
|
fileBufferWrapper = wrapWhole(fileBuffer);
|
||||||
|
@ -317,6 +455,11 @@ public final class Page implements Comparable<Page> {
|
||||||
* While reading the cache we don't need (and shouldn't inform the backup
|
* While reading the cache we don't need (and shouldn't inform the backup
|
||||||
*/
|
*/
|
||||||
public synchronized void close(boolean sendEvent, boolean waitSync) throws Exception {
|
public synchronized void close(boolean sendEvent, boolean waitSync) throws Exception {
|
||||||
|
if (readFileBuffer != null) {
|
||||||
|
fileFactory.releaseDirectBuffer(readFileBuffer);
|
||||||
|
readFileBuffer = null;
|
||||||
|
}
|
||||||
|
|
||||||
if (sendEvent && storageManager != null) {
|
if (sendEvent && storageManager != null) {
|
||||||
storageManager.pageClosed(storeName, pageId);
|
storageManager.pageClosed(storeName, pageId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,6 +82,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
|
|
||||||
private JDBCSequentialFile directoryList;
|
private JDBCSequentialFile directoryList;
|
||||||
|
|
||||||
|
private final boolean readWholePage;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ScheduledExecutorService getScheduledExecutor() {
|
public ScheduledExecutorService getScheduledExecutor() {
|
||||||
return scheduledExecutor;
|
return scheduledExecutor;
|
||||||
|
@ -105,6 +107,17 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
final ExecutorFactory executorFactory,
|
final ExecutorFactory executorFactory,
|
||||||
final boolean syncNonTransactional,
|
final boolean syncNonTransactional,
|
||||||
final IOCriticalErrorListener critialErrorListener) throws Exception {
|
final IOCriticalErrorListener critialErrorListener) throws Exception {
|
||||||
|
this(dbConf, storageManager, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
|
||||||
|
final StorageManager storageManager,
|
||||||
|
final long syncTimeout,
|
||||||
|
final ScheduledExecutorService scheduledExecutor,
|
||||||
|
final ExecutorFactory executorFactory,
|
||||||
|
final boolean syncNonTransactional,
|
||||||
|
final IOCriticalErrorListener critialErrorListener,
|
||||||
|
final boolean readWholePage) throws Exception {
|
||||||
this.storageManager = storageManager;
|
this.storageManager = storageManager;
|
||||||
this.executorFactory = executorFactory;
|
this.executorFactory = executorFactory;
|
||||||
this.syncNonTransactional = syncNonTransactional;
|
this.syncNonTransactional = syncNonTransactional;
|
||||||
|
@ -113,6 +126,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
this.dbConf = dbConf;
|
this.dbConf = dbConf;
|
||||||
this.criticalErrorListener = critialErrorListener;
|
this.criticalErrorListener = critialErrorListener;
|
||||||
this.factoryToTableName = new HashMap<>();
|
this.factoryToTableName = new HashMap<>();
|
||||||
|
this.readWholePage = readWholePage;
|
||||||
start();
|
start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +174,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
StorageManager storageManager,
|
StorageManager storageManager,
|
||||||
AddressSettings addressSettings,
|
AddressSettings addressSettings,
|
||||||
ArtemisExecutor executor) {
|
ArtemisExecutor executor) {
|
||||||
return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
|
return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize(), readWholePage);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -76,6 +76,8 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
|
||||||
|
|
||||||
private final IOCriticalErrorListener critialErrorListener;
|
private final IOCriticalErrorListener critialErrorListener;
|
||||||
|
|
||||||
|
private final boolean readWholePage;
|
||||||
|
|
||||||
public File getDirectory() {
|
public File getDirectory() {
|
||||||
return directory;
|
return directory;
|
||||||
}
|
}
|
||||||
|
@ -111,6 +113,17 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
|
||||||
final ExecutorFactory executorFactory,
|
final ExecutorFactory executorFactory,
|
||||||
final boolean syncNonTransactional,
|
final boolean syncNonTransactional,
|
||||||
final IOCriticalErrorListener critialErrorListener) {
|
final IOCriticalErrorListener critialErrorListener) {
|
||||||
|
this(storageManager, directory, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PagingStoreFactoryNIO(final StorageManager storageManager,
|
||||||
|
final File directory,
|
||||||
|
final long syncTimeout,
|
||||||
|
final ScheduledExecutorService scheduledExecutor,
|
||||||
|
final ExecutorFactory executorFactory,
|
||||||
|
final boolean syncNonTransactional,
|
||||||
|
final IOCriticalErrorListener critialErrorListener,
|
||||||
|
final boolean readWholePage) {
|
||||||
this.storageManager = storageManager;
|
this.storageManager = storageManager;
|
||||||
this.directory = directory;
|
this.directory = directory;
|
||||||
this.executorFactory = executorFactory;
|
this.executorFactory = executorFactory;
|
||||||
|
@ -118,6 +131,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
|
||||||
this.scheduledExecutor = scheduledExecutor;
|
this.scheduledExecutor = scheduledExecutor;
|
||||||
this.syncTimeout = syncTimeout;
|
this.syncTimeout = syncTimeout;
|
||||||
this.critialErrorListener = critialErrorListener;
|
this.critialErrorListener = critialErrorListener;
|
||||||
|
this.readWholePage = readWholePage;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Public --------------------------------------------------------
|
// Public --------------------------------------------------------
|
||||||
|
@ -146,7 +160,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
|
||||||
StorageManager storageManager,
|
StorageManager storageManager,
|
||||||
AddressSettings addressSettings,
|
AddressSettings addressSettings,
|
||||||
ArtemisExecutor executor) {
|
ArtemisExecutor executor) {
|
||||||
return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
|
return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize(), readWholePage);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -2571,9 +2571,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
protected PagingStoreFactory getPagingStoreFactory() throws Exception {
|
protected PagingStoreFactory getPagingStoreFactory() throws Exception {
|
||||||
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
|
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
|
||||||
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
|
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
|
||||||
return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO);
|
return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO, configuration.isReadWholePage());
|
||||||
}
|
}
|
||||||
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO);
|
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO, configuration.isReadWholePage());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -29,7 +29,6 @@ import java.util.Map;
|
||||||
import java.util.NoSuchElementException;
|
import java.util.NoSuchElementException;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
@ -108,6 +107,7 @@ import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||||
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
|
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
|
||||||
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
|
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
import org.jctools.queues.MpscUnboundedArrayQueue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of a Queue
|
* Implementation of a Queue
|
||||||
|
@ -176,7 +176,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
// Messages will first enter intermediateMessageReferences
|
// Messages will first enter intermediateMessageReferences
|
||||||
// Before they are added to messageReferences
|
// Before they are added to messageReferences
|
||||||
// This is to avoid locking the queue on the producer
|
// This is to avoid locking the queue on the producer
|
||||||
private final ConcurrentLinkedQueue<MessageReference> intermediateMessageReferences = new ConcurrentLinkedQueue<>();
|
private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192);
|
||||||
|
|
||||||
// This is where messages are stored
|
// This is where messages are stored
|
||||||
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES);
|
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES);
|
||||||
|
@ -209,7 +209,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
private long pauseStatusRecord = -1;
|
private long pauseStatusRecord = -1;
|
||||||
|
|
||||||
private static final int MAX_SCHEDULED_RUNNERS = 2;
|
private static final int MAX_SCHEDULED_RUNNERS = 1;
|
||||||
|
private static final int MAX_DEPAGE_NUM = MAX_DELIVERIES_IN_LOOP * MAX_SCHEDULED_RUNNERS;
|
||||||
|
|
||||||
// We don't ever need more than two DeliverRunner on the executor's list
|
// We don't ever need more than two DeliverRunner on the executor's list
|
||||||
// that is getting the worse scenario possible when one runner is almost finishing before the second started
|
// that is getting the worse scenario possible when one runner is almost finishing before the second started
|
||||||
|
@ -324,13 +325,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
out.println("consumer: " + holder.consumer.debug());
|
out.println("consumer: " + holder.consumer.debug());
|
||||||
}
|
}
|
||||||
|
|
||||||
for (MessageReference reference : intermediateMessageReferences) {
|
out.println("Intermediate reference size is " + intermediateMessageReferences.size());
|
||||||
out.print("Intermediate reference:" + reference);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (intermediateMessageReferences.isEmpty()) {
|
|
||||||
out.println("No intermediate references");
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean foundRef = false;
|
boolean foundRef = false;
|
||||||
|
|
||||||
|
@ -1042,14 +1037,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
public void deliverAsync() {
|
public void deliverAsync() {
|
||||||
if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) {
|
if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) {
|
||||||
scheduledRunners.incrementAndGet();
|
scheduledRunners.incrementAndGet();
|
||||||
|
checkDepage();
|
||||||
try {
|
try {
|
||||||
getExecutor().execute(deliverRunner);
|
getExecutor().execute(deliverRunner);
|
||||||
} catch (RejectedExecutionException ignored) {
|
} catch (RejectedExecutionException ignored) {
|
||||||
// no-op
|
// no-op
|
||||||
scheduledRunners.decrementAndGet();
|
scheduledRunners.decrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
checkDepage();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -2513,6 +2507,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
logger.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
|
logger.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
scheduledRunners.decrementAndGet();
|
||||||
|
|
||||||
doInternalPoll();
|
doInternalPoll();
|
||||||
|
|
||||||
// Either the iterator is empty or the consumer is busy
|
// Either the iterator is empty or the consumer is busy
|
||||||
|
@ -2699,7 +2695,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
private boolean needsDepage() {
|
private boolean needsDepage() {
|
||||||
return queueMemorySize.get() < pageSubscription.getPagingStore().getMaxSize();
|
return queueMemorySize.get() < pageSubscription.getPagingStore().getMaxSize() &&
|
||||||
|
/**
|
||||||
|
* In most cases, one depage round following by at most MAX_SCHEDULED_RUNNERS deliver round,
|
||||||
|
* thus we just need to read MAX_DELIVERIES_IN_LOOP * MAX_SCHEDULED_RUNNERS messages. If we read too much, the message reference
|
||||||
|
* maybe discarded by gc collector in response to memory demand and we need to read it again at
|
||||||
|
* a great cost when delivering.
|
||||||
|
*/
|
||||||
|
intermediateMessageReferences.size() + messageReferences.size() < MAX_DEPAGE_NUM;
|
||||||
}
|
}
|
||||||
|
|
||||||
private SimpleString extractGroupID(MessageReference ref) {
|
private SimpleString extractGroupID(MessageReference ref) {
|
||||||
|
@ -3634,8 +3637,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ActiveMQServerLogger.LOGGER.errorDelivering(e);
|
ActiveMQServerLogger.LOGGER.errorDelivering(e);
|
||||||
} finally {
|
|
||||||
scheduledRunners.decrementAndGet();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -625,6 +625,14 @@
|
||||||
</xsd:annotation>
|
</xsd:annotation>
|
||||||
</xsd:element>
|
</xsd:element>
|
||||||
|
|
||||||
|
<xsd:element name="read-whole-page" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
Whether the whole page is read while getting message after page cache is evicted.
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
|
||||||
<xsd:element name="journal-directory" type="xsd:string" default="data/journal" maxOccurs="1" minOccurs="0">
|
<xsd:element name="journal-directory" type="xsd:string" default="data/journal" maxOccurs="1" minOccurs="0">
|
||||||
<xsd:annotation>
|
<xsd:annotation>
|
||||||
<xsd:documentation>
|
<xsd:documentation>
|
||||||
|
|
|
@ -88,6 +88,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
|
||||||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMemoryMeasureInterval(), conf.getMemoryMeasureInterval());
|
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMemoryMeasureInterval(), conf.getMemoryMeasureInterval());
|
||||||
Assert.assertEquals(conf.getJournalLocation(), conf.getNodeManagerLockLocation());
|
Assert.assertEquals(conf.getJournalLocation(), conf.getNodeManagerLockLocation());
|
||||||
Assert.assertNull(conf.getJournalDeviceBlockSize());
|
Assert.assertNull(conf.getJournalDeviceBlockSize());
|
||||||
|
Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultReadWholePage(), conf.isReadWholePage());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -125,6 +125,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
||||||
Assert.assertEquals(true, conf.isAmqpUseCoreSubscriptionNaming());
|
Assert.assertEquals(true, conf.isAmqpUseCoreSubscriptionNaming());
|
||||||
|
|
||||||
Assert.assertEquals("max concurrent io", 17, conf.getPageMaxConcurrentIO());
|
Assert.assertEquals("max concurrent io", 17, conf.getPageMaxConcurrentIO());
|
||||||
|
Assert.assertEquals(true, conf.isReadWholePage());
|
||||||
Assert.assertEquals("somedir2", conf.getJournalDirectory());
|
Assert.assertEquals("somedir2", conf.getJournalDirectory());
|
||||||
Assert.assertEquals(false, conf.isCreateJournalDir());
|
Assert.assertEquals(false, conf.isCreateJournalDir());
|
||||||
Assert.assertEquals(JournalType.NIO, conf.getJournalType());
|
Assert.assertEquals(JournalType.NIO, conf.getJournalType());
|
||||||
|
|
|
@ -74,4 +74,41 @@ public class PageCursorProviderImplTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30_000)
|
||||||
|
public void returnPageCacheImplIfEvicted() throws Exception {
|
||||||
|
returnCacheIfEvicted(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30_000)
|
||||||
|
public void returnPageReaderIfEvicted() throws Exception {
|
||||||
|
returnCacheIfEvicted(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void returnCacheIfEvicted(boolean readWholePage) throws Exception {
|
||||||
|
final PagingStore pagingStore = mock(PagingStore.class);
|
||||||
|
final StorageManager storageManager = mock(StorageManager.class);
|
||||||
|
when(storageManager.beforePageRead(anyLong(), any(TimeUnit.class))).thenReturn(true);
|
||||||
|
final int pages = 2;
|
||||||
|
final ArtemisExecutor artemisExecutor = mock(ArtemisExecutor.class);
|
||||||
|
final PageCursorProviderImpl pageCursorProvider = new PageCursorProviderImpl(pagingStore, storageManager, artemisExecutor, 1, readWholePage);
|
||||||
|
when(pagingStore.getCurrentWritingPage()).thenReturn(pages);
|
||||||
|
when(pagingStore.checkPageFileExists(anyInt())).thenReturn(true);
|
||||||
|
final Page firstPage = mock(Page.class);
|
||||||
|
when(firstPage.getPageId()).thenReturn(1);
|
||||||
|
when(pagingStore.createPage(1)).thenReturn(firstPage);
|
||||||
|
final Page secondPage = mock(Page.class);
|
||||||
|
when(secondPage.getPageId()).thenReturn(2);
|
||||||
|
when(pagingStore.createPage(2)).thenReturn(secondPage);
|
||||||
|
|
||||||
|
Assert.assertTrue(pageCursorProvider.getPageCache(1) instanceof PageCacheImpl);
|
||||||
|
Assert.assertTrue(pageCursorProvider.getPageCache(2) instanceof PageCacheImpl);
|
||||||
|
if (readWholePage) {
|
||||||
|
Assert.assertTrue(pageCursorProvider.getPageCache(1) instanceof PageCacheImpl);
|
||||||
|
} else {
|
||||||
|
Assert.assertTrue(pageCursorProvider.getPageCache(1) instanceof PageReader);
|
||||||
|
}
|
||||||
|
Assert.assertEquals(pageCursorProvider.getCacheSize(), 1);
|
||||||
|
Assert.assertTrue(pageCursorProvider.getPageCache(2) instanceof PageCacheImpl);
|
||||||
|
pageCursorProvider.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,145 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.core.paging.cursor.impl;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||||
|
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
||||||
|
import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
|
||||||
|
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||||
|
import org.apache.activemq.artemis.core.paging.impl.Page;
|
||||||
|
import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl.PagePositionAndFileOffset;
|
||||||
|
import static org.apache.activemq.artemis.utils.RandomUtil.randomBoolean;
|
||||||
|
|
||||||
|
public class PageReaderTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPageReadMessage() throws Exception {
|
||||||
|
recreateDirectory(getTestDir());
|
||||||
|
|
||||||
|
int num = 50;
|
||||||
|
int[] offsets = createPage(num);
|
||||||
|
PageReader pageReader = getPageReader();
|
||||||
|
|
||||||
|
PagedMessage[] pagedMessages = pageReader.getMessages();
|
||||||
|
assertEquals(pagedMessages.length, num);
|
||||||
|
|
||||||
|
PagedMessage pagedMessage = null;
|
||||||
|
for (int i = 0; i < num; i++) {
|
||||||
|
if (randomBoolean()) {
|
||||||
|
PagePosition pagePosition = new PagePositionImpl(10, i);
|
||||||
|
pagedMessage = pageReader.getMessage(pagePosition);
|
||||||
|
} else {
|
||||||
|
int nextFileOffset = pagedMessage == null ? -1 : offsets[i - 1] + pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
|
||||||
|
PagePositionAndFileOffset startPosition = new PagePositionAndFileOffset(nextFileOffset, new PagePositionImpl(10, i - 1));
|
||||||
|
PagePosition pagePosition = startPosition.nextPagePostion();
|
||||||
|
assertEquals(offsets[i], pagePosition.getFileOffset());
|
||||||
|
pagedMessage = pageReader.getMessage(pagePosition);
|
||||||
|
}
|
||||||
|
assertNotNull(pagedMessage);
|
||||||
|
assertEquals(pagedMessage.getMessage().getMessageID(), i);
|
||||||
|
assertEquals(pagedMessages[i].getMessage().getMessageID(), i);
|
||||||
|
}
|
||||||
|
|
||||||
|
pageReader.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPageReadMessageBeyondPage() throws Exception {
|
||||||
|
recreateDirectory(getTestDir());
|
||||||
|
|
||||||
|
int num = 10;
|
||||||
|
createPage(num);
|
||||||
|
PageReader pageReader = getPageReader();
|
||||||
|
|
||||||
|
assertNull(pageReader.getMessage(new PagePositionImpl(10, num)));
|
||||||
|
try {
|
||||||
|
pageReader.getMessage(new PagePositionImpl(10, num), true, true);
|
||||||
|
assertFalse("Expect exception since message number is beyond page ", true);
|
||||||
|
} catch (NonExistentPage e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
pageReader.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPageReadMessageKeepOpen() throws Exception {
|
||||||
|
recreateDirectory(getTestDir());
|
||||||
|
|
||||||
|
int num = 10;
|
||||||
|
createPage(num);
|
||||||
|
PageReader pageReader = getPageReader();
|
||||||
|
|
||||||
|
pageReader.getMessage(new PagePositionImpl(10, 1), true, true);
|
||||||
|
assertFalse("Page file should keep open", pageReader.openPage());
|
||||||
|
pageReader.getMessage(new PagePositionImpl(10, 1), true, false);
|
||||||
|
assertFalse("Page file should preserve previous state", pageReader.openPage());
|
||||||
|
|
||||||
|
pageReader.close();
|
||||||
|
pageReader.getMessage(new PagePositionImpl(10, 1), true, false);
|
||||||
|
assertTrue("Page file should preserve previous state", pageReader.openPage());
|
||||||
|
|
||||||
|
pageReader.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private int[] createPage(int num) throws Exception {
|
||||||
|
SequentialFileFactory factory = new NIOSequentialFileFactory(getTestDirfile(), 1);
|
||||||
|
SequentialFile file = factory.createSequentialFile("00010.page");
|
||||||
|
Page page = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
|
||||||
|
page.open();
|
||||||
|
SimpleString simpleDestination = new SimpleString("Test");
|
||||||
|
int[] offsets = new int[num];
|
||||||
|
for (int i = 0; i < num; i++) {
|
||||||
|
ICoreMessage msg = new CoreMessage().setMessageID(i).initBuffer(1024);
|
||||||
|
|
||||||
|
for (int j = 0; j < 100; j++) {
|
||||||
|
msg.getBodyBuffer().writeByte((byte) 'b');
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.setAddress(simpleDestination);
|
||||||
|
offsets[i] = (int)page.getFile().position();
|
||||||
|
page.write(new PagedMessageImpl(msg, new long[0]));
|
||||||
|
|
||||||
|
Assert.assertEquals(i + 1, page.getNumberOfMessages());
|
||||||
|
}
|
||||||
|
page.close(false, false);
|
||||||
|
return offsets;
|
||||||
|
}
|
||||||
|
|
||||||
|
private PageReader getPageReader() throws Exception {
|
||||||
|
SequentialFileFactory factory = new NIOSequentialFileFactory(getTestDirfile(), 1);
|
||||||
|
SequentialFile file = factory.createSequentialFile("00010.page");
|
||||||
|
file.open();
|
||||||
|
Page page = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
|
||||||
|
page.open();
|
||||||
|
page.read(new NullStorageManager());
|
||||||
|
PageReader pageReader = new PageReader(page, page.getNumberOfMessages());
|
||||||
|
return pageReader;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -342,6 +342,7 @@
|
||||||
<bindings-directory>somedir</bindings-directory>
|
<bindings-directory>somedir</bindings-directory>
|
||||||
<create-bindings-dir>false</create-bindings-dir>
|
<create-bindings-dir>false</create-bindings-dir>
|
||||||
<page-max-concurrent-io>17</page-max-concurrent-io>
|
<page-max-concurrent-io>17</page-max-concurrent-io>
|
||||||
|
<read-whole-page>true</read-whole-page>
|
||||||
<journal-directory>somedir2</journal-directory>
|
<journal-directory>somedir2</journal-directory>
|
||||||
<create-journal-dir>false</create-journal-dir>
|
<create-journal-dir>false</create-journal-dir>
|
||||||
<journal-type>NIO</journal-type>
|
<journal-type>NIO</journal-type>
|
||||||
|
|
|
@ -256,6 +256,7 @@
|
||||||
<bindings-directory>somedir</bindings-directory>
|
<bindings-directory>somedir</bindings-directory>
|
||||||
<create-bindings-dir>false</create-bindings-dir>
|
<create-bindings-dir>false</create-bindings-dir>
|
||||||
<page-max-concurrent-io>17</page-max-concurrent-io>
|
<page-max-concurrent-io>17</page-max-concurrent-io>
|
||||||
|
<read-whole-page>true</read-whole-page>
|
||||||
<journal-directory>somedir2</journal-directory>
|
<journal-directory>somedir2</journal-directory>
|
||||||
<create-journal-dir>false</create-journal-dir>
|
<create-journal-dir>false</create-journal-dir>
|
||||||
<journal-type>NIO</journal-type>
|
<journal-type>NIO</journal-type>
|
||||||
|
|
|
@ -617,6 +617,14 @@
|
||||||
</xsd:annotation>
|
</xsd:annotation>
|
||||||
</xsd:element>
|
</xsd:element>
|
||||||
|
|
||||||
|
<xsd:element name="read-whole-page" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
Whether the whole page is read while getting message after page cache is evicted.
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
|
||||||
<xsd:element name="journal-directory" type="xsd:string" default="data/journal" maxOccurs="1" minOccurs="0">
|
<xsd:element name="journal-directory" type="xsd:string" default="data/journal" maxOccurs="1" minOccurs="0">
|
||||||
<xsd:annotation>
|
<xsd:annotation>
|
||||||
<xsd:documentation>
|
<xsd:documentation>
|
||||||
|
|
|
@ -159,6 +159,7 @@ log-delegate-factory-class-name | **deprecated** the name of the factory class t
|
||||||
name | node name; used in topology notifications if set. | n/a
|
name | node name; used in topology notifications if set. | n/a
|
||||||
[password-codec](masking-passwords.md) | the name of the class (and optional configuration properties) used to decode masked passwords. Only valid when `mask-password` is `true`. | n/a
|
[password-codec](masking-passwords.md) | the name of the class (and optional configuration properties) used to decode masked passwords. Only valid when `mask-password` is `true`. | n/a
|
||||||
[page-max-concurrent-io](paging.md) | The max number of concurrent reads allowed on paging. | 5
|
[page-max-concurrent-io](paging.md) | The max number of concurrent reads allowed on paging. | 5
|
||||||
|
[read-whole-page](paging.md) | If true the whole page would be read, otherwise just seek and read while getting message. | `false`
|
||||||
[paging-directory](paging.md#configuration)| the directory to store paged messages in. | `data/paging`
|
[paging-directory](paging.md#configuration)| the directory to store paged messages in. | `data/paging`
|
||||||
[persist-delivery-count-before-delivery](undelivered-messages.md#delivery-count-persistence) | True means that the delivery count is persisted before delivery. False means that this only happens after a message has been cancelled. | `false`
|
[persist-delivery-count-before-delivery](undelivered-messages.md#delivery-count-persistence) | True means that the delivery count is persisted before delivery. False means that this only happens after a message has been cancelled. | `false`
|
||||||
[persistence-enabled](persistence.md#zero-persistence)| true means that the server will use the file based journal for persistence. | `true`
|
[persistence-enabled](persistence.md#zero-persistence)| true means that the server will use the file based journal for persistence. | `true`
|
||||||
|
|
7
pom.xml
7
pom.xml
|
@ -90,6 +90,7 @@
|
||||||
<jgroups.version>3.6.13.Final</jgroups.version>
|
<jgroups.version>3.6.13.Final</jgroups.version>
|
||||||
<maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
|
<maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
|
||||||
<mockito.version>2.25.0</mockito.version>
|
<mockito.version>2.25.0</mockito.version>
|
||||||
|
<jctools.version>2.1.1</jctools.version>
|
||||||
<netty.version>4.1.34.Final</netty.version>
|
<netty.version>4.1.34.Final</netty.version>
|
||||||
<netty-tcnative-version>2.0.22.Final</netty-tcnative-version>
|
<netty-tcnative-version>2.0.22.Final</netty-tcnative-version>
|
||||||
<proton.version>0.33.2</proton.version>
|
<proton.version>0.33.2</proton.version>
|
||||||
|
@ -493,6 +494,12 @@
|
||||||
<!-- License: Apache 2.0 -->
|
<!-- License: Apache 2.0 -->
|
||||||
</dependency>
|
</dependency>
|
||||||
<!--needed to compile transport jar-->
|
<!--needed to compile transport jar-->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.jctools</groupId>
|
||||||
|
<artifactId>jctools-core</artifactId>
|
||||||
|
<version>${jctools.version}</version>
|
||||||
|
<!-- License: Apache 2.0 -->
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.netty</groupId>
|
<groupId>io.netty</groupId>
|
||||||
<artifactId>netty-all</artifactId>
|
<artifactId>netty-all</artifactId>
|
||||||
|
|
|
@ -27,8 +27,8 @@ import java.rmi.server.RemoteObject;
|
||||||
import java.rmi.server.RemoteRef;
|
import java.rmi.server.RemoteRef;
|
||||||
|
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
import io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess;
|
|
||||||
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
|
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
|
||||||
|
import org.jctools.util.UnsafeAccess;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
Loading…
Reference in New Issue