diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java index d8c339c173..c27225bab8 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; @Command(name = "compact", description = "Compacts the journal of a non running server") public final class CompactJournal extends LockAbstract { @@ -34,10 +35,7 @@ public final class CompactJournal extends LockAbstract { super.execute(context); try { Configuration configuration = getFileConfiguration(); - compactJournal(new File(getJournal()), "activemq-data", "amq", configuration.getJournalMinFiles(), configuration.getJournalPoolFiles(), configuration.getJournalFileSize(), null); - System.out.println("Compactation succeeded for " + getJournal()); - compactJournal(new File(getBinding()), "activemq-bindings", "bindings", 2, 2, 1048576, null); - System.out.println("Compactation succeeded for " + getBinding()); + compactJournals(configuration); } catch (Exception e) { treatError(e, "data", "compact"); @@ -45,16 +43,30 @@ public final class CompactJournal extends LockAbstract { return null; } - private void compactJournal(final File directory, + public static void compactJournals(Configuration configuration) throws Exception { + compactJournal(configuration.getJournalLocation(), "activemq-data", "amq", configuration.getJournalMinFiles(), + configuration.getJournalPoolFiles(), configuration.getJournalFileSize(), null, JournalRecordIds.UPDATE_DELIVERY_COUNT, + JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME); + System.out.println("Compactation succeeded for " + configuration.getJournalLocation().getAbsolutePath()); + compactJournal(configuration.getBindingsLocation(), "activemq-bindings", "bindings", 2, 2, 1048576, null); + System.out.println("Compactation succeeded for " + configuration.getBindingsLocation()); + } + + public static void compactJournal(final File directory, final String journalPrefix, final String journalSuffix, final int minFiles, final int poolFiles, final int fileSize, - final IOCriticalErrorListener listener) throws Exception { + final IOCriticalErrorListener listener, + int... replaceableRecords) throws Exception { NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1); JournalImpl journal = new JournalImpl(fileSize, minFiles, poolFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); + for (int i : replaceableRecords) { + journal.replaceableRecord(i); + } + journal.setRemoveExtraFilesOnLoad(true); journal.start(); diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index fe341e4797..56a260bd56 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -62,6 +62,16 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { // Sync Delay in ms //private static final int SYNC_DELAY = 5; + @Override + public void setRemoveExtraFilesOnLoad(boolean removeExtraFilesOnLoad) { + // no op on JDBC + } + + @Override + public boolean isRemoveExtraFilesOnLoad() { + return false; + } + private long syncDelay; private static int USER_VERSION = 1; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index a9299aaf83..7c652afa13 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -58,6 +58,10 @@ public interface Journal extends ActiveMQComponent { LOADED; } + void setRemoveExtraFilesOnLoad(boolean removeExtraFilesOnLoad); + + boolean isRemoveExtraFilesOnLoad(); + // Non transactional operations void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java index 8c7a89b56b..c1a61a55ba 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java @@ -28,6 +28,17 @@ abstract class JournalBase implements Journal { protected final int fileSize; private final boolean supportsCallback; + protected boolean removeExtraFilesOnLoad = false; + + @Override + public void setRemoveExtraFilesOnLoad(boolean setting) { + this.removeExtraFilesOnLoad = setting; + } + + @Override + public boolean isRemoveExtraFilesOnLoad() { + return removeExtraFilesOnLoad; + } JournalBase(boolean supportsCallback, int fileSize) { if (fileSize < JournalImpl.MIN_FILE_SIZE) { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 68d07dd349..624094d0c0 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -2195,7 +2195,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } else { if (changeData) { // Empty dataFiles with no data - filesRepository.addFreeFile(file, false, true); + filesRepository.addFreeFile(file, false, isRemoveExtraFilesOnLoad()); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 9974b9a69b..ae7985ef1b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -957,6 +957,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp readLock(); try { + messageJournal.setRemoveExtraFilesOnLoad(true); JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this)); ArrayList largeMessages = new ArrayList<>(); @@ -1606,6 +1607,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp List preparedTransactions = new ArrayList<>(); + bindingsJournal.setRemoveExtraFilesOnLoad(true); + JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null); HashMap mapBindings = new HashMap<>(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java index e66e9b3ad4..5a5ffb89c8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java @@ -54,6 +54,16 @@ public class ReplicatedJournal implements Journal { private final byte journalID; + @Override + public void setRemoveExtraFilesOnLoad(boolean removeExtraFilesOnLoad) { + this.localJournal.setRemoveExtraFilesOnLoad(removeExtraFilesOnLoad); + } + + @Override + public boolean isRemoveExtraFilesOnLoad() { + return localJournal.isRemoveExtraFilesOnLoad(); + } + public ReplicatedJournal(final byte journalID, final Journal localJournal, final ReplicationManager replicationManager) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InfiniteRedeliveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InfiniteRedeliveryTest.java index 69b597765f..efbc8efb0e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InfiniteRedeliveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InfiniteRedeliveryTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.cli.commands.tools.journal.CompactJournal; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; @@ -57,17 +58,19 @@ public class InfiniteRedeliveryTest extends ActiveMQTestBase { private static final Logger logger = Logger.getLogger(InfiniteRedeliveryTest.class); - @Parameterized.Parameters(name = "protocol={0}") + @Parameterized.Parameters(name = "protocol={0}, useCLI={1}") public static Collection getParameters() { - return Arrays.asList(new Object[][]{{"CORE"}, {"AMQP"}, {"OPENWIRE"}}); + return Arrays.asList(new Object[][]{{"CORE", true}, {"AMQP", false}, {"OPENWIRE", false}}); } - public InfiniteRedeliveryTest(String protocol) { + public InfiniteRedeliveryTest(String protocol, boolean useCLI) { this.protocol = protocol; + this.useCLI = useCLI; } String protocol; + boolean useCLI; TestableServer liveServer; TestableServer backupServer; @@ -171,15 +174,23 @@ public class InfiniteRedeliveryTest extends ActiveMQTestBase { } connection.close(); - liveServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000); - backupServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000); + if (!useCLI) { + liveServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000); + backupServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000); + } + + liveServer.stop(); + backupServer.stop(); + + if (useCLI) { + CompactJournal.compactJournals(backupServer.getServer().getConfiguration()); + CompactJournal.compactJournals(liveServer.getServer().getConfiguration()); + } HashMap counts = countJournal(liveServer.getServer().getConfiguration()); counts.forEach((k, v) -> logger.debug(k + "=" + v)); counts.forEach((k, v) -> Assert.assertTrue("Record type " + k + " has a lot of records:" + v, v.intValue() < 20)); - backupServer.stop(); - HashMap backupCounts = countJournal(backupServer.getServer().getConfiguration()); Assert.assertTrue(backupCounts.size() > 0); backupCounts.forEach((k, v) -> logger.debug("On Backup:" + k + "=" + v)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ShrinkDataOnStartTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ShrinkDataOnStartTest.java new file mode 100644 index 0000000000..44bbce7947 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ShrinkDataOnStartTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.journal; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; +import org.junit.Test; + +public class ShrinkDataOnStartTest extends ActiveMQTestBase { + + @Test + public void shrinkDataOnStart() throws Exception { + + ActiveMQServer server = addServer(createServer(true)); + server.getConfiguration().setJournalMinFiles(10); + server.getConfiguration().setJournalPoolFiles(2); + server.start(); + Wait.waitFor(server::isActive); + Assert.assertEquals(10, server.getStorageManager().getMessageJournal().getFileFactory().listFiles("amq").size()); + server.stop(); + server.getConfiguration().setJournalMinFiles(2); + server.getConfiguration().setJournalPoolFiles(2); + server.start(); + Assert.assertEquals(2, server.getStorageManager().getMessageJournal().getFileFactory().listFiles("amq").size()); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index b82e5824b2..13674bdf3e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -647,6 +647,16 @@ public final class ReplicationTest extends ActiveMQTestBase { static final class FakeJournal implements Journal { + @Override + public void setRemoveExtraFilesOnLoad(boolean removeExtraFilesOnLoad) { + + } + + @Override + public boolean isRemoveExtraFilesOnLoad() { + return false; + } + @Override public void appendAddRecord(long id, byte recordType, diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index f615c29666..8299c9b4cc 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -459,6 +459,33 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { stopJournal(); } + + @Test + public void testReduceFreeFilesWithCleanup() throws Exception { + setup(10, 10 * 1024, true); + createJournal(); + startJournal(); + load(); + + List files1 = fileFactory.listFiles(fileExtension); + + Assert.assertEquals(10, files1.size()); + + stopJournal(); + + setup(5, 10 * 1024, true); + createJournal(); + journal.setRemoveExtraFilesOnLoad(true); + startJournal(); + load(); + + List files2 = fileFactory.listFiles(fileExtension); + + Assert.assertEquals(5, files2.size()); + + stopJournal(); + } + private int calculateRecordsPerFile(final int fileSize, final int alignment, int recordSize) { recordSize = calculateRecordSize(recordSize, alignment); int headerSize = calculateRecordSize(JournalImpl.SIZE_HEADER, alignment);