ARTEMIS-4065 Optimize page counters to not use the journal as often

- From now on we will save snapshots of page-counters on the journal (basically for compatibility with previous verions).
  And we will recount the records on startup.

- While the rebuild is being done the value from the previous snapshot is still available with current updates.
This commit is contained in:
Clebert Suconic 2022-10-26 13:30:42 -04:00 committed by clebertsuconic
parent ba2cbddd6b
commit af9bd7b84a
44 changed files with 1479 additions and 373 deletions

View File

@ -279,7 +279,7 @@ public class PrintData extends DBOption {
folder = pgStore.getFolder();
out.println("####################################################################################################");
out.println("Exploring store " + store + " folder = " + folder);
int pgid = (int) pgStore.getFirstPage();
long pgid = pgStore.getFirstPage();
out.println("Number of pages ::" + pgStore.getNumberOfPages() + ", Current writing page ::" + pgStore.getCurrentWritingPage());
for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++) {

View File

@ -2003,5 +2003,8 @@ public interface ActiveMQServerControl {
@Attribute(desc = "Whether the embedded web server is started")
boolean isEmbeddedWebServerStarted();
@Attribute(desc = "Scan all paged destinations to rebuild the page counters")
void rebuildPageCounters() throws Exception;
}

View File

@ -67,6 +67,9 @@ public abstract class AbstractJDBCDriver {
}
public void destroy() throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("dropping {}", sqlProvider.getTableName(), new Exception("trace"));
}
final String dropTableSql = "DROP TABLE " + sqlProvider.getTableName();
try (Connection connection = connectionProvider.getConnection()) {
try {

View File

@ -95,8 +95,10 @@ public class JDBCSequentialFile implements SequentialFile {
try {
return fileFactory.listFiles(extension).contains(filename);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
fileFactory.onIOError(e, "Error checking JDBC file exists.", this);
logger.debug(e.getMessage(), e);
// this shouldn't throw a critical IO Error
// as if the destination does not exists (ot table store removed), the table will not exist and
// we may get a SQL Exception
return false;
}
}
@ -114,7 +116,9 @@ public class JDBCSequentialFile implements SequentialFile {
return true;
} catch (SQLException e) {
isLoaded.set(false);
fileFactory.onIOError(e, "Error attempting to open JDBC file.", this);
// should not throw exceptions, as we drop the table on queue.destroy.
// storage.exists could be called for non existing pages during async cleanup and they are
// just supposed to return false
}
return false;
}
@ -158,7 +162,9 @@ public class JDBCSequentialFile implements SequentialFile {
}
}
} catch (SQLException e) {
fileFactory.onIOError(e, "Error deleting JDBC file.", this);
// file is already gone from a drop somewhere
logger.debug("Expected error deleting Sequential File", e);
return;
}
}

View File

@ -141,7 +141,9 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
try {
return dbDriver.listFiles(extension);
} catch (SQLException e) {
criticalErrorListener.onIOException(e, "Error listing JDBC files.", null);
// We can't throw critical error here
// exists will call listfiles, and if the store does not exists
// it should simply return false
throw e;
}
}

View File

@ -96,6 +96,9 @@ public abstract class AbstractSequentialFile implements SequentialFile {
@Override
public final void delete() throws IOException, InterruptedException, ActiveMQException {
try {
if (logger.isTraceEnabled()) {
logger.trace("Deleting {}", this.getFileName(), new Exception("trace"));
}
if (isOpen()) {
close(false, false);
}

View File

@ -16,10 +16,11 @@
*/
package org.apache.activemq.artemis.core.io;
/**
* TODO Merge this with IOExceptionListener
*/
public interface IOCriticalErrorListener {
default boolean isPreviouslyFailed() {
return false;
}
void onIOException(Throwable code, String message, String file);
}

View File

@ -45,6 +45,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -4619,6 +4620,15 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
@Override
public void rebuildPageCounters() throws Exception {
// managementLock will guarantee there's only one management operation being called
try (AutoCloseable lock = server.managementLock()) {
Future<Object> task = server.getPagingManager().rebuildCounters();
task.get();
}
}
private ServiceComponent getEmbeddedWebServerComponent() throws ActiveMQIllegalStateException {
for (ActiveMQComponent component : server.getExternalComponents()) {
if (component instanceof WebServerComponentMarker) {

View File

@ -17,6 +17,8 @@
package org.apache.activemq.artemis.core.paging;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@ -141,6 +143,7 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
*/
void checkMemory(Runnable runWhenAvailable);
void counterSnapshot();
/**
* Use this when you have no refernce of an address. (anonymous AMQP Producers for example)
@ -157,4 +160,15 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
default long getMaxMessages() {
return 0;
}
/**
* Rebuilds all page counters for destinations that are paging in the background.
*/
default Future<Object> rebuildCounters() {
return null;
}
default void forEachTransaction(BiConsumer<Long, PageTransactionInfo> transactionConsumer) {
}
}

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.RefCountMessageListener;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.RouteContextList;
@ -129,6 +130,9 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
Page getCurrentPage();
/** it will save snapshots on the counters */
void counterSnapshot();
/**
* @return true if paging was started, or false if paging was already started before this call
*/
@ -220,4 +224,8 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
void block();
void unblock();
default StorageManager getStorageManager() {
return null;
}
}

View File

@ -17,6 +17,8 @@
package org.apache.activemq.artemis.core.paging.cursor;
import java.util.function.BiConsumer;
// this is to expose PageSubscriptionImpl::PageCursorInfo
public interface ConsumedPage {
@ -24,4 +26,8 @@ public interface ConsumedPage {
boolean isDone();
boolean isAck(int messageNumber);
void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer);
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.paging.cursor;
import java.util.function.Consumer;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagedMessage;
@ -32,12 +34,16 @@ public interface PageCursorProvider {
*/
PageSubscription getSubscription(long queueId);
void forEachSubscription(Consumer<PageSubscription> consumer);
PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
void processReload() throws Exception;
void stop();
void counterSnapshot();
void flushExecutors();
void scheduleCleanup();
@ -56,4 +62,8 @@ public interface PageCursorProvider {
*/
void close(PageSubscription pageCursorImpl);
void counterRebuildStarted();
void counterRebuildDone();
}

View File

@ -35,6 +35,9 @@ public interface PageSubscription {
// To be called before the server is down
void stop();
/** Save a snapshot of the current counter value in the journal */
void counterSnapshot();
/**
* This is a callback to inform the PageSubscription that something was routed, so the empty flag can be cleared
*/
@ -46,6 +49,8 @@ public interface PageSubscription {
long getMessageCount();
boolean isCounterPending();
long getPersistentSize();
long getId();
@ -170,4 +175,6 @@ public interface PageSubscription {
void incrementDeliveredSize(long size);
void removePendingDelivery(PagedMessage pagedMessage);
ConsumedPage locatePageInfo(long pageNr);
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.paging.cursor;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.transaction.Transaction;
public interface PageSubscriptionCounter {
@ -36,7 +35,13 @@ public interface PageSubscriptionCounter {
void loadInc(long recordInd, int add, long persistentSize);
void applyIncrementOnTX(Transaction tx, long recordID, int add, long persistentSize);
void applyIncrementOnTX(Transaction tx, int add, long persistentSize);
void markRebuilding();
void finishRebuild();
boolean isRebuilding();
/**
* This will process the reload
@ -46,12 +51,12 @@ public interface PageSubscriptionCounter {
// used when deleting the counter
void delete() throws Exception;
void pendingCounter(Page page, int increment, long persistentSize) throws Exception;
void snapshot();
// used when leaving page mode, so the counters are deleted in batches
// for each queue on the address
void delete(Transaction tx) throws Exception;
void cleanupNonTXCounters(long pageID) throws Exception;
PageSubscriptionCounter setSubscription(PageSubscription subscription);
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.paging.cursor.impl;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
public abstract class BasePagingCounter implements PageSubscriptionCounter {
private volatile boolean rebuilding = false;
@Override
public void markRebuilding() {
rebuilding = true;
}
@Override
public void finishRebuild() {
rebuilding = false;
}
@Override
public boolean isRebuilding() {
return rebuilding;
}
}

View File

@ -0,0 +1,327 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.paging.cursor.impl;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.utils.collections.LinkedList;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.function.BiConsumer;
/** this class will copy current data from the Subscriptions, count messages while the server is already active
* performing other activity */
public class PageCounterRebuildManager implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final PagingStore pgStore;
private final StorageManager sm;
private final LongHashSet transactions;
private boolean paging;
private long limitPageId;
private int limitMessageNr;
private LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
// we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
initialize(store);
this.pgStore = store;
this.sm = store.getStorageManager();
this.transactions = transactions;
}
/** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
* So we can count data while we consolidate at the end */
private void initialize(PagingStore store) {
store.lock(-1);
try {
try {
paging = store.isPaging();
if (!paging) {
logger.debug("Destination {} was not paging, no need to rebuild counters");
store.getCursorProvider().forEachSubscription(subscription -> {
subscription.getCounter().markRebuilding();
subscription.getCounter().finishRebuild();
});
store.getCursorProvider().counterRebuildDone();
return;
}
store.getCursorProvider().counterRebuildStarted();
Page currentPage = store.getCurrentPage();
limitPageId = store.getCurrentWritingPage();
limitMessageNr = currentPage.getNumberOfMessages();
if (logger.isDebugEnabled()) {
logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
limitPageId = store.getCurrentWritingPage();
}
logger.trace("Copying page store ack information from address {}", store.getAddress());
store.getCursorProvider().forEachSubscription(subscription -> {
if (logger.isTraceEnabled()) {
logger.trace("Copying subscription ID {}", subscription.getId());
}
CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
copiedSubscription.subscriptionCounter.markRebuilding();
copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
subscription.forEachConsumedPage(consumedPage -> {
if (logger.isTraceEnabled()) {
logger.trace("Copying page {}", consumedPage.getPageId());
}
CopiedConsumedPage copiedConsumedPage = new CopiedConsumedPage();
copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
if (consumedPage.isDone()) {
if (logger.isTraceEnabled()) {
logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
}
copiedConsumedPage.done = true;
} else {
// We only copy the acks if the page is not done
// as if the page is done, we just move over
consumedPage.forEachAck((messageNR, pagePosition) -> {
if (logger.isTraceEnabled()) {
logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
}
if (copiedConsumedPage.acks == null) {
copiedConsumedPage.acks = new IntObjectHashMap<>();
}
copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
});
}
});
});
} finally {
store.unlock();
}
}
private synchronized PageSubscriptionCounter getCounter(long queueID) {
CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
if (copiedSubscription != null) {
return copiedSubscription.subscriptionCounter;
} else {
return null;
}
}
private CopiedSubscription getSubscription(long queueID) {
return copiedSubscriptionMap.get(queueID);
}
private boolean isACK(long queueID, long pageNR, int messageNR) {
CopiedSubscription subscription = getSubscription(queueID);
if (subscription == null) {
return true;
}
CopiedConsumedPage consumedPage = subscription.getPage(pageNR);
if (consumedPage == null) {
return false;
} else {
return consumedPage.isAck(messageNR);
}
}
private void done() {
copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
if (!copiedSubscription.empty) {
copiedSubscription.subscription.notEmpty();
try {
copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
if (!copiedSubscription.empty) {
copiedSubscription.subscription.notEmpty();
}
if (copiedSubscription.subscriptionCounter != null) {
copiedSubscription.subscriptionCounter.finishRebuild();
}
});
pgStore.getCursorProvider().counterRebuildDone();
pgStore.getCursorProvider().scheduleCleanup();
}
@Override
public void run() {
try {
rebuild();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
public void rebuild() throws Exception {
if (pgStore == null) {
logger.debug("Page store is null during rebuildCounters");
return;
}
if (!paging) {
logger.debug("Ignoring call to rebuild pgStore {}", pgStore.getAddress());
}
logger.debug("Rebuilding counter for store {}", pgStore.getAddress());
for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
if (logger.isDebugEnabled()) {
logger.debug("Rebuilding counter on messages from page {} on rebuildCounters for address {}", pgid, pgStore.getAddress());
}
Page page = pgStore.newPageObject(pgid);
if (!page.getFile().exists()) {
if (logger.isDebugEnabled()) {
logger.debug("Skipping page {} on store {}", pgid, pgStore.getAddress());
}
continue;
}
page.open(false);
LinkedList<PagedMessage> msgs = page.read(sm);
page.close(false, false);
try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
while (iter.hasNext()) {
PagedMessage msg = iter.next();
if (limitPageId == pgid) {
if (msg.getMessageNumber() >= limitMessageNr) {
if (logger.isDebugEnabled()) {
logger.debug("Rebuild counting on {} reached the last message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);
}
// this is the limit where we should count..
// anything beyond this will be new data
break;
}
}
msg.initMessage(sm);
long[] routedQueues = msg.getQueueIDs();
if (logger.isTraceEnabled()) {
logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues);
}
for (long queueID : routedQueues) {
boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber());
boolean txOK = msg.getTransactionID() <= 0 || transactions == null || transactions.contains(msg.getTransactionID());
if (!txOK) {
logger.debug("TX is not ok for {}", msg);
}
if (ok && txOK) { // not acked and TX is ok
if (logger.isTraceEnabled()) {
logger.trace("Message pageNumber={}/{} NOT acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
}
CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
if (copiedSubscription != null) {
copiedSubscription.empty = false;
copiedSubscription.addUp++;
copiedSubscription.sizeUp += msg.getPersistentSize();
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("Message pageNumber={}/{} IS acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
}
}
}
}
}
}
logger.debug("Counter rebuilding done for address {}", pgStore.getAddress());
done();
}
private static class CopiedSubscription {
CopiedSubscription(PageSubscription subscription) {
this.subscriptionCounter = subscription.getCounter();
this.subscription = subscription;
}
private boolean empty = true;
LongObjectHashMap<CopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
// this is not a copy! This will be the actual object listed in the PageSubscription
// any changes to this object will reflect in the system and management;
PageSubscriptionCounter subscriptionCounter;
PageSubscription subscription;
CopiedConsumedPage getPage(long pageNr) {
return consumedPageMap.get(pageNr);
}
int addUp;
long sizeUp;
}
private static class CopiedConsumedPage implements ConsumedPage {
boolean done;
IntObjectHashMap<Boolean> acks;
@Override
public long getPageId() {
throw new RuntimeException("method not implemented");
}
@Override
public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
throw new RuntimeException("method not implemented");
}
@Override
public boolean isDone() {
return done;
}
@Override
public boolean isAck(int messageNumber) {
if (done) {
return true;
}
if (acks != null) {
return acks.get(messageNumber) != null;
}
return false;
}
}
}

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl;
import org.apache.activemq.artemis.core.paging.impl.Page;
@ -42,6 +43,7 @@ import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.function.Consumer;
public class PageCursorProviderImpl implements PageCursorProvider {
@ -54,6 +56,10 @@ public class PageCursorProviderImpl implements PageCursorProvider {
protected volatile boolean cleanupEnabled = true;
// We can't call cleanup before counters were rebuilt
// as they will determine if a subscription is empty or not
protected volatile boolean countersRebuilt = true;
protected final PagingStore pagingStore;
protected final StorageManager storageManager;
@ -85,16 +91,30 @@ public class PageCursorProviderImpl implements PageCursorProvider {
throw new IllegalStateException("Cursor " + cursorID + " had already been created");
}
PageSubscription activeCursor = new PageSubscriptionImpl(this, pagingStore, storageManager, filter, cursorID, persistent);
PageSubscriptionCounter subscriptionCounter = createPageCounter(cursorID, persistent);
PageSubscription activeCursor = new PageSubscriptionImpl(this, pagingStore, storageManager, filter, cursorID, persistent, subscriptionCounter);
activeCursors.put(cursorID, activeCursor);
return activeCursor;
}
private PageSubscriptionCounter createPageCounter(long cursorID, boolean persistent) {
return new PageSubscriptionCounterImpl(storageManager, cursorID);
}
@Override
public synchronized PageSubscription getSubscription(long cursorID) {
return activeCursors.get(cursorID);
}
@Override
public void forEachSubscription(Consumer<PageSubscription> consumer) {
activeCursors.forEach((k, v) -> consumer.accept(v));
}
@Override
public PagedReference newReference(final PagedMessage msg,
final PageSubscription subscription) {
@ -139,6 +159,13 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
}
@Override
public void counterSnapshot() {
for (PageSubscription cursor : activeCursors.values()) {
cursor.counterSnapshot();
}
}
@Override
public void flushExecutors() {
pagingStore.flushExecutors();
@ -216,6 +243,11 @@ public class PageCursorProviderImpl implements PageCursorProvider {
protected void cleanup() {
if (!countersRebuilt) {
logger.debug("Counters were not rebuilt yet, cleanup has to be ignored on address {}", pagingStore != null ? pagingStore.getAddress() : "NULL");
return;
}
ArrayList<Page> depagedPages = new ArrayList<>();
LongHashSet depagedPagesSet = new LongHashSet();
@ -506,6 +538,10 @@ public class PageCursorProviderImpl implements PageCursorProvider {
private long checkMinPage(Collection<PageSubscription> cursorList) {
long minPage = Long.MAX_VALUE;
if (logger.isTraceEnabled()) {
logger.trace("Min page cursorList size {} on {}", cursorList.size(), pagingStore.getAddress(), new Exception("trace"));
}
for (PageSubscription cursor : cursorList) {
long firstPage = cursor.getFirstPage();
if (logger.isTraceEnabled()) {
@ -543,4 +579,14 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
}
}
@Override
public void counterRebuildStarted() {
this.countersRebuilt = false;
}
@Override
public void counterRebuildDone() {
this.countersRebuilt = true;
}
}

View File

@ -16,19 +16,12 @@
*/
package org.apache.activemq.artemis.core.paging.cursor.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -44,173 +37,117 @@ import java.lang.invoke.MethodHandles;
/**
* This class will encapsulate the persistent counters for the PagingSubscription
*/
public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
public class PageSubscriptionCounterImpl extends BasePagingCounter {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int FLUSH_COUNTER = 1000;
private final long subscriptionID;
// the journal record id that is holding the current value
private long recordID = -1;
private boolean persistent;
/** while we rebuild the counters, we will use the recordedValues */
private volatile long recordedValue = -1;
private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> recordedValueUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "recordedValue");
private final PageSubscription subscription;
/** while we rebuild the counters, we will use the recordedValues */
private volatile long recordedSize = -1;
private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> recordedSizeUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "recordedSize");
private PageSubscription subscription;
private final StorageManager storage;
private final AtomicLong value = new AtomicLong(0);
private final AtomicLong persistentSize = new AtomicLong(0);
private volatile long value;
private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> valueUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "value");
private final AtomicLong added = new AtomicLong(0);
private final AtomicLong addedPersistentSize = new AtomicLong(0);
private volatile long persistentSize;
private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> persistentSizeUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "persistentSize");
private final AtomicLong pendingValue = new AtomicLong(0);
private final AtomicLong pendingPersistentSize = new AtomicLong(0);
private volatile long added;
private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> addedUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "added");
private final LinkedList<Long> incrementRecords = new LinkedList<>();
// We are storing pending counters for non transactional writes on page
// we will recount a page case we still see pending records
// as soon as we close a page we remove these records replacing by a regular page increment record
// A Map per pageID, each page will have a set of IDs, with the increment on each one
private final Map<Long, PendingCounter> pendingCounters = new HashMap<>();
private volatile long addedPersistentSize;
private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> addedPersistentSizeUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "addedPersistentSize");
private LinkedList<PendingCounter> loadList;
private final Executor pageExecutor;
public PageSubscriptionCounterImpl(final StorageManager storage,
final PageSubscription subscription,
final boolean persistent,
final long subscriptionID) {
this.subscriptionID = subscriptionID;
this.storage = storage;
this.persistent = persistent;
this.subscription = subscription;
if (subscription == null) {
this.pageExecutor = null;
} else {
this.pageExecutor = subscription.getPagingStore().getExecutor();
assert pageExecutor != null;
}
@Override
public void markRebuilding() {
if (logger.isDebugEnabled()) {
logger.debug("Subscription {} marked for rebuilding", subscriptionID);
}
super.markRebuilding();
recordedSizeUpdater.set(this, persistentSizeUpdater.get(this));
recordedValueUpdater.set(this, recordedValueUpdater.get(this));
try {
reset();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
@Override
public void finishRebuild() {
super.finishRebuild();
if (logger.isDebugEnabled()) {
logger.debug("Subscription {} finished rebuilding", subscriptionID);
}
snapshot();
addedUpdater.set(this, valueUpdater.get(this));
addedPersistentSizeUpdater.set(this, persistentSizeUpdater.get(this));
}
@Override
public long getValueAdded() {
return added.get() + pendingValue.get();
return addedUpdater.get(this);
}
@Override
public long getValue() {
return value.get() + pendingValue.get();
if (isRebuilding()) {
if (logger.isTraceEnabled()) {
logger.trace("returning getValue from isPending on subscription {}, recordedValue={}, addedUpdater={}", recordedValueUpdater.get(this), addedUpdater.get(this));
}
return recordedValueUpdater.get(this);
}
if (logger.isTraceEnabled()) {
logger.trace("returning regular getValue subscription {}, value={}", subscriptionID, valueUpdater.get(this));
}
return valueUpdater.get(this);
}
@Override
public long getPersistentSizeAdded() {
return addedPersistentSize.get() + pendingPersistentSize.get();
return addedPersistentSizeUpdater.get(this);
}
@Override
public long getPersistentSize() {
return persistentSize.get() + pendingPersistentSize.get();
}
/**
* This is used only on non transactional paging
*
* @param page
* @param increment
* @throws Exception
*/
@Override
public synchronized void pendingCounter(Page page, int increment, long size) throws Exception {
if (!persistent) {
return; // nothing to be done
if (isRebuilding()) {
if (logger.isTraceEnabled()) {
logger.trace("returning getPersistentSize from isPending on subscription {}, recordedSize={}. addedSize={}", subscriptionID, recordedSizeUpdater.get(this), addedPersistentSizeUpdater.get(this));
}
return recordedSizeUpdater.get(this);
}
assert page != null;
PendingCounter pendingInfo = pendingCounters.get((long) page.getPageId());
if (pendingInfo == null) {
// We have to make sure this is sync here
// not syncing this to disk may cause the page files to be out of sync on pages.
// we can't afford the case where a page file is written without a record here
long id = storage.storePendingCounter(this.subscriptionID, page.getPageId());
pendingInfo = new PendingCounter(id, increment, size);
pendingCounters.put((long) page.getPageId(), pendingInfo);
} else {
pendingInfo.addAndGet(increment, size);
}
pendingValue.addAndGet(increment);
pendingPersistentSize.addAndGet(size);
page.addPendingCounter(this);
}
/**
* Cleanup temporary page counters on non transactional paged messages
*
* @param pageID
*/
@Override
public void cleanupNonTXCounters(final long pageID) throws Exception {
PendingCounter pendingInfo;
synchronized (this) {
pendingInfo = pendingCounters.remove(pageID);
}
if (pendingInfo != null) {
final int valueCleaned = pendingInfo.getCount();
final long valueSizeCleaned = pendingInfo.getPersistentSize();
Transaction tx = new TransactionImpl(storage);
storage.deletePendingPageCounter(tx.getID(), pendingInfo.getId());
// To apply the increment of the value just being cleaned
increment(tx, valueCleaned, valueSizeCleaned);
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
pendingValue.addAndGet(-valueCleaned);
pendingPersistentSize.updateAndGet(val -> val >= valueSizeCleaned ? val - valueSizeCleaned : 0);
}
});
tx.commit();
if (logger.isTraceEnabled()) {
logger.trace("returning regular getPersistentSize subscription {}, value={}", subscriptionID, persistentSizeUpdater.get(this));
}
return persistentSizeUpdater.get(this);
}
@Override
public void increment(Transaction tx, int add, long size) throws Exception {
if (tx == null) {
if (persistent) {
long id = storage.storePageCounterInc(this.subscriptionID, add, size);
storage.getContext().executeOnCompletion(new IOCallback() {
@Override
public void done() {
process(id, add, size);
}
@Override
public void onError(int errorCode, String errorMessage) {
}
});
} else {
process(-1, add, size);
}
process(add, size);
} else {
if (persistent) {
tx.setContainsPersistent();
long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add, size);
applyIncrementOnTX(tx, id, add, size);
} else {
applyIncrementOnTX(tx, -1, add, size);
}
applyIncrementOnTX(tx, add, size);
}
}
@ -218,11 +155,10 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
* This method will install the TXs
*
* @param tx
* @param recordID1
* @param add
*/
@Override
public void applyIncrementOnTX(Transaction tx, long recordID1, int add, long size) {
public void applyIncrementOnTX(Transaction tx, int add, long size) {
CounterOperations oper = (CounterOperations) tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
if (oper == null) {
@ -231,27 +167,36 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
tx.addOperation(oper);
}
oper.operations.add(new ItemOper(this, recordID1, add, size));
oper.operations.add(new ItemOper(this, add, size));
}
@Override
public synchronized void loadValue(final long recordID1, final long value1, long size) {
if (this.subscription != null) {
// it could be null on testcases... which is ok
this.subscription.notEmpty();
public synchronized void loadValue(final long recordID, final long value, long size) {
if (logger.isDebugEnabled()) {
logger.debug("Counter for subscription {} reloading recordID={}, value={}, size={}", this.subscriptionID, recordID, value, size);
}
this.value.set(value1);
this.added.set(value1);
this.persistentSize.set(size);
this.addedPersistentSize.set(size);
this.recordID = recordID1;
this.recordID = recordID;
recordedValueUpdater.set(this, value);
recordedSizeUpdater.set(this, size);
valueUpdater.set(this, value);
persistentSizeUpdater.set(this, size);
addedUpdater.set(this, size);
}
private void process(long id, int add, long size) {
if (id >= 0 && pageExecutor != null) {
pageExecutor.execute(() -> doIncrement(id, add, size));
} else {
doIncrement(-1, add, size);
private void process(int add, long size) {
if (logger.isTraceEnabled()) {
logger.trace("process subscription={} add={}, size={}", subscriptionID, add, size);
}
valueUpdater.addAndGet(this, add);
persistentSizeUpdater.addAndGet(this, size);
if (add > 0) {
addedUpdater.addAndGet(this, add);
addedPersistentSizeUpdater.addAndGet(this, size);
}
if (isRebuilding()) {
recordedValueUpdater.addAndGet(this, value);
recordedSizeUpdater.addAndGet(this, size);
}
}
@ -264,24 +209,39 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
tx.commit();
}
private void reset() throws Exception {
Transaction tx = new TransactionImpl(storage);
delete(tx, true);
tx.commit();
}
@Override
public void delete(Transaction tx) throws Exception {
delete(tx, false);
}
private void delete(Transaction tx, boolean keepZero) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Subscription {} delete, keepZero={}", subscriptionID, keepZero);
}
// always lock the StorageManager first.
try (ArtemisCloseable lock = storage.closeableReadLock()) {
synchronized (this) {
for (Long record : incrementRecords) {
storage.deleteIncrementRecord(tx.getID(), record.longValue());
tx.setContainsPersistent();
}
if (recordID >= 0) {
storage.deletePageCounter(tx.getID(), this.recordID);
tx.setContainsPersistent();
}
recordID = -1;
value.set(0);
incrementRecords.clear();
if (keepZero) {
recordID = storage.storePageCounter(tx.getID(), subscriptionID, 0L, 0L);
} else {
recordID = -1;
}
valueUpdater.set(this, 0);
persistentSizeUpdater.set(this, 0);
}
}
}
@ -298,110 +258,101 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
@Override
public void processReload() {
if (loadList != null) {
if (subscription != null) {
// it could be null on testcases
subscription.notEmpty();
}
for (PendingCounter incElement : loadList) {
value.addAndGet(incElement.getCount());
added.addAndGet(incElement.getCount());
persistentSize.addAndGet(incElement.getPersistentSize());
addedPersistentSize.addAndGet(incElement.getPersistentSize());
incrementRecords.add(incElement.getId());
try {
long tx = -1L;
logger.debug("Removing increment records on cursor {}", subscriptionID);
for (PendingCounter incElement : loadList) {
if (tx < 0) {
tx = storage.generateID();
}
storage.deletePageCounter(tx, incElement.id);
}
if (tx >= 0) {
storage.commit(tx);
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
loadList.clear();
loadList = null;
}
}
// you need to call this method from the executors when id > 0
private void doIncrement(long id, int variance, long size) {
value.addAndGet(variance);
this.persistentSize.addAndGet(size);
if (variance > 0) {
added.addAndGet(variance);
}
if (size > 0) {
addedPersistentSize.addAndGet(size);
}
if (id >= 0) {
synchronized (this) {
incrementRecords.add(id);
if (incrementRecords.size() > FLUSH_COUNTER) {
this.cleanup();
}
}
}
}
/**
* used on testing only
*/
public void setPersistent(final boolean persistent) {
this.persistent = persistent;
}
/**
* This method should always be called from a single threaded executor
*/
protected synchronized void cleanup() {
if (incrementRecords.size() <= FLUSH_COUNTER) {
@Override
public synchronized void snapshot() {
if (isRebuilding()) {
if (logger.isDebugEnabled()) {
logger.debug("snapshot call ignored as cursor is being rebuilt for {}", subscriptionID);
}
return;
}
long valueReplace = value.get();
long sizeReplace = persistentSize.get();
ArrayList<Long> deleteList = new ArrayList<>(incrementRecords);
incrementRecords.clear();
if (!storage.isStarted()) {
logger.debug("Storage is not active, ignoring snapshot call on {}", subscriptionID);
return;
}
long valueReplace = valueUpdater.get(this);
long sizeReplace = persistentSizeUpdater.get(this);
long newRecordID = -1;
long txCleanup = storage.generateID();
long txCleanup = -1;
try {
for (Long value1 : deleteList) {
storage.deleteIncrementRecord(txCleanup, value1);
}
if (recordID >= 0) {
if (txCleanup < 0) {
txCleanup = storage.generateID();
}
storage.deletePageCounter(txCleanup, recordID);
recordID = -1;
}
newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace, sizeReplace);
if (logger.isTraceEnabled()) {
logger.trace("Replacing page-counter record = {} by record = {} on subscriptionID = {} for queue = {}",
recordID, newRecordID, subscriptionID, subscription.getQueue().getName());
if (valueReplace > 0) {
if (txCleanup < 0) {
txCleanup = storage.generateID();
}
newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace, sizeReplace);
}
storage.commit(txCleanup);
if (logger.isDebugEnabled()) {
logger.debug("Replacing page-counter record = {} by record = {} on subscriptionID = {} for queue = {}, value = {}, size = {}",
recordID, newRecordID, subscriptionID, subscription.getQueue().getName(), valueReplace, sizeReplace);
}
if (txCleanup >= 0) {
storage.commit(txCleanup);
}
} catch (Exception e) {
newRecordID = recordID;
ActiveMQServerLogger.LOGGER.problemCleaningPagesubscriptionCounter(e);
try {
storage.rollback(txCleanup);
} catch (Exception ignored) {
if (txCleanup >= 0) {
try {
storage.rollback(txCleanup);
} catch (Exception ignored) {
}
}
} finally {
recordID = newRecordID;
recordedValueUpdater.set(this, valueReplace);
recordedSizeUpdater.set(this, sizeReplace);
}
}
private static class ItemOper {
private ItemOper(PageSubscriptionCounterImpl counter, long id, int add, long persistentSize) {
private ItemOper(PageSubscriptionCounterImpl counter, int add, long persistentSize) {
this.counter = counter;
this.id = id;
this.amount = add;
this.persistentSize = persistentSize;
}
PageSubscriptionCounterImpl counter;
long id;
int amount;
long persistentSize;
@ -414,7 +365,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
@Override
public void afterCommit(Transaction tx) {
for (ItemOper oper : operations) {
oper.counter.process(oper.id, oper.amount, oper.persistentSize);
oper.counter.process(oper.amount, oper.persistentSize);
}
}
}
@ -465,4 +416,10 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
SIZE_UPDATER.addAndGet(this, persistentSize);
}
}
@Override
public PageSubscriptionCounter setSubscription(PageSubscription subscription) {
this.subscription = subscription;
return this;
}
}

View File

@ -28,6 +28,7 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;
@ -273,14 +274,17 @@ public final class PageSubscriptionImpl implements PageSubscription {
final StorageManager store,
final Filter filter,
final long cursorId,
final boolean persistent) {
final boolean persistent,
final PageSubscriptionCounter counter) {
assert counter != null;
this.pageStore = pageStore;
this.store = store;
this.cursorProvider = cursorProvider;
this.cursorId = cursorId;
this.filter = filter;
this.persistent = persistent;
this.counter = new PageSubscriptionCounterImpl(store, this, persistent, cursorId);
this.counter = counter;
this.counter.setSubscription(this);
}
@ -346,6 +350,11 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
}
@Override
public boolean isCounterPending() {
return counter.isRebuilding();
}
@Override
public long getPersistentSize() {
if (empty) {
@ -418,10 +427,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
@Override
public void onPageModeCleared(Transaction tx) throws Exception {
if (counter != null) {
// this could be null on testcases
counter.delete(tx);
}
// this could be null on testcases
counter.delete(tx);
this.empty = true;
}
@ -746,9 +753,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
@Override
public void forEachConsumedPage(Consumer<ConsumedPage> pageCleaner) {
public void forEachConsumedPage(Consumer<ConsumedPage> pageConsumer) {
synchronized (consumedPages) {
consumedPages.values().forEach(pageCleaner);
consumedPages.values().forEach(pageConsumer);
}
}
@ -860,6 +867,11 @@ public final class PageSubscriptionImpl implements PageSubscription {
public void stop() {
}
@Override
public void counterSnapshot() {
counter.snapshot();
}
@Override
public void printDebug() {
printDebug(toString());
@ -912,6 +924,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
return getPageInfo(pos.getPageNr());
}
@Override
public PageCursorInfo locatePageInfo(final long pageNr) {
synchronized (consumedPages) {
return consumedPages.get(pageNr);
@ -1064,10 +1077,16 @@ public final class PageSubscriptionImpl implements PageSubscription {
// expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
@Override
public synchronized boolean isAck(int messageNumber) {
return completePage != null || acks.get(messageNumber) != null;
}
@Override
public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
acks.forEach(ackConsumer);
}
@Override
public String toString() {
try {

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.paging.impl;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@ -26,13 +25,11 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.EmptyList;
import org.apache.activemq.artemis.utils.collections.LinkedList;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
@ -86,11 +83,6 @@ public final class Page {
private final SimpleString storeName;
/**
* A list of subscriptions containing pending counters (with non tx adds) on this page
*/
private Set<PageSubscriptionCounter> pendingCounters;
private ByteBuffer readFileBuffer;
public Page(final SimpleString storeName,
@ -241,13 +233,6 @@ public final class Page {
storageManager.pageClosed(storeName, pageId);
}
file.close(waitSync, waitSync);
Set<PageSubscriptionCounter> counters = getPendingCounters();
if (counters != null) {
for (PageSubscriptionCounter counter : counters) {
counter.cleanupNonTXCounters(this.getPageId());
}
}
}
public boolean delete(final LinkedList<PagedMessage> messages) throws Exception {
@ -255,7 +240,9 @@ public final class Page {
storageManager.pageDeleted(storeName, pageId);
}
if (logger.isDebugEnabled()) {
if (logger.isTraceEnabled()) {
logger.trace("Deleting pageNr={} on store {}", pageId, storeName, new Exception("trace"));
} else if (logger.isDebugEnabled()) {
logger.debug("Deleting pageNr={} on store {}", pageId, storeName);
}
@ -373,24 +360,4 @@ public final class Page {
return file;
}
/**
* This will indicate a page that will need to be called on cleanup when the page has been closed and confirmed
*
* @param pageSubscriptionCounter
*/
public void addPendingCounter(PageSubscriptionCounter pageSubscriptionCounter) {
getOrCreatePendingCounters().add(pageSubscriptionCounter);
}
private synchronized Set<PageSubscriptionCounter> getPendingCounters() {
return pendingCounters;
}
private synchronized Set<PageSubscriptionCounter> getOrCreatePendingCounters() {
if (pendingCounters == null) {
pendingCounters = new ConcurrentHashSet<>();
}
return pendingCounters;
}
}

View File

@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -32,6 +34,7 @@ import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCounterRebuildManager;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
@ -40,14 +43,16 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.SizeAwareMetric;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.function.BiConsumer;
public final class PagingManagerImpl implements PagingManager {
private static final int ARTEMIS_DEBUG_PAGING_INTERVAL = Integer.valueOf(System.getProperty("artemis.debug.paging.interval", "0"));
private static final int ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL = Integer.valueOf(System.getProperty("artemis.paging.counter.snapshot.interval", "60"));
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -92,13 +97,13 @@ public final class PagingManagerImpl implements PagingManager {
private volatile long diskTotalSpace = 0;
private final Executor memoryExecutor;
private final Executor managerExecutor;
private final Queue<Runnable> memoryCallback = new ConcurrentLinkedQueue<>();
private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<>();
private ActiveMQScheduledComponent scheduledComponent = null;
private ActiveMQScheduledComponent snapshotUpdater = null;
private final SimpleString managementAddress;
@ -127,7 +132,7 @@ public final class PagingManagerImpl implements PagingManager {
globalSizeMetric.setElementsEnabled(maxMessages >= 0);
globalSizeMetric.setOverCallback(() -> setGlobalFull(true));
globalSizeMetric.setUnderCallback(() -> setGlobalFull(false));
this.memoryExecutor = pagingSPI.newExecutor();
this.managerExecutor = pagingSPI.newExecutor();
this.managementAddress = managementAddress;
}
@ -205,8 +210,8 @@ public final class PagingManagerImpl implements PagingManager {
protected void checkMemoryRelease() {
if (!diskFull && (maxSize < 0 || !globalFull) && !blockedStored.isEmpty()) {
if (!memoryCallback.isEmpty()) {
if (memoryExecutor != null) {
memoryExecutor.execute(this::memoryReleased);
if (managerExecutor != null) {
managerExecutor.execute(this::memoryReleased);
} else {
memoryReleased();
}
@ -368,8 +373,8 @@ public final class PagingManagerImpl implements PagingManager {
PagingStore oldStore = stores.remove(store.getStoreName());
if (oldStore != null) {
oldStore.stop();
oldStore = null;
}
store.getCursorProvider().counterRebuildStarted(); // TODO-NOW-DONT-MERGE maybe this should be removed
store.start();
stores.put(store.getStoreName(), store);
}
@ -466,28 +471,38 @@ public final class PagingManagerImpl implements PagingManager {
reloadStores();
if (ARTEMIS_DEBUG_PAGING_INTERVAL > 0) {
this.scheduledComponent = new ActiveMQScheduledComponent(pagingStoreFactory.getScheduledExecutor(), pagingStoreFactory.newExecutor(), ARTEMIS_DEBUG_PAGING_INTERVAL, TimeUnit.SECONDS, false) {
if (ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL > 0) {
this.snapshotUpdater = new ActiveMQScheduledComponent(pagingStoreFactory.getScheduledExecutor(), pagingStoreFactory.newExecutor(), ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL, TimeUnit.SECONDS, false) {
@Override
public void run() {
debug();
try {
logger.debug("Updating counter snapshots");
counterSnapshot();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
};
this.scheduledComponent.start();
this.snapshotUpdater.start();
}
started = true;
} finally {
unlock();
}
}
public void debug() {
logger.info("size = {} bytes, messages = {}", globalSizeMetric.getSize(), globalSizeMetric.getElements());
@Override
public void counterSnapshot() {
for (PagingStore store : stores.values()) {
store.counterSnapshot();
}
}
@Override
public synchronized void stop() throws Exception {
if (!started) {
@ -495,9 +510,9 @@ public final class PagingManagerImpl implements PagingManager {
}
started = false;
if (scheduledComponent != null) {
this.scheduledComponent.stop();
this.scheduledComponent = null;
if (snapshotUpdater != null) {
this.snapshotUpdater.stop();
this.snapshotUpdater = null;
}
lock();
@ -548,4 +563,26 @@ public final class PagingManagerImpl implements PagingManager {
syncLock.writeLock().lock();
}
@Override
public void forEachTransaction(BiConsumer<Long, PageTransactionInfo> transactionConsumer) {
transactions.forEach(transactionConsumer);
}
@Override
public Future<Object> rebuildCounters() {
LongHashSet transactionsSet = new LongHashSet();
transactions.forEach((txId, tx) -> {
transactionsSet.add(txId);
});
stores.forEach((address, pgStore) -> {
PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(pgStore, transactionsSet);
logger.debug("Setting destination {} to rebuild counters", address);
managerExecutor.execute(rebuildManager);
});
FutureTask<Object> task = new FutureTask<>(() -> null);
managerExecutor.execute(task);
return task;
}
}

View File

@ -380,33 +380,39 @@ public class PagingStoreImpl implements PagingStore {
}
@Override
public synchronized void stop() throws Exception {
if (running) {
cursorProvider.flushExecutors();
cursorProvider.stop();
public void counterSnapshot() {
cursorProvider.counterSnapshot();
}
final List<Runnable> pendingTasks = new ArrayList<>();
@Override
public void stop() throws Exception {
synchronized (this) {
if (running) {
cursorProvider.stop();
running = false;
} else {
return;
}
}
// TODO we could have a parameter to use this
final int pendingTasksWhileShuttingDown = executor.shutdownNow(pendingTasks::add, 30, TimeUnit.SECONDS);
if (pendingTasksWhileShuttingDown > 0) {
logger.trace("Try executing {} pending tasks on stop", pendingTasksWhileShuttingDown);
for (Runnable pendingTask : pendingTasks) {
try {
pendingTask.run();
} catch (Throwable t) {
logger.warn("Error while executing a pending task on shutdown", t);
}
final List<Runnable> pendingTasks = new ArrayList<>();
final int pendingTasksWhileShuttingDown = executor.shutdownNow(pendingTasks::add, 30, TimeUnit.SECONDS);
if (pendingTasksWhileShuttingDown > 0) {
logger.trace("Try executing {} pending tasks on stop", pendingTasksWhileShuttingDown);
for (Runnable pendingTask : pendingTasks) {
try {
pendingTask.run();
} catch (Throwable t) {
logger.warn("Error while executing a pending task on shutdown", t);
}
}
}
running = false;
final Page page = currentPage;
if (page != null) {
page.close(false);
currentPage = null;
}
final Page page = currentPage;
if (page != null) {
page.close(false);
currentPage = null;
}
}
@ -424,10 +430,13 @@ public class PagingStoreImpl implements PagingStore {
public void flushExecutors() {
FutureLatch future = new FutureLatch();
executor.execute(future);
try {
executor.execute(future);
if (!future.await(60000)) {
ActiveMQServerLogger.LOGGER.pageStoreTimeout(address);
if (!future.await(60000)) {
ActiveMQServerLogger.LOGGER.pageStoreTimeout(address);
}
} catch (Exception ignored) {
}
}
@ -1122,14 +1131,7 @@ public class PagingStoreImpl implements PagingStore {
List<org.apache.activemq.artemis.core.server.Queue> durableQueues = ctx.getDurableQueues();
List<org.apache.activemq.artemis.core.server.Queue> nonDurableQueues = ctx.getNonDurableQueues();
for (org.apache.activemq.artemis.core.server.Queue q : durableQueues) {
if (tx == null) {
// non transactional writes need an intermediate place
// to avoid the counter getting out of sync
q.getPageSubscription().getCounter().pendingCounter(page, 1, size);
} else {
// null tx is treated through pending counters
q.getPageSubscription().getCounter().increment(tx, 1, size);
}
q.getPageSubscription().getCounter().increment(tx, 1, size);
}
for (org.apache.activemq.artemis.core.server.Queue q : nonDurableQueues) {
@ -1408,5 +1410,9 @@ public class PagingStoreImpl implements PagingStore {
usedPages.forEachUsedPage(consumerPage);
}
@Override
public StorageManager getStorageManager() {
return storageManager;
}
}

View File

@ -22,7 +22,6 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
import java.security.InvalidParameterException;
@ -1692,7 +1691,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public synchronized boolean isStarted() {
return started;
if (ioCriticalErrorListener != null) {
return started && !ioCriticalErrorListener.isPreviouslyFailed();
} else {
return started;
}
}
/**
@ -1895,25 +1898,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
break;
}
case PAGE_CURSOR_COUNTER_VALUE: {
ActiveMQServerLogger.LOGGER.journalPAGEOnPrepared();
break;
}
case PAGE_CURSOR_COUNTER_INC: {
PageCountRecordInc encoding = new PageCountRecordInc();
encoding.decode(buff);
PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
logger.debug("Page cursor counter inc on a prepared TX.");
if (sub != null) {
sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue(), encoding.getPersistentSize());
sub.notEmpty();
} else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID());
}
// TODO: do I need to remove the record on commit?
break;
}

View File

@ -535,7 +535,7 @@ public final class DescribeJournal {
PageSubscriptionCounterImpl subsCounter;
subsCounter = counters.get(queueIDForCounter);
if (subsCounter == null) {
subsCounter = new PageSubscriptionCounterImpl(null, null, false, -1);
subsCounter = new PageSubscriptionCounterImpl(null, -1);
counters.put(queueIDForCounter, subsCounter);
}
return subsCounter;

View File

@ -524,4 +524,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229244, value = "Meters already registered for {}")
IllegalStateException metersAlreadyRegistered(String resource);
@Message(id = 229245, value = "Management controller is busy with another task. Please try again")
ActiveMQTimeoutException managementBusy();
}

View File

@ -111,6 +111,7 @@ public interface ActiveMQServer extends ServiceComponent {
STOPPED
}
AutoCloseable managementLock() throws Exception;
void setState(SERVER_STATE state);
@ -357,6 +358,14 @@ public interface ActiveMQServer extends ServiceComponent {
Map<SimpleString, RoutingType> prefixes,
String securityDomain) throws Exception;
/** should the server rebuild page counters upon startup.
* this will be useful on testing or an embedded broker scenario */
boolean isRebuildCounters();
/** should the server rebuild page counters upon startup.
* this will be useful on testing or an embedded broker scenario */
void setRebuildCounters(boolean rebuildCounters);
SecurityStore getSecurityStore();
void removeSession(String name) throws Exception;

View File

@ -472,9 +472,6 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 222047, value = "Can not find queue {} while reloading ACKNOWLEDGE_CURSOR", level = LogMessage.Level.WARN)
void journalCannotFindQueueReloadingACK(Long queueID);
@LogMessage(id = 222048, value = "PAGE_CURSOR_COUNTER_VALUE record used on a prepared statement, invalid state", level = LogMessage.Level.WARN)
void journalPAGEOnPrepared();
@LogMessage(id = 222049, value = "InternalError: Record type {} not recognized. Maybe you are using journal files created on a different version", level = LogMessage.Level.WARN)
void journalInvalidRecordType(Byte recordType);

View File

@ -239,6 +239,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private HAPolicy haPolicy;
// This will be useful on tests or embedded
private boolean rebuildCounters = true;
private volatile SERVER_STATE state = SERVER_STATE.STOPPED;
private final Version version;
@ -271,6 +274,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private ReplayManager replayManager;
/** Certain management operations shouldn't use more than one thread.
* this semaphore is used to guarantee a single thread used. */
private final Semaphore managementSemaphore = new Semaphore(1);
/**
* This is a thread pool for io tasks only.
* We can't use the same global executor to avoid starvations.
@ -496,6 +503,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return networkHealthCheck;
}
@Override
public void setRebuildCounters(boolean rebuildCounters) {
this.rebuildCounters = rebuildCounters;
}
@Override
public boolean isRebuildCounters() {
return this.rebuildCounters;
}
@Override
public void replay(Date start, Date end, String address, String target, String filter) throws Exception {
@ -1323,6 +1340,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
if (!criticalIOError && pagingManager != null) {
pagingManager.counterSnapshot();
}
stopComponent(pagingManager);
if (storageManager != null)
@ -3323,6 +3344,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
JournalLoadInformation[] journalInfo = loadJournals();
if (rebuildCounters) {
pagingManager.rebuildCounters();
}
removeExtraAddressStores();
if (securityManager instanceof ActiveMQBasicSecurityManager) {
@ -4245,8 +4270,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private final AtomicBoolean failedAlready = new AtomicBoolean();
@Override
public boolean isPreviouslyFailed() {
return failedAlready.get();
}
@Override
public synchronized void onIOException(Throwable cause, String message, String file) {
if (logger.isTraceEnabled()) {
// the purpose of this is to find where the critical error is being called at
// useful for when debugging where the critical error is being called at
logger.trace("Throwing critical error {}", cause.getMessage(), new Exception("trace"));
}
if (!failedAlready.compareAndSet(false, true)) {
return;
}
@ -4542,4 +4577,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
@Override
public AutoCloseable managementLock() throws Exception {
if (!managementSemaphore.tryAcquire(1, TimeUnit.MINUTES)) {
throw ActiveMQMessageBundle.BUNDLE.managementBusy();
} else {
return managementSemaphore::release;
}
}
}

View File

@ -1725,7 +1725,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (pageSubscription != null) {
// messageReferences will have depaged messages which we need to discount from the counter as they are
// counted on the pageSubscription as well
return (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
long returnValue = (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
if (logger.isTraceEnabled()) {
logger.trace("Queue={}/{} returning getMessageCount returning {}. pendingMetrics.getMessageCount() = {}, getScheduledCount() = {}, pageSubscription.getMessageCount()={}, pageSubscription.getDeliveredCount()={}",
name, id, returnValue, pendingMetrics.getMessageCount(), getScheduledCount(), pageSubscription.getMessageCount(),
pageSubscription.getDeliveredCount());
}
return returnValue;
} else {
return (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount();
}
@ -2279,6 +2285,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void destroyPaging() throws Exception {
// it could be null on embedded or certain unit tests
if (pageSubscription != null) {
if (logger.isTraceEnabled()) {
logger.trace("Destroying paging for {}", this.name, new Exception("trace"));
}
pageSubscription.destroy();
pageSubscription.cleanupEntries(true);
}

View File

@ -295,6 +295,7 @@ public abstract class ActiveMQTestBase extends Assert {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
} catch (Exception ignored) {
// it always throws an exception on shutdown
}
}
@ -878,7 +879,7 @@ public abstract class ActiveMQTestBase extends Assert {
return testDir;
}
private String getEmbeddedDataBaseName() {
protected String getEmbeddedDataBaseName() {
return "memory:" + getTestDir();
}
@ -2314,6 +2315,10 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected int getMessageCount(final Queue queue) {
try {
Wait.waitFor(() -> queue.getPageSubscription().isCounterPending() == false);
} catch (Exception ignored) {
}
queue.flushExecutor();
return (int) queue.getMessageCount();
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package pageCounter
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.server.Queue
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
int messages = Integer.parseInt(arg[0]);
Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue"))
for (int i = 0; i < 20 && queue.getMessageCount() != messages; i++) {
Thread.sleep(100);
}
GroovyRun.assertEquals((int)messages, (int)queue.getMessageCount());

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package pageCounter
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
// starts an artemis server
String serverType = arg[0];
String protocol = arg[1];
int messages = Integer.parseInt(arg[2]);
// Can't depend directly on artemis, otherwise it wouldn't compile in hornetq
if (protocol != null && protocol.equals("AMQP")) {
GroovyRun.evaluate("clients/artemisClientAMQP.groovy", "serverArg", serverType, protocol);
} else {
GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType);
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue destination = session.createQueue("queue")
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < messages; i++) {
TextMessage message = session.createTextMessage("Message " + i);
producer.send(message);
if (i % 100 == 0) {
session.commit();
}
}
session.commit();
connection.close();

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_TWENTYTWO_ZERO;
@RunWith(Parameterized.class)
public class PagingCounterTest extends VersionedBase {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
// we don't need every single version ever released..
// if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time
List<Object[]> combinations = new ArrayList<>();
/*
// during development sometimes is useful to comment out the combinations
// and add the ones you are interested.. example:
*/
// combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
// combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE});
combinations.add(new Object[]{null, TWO_TWENTYTWO_ZERO, SNAPSHOT});
combinations.add(new Object[]{null, SNAPSHOT, TWO_TWENTYTWO_ZERO});
// the purpose on this one is just to validate the test itself.
/// if it can't run against itself it won't work at all
combinations.add(new Object[]{null, SNAPSHOT, SNAPSHOT});
return combinations;
}
public PagingCounterTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
public void removeFolder() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
serverFolder.getRoot().mkdirs();
}
@After
public void tearDown() {
try {
stopServer(serverClassloader);
} catch (Throwable ignored) {
}
try {
stopServer(receiverClassloader);
} catch (Throwable ignored) {
}
}
@Test
public void testSendReceivePaging() throws Throwable {
setVariable(senderClassloader, "persistent", true);
startServer(serverFolder.getRoot(), senderClassloader, "pageCounter", null, true);
evaluate(senderClassloader, "journalcompatibility/forcepaging.groovy");
evaluate(senderClassloader, "pageCounter/sendMessages.groovy", server, "core", "1000");
evaluate(senderClassloader, "journalcompatibility/ispaging.groovy");
evaluate(senderClassloader, "pageCounter/checkMessages.groovy", "1000");
stopServer(senderClassloader);
setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "pageCounter", null, false);
evaluate(receiverClassloader, "journalcompatibility/ispaging.groovy");
evaluate(receiverClassloader, "pageCounter/checkMessages.groovy", "1000");
}
}

View File

@ -159,7 +159,6 @@ public class BackupSyncJournalTest extends FailoverTestBase {
for (Pair<Long, Integer> pair : backupIds) {
totalBackup += pair.getB();
}
assertEquals("number of records must match ", total, totalBackup);
// "+ 2": there two other calls that send N_MSGS.
for (int i = 0; i < totalRounds + 3; i++) {

View File

@ -1715,6 +1715,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
public boolean isEmbeddedWebServerStarted() {
return (boolean) proxy.retrieveAttributeValue("embeddedWebServerStarted");
}
@Override
public void rebuildPageCounters() throws Exception {
proxy.invokeOperation("rebuildPageCounters");
}
};
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.paging;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -39,9 +40,13 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PageCountSyncOnNonTXTest extends SpawnedTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Rule
public RetryRule retryRule = new RetryRule(1);
@ -151,7 +156,7 @@ public class PageCountSyncOnNonTXTest extends SpawnedTestBase {
}
}
} catch (Exception expected) {
expected.printStackTrace();
logger.info("expected exception {}", expected.toString(), expected);
}
} finally {

View File

@ -0,0 +1,289 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PageCounterRebuildTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Test
public void testUnitSize() throws Exception {
AtomicInteger errors = new AtomicInteger(0);
StorageManager mockStorage = Mockito.mock(StorageManager.class);
PageSubscriptionCounterImpl nonPersistentPagingCounter = new PageSubscriptionCounterImpl(mockStorage, -1);
final int THREADS = 33;
final int ADD_VALUE = 7;
final int SIZE_VALUE = 17;
final int REPEAT = 777;
ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
runAfter(executorService::shutdownNow);
CyclicBarrier startFlag = new CyclicBarrier(THREADS);
ReusableLatch latch = new ReusableLatch(THREADS);
for (int j = 0; j < THREADS; j++) {
executorService.execute(() -> {
try {
startFlag.await(10, TimeUnit.SECONDS);
for (int i = 0; i < REPEAT; i++) {
nonPersistentPagingCounter.increment(null, ADD_VALUE, SIZE_VALUE);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
latch.countDown();
}
});
}
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
Assert.assertEquals(ADD_VALUE * THREADS * REPEAT, nonPersistentPagingCounter.getValue());
Assert.assertEquals(SIZE_VALUE * THREADS * REPEAT, nonPersistentPagingCounter.getPersistentSize());
latch.setCount(THREADS);
for (int j = 0; j < THREADS; j++) {
executorService.execute(() -> {
try {
startFlag.await(10, TimeUnit.SECONDS);
for (int i = 0; i < REPEAT; i++) {
nonPersistentPagingCounter.increment(null, -ADD_VALUE, -SIZE_VALUE);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
latch.countDown();
}
});
}
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
Assert.assertEquals(0L, nonPersistentPagingCounter.getValue());
Assert.assertEquals(0L, nonPersistentPagingCounter.getPersistentSize());
Assert.assertEquals(0, errors.get());
}
@Test
public void testRebuildCounter() throws Exception {
ActiveMQServer server = createServer(true, true);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(100 * 1024).setMaxReadPageMessages(1);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
server.start();
String queueName = getName();
String nonConsumedQueueName = getName() + "_nonConsumed";
server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.MULTICAST));
server.createQueue(new QueueConfiguration(nonConsumedQueueName).setAddress(queueName).setRoutingType(RoutingType.MULTICAST));
server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.MULTICAST));
Queue serverQueue = server.locateQueue(queueName);
Queue serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
Assert.assertNotNull(serverQueue);
Assert.assertNotNull(serverNonConsumedQueue);
serverQueue.getPagingStore().startPaging();
final int THREADS = 4;
final int TX_SEND = 2000;
final int NON_TXT_SEND = 200;
final int CONSUME_MESSAGES = 200;
AtomicInteger errors = new AtomicInteger(0);
ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
runAfter(executorService::shutdownNow);
CyclicBarrier startFlag = new CyclicBarrier(THREADS);
ReusableLatch latch = new ReusableLatch(THREADS);
for (int i = 0; i < THREADS; i++) {
final int threadNumber = i;
executorService.execute(() -> {
try {
startFlag.await(10, TimeUnit.SECONDS);
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) {
logger.info("sending thread {}", threadNumber);
javax.jms.Topic jmsQueue = session.createTopic(queueName);
MessageProducer producerNonTX = session.createProducer(jmsQueue);
MessageProducer producerTX = txSession.createProducer(jmsQueue);
for (int message = 0; message < NON_TXT_SEND; message++) {
TextMessage txtMessage = session.createTextMessage("hello" + message);
txtMessage.setBooleanProperty("first", false);
producerNonTX.send(session.createTextMessage("hello" + message));
}
for (int message = 0; message < TX_SEND; message++) {
producerTX.send(session.createTextMessage("helloTX" + message));
}
txSession.commit();
}
} catch (Throwable e) {
errors.incrementAndGet();
} finally {
latch.countDown();
}
});
}
// this should be fast on the CIs, but if you use a slow disk, it might take a few extra seconds.
Assert.assertTrue(latch.await(1, TimeUnit.MINUTES));
final int numberOfMessages = TX_SEND * THREADS + NON_TXT_SEND * THREADS;
Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount);
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);) {
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName + "::" + queueName));
connection.start();
for (int i = 0; i < CONSUME_MESSAGES; i++) {
Message message = consumer.receive(5000);
Assert.assertNotNull(message);
}
}
Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount);
Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount);
server.stop();
server.start();
serverQueue = server.locateQueue(queueName);
serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount);
Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount);
serverQueue.getPageSubscription().getCounter().markRebuilding();
serverNonConsumedQueue.getPageSubscription().getCounter().markRebuilding();
// if though we are rebuilding, we are still returning based on the last recorded value until processing is finished
Assert.assertEquals(8600, serverQueue.getMessageCount());
Assert.assertEquals(8800, serverNonConsumedQueue.getMessageCount());
serverQueue.getPageSubscription().getCounter().finishRebuild();
serverNonConsumedQueue.getPageSubscription().getCounter().finishRebuild();
Assert.assertEquals(0, serverQueue.getMessageCount()); // we artificially made it 0 by faking a rebuild
Assert.assertEquals(0, serverNonConsumedQueue.getMessageCount()); // we artificially made it 0 by faking a rebuild
server.stop();
server.start();
serverQueue = server.locateQueue(queueName);
serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
// after a rebuild, the counter should be back to where it was
Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount);
Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount);
server.stop();
server.start();
serverQueue = server.locateQueue(queueName);
serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
Assert.assertNotNull(serverQueue);
Assert.assertNotNull(serverNonConsumedQueue);
Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount);
Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount);
server.stop();
// restarting the server to issue a rebuild on the counters
server.start();
logger.info("Consuming messages");
factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);) {
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName + "::" + queueName));
connection.start();
for (int i = 0; i < numberOfMessages - CONSUME_MESSAGES; i++) {
Message message = consumer.receive(5000);
Assert.assertNotNull(message);
if (i % 100 == 0) {
logger.info("Received {} messages", i);
}
}
Assert.assertNull(consumer.receiveNoWait());
consumer.close();
consumer = session.createConsumer(session.createQueue(queueName + "::" + nonConsumedQueueName));
connection.start();
for (int i = 0; i < numberOfMessages; i++) {
Message message = consumer.receive(5000);
Assert.assertNotNull(message);
}
Assert.assertNull(consumer.receiveNoWait());
consumer.close();
}
serverQueue = server.locateQueue(queueName);
serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
Wait.assertEquals(0L, serverQueue::getMessageCount, 1000, 100);
Wait.assertEquals(0L, serverNonConsumedQueue::getMessageCount, 1000, 100);
}
}

View File

@ -33,7 +33,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -164,6 +163,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
server.stop();
server.setRebuildCounters(false);
server.start();
queue = server.locateQueue("A1");
@ -177,6 +178,70 @@ public class PagingCounterTest extends ActiveMQTestBase {
}
}
@Test
public void testMultiThreadCounter() throws Exception {
ClientSessionFactory sf = createSessionFactory(sl);
ClientSession session = sf.createSession();
try {
server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST));
final PageSubscriptionCounter counter = locateCounter(queue);
final int THREADS = 10;
final CyclicBarrier flagStart = new CyclicBarrier(THREADS);
final CountDownLatch done = new CountDownLatch(THREADS);
final int BUMPS = 2000;
Assert.assertEquals(0, counter.getValue());
ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
runAfter(executorService::shutdownNow);
for (int i = 0; i < THREADS; i++) {
executorService.execute(() -> {
try {
flagStart.await(10, TimeUnit.SECONDS);
for (int repeat = 0; repeat < BUMPS; repeat++) {
counter.increment(null, 1, 1L);
Transaction tx = new TransactionImpl(server.getStorageManager());
counter.increment(tx, 1, 1L);
tx.commit();
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} finally {
done.countDown();
}
});
}
// it should take a couple seconds only
done.await(1, TimeUnit.MINUTES);
Wait.assertEquals((long)(BUMPS * 2 * THREADS), counter::getValue, 5000, 100);
server.stop();
server.setRebuildCounters(false);
server.start();
queue = server.locateQueue("A1");
final PageSubscriptionCounter counterAfterRestart = locateCounter(queue);
Wait.assertEquals((long)(BUMPS * 2 * THREADS), counterAfterRestart::getValue, 5000, 100);
Assert.assertEquals(BUMPS * 2 * THREADS, counterAfterRestart.getValue());
} finally {
sf.close();
session.close();
}
}
@Test
public void testCleanupCounter() throws Exception {
ClientSessionFactory sf = createSessionFactory(sl);
@ -216,6 +281,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
server.stop();
server = newActiveMQServer();
server.setRebuildCounters(false);
server.start();
@ -228,6 +294,11 @@ public class PagingCounterTest extends ActiveMQTestBase {
assertEquals(2100, counter.getValue());
assertEquals(2100 * 1000, counter.getPersistentSize());
server.getPagingManager().rebuildCounters();
// it should be zero after rebuild, since no actual messages were sent
Wait.assertEquals(0, counter::getValue);
} finally {
sf.close();
session.close();
@ -246,8 +317,6 @@ public class PagingCounterTest extends ActiveMQTestBase {
PageSubscriptionCounter counter = locateCounter(queue);
((PageSubscriptionCounterImpl) counter).setPersistent(false);
StorageManager storage = server.getStorageManager();
Transaction tx = new TransactionImpl(server.getStorageManager());
@ -321,7 +390,9 @@ public class PagingCounterTest extends ActiveMQTestBase {
server.stop();
server = newActiveMQServer();
server.setRebuildCounters(false);
server.start();
@ -329,10 +400,29 @@ public class PagingCounterTest extends ActiveMQTestBase {
assertNotNull(queue);
counter = locateCounter(queue);
PageSubscriptionCounter counterAfterRestart = locateCounter(queue);
assertEquals(1, counter.getValue());
assertEquals(1000, counter.getPersistentSize());
Wait.assertEquals(1, counterAfterRestart::getValue);
Wait.assertEquals(1000, counterAfterRestart::getPersistentSize);
counterAfterRestart.markRebuilding();
// should be using a previously added value while rebuilding
Wait.assertEquals(1, counterAfterRestart::getValue);
tx = new TransactionImpl(server.getStorageManager());
counterAfterRestart.increment(tx, 10, 10_000);
tx.commit();
Wait.assertEquals(11, counterAfterRestart::getValue);
Wait.assertEquals(11_000, counterAfterRestart::getPersistentSize);
counterAfterRestart.finishRebuild();
server.getPagingManager().rebuildCounters();
Wait.assertEquals(0, counterAfterRestart::getValue);
Wait.assertEquals(0, counterAfterRestart::getPersistentSize);
}
@ -349,7 +439,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
}
@Test
public void testPrepareCounter() throws Exception {
public void testCommitCounter() throws Exception {
Xid xid = newXID();
Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST));
@ -366,19 +456,19 @@ public class PagingCounterTest extends ActiveMQTestBase {
assertEquals(0, counter.getValue());
tx.prepare();
tx.commit();
storage.waitOnOperations();
assertEquals(0, counter.getValue());
assertEquals(2000, counter.getValue());
server.stop();
server = newActiveMQServer();
server.start();
server.setRebuildCounters(false);
storage = server.getStorageManager();
server.start();
queue = server.locateQueue(new SimpleString("A1"));
@ -386,16 +476,6 @@ public class PagingCounterTest extends ActiveMQTestBase {
counter = locateCounter(queue);
tx = server.getResourceManager().removeTransaction(xid, null);
assertNotNull(tx);
assertEquals(0, counter.getValue());
tx.commit(false);
storage.waitOnOperations();
Wait.assertEquals(2000, counter::getValue);
}

View File

@ -34,8 +34,6 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@ -62,7 +60,6 @@ public class PagingSendTest extends ActiveMQTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
Configuration config = new ConfigurationImpl();
server = newActiveMQServer();
server.start();

View File

@ -651,6 +651,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, executorFactory, false, null), addressSettingsRepository, configuration.getManagementAddress());
paging.start();
runAfter(paging::stop);
return paging;
}

View File

@ -150,9 +150,9 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
ClientProducer producer = session.createProducer("flowcontrol");
ClientMessage message = session.createMessage(true);
message.writeBodyBufferBytes(body);
logger.info("try to send a message after replicated");
logger.debug("try to send a message after replicated");
producer.send(message);
logger.info("send message done");
logger.debug("send message done");
producer.close();
session.close();
@ -187,8 +187,8 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
if (!(info.userRecordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL)) {
// ignore
}
logger.info("got live message {} {}", info.id, info.userRecordType);
liveJournalCounter.incrementAndGet();
logger.info("got live message {} {}, counter={}", info.id, info.userRecordType, liveJournalCounter.get());
}
});
@ -207,8 +207,8 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
if (!(info.userRecordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL)) {
// ignore
}
logger.info("replicated message {}", info.id);
replicationCounter.incrementAndGet();
logger.info("replicated message {}, counter={}", info.id, replicationCounter.get());
}
});

View File

@ -245,6 +245,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
class FakePagingStore implements PagingStore {
@Override
public void counterSnapshot() {
}
@Override
public void execute(Runnable runnable) {
runnable.run();

View File

@ -61,6 +61,8 @@ public class PagingManagerImplTest extends ActiveMQTestBase {
managerImpl.start();
runAfter(managerImpl::stop);
PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
ICoreMessage msg = createMessage(1L, new SimpleString("simple-test"), createRandomBuffer(10));

View File

@ -39,6 +39,11 @@ public class FakePagingManager implements PagingManager {
}
@Override
public void counterSnapshot() {
}
@Override
public void addTransaction(final PageTransactionInfo pageTransaction) {
}