diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java index 448c3b247c..4cae819e6c 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java @@ -279,7 +279,7 @@ public class PrintData extends DBOption { folder = pgStore.getFolder(); out.println("####################################################################################################"); out.println("Exploring store " + store + " folder = " + folder); - int pgid = (int) pgStore.getFirstPage(); + long pgid = pgStore.getFirstPage(); out.println("Number of pages ::" + pgStore.getNumberOfPages() + ", Current writing page ::" + pgStore.getCurrentWritingPage()); for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 2256e05058..248f44d4a6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -2003,5 +2003,8 @@ public interface ActiveMQServerControl { @Attribute(desc = "Whether the embedded web server is started") boolean isEmbeddedWebServerStarted(); + + @Attribute(desc = "Scan all paged destinations to rebuild the page counters") + void rebuildPageCounters() throws Exception; } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java index cd1600e1a2..2e445e3b3e 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java @@ -67,6 +67,9 @@ public abstract class AbstractJDBCDriver { } public void destroy() throws Exception { + if (logger.isTraceEnabled()) { + logger.trace("dropping {}", sqlProvider.getTableName(), new Exception("trace")); + } final String dropTableSql = "DROP TABLE " + sqlProvider.getTableName(); try (Connection connection = connectionProvider.getConnection()) { try { diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java index 4b03f91779..96375af025 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java @@ -95,8 +95,10 @@ public class JDBCSequentialFile implements SequentialFile { try { return fileFactory.listFiles(extension).contains(filename); } catch (Exception e) { - logger.warn(e.getMessage(), e); - fileFactory.onIOError(e, "Error checking JDBC file exists.", this); + logger.debug(e.getMessage(), e); + // this shouldn't throw a critical IO Error + // as if the destination does not exists (ot table store removed), the table will not exist and + // we may get a SQL Exception return false; } } @@ -114,7 +116,9 @@ public class JDBCSequentialFile implements SequentialFile { return true; } catch (SQLException e) { isLoaded.set(false); - fileFactory.onIOError(e, "Error attempting to open JDBC file.", this); + // should not throw exceptions, as we drop the table on queue.destroy. + // storage.exists could be called for non existing pages during async cleanup and they are + // just supposed to return false } return false; } @@ -158,7 +162,9 @@ public class JDBCSequentialFile implements SequentialFile { } } } catch (SQLException e) { - fileFactory.onIOError(e, "Error deleting JDBC file.", this); + // file is already gone from a drop somewhere + logger.debug("Expected error deleting Sequential File", e); + return; } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java index 6bceb5a7c1..e55fb4d03b 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -141,7 +141,9 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM try { return dbDriver.listFiles(extension); } catch (SQLException e) { - criticalErrorListener.onIOException(e, "Error listing JDBC files.", null); + // We can't throw critical error here + // exists will call listfiles, and if the store does not exists + // it should simply return false throw e; } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java index 044d9162b7..3d9d52dc25 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java @@ -96,6 +96,9 @@ public abstract class AbstractSequentialFile implements SequentialFile { @Override public final void delete() throws IOException, InterruptedException, ActiveMQException { try { + if (logger.isTraceEnabled()) { + logger.trace("Deleting {}", this.getFileName(), new Exception("trace")); + } if (isOpen()) { close(false, false); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java index aeb24d3736..2f28196a07 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java @@ -16,10 +16,11 @@ */ package org.apache.activemq.artemis.core.io; -/** - * TODO Merge this with IOExceptionListener - */ public interface IOCriticalErrorListener { + default boolean isPreviouslyFailed() { + return false; + } + void onIOException(Throwable code, String message, String file); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 2b541583ec..6d1f4341da 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -45,6 +45,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -4619,6 +4620,15 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } } + @Override + public void rebuildPageCounters() throws Exception { + // managementLock will guarantee there's only one management operation being called + try (AutoCloseable lock = server.managementLock()) { + Future task = server.getPagingManager().rebuildCounters(); + task.get(); + } + } + private ServiceComponent getEmbeddedWebServerComponent() throws ActiveMQIllegalStateException { for (ActiveMQComponent component : server.getExternalComponents()) { if (component instanceof WebServerComponentMarker) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java index 89ff513566..80b16cdb2d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.core.paging; import java.util.Map; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -141,6 +143,7 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository */ void checkMemory(Runnable runWhenAvailable); + void counterSnapshot(); /** * Use this when you have no refernce of an address. (anonymous AMQP Producers for example) @@ -157,4 +160,15 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository default long getMaxMessages() { return 0; } + + /** + * Rebuilds all page counters for destinations that are paging in the background. + */ + default Future rebuildCounters() { + return null; + } + + default void forEachTransaction(BiConsumer transactionConsumer) { + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 20b62def9d..da680fce6b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.RefCountMessageListener; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; import org.apache.activemq.artemis.core.paging.impl.Page; +import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.RouteContextList; @@ -129,6 +130,9 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener Page getCurrentPage(); + /** it will save snapshots on the counters */ + void counterSnapshot(); + /** * @return true if paging was started, or false if paging was already started before this call */ @@ -220,4 +224,8 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener void block(); void unblock(); + + default StorageManager getStorageManager() { + return null; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java index 1f1ae7b4cb..7eb5b7bb94 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.core.paging.cursor; +import java.util.function.BiConsumer; + // this is to expose PageSubscriptionImpl::PageCursorInfo public interface ConsumedPage { @@ -24,4 +26,8 @@ public interface ConsumedPage { boolean isDone(); + boolean isAck(int messageNumber); + + void forEachAck(BiConsumer ackConsumer); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java index a6f1714a47..d3f25e21b5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.paging.cursor; +import java.util.function.Consumer; + import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.PagedMessage; @@ -32,12 +34,16 @@ public interface PageCursorProvider { */ PageSubscription getSubscription(long queueId); + void forEachSubscription(Consumer consumer); + PageSubscription createSubscription(long queueId, Filter filter, boolean durable); void processReload() throws Exception; void stop(); + void counterSnapshot(); + void flushExecutors(); void scheduleCleanup(); @@ -56,4 +62,8 @@ public interface PageCursorProvider { */ void close(PageSubscription pageCursorImpl); + void counterRebuildStarted(); + + void counterRebuildDone(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java index 50d7fae5d0..17a743f111 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java @@ -35,6 +35,9 @@ public interface PageSubscription { // To be called before the server is down void stop(); + /** Save a snapshot of the current counter value in the journal */ + void counterSnapshot(); + /** * This is a callback to inform the PageSubscription that something was routed, so the empty flag can be cleared */ @@ -46,6 +49,8 @@ public interface PageSubscription { long getMessageCount(); + boolean isCounterPending(); + long getPersistentSize(); long getId(); @@ -170,4 +175,6 @@ public interface PageSubscription { void incrementDeliveredSize(long size); void removePendingDelivery(PagedMessage pagedMessage); + + ConsumedPage locatePageInfo(long pageNr); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java index 75eedfa9ee..56614a4871 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.paging.cursor; -import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.transaction.Transaction; public interface PageSubscriptionCounter { @@ -36,7 +35,13 @@ public interface PageSubscriptionCounter { void loadInc(long recordInd, int add, long persistentSize); - void applyIncrementOnTX(Transaction tx, long recordID, int add, long persistentSize); + void applyIncrementOnTX(Transaction tx, int add, long persistentSize); + + void markRebuilding(); + + void finishRebuild(); + + boolean isRebuilding(); /** * This will process the reload @@ -46,12 +51,12 @@ public interface PageSubscriptionCounter { // used when deleting the counter void delete() throws Exception; - void pendingCounter(Page page, int increment, long persistentSize) throws Exception; + void snapshot(); // used when leaving page mode, so the counters are deleted in batches // for each queue on the address void delete(Transaction tx) throws Exception; - void cleanupNonTXCounters(long pageID) throws Exception; + PageSubscriptionCounter setSubscription(PageSubscription subscription); } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java new file mode 100644 index 0000000000..3ba540bdf5 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.paging.cursor.impl; + +import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; + +public abstract class BasePagingCounter implements PageSubscriptionCounter { + + private volatile boolean rebuilding = false; + + @Override + public void markRebuilding() { + rebuilding = true; + } + + @Override + public void finishRebuild() { + rebuilding = false; + } + + @Override + public boolean isRebuilding() { + return rebuilding; + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java new file mode 100644 index 0000000000..92a5ac5d39 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.paging.cursor.impl; + +import io.netty.util.collection.IntObjectHashMap; +import io.netty.util.collection.LongObjectHashMap; +import org.apache.activemq.artemis.core.paging.PagedMessage; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage; +import org.apache.activemq.artemis.core.paging.cursor.PagePosition; +import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; +import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; +import org.apache.activemq.artemis.core.paging.impl.Page; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.utils.collections.LinkedList; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.apache.activemq.artemis.utils.collections.LongHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.function.BiConsumer; + +/** this class will copy current data from the Subscriptions, count messages while the server is already active + * performing other activity */ +public class PageCounterRebuildManager implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final PagingStore pgStore; + private final StorageManager sm; + private final LongHashSet transactions; + private boolean paging; + private long limitPageId; + private int limitMessageNr; + private LongObjectHashMap copiedSubscriptionMap = new LongObjectHashMap<>(); + + + public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) { + // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end + initialize(store); + this.pgStore = store; + this.sm = store.getStorageManager(); + this.transactions = transactions; + } + /** this method will perform the copy from Acked recorded from the subscription into a separate data structure. + * So we can count data while we consolidate at the end */ + private void initialize(PagingStore store) { + store.lock(-1); + try { + try { + paging = store.isPaging(); + if (!paging) { + logger.debug("Destination {} was not paging, no need to rebuild counters"); + store.getCursorProvider().forEachSubscription(subscription -> { + subscription.getCounter().markRebuilding(); + subscription.getCounter().finishRebuild(); + }); + + store.getCursorProvider().counterRebuildDone(); + return; + } + store.getCursorProvider().counterRebuildStarted(); + Page currentPage = store.getCurrentPage(); + limitPageId = store.getCurrentWritingPage(); + limitMessageNr = currentPage.getNumberOfMessages(); + if (logger.isDebugEnabled()) { + logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + limitPageId = store.getCurrentWritingPage(); + } + logger.trace("Copying page store ack information from address {}", store.getAddress()); + store.getCursorProvider().forEachSubscription(subscription -> { + if (logger.isTraceEnabled()) { + logger.trace("Copying subscription ID {}", subscription.getId()); + } + + CopiedSubscription copiedSubscription = new CopiedSubscription(subscription); + copiedSubscription.subscriptionCounter.markRebuilding(); + copiedSubscriptionMap.put(subscription.getId(), copiedSubscription); + + subscription.forEachConsumedPage(consumedPage -> { + if (logger.isTraceEnabled()) { + logger.trace("Copying page {}", consumedPage.getPageId()); + } + + CopiedConsumedPage copiedConsumedPage = new CopiedConsumedPage(); + copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage); + if (consumedPage.isDone()) { + if (logger.isTraceEnabled()) { + logger.trace("Marking page {} as done on the copy", consumedPage.getPageId()); + } + copiedConsumedPage.done = true; + } else { + // We only copy the acks if the page is not done + // as if the page is done, we just move over + consumedPage.forEachAck((messageNR, pagePosition) -> { + if (logger.isTraceEnabled()) { + logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId()); + } + if (copiedConsumedPage.acks == null) { + copiedConsumedPage.acks = new IntObjectHashMap<>(); + } + copiedConsumedPage.acks.put(messageNR, Boolean.TRUE); + }); + } + }); + }); + } finally { + store.unlock(); + } + } + + private synchronized PageSubscriptionCounter getCounter(long queueID) { + CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID); + if (copiedSubscription != null) { + return copiedSubscription.subscriptionCounter; + } else { + return null; + } + } + + private CopiedSubscription getSubscription(long queueID) { + return copiedSubscriptionMap.get(queueID); + } + + private boolean isACK(long queueID, long pageNR, int messageNR) { + CopiedSubscription subscription = getSubscription(queueID); + if (subscription == null) { + return true; + } + + CopiedConsumedPage consumedPage = subscription.getPage(pageNR); + if (consumedPage == null) { + return false; + } else { + return consumedPage.isAck(messageNR); + } + } + + private void done() { + copiedSubscriptionMap.forEach((k, copiedSubscription) -> { + if (!copiedSubscription.empty) { + copiedSubscription.subscription.notEmpty(); + try { + copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + if (!copiedSubscription.empty) { + copiedSubscription.subscription.notEmpty(); + } + if (copiedSubscription.subscriptionCounter != null) { + copiedSubscription.subscriptionCounter.finishRebuild(); + } + }); + pgStore.getCursorProvider().counterRebuildDone(); + pgStore.getCursorProvider().scheduleCleanup(); + } + + @Override + public void run() { + try { + rebuild(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + + public void rebuild() throws Exception { + if (pgStore == null) { + logger.debug("Page store is null during rebuildCounters"); + return; + } + + if (!paging) { + logger.debug("Ignoring call to rebuild pgStore {}", pgStore.getAddress()); + } + + logger.debug("Rebuilding counter for store {}", pgStore.getAddress()); + + for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) { + if (logger.isDebugEnabled()) { + logger.debug("Rebuilding counter on messages from page {} on rebuildCounters for address {}", pgid, pgStore.getAddress()); + } + Page page = pgStore.newPageObject(pgid); + + if (!page.getFile().exists()) { + if (logger.isDebugEnabled()) { + logger.debug("Skipping page {} on store {}", pgid, pgStore.getAddress()); + } + continue; + } + page.open(false); + LinkedList msgs = page.read(sm); + page.close(false, false); + + try (LinkedListIterator iter = msgs.iterator()) { + while (iter.hasNext()) { + PagedMessage msg = iter.next(); + if (limitPageId == pgid) { + if (msg.getMessageNumber() >= limitMessageNr) { + if (logger.isDebugEnabled()) { + logger.debug("Rebuild counting on {} reached the last message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr); + } + // this is the limit where we should count.. + // anything beyond this will be new data + break; + } + } + msg.initMessage(sm); + long[] routedQueues = msg.getQueueIDs(); + + if (logger.isTraceEnabled()) { + logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues); + } + for (long queueID : routedQueues) { + boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber()); + + boolean txOK = msg.getTransactionID() <= 0 || transactions == null || transactions.contains(msg.getTransactionID()); + + if (!txOK) { + logger.debug("TX is not ok for {}", msg); + } + + if (ok && txOK) { // not acked and TX is ok + if (logger.isTraceEnabled()) { + logger.trace("Message pageNumber={}/{} NOT acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID); + } + CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID); + if (copiedSubscription != null) { + copiedSubscription.empty = false; + copiedSubscription.addUp++; + copiedSubscription.sizeUp += msg.getPersistentSize(); + } + } else { + if (logger.isTraceEnabled()) { + logger.trace("Message pageNumber={}/{} IS acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID); + } + } + } + } + } + } + + logger.debug("Counter rebuilding done for address {}", pgStore.getAddress()); + + done(); + + } + + private static class CopiedSubscription { + CopiedSubscription(PageSubscription subscription) { + this.subscriptionCounter = subscription.getCounter(); + this.subscription = subscription; + } + + private boolean empty = true; + + LongObjectHashMap consumedPageMap = new LongObjectHashMap<>(); + + // this is not a copy! This will be the actual object listed in the PageSubscription + // any changes to this object will reflect in the system and management; + PageSubscriptionCounter subscriptionCounter; + + PageSubscription subscription; + + CopiedConsumedPage getPage(long pageNr) { + return consumedPageMap.get(pageNr); + } + + int addUp; + long sizeUp; + + } + + private static class CopiedConsumedPage implements ConsumedPage { + boolean done; + IntObjectHashMap acks; + + @Override + public long getPageId() { + throw new RuntimeException("method not implemented"); + } + + @Override + public void forEachAck(BiConsumer ackConsumer) { + throw new RuntimeException("method not implemented"); + } + + @Override + public boolean isDone() { + return done; + } + + @Override + public boolean isAck(int messageNumber) { + if (done) { + return true; + } + if (acks != null) { + return acks.get(messageNumber) != null; + } + return false; + } + } + + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index 242fbcb14e..24d65970f8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; +import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.paging.cursor.PagedReference; import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl; import org.apache.activemq.artemis.core.paging.impl.Page; @@ -42,6 +43,7 @@ import org.apache.activemq.artemis.utils.collections.LongHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; +import java.util.function.Consumer; public class PageCursorProviderImpl implements PageCursorProvider { @@ -54,6 +56,10 @@ public class PageCursorProviderImpl implements PageCursorProvider { protected volatile boolean cleanupEnabled = true; + // We can't call cleanup before counters were rebuilt + // as they will determine if a subscription is empty or not + protected volatile boolean countersRebuilt = true; + protected final PagingStore pagingStore; protected final StorageManager storageManager; @@ -85,16 +91,30 @@ public class PageCursorProviderImpl implements PageCursorProvider { throw new IllegalStateException("Cursor " + cursorID + " had already been created"); } - PageSubscription activeCursor = new PageSubscriptionImpl(this, pagingStore, storageManager, filter, cursorID, persistent); + + PageSubscriptionCounter subscriptionCounter = createPageCounter(cursorID, persistent); + PageSubscription activeCursor = new PageSubscriptionImpl(this, pagingStore, storageManager, filter, cursorID, persistent, subscriptionCounter); + + activeCursors.put(cursorID, activeCursor); return activeCursor; } + + private PageSubscriptionCounter createPageCounter(long cursorID, boolean persistent) { + return new PageSubscriptionCounterImpl(storageManager, cursorID); + } + @Override public synchronized PageSubscription getSubscription(long cursorID) { return activeCursors.get(cursorID); } + @Override + public void forEachSubscription(Consumer consumer) { + activeCursors.forEach((k, v) -> consumer.accept(v)); + } + @Override public PagedReference newReference(final PagedMessage msg, final PageSubscription subscription) { @@ -139,6 +159,13 @@ public class PageCursorProviderImpl implements PageCursorProvider { } } + @Override + public void counterSnapshot() { + for (PageSubscription cursor : activeCursors.values()) { + cursor.counterSnapshot(); + } + } + @Override public void flushExecutors() { pagingStore.flushExecutors(); @@ -216,6 +243,11 @@ public class PageCursorProviderImpl implements PageCursorProvider { protected void cleanup() { + if (!countersRebuilt) { + logger.debug("Counters were not rebuilt yet, cleanup has to be ignored on address {}", pagingStore != null ? pagingStore.getAddress() : "NULL"); + return; + } + ArrayList depagedPages = new ArrayList<>(); LongHashSet depagedPagesSet = new LongHashSet(); @@ -506,6 +538,10 @@ public class PageCursorProviderImpl implements PageCursorProvider { private long checkMinPage(Collection cursorList) { long minPage = Long.MAX_VALUE; + if (logger.isTraceEnabled()) { + logger.trace("Min page cursorList size {} on {}", cursorList.size(), pagingStore.getAddress(), new Exception("trace")); + } + for (PageSubscription cursor : cursorList) { long firstPage = cursor.getFirstPage(); if (logger.isTraceEnabled()) { @@ -543,4 +579,14 @@ public class PageCursorProviderImpl implements PageCursorProvider { } } } + + @Override + public void counterRebuildStarted() { + this.countersRebuilt = false; + } + + @Override + public void counterRebuildDone() { + this.countersRebuilt = true; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java index 6ca73feb19..f580be47f5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java @@ -16,19 +16,12 @@ */ package org.apache.activemq.artemis.core.paging.cursor.impl; -import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; -import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -44,173 +37,117 @@ import java.lang.invoke.MethodHandles; /** * This class will encapsulate the persistent counters for the PagingSubscription */ -public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { +public class PageSubscriptionCounterImpl extends BasePagingCounter { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final int FLUSH_COUNTER = 1000; - private final long subscriptionID; // the journal record id that is holding the current value private long recordID = -1; - private boolean persistent; + /** while we rebuild the counters, we will use the recordedValues */ + private volatile long recordedValue = -1; + private static final AtomicLongFieldUpdater recordedValueUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "recordedValue"); - private final PageSubscription subscription; + /** while we rebuild the counters, we will use the recordedValues */ + private volatile long recordedSize = -1; + private static final AtomicLongFieldUpdater recordedSizeUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "recordedSize"); + + private PageSubscription subscription; private final StorageManager storage; - private final AtomicLong value = new AtomicLong(0); - private final AtomicLong persistentSize = new AtomicLong(0); + private volatile long value; + private static final AtomicLongFieldUpdater valueUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "value"); - private final AtomicLong added = new AtomicLong(0); - private final AtomicLong addedPersistentSize = new AtomicLong(0); + private volatile long persistentSize; + private static final AtomicLongFieldUpdater persistentSizeUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "persistentSize"); - private final AtomicLong pendingValue = new AtomicLong(0); - private final AtomicLong pendingPersistentSize = new AtomicLong(0); + private volatile long added; + private static final AtomicLongFieldUpdater addedUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "added"); - private final LinkedList incrementRecords = new LinkedList<>(); - - // We are storing pending counters for non transactional writes on page - // we will recount a page case we still see pending records - // as soon as we close a page we remove these records replacing by a regular page increment record - // A Map per pageID, each page will have a set of IDs, with the increment on each one - private final Map pendingCounters = new HashMap<>(); + private volatile long addedPersistentSize; + private static final AtomicLongFieldUpdater addedPersistentSizeUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "addedPersistentSize"); private LinkedList loadList; - private final Executor pageExecutor; - public PageSubscriptionCounterImpl(final StorageManager storage, - final PageSubscription subscription, - final boolean persistent, final long subscriptionID) { this.subscriptionID = subscriptionID; this.storage = storage; - this.persistent = persistent; - this.subscription = subscription; - if (subscription == null) { - this.pageExecutor = null; - } else { - this.pageExecutor = subscription.getPagingStore().getExecutor(); - assert pageExecutor != null; + } + + @Override + public void markRebuilding() { + if (logger.isDebugEnabled()) { + logger.debug("Subscription {} marked for rebuilding", subscriptionID); } + super.markRebuilding(); + recordedSizeUpdater.set(this, persistentSizeUpdater.get(this)); + recordedValueUpdater.set(this, recordedValueUpdater.get(this)); + try { + reset(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + + @Override + public void finishRebuild() { + super.finishRebuild(); + if (logger.isDebugEnabled()) { + logger.debug("Subscription {} finished rebuilding", subscriptionID); + } + snapshot(); + addedUpdater.set(this, valueUpdater.get(this)); + addedPersistentSizeUpdater.set(this, persistentSizeUpdater.get(this)); } @Override public long getValueAdded() { - return added.get() + pendingValue.get(); + return addedUpdater.get(this); } @Override public long getValue() { - return value.get() + pendingValue.get(); + if (isRebuilding()) { + if (logger.isTraceEnabled()) { + logger.trace("returning getValue from isPending on subscription {}, recordedValue={}, addedUpdater={}", recordedValueUpdater.get(this), addedUpdater.get(this)); + } + return recordedValueUpdater.get(this); + } + if (logger.isTraceEnabled()) { + logger.trace("returning regular getValue subscription {}, value={}", subscriptionID, valueUpdater.get(this)); + } + return valueUpdater.get(this); } @Override public long getPersistentSizeAdded() { - return addedPersistentSize.get() + pendingPersistentSize.get(); + return addedPersistentSizeUpdater.get(this); } @Override public long getPersistentSize() { - return persistentSize.get() + pendingPersistentSize.get(); - } - - /** - * This is used only on non transactional paging - * - * @param page - * @param increment - * @throws Exception - */ - @Override - public synchronized void pendingCounter(Page page, int increment, long size) throws Exception { - if (!persistent) { - return; // nothing to be done + if (isRebuilding()) { + if (logger.isTraceEnabled()) { + logger.trace("returning getPersistentSize from isPending on subscription {}, recordedSize={}. addedSize={}", subscriptionID, recordedSizeUpdater.get(this), addedPersistentSizeUpdater.get(this)); + } + return recordedSizeUpdater.get(this); } - - assert page != null; - - PendingCounter pendingInfo = pendingCounters.get((long) page.getPageId()); - if (pendingInfo == null) { - // We have to make sure this is sync here - // not syncing this to disk may cause the page files to be out of sync on pages. - // we can't afford the case where a page file is written without a record here - long id = storage.storePendingCounter(this.subscriptionID, page.getPageId()); - pendingInfo = new PendingCounter(id, increment, size); - pendingCounters.put((long) page.getPageId(), pendingInfo); - } else { - pendingInfo.addAndGet(increment, size); - } - - pendingValue.addAndGet(increment); - pendingPersistentSize.addAndGet(size); - - page.addPendingCounter(this); - } - - /** - * Cleanup temporary page counters on non transactional paged messages - * - * @param pageID - */ - @Override - public void cleanupNonTXCounters(final long pageID) throws Exception { - PendingCounter pendingInfo; - synchronized (this) { - pendingInfo = pendingCounters.remove(pageID); - } - - if (pendingInfo != null) { - final int valueCleaned = pendingInfo.getCount(); - final long valueSizeCleaned = pendingInfo.getPersistentSize(); - Transaction tx = new TransactionImpl(storage); - storage.deletePendingPageCounter(tx.getID(), pendingInfo.getId()); - - // To apply the increment of the value just being cleaned - increment(tx, valueCleaned, valueSizeCleaned); - - tx.addOperation(new TransactionOperationAbstract() { - @Override - public void afterCommit(Transaction tx) { - pendingValue.addAndGet(-valueCleaned); - pendingPersistentSize.updateAndGet(val -> val >= valueSizeCleaned ? val - valueSizeCleaned : 0); - } - }); - - tx.commit(); + if (logger.isTraceEnabled()) { + logger.trace("returning regular getPersistentSize subscription {}, value={}", subscriptionID, persistentSizeUpdater.get(this)); } + return persistentSizeUpdater.get(this); } @Override public void increment(Transaction tx, int add, long size) throws Exception { if (tx == null) { - if (persistent) { - long id = storage.storePageCounterInc(this.subscriptionID, add, size); - storage.getContext().executeOnCompletion(new IOCallback() { - @Override - public void done() { - process(id, add, size); - } - - @Override - public void onError(int errorCode, String errorMessage) { - - } - }); - } else { - process(-1, add, size); - } + process(add, size); } else { - if (persistent) { - tx.setContainsPersistent(); - long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add, size); - applyIncrementOnTX(tx, id, add, size); - } else { - applyIncrementOnTX(tx, -1, add, size); - } + applyIncrementOnTX(tx, add, size); } } @@ -218,11 +155,10 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { * This method will install the TXs * * @param tx - * @param recordID1 * @param add */ @Override - public void applyIncrementOnTX(Transaction tx, long recordID1, int add, long size) { + public void applyIncrementOnTX(Transaction tx, int add, long size) { CounterOperations oper = (CounterOperations) tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC); if (oper == null) { @@ -231,27 +167,36 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { tx.addOperation(oper); } - oper.operations.add(new ItemOper(this, recordID1, add, size)); + oper.operations.add(new ItemOper(this, add, size)); } @Override - public synchronized void loadValue(final long recordID1, final long value1, long size) { - if (this.subscription != null) { - // it could be null on testcases... which is ok - this.subscription.notEmpty(); + public synchronized void loadValue(final long recordID, final long value, long size) { + if (logger.isDebugEnabled()) { + logger.debug("Counter for subscription {} reloading recordID={}, value={}, size={}", this.subscriptionID, recordID, value, size); } - this.value.set(value1); - this.added.set(value1); - this.persistentSize.set(size); - this.addedPersistentSize.set(size); - this.recordID = recordID1; + this.recordID = recordID; + recordedValueUpdater.set(this, value); + recordedSizeUpdater.set(this, size); + valueUpdater.set(this, value); + persistentSizeUpdater.set(this, size); + addedUpdater.set(this, size); } - private void process(long id, int add, long size) { - if (id >= 0 && pageExecutor != null) { - pageExecutor.execute(() -> doIncrement(id, add, size)); - } else { - doIncrement(-1, add, size); + private void process(int add, long size) { + if (logger.isTraceEnabled()) { + logger.trace("process subscription={} add={}, size={}", subscriptionID, add, size); + } + valueUpdater.addAndGet(this, add); + persistentSizeUpdater.addAndGet(this, size); + if (add > 0) { + addedUpdater.addAndGet(this, add); + addedPersistentSizeUpdater.addAndGet(this, size); + } + + if (isRebuilding()) { + recordedValueUpdater.addAndGet(this, value); + recordedSizeUpdater.addAndGet(this, size); } } @@ -264,24 +209,39 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { tx.commit(); } + private void reset() throws Exception { + Transaction tx = new TransactionImpl(storage); + + delete(tx, true); + + tx.commit(); + } + @Override public void delete(Transaction tx) throws Exception { + delete(tx, false); + } + + private void delete(Transaction tx, boolean keepZero) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Subscription {} delete, keepZero={}", subscriptionID, keepZero); + } // always lock the StorageManager first. try (ArtemisCloseable lock = storage.closeableReadLock()) { synchronized (this) { - for (Long record : incrementRecords) { - storage.deleteIncrementRecord(tx.getID(), record.longValue()); - tx.setContainsPersistent(); - } - if (recordID >= 0) { storage.deletePageCounter(tx.getID(), this.recordID); tx.setContainsPersistent(); } - recordID = -1; - value.set(0); - incrementRecords.clear(); + if (keepZero) { + recordID = storage.storePageCounter(tx.getID(), subscriptionID, 0L, 0L); + } else { + recordID = -1; + } + + valueUpdater.set(this, 0); + persistentSizeUpdater.set(this, 0); } } } @@ -298,110 +258,101 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { @Override public void processReload() { if (loadList != null) { - if (subscription != null) { - // it could be null on testcases - subscription.notEmpty(); - } - - for (PendingCounter incElement : loadList) { - value.addAndGet(incElement.getCount()); - added.addAndGet(incElement.getCount()); - persistentSize.addAndGet(incElement.getPersistentSize()); - addedPersistentSize.addAndGet(incElement.getPersistentSize()); - incrementRecords.add(incElement.getId()); + try { + long tx = -1L; + logger.debug("Removing increment records on cursor {}", subscriptionID); + for (PendingCounter incElement : loadList) { + if (tx < 0) { + tx = storage.generateID(); + } + storage.deletePageCounter(tx, incElement.id); + } + if (tx >= 0) { + storage.commit(tx); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); } loadList.clear(); loadList = null; } } - // you need to call this method from the executors when id > 0 - private void doIncrement(long id, int variance, long size) { - value.addAndGet(variance); - this.persistentSize.addAndGet(size); - if (variance > 0) { - added.addAndGet(variance); - } - if (size > 0) { - addedPersistentSize.addAndGet(size); - } - if (id >= 0) { - synchronized (this) { - incrementRecords.add(id); - if (incrementRecords.size() > FLUSH_COUNTER) { - this.cleanup(); - } - } - } - } - - /** - * used on testing only - */ - public void setPersistent(final boolean persistent) { - this.persistent = persistent; - } - /** * This method should always be called from a single threaded executor */ - protected synchronized void cleanup() { - if (incrementRecords.size() <= FLUSH_COUNTER) { + @Override + public synchronized void snapshot() { + if (isRebuilding()) { + if (logger.isDebugEnabled()) { + logger.debug("snapshot call ignored as cursor is being rebuilt for {}", subscriptionID); + } return; } - long valueReplace = value.get(); - long sizeReplace = persistentSize.get(); - ArrayList deleteList = new ArrayList<>(incrementRecords); - incrementRecords.clear(); + if (!storage.isStarted()) { + logger.debug("Storage is not active, ignoring snapshot call on {}", subscriptionID); + return; + } + + long valueReplace = valueUpdater.get(this); + long sizeReplace = persistentSizeUpdater.get(this); long newRecordID = -1; - long txCleanup = storage.generateID(); + long txCleanup = -1; try { - for (Long value1 : deleteList) { - storage.deleteIncrementRecord(txCleanup, value1); - } - if (recordID >= 0) { + if (txCleanup < 0) { + txCleanup = storage.generateID(); + } storage.deletePageCounter(txCleanup, recordID); + recordID = -1; } - newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace, sizeReplace); - - if (logger.isTraceEnabled()) { - logger.trace("Replacing page-counter record = {} by record = {} on subscriptionID = {} for queue = {}", - recordID, newRecordID, subscriptionID, subscription.getQueue().getName()); + if (valueReplace > 0) { + if (txCleanup < 0) { + txCleanup = storage.generateID(); + } + newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace, sizeReplace); } - storage.commit(txCleanup); + if (logger.isDebugEnabled()) { + logger.debug("Replacing page-counter record = {} by record = {} on subscriptionID = {} for queue = {}, value = {}, size = {}", + recordID, newRecordID, subscriptionID, subscription.getQueue().getName(), valueReplace, sizeReplace); + } + + if (txCleanup >= 0) { + storage.commit(txCleanup); + } } catch (Exception e) { newRecordID = recordID; ActiveMQServerLogger.LOGGER.problemCleaningPagesubscriptionCounter(e); - try { - storage.rollback(txCleanup); - } catch (Exception ignored) { + if (txCleanup >= 0) { + try { + storage.rollback(txCleanup); + } catch (Exception ignored) { + } } } finally { recordID = newRecordID; + recordedValueUpdater.set(this, valueReplace); + recordedSizeUpdater.set(this, sizeReplace); } } private static class ItemOper { - private ItemOper(PageSubscriptionCounterImpl counter, long id, int add, long persistentSize) { + private ItemOper(PageSubscriptionCounterImpl counter, int add, long persistentSize) { this.counter = counter; - this.id = id; this.amount = add; this.persistentSize = persistentSize; } PageSubscriptionCounterImpl counter; - long id; - int amount; long persistentSize; @@ -414,7 +365,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { @Override public void afterCommit(Transaction tx) { for (ItemOper oper : operations) { - oper.counter.process(oper.id, oper.amount, oper.persistentSize); + oper.counter.process(oper.amount, oper.persistentSize); } } } @@ -465,4 +416,10 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { SIZE_UPDATER.addAndGet(this, persistentSize); } } + + @Override + public PageSubscriptionCounter setSubscription(PageSubscription subscription) { + this.subscription = subscription; + return this; + } } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index cdbd1dba72..a0ebc3a9c4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -28,6 +28,7 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.ToIntFunction; @@ -273,14 +274,17 @@ public final class PageSubscriptionImpl implements PageSubscription { final StorageManager store, final Filter filter, final long cursorId, - final boolean persistent) { + final boolean persistent, + final PageSubscriptionCounter counter) { + assert counter != null; this.pageStore = pageStore; this.store = store; this.cursorProvider = cursorProvider; this.cursorId = cursorId; this.filter = filter; this.persistent = persistent; - this.counter = new PageSubscriptionCounterImpl(store, this, persistent, cursorId); + this.counter = counter; + this.counter.setSubscription(this); } @@ -346,6 +350,11 @@ public final class PageSubscriptionImpl implements PageSubscription { } } + @Override + public boolean isCounterPending() { + return counter.isRebuilding(); + } + @Override public long getPersistentSize() { if (empty) { @@ -418,10 +427,8 @@ public final class PageSubscriptionImpl implements PageSubscription { @Override public void onPageModeCleared(Transaction tx) throws Exception { - if (counter != null) { - // this could be null on testcases - counter.delete(tx); - } + // this could be null on testcases + counter.delete(tx); this.empty = true; } @@ -746,9 +753,9 @@ public final class PageSubscriptionImpl implements PageSubscription { } @Override - public void forEachConsumedPage(Consumer pageCleaner) { + public void forEachConsumedPage(Consumer pageConsumer) { synchronized (consumedPages) { - consumedPages.values().forEach(pageCleaner); + consumedPages.values().forEach(pageConsumer); } } @@ -860,6 +867,11 @@ public final class PageSubscriptionImpl implements PageSubscription { public void stop() { } + @Override + public void counterSnapshot() { + counter.snapshot(); + } + @Override public void printDebug() { printDebug(toString()); @@ -912,6 +924,7 @@ public final class PageSubscriptionImpl implements PageSubscription { return getPageInfo(pos.getPageNr()); } + @Override public PageCursorInfo locatePageInfo(final long pageNr) { synchronized (consumedPages) { return consumedPages.get(pageNr); @@ -1064,10 +1077,16 @@ public final class PageSubscriptionImpl implements PageSubscription { // expressions private final AtomicInteger confirmed = new AtomicInteger(0); + @Override public synchronized boolean isAck(int messageNumber) { return completePage != null || acks.get(messageNumber) != null; } + @Override + public void forEachAck(BiConsumer ackConsumer) { + acks.forEach(ackConsumer); + } + @Override public String toString() { try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index 6a0551b430..8e919fb670 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.core.paging.impl; import java.nio.ByteBuffer; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -26,13 +25,11 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.paging.PagedMessage; -import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.utils.ReferenceCounterUtil; -import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.EmptyList; import org.apache.activemq.artemis.utils.collections.LinkedList; import org.apache.activemq.artemis.utils.collections.LinkedListImpl; @@ -86,11 +83,6 @@ public final class Page { private final SimpleString storeName; - /** - * A list of subscriptions containing pending counters (with non tx adds) on this page - */ - private Set pendingCounters; - private ByteBuffer readFileBuffer; public Page(final SimpleString storeName, @@ -241,13 +233,6 @@ public final class Page { storageManager.pageClosed(storeName, pageId); } file.close(waitSync, waitSync); - - Set counters = getPendingCounters(); - if (counters != null) { - for (PageSubscriptionCounter counter : counters) { - counter.cleanupNonTXCounters(this.getPageId()); - } - } } public boolean delete(final LinkedList messages) throws Exception { @@ -255,7 +240,9 @@ public final class Page { storageManager.pageDeleted(storeName, pageId); } - if (logger.isDebugEnabled()) { + if (logger.isTraceEnabled()) { + logger.trace("Deleting pageNr={} on store {}", pageId, storeName, new Exception("trace")); + } else if (logger.isDebugEnabled()) { logger.debug("Deleting pageNr={} on store {}", pageId, storeName); } @@ -373,24 +360,4 @@ public final class Page { return file; } - /** - * This will indicate a page that will need to be called on cleanup when the page has been closed and confirmed - * - * @param pageSubscriptionCounter - */ - public void addPendingCounter(PageSubscriptionCounter pageSubscriptionCounter) { - getOrCreatePendingCounters().add(pageSubscriptionCounter); - } - - private synchronized Set getPendingCounters() { - return pendingCounters; - } - - private synchronized Set getOrCreatePendingCounters() { - if (pendingCounters == null) { - pendingCounters = new ConcurrentHashSet<>(); - } - - return pendingCounters; - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 59db0da45f..6bec335949 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -32,6 +34,7 @@ import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStoreFactory; +import org.apache.activemq.artemis.core.paging.cursor.impl.PageCounterRebuildManager; import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; @@ -40,14 +43,16 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.SizeAwareMetric; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; +import org.apache.activemq.artemis.utils.collections.LongHashSet; import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; +import java.util.function.BiConsumer; public final class PagingManagerImpl implements PagingManager { - private static final int ARTEMIS_DEBUG_PAGING_INTERVAL = Integer.valueOf(System.getProperty("artemis.debug.paging.interval", "0")); + private static final int ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL = Integer.valueOf(System.getProperty("artemis.paging.counter.snapshot.interval", "60")); private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -92,13 +97,13 @@ public final class PagingManagerImpl implements PagingManager { private volatile long diskTotalSpace = 0; - private final Executor memoryExecutor; + private final Executor managerExecutor; private final Queue memoryCallback = new ConcurrentLinkedQueue<>(); private final ConcurrentMap transactions = new ConcurrentHashMap<>(); - private ActiveMQScheduledComponent scheduledComponent = null; + private ActiveMQScheduledComponent snapshotUpdater = null; private final SimpleString managementAddress; @@ -127,7 +132,7 @@ public final class PagingManagerImpl implements PagingManager { globalSizeMetric.setElementsEnabled(maxMessages >= 0); globalSizeMetric.setOverCallback(() -> setGlobalFull(true)); globalSizeMetric.setUnderCallback(() -> setGlobalFull(false)); - this.memoryExecutor = pagingSPI.newExecutor(); + this.managerExecutor = pagingSPI.newExecutor(); this.managementAddress = managementAddress; } @@ -205,8 +210,8 @@ public final class PagingManagerImpl implements PagingManager { protected void checkMemoryRelease() { if (!diskFull && (maxSize < 0 || !globalFull) && !blockedStored.isEmpty()) { if (!memoryCallback.isEmpty()) { - if (memoryExecutor != null) { - memoryExecutor.execute(this::memoryReleased); + if (managerExecutor != null) { + managerExecutor.execute(this::memoryReleased); } else { memoryReleased(); } @@ -368,8 +373,8 @@ public final class PagingManagerImpl implements PagingManager { PagingStore oldStore = stores.remove(store.getStoreName()); if (oldStore != null) { oldStore.stop(); - oldStore = null; } + store.getCursorProvider().counterRebuildStarted(); // TODO-NOW-DONT-MERGE maybe this should be removed store.start(); stores.put(store.getStoreName(), store); } @@ -466,28 +471,38 @@ public final class PagingManagerImpl implements PagingManager { reloadStores(); - if (ARTEMIS_DEBUG_PAGING_INTERVAL > 0) { - this.scheduledComponent = new ActiveMQScheduledComponent(pagingStoreFactory.getScheduledExecutor(), pagingStoreFactory.newExecutor(), ARTEMIS_DEBUG_PAGING_INTERVAL, TimeUnit.SECONDS, false) { + if (ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL > 0) { + this.snapshotUpdater = new ActiveMQScheduledComponent(pagingStoreFactory.getScheduledExecutor(), pagingStoreFactory.newExecutor(), ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL, TimeUnit.SECONDS, false) { @Override public void run() { - debug(); + try { + logger.debug("Updating counter snapshots"); + counterSnapshot(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } } }; - this.scheduledComponent.start(); + this.snapshotUpdater.start(); } started = true; + } finally { unlock(); } } - public void debug() { - logger.info("size = {} bytes, messages = {}", globalSizeMetric.getSize(), globalSizeMetric.getElements()); + @Override + public void counterSnapshot() { + for (PagingStore store : stores.values()) { + store.counterSnapshot(); + } } + @Override public synchronized void stop() throws Exception { if (!started) { @@ -495,9 +510,9 @@ public final class PagingManagerImpl implements PagingManager { } started = false; - if (scheduledComponent != null) { - this.scheduledComponent.stop(); - this.scheduledComponent = null; + if (snapshotUpdater != null) { + this.snapshotUpdater.stop(); + this.snapshotUpdater = null; } lock(); @@ -548,4 +563,26 @@ public final class PagingManagerImpl implements PagingManager { syncLock.writeLock().lock(); } + @Override + public void forEachTransaction(BiConsumer transactionConsumer) { + transactions.forEach(transactionConsumer); + } + + @Override + public Future rebuildCounters() { + LongHashSet transactionsSet = new LongHashSet(); + transactions.forEach((txId, tx) -> { + transactionsSet.add(txId); + }); + stores.forEach((address, pgStore) -> { + PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(pgStore, transactionsSet); + logger.debug("Setting destination {} to rebuild counters", address); + managerExecutor.execute(rebuildManager); + }); + + FutureTask task = new FutureTask<>(() -> null); + managerExecutor.execute(task); + + return task; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index ca3fd43a71..81d10b9691 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -380,33 +380,39 @@ public class PagingStoreImpl implements PagingStore { } @Override - public synchronized void stop() throws Exception { - if (running) { - cursorProvider.flushExecutors(); - cursorProvider.stop(); + public void counterSnapshot() { + cursorProvider.counterSnapshot(); + } - final List pendingTasks = new ArrayList<>(); + @Override + public void stop() throws Exception { + synchronized (this) { + if (running) { + cursorProvider.stop(); + running = false; + } else { + return; + } + } - // TODO we could have a parameter to use this - final int pendingTasksWhileShuttingDown = executor.shutdownNow(pendingTasks::add, 30, TimeUnit.SECONDS); - if (pendingTasksWhileShuttingDown > 0) { - logger.trace("Try executing {} pending tasks on stop", pendingTasksWhileShuttingDown); - for (Runnable pendingTask : pendingTasks) { - try { - pendingTask.run(); - } catch (Throwable t) { - logger.warn("Error while executing a pending task on shutdown", t); - } + final List pendingTasks = new ArrayList<>(); + + final int pendingTasksWhileShuttingDown = executor.shutdownNow(pendingTasks::add, 30, TimeUnit.SECONDS); + if (pendingTasksWhileShuttingDown > 0) { + logger.trace("Try executing {} pending tasks on stop", pendingTasksWhileShuttingDown); + for (Runnable pendingTask : pendingTasks) { + try { + pendingTask.run(); + } catch (Throwable t) { + logger.warn("Error while executing a pending task on shutdown", t); } } + } - running = false; - - final Page page = currentPage; - if (page != null) { - page.close(false); - currentPage = null; - } + final Page page = currentPage; + if (page != null) { + page.close(false); + currentPage = null; } } @@ -424,10 +430,13 @@ public class PagingStoreImpl implements PagingStore { public void flushExecutors() { FutureLatch future = new FutureLatch(); - executor.execute(future); + try { + executor.execute(future); - if (!future.await(60000)) { - ActiveMQServerLogger.LOGGER.pageStoreTimeout(address); + if (!future.await(60000)) { + ActiveMQServerLogger.LOGGER.pageStoreTimeout(address); + } + } catch (Exception ignored) { } } @@ -1122,14 +1131,7 @@ public class PagingStoreImpl implements PagingStore { List durableQueues = ctx.getDurableQueues(); List nonDurableQueues = ctx.getNonDurableQueues(); for (org.apache.activemq.artemis.core.server.Queue q : durableQueues) { - if (tx == null) { - // non transactional writes need an intermediate place - // to avoid the counter getting out of sync - q.getPageSubscription().getCounter().pendingCounter(page, 1, size); - } else { - // null tx is treated through pending counters - q.getPageSubscription().getCounter().increment(tx, 1, size); - } + q.getPageSubscription().getCounter().increment(tx, 1, size); } for (org.apache.activemq.artemis.core.server.Queue q : nonDurableQueues) { @@ -1408,5 +1410,9 @@ public class PagingStoreImpl implements PagingStore { usedPages.forEachUsedPage(consumerPage); } + @Override + public StorageManager getStorageManager() { + return storageManager; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index d7ca55888f..5157103ff9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -22,7 +22,6 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME; import java.security.InvalidParameterException; @@ -1692,7 +1691,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public synchronized boolean isStarted() { - return started; + if (ioCriticalErrorListener != null) { + return started && !ioCriticalErrorListener.isPreviouslyFailed(); + } else { + return started; + } } /** @@ -1895,25 +1898,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } break; } - case PAGE_CURSOR_COUNTER_VALUE: { - ActiveMQServerLogger.LOGGER.journalPAGEOnPrepared(); - - break; - } - case PAGE_CURSOR_COUNTER_INC: { PageCountRecordInc encoding = new PageCountRecordInc(); encoding.decode(buff); - PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager); + logger.debug("Page cursor counter inc on a prepared TX."); - if (sub != null) { - sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue(), encoding.getPersistentSize()); - sub.notEmpty(); - } else { - ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID()); - } + // TODO: do I need to remove the record on commit? break; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index 5a585fe8bb..19840061b1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -535,7 +535,7 @@ public final class DescribeJournal { PageSubscriptionCounterImpl subsCounter; subsCounter = counters.get(queueIDForCounter); if (subsCounter == null) { - subsCounter = new PageSubscriptionCounterImpl(null, null, false, -1); + subsCounter = new PageSubscriptionCounterImpl(null, -1); counters.put(queueIDForCounter, subsCounter); } return subsCounter; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index aadb9ce633..2c2f632d93 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -524,4 +524,7 @@ public interface ActiveMQMessageBundle { @Message(id = 229244, value = "Meters already registered for {}") IllegalStateException metersAlreadyRegistered(String resource); + + @Message(id = 229245, value = "Management controller is busy with another task. Please try again") + ActiveMQTimeoutException managementBusy(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index bfc0feea5a..57a02aa305 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -111,6 +111,7 @@ public interface ActiveMQServer extends ServiceComponent { STOPPED } + AutoCloseable managementLock() throws Exception; void setState(SERVER_STATE state); @@ -357,6 +358,14 @@ public interface ActiveMQServer extends ServiceComponent { Map prefixes, String securityDomain) throws Exception; + /** should the server rebuild page counters upon startup. + * this will be useful on testing or an embedded broker scenario */ + boolean isRebuildCounters(); + + /** should the server rebuild page counters upon startup. + * this will be useful on testing or an embedded broker scenario */ + void setRebuildCounters(boolean rebuildCounters); + SecurityStore getSecurityStore(); void removeSession(String name) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 2f007976e3..1feee23ec0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -472,9 +472,6 @@ public interface ActiveMQServerLogger { @LogMessage(id = 222047, value = "Can not find queue {} while reloading ACKNOWLEDGE_CURSOR", level = LogMessage.Level.WARN) void journalCannotFindQueueReloadingACK(Long queueID); - @LogMessage(id = 222048, value = "PAGE_CURSOR_COUNTER_VALUE record used on a prepared statement, invalid state", level = LogMessage.Level.WARN) - void journalPAGEOnPrepared(); - @LogMessage(id = 222049, value = "InternalError: Record type {} not recognized. Maybe you are using journal files created on a different version", level = LogMessage.Level.WARN) void journalInvalidRecordType(Byte recordType); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index af278ebfd9..d9a1f7bc21 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -239,6 +239,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { private HAPolicy haPolicy; + // This will be useful on tests or embedded + private boolean rebuildCounters = true; + private volatile SERVER_STATE state = SERVER_STATE.STOPPED; private final Version version; @@ -271,6 +274,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { private ReplayManager replayManager; + /** Certain management operations shouldn't use more than one thread. + * this semaphore is used to guarantee a single thread used. */ + private final Semaphore managementSemaphore = new Semaphore(1); + /** * This is a thread pool for io tasks only. * We can't use the same global executor to avoid starvations. @@ -496,6 +503,16 @@ public class ActiveMQServerImpl implements ActiveMQServer { return networkHealthCheck; } + @Override + public void setRebuildCounters(boolean rebuildCounters) { + this.rebuildCounters = rebuildCounters; + } + + @Override + public boolean isRebuildCounters() { + return this.rebuildCounters; + } + @Override public void replay(Date start, Date end, String address, String target, String filter) throws Exception { @@ -1323,6 +1340,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } + if (!criticalIOError && pagingManager != null) { + pagingManager.counterSnapshot(); + } + stopComponent(pagingManager); if (storageManager != null) @@ -3323,6 +3344,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { JournalLoadInformation[] journalInfo = loadJournals(); + if (rebuildCounters) { + pagingManager.rebuildCounters(); + } + removeExtraAddressStores(); if (securityManager instanceof ActiveMQBasicSecurityManager) { @@ -4245,8 +4270,18 @@ public class ActiveMQServerImpl implements ActiveMQServer { private final AtomicBoolean failedAlready = new AtomicBoolean(); + @Override + public boolean isPreviouslyFailed() { + return failedAlready.get(); + } + @Override public synchronized void onIOException(Throwable cause, String message, String file) { + if (logger.isTraceEnabled()) { + // the purpose of this is to find where the critical error is being called at + // useful for when debugging where the critical error is being called at + logger.trace("Throwing critical error {}", cause.getMessage(), new Exception("trace")); + } if (!failedAlready.compareAndSet(false, true)) { return; } @@ -4542,4 +4577,12 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } + @Override + public AutoCloseable managementLock() throws Exception { + if (!managementSemaphore.tryAcquire(1, TimeUnit.MINUTES)) { + throw ActiveMQMessageBundle.BUNDLE.managementBusy(); + } else { + return managementSemaphore::release; + } + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 488109c83e..e990162503 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1725,7 +1725,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (pageSubscription != null) { // messageReferences will have depaged messages which we need to discount from the counter as they are // counted on the pageSubscription as well - return (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount(); + long returnValue = (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount(); + if (logger.isTraceEnabled()) { + logger.trace("Queue={}/{} returning getMessageCount returning {}. pendingMetrics.getMessageCount() = {}, getScheduledCount() = {}, pageSubscription.getMessageCount()={}, pageSubscription.getDeliveredCount()={}", + name, id, returnValue, pendingMetrics.getMessageCount(), getScheduledCount(), pageSubscription.getMessageCount(), + pageSubscription.getDeliveredCount()); + } + return returnValue; } else { return (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount(); } @@ -2279,6 +2285,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { public void destroyPaging() throws Exception { // it could be null on embedded or certain unit tests if (pageSubscription != null) { + if (logger.isTraceEnabled()) { + logger.trace("Destroying paging for {}", this.name, new Exception("trace")); + } pageSubscription.destroy(); pageSubscription.cleanupEntries(true); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 9b18e29ed7..fb04c2b2a4 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -295,6 +295,7 @@ public abstract class ActiveMQTestBase extends Assert { try { DriverManager.getConnection("jdbc:derby:;shutdown=true"); } catch (Exception ignored) { + // it always throws an exception on shutdown } } @@ -878,7 +879,7 @@ public abstract class ActiveMQTestBase extends Assert { return testDir; } - private String getEmbeddedDataBaseName() { + protected String getEmbeddedDataBaseName() { return "memory:" + getTestDir(); } @@ -2314,6 +2315,10 @@ public abstract class ActiveMQTestBase extends Assert { } protected int getMessageCount(final Queue queue) { + try { + Wait.waitFor(() -> queue.getPageSubscription().isCounterPending() == false); + } catch (Exception ignored) { + } queue.flushExecutor(); return (int) queue.getMessageCount(); } diff --git a/tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy b/tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy new file mode 100644 index 0000000000..c3508890e0 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pageCounter + +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.core.server.Queue +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +int messages = Integer.parseInt(arg[0]); + +Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue")) +for (int i = 0; i < 20 && queue.getMessageCount() != messages; i++) { + Thread.sleep(100); + +} +GroovyRun.assertEquals((int)messages, (int)queue.getMessageCount()); \ No newline at end of file diff --git a/tests/compatibility-tests/src/main/resources/pageCounter/sendMessages.groovy b/tests/compatibility-tests/src/main/resources/pageCounter/sendMessages.groovy new file mode 100644 index 0000000000..20f9dccc2d --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/pageCounter/sendMessages.groovy @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pageCounter + +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* + +// starts an artemis server +String serverType = arg[0]; +String protocol = arg[1]; +int messages = Integer.parseInt(arg[2]); + +// Can't depend directly on artemis, otherwise it wouldn't compile in hornetq +if (protocol != null && protocol.equals("AMQP")) { + GroovyRun.evaluate("clients/artemisClientAMQP.groovy", "serverArg", serverType, protocol); +} else { + GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType); +} + +Connection connection = cf.createConnection(); +Session session = connection.createSession(true, Session.SESSION_TRANSACTED); +Queue destination = session.createQueue("queue") + +MessageProducer producer = session.createProducer(destination); +producer.setDeliveryMode(DeliveryMode.PERSISTENT); + +for (int i = 0; i < messages; i++) { + TextMessage message = session.createTextMessage("Message " + i); + producer.send(message); + if (i % 100 == 0) { + session.commit(); + } +} + +session.commit(); + +connection.close(); \ No newline at end of file diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/PagingCounterTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/PagingCounterTest.java new file mode 100644 index 0000000000..ae96f5e85c --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/PagingCounterTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase; +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_TWENTYTWO_ZERO; + +@RunWith(Parameterized.class) +public class PagingCounterTest extends VersionedBase { + + // this will ensure that all tests in this class are run twice, + // once with "true" passed to the class' constructor and once with "false" + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + // we don't need every single version ever released.. + // if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time + List combinations = new ArrayList<>(); + + /* + // during development sometimes is useful to comment out the combinations + // and add the ones you are interested.. example: + */ + // combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE}); + // combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE}); + + combinations.add(new Object[]{null, TWO_TWENTYTWO_ZERO, SNAPSHOT}); + combinations.add(new Object[]{null, SNAPSHOT, TWO_TWENTYTWO_ZERO}); + // the purpose on this one is just to validate the test itself. + /// if it can't run against itself it won't work at all + combinations.add(new Object[]{null, SNAPSHOT, SNAPSHOT}); + return combinations; + } + + public PagingCounterTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Before + public void removeFolder() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + serverFolder.getRoot().mkdirs(); + } + + @After + public void tearDown() { + try { + stopServer(serverClassloader); + } catch (Throwable ignored) { + } + try { + stopServer(receiverClassloader); + } catch (Throwable ignored) { + } + } + + @Test + public void testSendReceivePaging() throws Throwable { + setVariable(senderClassloader, "persistent", true); + startServer(serverFolder.getRoot(), senderClassloader, "pageCounter", null, true); + evaluate(senderClassloader, "journalcompatibility/forcepaging.groovy"); + evaluate(senderClassloader, "pageCounter/sendMessages.groovy", server, "core", "1000"); + evaluate(senderClassloader, "journalcompatibility/ispaging.groovy"); + evaluate(senderClassloader, "pageCounter/checkMessages.groovy", "1000"); + stopServer(senderClassloader); + + setVariable(receiverClassloader, "persistent", true); + startServer(serverFolder.getRoot(), receiverClassloader, "pageCounter", null, false); + evaluate(receiverClassloader, "journalcompatibility/ispaging.groovy"); + evaluate(receiverClassloader, "pageCounter/checkMessages.groovy", "1000"); + + } +} + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java index 30986fa890..d4e45b2471 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java @@ -159,7 +159,6 @@ public class BackupSyncJournalTest extends FailoverTestBase { for (Pair pair : backupIds) { totalBackup += pair.getB(); } - assertEquals("number of records must match ", total, totalBackup); // "+ 2": there two other calls that send N_MSGS. for (int i = 0; i < totalRounds + 3; i++) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 2ffef62bb2..f141a8ba66 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -1715,6 +1715,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes public boolean isEmbeddedWebServerStarted() { return (boolean) proxy.retrieveAttributeValue("embeddedWebServerStarted"); } + + @Override + public void rebuildPageCounters() throws Exception { + proxy.invokeOperation("rebuildPageCounters"); + } }; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java index 0231610c69..f2c67c9d13 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.paging; +import java.lang.invoke.MethodHandles; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -39,9 +40,13 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PageCountSyncOnNonTXTest extends SpawnedTestBase { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + @Rule public RetryRule retryRule = new RetryRule(1); @@ -151,7 +156,7 @@ public class PageCountSyncOnNonTXTest extends SpawnedTestBase { } } } catch (Exception expected) { - expected.printStackTrace(); + logger.info("expected exception {}", expected.toString(), expected); } } finally { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCounterRebuildTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCounterRebuildTest.java new file mode 100644 index 0000000000..9669de41a6 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCounterRebuildTest.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.ReusableLatch; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PageCounterRebuildTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Test + public void testUnitSize() throws Exception { + AtomicInteger errors = new AtomicInteger(0); + + StorageManager mockStorage = Mockito.mock(StorageManager.class); + + PageSubscriptionCounterImpl nonPersistentPagingCounter = new PageSubscriptionCounterImpl(mockStorage, -1); + + final int THREADS = 33; + final int ADD_VALUE = 7; + final int SIZE_VALUE = 17; + final int REPEAT = 777; + + ExecutorService executorService = Executors.newFixedThreadPool(THREADS); + runAfter(executorService::shutdownNow); + + CyclicBarrier startFlag = new CyclicBarrier(THREADS); + + ReusableLatch latch = new ReusableLatch(THREADS); + + for (int j = 0; j < THREADS; j++) { + executorService.execute(() -> { + try { + startFlag.await(10, TimeUnit.SECONDS); + for (int i = 0; i < REPEAT; i++) { + nonPersistentPagingCounter.increment(null, ADD_VALUE, SIZE_VALUE); + } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + latch.countDown(); + } + }); + } + + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + + Assert.assertEquals(ADD_VALUE * THREADS * REPEAT, nonPersistentPagingCounter.getValue()); + Assert.assertEquals(SIZE_VALUE * THREADS * REPEAT, nonPersistentPagingCounter.getPersistentSize()); + + + latch.setCount(THREADS); + + for (int j = 0; j < THREADS; j++) { + executorService.execute(() -> { + try { + startFlag.await(10, TimeUnit.SECONDS); + for (int i = 0; i < REPEAT; i++) { + nonPersistentPagingCounter.increment(null, -ADD_VALUE, -SIZE_VALUE); + } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + latch.countDown(); + } + }); + } + + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + + Assert.assertEquals(0L, nonPersistentPagingCounter.getValue()); + Assert.assertEquals(0L, nonPersistentPagingCounter.getPersistentSize()); + Assert.assertEquals(0, errors.get()); + } + + @Test + public void testRebuildCounter() throws Exception { + ActiveMQServer server = createServer(true, true); + AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(100 * 1024).setMaxReadPageMessages(1); + server.getAddressSettingsRepository().addMatch("#", defaultSetting); + server.start(); + + String queueName = getName(); + String nonConsumedQueueName = getName() + "_nonConsumed"; + server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.MULTICAST)); + server.createQueue(new QueueConfiguration(nonConsumedQueueName).setAddress(queueName).setRoutingType(RoutingType.MULTICAST)); + server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.MULTICAST)); + + Queue serverQueue = server.locateQueue(queueName); + Queue serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName); + + Assert.assertNotNull(serverQueue); + Assert.assertNotNull(serverNonConsumedQueue); + + serverQueue.getPagingStore().startPaging(); + + final int THREADS = 4; + final int TX_SEND = 2000; + final int NON_TXT_SEND = 200; + final int CONSUME_MESSAGES = 200; + AtomicInteger errors = new AtomicInteger(0); + + ExecutorService executorService = Executors.newFixedThreadPool(THREADS); + runAfter(executorService::shutdownNow); + + CyclicBarrier startFlag = new CyclicBarrier(THREADS); + + ReusableLatch latch = new ReusableLatch(THREADS); + + for (int i = 0; i < THREADS; i++) { + final int threadNumber = i; + executorService.execute(() -> { + try { + startFlag.await(10, TimeUnit.SECONDS); + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) { + + logger.info("sending thread {}", threadNumber); + + javax.jms.Topic jmsQueue = session.createTopic(queueName); + MessageProducer producerNonTX = session.createProducer(jmsQueue); + MessageProducer producerTX = txSession.createProducer(jmsQueue); + + for (int message = 0; message < NON_TXT_SEND; message++) { + TextMessage txtMessage = session.createTextMessage("hello" + message); + txtMessage.setBooleanProperty("first", false); + producerNonTX.send(session.createTextMessage("hello" + message)); + } + for (int message = 0; message < TX_SEND; message++) { + producerTX.send(session.createTextMessage("helloTX" + message)); + } + txSession.commit(); + } + + } catch (Throwable e) { + errors.incrementAndGet(); + } finally { + latch.countDown(); + } + }); + } + + // this should be fast on the CIs, but if you use a slow disk, it might take a few extra seconds. + Assert.assertTrue(latch.await(1, TimeUnit.MINUTES)); + + final int numberOfMessages = TX_SEND * THREADS + NON_TXT_SEND * THREADS; + Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount); + + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);) { + MessageConsumer consumer = session.createConsumer(session.createQueue(queueName + "::" + queueName)); + connection.start(); + for (int i = 0; i < CONSUME_MESSAGES; i++) { + Message message = consumer.receive(5000); + Assert.assertNotNull(message); + } + } + + Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount); + Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount); + + server.stop(); + server.start(); + + serverQueue = server.locateQueue(queueName); + serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName); + + Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount); + Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount); + + serverQueue.getPageSubscription().getCounter().markRebuilding(); + serverNonConsumedQueue.getPageSubscription().getCounter().markRebuilding(); + + // if though we are rebuilding, we are still returning based on the last recorded value until processing is finished + Assert.assertEquals(8600, serverQueue.getMessageCount()); + Assert.assertEquals(8800, serverNonConsumedQueue.getMessageCount()); + + serverQueue.getPageSubscription().getCounter().finishRebuild(); + + serverNonConsumedQueue.getPageSubscription().getCounter().finishRebuild(); + + Assert.assertEquals(0, serverQueue.getMessageCount()); // we artificially made it 0 by faking a rebuild + Assert.assertEquals(0, serverNonConsumedQueue.getMessageCount()); // we artificially made it 0 by faking a rebuild + + server.stop(); + server.start(); + + serverQueue = server.locateQueue(queueName); + serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName); + + // after a rebuild, the counter should be back to where it was + Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount); + Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount); + + server.stop(); + server.start(); + + serverQueue = server.locateQueue(queueName); + serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName); + + Assert.assertNotNull(serverQueue); + Assert.assertNotNull(serverNonConsumedQueue); + + Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount); + Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount); + + server.stop(); + // restarting the server to issue a rebuild on the counters + server.start(); + + logger.info("Consuming messages"); + factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);) { + MessageConsumer consumer = session.createConsumer(session.createQueue(queueName + "::" + queueName)); + connection.start(); + for (int i = 0; i < numberOfMessages - CONSUME_MESSAGES; i++) { + Message message = consumer.receive(5000); + Assert.assertNotNull(message); + if (i % 100 == 0) { + logger.info("Received {} messages", i); + } + } + Assert.assertNull(consumer.receiveNoWait()); + consumer.close(); + + consumer = session.createConsumer(session.createQueue(queueName + "::" + nonConsumedQueueName)); + connection.start(); + for (int i = 0; i < numberOfMessages; i++) { + Message message = consumer.receive(5000); + Assert.assertNotNull(message); + } + Assert.assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + serverQueue = server.locateQueue(queueName); + serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName); + + Wait.assertEquals(0L, serverQueue::getMessageCount, 1000, 100); + Wait.assertEquals(0L, serverNonConsumedQueue::getMessageCount, 1000, 100); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java index f68608e24a..8492f9626f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java @@ -33,7 +33,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; -import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -164,6 +163,8 @@ public class PagingCounterTest extends ActiveMQTestBase { server.stop(); + server.setRebuildCounters(false); + server.start(); queue = server.locateQueue("A1"); @@ -177,6 +178,70 @@ public class PagingCounterTest extends ActiveMQTestBase { } } + @Test + public void testMultiThreadCounter() throws Exception { + ClientSessionFactory sf = createSessionFactory(sl); + ClientSession session = sf.createSession(); + + try { + server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST)); + Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST)); + + final PageSubscriptionCounter counter = locateCounter(queue); + + final int THREADS = 10; + + final CyclicBarrier flagStart = new CyclicBarrier(THREADS); + final CountDownLatch done = new CountDownLatch(THREADS); + + final int BUMPS = 2000; + + Assert.assertEquals(0, counter.getValue()); + + ExecutorService executorService = Executors.newFixedThreadPool(THREADS); + runAfter(executorService::shutdownNow); + + for (int i = 0; i < THREADS; i++) { + executorService.execute(() -> { + try { + flagStart.await(10, TimeUnit.SECONDS); + for (int repeat = 0; repeat < BUMPS; repeat++) { + counter.increment(null, 1, 1L); + Transaction tx = new TransactionImpl(server.getStorageManager()); + counter.increment(tx, 1, 1L); + tx.commit(); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } finally { + done.countDown(); + } + }); + } + + // it should take a couple seconds only + done.await(1, TimeUnit.MINUTES); + + Wait.assertEquals((long)(BUMPS * 2 * THREADS), counter::getValue, 5000, 100); + + server.stop(); + + server.setRebuildCounters(false); + + server.start(); + + queue = server.locateQueue("A1"); + + final PageSubscriptionCounter counterAfterRestart = locateCounter(queue); + Wait.assertEquals((long)(BUMPS * 2 * THREADS), counterAfterRestart::getValue, 5000, 100); + Assert.assertEquals(BUMPS * 2 * THREADS, counterAfterRestart.getValue()); + + } finally { + sf.close(); + session.close(); + } + } + @Test public void testCleanupCounter() throws Exception { ClientSessionFactory sf = createSessionFactory(sl); @@ -216,6 +281,7 @@ public class PagingCounterTest extends ActiveMQTestBase { server.stop(); server = newActiveMQServer(); + server.setRebuildCounters(false); server.start(); @@ -228,6 +294,11 @@ public class PagingCounterTest extends ActiveMQTestBase { assertEquals(2100, counter.getValue()); assertEquals(2100 * 1000, counter.getPersistentSize()); + server.getPagingManager().rebuildCounters(); + + // it should be zero after rebuild, since no actual messages were sent + Wait.assertEquals(0, counter::getValue); + } finally { sf.close(); session.close(); @@ -246,8 +317,6 @@ public class PagingCounterTest extends ActiveMQTestBase { PageSubscriptionCounter counter = locateCounter(queue); - ((PageSubscriptionCounterImpl) counter).setPersistent(false); - StorageManager storage = server.getStorageManager(); Transaction tx = new TransactionImpl(server.getStorageManager()); @@ -321,7 +390,9 @@ public class PagingCounterTest extends ActiveMQTestBase { server.stop(); + server = newActiveMQServer(); + server.setRebuildCounters(false); server.start(); @@ -329,10 +400,29 @@ public class PagingCounterTest extends ActiveMQTestBase { assertNotNull(queue); - counter = locateCounter(queue); + PageSubscriptionCounter counterAfterRestart = locateCounter(queue); - assertEquals(1, counter.getValue()); - assertEquals(1000, counter.getPersistentSize()); + Wait.assertEquals(1, counterAfterRestart::getValue); + Wait.assertEquals(1000, counterAfterRestart::getPersistentSize); + + counterAfterRestart.markRebuilding(); + + // should be using a previously added value while rebuilding + Wait.assertEquals(1, counterAfterRestart::getValue); + + tx = new TransactionImpl(server.getStorageManager()); + + counterAfterRestart.increment(tx, 10, 10_000); + tx.commit(); + + Wait.assertEquals(11, counterAfterRestart::getValue); + Wait.assertEquals(11_000, counterAfterRestart::getPersistentSize); + counterAfterRestart.finishRebuild(); + + server.getPagingManager().rebuildCounters(); + + Wait.assertEquals(0, counterAfterRestart::getValue); + Wait.assertEquals(0, counterAfterRestart::getPersistentSize); } @@ -349,7 +439,7 @@ public class PagingCounterTest extends ActiveMQTestBase { } @Test - public void testPrepareCounter() throws Exception { + public void testCommitCounter() throws Exception { Xid xid = newXID(); Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST)); @@ -366,19 +456,19 @@ public class PagingCounterTest extends ActiveMQTestBase { assertEquals(0, counter.getValue()); - tx.prepare(); + tx.commit(); storage.waitOnOperations(); - assertEquals(0, counter.getValue()); + assertEquals(2000, counter.getValue()); server.stop(); server = newActiveMQServer(); - server.start(); + server.setRebuildCounters(false); - storage = server.getStorageManager(); + server.start(); queue = server.locateQueue(new SimpleString("A1")); @@ -386,16 +476,6 @@ public class PagingCounterTest extends ActiveMQTestBase { counter = locateCounter(queue); - tx = server.getResourceManager().removeTransaction(xid, null); - - assertNotNull(tx); - - assertEquals(0, counter.getValue()); - - tx.commit(false); - - storage.waitOnOperations(); - Wait.assertEquals(2000, counter::getValue); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java index 381e14e27a..8192a1e54f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java @@ -34,8 +34,6 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; @@ -62,7 +60,6 @@ public class PagingSendTest extends ActiveMQTestBase { @Before public void setUp() throws Exception { super.setUp(); - Configuration config = new ConfigurationImpl(); server = newActiveMQServer(); server.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index f29ad471d5..a76465988d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -651,6 +651,7 @@ public final class ReplicationTest extends ActiveMQTestBase { PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, executorFactory, false, null), addressSettingsRepository, configuration.getManagementAddress()); paging.start(); + runAfter(paging::stop); return paging; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java index 00ced9ee10..d3a69d2202 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java @@ -150,9 +150,9 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase { ClientProducer producer = session.createProducer("flowcontrol"); ClientMessage message = session.createMessage(true); message.writeBodyBufferBytes(body); - logger.info("try to send a message after replicated"); + logger.debug("try to send a message after replicated"); producer.send(message); - logger.info("send message done"); + logger.debug("send message done"); producer.close(); session.close(); @@ -187,8 +187,8 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase { if (!(info.userRecordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL)) { // ignore } - logger.info("got live message {} {}", info.id, info.userRecordType); liveJournalCounter.incrementAndGet(); + logger.info("got live message {} {}, counter={}", info.id, info.userRecordType, liveJournalCounter.get()); } }); @@ -207,8 +207,8 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase { if (!(info.userRecordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL)) { // ignore } - logger.info("replicated message {}", info.id); replicationCounter.incrementAndGet(); + logger.info("replicated message {}, counter={}", info.id, replicationCounter.get()); } }); diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 0426f55706..5297e6dcf0 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -245,6 +245,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase { class FakePagingStore implements PagingStore { + @Override + public void counterSnapshot() { + } + @Override public void execute(Runnable runnable) { runnable.run(); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java index 713ce21d44..68024e4fce 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java @@ -61,6 +61,8 @@ public class PagingManagerImplTest extends ActiveMQTestBase { managerImpl.start(); + runAfter(managerImpl::stop); + PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test")); ICoreMessage msg = createMessage(1L, new SimpleString("simple-test"), createRandomBuffer(10)); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java index bed3ea5ad6..2012848010 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java @@ -39,6 +39,11 @@ public class FakePagingManager implements PagingManager { } + @Override + public void counterSnapshot() { + + } + @Override public void addTransaction(final PageTransactionInfo pageTransaction) { }