ARTEMIS-3761 Improve page cleanup to remove messages in the middle of the stream as well
Paging previously only removed files at the beginning of the stream. Say you have paged files 1 through 1000: if all the messages are acked, but one message on file 1 is missing an ack, none of the 999 subsequent files would be removed until every message on file 1 is acked. This was working as designed, but developers don't always have complete control over their applications. With this improvement we now remove pages in the middle of the stream as well. There is also some improvement to how browsing and paging work with this.
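As a rough sketch of the idea behind the middle-of-stream cleanup (this is not the broker code; the class and method names below are made up for illustration), the cleanup counts, per page, how many subscriptions have fully acked that page, and a page is only removed once every subscription is done with it and it is neither the head of the stream nor the page currently being written:

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model: each subscription reports the set of page ids it has
// fully acked; a page may be removed once every subscription is done with it, as long as
// it is not the head of the stream (handled by the regular depage) nor the page
// currently being written.
public class MiddleStreamCleanupSketch {

   static Set<Long> pagesSafeToRemove(List<Set<Long>> donePagesPerSubscription,
                                      long firstPage,
                                      long currentWritingPage) {
      int subscriptions = donePagesPerSubscription.size();
      Map<Long, Integer> doneCount = new HashMap<>();
      for (Set<Long> done : donePagesPerSubscription) {
         for (Long pageId : done) {
            doneCount.merge(pageId, 1, Integer::sum);
         }
      }
      Set<Long> removable = new HashSet<>();
      for (Map.Entry<Long, Integer> entry : doneCount.entrySet()) {
         long pageId = entry.getKey();
         // only pages strictly after the head of the stream, and not the page being
         // written right now, are candidates for removal in the middle of the stream
         if (pageId > firstPage && pageId != currentWritingPage && entry.getValue() >= subscriptions) {
            removable.add(pageId);
         }
      }
      return removable;
   }

   public static void main(String[] args) {
      // pages 1..5 exist; both subscriptions acked everything on pages 2 and 4, but a
      // message on page 1 is still pending, so the head of the stream cannot move
      Set<Long> subA = Set.of(2L, 4L);
      Set<Long> subB = Set.of(2L, 3L, 4L);
      System.out.println(pagesSafeToRemove(List.of(subA, subB), 1L, 5L)); // pages 2 and 4
   }
}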
Parent: 1da68b3024
Commit: cfdb710a08
@@ -280,7 +280,7 @@ public class PrintData extends DBOption {
          }
          out.println("******* Page " + pgid);
          Page page = pgStore.createPage(pgid);
-         page.open();
+         page.open(false);
          List<PagedMessage> msgs = page.read(sm);
          page.close(false, false);

@@ -401,7 +401,7 @@ public final class XmlDataExporter extends DBOption {
       for (int i = 0; i < pageStore.getNumberOfPages(); i++) {
          ActiveMQServerLogger.LOGGER.debug("Reading page " + pageId);
          Page page = pageStore.createPage(pageId);
-         page.open();
+         page.open(false);
          List<PagedMessage> messages = page.read(storageManager);
          page.close(false, false);

@@ -117,6 +117,8 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
    */
   Page depage() throws Exception;

+   Page removePage(int pageId);
+
   void forceAnotherPage() throws Exception;

   Page getCurrentPage();
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor;
+
+// this is to expose PageSubscriptionImpl::PageCursorInfo
+public interface ConsumedPage {
+
+   long getPageId();
+
+   boolean isDone();
+
+}
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.paging.cursor;

 import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
 import java.util.function.ToIntFunction;

 import org.apache.activemq.artemis.core.paging.PagedMessage;

@@ -149,6 +150,8 @@ public interface PageSubscription {
    */
   boolean isComplete(long page);

+   void forEachConsumedPage(Consumer<ConsumedPage> pageCleaner);
+
   /**
    * wait all the scheduled runnables to finish their current execution
    */
@@ -47,14 +47,9 @@ import org.apache.activemq.artemis.utils.SoftValueLongObjectHashMap;
 import org.apache.activemq.artemis.utils.ThreadDumpUtil;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
 import org.jboss.logging.Logger;

-/**
- * A PageProviderIMpl
- *
- * TODO: this may be moved entirely into PagingStore as there's an one-to-one relationship here
- * However I want to keep this isolated as much as possible during development
- */
 public class PageCursorProviderImpl implements PageCursorProvider {

@@ -119,7 +114,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
    @Override
    public synchronized PageSubscription createSubscription(long cursorID, Filter filter, boolean persistent) {
       if (logger.isTraceEnabled()) {
-         logger.trace(this.pagingStore.getAddress() + " creating subscription " + cursorID + " with filter " + filter, new Exception("trace"));
+         logger.trace(this.pagingStore.getAddress() + " creating subscription " + cursorID + " with filter " + filter);
       }

       if (activeCursors.containsKey(cursorID)) {

@@ -228,7 +223,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
                           pageId, pagingStore.getAddress(), elapsedMillis);
          }
       }
-      page.open();
+      page.open(false);
       final long startedReadPage = System.nanoTime();
       List<PagedMessage> pgdMessages = page.read(storageManager);
       final long elapsedReadPage = System.nanoTime() - startedReadPage;

@@ -356,9 +351,6 @@ public class PageCursorProviderImpl implements PageCursorProvider {
    @Override
    public void scheduleCleanup() {

-      if (logger.isTraceEnabled()) {
-         logger.trace("scheduling cleanup", new Exception("trace"));
-      }
       if (!cleanupEnabled || scheduledCleanup.intValue() > 2) {
          // Scheduled cleanup was already scheduled before.. never mind!
          // or we have cleanup disabled

@@ -423,9 +415,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
    @Override
    public void cleanup() {
-
       logger.tracef("performing page cleanup %s", this);
-
       ArrayList<Page> depagedPages = new ArrayList<>();
+      LongHashSet depagedPagesSet = new LongHashSet();

       // This read lock is required
       // because in case of a replicated configuration

@@ -445,7 +436,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
          return;
       }

-      logger.tracef("%s locked", this);
+      logger.tracef(">>>> Cleanup %s", this.pagingStore.getAddress());

      synchronized (this) {
         try {
@@ -460,36 +451,16 @@ public class PageCursorProviderImpl implements PageCursorProvider {
            ArrayList<PageSubscription> cursorList = cloneSubscriptions();

            long minPage = checkMinPage(cursorList);
+            final long firstPage = pagingStore.getFirstPage();
            deliverIfNecessary(cursorList, minPage);

-            logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage);
+            logger.tracef("firstPage=%s, minPage=%s, currentWritingPage=%s", firstPage, minPage, pagingStore.getCurrentWritingPage());

-            // if the current page is being written...
-            // on that case we need to move to verify it in a different way
-            if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0) {
-               boolean complete = checkPageCompletion(cursorList, minPage);
+            // First we cleanup regular streaming, at the beginning of set of files
+            cleanupRegularStream(depagedPages, depagedPagesSet, cursorList, minPage, firstPage);

            if (!pagingStore.isStarted()) {
               return;
            }

-               // All the pages on the cursor are complete.. so we will cleanup everything and store a bookmark
-               if (complete) {
-                  cleanupComplete(cursorList);
-               }
-            }
-
-            for (long i = pagingStore.getFirstPage(); i <= minPage; i++) {
-               if (!checkPageCompletion(cursorList, i)) {
-                  break;
-               }
-               Page page = pagingStore.depage();
-               if (page == null) {
-                  break;
-               }
-               depagedPages.add(page);
-            }
+            // Then we do some check on eventual pages that can be already removed but they are away from the streaming
+            cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList, minPage, firstPage);

            if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && pagingStore.getCurrentPage().getNumberOfMessages() == 0) {
               pagingStore.stopPaging();

@@ -503,6 +474,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
            logger.warn(ex.getMessage(), ex);
            return;
         } finally {
+            logger.tracef("<<<< Cleanup end on %s", pagingStore.getAddress());
            pagingStore.unlock();
         }
      }
@@ -511,6 +483,102 @@ public class PageCursorProviderImpl implements PageCursorProvider {

   }

+   /**
+    * This cleanup process will calculate the min page for every cursor
+    * and then we remove the pages based on that.
+    * if we knew ahead all the queues belonging to every page we could remove this process.
+    * @param depagedPages
+    * @param depagedPagesSet
+    * @param cursorList
+    * @param minPage
+    * @param firstPage
+    * @throws Exception
+    */
+   private void cleanupRegularStream(ArrayList<Page> depagedPages,
+                                     LongHashSet depagedPagesSet,
+                                     ArrayList<PageSubscription> cursorList,
+                                     long minPage,
+                                     long firstPage) throws Exception {
+      // if the current page is being written...
+      // on that case we need to move to verify it in a different way
+      Page currentPage = pagingStore.getCurrentPage();
+      if (minPage == pagingStore.getCurrentWritingPage() && currentPage != null && currentPage.getNumberOfMessages() > 0) {
+         boolean complete = checkPageCompletion(cursorList, minPage);
+
+         // All the pages on the cursor are complete.. so we will cleanup everything and store a bookmark
+         if (complete) {
+            cleanupComplete(cursorList);
+         }
+      }
+
+      for (long i = firstPage; i <= minPage; i++) {
+         if (!checkPageCompletion(cursorList, i)) {
+            break;
+         }
+         Page page = pagingStore.depage();
+         if (page == null) {
+            break;
+         }
+         if (logger.isDebugEnabled()) {
+            logger.debug("Depaging page " + page.getPageId());
+         }
+         depagedPagesSet.add(page.getPageId());
+         depagedPages.add(page);
+      }
+   }
+
+   /** The regular depaging will take care of removing messages in a regular streaming.
+    *
+    * if we had a list of all the cursors that belong to each page, this cleanup would be enough on every situation (with some adjustment to currentPages)
+    * So, this routing is to optimize removing pages when all the acks are made on every cursor.
+    * We still need regular depaging on a streamed manner as it will check the min page for all the existent cursors.
+    * */
+   private void cleanupMiddleStream(ArrayList<Page> depagedPages,
+                                    LongHashSet depagedPagesSet,
+                                    ArrayList<PageSubscription> cursorList,
+                                    long minPage,
+                                    long firstPage) {
+
+      final long currentPageId = pagingStore.getCurrentWritingPage();
+      LongObjectHashMap<AtomicInteger> counts = new LongObjectHashMap<>();
+
+      int subscriptions = cursorList.size();
+
+      cursorList.forEach(sub -> {
+         sub.forEachConsumedPage(consumedPage -> {
+            if (consumedPage.isDone()) {
+               AtomicInteger count = counts.get(consumedPage.getPageId());
+               if (count == null) {
+                  count = new AtomicInteger(0);
+                  counts.put(consumedPage.getPageId(), count);
+               }
+               count.incrementAndGet();
+            }
+         });
+      });
+
+      counts.forEach((pageID, counter) -> {
+         try {
+            // This check is to make sure we are not removing what has been already removed by depaging
+            if (pageID > minPage && pageID > firstPage && pageID != currentPageId) {
+               if (counter.get() >= subscriptions) {
+                  if (!depagedPagesSet.contains(pageID.longValue())) {
+                     Page page = pagingStore.removePage(pageID.intValue());
+                     logger.debugf("Removing page %s", pageID);
+                     if (page != null) {
+                        depagedPages.add(page);
+                        depagedPagesSet.add(page.getPageId());
+                     }
+                  }
+               }
+            }
+         } catch (Throwable e) {
+            logger.warn("Error while Issuing cleanupMiddlePages with " + pageID + ", counter = " + counter, e);
+            depagedPages.forEach(p -> logger.warn("page " + p));
+         }
+      });
+   }
+
   // Protected as a way to inject testing
   protected void cleanupComplete(ArrayList<PageSubscription> cursorList) throws Exception {
      if (logger.isDebugEnabled()) {
@@ -550,7 +618,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {

         List<PagedMessage> pgdMessagesList = null;
         try {
-            depagedPage.open();
+            depagedPage.open(false);
            pgdMessagesList = depagedPage.read(storageManager, true);
         } finally {
            try {

@@ -583,23 +651,29 @@ public class PageCursorProviderImpl implements PageCursorProvider {

   }

-   private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, long minPage) {
+   private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, long minPage) throws Exception {
+
+      logger.tracef("checkPageCompletion(%d)", minPage);

      boolean complete = true;

+      Page page = pagingStore.createPage((int)minPage);
+      if (!page.getFile().exists()) {
+         logger.tracef("store %s did not have an existing file, considering it a complete file then", pagingStore.getAddress());
+         return true;
+      }
+
      for (PageSubscription cursor : cursorList) {
         if (!cursor.isComplete(minPage)) {
-            if (logger.isDebugEnabled()) {
-               logger.debug("Cursor " + cursor + " was considered incomplete at pageNr=" + minPage);
+            if (logger.isTraceEnabled()) {
+               logger.trace("Cursor " + cursor + " was considered incomplete at pageNr=" + minPage);
            }

            complete = false;
            break;
         } else {
-            if (logger.isDebugEnabled()) {
-               logger.debug("Cursor " + cursor + " was considered **complete** at pageNr=" + minPage);
+            if (logger.isTraceEnabled()) {
+               logger.trace("Cursor " + cursor + " was considered **complete** at pageNr=" + minPage);
            }
         }
      }

@@ -662,8 +736,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {

      for (PageSubscription cursor : cursorList) {
         long firstPage = cursor.getFirstPage();
-         if (logger.isDebugEnabled()) {
-            logger.debug(this.pagingStore.getAddress() + " has a cursor " + cursor + " with first page=" + firstPage);
+         if (logger.isTraceEnabled()) {
+            logger.trace(this.pagingStore.getAddress() + " has a cursor " + cursor + " with first page=" + firstPage);
         }

         // the cursor will return -1 if the cursor is empty

@@ -672,9 +746,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
         }
      }

-      if (logger.isDebugEnabled()) {
-         logger.debug(this.pagingStore.getAddress() + " has minPage=" + minPage);
-      }
+      logger.tracef("checkMinPage(%s) will have minPage=%s", pagingStore.getAddress(), minPage);

      return minPage;
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
 import java.util.function.ToIntFunction;

 import org.apache.activemq.artemis.api.core.ActiveMQException;

@@ -42,6 +43,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
 import org.apache.activemq.artemis.core.paging.cursor.PageCache;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
 import org.apache.activemq.artemis.core.paging.cursor.PageIterator;

@@ -823,6 +825,14 @@ public final class PageSubscriptionImpl implements PageSubscription {
      cursorInfo.decrementPendingTX();
   }

+   @Override
+   public void forEachConsumedPage(Consumer<ConsumedPage> pageCleaner) {
+      synchronized (consumedPages) {
+         consumedPages.values().forEach(pageCleaner);
+      }
+   }
+
+
   @Override
   public boolean isComplete(long page) {
      logger.tracef("%s isComplete %d", this, page);

@@ -967,6 +977,7 @@ public final class PageSubscriptionImpl implements PageSubscription {

   @Override
   public void onDeletePage(Page deletedPage) throws Exception {
+      logger.tracef("removing page %s", deletedPage);
      PageCursorInfo info;
      synchronized (consumedPages) {
         info = consumedPages.remove(Long.valueOf(deletedPage.getPageId()));

@@ -1154,7 +1165,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
    * This instance will be released as soon as the entire page is consumed, releasing the memory at
    * that point The ref counts are increased also when a message is ignored for any reason.
    */
-   public final class PageCursorInfo {
+   public final class PageCursorInfo implements ConsumedPage {

      // Number of messages existent on this page
      private int numberOfMessages;

@@ -1254,6 +1265,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
         return completePage;
      }

+      @Override
      public boolean isDone() {
         if (logger.isTraceEnabled()) {
            logger.trace(PageSubscriptionImpl.this + "::PageCursorInfo(" + pageId + ")::isDone checking with completePage!=null->" + (completePage != null) + " getNumberOfMessages=" + getNumberOfMessages() + ", confirmed=" + confirmed.get() + " and pendingTX=" + pendingTX.get());

@@ -1273,6 +1285,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
      /**
       * @return the pageId
       */
+      @Override
      public long getPageId() {
         return pageId;
      }
@@ -120,6 +120,9 @@ public final class Page implements Comparable<Page> {
      assert startMessageNumber <= targetMessageNumber;

      if (!file.isOpen()) {
+         if (!file.exists()) {
+            return null;
+         }
         throw ActiveMQMessageBundle.BUNDLE.invalidPageIO();
      }
      final int fileSize = (int) file.size();

@@ -256,6 +259,9 @@ public final class Page implements Comparable<Page> {
      }

      if (!file.isOpen()) {
+         if (!file.exists()) {
+            return Collections.emptyList();
+         }
         throw ActiveMQMessageBundle.BUNDLE.invalidPageIO();
      }

@@ -482,12 +488,14 @@ public final class Page implements Comparable<Page> {
      file.sync();
   }

-   public void open() throws Exception {
-      if (!file.isOpen()) {
+   public void open(boolean createFile) throws Exception {
+      if (!file.isOpen() && (createFile || file.exists())) {
         file.open();
      }
-      size.set((int) file.size());
-      file.position(0);
+      if (file.isOpen()) {
+         size.set((int) file.size());
+         file.position(0);
+      }
   }

   public void close(boolean sendEvent) throws Exception {
@@ -497,7 +497,7 @@ public class PagingStoreImpl implements PagingStore {

   protected void reloadLivePage(int pageId) throws Exception {
      Page page = createPage(pageId);
-      page.open();
+      page.open(true);

      final List<PagedMessage> messages = page.read(storageManager);

@@ -568,15 +568,20 @@ public class PagingStoreImpl implements PagingStore {
         return false;
      }

-      if (currentPage == null) {
-         try {
+      try {
+         if (currentPage == null) {
            openNewPage();
-         } catch (Exception e) {
-            // If not possible to starting page due to an IO error, we will just consider it non paging.
-            // This shouldn't happen anyway
-            ActiveMQServerLogger.LOGGER.pageStoreStartIOError(e);
-            return false;
+         } else {
+            if (!currentPage.getFile().exists() || !currentPage.getFile().isOpen()) {
+               currentPage.getFile().open();
+            }
         }
+      } catch (Exception e) {
+         // If not possible to starting page due to an IO error, we will just consider it non paging.
+         // This shouldn't happen anyway
+         ActiveMQServerLogger.LOGGER.pageStoreStartIOError(e);
+         storageManager.criticalError(e);
+         return false;
      }
      paging = true;
      ActiveMQServerLogger.LOGGER.pageStoreStart(storeName, getPageInfo());

@@ -619,16 +624,14 @@ public class PagingStoreImpl implements PagingStore {

      Page page = new Page(storeName, storageManager, factory, file, pageNumber);

-      // To create the file
-      file.open();
-
-      file.position(0);
-
-      file.close(false, false);
-
      return page;
   }

+   protected SequentialFileFactory getFileFactory() throws Exception {
+      checkFileFactory();
+      return fileFactory;
+   }
+
   private SequentialFileFactory checkFileFactory() throws Exception {
      SequentialFileFactory factory = fileFactory;
      if (factory == null) {

@@ -643,6 +646,60 @@ public class PagingStoreImpl implements PagingStore {
      openNewPage();
   }

+   /**
+    * Returns a Page out of the Page System without reading it.
+    * <p>
+    * The method calling this method will remove the page and will start reading it outside of any
+    * locks. This method could also replace the current file by a new file, and that process is done
+    * through acquiring a writeLock on currentPageLock.
+    * </p>
+    * <p>
+    * Observation: This method is used internally as part of the regular depage process, but
+    * externally is used only on tests, and that's why this method is part of the Testable Interface
+    * </p>
+    */
+   @Override
+   public Page removePage(int pageId) {
+      try {
+         lock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
+         try {
+            if (!running) {
+               return null;
+            }
+
+            if (currentPageId == pageId) {
+               logger.debugf("Ignoring remove(%d) as this is the current writing page", pageId);
+               // we don't deal with the current page, we let that one to be cleared from the regular depage
+               return null;
+            }
+
+            Page page = createPage(pageId);
+
+            if (page.getFile().exists()) {
+               // we only decrement numberOfPages if the file existed
+               // it could have been removed by a previous delete
+               // on this case we just need to ignore this and move on
+               numberOfPages--;
+            }
+
+            assert numberOfPages >= 0 : "numberOfPages should never be negative. on removePage(" + pageId + "). numberOfPages=" + numberOfPages;
+
+            return page;
+         } finally {
+            lock.writeLock().unlock();
+         }
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+         if (e instanceof AssertionError) {
+            // this will give a chance to callers log an AssertionError if assertion flag is enabled
+            throw (AssertionError)e;
+         }
+         storageManager.criticalError(e);
+         return null;
+      }
+   }
+
   /**
    * Returns a Page out of the Page System without reading it.
    * <p>

@@ -666,14 +723,15 @@ public class PagingStoreImpl implements PagingStore {
         if (numberOfPages == 0) {
            return null;
         } else {
-            numberOfPages--;
-
            final Page returnPage;

+            numberOfPages--;
+
            // We are out of old pages, all that is left now is the current page.
            // On that case we need to replace it by a new empty page, and return the current page immediately
            if (currentPageId == firstPageId) {
               firstPageId = Integer.MAX_VALUE;
+               logger.tracef("Setting up firstPageID=MAX_VALUE");

               if (currentPage == null) {
                  // sanity check... it shouldn't happen!

@@ -687,7 +745,7 @@ public class PagingStoreImpl implements PagingStore {
               // The current page is empty... which means we reached the end of the pages
               if (returnPage.getNumberOfMessages() == 0) {
                  stopPaging();
-                  returnPage.open();
+                  returnPage.open(true);
                  returnPage.delete(null);

                  // This will trigger this address to exit the page mode,

@@ -697,12 +755,21 @@ public class PagingStoreImpl implements PagingStore {
               // We need to create a new page, as we can't lock the address until we finish depaging.
               openNewPage();
            }

-            return returnPage;
            } else {
+               logger.tracef("firstPageId++ = beforeIncrement=%d", firstPageId);
               returnPage = createPage(firstPageId++);
            }

+            if (!returnPage.getFile().exists()) {
+               // if the file does not exist, we will just increment back to where it was before
+               numberOfPages++;
+            }
+
+            // we make this assertion after checking the file existed before.
+            // this could be eventually negative for a short period of time
+            // but after compensating the non existent file the assertion should still hold true
+            assert numberOfPages >= 0 : "numberOfPages should never be negative. on depage(). currentPageId=" + currentPageId + ", firstPageId=" + firstPageId + "";
+
+            return returnPage;
         }
      } finally {

@@ -1140,7 +1207,7 @@ public class PagingStoreImpl implements PagingStore {
      final int newPageId = currentPageId + 1;

      if (logger.isTraceEnabled()) {
-         logger.trace("new pageNr=" + newPageId, new Exception("trace"));
+         logger.trace("new pageNr=" + newPageId);
      }

      final Page oldPage = currentPage;

@@ -1160,11 +1227,12 @@ public class PagingStoreImpl implements PagingStore {

      currentPageSize = 0;

-      newPage.open();
+      newPage.open(true);

      currentPageId = newPageId;

      if (newPageId < firstPageId) {
+         logger.debugf("open new page, setting firstPageId = %s, it was %s before", newPageId, firstPageId);
         firstPageId = newPageId;
      }
   } finally {
@@ -858,7 +858,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon

         if (page == null) {
            page = pageManager.getPageStore(storeName).createPage(pageId);
-            page.open();
+            page.open(true);
            map.put(pageId, page);
         }

@@ -381,7 +381,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
         if (store != null && store.checkPageFileExists(pageId.intValue())) {
            // on this case we need to recalculate the records
            Page pg = store.createPage(pageId.intValue());
-            pg.open();
+            pg.open(true);

            List<PagedMessage> pgMessages = pg.read(storageManager);
            pg.close(false, false);
@@ -169,7 +169,7 @@ public class PageReaderTest extends ActiveMQTestBase {
      SequentialFileFactory factory = new NIOSequentialFileFactory(getTestDirfile(), 1);
      SequentialFile file = factory.createSequentialFile("00010.page");
      Page page = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
-      page.open();
+      page.open(true);
      SimpleString simpleDestination = new SimpleString("Test");
      final int msgSize = 100;
      final byte[] content = new byte[msgSize];

@@ -205,7 +205,7 @@ public class PageReaderTest extends ActiveMQTestBase {
      SequentialFile file = factory.createSequentialFile("00010.page");
      file.open();
      Page page = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
-      page.open();
+      page.open(true);
      page.read(new NullStorageManager());
      PageReader pageReader = new PageReader(page, page.getNumberOfMessages());
      return pageReader;
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.impl;
+
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+
+public class PagingStoreTestAccessor {
+   public static SequentialFileFactory getFileFactory(PagingStore store) throws Exception {
+      return ((PagingStoreImpl) store).getFileFactory();
+   }
+}
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
 import org.junit.Before;
 import org.junit.Test;

@@ -49,9 +50,8 @@ public class ExpireTestOnRestartTest extends ActiveMQTestBase {
      server.start();
   }

-   // The biggest problem on this test was the exceptions that happened. I couldn't find any wrong state beyond the exceptions
   @Test
-   public void testRestartWithExpire() throws Exception {
+   public void testRestartWithExpireAndPaging() throws Exception {
      int NUMBER_OF_EXPIRED_MESSAGES = 1000;
      ServerLocator locator = createInVMNonHALocator();
      locator.setBlockOnDurableSend(false);

@@ -65,14 +65,16 @@ public class ExpireTestOnRestartTest extends ActiveMQTestBase {
      for (int i = 0; i < 10; i++) {
         ClientMessage message = session.createMessage(true);
         message.getBodyBuffer().writeBytes(new byte[1024 * 10]);
+         message.putStringProperty("expiryStatus", "not Expiring");
         prod.send(message);
      }

      for (int i = 0; i < NUMBER_OF_EXPIRED_MESSAGES; i++) {
         ClientMessage message = session.createMessage(true);
         message.putIntProperty("i", i);
+         message.putStringProperty("expiryStatus", "Will Expire");
         message.getBodyBuffer().writeBytes(new byte[1024 * 10]);
-         message.setExpiration(System.currentTimeMillis() + 5000);
+         message.setExpiration(System.currentTimeMillis() + 1000);
         prod.send(message);
      }

@@ -83,7 +85,7 @@ public class ExpireTestOnRestartTest extends ActiveMQTestBase {
      server.stop();
      server.getConfiguration().setMessageExpiryScanPeriod(1);

-      Thread.sleep(5500); // enough time for expiration of the messages
+      Thread.sleep(1500); // enough time for expiration of the messages

      server.start();

@@ -99,15 +101,12 @@ public class ExpireTestOnRestartTest extends ActiveMQTestBase {
         assertNotNull(msg);
         msg.acknowledge();
      }
      session.commit();

      assertNull(cons.receiveImmediate());
      cons.close();

-      long timeout = System.currentTimeMillis() + 60000;
-      while (queue.getPageSubscription().getPagingStore().isPaging() && timeout > System.currentTimeMillis()) {
-         Thread.sleep(1);
-      }
-      assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+      Wait.assertFalse(queue.getPagingStore()::isPaging, 5000, 100);

      cons = session.createConsumer("exp");
      for (int i = 0; i < NUMBER_OF_EXPIRED_MESSAGES; i++) {
@@ -59,8 +59,8 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class GlobalPagingTest extends PagingTest {

-   public GlobalPagingTest(StoreConfiguration.StoreType storeType, boolean mapped) {
-      super(storeType, mapped);
+   public GlobalPagingTest(StoreConfiguration.StoreType storeType) {
+      super(storeType);
   }

   @Override
@@ -24,8 +24,10 @@ import javax.jms.Session;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;

@@ -61,6 +63,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.cli.commands.tools.PrintData;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;

@@ -87,6 +90,7 @@ import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
 import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase;
 import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
 import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreTestAccessor;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;

@@ -95,7 +99,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;

@@ -143,20 +146,17 @@ public class PagingTest extends ActiveMQTestBase {

   protected static final int PAGE_SIZE = 10 * 1024;

-   protected final boolean mapped;
-
   protected final StoreConfiguration.StoreType storeType;

   static final SimpleString ADDRESS = new SimpleString("SimpleAddress");

-   public PagingTest(StoreConfiguration.StoreType storeType, boolean mapped) {
+   public PagingTest(StoreConfiguration.StoreType storeType) {
      this.storeType = storeType;
-      this.mapped = mapped;
   }

-   @Parameterized.Parameters(name = "storeType={0}, mapped={1}")
+   @Parameterized.Parameters(name = "storeType={0}")
   public static Collection<Object[]> data() {
-      Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE, false}, {StoreConfiguration.StoreType.FILE, true}, {StoreConfiguration.StoreType.DATABASE, false}};
+      Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
      return Arrays.asList(params);
   }
@@ -644,8 +644,17 @@ public class PagingTest extends ActiveMQTestBase {
   }


+   @Test
+   public void testPageCleanupWithInvalidDataTruncated() throws Exception {
+      testPageCleanupWithInvalidData(true);
+   }
+
   @Test
   public void testPageCleanupWithInvalidData() throws Exception {
+      testPageCleanupWithInvalidData(false);
+   }
+
+   public void testPageCleanupWithInvalidData(boolean truncated) throws Exception {
      Assume.assumeTrue(storeType != StoreConfiguration.StoreType.DATABASE);
      clearDataRecreateServerDirs();

@@ -727,18 +736,23 @@ public class PagingTest extends ActiveMQTestBase {
            File file = new File(folder, fileName);
            file.delete();
            file.createNewFile();
+            if (!truncated) {
+               FileOutputStream fileOutputStream = new FileOutputStream(file);
+               fileOutputStream.write(new byte[10]);
+               fileOutputStream.close();
+            }
         }
         sf.close();

         server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000);

         Page page4 = queue.getPagingStore().createPage(4);
-         page4.open();
+         page4.open(true);
         List<PagedMessage> messagesRead = page4.read(server.getStorageManager());
         Assert.assertEquals(10, messagesRead.size());
         page4.close(false);
         page4.delete(null);
-         page4.open();
+         page4.open(true);
         for (int i = 0; i < 9; i++) {
            page4.write(messagesRead.get(i)); // this will make message 29 disappear
         }
@ -774,6 +788,226 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
Wait.assertFalse(queue.getPagingStore()::isPaging);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCleanupMiddlePageSingleQueue() throws Exception {
|
||||
clearDataRecreateServerDirs();
|
||||
|
||||
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
|
||||
|
||||
String address = "testCleanupMiddlePage";
|
||||
|
||||
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
|
||||
addServer(server);
|
||||
|
||||
server.start();
|
||||
|
||||
|
||||
server.addAddressInfo(new AddressInfo(address).addRoutingType(RoutingType.MULTICAST));
|
||||
server.createQueue(new QueueConfiguration(address + "_1").setAddress(address).setRoutingType(RoutingType.MULTICAST).setDurable(true).setFilterString("page<>5"));
|
||||
server.createQueue(new QueueConfiguration(address + "_2").setAddress(address).setRoutingType(RoutingType.MULTICAST).setDurable(true).setFilterString("page=5"));
|
||||
|
||||
final int numberOfMessages = 100;
|
||||
|
||||
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
|
||||
|
||||
sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
|
||||
ClientProducer producer = session.createProducer(address);
|
||||
|
||||
ClientMessage message = null;
|
||||
|
||||
byte[] body = new byte[10];
|
||||
|
||||
ByteBuffer bb = ByteBuffer.wrap(body);
|
||||
|
||||
for (int j = 1; j <= 10; j++) {
|
||||
bb.put(getSamplebyte(j));
|
||||
}
|
||||
|
||||
Queue queue = server.locateQueue(address + "_1");
|
||||
queue.getPagingStore().startPaging();
|
||||
|
||||
int page = 1;
|
||||
|
||||
for (int i = 0; i < numberOfMessages; i++) {
|
||||
if (i % 10 == 0 && i > 0) {
|
||||
queue.getPagingStore().forceAnotherPage();
|
||||
page++;
|
||||
}
|
||||
message = session.createMessage(true);
|
||||
|
||||
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||
|
||||
bodyLocal.writeBytes(body);
|
||||
|
||||
message.putIntProperty("i", i);
|
||||
message.putIntProperty("page", page);
|
||||
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
queue.getPagingStore().getCursorProvider().disableCleanup();
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(address + "_1", "page=3");
|
||||
session.start();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
ClientMessage msgRec = consumer.receive(1000);
|
||||
Assert.assertNotNull(msgRec);
|
||||
logger.debugf("received i=%s page=%s",msgRec.getIntProperty("i"), msgRec.getIntProperty("page"));
|
||||
msgRec.acknowledge();
|
||||
}
|
||||
session.commit();
|
||||
|
||||
Assert.assertEquals(10, queue.getPagingStore().getNumberOfPages());
|
||||
|
||||
queue.getPagingStore().getCursorProvider().cleanup();
|
||||
|
||||
Assert.assertEquals(9, queue.getPagingStore().getNumberOfPages());
|
||||
|
||||
{
|
||||
SequentialFileFactory factory = PagingStoreTestAccessor.getFileFactory(queue.getPagingStore());
|
||||
|
||||
Wait.assertEquals(9, () -> factory.listFiles("page").size(), 5000, 100);
|
||||
|
||||
if (storeType != StoreConfiguration.StoreType.DATABASE) {
|
||||
PrintData.printData(server.getConfiguration().getBindingsLocation(), server.getConfiguration().getJournalLocation(), server.getConfiguration().getPagingLocation(), new PrintStream(OutputStream.nullOutputStream()), false, false, true, true, -1);
|
||||
Assert.assertEquals("PrintData is recreating empty files", 9, factory.listFiles("page").size());
|
||||
}
|
||||
}
|
||||
|
||||
session.close();
|
||||
|
||||
|
||||
server.stop();
|
||||
|
||||
server.start();
|
||||
|
||||
queue = server.locateQueue(address + "_1");
|
||||
queue.getPagingStore().startPaging();
|
||||
|
||||
{
|
||||
SequentialFileFactory factory = PagingStoreTestAccessor.getFileFactory(queue.getPagingStore());
|
||||
// Making sure restarting the server should not recreate a file
|
||||
Assert.assertEquals(9, factory.listFiles("page").size());
|
||||
|
||||
sf = createSessionFactory(locator);
|
||||
|
||||
session = sf.createSession(false, true, true);
|
||||
|
||||
ClientConsumer browser = session.createConsumer(address + "_1", true);
|
||||
session.start();
|
||||
|
||||
for (int i = 0; i < 80; i++) {
|
||||
ClientMessage msgRec = browser.receive(1000);
|
||||
Assert.assertNotNull(msgRec);
|
||||
|
||||
logger.debugf("received i=%s page=%s", msgRec.getIntProperty("i"), msgRec.getIntProperty("page"));
|
||||
|
||||
int pageProperty = msgRec.getIntProperty("page");
|
||||
Assert.assertTrue(pageProperty != 5 && pageProperty != 3);
|
||||
msgRec.acknowledge();
|
||||
}
|
||||
Assert.assertNull(browser.receiveImmediate());
|
||||
session.commit();
|
||||
|
||||
// Browsing should not recreate a file
|
||||
Assert.assertEquals(9, factory.listFiles("page").size());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
// Send messages in page, consume them
|
||||
// send again...
|
||||
// consume again
|
||||
// just once...
|
||||
// this test is similar to .testPageAndDepageRapidly however I needed a simpler version
|
||||
// easier to debug an issue during one development... I decided to then keep the simpler test
|
||||
@Test
|
||||
public void testSimpleResume() throws Exception {
|
||||
clearDataRecreateServerDirs();
|
||||
|
||||
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
|
||||
|
||||
String address = "testSimpleResume";
|
||||
|
||||
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
|
||||
|
||||
server.start();
|
||||
|
||||
|
||||
server.addAddressInfo(new AddressInfo(address).addRoutingType(RoutingType.ANYCAST));
|
||||
server.createQueue(new QueueConfiguration(address).setAddress(address).setRoutingType(RoutingType.ANYCAST).setDurable(true));
|
||||
|
||||
final int numberOfMessages = 100;
|
||||
|
||||
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
|
||||
|
||||
sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
|
||||
ClientProducer producer = session.createProducer(address);
|
||||
|
||||
ClientMessage message = null;
|
||||
|
||||
byte[] body = new byte[10];
|
||||
|
||||
ByteBuffer bb = ByteBuffer.wrap(body);
|
||||
|
||||
for (int j = 1; j <= 10; j++) {
|
||||
bb.put(getSamplebyte(j));
|
||||
}
|
||||
|
||||
Queue queue = server.locateQueue(address);
|
||||
|
||||
|
||||
for (int repeat = 0; repeat < 5; repeat++) {
|
||||
queue.getPagingStore().startPaging();
|
||||
int page = 1;
|
||||
|
||||
for (int i = 0; i < numberOfMessages; i++) {
|
||||
if (i % 10 == 0 && i > 0) {
|
||||
queue.getPagingStore().forceAnotherPage();
|
||||
page++;
|
||||
}
|
||||
message = session.createMessage(true);
|
||||
|
||||
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||
|
||||
bodyLocal.writeBytes(body);
|
||||
|
||||
message.putIntProperty("i", i);
|
||||
message.putIntProperty("page", page);
|
||||
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(address);
|
||||
session.start();
|
||||
|
||||
for (int i = 0; i < numberOfMessages; i++) {
|
||||
ClientMessage msgRec = consumer.receive(1000);
|
||||
Assert.assertNotNull(msgRec);
|
||||
logger.debug("msgRec, i=" + msgRec.getIntProperty("i") + ", page=" + msgRec.getIntProperty("page"));
|
||||
msgRec.acknowledge();
|
||||
}
|
||||
session.commit();
|
||||
|
||||
consumer.close();
|
||||
queue.getPagingStore().getCursorProvider().cleanup();
|
||||
|
||||
Wait.assertFalse(queue.getPagingStore()::isPaging, 5000, 100);
|
||||
}
|
||||
|
||||
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueueRetryMessages() throws Exception {
|
||||
clearDataRecreateServerDirs();
|
||||
|
@@ -1661,7 +1895,7 @@ public class PagingTest extends ActiveMQTestBase {
      session.start();

      for (int i = 0; i < numberOfMessages; i++) {
-         message = cons.receive(5000);
+         message = cons.receive(1000);
         assertNotNull(message);
         message.acknowledge();
         if (i % 1000 == 0) {

@@ -2736,13 +2970,7 @@ public class PagingTest extends ActiveMQTestBase {
      PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
      store.getCursorProvider().cleanup();

-      long timeout = System.currentTimeMillis() + 5000;
-      while (store.isPaging() && timeout > System.currentTimeMillis()) {
-         Thread.sleep(100);
-      }
-
-      // It's async, so need to wait a bit for it happening
-      assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+      Wait.assertFalse(server.getPagingManager().getPageStore(ADDRESS)::isPaging, 5000, 100);
   }

   @Test

@@ -7198,8 +7426,6 @@ public class PagingTest extends ActiveMQTestBase {
      Configuration configuration = super.createDefaultConfig(serverID, netty);
      if (storeType == StoreConfiguration.StoreType.DATABASE) {
         setDBStoreType(configuration);
-      } else if (mapped) {
-         configuration.setJournalType(JournalType.MAPPED);
      }
      return configuration;
   }
@@ -266,6 +266,11 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {

      }

+      @Override
+      public Page removePage(int pageId) {
+         return null;
+      }
+
      @Override
      public SimpleString getAddress() {
         return null;
@@ -125,7 +125,7 @@ public class PageTest extends ActiveMQTestBase {

      Assert.assertEquals(10, page.getPageId());

-      page.open();
+      page.open(true);

      Assert.assertEquals(1, factory.listFiles("page").size());

@@ -180,7 +180,7 @@ public class PageTest extends ActiveMQTestBase {

      Assert.assertEquals(10, page.getPageId());

-      page.open();
+      page.open(true);

      Assert.assertEquals(1, factory.listFiles("page").size());

@@ -74,7 +74,7 @@ public class PagingManagerImplTest extends ActiveMQTestBase {

      Page page = store.depage();

-      page.open();
+      page.open(true);

      List<PagedMessage> msgs = page.read(new NullStorageManager());
@@ -45,6 +45,9 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.PageIterator;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
 import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;

@@ -60,6 +63,7 @@ import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersister;
 import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
 import org.apache.activemq.artemis.tests.unit.util.FakePagingManager;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;

@@ -225,7 +229,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {

      Page page = storeImpl.depage();

-      page.open();
+      page.open(true);

      List<PagedMessage> msg = page.read(new NullStorageManager());
@ -251,9 +255,196 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveInTheMiddle() throws Exception {
|
||||
SequentialFileFactory factory = new NIOSequentialFileFactory(getTestDirfile(), 1);
|
||||
|
||||
SimpleString destination = new SimpleString("test");
|
||||
|
||||
PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
|
||||
|
||||
PagingStoreImpl storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
|
||||
PageSubscription subscription = storeImpl.getCursorProvider().createSubscription(1, null, true);
|
||||
FakeQueue fakeQueue = new FakeQueue(destination, 1).setDurable(true).setPageSubscription(subscription);
|
||||
|
||||
storeImpl.getCursorProvider().disableCleanup();
|
||||
storeImpl.start();
|
||||
|
||||
for (int repeat = 0; repeat < 5; repeat++) {
|
||||
|
||||
storeImpl.startPaging();
|
||||
|
||||
int numMessages = 100;
|
||||
{
|
||||
int page = 1;
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ActiveMQBuffer buffer = createRandomBuffer(i + 1L, 10);
|
||||
|
||||
Message msg = createMessage(i, storeImpl, destination, buffer);
|
||||
msg.putIntProperty("i", i);
|
||||
msg.putIntProperty("page", page);
|
||||
final RoutingContextImpl ctx = new RoutingContextImpl(null);
|
||||
ctx.addQueue(fakeQueue.getName(), fakeQueue);
|
||||
Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock));
|
||||
if (i > 0 && i % 10 == 0) {
|
||||
storeImpl.forceAnotherPage();
|
||||
page++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals(numMessages / 10, storeImpl.getNumberOfPages());
|
||||
|
||||
PageIterator iterator = subscription.iterator();
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
Assert.assertTrue(iterator.hasNext());
|
||||
PagedReference reference = iterator.next();
|
||||
Assert.assertNotNull(reference);
|
||||
Assert.assertEquals(i, reference.getPagedMessage().getMessage().getIntProperty("i").intValue());
|
||||
int pageOnMsg = reference.getMessage().getIntProperty("page").intValue();
|
||||
if (pageOnMsg > 2 && pageOnMsg < 10) {
|
||||
subscription.ack(reference);
|
||||
}
|
||||
}
|
||||
iterator.close();
|
||||
|
||||
storeImpl.getCursorProvider().cleanup();
|
||||
|
||||
Assert.assertTrue(storeImpl.isPaging());
|
||||
|
||||
int messagesRead = 0;
|
||||
iterator = subscription.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
PagedReference reference = iterator.next();
|
||||
if (reference == null) {
|
||||
break;
|
||||
}
|
||||
messagesRead++;
|
||||
int pageOnMsg = reference.getMessage().getIntProperty("page");
|
||||
Assert.assertTrue(pageOnMsg <= 2 || pageOnMsg >= 10);
|
||||
|
||||
subscription.ack(reference);
|
||||
}
|
||||
iterator.close();
|
||||
|
||||
Assert.assertEquals(30, messagesRead);
|
||||
|
||||
storeImpl.getCursorProvider().cleanup();
|
||||
|
||||
Assert.assertFalse(storeImpl.isPaging());
|
||||
|
||||
Assert.assertEquals(1, storeImpl.getNumberOfPages());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRemoveCurrentPage() throws Exception {
|
||||
SequentialFileFactory factory = new NIOSequentialFileFactory(getTestDirfile(), 1);
|
||||
|
||||
SimpleString destination = new SimpleString("test");
|
||||
|
||||
PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
|
||||
|
||||
PagingStoreImpl storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
|
||||
PageSubscription subscription = storeImpl.getCursorProvider().createSubscription(1, null, true);
|
||||
FakeQueue fakeQueue = new FakeQueue(destination, 1).setDurable(true).setPageSubscription(subscription);
|
||||
|
||||
storeImpl.getCursorProvider().disableCleanup();
|
||||
storeImpl.start();
|
||||
|
||||
for (int repeat = 0; repeat < 5; repeat++) {
|
||||
|
||||
System.out.println("#repeat " + repeat);
|
||||
|
||||
storeImpl.startPaging();
|
||||
|
||||
int numMessages = 97;
|
||||
{
|
||||
int page = 1;
|
||||
|
||||
for (int i = 1; i <= numMessages; i++) {
|
||||
ActiveMQBuffer buffer = createRandomBuffer(i + 1L, 10);
|
||||
|
||||
Message msg = createMessage(i, storeImpl, destination, buffer);
|
||||
msg.putIntProperty("i", i);
|
||||
msg.putIntProperty("page", page);
|
||||
final RoutingContextImpl ctx = new RoutingContextImpl(null);
|
||||
ctx.addQueue(fakeQueue.getName(), fakeQueue);
|
||||
Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock));
|
||||
if (i > 0 && i % 10 == 0) {
|
||||
storeImpl.forceAnotherPage();
|
||||
page++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals(10, storeImpl.getNumberOfPages());
|
||||
|
||||
Assert.assertEquals(10, factory.listFiles("page").size());
|
||||
|
||||
int messagesRead = 0;
|
||||
PageIterator iterator = subscription.iterator();
|
||||
for (int i = 1; i <= numMessages; i++) {
|
||||
Assert.assertTrue(iterator.hasNext());
|
||||
PagedReference reference = iterator.next();
|
||||
Assert.assertNotNull(reference);
|
||||
Assert.assertEquals(i, reference.getPagedMessage().getMessage().getIntProperty("i").intValue());
|
||||
int pageOnMsg = reference.getMessage().getIntProperty("page").intValue();
|
||||
if (pageOnMsg == 10) {
|
||||
messagesRead++;
|
||||
subscription.ack(reference);
|
||||
}
|
||||
}
|
||||
iterator.close();
|
||||
|
||||
Assert.assertEquals(7, messagesRead);
|
||||
|
||||
storeImpl.getCursorProvider().cleanup();
|
||||
|
||||
Assert.assertEquals(10, factory.listFiles("page").size());
|
||||
|
||||
Assert.assertTrue(storeImpl.isPaging());
|
||||
|
||||
storeImpl.forceAnotherPage();
|
||||
|
||||
Assert.assertEquals(11, factory.listFiles("page").size());
|
||||
|
||||
storeImpl.getCursorProvider().cleanup();
|
||||
|
||||
Assert.assertEquals(10, factory.listFiles("page").size());
|
||||
|
||||
Assert.assertEquals(10, storeImpl.getNumberOfPages());
|
||||
|
||||
Assert.assertEquals(11 + 10 * repeat, storeImpl.getCurrentWritingPage());
|
||||
|
||||
messagesRead = 0;
|
||||
iterator = subscription.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
PagedReference reference = iterator.next();
|
||||
if (reference == null) {
|
||||
break;
|
||||
}
|
||||
messagesRead++;
|
||||
int pageOnMsg = reference.getMessage().getIntProperty("page");
|
||||
Assert.assertTrue(pageOnMsg != 10);
|
||||
|
||||
subscription.ack(reference);
|
||||
}
|
||||
iterator.close();
|
||||
|
||||
Assert.assertEquals(90, messagesRead);
|
||||
|
||||
storeImpl.getCursorProvider().cleanup();
|
||||
|
||||
Assert.assertFalse(storeImpl.isPaging());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDepageMultiplePages() throws Exception {
|
||||
SequentialFileFactory factory = new FakeSequentialFileFactory();
|
||||
SequentialFileFactory factory = new NIOSequentialFileFactory(new File(getPageDir()), 1).setDatasync(false);
|
||||
SimpleString destination = new SimpleString("test");
|
||||
|
||||
PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
|
||||
|
@@ -297,7 +488,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {

      log.debug("numberOfPages = " + store.getNumberOfPages());

-      page.open();
+      page.open(true);

      List<PagedMessage> msg = page.read(new NullStorageManager());

@@ -322,7 +513,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {

      Page newPage = store.depage();

-      newPage.open();
+      newPage.open(true);

      Assert.assertEquals(1, newPage.read(new NullStorageManager()).size());

@@ -350,7 +541,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {

      Page page = store.depage();

-      page.open();
+      page.open(true);

      List<PagedMessage> msgs = page.read(new NullStorageManager());

@@ -368,7 +559,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {

      Assert.assertEquals(0, store.getNumberOfPages());

-      page.open();
+      page.open(true);
      page.close(false);

   }

@@ -498,7 +689,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
      final ConcurrentMap<Long, Message> buffers2 = new ConcurrentHashMap<>();

      for (Page page : readPages) {
-         page.open();
+         page.open(true);
         List<PagedMessage> msgs = page.read(new NullStorageManager());
         page.close(false, false);

@@ -557,7 +748,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {

         lastPage = page;

-         page.open();
+         page.open(true);

         List<PagedMessage> msgs = page.read(new NullStorageManager());

@@ -573,7 +764,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
         }
      }

-      lastPage.open();
+      lastPage.open(true);
      List<PagedMessage> lastMessages = lastPage.read(new NullStorageManager());
      lastPage.close(false, false);
      Assert.assertEquals(1, lastMessages.size());

@@ -697,7 +888,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
      while (msgsRead < NUMBER_OF_MESSAGES) {
         Page page = store.depage();
         if (page != null) {
-            page.open();
+            page.open(true);
            List<PagedMessage> messages = page.read(new NullStorageManager());

            for (PagedMessage pgmsg : messages) {

@@ -792,7 +983,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
      while (msgsRead < num1 + num2) {
         page = storeImpl.depage();
         assertNotNull("no page after read " + msgsRead + " msg", page);
-         page.open();
+         page.open(true);
         List<PagedMessage> messages = page.read(new NullStorageManager());

         for (PagedMessage pgmsg : messages) {
@@ -357,6 +357,8 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {

   private final Long id;

+   private boolean durable;
+
   private long messageCount;

   public FakeQueue(final SimpleString name) {

@@ -719,7 +721,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
   @Override
   public boolean isDurableMessage() {
-      // no-op
-      return false;
+      return durable;
   }

+   public FakeQueue setDurable(boolean durable) {
+      this.durable = durable;
+      return this;
+   }
+
   @Override

@@ -889,11 +896,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {

   }

-   public void setPageSubscription(PageSubscription sub) {
+   public FakeQueue setPageSubscription(PageSubscription sub) {
      this.subs = sub;
+      if (subs != null) {
+         sub.setQueue(this);
+      }
+      return this;
   }

   @Override