diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 543dff0070..30f4246b2c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -251,6 +251,9 @@ public final class ActiveMQDefaultConfiguration {
// The max number of concurrent reads allowed on paging
private static int DEFAULT_MAX_CONCURRENT_PAGE_IO = 5;
+ // If true the whole page would be read, otherwise just seek and read while getting message
+ private static boolean DEFAULT_READ_WHOLE_PAGE = false;
+
// the directory to store the journal files in
private static String DEFAULT_JOURNAL_DIR = "data/journal";
@@ -843,6 +846,11 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_MAX_CONCURRENT_PAGE_IO;
}
+
+ public static boolean isDefaultReadWholePage() {
+ return DEFAULT_READ_WHOLE_PAGE;
+ }
+
/**
* the directory to store the journal files in
*/
diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml
index a732656a8b..c87786113a 100644
--- a/artemis-distribution/src/main/assembly/dep.xml
+++ b/artemis-distribution/src/main/assembly/dep.xml
@@ -82,6 +82,7 @@
org.jboss.logmanager:jboss-logmanager
org.jboss.logging:jboss-logging
org.jboss.slf4j:slf4j-jboss-logmanager
+ org.jctools:jctools-core
io.netty:netty-all
io.netty:netty-tcnative-boringssl-static
org.apache.qpid:proton-j
diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml
index 53e8dd7169..eb79e752dc 100644
--- a/artemis-features/src/main/resources/features.xml
+++ b/artemis-features/src/main/resources/features.xml
@@ -64,6 +64,7 @@
mvn:org.apache.commons/commons-configuration2/${commons.config.version}
mvn:org.apache.commons/commons-text/1.6
mvn:org.apache.commons/commons-lang3/${commons.lang.version}
+ mvn:org.jctools/jctools-core/${jctools.version}
diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml
index c8bb63b966..23c44f0dec 100644
--- a/artemis-server/pom.xml
+++ b/artemis-server/pom.xml
@@ -89,6 +89,10 @@
${project.version}
test
+
+ org.jctools
+ jctools-core
+
io.netty
netty-buffer
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index c3fb9853d6..9b1a75cbc8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -587,6 +587,17 @@ public interface Configuration {
*/
Configuration setPageMaxConcurrentIO(int maxIO);
+ /**
+ * Returns whether the whole page is read while getting message after page cache is evicted.
+ * Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_READ_WHOLE_PAGE}.
+ */
+ boolean isReadWholePage();
+
+ /**
+ * Sets whether the whole page is read while getting message after page cache is evicted.
+ */
+ Configuration setReadWholePage(boolean read);
+
/**
* Returns the file system directory used to store journal log.
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_DIR}.
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index db83e5af86..573ebe3a53 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -176,6 +176,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private int maxConcurrentPageIO = ActiveMQDefaultConfiguration.getDefaultMaxConcurrentPageIo();
+ private boolean readWholePage = ActiveMQDefaultConfiguration.isDefaultReadWholePage();
+
protected String largeMessagesDirectory = ActiveMQDefaultConfiguration.getDefaultLargeMessagesDir();
protected String bindingsDirectory = ActiveMQDefaultConfiguration.getDefaultBindingsDirectory();
@@ -811,6 +813,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
+ @Override
+ public boolean isReadWholePage() {
+ return readWholePage;
+ }
+
+ @Override
+ public ConfigurationImpl setReadWholePage(boolean read) {
+ readWholePage = read;
+ return this;
+ }
+
@Override
public File getJournalLocation() {
return subFolder(getJournalDirectory());
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 8171210b74..e733465996 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -568,6 +568,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setPageMaxConcurrentIO(getInteger(e, "page-max-concurrent-io", config.getPageMaxConcurrentIO(), Validators.MINUS_ONE_OR_GT_ZERO));
+ config.setReadWholePage(getBoolean(e, "read-whole-page", config.isReadWholePage()));
+
config.setPagingDirectory(getString(e, "paging-directory", config.getPagingDirectory(), Validators.NOT_NULL_OR_EMPTY));
config.setCreateJournalDir(getBoolean(e, "create-journal-dir", config.isCreateJournalDir()));
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
index 20c7888683..646b568c10 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
@@ -36,10 +36,10 @@ public interface PageCache extends SoftValueLongObjectHashMap.ValueCache {
boolean isLive();
/**
- * @param messageNumber The order of the message on the page
+ * @param pagePosition page position
* @return
*/
- PagedMessage getMessage(int messageNumber);
+ PagedMessage getMessage(PagePosition pagePosition);
void close();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
index a9e05376e9..e794f909e2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
@@ -28,6 +28,8 @@ public interface PagePosition extends Comparable {
int getMessageNr();
+ int getFileOffset();
+
long getPersistentSize();
void setPersistentSize(long persistentSize);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
index 9d3fa723c6..6c98c12835 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.utils.collections.ConcurrentAppendOnlyChunkedList;
import org.jboss.logging.Logger;
@@ -61,8 +62,8 @@ public final class LivePageCacheImpl implements LivePageCache {
}
@Override
- public PagedMessage getMessage(int messageNumber) {
- return messages.get(messageNumber);
+ public PagedMessage getMessage(PagePosition pagePosition) {
+ return messages.get(pagePosition.getMessageNr());
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
index 3874e4f303..a350ceb0de 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.cursor.PageCache;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
/**
* The caching associated to a single page.
@@ -43,9 +44,9 @@ class PageCacheImpl implements PageCache {
// Public --------------------------------------------------------
@Override
- public PagedMessage getMessage(final int messageNumber) {
- if (messageNumber < messages.length) {
- return messages[messageNumber];
+ public PagedMessage getMessage(PagePosition pagePosition) {
+ if (pagePosition.getMessageNr() < messages.length) {
+ return messages[pagePosition.getMessageNr()];
} else {
return null;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 631de408dc..8f3987c860 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -74,6 +74,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
private final SoftValueLongObjectHashMap softCache;
+ private LongObjectHashMap numberOfMessages = null;
+
private final LongObjectHashMap> inProgressReadPages;
private final ConcurrentLongHashMap activeCursors = new ConcurrentLongHashMap<>();
@@ -90,15 +92,25 @@ public class PageCursorProviderImpl implements PageCursorProvider {
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
-
public PageCursorProviderImpl(final PagingStore pagingStore,
final StorageManager storageManager,
final ArtemisExecutor executor,
final int maxCacheSize) {
+ this(pagingStore, storageManager, executor, maxCacheSize, false);
+ }
+
+ public PageCursorProviderImpl(final PagingStore pagingStore,
+ final StorageManager storageManager,
+ final ArtemisExecutor executor,
+ final int maxCacheSize,
+ final boolean readWholePage) {
this.pagingStore = pagingStore;
this.storageManager = storageManager;
this.executor = executor;
this.softCache = new SoftValueLongObjectHashMap<>(maxCacheSize);
+ if (!readWholePage) {
+ this.numberOfMessages = new LongObjectHashMap<>();
+ }
this.inProgressReadPages = new LongObjectHashMap<>();
}
@@ -133,7 +145,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
throw new NonExistentPage("Invalid messageNumber passed = " + pos + " on " + cache);
}
- return cache.getMessage(pos.getMessageNr());
+ return cache.getMessage(pos);
}
@Override
@@ -169,6 +181,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
inProgressReadPage = inProgressReadPages.get(pageId);
if (inProgressReadPage == null) {
+ if (numberOfMessages != null && numberOfMessages.containsKey(pageId)) {
+ return new PageReader(pagingStore.createPage((int) pageId), numberOfMessages.get(pageId));
+ }
final CompletableFuture readPage = new CompletableFuture<>();
cache = createPageCache(pageId);
page = pagingStore.createPage((int) pageId);
@@ -203,6 +218,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
CompletableFuture inProgressReadPage) throws Exception {
logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress());
boolean acquiredPageReadPermission = false;
+ int num = -1;
try {
final long startedRequest = System.nanoTime();
while (!acquiredPageReadPermission) {
@@ -221,7 +237,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
logger.warnf("Page::read for pageNr=%d on cursor %s tooks %d ms to read %d bytes", pageId,
pagingStore.getAddress(), TimeUnit.NANOSECONDS.toMillis(elapsedReadPage), page.getSize());
}
- cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
+ num = pgdMessages.size();
+ cache.setMessages(pgdMessages.toArray(new PagedMessage[num]));
} catch (Throwable t) {
inProgressReadPage.completeExceptionally(t);
synchronized (softCache) {
@@ -243,6 +260,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
synchronized (softCache) {
inProgressReadPages.remove(pageId);
softCache.put(pageId, cache);
+ if (numberOfMessages != null && num != -1) {
+ numberOfMessages.put(pageId, Integer.valueOf(num));
+ }
}
return cache;
}
@@ -540,7 +560,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
depagedPage.delete(pgdMessages);
synchronized (softCache) {
- softCache.remove((long) depagedPage.getPageId());
+ long pageId = (long) depagedPage.getPageId();
+ softCache.remove(pageId);
+ numberOfMessages.remove(pageId);
}
onDeletePage(depagedPage);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
index 40890cf7ab..50907dbe9d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
@@ -31,6 +31,8 @@ public class PagePositionImpl implements PagePosition {
*/
private int messageNr;
+ private int fileOffset = -1;
+
/**
* ID used for storage
*/
@@ -45,11 +47,17 @@ public class PagePositionImpl implements PagePosition {
/**
* @param pageNr
* @param messageNr
+ * @param fileOffset
*/
- public PagePositionImpl(long pageNr, int messageNr) {
+ public PagePositionImpl(long pageNr, int messageNr, int fileOffset) {
this();
this.pageNr = pageNr;
this.messageNr = messageNr;
+ this.fileOffset = fileOffset;
+ }
+
+ public PagePositionImpl(long pageNr, int messageNr) {
+ this(pageNr, messageNr, -1);
}
public PagePositionImpl() {
@@ -88,6 +96,11 @@ public class PagePositionImpl implements PagePosition {
return messageNr;
}
+ @Override
+ public int getFileOffset() {
+ return fileOffset;
+ }
+
/**
* @return the persistentSize
*/
@@ -120,7 +133,7 @@ public class PagePositionImpl implements PagePosition {
@Override
public PagePosition nextPage() {
- return new PagePositionImpl(this.pageNr + 1, 0);
+ return new PagePositionImpl(this.pageNr + 1, 0, 0);
}
@Override
@@ -150,7 +163,8 @@ public class PagePositionImpl implements PagePosition {
@Override
public String toString() {
- return "PagePositionImpl [pageNr=" + pageNr + ", messageNr=" + messageNr + ", recordID=" + recordID + "]";
+ return "PagePositionImpl [pageNr=" + pageNr + ", messageNr=" + messageNr + ", recordID=" + recordID +
+ ", fileOffset=" + fileOffset + "]";
}
/**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java
new file mode 100644
index 0000000000..f518f75892
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
+import org.apache.activemq.artemis.core.paging.cursor.PageCache;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.jboss.logging.Logger;
+
+public class PageReader implements PageCache {
+ private static final Logger logger = Logger.getLogger(PageReader.class);
+
+ private final Page page;
+ private final int numberOfMessages;
+ private PagedMessage[] pagedMessages = null;
+
+ public PageReader(Page page, int numberOfMessages) {
+ this.page = page;
+ this.numberOfMessages = numberOfMessages;
+ }
+
+ @Override
+ public long getPageId() {
+ return page.getPageId();
+ }
+
+ @Override
+ public int getNumberOfMessages() {
+ return numberOfMessages;
+ }
+
+ @Override
+ public void setMessages(PagedMessage[] messages) {
+ this.pagedMessages = messages;
+ }
+
+ @Override
+ public synchronized PagedMessage[] getMessages() {
+ if (pagedMessages != null) {
+ return pagedMessages;
+ } else {
+ try {
+ openPage();
+ return page.read().toArray(new PagedMessage[numberOfMessages]);
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ } finally {
+ close();
+ }
+ }
+ }
+
+ @Override
+ public boolean isLive() {
+ return false;
+ }
+
+ /**
+ * @param pagePosition page position
+ * @param throwException if {@code true} exception will be thrown when message number is beyond the page
+ * @param keepOpen if {@code true} page file would keep open after reading message
+ * @return the paged message
+ */
+ public synchronized PagedMessage getMessage(PagePosition pagePosition, boolean throwException, boolean keepOpen) {
+ if (pagePosition.getMessageNr() >= getNumberOfMessages()) {
+ if (throwException) {
+ throw new NonExistentPage("Invalid messageNumber passed = " + pagePosition + " on " + this);
+ }
+ return null;
+ }
+
+ boolean previouslyClosed = true;
+ try {
+ previouslyClosed = openPage();
+ PagedMessage msg;
+ if (pagePosition.getFileOffset() != -1) {
+ msg = page.readMessage(pagePosition.getFileOffset(), pagePosition.getMessageNr(), pagePosition.getMessageNr());
+ } else {
+ if (logger.isTraceEnabled()) {
+ logger.trace("get message from pos " + pagePosition, new Exception("trace get message without file offset"));
+ }
+ msg = page.readMessage(0, 0, pagePosition.getMessageNr());
+ }
+ return msg;
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ } finally {
+ if (!keepOpen && previouslyClosed) {
+ close();
+ }
+ }
+ }
+
+ @Override
+ public synchronized PagedMessage getMessage(PagePosition pagePosition) {
+ return getMessage(pagePosition, false, false);
+ }
+
+ /**
+ * @return true if file was previously closed
+ * @throws Exception
+ */
+ boolean openPage() throws Exception {
+ if (!page.getFile().isOpen()) {
+ page.getFile().open();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized void close() {
+ try {
+ page.close(false, false);
+ } catch (Exception e) {
+ logger.warn("Closing page " + page.getPageId() + " occurs exception:", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "PageReader::page=" + getPageId() + " numberOfMessages = " + numberOfMessages;
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 3aec7ea7c2..7fee465268 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -58,6 +58,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.jboss.logging.Logger;
public final class PageSubscriptionImpl implements PageSubscription {
@@ -99,6 +100,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
private final AtomicLong deliveredSize = new AtomicLong(0);
+ // Each CursorIterator will record their current PageReader in this map
+ private final ConcurrentLongHashMap pageReaders = new ConcurrentLongHashMap<>();
+
PageSubscriptionImpl(final PageCursorProvider cursorProvider,
final PagingStore pageStore,
final StorageManager store,
@@ -366,7 +370,15 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
private PagedReference getReference(PagePosition pos) {
- return cursorProvider.newReference(pos, cursorProvider.getMessage(pos), this);
+ PagedMessage pagedMessage;
+ PageReader pageReader = pageReaders.get(pos.getPageNr());
+ if (pageReader != null) {
+ pagedMessage = pageReader.getMessage(pos, true, false);
+ } else {
+ pagedMessage = cursorProvider.getMessage(pos);
+ }
+
+ return cursorProvider.newReference(pos, pagedMessage, this);
}
@Override
@@ -379,13 +391,19 @@ public final class PageSubscriptionImpl implements PageSubscription {
return new CursorIterator(browsing);
}
- private PagedReference internalGetNext(final PagePosition pos) {
- PagePosition retPos = pos.nextMessage();
+ private PagedReference internalGetNext(final PagePositionAndFileOffset pos) {
+ PagePosition retPos = pos.nextPagePostion();
PageCache cache = null;
while (retPos.getPageNr() <= pageStore.getCurrentWritingPage()) {
- cache = cursorProvider.getPageCache(retPos.getPageNr());
+ PageReader pageReader = pageReaders.get(retPos.getPageNr());
+ if (pageReader == null) {
+ cache = cursorProvider.getPageCache(retPos.getPageNr());
+ } else {
+ cache = pageReader;
+ }
+
/**
* In following cases, we should move to the next page
* case 1: cache == null means file might be deleted unexpectedly.
@@ -404,7 +422,17 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
if (cache != null) {
- PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
+ PagedMessage serverMessage;
+ if (cache instanceof PageReader) {
+ serverMessage = ((PageReader) cache).getMessage(retPos, false, true);
+ PageCache previousPageCache = pageReaders.putIfAbsent(retPos.getPageNr(), (PageReader) cache);
+ if (previousPageCache != null && previousPageCache != cache) {
+ // Maybe other cursor iterators have added page reader, we have to close this one to avoid file leak
+ cache.close();
+ }
+ } else {
+ serverMessage = cache.getMessage(retPos);
+ }
if (serverMessage != null) {
return cursorProvider.newReference(retPos, serverMessage, this);
@@ -415,6 +443,10 @@ public final class PageSubscriptionImpl implements PageSubscription {
private PagePosition moveNextPage(final PagePosition pos) {
PagePosition retPos = pos;
+ PageReader pageReader = pageReaders.remove(pos.getPageNr());
+ if (pageReader != null) {
+ pageReader.close();
+ }
while (true) {
retPos = retPos.nextPage();
synchronized (consumedPages) {
@@ -441,8 +473,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
/**
*
*/
- private synchronized PagePosition getStartPosition() {
- return new PagePositionImpl(pageStore.getFirstPage(), -1);
+ private synchronized PagePositionAndFileOffset getStartPosition() {
+ return new PagePositionAndFileOffset(-1, new PagePositionImpl(pageStore.getFirstPage(), -1));
}
@Override
@@ -585,7 +617,12 @@ public final class PageSubscriptionImpl implements PageSubscription {
@Override
public PagedMessage queryMessage(PagePosition pos) {
- return cursorProvider.getMessage(pos);
+ PageReader pageReader = pageReaders.get(pos.getPageNr());
+ if (pageReader != null) {
+ return pageReader.getMessage(pos, true, false);
+ } else {
+ return cursorProvider.getMessage(pos);
+ }
}
/**
@@ -892,7 +929,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
if (persistentSize < 0) {
//cache.getMessage is potentially expensive depending
//on the current cache size and which message is queried
- size = getPersistentSize(cache.getMessage(position.getMessageNr()));
+ size = getPersistentSize(cache.getMessage(position));
} else {
size = persistentSize;
}
@@ -1209,9 +1246,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
private class CursorIterator implements PageIterator {
- private PagePosition position = null;
+ private PagePositionAndFileOffset position = null;
- private PagePosition lastOperation = null;
+ private PagePositionAndFileOffset lastOperation = null;
private volatile boolean isredelivery = false;
@@ -1234,7 +1271,6 @@ public final class PageSubscriptionImpl implements PageSubscription {
this.browsing = browsing;
}
-
private CursorIterator() {
this.browsing = false;
}
@@ -1284,8 +1320,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
PagedReference message;
- PagePosition lastPosition = position;
- PagePosition tmpPosition = position;
+ PagePositionAndFileOffset lastPosition = position;
+ PagePositionAndFileOffset tmpPosition = position;
do {
synchronized (redeliveries) {
@@ -1310,7 +1346,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
break;
}
- tmpPosition = message.getPosition();
+ int nextFileOffset = message.getPosition().getFileOffset() == -1 ? -1 : message.getPosition().getFileOffset() + message.getPagedMessage().getEncodeSize() + Page.SIZE_RECORD;
+ tmpPosition = new PagePositionAndFileOffset(nextFileOffset, message.getPosition());
boolean valid = true;
boolean ignored = false;
@@ -1361,7 +1398,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
}
- position = message.getPosition();
+ position = tmpPosition;
if (valid) {
match = match(message.getMessage());
@@ -1417,6 +1454,13 @@ public final class PageSubscriptionImpl implements PageSubscription {
@Override
public void close() {
+ // When the CursorIterator(especially browse one) is closed, we need to close page they opened
+ if (position != null) {
+ PageReader pageReader = pageReaders.remove(position.pagePosition.getPageNr());
+ if (pageReader != null) {
+ pageReader.close();
+ }
+ }
}
}
@@ -1458,4 +1502,20 @@ public final class PageSubscriptionImpl implements PageSubscription {
return 0;
}
}
+
+ protected static class PagePositionAndFileOffset {
+
+ private final int nextFileOffset;
+ private final PagePosition pagePosition;
+
+ PagePositionAndFileOffset(int nextFileOffset, PagePosition pagePosition) {
+ this.nextFileOffset = nextFileOffset;
+ this.pagePosition = pagePosition;
+ }
+
+ PagePosition nextPagePostion() {
+ int messageNr = pagePosition.getMessageNr();
+ return new PagePositionImpl(pagePosition.getPageNr(), messageNr + 1, messageNr + 1 == 0 ? 0 : nextFileOffset);
+ }
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 47385d9954..ca6ad9a822 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -80,6 +80,12 @@ public final class Page implements Comparable {
*/
private Set pendingCounters;
+ private int lastReadMessageNumber;
+ private ByteBuffer readFileBuffer;
+ private final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+ private ChannelBufferWrapper readFileBufferWrapper;
+ private int readProcessedBytes;
+
public Page(final SimpleString storeName,
final StorageManager storageManager,
final SequentialFileFactory factory,
@@ -90,6 +96,7 @@ public final class Page implements Comparable {
fileFactory = factory;
this.storageManager = storageManager;
this.storeName = storeName;
+ resetReadMessageStatus();
}
public int getPageId() {
@@ -104,6 +111,129 @@ public final class Page implements Comparable {
return pageCache;
}
+ private synchronized void resetReadMessageStatus() {
+ lastReadMessageNumber = -3;
+ readProcessedBytes = 0;
+ }
+
+ public synchronized PagedMessage readMessage(int startOffset,
+ int startMessageNumber,
+ int targetMessageNumber) throws Exception {
+ assert startMessageNumber <= targetMessageNumber;
+
+ if (!file.isOpen()) {
+ throw ActiveMQMessageBundle.BUNDLE.invalidPageIO();
+ }
+ final int fileSize = (int) file.size();
+ try {
+ if (readFileBuffer == null) {
+ readProcessedBytes = startOffset;
+ file.position(readProcessedBytes);
+ readFileBuffer = fileFactory.allocateDirectBuffer(Math.min(fileSize - readProcessedBytes, MIN_CHUNK_SIZE));
+ //the wrapper is reused to avoid unnecessary allocations
+ readFileBufferWrapper = wrapWhole(readFileBuffer);
+ readFileBuffer.limit(0);
+ } else if (lastReadMessageNumber + 1 != targetMessageNumber) {
+ readProcessedBytes = startOffset;
+ file.position(readProcessedBytes);
+ readFileBuffer.limit(0);
+ } else {
+ startMessageNumber = targetMessageNumber;
+ }
+
+ int remainingBytes = fileSize - readProcessedBytes;
+ int currentMessageNumber = startMessageNumber;
+ // First we search forward for the file position of the target number message
+ while (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE && currentMessageNumber < targetMessageNumber) {
+ headerBuffer.clear();
+ file.read(headerBuffer);
+ headerBuffer.position(0);
+
+ if (headerBuffer.remaining() >= HEADER_SIZE && headerBuffer.get() == START_BYTE) {
+ final int encodedSize = headerBuffer.getInt();
+ final int nextPosition = readProcessedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
+ if (nextPosition <= fileSize) {
+ final int endPosition = nextPosition - 1;
+ file.position(endPosition);
+ headerBuffer.rewind();
+ headerBuffer.limit(1);
+ file.read(headerBuffer);
+ headerBuffer.position(0);
+
+ if (headerBuffer.remaining() >= 1 && headerBuffer.get() == END_BYTE) {
+ readProcessedBytes = nextPosition;
+ currentMessageNumber++;
+ } else {
+ markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
+ break;
+ }
+ } else {
+ markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
+ break;
+ }
+ } else {
+ markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
+ break;
+ }
+ remainingBytes = fileSize - readProcessedBytes;
+ }
+
+ // Then we read the target message
+ if (currentMessageNumber == targetMessageNumber && remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE) {
+ final ByteBuffer oldFileBuffer = readFileBuffer;
+ readFileBuffer = readIntoFileBufferIfNecessary(readFileBuffer, MINIMUM_MSG_PERSISTENT_SIZE, true);
+ //change wrapper if fileBuffer has changed
+ if (readFileBuffer != oldFileBuffer) {
+ readFileBufferWrapper = wrapWhole(readFileBuffer);
+ }
+ final byte startByte = readFileBuffer.get();
+ if (startByte == Page.START_BYTE) {
+ final int encodedSize = readFileBuffer.getInt();
+ final int nextPosition = readProcessedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
+ if (nextPosition <= fileSize) {
+ final ByteBuffer currentFileBuffer = readFileBuffer;
+ readFileBuffer = readIntoFileBufferIfNecessary(readFileBuffer, encodedSize + 1, true);
+ //change wrapper if fileBuffer has changed
+ if (readFileBuffer != currentFileBuffer) {
+ readFileBufferWrapper = wrapWhole(readFileBuffer);
+ }
+ final int endPosition = readFileBuffer.position() + encodedSize;
+ //this check must be performed upfront decoding
+ if (readFileBuffer.remaining() >= (encodedSize + 1) && readFileBuffer.get(endPosition) == Page.END_BYTE) {
+ final PagedMessageImpl msg = new PagedMessageImpl(storageManager);
+ readFileBufferWrapper.setIndex(readFileBuffer.position(), endPosition);
+ msg.decode(readFileBufferWrapper);
+ readFileBuffer.position(endPosition + 1);
+ assert readFileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte";
+ msg.initMessage(storageManager);
+ if (logger.isTraceEnabled()) {
+ logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName);
+ }
+ readProcessedBytes = nextPosition;
+ lastReadMessageNumber = targetMessageNumber;
+ return msg;
+ } else {
+ markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
+ }
+ } else {
+ markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
+ }
+ } else {
+ markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
+ }
+ }
+ } catch (Exception e) {
+ resetReadMessageStatus();
+ throw e;
+ }
+ resetReadMessageStatus();
+ throw new RuntimeException("target message no." + targetMessageNumber + " not found from start offset " + startOffset + " and start message number " + startMessageNumber);
+ }
+
+ public synchronized List read() throws Exception {
+ return read(storageManager);
+ }
+
public synchronized List read(StorageManager storage) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("reading page " + this.pageId + " on address = " + storeName);
@@ -122,10 +252,17 @@ public final class Page implements Comparable {
return messages;
}
- private ByteBuffer allocateAndReadIntoFileBuffer(ByteBuffer fileBuffer, int requiredBytes) throws Exception {
- final ByteBuffer newFileBuffer = fileFactory.newBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
- newFileBuffer.put(fileBuffer);
- fileFactory.releaseBuffer(fileBuffer);
+ private ByteBuffer allocateAndReadIntoFileBuffer(ByteBuffer fileBuffer, int requiredBytes, boolean direct) throws Exception {
+ ByteBuffer newFileBuffer;
+ if (direct) {
+ newFileBuffer = fileFactory.allocateDirectBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
+ newFileBuffer.put(fileBuffer);
+ fileFactory.releaseDirectBuffer(fileBuffer);
+ } else {
+ newFileBuffer = fileFactory.newBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
+ newFileBuffer.put(fileBuffer);
+ fileFactory.releaseBuffer(fileBuffer);
+ }
fileBuffer = newFileBuffer;
//move the limit to allow reading as much as possible from the file
fileBuffer.limit(fileBuffer.capacity());
@@ -138,7 +275,7 @@ public final class Page implements Comparable {
* It returns a {@link ByteBuffer} that has {@link ByteBuffer#remaining()} bytes >= {@code requiredBytes}
* of valid data from {@link #file}.
*/
- private ByteBuffer readIntoFileBufferIfNecessary(ByteBuffer fileBuffer, int requiredBytes) throws Exception {
+ private ByteBuffer readIntoFileBufferIfNecessary(ByteBuffer fileBuffer, int requiredBytes, boolean direct) throws Exception {
final int remaining = fileBuffer.remaining();
//fileBuffer::remaining is the current size of valid data
final int bytesToBeRead = requiredBytes - remaining;
@@ -162,7 +299,7 @@ public final class Page implements Comparable {
file.read(fileBuffer);
fileBuffer.position(0);
} else {
- fileBuffer = allocateAndReadIntoFileBuffer(fileBuffer, requiredBytes);
+ fileBuffer = allocateAndReadIntoFileBuffer(fileBuffer, requiredBytes, direct);
}
}
return fileBuffer;
@@ -189,6 +326,7 @@ public final class Page implements Comparable {
//sizeOf(START_BYTE) + sizeOf(MESSAGE LENGTH) + sizeOf(END_BYTE)
private static final int HEADER_AND_TRAILER_SIZE = DataConstants.SIZE_INT + 2;
private static final int MINIMUM_MSG_PERSISTENT_SIZE = HEADER_AND_TRAILER_SIZE;
+ private static final int HEADER_SIZE = HEADER_AND_TRAILER_SIZE - 1;
private static final int MIN_CHUNK_SIZE = Env.osPageSize();
private List readFromSequentialFile(StorageManager storage) throws Exception {
@@ -208,7 +346,7 @@ public final class Page implements Comparable {
fileBuffer.limit(0);
do {
final ByteBuffer oldFileBuffer = fileBuffer;
- fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, MINIMUM_MSG_PERSISTENT_SIZE);
+ fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, MINIMUM_MSG_PERSISTENT_SIZE, false);
//change wrapper if fileBuffer has changed
if (fileBuffer != oldFileBuffer) {
fileBufferWrapper = wrapWhole(fileBuffer);
@@ -219,7 +357,7 @@ public final class Page implements Comparable {
final int nextPosition = processedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
if (nextPosition <= fileSize) {
final ByteBuffer currentFileBuffer = fileBuffer;
- fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, encodedSize + 1);
+ fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, encodedSize + 1, false);
//change wrapper if fileBuffer has changed
if (fileBuffer != currentFileBuffer) {
fileBufferWrapper = wrapWhole(fileBuffer);
@@ -317,6 +455,11 @@ public final class Page implements Comparable {
* While reading the cache we don't need (and shouldn't inform the backup
*/
public synchronized void close(boolean sendEvent, boolean waitSync) throws Exception {
+ if (readFileBuffer != null) {
+ fileFactory.releaseDirectBuffer(readFileBuffer);
+ readFileBuffer = null;
+ }
+
if (sendEvent && storageManager != null) {
storageManager.pageClosed(storeName, pageId);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
index 3bedc9231b..2c6dbdddb1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
@@ -82,6 +82,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
private JDBCSequentialFile directoryList;
+ private final boolean readWholePage;
+
@Override
public ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
@@ -105,6 +107,17 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
final ExecutorFactory executorFactory,
final boolean syncNonTransactional,
final IOCriticalErrorListener critialErrorListener) throws Exception {
+ this(dbConf, storageManager, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener, false);
+ }
+
+ public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
+ final StorageManager storageManager,
+ final long syncTimeout,
+ final ScheduledExecutorService scheduledExecutor,
+ final ExecutorFactory executorFactory,
+ final boolean syncNonTransactional,
+ final IOCriticalErrorListener critialErrorListener,
+ final boolean readWholePage) throws Exception {
this.storageManager = storageManager;
this.executorFactory = executorFactory;
this.syncNonTransactional = syncNonTransactional;
@@ -113,6 +126,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
this.dbConf = dbConf;
this.criticalErrorListener = critialErrorListener;
this.factoryToTableName = new HashMap<>();
+ this.readWholePage = readWholePage;
start();
}
@@ -160,7 +174,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
StorageManager storageManager,
AddressSettings addressSettings,
ArtemisExecutor executor) {
- return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+ return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize(), readWholePage);
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index 364f221b81..0a1119ec4a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -76,6 +76,8 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
private final IOCriticalErrorListener critialErrorListener;
+ private final boolean readWholePage;
+
public File getDirectory() {
return directory;
}
@@ -111,6 +113,17 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
final ExecutorFactory executorFactory,
final boolean syncNonTransactional,
final IOCriticalErrorListener critialErrorListener) {
+ this(storageManager, directory, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener, false);
+ }
+
+ public PagingStoreFactoryNIO(final StorageManager storageManager,
+ final File directory,
+ final long syncTimeout,
+ final ScheduledExecutorService scheduledExecutor,
+ final ExecutorFactory executorFactory,
+ final boolean syncNonTransactional,
+ final IOCriticalErrorListener critialErrorListener,
+ final boolean readWholePage) {
this.storageManager = storageManager;
this.directory = directory;
this.executorFactory = executorFactory;
@@ -118,6 +131,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
this.scheduledExecutor = scheduledExecutor;
this.syncTimeout = syncTimeout;
this.critialErrorListener = critialErrorListener;
+ this.readWholePage = readWholePage;
}
// Public --------------------------------------------------------
@@ -146,7 +160,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
StorageManager storageManager,
AddressSettings addressSettings,
ArtemisExecutor executor) {
- return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+ return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize(), readWholePage);
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 70a4fbf701..ba2bfdcbbe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2571,9 +2571,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
protected PagingStoreFactory getPagingStoreFactory() throws Exception {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
- return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO);
+ return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO, configuration.isReadWholePage());
}
- return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO);
+ return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO, configuration.isReadWholePage());
}
/**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 998ef8cc4b..d56e4dbe36 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -108,6 +107,7 @@ import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.jboss.logging.Logger;
+import org.jctools.queues.MpscUnboundedArrayQueue;
/**
* Implementation of a Queue
@@ -176,7 +176,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// Messages will first enter intermediateMessageReferences
// Before they are added to messageReferences
// This is to avoid locking the queue on the producer
- private final ConcurrentLinkedQueue intermediateMessageReferences = new ConcurrentLinkedQueue<>();
+ private final MpscUnboundedArrayQueue intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192);
// This is where messages are stored
private final PriorityLinkedList messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES);
@@ -209,7 +209,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private long pauseStatusRecord = -1;
- private static final int MAX_SCHEDULED_RUNNERS = 2;
+ private static final int MAX_SCHEDULED_RUNNERS = 1;
+ private static final int MAX_DEPAGE_NUM = MAX_DELIVERIES_IN_LOOP * MAX_SCHEDULED_RUNNERS;
// We don't ever need more than two DeliverRunner on the executor's list
// that is getting the worse scenario possible when one runner is almost finishing before the second started
@@ -324,13 +325,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
out.println("consumer: " + holder.consumer.debug());
}
- for (MessageReference reference : intermediateMessageReferences) {
- out.print("Intermediate reference:" + reference);
- }
-
- if (intermediateMessageReferences.isEmpty()) {
- out.println("No intermediate references");
- }
+ out.println("Intermediate reference size is " + intermediateMessageReferences.size());
boolean foundRef = false;
@@ -1042,14 +1037,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void deliverAsync() {
if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) {
scheduledRunners.incrementAndGet();
+ checkDepage();
try {
getExecutor().execute(deliverRunner);
} catch (RejectedExecutionException ignored) {
// no-op
scheduledRunners.decrementAndGet();
}
-
- checkDepage();
}
}
@@ -2513,6 +2507,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
}
+ scheduledRunners.decrementAndGet();
+
doInternalPoll();
// Either the iterator is empty or the consumer is busy
@@ -2699,7 +2695,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
* @return
*/
private boolean needsDepage() {
- return queueMemorySize.get() < pageSubscription.getPagingStore().getMaxSize();
+ return queueMemorySize.get() < pageSubscription.getPagingStore().getMaxSize() &&
+ /**
+ * In most cases, one depage round following by at most MAX_SCHEDULED_RUNNERS deliver round,
+ * thus we just need to read MAX_DELIVERIES_IN_LOOP * MAX_SCHEDULED_RUNNERS messages. If we read too much, the message reference
+ * maybe discarded by gc collector in response to memory demand and we need to read it again at
+ * a great cost when delivering.
+ */
+ intermediateMessageReferences.size() + messageReferences.size() < MAX_DEPAGE_NUM;
}
private SimpleString extractGroupID(MessageReference ref) {
@@ -3634,8 +3637,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorDelivering(e);
- } finally {
- scheduledRunners.decrementAndGet();
}
}
}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 78d5ba44e0..2ade964e48 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -625,6 +625,14 @@
+
+
+
+ Whether the whole page is read while getting message after page cache is evicted.
+
+
+
+
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index 22e577e78c..ab4a7ea2a7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -88,6 +88,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMemoryMeasureInterval(), conf.getMemoryMeasureInterval());
Assert.assertEquals(conf.getJournalLocation(), conf.getNodeManagerLockLocation());
Assert.assertNull(conf.getJournalDeviceBlockSize());
+ Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultReadWholePage(), conf.isReadWholePage());
}
@Test
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 91f92aa2fe..98711f7a95 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -125,6 +125,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(true, conf.isAmqpUseCoreSubscriptionNaming());
Assert.assertEquals("max concurrent io", 17, conf.getPageMaxConcurrentIO());
+ Assert.assertEquals(true, conf.isReadWholePage());
Assert.assertEquals("somedir2", conf.getJournalDirectory());
Assert.assertEquals(false, conf.isCreateJournalDir());
Assert.assertEquals(JournalType.NIO, conf.getJournalType());
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java
index 4ed38e87f2..5a75241ebe 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java
@@ -74,4 +74,41 @@ public class PageCursorProviderImplTest {
}
}
+ @Test(timeout = 30_000)
+ public void returnPageCacheImplIfEvicted() throws Exception {
+ returnCacheIfEvicted(true);
+ }
+
+ @Test(timeout = 30_000)
+ public void returnPageReaderIfEvicted() throws Exception {
+ returnCacheIfEvicted(false);
+ }
+
+ private void returnCacheIfEvicted(boolean readWholePage) throws Exception {
+ final PagingStore pagingStore = mock(PagingStore.class);
+ final StorageManager storageManager = mock(StorageManager.class);
+ when(storageManager.beforePageRead(anyLong(), any(TimeUnit.class))).thenReturn(true);
+ final int pages = 2;
+ final ArtemisExecutor artemisExecutor = mock(ArtemisExecutor.class);
+ final PageCursorProviderImpl pageCursorProvider = new PageCursorProviderImpl(pagingStore, storageManager, artemisExecutor, 1, readWholePage);
+ when(pagingStore.getCurrentWritingPage()).thenReturn(pages);
+ when(pagingStore.checkPageFileExists(anyInt())).thenReturn(true);
+ final Page firstPage = mock(Page.class);
+ when(firstPage.getPageId()).thenReturn(1);
+ when(pagingStore.createPage(1)).thenReturn(firstPage);
+ final Page secondPage = mock(Page.class);
+ when(secondPage.getPageId()).thenReturn(2);
+ when(pagingStore.createPage(2)).thenReturn(secondPage);
+
+ Assert.assertTrue(pageCursorProvider.getPageCache(1) instanceof PageCacheImpl);
+ Assert.assertTrue(pageCursorProvider.getPageCache(2) instanceof PageCacheImpl);
+ if (readWholePage) {
+ Assert.assertTrue(pageCursorProvider.getPageCache(1) instanceof PageCacheImpl);
+ } else {
+ Assert.assertTrue(pageCursorProvider.getPageCache(1) instanceof PageReader);
+ }
+ Assert.assertEquals(pageCursorProvider.getCacheSize(), 1);
+ Assert.assertTrue(pageCursorProvider.getPageCache(2) instanceof PageCacheImpl);
+ pageCursorProvider.stop();
+ }
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
new file mode 100644
index 0000000000..138bf5a6e8
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl.PagePositionAndFileOffset;
+import static org.apache.activemq.artemis.utils.RandomUtil.randomBoolean;
+
+public class PageReaderTest extends ActiveMQTestBase {
+
+ @Test
+ public void testPageReadMessage() throws Exception {
+ recreateDirectory(getTestDir());
+
+ int num = 50;
+ int[] offsets = createPage(num);
+ PageReader pageReader = getPageReader();
+
+ PagedMessage[] pagedMessages = pageReader.getMessages();
+ assertEquals(pagedMessages.length, num);
+
+ PagedMessage pagedMessage = null;
+ for (int i = 0; i < num; i++) {
+ if (randomBoolean()) {
+ PagePosition pagePosition = new PagePositionImpl(10, i);
+ pagedMessage = pageReader.getMessage(pagePosition);
+ } else {
+ int nextFileOffset = pagedMessage == null ? -1 : offsets[i - 1] + pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
+ PagePositionAndFileOffset startPosition = new PagePositionAndFileOffset(nextFileOffset, new PagePositionImpl(10, i - 1));
+ PagePosition pagePosition = startPosition.nextPagePostion();
+ assertEquals(offsets[i], pagePosition.getFileOffset());
+ pagedMessage = pageReader.getMessage(pagePosition);
+ }
+ assertNotNull(pagedMessage);
+ assertEquals(pagedMessage.getMessage().getMessageID(), i);
+ assertEquals(pagedMessages[i].getMessage().getMessageID(), i);
+ }
+
+ pageReader.close();
+ }
+
+ @Test
+ public void testPageReadMessageBeyondPage() throws Exception {
+ recreateDirectory(getTestDir());
+
+ int num = 10;
+ createPage(num);
+ PageReader pageReader = getPageReader();
+
+ assertNull(pageReader.getMessage(new PagePositionImpl(10, num)));
+ try {
+ pageReader.getMessage(new PagePositionImpl(10, num), true, true);
+ assertFalse("Expect exception since message number is beyond page ", true);
+ } catch (NonExistentPage e) {
+ }
+
+ pageReader.close();
+ }
+
+ @Test
+ public void testPageReadMessageKeepOpen() throws Exception {
+ recreateDirectory(getTestDir());
+
+ int num = 10;
+ createPage(num);
+ PageReader pageReader = getPageReader();
+
+ pageReader.getMessage(new PagePositionImpl(10, 1), true, true);
+ assertFalse("Page file should keep open", pageReader.openPage());
+ pageReader.getMessage(new PagePositionImpl(10, 1), true, false);
+ assertFalse("Page file should preserve previous state", pageReader.openPage());
+
+ pageReader.close();
+ pageReader.getMessage(new PagePositionImpl(10, 1), true, false);
+ assertTrue("Page file should preserve previous state", pageReader.openPage());
+
+ pageReader.close();
+ }
+
+ private int[] createPage(int num) throws Exception {
+ SequentialFileFactory factory = new NIOSequentialFileFactory(getTestDirfile(), 1);
+ SequentialFile file = factory.createSequentialFile("00010.page");
+ Page page = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
+ page.open();
+ SimpleString simpleDestination = new SimpleString("Test");
+ int[] offsets = new int[num];
+ for (int i = 0; i < num; i++) {
+ ICoreMessage msg = new CoreMessage().setMessageID(i).initBuffer(1024);
+
+ for (int j = 0; j < 100; j++) {
+ msg.getBodyBuffer().writeByte((byte) 'b');
+ }
+
+ msg.setAddress(simpleDestination);
+ offsets[i] = (int)page.getFile().position();
+ page.write(new PagedMessageImpl(msg, new long[0]));
+
+ Assert.assertEquals(i + 1, page.getNumberOfMessages());
+ }
+ page.close(false, false);
+ return offsets;
+ }
+
+ private PageReader getPageReader() throws Exception {
+ SequentialFileFactory factory = new NIOSequentialFileFactory(getTestDirfile(), 1);
+ SequentialFile file = factory.createSequentialFile("00010.page");
+ file.open();
+ Page page = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
+ page.open();
+ page.read(new NullStorageManager());
+ PageReader pageReader = new PageReader(page, page.getNumberOfMessages());
+ return pageReader;
+ }
+
+}
\ No newline at end of file
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 98155115d5..c97bcbf64e 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -342,6 +342,7 @@
somedir
false
17
+ true
somedir2
false
NIO
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 4a1ece0ee1..23ab14f303 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -256,6 +256,7 @@
somedir
false
17
+ true
somedir2
false
NIO
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index 0ea3d1dc96..0a0f0fe9d3 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -617,6 +617,14 @@
+
+
+
+ Whether the whole page is read while getting message after page cache is evicted.
+
+
+
+
diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md
index c18c082d11..3e116ec287 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -159,6 +159,7 @@ log-delegate-factory-class-name | **deprecated** the name of the factory class t
name | node name; used in topology notifications if set. | n/a
[password-codec](masking-passwords.md) | the name of the class (and optional configuration properties) used to decode masked passwords. Only valid when `mask-password` is `true`. | n/a
[page-max-concurrent-io](paging.md) | The max number of concurrent reads allowed on paging. | 5
+[read-whole-page](paging.md) | If true the whole page would be read, otherwise just seek and read while getting message. | `false`
[paging-directory](paging.md#configuration)| the directory to store paged messages in. | `data/paging`
[persist-delivery-count-before-delivery](undelivered-messages.md#delivery-count-persistence) | True means that the delivery count is persisted before delivery. False means that this only happens after a message has been cancelled. | `false`
[persistence-enabled](persistence.md#zero-persistence)| true means that the server will use the file based journal for persistence. | `true`
diff --git a/pom.xml b/pom.xml
index 67ed992b5d..f76821ec88 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,6 +90,7 @@
3.6.13.Final
2.4
2.25.0
+ 2.1.1
4.1.34.Final
2.0.22.Final
0.33.2
@@ -493,6 +494,12 @@
+
+ org.jctools
+ jctools-core
+ ${jctools.version}
+
+
io.netty
netty-all
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmx/JmxConnectionTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmx/JmxConnectionTest.java
index c9ec140d17..db6ada03c0 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmx/JmxConnectionTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmx/JmxConnectionTest.java
@@ -27,8 +27,8 @@ import java.rmi.server.RemoteObject;
import java.rmi.server.RemoteRef;
import io.netty.util.internal.PlatformDependent;
-import io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.jctools.util.UnsafeAccess;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;