From 27c343913f15101bcb9cfc649ced3c18ceb0c321 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 11 May 2021 17:22:56 -0400 Subject: [PATCH] ARTEMIS-3297 Journal Retention Feature --- .../apache/activemq/artemis/cli/Artemis.java | 2 +- .../cli/commands/tools/RecoverMessages.java | 65 ++- .../tools/journal/CompactJournal.java | 8 +- .../artemis/cli/commands/etc/broker.xml | 10 + .../artemis/utils/StringPrintStream.java | 4 + .../jdbc/store/journal/JDBCJournalImpl.java | 10 + .../core/io/nio/NIOSequentialFile.java | 6 +- .../artemis/core/journal/Journal.java | 29 ++ .../core/journal/impl/FileWrapperJournal.java | 11 + .../core/journal/impl/JournalFile.java | 7 + .../core/journal/impl/JournalFileImpl.java | 9 +- .../journal/impl/JournalFilesRepository.java | 10 +- .../core/journal/impl/JournalImpl.java | 371 +++++++++++++++++- .../journal/impl/JournalReaderCallback.java | 6 + .../impl/dataformat/JournalAddRecord.java | 35 +- .../artemis/core/config/Configuration.java | 20 + .../core/config/impl/ConfigurationImpl.java | 55 ++- .../impl/FileConfigurationParser.java | 45 +++ .../artemis/core/paging/impl/Page.java | 8 +- .../AbstractJournalStorageManager.java | 4 + .../impl/journal/BufferSplitter.java | 77 ++++ .../impl/journal/DescribeJournal.java | 4 + .../impl/journal/JournalRecordIds.java | 3 + .../impl/journal/JournalStorageManager.java | 36 ++ .../wireformat/ReplicationAddMessage.java | 4 +- .../wireformat/ReplicationAddTXMessage.java | 4 +- .../core/replication/ReplicatedJournal.java | 14 + .../core/replication/ReplicationEndpoint.java | 33 +- .../core/replication/ReplicationManager.java | 27 +- .../schema/artemis-configuration.xsd | 47 +++ .../impl/FileConfigurationParserTest.java | 123 ++++++ .../config/impl/FileConfigurationTest.java | 4 + .../impl/journal/BufferSplitterTest.java | 52 +++ .../ConfigurationTest-full-config.xml | 1 + .../ConfigurationTest-xinclude-config.xml | 1 + docs/user-manual/en/data-tools.md | 1 + docs/user-manual/en/persistence.md | 40 ++ .../tests/integration/cli/RecoverTest.java | 160 ++++++-- .../cluster/failover/FailoverTestBase.java | 3 + .../journal/NIOJournalCompactTest.java | 10 + .../replication/ReplicationTest.java | 10 + .../servers/replicated-static0/broker.xml | 2 + .../servers/replicated-static1/broker.xml | 2 + .../ReplicationFlowControlTest.java | 13 + .../journal/impl/FakeJournalImplTest.java | 5 + .../impl/JournaHistorylBackupTest.java | 312 +++++++++++++++ .../journal/impl/JournalImplTestBase.java | 11 + .../journal/impl/JournalImplTestUnit.java | 63 +++ 48 files changed, 1663 insertions(+), 114 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BufferSplitter.java create mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/BufferSplitterTest.java create mode 100644 tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournaHistorylBackupTest.java diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java index da109d7361..876367d2d4 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java @@ -178,7 +178,7 @@ public class Artemis { builder = builder.withCommands(Run.class, Stop.class, Kill.class, PerfJournal.class); } else { builder.withGroup("data").withDescription("data tools group (print) (example ./artemis data print)"). - withDefaultCommand(HelpData.class).withCommands(PrintData.class); + withDefaultCommand(HelpData.class).withCommands(RecoverMessages.class, PrintData.class); builder = builder.withCommand(Create.class); } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/RecoverMessages.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/RecoverMessages.java index a05ebedaea..31c7e728e0 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/RecoverMessages.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/RecoverMessages.java @@ -25,12 +25,14 @@ import io.airlift.airline.Option; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback; +import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; @@ -63,7 +65,7 @@ public class RecoverMessages extends DBOption { if (configuration.isJDBC()) { throw new IllegalAccessException("JDBC Not supported on recover"); } else { - recover(configuration, journalOutput, reclaimed); + recover(configuration, getJournal(), journalOutput, new File(getLargeMessages()), reclaimed); } } catch (Exception e) { treatError(e, "data", "print"); @@ -71,9 +73,9 @@ public class RecoverMessages extends DBOption { return null; } - public static void recover(Configuration configuration, File journalOutput, boolean reclaimed) throws Exception { + public static void recover(Configuration configuration, String journallocation, File journalOutput, File largeMessage, boolean reclaimed) throws Exception { - File journal = configuration.getJournalLocation(); + File journal = new File(journallocation); if (!journalOutput.exists()) { if (!journalOutput.mkdirs()) { @@ -87,12 +89,14 @@ public class RecoverMessages extends DBOption { SequentialFileFactory outputFF = new NIOSequentialFileFactory(journalOutput, null, 1); outputFF.setDatasync(false); - JournalImpl targetJournal = new JournalImpl(configuration.getJournalFileSize(), 2, 2, 0, 0, outputFF, "activemq-data", "amq", 1); + JournalImpl targetJournal = new JournalImpl(configuration.getJournalFileSize(), 2, 2, -1, 0, outputFF, "activemq-data", "amq", 1); + targetJournal.setAutoReclaim(false); targetJournal.start(); targetJournal.loadInternalOnly(); SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journal, null, 1); + SequentialFileFactory largeMessagesFF = new NIOSequentialFileFactory(largeMessage, null, 1); // Will use only default values. The load function should adapt to anything different JournalImpl messagesJournal = new JournalImpl(configuration.getJournalFileSize(), configuration.getJournalMinFiles(), configuration.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1); @@ -106,23 +110,66 @@ public class RecoverMessages extends DBOption { userRecordsOfInterest.add(JournalRecordIds.ADD_REF); userRecordsOfInterest.add(JournalRecordIds.PAGE_TRANSACTION); + HashSet> routeBindigns = new HashSet<>(); + for (JournalFile file : files) { + // For reviewers and future maintainers: I really meant System.out.println here + // This is part of the CLI, hence this is like user's output System.out.println("Recovering messages from file " + file); - HashSet> routeBindigns = new HashSet<>(); - JournalImpl.readJournalFile(messagesFF, file, new JournalReaderCallback() { + long lastlargeMessageId = -1; + SequentialFile largeMessageFile; + @Override + public void done() { + try { + if (largeMessageFile != null) { + largeMessageFile.close(); + largeMessageFile = null; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + @Override + public void onReadEventRecord(RecordInfo info) throws Exception { + switch (info.getUserRecordType()) { + case JournalRecordIds.ADD_REF: + onReadUpdateRecord(info); + break; + + case JournalRecordIds.ADD_MESSAGE_BODY: + if (lastlargeMessageId != info.id || largeMessageFile == null) { + if (largeMessageFile != null) { + largeMessageFile.close(); + } + + largeMessageFile = largeMessagesFF.createSequentialFile(info.id + ".msg"); + largeMessageFile.open(); + largeMessageFile.position(largeMessageFile.size()); + lastlargeMessageId = info.id; + } + largeMessageFile.write(new ByteArrayEncoding(info.data), false, null); + break; + + default: + onReadAddRecord(info); + } + } + @Override public void onReadAddRecord(RecordInfo info) throws Exception { if (userRecordsOfInterest.contains(info.getUserRecordType())) { if (targetJournal.getRecords().get(info.id) != null) { + // Really meant System.out.. user's information on the CLI System.out.println("RecordID " + info.id + " would been duplicated, ignoring it"); return; } try { - targetJournal.appendAddRecord(info.id, info.userRecordType, info.data, true); + targetJournal.appendAddRecord(info.id, info.userRecordType, info.data, false); } catch (Exception e) { + // Really meant System.out.. user's information on the CLI System.out.println("Cannot append record for " + info.id + "->" + e.getMessage()); } } @@ -135,7 +182,8 @@ public class RecoverMessages extends DBOption { long queue = ByteUtil.bytesToLong(info.data); Pair pairQueue = new Pair<>(info.id, queue); if (routeBindigns.contains(pairQueue)) { - System.out.println("AddRef on " + info.id + " / queue=" + queue + " has already been recorded, ignoring it"); + // really meant system.out + System.out.println("AddReference on " + info.id + " / queue=" + queue + " has already been recorded, ignoring it"); return; } @@ -145,6 +193,7 @@ public class RecoverMessages extends DBOption { targetJournal.appendUpdateRecord(info.id, info.userRecordType, info.data, true); } catch (Exception e) { System.out.println("Cannot update record " + info.id + "-> " + e.getMessage()); + e.printStackTrace(System.out); } } } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java index c27225bab8..32aef5ef7c 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java @@ -44,15 +44,16 @@ public final class CompactJournal extends LockAbstract { } public static void compactJournals(Configuration configuration) throws Exception { - compactJournal(configuration.getJournalLocation(), "activemq-data", "amq", configuration.getJournalMinFiles(), + compactJournal(configuration.getJournalLocation(), configuration.getJournalRetentionLocation(), "activemq-data", "amq", configuration.getJournalMinFiles(), configuration.getJournalPoolFiles(), configuration.getJournalFileSize(), null, JournalRecordIds.UPDATE_DELIVERY_COUNT, JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME); System.out.println("Compactation succeeded for " + configuration.getJournalLocation().getAbsolutePath()); - compactJournal(configuration.getBindingsLocation(), "activemq-bindings", "bindings", 2, 2, 1048576, null); + compactJournal(configuration.getBindingsLocation(), null, "activemq-bindings", "bindings", 2, 2, 1048576, null); System.out.println("Compactation succeeded for " + configuration.getBindingsLocation()); } public static void compactJournal(final File directory, + final File historyFolder, final String journalPrefix, final String journalSuffix, final int minFiles, @@ -63,6 +64,9 @@ public final class CompactJournal extends LockAbstract { NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1); JournalImpl journal = new JournalImpl(fileSize, minFiles, poolFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); + if (historyFolder != null) { + journal.setHistoryFolder(historyFolder, -1, -1); + } for (int i : replaceableRecords) { journal.replaceableRecord(i); } diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml index dabfc44a98..66793a5e9e 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml @@ -64,6 +64,16 @@ ${jdbc} that won't support flow control. --> 90 + + true diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/StringPrintStream.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/StringPrintStream.java index eed4f3be2d..7898856dee 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/StringPrintStream.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/StringPrintStream.java @@ -28,6 +28,10 @@ public class StringPrintStream { return new PrintStream(byteOuptut, true, StandardCharsets.UTF_8.name()); } + public byte[] getBytes() { + return byteOuptut.toByteArray(); + } + @Override public String toString() { try { diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index c5ef737c67..9015c4e0d5 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -120,6 +120,16 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { this.syncDelay = syncDelay; } + @Override + public void appendAddEvent(long id, + byte recordType, + Persister persister, + Object record, + boolean sync, + IOCompletion completionCallback) throws Exception { + // Nothing to be done + } + @Override public void start() throws SQLException { super.start(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index 99f268c0aa..eb4ff8d3a9 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -366,10 +366,6 @@ public class NIOSequentialFile extends AbstractSequentialFile { @Override public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) { - if (callback == null) { - throw new NullPointerException("callback parameter need to be set"); - } - try { internalWrite(bytes, sync, callback, true); } catch (Exception e) { @@ -393,7 +389,7 @@ public class NIOSequentialFile extends AbstractSequentialFile { boolean releaseBuffer) throws IOException, ActiveMQIOErrorException, InterruptedException { if (!isOpen()) { if (callback != null) { - callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened"); + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened - " + getFileName()); } else { throw ActiveMQJournalBundle.BUNDLE.fileNotOpened(); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index 0c27cf4ed1..d0734a8f5b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.journal; +import java.io.File; import java.util.List; import java.util.Map; @@ -62,6 +63,10 @@ public interface Journal extends ActiveMQComponent { boolean isRemoveExtraFilesOnLoad(); + default boolean isHistory() { + return false; + } + // Non transactional operations void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; @@ -70,6 +75,10 @@ public interface Journal extends ActiveMQComponent { appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync); } + default Journal setHistoryFolder(File historyFolder, long maxBytes, long period) throws Exception { + return this; + } + void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception; void appendAddRecord(long id, @@ -79,6 +88,15 @@ public interface Journal extends ActiveMQComponent { boolean sync, IOCompletion completionCallback) throws Exception; + /** An event is data recorded on the journal, but it won't have any weight or deletes. It's always ready to be removed. + * It is useful on recovery data while in use with backup history journal. */ + void appendAddEvent(long id, + byte recordType, + Persister persister, + Object record, + boolean sync, + IOCompletion completionCallback) throws Exception; + default void appendAddRecord(long id, byte recordType, EncodingSupport record, @@ -284,6 +302,17 @@ public interface Journal extends ActiveMQComponent { */ void synchronizationUnlock(); + /** + * It will rename temporary files and place them on the copy folder, by resotring the original file name. + */ + default void processBackup() { + } + + /** + * It will check max files and max days on files and remove extra files. + */ + default void processBackupCleanup() { + } /** * Force the usage of a new {@link JournalFile}. * diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index 0434cf2be3..41da3ead2c 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -101,7 +101,18 @@ public final class FileWrapperJournal extends JournalBase { boolean sync, IOCompletion callback) throws Exception { JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record); + writeRecord(addRecord, false, -1, false, callback); + } + @Override + public void appendAddEvent(long id, + byte recordType, + Persister persister, + Object record, + boolean sync, + IOCompletion callback) throws Exception { + + JournalInternalRecord addRecord = new JournalAddRecord(JournalImpl.EVENT_RECORD, id, recordType, persister, record); writeRecord(addRecord, false, -1, false, callback); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java index 48aa232665..9bd2b50933 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java @@ -20,6 +20,13 @@ import org.apache.activemq.artemis.core.io.SequentialFile; public interface JournalFile { + default boolean isReclaimable() { + return true; + } + + default void setReclaimable(boolean reclaimable) { + } + int getNegCount(JournalFile file); void incNegCount(JournalFile file); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java index 8c5e439302..8c99f29bb8 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java @@ -35,6 +35,13 @@ public class JournalFileImpl implements JournalFile { private long offset; + boolean reclaimable = true; + + @Override + public void setReclaimable(boolean reclaimable) { + this.reclaimable = reclaimable; + } + private static final AtomicIntegerFieldUpdater posCountUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "posCountField"); private static final AtomicIntegerFieldUpdater addRecordUpdate = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "addRecordField"); private static final AtomicIntegerFieldUpdater liveBytesUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "liveBytesField"); @@ -92,7 +99,7 @@ public class JournalFileImpl implements JournalFile { @Override public boolean isCanReclaim() { - return posReclaimCriteria && negReclaimCriteria && !file.isPending(); + return reclaimable && posReclaimCriteria && negReclaimCriteria && !file.isPending(); } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java index 258ab1d14d..6cdf92cad9 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java @@ -192,7 +192,7 @@ public class JournalFilesRepository { for (JournalFile file : files) { final long fileIdFromFile = file.getFileID(); - final long fileIdFromName = getFileNameID(file.getFile().getFileName()); + final long fileIdFromName = getFileNameID(filePrefix, file.getFile().getFileName()); // The compactor could create a fileName but use a previously assigned ID. // Because of that we need to take both parts into account @@ -718,11 +718,15 @@ public class JournalFilesRepository { /** * Get the ID part of the name */ - private long getFileNameID(final String fileName) { + public static long getFileNameID(String filePrefix, final String fileName) { try { return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.'))); } catch (Throwable e) { - ActiveMQJournalLogger.LOGGER.errorRetrievingID(e, fileName); + try { + return Long.parseLong(fileName.substring(fileName.lastIndexOf("-") + 1, fileName.indexOf('.'))); + } catch (Throwable e2) { + ActiveMQJournalLogger.LOGGER.errorRetrievingID(e, fileName); + } return 0; } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index d8e2e84e79..4bb2647916 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -16,16 +16,24 @@ */ package org.apache.activemq.artemis.core.journal.impl; +import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; import java.util.Collections; import java.util.Comparator; +import java.util.GregorianCalendar; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -108,6 +116,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal * * */ public static final double UPDATE_FACTOR; + private static final String BKP_EXTENSION = "bkp"; + public static final String BKP = "." + BKP_EXTENSION; + static { String UPDATE_FACTOR_STR = System.getProperty(JournalImpl.class.getName() + ".UPDATE_FACTOR"); @@ -146,6 +157,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // Record markers - they must be all unique + + public static final byte EVENT_RECORD = 10; + public static final byte ADD_RECORD = 11; public static final byte UPDATE_RECORD = 12; @@ -202,6 +216,75 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private final JournalFilesRepository filesRepository; + private File journalRetentionFolder; + + private long journalRetentionPeriod = -1; + + private int journalRetentionMaxFiles = -1; + + private final List historyPendingFiles = Collections.synchronizedList(new LinkedList<>()); + + // This is to guarantee only one thread is making a copy of a file + // the processBackup is pretty much single threaded happening at the compactorExecutor + // there are a few exceptions like startup, or during a replica-copy-catch-up in a small possibility + private final Object processBackupLock = new Object(); + + @Override + public boolean isHistory() { + return journalRetentionFolder != null; + } + + @Override + public JournalImpl setHistoryFolder(File historyFolder, long maxBytes, long period) throws Exception { + + if (this.state != JournalState.STOPPED) { + throw new IllegalStateException("State = " + state); + } + this.journalRetentionFolder = historyFolder; + this.journalRetentionFolder.mkdirs(); + + this.journalRetentionMaxFiles = (int) (maxBytes / this.fileSize); + this.journalRetentionPeriod = period; + + + try { + List files = this.fileFactory.listFiles(BKP_EXTENSION); + + for (String name : files) { + SequentialFile file = fileFactory.createSequentialFile(name); + JournalFileImpl journalFile; + try { + file.open(); + journalFile = readFileHeader(file); + } finally { + file.close(); + } + historyPendingFiles.add(journalFile); + } + + + for (JournalFile file : historyPendingFiles) { + File[] repeatFiles = historyFolder.listFiles((a, name) -> name.startsWith(getFilePrefix()) && name.endsWith(file.getFileID() + "." + filesRepository.getFileExtension())); + + for (File f : repeatFiles) { + logger.warn("File " + f + " was partially copied before, removing the file"); + f.delete(); + } + } + + processBackup(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + if (criticalErrorListener != null) { + criticalErrorListener.onIOException(e, e.getMessage(), null); + } + } + + return this; + } + + + // Compacting may replace this structure private final ConcurrentLongHashMap records = new ConcurrentLongHashMap<>(); @@ -230,6 +313,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal protected ExecutorFactory ioExecutorFactory; private ThreadPoolExecutor threadPool; + ThreadLocal calendarThreadLocal = ThreadLocal.withInitial(() -> new GregorianCalendar()); + /** * We don't lock the journal during the whole compacting operation. During compacting we only * lock it (i) when gathering the initial structure, and (ii) when replicating the structures @@ -339,7 +424,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final String fileExtension, final int maxAIO, final int userVersion) { - this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion, null, 0); + this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion, (a, b, c) -> logger.warn(a.getMessage(), a), 0); } @@ -510,7 +595,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal byte recordType = wholeFileBuffer.get(); - if (recordType < JournalImpl.ADD_RECORD || recordType > JournalImpl.ROLLBACK_RECORD) { + if (recordType < JournalImpl.EVENT_RECORD || recordType > JournalImpl.ROLLBACK_RECORD) { // I - We scan for any valid record on the file. If a hole // happened on the middle of the file we keep looking until all // the possibilities are gone @@ -711,6 +796,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } switch (recordType) { + case EVENT_RECORD: { + reader.onReadEventRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount)); + break; + } + case ADD_RECORD: { reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount)); break; @@ -781,6 +871,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal lastDataPos = wholeFileBuffer.position(); } + reader.done(); return lastDataPos; } catch (Throwable e) { ActiveMQJournalLogger.LOGGER.errorReadingFile(e); @@ -846,16 +937,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal if (logger.isTraceEnabled()) { logger.trace("appendAddRecord::id=" + id + - ", userRecordType=" + - recordType + - ", record = " + record + - ", usedFile = " + - usedFile); + ", userRecordType=" + + recordType + + ", record = " + record + + ", usedFile = " + + usedFile); } result.set(true); } catch (ActiveMQShutdownException e) { result.fail(e); - logger.error("appendPrepareRecord:" + e, e); + logger.error("appendAddRecord:" + e, e); } catch (Throwable e) { result.fail(e); setErrorCondition(callback, null, e); @@ -870,6 +961,64 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal result.get(); } + + @Override + public void appendAddEvent(final long id, + final byte recordType, + final Persister persister, + final Object record, + final boolean sync, + final IOCompletion callback) throws Exception { + checkJournalIsLoaded(); + lineUpContext(callback); + + if (logger.isTraceEnabled()) { + logger.trace("scheduling appendAddEvent::id=" + id + + ", userRecordType=" + + recordType + + ", record = " + record); + } + + final long maxRecordSize = getMaxRecordSize(); + final JournalInternalRecord addRecord = new JournalAddRecord(JournalImpl.EVENT_RECORD, id, recordType, persister, record); + final int addRecordEncodeSize = addRecord.getEncodeSize(); + + if (addRecordEncodeSize > maxRecordSize) { + //The record size should be larger than max record size only on the large messages case. + throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize); + } + + final SimpleFuture result = newSyncAndCallbackResult(sync, callback); + appendExecutor.execute(() -> { + journalLock.readLock().lock(); + try { + JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback); + + if (logger.isTraceEnabled()) { + logger.trace("appendAddEvent:id=" + id + + ", userRecordType=" + + recordType + + ", record = " + record + + ", usedFile = " + + usedFile); + } + result.set(true); + } catch (ActiveMQShutdownException e) { + result.fail(e); + logger.error("appendAddEvent:" + e, e); + } catch (Throwable e) { + result.fail(e); + setErrorCondition(callback, null, e); + logger.error("appendAddEvent::" + e, e); + } finally { + pendingRecords.remove(id); + journalLock.readLock().unlock(); + } + }); + + result.get(); + } + @Override public void appendUpdateRecord(final long id, final byte recordType, @@ -1676,7 +1825,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal onCompactStart(); - dataFilesToProcess = getDataListToProcess(); + dataFilesToProcess = getDataListToCompact(); if (dataFilesToProcess == null) return; @@ -1812,7 +1961,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal /** this private method will return a list of data files that need to be cleaned up. * It will get the list, and replace it on the journal structure, while a separate thread would be able * to read it, and append to a new list that will be replaced on the journal. */ - private ArrayList getDataListToProcess() throws Exception { + private ArrayList getDataListToCompact() throws Exception { ArrayList dataFilesToProcess = new ArrayList<>(filesRepository.getDataFilesCount()); // We need to guarantee that the journal is frozen for this short time // We don't freeze the journal as we compact, only for the short time where we replace records @@ -1858,6 +2007,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal journalLock.writeLock().unlock(); } + processBackup(); + for (JournalFile file : dataFilesToProcess) { file.getFile().waitNotPending(); } @@ -2216,7 +2367,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } - checkReclaimStatus(); + if (changeData) { + checkReclaimStatus(); + } return new JournalLoadInformation(records.size(), maxID.longValue()); } @@ -2245,6 +2398,183 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } + + @Override + public void processBackupCleanup() { + if (logger.isDebugEnabled()) { + logger.debug("processBackupCleanup with maxFiles = " + journalRetentionMaxFiles + " and period = " + journalRetentionPeriod); + } + if (journalRetentionFolder != null && (journalRetentionMaxFiles > 0 || journalRetentionPeriod > 0)) { + + FilenameFilter fnf = (d, name) -> name.endsWith("." + filesRepository.getFileExtension()); + + if (journalRetentionPeriod > 0) { + String[] fileNames = journalRetentionFolder.list(fnf); + Arrays.sort(fileNames); + + GregorianCalendar calendar = this.calendarThreadLocal.get(); + calendar.setTimeInMillis(System.currentTimeMillis() - journalRetentionPeriod); + long timeCutOf = calendar.getTimeInMillis(); + + for (String fileName : fileNames) { + long timeOnFile = getDatePortionMillis(fileName); + if (timeOnFile < timeCutOf) { + logger.debug("File " + fileName + " is too old and should go"); + File fileToRemove = new File(journalRetentionFolder, fileName); + if (!fileToRemove.delete()) { + logger.debug("Could not remove " + fileToRemove); + } + } else { + break; + } + } + } + + if (journalRetentionMaxFiles > 0) { + String[] fileNames = journalRetentionFolder.list(fnf); + Arrays.sort(fileNames); + + if (fileNames.length > journalRetentionMaxFiles) { + int toRemove = fileNames.length - journalRetentionMaxFiles; + + for (String file : fileNames) { + logger.debug("Removing " + file); + File fileToRemove = new File(journalRetentionFolder, file); + fileToRemove.delete(); + toRemove--; + if (toRemove <= 0) { + break; + } + } + } + } + + + } + } + + /** With the exception of initialization, this has to be always called within the compactorExecutor */ + @Override + public void processBackup() { + if (this.journalRetentionFolder == null) { + return; + } + + synchronized (processBackupLock) { + ArrayList filesToMove; + filesToMove = new ArrayList<>(historyPendingFiles.size()); + filesToMove.addAll(historyPendingFiles); + historyPendingFiles.clear(); + + for (JournalFile fileToCopy : filesToMove) { + copyFile(fileToCopy); + } + } + + if (compactorExecutor != null) { + compactorExecutor.execute(this::processBackupCleanup); + } else { + processBackupCleanup(); + } + } + + // This exists to avoid a race with copying the files on initial replica + // we get the list, and check each individual file if they have the pending copy + private void checkRetentionFile(JournalFile file) { + if (this.journalRetentionFolder == null) { + return; + } + + // It is cheaper to check without a lock + if (!file.getFile().getFileName().endsWith(BKP)) { + return; + } + + copyFile(file); + } + + // you need to synchronize processBackupLock before calling this + private void copyFile(JournalFile fileToCopy) { + synchronized (processBackupLock) { + if (fileToCopy == null || !fileToCopy.getFile().getFileName().endsWith(BKP)) { + return; + } + + long fileId = fileToCopy.getFileID(); + + GregorianCalendar calendar = calendarThreadLocal.get(); + + calendar.setTimeInMillis(System.currentTimeMillis()); + String fileName = getHistoryFileName(fileId, calendar); + + File copyFrom = fileToCopy.getFile().getJavaFile(); + + File copyTo = new File(journalRetentionFolder, fileName); + + if (logger.isDebugEnabled()) { + logger.debug("Copying journal retention from " + copyFrom + " to " + copyTo); + } + + try { + Files.copy(copyFrom.toPath(), copyTo.toPath(), StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + try { + criticalIO(e); + } catch (Exception ignored) { + } + } + + try { + fileToCopy.getFile().renameTo(removeBackupExtension(fileToCopy.getFile().getFileName())); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + if (criticalErrorListener != null) { + criticalErrorListener.onIOException(e, e.getMessage(), fileToCopy.getFile()); + } + } + + fileToCopy.setReclaimable(true); + } + } + + public String getHistoryFileName(long sequence, Calendar calendar) { + + String fileName = String.format("%s-%04d%02d%02d%02d%02d%02d-%d.%s", filesRepository.getFilePrefix(), calendar.get(Calendar.YEAR), + calendar.get(Calendar.MONTH), calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY), + calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND), sequence, filesRepository.getFileExtension()); + + return fileName; + } + + public String removeBackupExtension(String name) { + int indexOfBKP = name.indexOf(BKP); + if (indexOfBKP >= 0) { + return name.substring(0, name.indexOf(BKP)); + } else { + return name; + } + } + + public long getDatePortionMillis(String name) { + String datePortion = getDatePortion(name); + GregorianCalendar calendar = calendarThreadLocal.get(); + + int year = Integer.parseInt(datePortion.substring(0, 4)); + int month = Integer.parseInt(datePortion.substring(4, 6)); + int day = Integer.parseInt(datePortion.substring(6, 8)); + int hour = Integer.parseInt(datePortion.substring(8, 10)); + int minutes = Integer.parseInt(datePortion.substring(10, 12)); + int seconds = Integer.parseInt(datePortion.substring(12, 14)); + + calendar.set(year, month, day, hour, minutes, seconds); + return calendar.getTimeInMillis(); + + } + public String getDatePortion(String name) { + return name.substring(filesRepository.getFilePrefix().length() + 1, name.indexOf("-", filesRepository.getFilePrefix().length() + 1)); + } + /** * @return true if cleanup was called */ @@ -2487,7 +2817,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal @Override public JournalFile[] getDataFiles() { - return filesRepository.getDataFilesArray(); + JournalFile[] files = filesRepository.getDataFilesArray(); + for (JournalFile file : files) { + checkRetentionFile(file); + } + return files; } @Override @@ -2775,7 +3109,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } private static boolean isContainsBody(final byte recordType) { - return recordType >= JournalImpl.ADD_RECORD && recordType <= JournalImpl.DELETE_RECORD_TX; + return recordType >= JournalImpl.EVENT_RECORD && recordType <= JournalImpl.DELETE_RECORD_TX; } private static int getRecordSize(final byte recordType, final int journalVersion) { @@ -2783,6 +3117,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal int recordSize = 0; switch (recordType) { case ADD_RECORD: + case EVENT_RECORD: recordSize = JournalImpl.SIZE_ADD_RECORD; break; case UPDATE_RECORD: @@ -2827,7 +3162,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal * @return * @throws Exception */ - private JournalFileImpl readFileHeader(final SequentialFile file) throws Exception { + public JournalFileImpl readFileHeader(final SequentialFile file) throws Exception { ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER); file.read(bb); @@ -2973,6 +3308,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal @Override public void run() { try { + processBackup(); if (!checkReclaimStatus()) { checkCompact(); } @@ -3257,6 +3593,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal protected void moveNextFile(final boolean scheduleReclaim, boolean blockOnClose) throws Exception { filesRepository.closeFile(currentFile, blockOnClose); + + if (this.journalRetentionFolder != null) { + currentFile.setReclaimable(false); + currentFile.getFile().renameTo(currentFile.getFile().getFileName() + BKP); + this.historyPendingFiles.add(currentFile); + } + currentFile = filesRepository.openFile(); if (scheduleReclaim) { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalReaderCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalReaderCallback.java index ae14ebdcdf..eabdae9080 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalReaderCallback.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalReaderCallback.java @@ -20,6 +20,12 @@ import org.apache.activemq.artemis.core.journal.RecordInfo; public interface JournalReaderCallback { + default void onReadEventRecord(RecordInfo info) throws Exception { + } + + default void done() { + } + void onReadAddRecord(RecordInfo info) throws Exception; /** diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java index fe4d432830..8067da6e3e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java @@ -30,7 +30,24 @@ public class JournalAddRecord extends JournalInternalRecord { protected final byte recordType; - protected final boolean add; + protected final byte journalType; + + /** + * @param id + * @param recordType + * @param record + */ + public JournalAddRecord(final byte journalType, final long id, final byte recordType, final Persister persister, Object record) { + this.id = id; + + this.record = record; + + this.recordType = recordType; + + this.journalType = journalType; + + this.persister = persister; + } /** * @param id @@ -38,24 +55,12 @@ public class JournalAddRecord extends JournalInternalRecord { * @param record */ public JournalAddRecord(final boolean add, final long id, final byte recordType, final Persister persister, Object record) { - this.id = id; - - this.record = record; - - this.recordType = recordType; - - this.add = add; - - this.persister = persister; + this(add ? JournalImpl.ADD_RECORD : JournalImpl.UPDATE_RECORD, id, recordType, persister, record); } @Override public void encode(final ActiveMQBuffer buffer) { - if (add) { - buffer.writeByte(JournalImpl.ADD_RECORD); - } else { - buffer.writeByte(JournalImpl.UPDATE_RECORD); - } + buffer.writeByte(journalType); buffer.writeInt(fileID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 2ff3c3f866..d6301f862f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; @@ -668,6 +669,24 @@ public interface Configuration { */ Configuration setJournalDirectory(String dir); + String getJournalRetentionDirectory(); + + /** + * Sets the file system directory used to store historical backup journal. + */ + Configuration setJournalRetentionDirectory(String dir); + + File getJournalRetentionLocation(); + + /** The retention period for the journal in milliseconds (always in milliseconds, a conversion is performed on set) */ + long getJournalRetentionPeriod(); + + Configuration setJournalRetentionPeriod(TimeUnit unit, long limit); + + long getJournalRetentionMaxBytes(); + + Configuration setJournalRetentionMaxBytes(long bytes); + /** * Returns the type of journal used by this server ({@code NIO}, {@code ASYNCIO} or {@code MAPPED}). *
@@ -1350,4 +1369,5 @@ public interface Configuration { String getTemporaryQueueNamespace(); Configuration setTemporaryQueueNamespace(String temporaryQueueNamespace); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 5ff3e9c7f4..d36a804566 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; @@ -199,6 +200,12 @@ public class ConfigurationImpl implements Configuration, Serializable { protected String journalDirectory = ActiveMQDefaultConfiguration.getDefaultJournalDir(); + protected String journalRetentionDirectory = null; + + protected long journalRetentionMaxBytes = 0; + + protected long journalRetentionPeriod; + protected String nodeManagerLockDirectory = null; protected boolean createJournalDir = ActiveMQDefaultConfiguration.isDefaultCreateJournalDir(); @@ -367,6 +374,52 @@ public class ConfigurationImpl implements Configuration, Serializable { // Public ------------------------------------------------------------------------- + @Override + public String getJournalRetentionDirectory() { + return journalRetentionDirectory; + } + + @Override + public ConfigurationImpl setJournalRetentionDirectory(String dir) { + this.journalRetentionDirectory = dir; + return this; + } + + @Override + public File getJournalRetentionLocation() { + if (journalRetentionDirectory == null) { + return null; + } else { + return subFolder(getJournalRetentionDirectory()); + } + } + + @Override + public long getJournalRetentionPeriod() { + return this.journalRetentionPeriod; + } + + @Override + public Configuration setJournalRetentionPeriod(TimeUnit unit, long period) { + if (period <= 0) { + this.journalRetentionPeriod = -1; + } else { + this.journalRetentionPeriod = unit.toMillis(period); + } + return this; + } + + @Override + public long getJournalRetentionMaxBytes() { + return journalRetentionMaxBytes; + } + + @Override + public ConfigurationImpl setJournalRetentionMaxBytes(long bytes) { + this.journalRetentionMaxBytes = bytes; + return this; + } + @Override public Configuration setSystemPropertyPrefix(String systemPropertyPrefix) { this.systemPropertyPrefix = systemPropertyPrefix; @@ -2505,7 +2558,7 @@ public class ConfigurationImpl implements Configuration, Serializable { /** * It will find the right location of a subFolder, related to artemisInstance */ - private File subFolder(String subFolder) { + public File subFolder(String subFolder) { try { return getBrokerInstance().toPath().resolve(subFolder).toFile(); } catch (Exception e) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 3b429cc0b9..494829bca7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -31,6 +31,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.ArtemisConstants; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; @@ -626,6 +627,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { config.setJournalDirectory(getString(e, "journal-directory", config.getJournalDirectory(), Validators.NOT_NULL_OR_EMPTY)); + + parseJournalRetention(e, config); + config.setNodeManagerLockDirectory(getString(e, "node-manager-lock-directory", null, Validators.NO_CHECK)); config.setPageMaxConcurrentIO(getInteger(e, "page-max-concurrent-io", config.getPageMaxConcurrentIO(), Validators.MINUS_ONE_OR_GT_ZERO)); @@ -768,6 +772,47 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { } } + + private void parseJournalRetention(final Element e, final Configuration config) { + NodeList retention = e.getElementsByTagName("journal-retention-directory"); + + if (retention.getLength() != 0) { + Element node = (Element) retention.item(0); + + String directory = node.getTextContent().trim(); + + String storageLimitStr = getAttributeValue(node, "storage-limit"); + long storageLimit; + + if (storageLimitStr == null) { + storageLimit = -1; + } else { + storageLimit = ByteUtil.convertTextBytes(storageLimitStr.trim()); + } + int period = getAttributeInteger(node, "period", -1, Validators.GT_ZERO); + String unitStr = getAttributeValue(node, "unit"); + + if (unitStr == null) { + unitStr = "DAYS"; + } + + TimeUnit unit = TimeUnit.valueOf(unitStr.toUpperCase()); + + config.setJournalRetentionDirectory(directory); + config.setJournalRetentionMaxBytes(storageLimit); + config.setJournalRetentionPeriod(unit, period); + + if (directory == null || directory.equals("")) { + throw new IllegalArgumentException("journal-retention-directory=null"); + } + + if (storageLimit == -1 && period == -1) { + throw new IllegalArgumentException("configure either storage-limit or period on journal-retention-directory"); + } + + } + } + /** * @param e * @param config diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index 099caf79bb..38ac0b402c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -449,6 +449,13 @@ public final class Page implements Comparable { } public synchronized void write(final PagedMessage message) throws Exception { + writeDirect(message); + storageManager.pageWrite(message, pageId); + } + + /** This write will not interact back with the storage manager. + * To avoid ping pongs with Journal retaining events and any other stuff. */ + public void writeDirect(PagedMessage message) throws Exception { if (!file.isOpen()) { throw ActiveMQMessageBundle.BUNDLE.cannotWriteToClosedFile(file); } @@ -471,7 +478,6 @@ public final class Page implements Comparable { //lighter than addAndGet when single writer numberOfMessages.lazySet(numberOfMessages.get() + 1); size.lazySet(size.get() + bufferSize); - storageManager.pageWrite(message, pageId); } public void sync() throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 06b3fe6916..632d8b9786 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1550,6 +1550,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp bindingsJournal.start(); + + if (config.getJournalRetentionLocation() != null) { + messageJournal.setHistoryFolder(config.getJournalRetentionLocation(), config.getJournalRetentionMaxBytes(), config.getJournalRetentionPeriod()); + } messageJournal.start(); started = true; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BufferSplitter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BufferSplitter.java new file mode 100644 index 0000000000..fe812cf710 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BufferSplitter.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.persistence.impl.journal; + + +import java.util.function.Consumer; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.journal.EncodingSupport; + +/** this class will split a big buffer into smaller buffers */ +public class BufferSplitter { + + + public static void split(ActiveMQBuffer buffer, int splitSize, Consumer target) { + byte[] bytesBuffer = new byte[buffer.readableBytes()]; + buffer.readBytes(bytesBuffer); + split(bytesBuffer, splitSize, target); + } + + public static void split(byte[] buffer, int splitSize, Consumer target) { + + int location = 0; + while (location < buffer.length) { + int maxSize = Math.min(splitSize, buffer.length - location); + target.accept(new PartialEncoding(buffer, location, maxSize)); + location += maxSize; + } + + } + + protected static class PartialEncoding implements EncodingSupport { + + final byte[] data; + final int begin; + final int length; + + public PartialEncoding(final byte[] data, final int begin, final int length) { + this.data = data; + this.begin = begin; + this.length = length; + } + + // Public -------------------------------------------------------- + + @Override + public void decode(final ActiveMQBuffer buffer) { + throw new IllegalStateException("operation not supported"); + } + + @Override + public void encode(final ActiveMQBuffer buffer) { + buffer.writeBytes(data, begin, length); + } + + @Override + public int getEncodeSize() { + return length; + } + } + + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index 1fc54dae9c..6b63074e9f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -213,6 +213,10 @@ public final class DescribeJournal { recordsPrintStream.println("#" + file + " (size=" + file.getFile().size() + ")"); JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback() { + @Override + public void onReadEventRecord(RecordInfo recordInfo) throws Exception { + recordsPrintStream.println("operation@Event;" + describeRecord(recordInfo, safe)); + } @Override public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java index 5644324c92..7bd371fb4e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java @@ -96,4 +96,7 @@ public final class JournalRecordIds { public static final byte ROLE_RECORD = 48; + // Used to record the large message body on the journal when history is on + public static final byte ADD_MESSAGE_BODY = 49; + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 8f9ad41a49..509d5dfae7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -46,6 +46,8 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncoderPersister; +import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; @@ -55,6 +57,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage; import org.apache.activemq.artemis.core.replication.ReplicatedJournal; import org.apache.activemq.artemis.core.replication.ReplicationManager; @@ -405,6 +408,23 @@ public class JournalStorageManager extends AbstractJournalStorageManager { @Override public void pageWrite(final PagedMessage message, final int pageNumber) { + if (messageJournal.isHistory()) { + try (ArtemisCloseable lock = closeableReadLock()) { + + Message theMessage = message.getMessage(); + + if (theMessage.isLargeMessage() && theMessage instanceof LargeServerMessageImpl) { + messageJournal.appendAddEvent(theMessage.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, LargeMessagePersister.getInstance(), theMessage, false, getContext(false)); + } else { + messageJournal.appendAddEvent(theMessage.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, theMessage.getPersister(), theMessage, false, getContext(false)); + } + for (long queueID : message.getQueueIDs()) { + messageJournal.appendAddEvent(message.getMessage().getMessageID(), JournalRecordIds.ADD_REF, EncoderPersister.getInstance(), new RefEncoding(queueID), false, getContext(false)); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } if (isReplicated()) { // Note: (https://issues.jboss.org/browse/HORNETQ-1059) // We have to replicate durable and non-durable messages on paging @@ -840,6 +860,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager { final long messageId, final ActiveMQBuffer bytes) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { + if (messageJournal.isHistory()) { + BufferSplitter.split(bytes, 10 * 1024, (c) -> historyBody(messageId, c)); + } file.position(file.size()); if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) { final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes()); @@ -859,11 +882,24 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } } + private void historyBody(long messageId, EncodingSupport partialBuffer) { + + try { + messageJournal.appendAddEvent(messageId, JournalRecordIds.ADD_MESSAGE_BODY, EncoderPersister.getInstance(), partialBuffer, true, null); + } catch (Exception e) { + logger.warn("Error processing history large message body for " + messageId + " - " + e.getMessage(), e); + } + + } + @Override public final void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { + if (messageJournal.isHistory()) { + BufferSplitter.split(bytes, 10 * 1024, (c) -> historyBody(messageId, c)); + } file.position(file.size()); //that's an additional precaution to avoid ByteBuffer to be pooled: //NIOSequentialFileFactory doesn't pool heap ByteBuffer, but better to make evident diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java index 4a5a8b5bcd..90e42630bb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java @@ -78,7 +78,7 @@ public final class ReplicationAddMessage extends PacketImpl { @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); - buffer.writeBoolean(operation.toBoolean()); + buffer.writeByte(operation.toRecord()); buffer.writeLong(id); buffer.writeByte(journalRecordType); buffer.writeInt(persister.getEncodeSize(encodingData)); @@ -88,7 +88,7 @@ public final class ReplicationAddMessage extends PacketImpl { @Override public void decodeRest(final ActiveMQBuffer buffer) { journalID = buffer.readByte(); - operation = ADD_OPERATION_TYPE.toOperation(buffer.readBoolean()); + operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte()); id = buffer.readLong(); journalRecordType = buffer.readByte(); final int recordDataSize = buffer.readInt(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java index fd7946a3e6..0ed85b682c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java @@ -83,7 +83,7 @@ public class ReplicationAddTXMessage extends PacketImpl { @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); - buffer.writeBoolean(operation.toBoolean()); + buffer.writeByte(operation.toRecord()); buffer.writeLong(txId); buffer.writeLong(id); buffer.writeByte(recordType); @@ -94,7 +94,7 @@ public class ReplicationAddTXMessage extends PacketImpl { @Override public void decodeRest(final ActiveMQBuffer buffer) { journalID = buffer.readByte(); - operation = ADD_OPERATION_TYPE.toOperation(buffer.readBoolean()); + operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte()); txId = buffer.readLong(); id = buffer.readLong(); recordType = buffer.readByte(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java index 21d7fc16c7..4661149183 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java @@ -129,6 +129,20 @@ public class ReplicatedJournal implements Journal { localJournal.appendAddRecord(id, recordType, persister, record, sync, completionCallback); } + @Override + public void appendAddEvent(long id, + byte recordType, + Persister persister, + Object record, + boolean sync, + IOCompletion completionCallback) throws Exception { + if (log.isTraceEnabled()) { + log.trace("Append record id = " + id + " recordType = " + recordType); + } + replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.EVENT, id, recordType, persister, record); + localJournal.appendAddEvent(id, recordType, persister, record, sync, completionCallback); + } + /** * @param txID * @param id diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index da26fa06f8..12202609c2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -39,11 +39,13 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.Journal.JournalState; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.impl.FileWrapperJournal; import org.apache.activemq.artemis.core.journal.impl.JournalFile; +import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.impl.Page; @@ -750,16 +752,25 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon */ private void handleAppendAddRecord(final ReplicationAddMessage packet) throws Exception { Journal journalToUse = getJournal(packet.getJournalID()); - if (packet.getRecord() == ADD_OPERATION_TYPE.UPDATE) { - if (logger.isTraceEnabled()) { - logger.trace("Endpoint appendUpdate id = " + packet.getId()); - } - journalToUse.appendUpdateRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync); - } else { - if (logger.isTraceEnabled()) { - logger.trace("Endpoint append id = " + packet.getId()); - } - journalToUse.appendAddRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync); + switch (packet.getRecord()) { + case UPDATE: + if (logger.isTraceEnabled()) { + logger.trace("Endpoint appendUpdate id = " + packet.getId()); + } + journalToUse.appendUpdateRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync); + break; + case ADD: + if (logger.isTraceEnabled()) { + logger.trace("Endpoint append id = " + packet.getId()); + } + journalToUse.appendAddRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync); + break; + case EVENT: + if (logger.isTraceEnabled()) { + logger.trace("Endpoint append id = " + packet.getId()); + } + journalToUse.appendAddEvent(packet.getId(), packet.getJournalRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(packet.getRecordData()), noSync, null); + break; } } @@ -800,7 +811,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon pgdMessage.initMessage(storageManager); Message msg = pgdMessage.getMessage(); Page page = getPage(msg.getAddressSimpleString(), packet.getPageNumber()); - page.write(pgdMessage); + page.writeDirect(pgdMessage); } private ConcurrentMap getPageMap(final SimpleString storeName) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index fa82ff6f63..ccebb6a3a0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -103,20 +103,33 @@ public final class ReplicationManager implements ActiveMQComponent { public enum ADD_OPERATION_TYPE { UPDATE { @Override - public boolean toBoolean() { - return true; + public byte toRecord() { + return 0; } }, ADD { @Override - public boolean toBoolean() { - return false; + public byte toRecord() { + return 1; + } + }, EVENT { + @Override + public byte toRecord() { + return 2; } }; - public abstract boolean toBoolean(); + public abstract byte toRecord(); - public static ADD_OPERATION_TYPE toOperation(boolean isUpdate) { - return isUpdate ? UPDATE : ADD; + public static ADD_OPERATION_TYPE toOperation(byte recordType) { + switch (recordType) { + case 0: // 0: it used to be false, we need to use 0 for compatibility reasons with writeBoolean on the channel + return UPDATE; + case 1: // 1: it used to be true, we need to use 1 for compatibility reasons with writeBoolean + return ADD; + case 2: // 2: this represents the new value + return EVENT; + } + return ADD; } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index fb9f0fe7fa..7698008d9a 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -675,6 +675,53 @@ + + + + the directory to store journal-retention message in and rention configuraion. + + + + + + + + + This configures the period type to use on limit. By default it is DAYS. + + + + + + + + + + + + + + + The amount of time used to keep files. + + + + + + + Size (in bytes) before we starting removing files from the retention area. + this is an extra protection on top of the period. + Notice we first remove files based on period and if you're using more storage then you + configured we start removing older files. + By default this is unlimited (not filled). + Supports byte notation like "K", "Mb", "GB", etc. + + + + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java index 389bbc74cb..a4b0938b10 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java @@ -17,10 +17,12 @@ package org.apache.activemq.artemis.core.config.impl; import java.io.ByteArrayInputStream; +import java.io.PrintStream; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; @@ -38,6 +40,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec; import org.apache.activemq.artemis.utils.PasswordMaskingUtil; +import org.apache.activemq.artemis.utils.StringPrintStream; import org.junit.Assert; import org.junit.Test; @@ -390,6 +393,126 @@ public class FileConfigurationParserTest extends ActiveMQTestBase { assertEquals(expected, config.getPageSyncTimeout()); } + + @Test + public void testMinimalXML() throws Exception { + StringPrintStream stringPrintStream = new StringPrintStream(); + PrintStream stream = stringPrintStream.newStream(); + + stream.println(""); + stream.println(""); + + ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes()); + FileConfigurationParser parser = new FileConfigurationParser(); + Configuration configuration = parser.parseMainConfig(inputStream); + } + + @Test + public void testRetentionJournalOptionsDays() throws Exception { + testStreamDatesOption("DAYS", TimeUnit.DAYS); + } + + @Test + public void testRetentionJournalOptionsHours() throws Exception { + testStreamDatesOption("HOURS", TimeUnit.HOURS); + } + + @Test + public void testRetentionJournalOptionsMinutes() throws Exception { + testStreamDatesOption("MINUTES", TimeUnit.MINUTES); + } + + @Test + public void testRetentionJournalOptionsSeconds() throws Exception { + testStreamDatesOption("SECONDS", TimeUnit.SECONDS); + } + + private void testStreamDatesOption(String option, TimeUnit expected) throws Exception { + StringPrintStream stringPrintStream = new StringPrintStream(); + PrintStream stream = stringPrintStream.newStream(); + + stream.println(""); + stream.println("history"); + stream.println(""); + + ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes()); + FileConfigurationParser parser = new FileConfigurationParser(); + Configuration configuration = parser.parseMainConfig(inputStream); + + Assert.assertEquals("history", configuration.getJournalRetentionDirectory()); + + Assert.assertEquals(expected.toMillis(365), configuration.getJournalRetentionPeriod()); + } + + + @Test + public void unlimitedJustHistory() throws Throwable { + StringPrintStream stringPrintStream = new StringPrintStream(); + PrintStream stream = stringPrintStream.newStream(); + + stream.println(""); + stream.println("directory"); + stream.println(""); + + ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes()); + FileConfigurationParser parser = new FileConfigurationParser(); + Configuration configuration = null; + boolean exceptionHappened = false; + try { + configuration = parser.parseMainConfig(inputStream); + } catch (Exception e) { + exceptionHappened = true; + } + + Assert.assertTrue(exceptionHappened); + } + + + + @Test + public void noRetention() throws Throwable { + StringPrintStream stringPrintStream = new StringPrintStream(); + PrintStream stream = stringPrintStream.newStream(); + + stream.println(""); + stream.println("journal"); + stream.println(""); + + ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes()); + FileConfigurationParser parser = new FileConfigurationParser(); + Configuration configuration = null; + configuration = parser.parseMainConfig(inputStream); + Assert.assertNull(configuration.getJournalRetentionLocation()); + Assert.assertNull(configuration.getJournalRetentionDirectory()); + Assert.assertEquals("journal", configuration.getJournalDirectory()); + } + + + @Test + public void noFolderOnRetention() throws Throwable { + StringPrintStream stringPrintStream = new StringPrintStream(); + PrintStream stream = stringPrintStream.newStream(); + + stream.println(""); + stream.println(""); + stream.println(""); + FileConfigurationParser parser = new FileConfigurationParser(); + ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes()); + boolean exception = false; + try { + Configuration configuration = parser.parseMainConfig(inputStream); + } catch (Exception e) { + exception = true; + } + + Assert.assertTrue(exception); + + + + } + + + private static String firstPart = "" + "\n" + "ActiveMQ.main.config" + "\n" + "org.apache.activemq.artemis.integration.logging.Log4jLogDelegateFactory" + "\n" + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index a258557b21..926a78bab2 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import io.micrometer.core.instrument.MeterRegistry; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; @@ -131,6 +132,9 @@ public class FileConfigurationTest extends ConfigurationImplTest { Assert.assertEquals("max concurrent io", 17, conf.getPageMaxConcurrentIO()); Assert.assertEquals(true, conf.isReadWholePage()); Assert.assertEquals("somedir2", conf.getJournalDirectory()); + Assert.assertEquals("history", conf.getJournalRetentionDirectory()); + Assert.assertEquals(10L * 1024L * 1024L * 1024L, conf.getJournalRetentionMaxBytes()); + Assert.assertEquals(TimeUnit.DAYS.toMillis(365), conf.getJournalRetentionPeriod()); Assert.assertEquals(false, conf.isCreateJournalDir()); Assert.assertEquals(JournalType.NIO, conf.getJournalType()); Assert.assertEquals(10000, conf.getJournalBufferSize_NIO()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/BufferSplitterTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/BufferSplitterTest.java new file mode 100644 index 0000000000..44d7c001f1 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/BufferSplitterTest.java @@ -0,0 +1,52 @@ +/* + * Copyright The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.persistence.impl.journal; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.utils.DataConstants; +import org.junit.Assert; +import org.junit.Test; + +public class BufferSplitterTest { + + @Test + public void testSplitting() { + ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(1000 * DataConstants.SIZE_INT); + + for (int i = 0; i < 1000; i++) { + buffer.writeInt(i); + } + + ActiveMQBuffer outputBuffer = ActiveMQBuffers.fixedBuffer(1000 * DataConstants.SIZE_INT); + + BufferSplitter.split(buffer, 77, (c) -> { + Assert.assertTrue(c.getEncodeSize() <= 77); + c.encode(outputBuffer); + }); + + outputBuffer.resetReaderIndex(); + buffer.resetReaderIndex(); + + byte[] sourceBytes = new byte[1000 * DataConstants.SIZE_INT]; + buffer.readBytes(sourceBytes); + byte[] targetBytes = new byte[1000 * DataConstants.SIZE_INT]; + outputBuffer.readBytes(targetBytes); + + Assert.assertArrayEquals(sourceBytes, targetBytes); + } + +} diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index b6ed421b3b..a576035f18 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -406,6 +406,7 @@ 17 true somedir2 + history false NIO 1000 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml index 37d447ec48..ed25c0cb95 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml @@ -275,6 +275,7 @@ 17 true somedir2 + history false NIO 1000 diff --git a/docs/user-manual/en/data-tools.md b/docs/user-manual/en/data-tools.md index fb7443fd9f..51fee2942f 100644 --- a/docs/user-manual/en/data-tools.md +++ b/docs/user-manual/en/data-tools.md @@ -9,6 +9,7 @@ Name | Description exp | Export the message data using a special and independent XML format imp | Imports the journal to a running broker using the output from expt data | Prints a report about journal records and summary of existent records, as well a report on paging +recover | Revive data from the journal. It can be used in conjunction with historic journaling. encode | shows an internal format of the journal encoded to String decode | imports the internal journal format from encode diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md index eb5cec481a..68a8df5ab6 100644 --- a/docs/user-manual/en/persistence.md +++ b/docs/user-manual/en/persistence.md @@ -43,6 +43,46 @@ The majority of the journal is written in Java, however we abstract out the interaction with the actual file system to allow different pluggable implementations. Apache ActiveMQ Artemis ships with two implementations: +### Journal Retention + +If you enable ``journal-retention`` on broker.xml, ActiveMQ Artemis will keep copy of every data that has passed through the broker on this folder. + +```xml + ... + + ... + +``` + +ActiveMQ Artemis will keep a copy of each generated journal file, up to the configured retention period, at the unit chose. On the example above the system would keep all the journal files up to 365 days. + +It is also possible to limit the number of files kept on the retention directory. You can keep a storage-limit, and the system will start removing older files when you have more files than the configured storage limit. + +Notice the storage limit is optional however you need to be careful to not run out of disk space at the retention folder or the broker might be shutdown because of a critical IO failure. + + +You can use the CLI tools to inspect and recover data from the history, by just passing the journal folder being the retention directory. + +Example: + +```shell +./artemis data print --journal ../data/history +``` + +To recover the messages from the history: + +```shell +./artemis data recovery --journal ../data/history --target ../data/recovered --large-messages ../data/large-messages +``` + +It is important that you don't call recover into a the journal while the broker is alive. As a matter of fact the current recommendations is to do that on a new journal directory. Perhaps on a new broker so you can inspect and transfer these messages. + +The retention feature is in its current form very simple and intended for emergency situations. If you think it is useful new options to recover the data could be added, perhaps thorugh the admin console and other possibilities. Please share your feedback on this area, and as always Pull Requests are welcomed! + +Also the recovery CLI tool will recover every data on the selected folder. It is important that you do some maintenance and copy the files and interval you need to a new location before you call recover. + + + ### Java [NIO](https://en.wikipedia.org/wiki/New_I/O) The first implementation uses standard Java NIO to interface with diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/RecoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/RecoverTest.java index 75dfc4071a..299e7e71c6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/RecoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/RecoverTest.java @@ -23,64 +23,121 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; - +import javax.jms.TextMessage; import java.io.File; +import java.util.ArrayList; +import java.util.Collection; import org.apache.activemq.artemis.cli.commands.tools.RecoverMessages; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.apache.activemq.artemis.utils.Wait; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class RecoverTest extends JMSTestBase { + boolean useTX; + String protocol; + boolean paging; + boolean large; + String journalType; + + public RecoverTest(boolean useTX, String protocol, boolean paging, boolean large, String journalType) { + this.useTX = useTX; + this.protocol = protocol; + this.paging = paging; + this.large = large; + this.journalType = journalType; + } + + @Parameterized.Parameters(name = "useTX={0}, protocol={1}, paging={2}, largeMessage={3}, journal-type={4}") + public static Collection data() { + Object[] journalType; + if (LibaioContext.isLoaded()) { + journalType = new Object[]{"AIO", "NIO", "MAPPED"}; + } else { + journalType = new Object[]{"NIO", "MAPPED"}; + } + return combine(new Object[]{true, false}, new Object[]{"AMQP", "CORE", "OPENWIRE"}, new Object[]{true, false}, new Object[]{true, false}, journalType); + } + + protected static Collection combine(Object[] one, Object[] two, Object[] three, Object[] four, Object[] five) { + ArrayList combinations = new ArrayList<>(); + for (Object o1 : one) { + for (Object o2 : two) { + for (Object o3 : three) { + for (Object o4 : four) { + for (Object o5 : five) { + combinations.add(new Object[]{o1, o2, o3, o4, o5}); + } + } + } + } + } + + return combinations; + } + + @Override + protected Configuration createDefaultConfig(boolean netty) throws Exception { + Configuration configuration = super.createDefaultConfig(netty).setJMXManagementEnabled(true); + configuration.setJournalRetentionDirectory(getTestDir() + "/historyJournal"); + switch (journalType) { + case "NIO": + configuration.setJournalType(JournalType.NIO); + break; + case "MAPPED": + configuration.setJournalType(JournalType.MAPPED); + break; + case "AIO": + configuration.setJournalType(JournalType.ASYNCIO); + break; + } + return configuration; + } + @Override protected boolean usePersistence() { return true; } @Test - public void testRecoverCoreNoTx() throws Exception { - testRecover(false, "CORE"); - } - - @Test - public void testRecoverCORETx() throws Exception { - testRecover(true, "CORE"); - } - - - @Test - public void testRecoverAMQPNoTx() throws Exception { - testRecover(false, "AMQP"); - } - - @Test - public void testRecoverAMQPTx() throws Exception { - testRecover(true, "AMQP"); - } - - @Test - public void testRecoverOpenWireNoTx() throws Exception { - testRecover(false, "OPENWIRE"); - } - - @Test - public void testRecoverOpenWireTx() throws Exception { - testRecover(true, "OPENWIRE"); - } - - - public void testRecover(boolean useTX, String protocol) throws Exception { + public void testRecover() throws Exception { createQueue(true, "TestQueue"); + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue("TestQueue"); + if (paging) { + serverQueue.getPagingStore().startPaging(); + } ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); Connection connection = factory.createConnection(); Session session = connection.createSession(useTX, useTX ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("TestQueue"); - MessageProducer producer = session.createProducer(queue); - for (int i = 0; i < 1000; i++) { - producer.send(session.createTextMessage("test1")); + MessageProducer producer = session.createProducer(queue); + String messageBody; + { + StringBuffer stringBuffer = new StringBuffer(); + if (large) { + int i = 0; + while (stringBuffer.length() < 110 * 1024) { + //stringBuffer.append("this is " + (i++)); + stringBuffer.append(" "); + } + } else { + stringBuffer.append("hello"); + } + messageBody = stringBuffer.toString(); + } + int maxMessage = large ? 10 : 1000; + for (int i = 0; i < maxMessage; i++) { + producer.send(session.createTextMessage(i + messageBody)); } if (useTX) { @@ -94,8 +151,13 @@ public class RecoverTest extends JMSTestBase { MessageConsumer consumer = session.createConsumer(queue); connection.start(); - for (int i = 0; i < 1000; i++) { - Assert.assertNotNull(consumer.receive(5000)); + for (int i = 0; i < maxMessage; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + if (!protocol.equals("OPENWIRE")) { + // openwire won't support large message or its conversions + Assert.assertEquals(i + messageBody, message.getText()); + } } if (useTX) { @@ -104,11 +166,22 @@ public class RecoverTest extends JMSTestBase { connection.close(); + // need to wait no paging, otherwise an eventual page cleanup would remove large message bodies from the recovery + Wait.assertFalse(serverQueue.getPagingStore()::isPaging); + server.stop(); File newJournalLocation = new File(server.getConfiguration().getJournalLocation().getParentFile(), "recovered"); - RecoverMessages.recover(server.getConfiguration(), newJournalLocation, true); + RecoverMessages.recover(server.getConfiguration(), server.getConfiguration().getJournalRetentionDirectory(), newJournalLocation, server.getConfiguration().getLargeMessagesLocation(), false); + + if (large) { + File[] largeMessageFiles = server.getConfiguration().getLargeMessagesLocation().listFiles(); + Assert.assertEquals(maxMessage, largeMessageFiles.length); + for (File f : largeMessageFiles) { + Assert.assertTrue("File length was " + f.length(), f.length() > 0); + } + } server.getConfiguration().setJournalDirectory(newJournalLocation.getAbsolutePath()); @@ -122,8 +195,13 @@ public class RecoverTest extends JMSTestBase { consumer = session.createConsumer(queue); - for (int i = 0; i < 1000; i++) { - Assert.assertNotNull(consumer.receive(5000)); + for (int i = 0; i < maxMessage; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + if (!protocol.equals("OPENWIRE")) { + // openwire won't support large message or its conversions + Assert.assertEquals(i + messageBody, message.getText()); + } } Assert.assertNull(consumer.receiveNoWait()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java index 1609b5679c..273117346e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java @@ -206,6 +206,9 @@ public abstract class FailoverTestBase extends ActiveMQTestBase { liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)); liveServer = createTestableServer(liveConfig); + + liveServer.getServer().getConfiguration().setJournalRetentionDirectory(getJournalDir(0, false) + "_retention"); + backupServer.getServer().getConfiguration().setJournalRetentionDirectory(getJournalDir(0, true) + "_retention"); } protected void setupHAPolicyConfiguration() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java index dee22fbf35..4b27f53cc1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.AbstractJournalUpdateTask; @@ -44,6 +45,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalCompactor; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalFileImpl; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; @@ -549,6 +551,10 @@ public class NIOJournalCompactTest extends JournalImplTestBase { } } + for (int i = 0; i < 10; i++) { + journal.appendAddEvent(idGenerator.generateID(), (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(new byte[10]), false, null); + } + if (pendingTransactions) { for (long i = 0; i < 100; i++) { long recordID = idGenerator.generateID(); @@ -1200,6 +1206,10 @@ public class NIOJournalCompactTest extends JournalImplTestBase { } } + for (int i = 0; i < 10; i++) { + journal.appendAddEvent(idGenerator.generateID(), (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(new byte[10]), false, null); + } + journal.forceMoveNextFile(); instanceLog.debug("Number of Files: " + journal.getDataFilesCount()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 9622b46dbd..24342ffb6e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -652,6 +652,16 @@ public final class ReplicationTest extends ActiveMQTestBase { } + @Override + public void appendAddEvent(long id, + byte recordType, + Persister persister, + Object record, + boolean sync, + IOCompletion completionCallback) throws Exception { + + } + @Override public boolean isRemoveExtraFilesOnLoad() { return false; diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml b/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml index eb226e6005..f96eaa9db6 100644 --- a/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml @@ -30,6 +30,8 @@ under the License. ./data/paging + ./data/retention + exampleUser secret diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml b/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml index 4d62a6ac71..779b428dca 100644 --- a/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml @@ -34,6 +34,8 @@ under the License. secret + ./data/retention + diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.java index 7dd6507151..50bb56fe28 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/ReplicationFlowControlTest.java @@ -23,6 +23,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import java.io.File; import java.util.ArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,6 +35,7 @@ import org.apache.activemq.artemis.util.ServerUtil; import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -171,6 +173,17 @@ public class ReplicationFlowControlTest extends SmokeTestBase { consumer.join(); } } + + assertRetentionFolder(getServerLocation(SERVER_NAME_0)); + assertRetentionFolder(getServerLocation(SERVER_NAME_1)); + } + + private void assertRetentionFolder(String serverLocation) { + File retentionFolder = new File(serverLocation + "/data/retention"); + System.out.println("retention folder = " + retentionFolder.getAbsolutePath()); + File[] files = retentionFolder.listFiles(); + // it should be max = 2, however I'm giving some extra due to async factors.. + Assert.assertTrue(retentionFolder.getAbsolutePath() + " has " + (files == null ? "no files" : files.length + " elements"), files != null && files.length <= 10); } void startConsumers(boolean useAMQP) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeJournalImplTest.java index ce8c30fbc8..668c219f8f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeJournalImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeJournalImplTest.java @@ -26,6 +26,11 @@ public class FakeJournalImplTest extends JournalImplTestUnit { return new FakeSequentialFileFactory(); } + @Override + protected boolean suportsRetention() { + return false; + } + @Override protected int getAlignment() { return 1; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournaHistorylBackupTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournaHistorylBackupTest.java new file mode 100644 index 0000000000..9c94ba7a83 --- /dev/null +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournaHistorylBackupTest.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.unit.core.journal.impl; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.HashSet; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.impl.JournalFile; +import org.apache.activemq.artemis.core.journal.impl.JournalFilesRepository; +import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Assert; +import org.junit.Test; + +public class JournaHistorylBackupTest extends ActiveMQTestBase { + + + @Test + public void testDoubleReplacement() throws Throwable { + + File history = new File(getTestDirfile(), "history"); + history.mkdirs(); + + + File journalFolder = new File(getTestDirfile(), "journal"); + journalFolder.mkdirs(); + NIOSequentialFileFactory nioSequentialFileFactory = new NIOSequentialFileFactory(journalFolder, 1); + JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, nioSequentialFileFactory, "test", "journal", 1); + journal.setHistoryFolder(history, -1, -1); + journal.start(); + journal.loadInternalOnly(); + + SequentialFile file = nioSequentialFileFactory.createSequentialFile("test-4.journal"); + file.open(); + JournalFile journalFile = journal.readFileHeader(file); + file.close(); + journalFile.getFile().renameTo(journalFile.getFile().getFileName() + ".bkp"); + + journal.stop(); + + + Calendar oldCalendar = new GregorianCalendar(); + oldCalendar.setTimeInMillis(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1)); + String toBeReplacedFileName = journal.getHistoryFileName(journalFile.getFileID(), oldCalendar); + + + File historyFile = new File(history, toBeReplacedFileName); + FileOutputStream outputStream = new FileOutputStream(historyFile); + outputStream.write(0); + outputStream.close(); + + nioSequentialFileFactory = new NIOSequentialFileFactory(journalFolder, 1); + journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, nioSequentialFileFactory, "test", "journal", 1); + journal.setHistoryFolder(history, -1, -1); + journal.start(); + journal.loadInternalOnly(); + + File[] fileList = history.listFiles((a, name) -> name.endsWith(".journal")); + + Assert.assertEquals(1, fileList.length); + } + + @Test + public void verifyFileName() throws Throwable { + GregorianCalendar clebertsBirthday = new GregorianCalendar(1972, 1, 19, 4, 5, 7); + + + JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, new FakeSequentialFileFactory(), "cleberts", "birthday", 1); + String fileNameGenerated = journal.getHistoryFileName(1, clebertsBirthday); + + // I was actually born at 4:30 :) but I need all numbers lower than 2 digits on the test + Assert.assertEquals("cleberts-19720119040507-1.birthday", fileNameGenerated); + Assert.assertEquals("19720119040507", journal.getDatePortion(fileNameGenerated)); + + long d = journal.getDatePortionMillis(fileNameGenerated); + + GregorianCalendar compareCalendar = new GregorianCalendar(); + compareCalendar.setTimeInMillis(d); + + Assert.assertEquals(1972, compareCalendar.get(Calendar.YEAR)); + Assert.assertEquals(1, compareCalendar.get(Calendar.MONTH)); + Assert.assertEquals(19, compareCalendar.get(Calendar.DAY_OF_MONTH)); + Assert.assertEquals(4, compareCalendar.get(Calendar.HOUR_OF_DAY)); + Assert.assertEquals(5, compareCalendar.get(Calendar.MINUTE)); + Assert.assertEquals(7, compareCalendar.get(Calendar.SECOND)); + + Assert.assertFalse(d < clebertsBirthday.getTimeInMillis()); + + compareCalendar.set(Calendar.YEAR, 1971); + + Assert.assertTrue(compareCalendar.getTimeInMillis() < clebertsBirthday.getTimeInMillis()); + + } + + @Test + public void removeBKPExtension() throws Throwable { + JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, new FakeSequentialFileFactory(), "jrn", "data", 1); + + String withoutBkp = "jrn-1.data"; + String withBKP = withoutBkp + ".bkp"; + // I was actually born at 4:30 :) but I need all numbers lower than 2 digits on the test + Assert.assertEquals(withoutBkp, journal.removeBackupExtension(withBKP)); + Assert.assertEquals(withoutBkp, journal.removeBackupExtension(withoutBkp)); // it should be possible to do it + + String withoutBKP = "jrn-1.data"; + } + + @Test + public void testFileID() throws Throwable { + JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, new FakeSequentialFileFactory(), "jrn", "data", 1); + GregorianCalendar calendar = new GregorianCalendar(); + calendar.setTimeInMillis(System.currentTimeMillis()); + String fileName = journal.getHistoryFileName(3, calendar); + long id = JournalFilesRepository.getFileNameID("jrn", fileName); + Assert.assertEquals(3, id); + } + + @Test + public void testRemoveOldFiles() throws Exception { + GregorianCalendar todayCalendar = new GregorianCalendar(); + todayCalendar.setTimeInMillis(System.currentTimeMillis()); + + File tempFolder = new File(getTestDirfile(), "history"); + tempFolder.mkdirs(); + + JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, new FakeSequentialFileFactory(), "jrn", "data", 1); + journal.setHistoryFolder(tempFolder, -1, TimeUnit.HOURS.toMillis(24)); + + Calendar dayOlderCalendar = new GregorianCalendar(); + dayOlderCalendar.setTimeInMillis(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(25)); + + + for (int i = 0; i < 100; i++) { + String fileName = journal.getHistoryFileName(i, dayOlderCalendar); + File file = new File(tempFolder, fileName); + FileOutputStream outputStream = new FileOutputStream(file); + outputStream.write(0); + outputStream.close(); + } + + for (int i = 0; i < 100; i++) { + String fileName = journal.getHistoryFileName(i, todayCalendar); + File file = new File(tempFolder, fileName); + FileOutputStream outputStream = new FileOutputStream(file); + outputStream.write(0); + outputStream.close(); + } + + journal.processBackupCleanup(); + + FilenameFilter fnf = new FilenameFilter() { + @Override + public boolean accept(final File file, final String name) { + return name.endsWith(".data"); + } + }; + + File[] files = tempFolder.listFiles(fnf); + + Assert.assertEquals(100, files.length); + + HashSet hashSet = new HashSet<>(); + for (File file : files) { + hashSet.add(file.getName()); + } + + for (int i = 0; i < 100; i++) { + Assert.assertTrue(hashSet.contains(journal.getHistoryFileName(i, todayCalendar))); + } + + } + + + @Test + public void testKeepOldFiles() throws Exception { + GregorianCalendar todayCalendar = new GregorianCalendar(); + todayCalendar.setTimeInMillis(System.currentTimeMillis()); + + File tempFolder = new File(getTestDirfile(), "history"); + tempFolder.mkdirs(); + + JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, new FakeSequentialFileFactory(), "jrn", "data", 1); + journal.setHistoryFolder(tempFolder, -1, TimeUnit.HOURS.toMillis(24)); + + Calendar oldCalendar = new GregorianCalendar(); + oldCalendar.setTimeInMillis(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1)); + + + for (int i = 0; i < 100; i++) { + String fileName = journal.getHistoryFileName(i, oldCalendar); + File file = new File(tempFolder, fileName); + FileOutputStream outputStream = new FileOutputStream(file); + outputStream.write(0); + outputStream.close(); + } + + for (int i = 0; i < 100; i++) { + String fileName = journal.getHistoryFileName(i, todayCalendar); + File file = new File(tempFolder, fileName); + FileOutputStream outputStream = new FileOutputStream(file); + outputStream.write(0); + outputStream.close(); + } + + journal.processBackupCleanup(); + + FilenameFilter fnf = new FilenameFilter() { + @Override + public boolean accept(final File file, final String name) { + return name.endsWith(".data"); + } + }; + + File[] files = tempFolder.listFiles(fnf); + + Assert.assertEquals(200, files.length); + + HashSet hashSet = new HashSet<>(); + for (File file : files) { + hashSet.add(file.getName()); + } + + for (int i = 0; i < 100; i++) { + Assert.assertTrue(hashSet.contains(journal.getHistoryFileName(i, todayCalendar))); + } + + for (int i = 0; i < 100; i++) { + Assert.assertTrue(hashSet.contains(journal.getHistoryFileName(i, oldCalendar))); + } + + } + + + @Test + public void testMaxFiles() throws Exception { + GregorianCalendar todayCalendar = new GregorianCalendar(); + todayCalendar.setTimeInMillis(System.currentTimeMillis()); + + File tempFolder = new File(getTestDirfile(), "history"); + tempFolder.mkdirs(); + + JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, new FakeSequentialFileFactory(), "jrn", "data", 1); + journal.setHistoryFolder(tempFolder, 10 * journal.getFileSize(), TimeUnit.HOURS.toMillis(24)); + + Calendar oldCalendar = new GregorianCalendar(); + oldCalendar.setTimeInMillis(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1)); + + + for (int i = 0; i < 100; i++) { + String fileName = journal.getHistoryFileName(i, oldCalendar); + File file = new File(tempFolder, fileName); + FileOutputStream outputStream = new FileOutputStream(file); + outputStream.write(0); + outputStream.close(); + } + + for (int i = 0; i < 100; i++) { + String fileName = journal.getHistoryFileName(i, todayCalendar); + File file = new File(tempFolder, fileName); + FileOutputStream outputStream = new FileOutputStream(file); + outputStream.write(0); + outputStream.close(); + } + + journal.processBackupCleanup(); + + FilenameFilter fnf = new FilenameFilter() { + @Override + public boolean accept(final File file, final String name) { + return name.endsWith(".data"); + } + }; + + File[] files = tempFolder.listFiles(fnf); + + Assert.assertEquals(10, files.length); + + HashSet hashSet = new HashSet<>(); + for (File file : files) { + hashSet.add(file.getName()); + } + + + for (int i = 90; i < 100; i++) { + Assert.assertTrue(hashSet.contains(journal.getHistoryFileName(i, todayCalendar))); + } + + } + +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java index cf399f9be5..1a5c86624f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java @@ -159,6 +159,10 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { maxAIO = 50; } + protected boolean suportsRetention() { + return true; + } + public void createJournal() throws Exception { journal = new JournalImpl(fileSize, minFiles, poolSize, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) { @Override @@ -172,6 +176,13 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { } }; + if (suportsRetention()) { + // FakeSequentialFile won't support retention + File fileBackup = new File(getTestDir(), "backupFoler"); + fileBackup.mkdirs(); + ((JournalImpl) journal).setHistoryFolder(fileBackup, -1, -1); + } + journal.setAutoReclaim(false); addActiveMQComponent(journal); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index 8299c9b4cc..18f2b54a0c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -557,6 +557,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(91, journal.getIDMapSize()); + journal.processBackup(); List files2 = fileFactory.listFiles(fileExtension); // The Journal will aways have a file ready to be opened @@ -601,6 +602,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(200, journal.getIDMapSize()); + journal.processBackup(); List files4 = fileFactory.listFiles(fileExtension); Assert.assertEquals(numberOfFiles + 2, files4.size()); @@ -746,6 +748,8 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(initialNumberOfAddRecords, journal.getIDMapSize()); + journal.processBackup(); + List files4 = fileFactory.listFiles(fileExtension); Assert.assertEquals(11, files4.size()); @@ -827,6 +831,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { update(1); delete(1); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(4, files1.size()); @@ -866,6 +871,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { update(1); add(2); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(4, files1.size()); @@ -920,6 +926,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { addTx(1, i); } + journal.processBackup(); Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount()); Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(0, journal.getIDMapSize()); @@ -941,6 +948,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(0, journal.getIDMapSize()); + journal.processBackup(); List files3 = fileFactory.listFiles(fileExtension); Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength) + 2, files3.size()); @@ -960,6 +968,8 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(0, journal.getIDMapSize()); + + journal.processBackup(); List files4 = fileFactory.listFiles(fileExtension); Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength) + 2, files4.size()); @@ -976,6 +986,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(0, journal.getIDMapSize()); + journal.processBackup(); List files5 = fileFactory.listFiles(fileExtension); Assert.assertEquals(24, files5.size()); @@ -994,6 +1005,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(0, journal.getIDMapSize()); + journal.processBackup(); List files7 = fileFactory.listFiles(fileExtension); Assert.assertEquals(1, journal.getOpenedFilesCount()); @@ -1007,6 +1019,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(0, journal.getIDMapSize()); + journal.processBackup(); List files8 = fileFactory.listFiles(fileExtension); Assert.assertEquals(1, journal.getOpenedFilesCount()); @@ -1032,6 +1045,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { Assert.assertEquals(0, journal.getFreeFilesCount()); Assert.assertEquals(10, journal.getIDMapSize()); + journal.processBackup(); List files9 = fileFactory.listFiles(fileExtension); Assert.assertEquals(1, journal.getOpenedFilesCount()); @@ -1053,6 +1067,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { createJournal(); startJournal(); load(); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(2, files1.size()); @@ -1070,6 +1085,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { log.debug("journal tmp :" + journal.debug()); + journal.processBackup(); List files2 = fileFactory.listFiles(fileExtension); Assert.assertEquals(3, files2.size()); @@ -1087,6 +1103,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { log.debug("journal tmp2 :" + journal.debug()); + journal.processBackup(); List files3 = fileFactory.listFiles(fileExtension); Assert.assertEquals(4, files3.size()); @@ -1101,6 +1118,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { commit(1); // in file 3 + journal.processBackup(); List files4 = fileFactory.listFiles(fileExtension); Assert.assertEquals(5, files4.size()); @@ -1114,6 +1132,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 3); // in file 4 + journal.processBackup(); List files5 = fileFactory.listFiles(fileExtension); Assert.assertEquals(6, files5.size()); @@ -1125,6 +1144,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { checkAndReclaimFiles(); + journal.processBackup(); List files6 = fileFactory.listFiles(fileExtension); // Three should get deleted (files 0, 1, 3) @@ -1162,6 +1182,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { startJournal(); load(); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(10, files1.size()); @@ -1200,6 +1221,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { startJournal(); load(); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(10, files1.size()); @@ -1231,6 +1253,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { startJournal(); load(); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(10, files1.size()); @@ -1269,6 +1292,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { startJournal(); load(); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(10, files1.size()); @@ -1307,6 +1331,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { startJournal(); load(); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(10, files1.size()); @@ -1338,6 +1363,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { startJournal(); load(); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(10, files1.size()); @@ -1376,6 +1402,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { startJournal(); load(); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(10, files1.size()); @@ -1408,6 +1435,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { startJournal(); load(); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(2, files1.size()); @@ -1419,6 +1447,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { addTx(1, 1); + journal.processBackup(); List files2 = fileFactory.listFiles(fileExtension); Assert.assertEquals(2, files2.size()); @@ -1432,6 +1461,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { commit(1); + journal.processBackup(); List files3 = fileFactory.listFiles(fileExtension); Assert.assertEquals(3, files3.size()); @@ -1445,6 +1475,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { // Move on to another file + journal.processBackup(); List files4 = fileFactory.listFiles(fileExtension); Assert.assertEquals(4, files4.size()); @@ -1458,6 +1489,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { // Nothing should be reclaimed + journal.processBackup(); List files5 = fileFactory.listFiles(fileExtension); Assert.assertEquals(4, files5.size()); @@ -1479,6 +1511,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { startJournal(); load(); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(2, files1.size()); @@ -1494,6 +1527,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 1 + journal.processBackup(); List files2 = fileFactory.listFiles(fileExtension); Assert.assertEquals(3, files2.size()); @@ -1505,6 +1539,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { commit(1); // in file 1 + journal.processBackup(); List files3 = fileFactory.listFiles(fileExtension); Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size()); @@ -1516,6 +1551,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { delete(2); // in file 1 + journal.processBackup(); List files4 = fileFactory.listFiles(fileExtension); Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size()); @@ -1529,6 +1565,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 2 + journal.processBackup(); List files5 = fileFactory.listFiles(fileExtension); Assert.assertEquals(0, journal.getFreeFilesCount()); @@ -1537,6 +1574,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { checkAndReclaimFiles(); + journal.processBackup(); List files6 = fileFactory.listFiles(fileExtension); Assert.assertEquals(0, journal.getFreeFilesCount()); @@ -1562,6 +1600,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { startJournal(); load(); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(2, files1.size()); @@ -1577,6 +1616,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 1 + journal.processBackup(); List files2 = fileFactory.listFiles(fileExtension); Assert.assertEquals(3, files2.size()); @@ -1597,6 +1637,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { delete(2); // in file 1 + journal.processBackup(); List files4 = fileFactory.listFiles(fileExtension); Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size()); @@ -1618,6 +1659,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { checkAndReclaimFiles(); + journal.processBackup(); List files6 = fileFactory.listFiles(fileExtension); // files 0 and 1 should be deleted @@ -1636,6 +1678,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { startJournal(); loadAndCheck(); + journal.processBackup(); List files7 = fileFactory.listFiles(fileExtension); Assert.assertEquals(journal.getAlignment() == 1 ? 2 : 3, files7.size()); @@ -1687,6 +1730,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { startJournal(); load(); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(2, files1.size()); @@ -1702,6 +1746,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1 + journal.processBackup(); List files2 = fileFactory.listFiles(fileExtension); Assert.assertEquals(3, files2.size()); @@ -1736,6 +1781,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { checkAndReclaimFiles(); + journal.processBackup(); List files6 = fileFactory.listFiles(fileExtension); Assert.assertEquals(4, files6.size()); @@ -1747,6 +1793,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 4); // in file 3 + journal.processBackup(); List files7 = fileFactory.listFiles(fileExtension); Assert.assertEquals(5, files7.size()); @@ -1758,6 +1805,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { commit(1); // in file 4 + journal.processBackup(); List files8 = fileFactory.listFiles(fileExtension); Assert.assertEquals(5, files8.size()); @@ -1783,6 +1831,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { startJournal(); load(); + journal.processBackup(); List files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(2, files1.size()); @@ -1794,6 +1843,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { addTx(1, 1); // in file 0 + journal.processBackup(); files1 = fileFactory.listFiles(fileExtension); Assert.assertEquals(2, files1.size()); @@ -1811,6 +1861,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1 + journal.processBackup(); List files2 = fileFactory.listFiles(fileExtension); Assert.assertEquals(3, files2.size()); @@ -1822,6 +1873,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { EncodingSupport xid = new SimpleEncoding(10, (byte) 0); prepare(1, xid); // in file 1 + journal.processBackup(); List files3 = fileFactory.listFiles(fileExtension); Assert.assertEquals(3, files3.size()); @@ -1833,6 +1885,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { delete(2); // in file 1 + journal.processBackup(); List files4 = fileFactory.listFiles(fileExtension); Assert.assertEquals(3, files4.size()); @@ -1851,6 +1904,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { checkAndReclaimFiles(); + journal.processBackup(); List files5 = fileFactory.listFiles(fileExtension); Assert.assertEquals(4, files5.size()); @@ -1862,6 +1916,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { checkAndReclaimFiles(); + journal.processBackup(); List files6 = fileFactory.listFiles(fileExtension); Assert.assertEquals(4, files6.size()); @@ -1875,6 +1930,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD, 4); // in file 3 + journal.processBackup(); List files7 = fileFactory.listFiles(fileExtension); Assert.assertEquals(5, files7.size()); @@ -1886,6 +1942,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { commit(1); // in file 3 + journal.processBackup(); List files8 = fileFactory.listFiles(fileExtension); Assert.assertEquals(5, files8.size()); @@ -1897,6 +1954,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { delete(1); // in file 3 + journal.processBackup(); List files9 = fileFactory.listFiles(fileExtension); Assert.assertEquals(5, files9.size()); @@ -1908,6 +1966,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { checkAndReclaimFiles(); + journal.processBackup(); List files10 = fileFactory.listFiles(fileExtension); Assert.assertEquals(journal.getAlignment() == 1 ? 5 : 5, files10.size()); @@ -1920,6 +1979,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD, 5); // in file 4 + journal.processBackup(); List files11 = fileFactory.listFiles(fileExtension); Assert.assertEquals(6, files11.size()); @@ -1931,6 +1991,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { checkAndReclaimFiles(); + journal.processBackup(); List files12 = fileFactory.listFiles(fileExtension); // File 0, and File 1 should be deleted @@ -1968,6 +2029,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { // file 3 should now be deleted + journal.processBackup(); List files15 = fileFactory.listFiles(fileExtension); Assert.assertEquals(4, files15.size()); @@ -3092,6 +3154,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { // log.debug(journal.debug()); // log.debug("*****************************************"); + journal.processBackup(); stopJournal(); createJournal(); startJournal();