ARTEMIS-3297 Journal Retention Feature

This commit is contained in:
Clebert Suconic 2021-05-11 17:22:56 -04:00
parent 7733a76649
commit 27c343913f
48 changed files with 1663 additions and 114 deletions

View File

@ -178,7 +178,7 @@ public class Artemis {
builder = builder.withCommands(Run.class, Stop.class, Kill.class, PerfJournal.class);
} else {
builder.withGroup("data").withDescription("data tools group (print) (example ./artemis data print)").
withDefaultCommand(HelpData.class).withCommands(PrintData.class);
withDefaultCommand(HelpData.class).withCommands(RecoverMessages.class, PrintData.class);
builder = builder.withCommand(Create.class);
}

View File

@ -25,12 +25,14 @@ import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
@ -63,7 +65,7 @@ public class RecoverMessages extends DBOption {
if (configuration.isJDBC()) {
throw new IllegalAccessException("JDBC Not supported on recover");
} else {
recover(configuration, journalOutput, reclaimed);
recover(configuration, getJournal(), journalOutput, new File(getLargeMessages()), reclaimed);
}
} catch (Exception e) {
treatError(e, "data", "print");
@ -71,9 +73,9 @@ public class RecoverMessages extends DBOption {
return null;
}
public static void recover(Configuration configuration, File journalOutput, boolean reclaimed) throws Exception {
public static void recover(Configuration configuration, String journallocation, File journalOutput, File largeMessage, boolean reclaimed) throws Exception {
File journal = configuration.getJournalLocation();
File journal = new File(journallocation);
if (!journalOutput.exists()) {
if (!journalOutput.mkdirs()) {
@ -87,12 +89,14 @@ public class RecoverMessages extends DBOption {
SequentialFileFactory outputFF = new NIOSequentialFileFactory(journalOutput, null, 1);
outputFF.setDatasync(false);
JournalImpl targetJournal = new JournalImpl(configuration.getJournalFileSize(), 2, 2, 0, 0, outputFF, "activemq-data", "amq", 1);
JournalImpl targetJournal = new JournalImpl(configuration.getJournalFileSize(), 2, 2, -1, 0, outputFF, "activemq-data", "amq", 1);
targetJournal.setAutoReclaim(false);
targetJournal.start();
targetJournal.loadInternalOnly();
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journal, null, 1);
SequentialFileFactory largeMessagesFF = new NIOSequentialFileFactory(largeMessage, null, 1);
// Will use only default values. The load function should adapt to anything different
JournalImpl messagesJournal = new JournalImpl(configuration.getJournalFileSize(), configuration.getJournalMinFiles(), configuration.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
@ -106,23 +110,66 @@ public class RecoverMessages extends DBOption {
userRecordsOfInterest.add(JournalRecordIds.ADD_REF);
userRecordsOfInterest.add(JournalRecordIds.PAGE_TRANSACTION);
HashSet<Pair<Long, Long>> routeBindigns = new HashSet<>();
for (JournalFile file : files) {
// For reviewers and future maintainers: I really meant System.out.println here
// This is part of the CLI, hence this is like user's output
System.out.println("Recovering messages from file " + file);
HashSet<Pair<Long, Long>> routeBindigns = new HashSet<>();
JournalImpl.readJournalFile(messagesFF, file, new JournalReaderCallback() {
long lastlargeMessageId = -1;
SequentialFile largeMessageFile;
@Override
public void done() {
try {
if (largeMessageFile != null) {
largeMessageFile.close();
largeMessageFile = null;
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onReadEventRecord(RecordInfo info) throws Exception {
switch (info.getUserRecordType()) {
case JournalRecordIds.ADD_REF:
onReadUpdateRecord(info);
break;
case JournalRecordIds.ADD_MESSAGE_BODY:
if (lastlargeMessageId != info.id || largeMessageFile == null) {
if (largeMessageFile != null) {
largeMessageFile.close();
}
largeMessageFile = largeMessagesFF.createSequentialFile(info.id + ".msg");
largeMessageFile.open();
largeMessageFile.position(largeMessageFile.size());
lastlargeMessageId = info.id;
}
largeMessageFile.write(new ByteArrayEncoding(info.data), false, null);
break;
default:
onReadAddRecord(info);
}
}
@Override
public void onReadAddRecord(RecordInfo info) throws Exception {
if (userRecordsOfInterest.contains(info.getUserRecordType())) {
if (targetJournal.getRecords().get(info.id) != null) {
// Really meant System.out.. user's information on the CLI
System.out.println("RecordID " + info.id + " would been duplicated, ignoring it");
return;
}
try {
targetJournal.appendAddRecord(info.id, info.userRecordType, info.data, true);
targetJournal.appendAddRecord(info.id, info.userRecordType, info.data, false);
} catch (Exception e) {
// Really meant System.out.. user's information on the CLI
System.out.println("Cannot append record for " + info.id + "->" + e.getMessage());
}
}
@ -135,7 +182,8 @@ public class RecoverMessages extends DBOption {
long queue = ByteUtil.bytesToLong(info.data);
Pair<Long, Long> pairQueue = new Pair<>(info.id, queue);
if (routeBindigns.contains(pairQueue)) {
System.out.println("AddRef on " + info.id + " / queue=" + queue + " has already been recorded, ignoring it");
// really meant system.out
System.out.println("AddReference on " + info.id + " / queue=" + queue + " has already been recorded, ignoring it");
return;
}
@ -145,6 +193,7 @@ public class RecoverMessages extends DBOption {
targetJournal.appendUpdateRecord(info.id, info.userRecordType, info.data, true);
} catch (Exception e) {
System.out.println("Cannot update record " + info.id + "-> " + e.getMessage());
e.printStackTrace(System.out);
}
}
}

View File

@ -44,15 +44,16 @@ public final class CompactJournal extends LockAbstract {
}
public static void compactJournals(Configuration configuration) throws Exception {
compactJournal(configuration.getJournalLocation(), "activemq-data", "amq", configuration.getJournalMinFiles(),
compactJournal(configuration.getJournalLocation(), configuration.getJournalRetentionLocation(), "activemq-data", "amq", configuration.getJournalMinFiles(),
configuration.getJournalPoolFiles(), configuration.getJournalFileSize(), null, JournalRecordIds.UPDATE_DELIVERY_COUNT,
JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME);
System.out.println("Compactation succeeded for " + configuration.getJournalLocation().getAbsolutePath());
compactJournal(configuration.getBindingsLocation(), "activemq-bindings", "bindings", 2, 2, 1048576, null);
compactJournal(configuration.getBindingsLocation(), null, "activemq-bindings", "bindings", 2, 2, 1048576, null);
System.out.println("Compactation succeeded for " + configuration.getBindingsLocation());
}
public static void compactJournal(final File directory,
final File historyFolder,
final String journalPrefix,
final String journalSuffix,
final int minFiles,
@ -63,6 +64,9 @@ public final class CompactJournal extends LockAbstract {
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, poolFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
if (historyFolder != null) {
journal.setHistoryFolder(historyFolder, -1, -1);
}
for (int i : replaceableRecords) {
journal.replaceableRecord(i);
}

View File

@ -64,6 +64,16 @@ ${jdbc}
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!--
if you want to retain your journal uncomment this following configuration.
This will allow your system to keep 7 days of your data, up to 10G. Tweak it accordingly to your use case and capacity.
it is recommended to use a separate storage unit from the journal for performance considerations.
<journal-retention-directory period="7" unit="DAYS" storage-limit="10G">data/retention</journal-retention-directory>>
-->
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>

View File

@ -28,6 +28,10 @@ public class StringPrintStream {
return new PrintStream(byteOuptut, true, StandardCharsets.UTF_8.name());
}
public byte[] getBytes() {
return byteOuptut.toByteArray();
}
@Override
public String toString() {
try {

View File

@ -120,6 +120,16 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
this.syncDelay = syncDelay;
}
@Override
public void appendAddEvent(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion completionCallback) throws Exception {
// Nothing to be done
}
@Override
public void start() throws SQLException {
super.start();

View File

@ -366,10 +366,6 @@ public class NIOSequentialFile extends AbstractSequentialFile {
@Override
public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) {
if (callback == null) {
throw new NullPointerException("callback parameter need to be set");
}
try {
internalWrite(bytes, sync, callback, true);
} catch (Exception e) {
@ -393,7 +389,7 @@ public class NIOSequentialFile extends AbstractSequentialFile {
boolean releaseBuffer) throws IOException, ActiveMQIOErrorException, InterruptedException {
if (!isOpen()) {
if (callback != null) {
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened - " + getFileName());
} else {
throw ActiveMQJournalBundle.BUNDLE.fileNotOpened();
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.journal;
import java.io.File;
import java.util.List;
import java.util.Map;
@ -62,6 +63,10 @@ public interface Journal extends ActiveMQComponent {
boolean isRemoveExtraFilesOnLoad();
default boolean isHistory() {
return false;
}
// Non transactional operations
void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
@ -70,6 +75,10 @@ public interface Journal extends ActiveMQComponent {
appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
}
default Journal setHistoryFolder(File historyFolder, long maxBytes, long period) throws Exception {
return this;
}
void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
void appendAddRecord(long id,
@ -79,6 +88,15 @@ public interface Journal extends ActiveMQComponent {
boolean sync,
IOCompletion completionCallback) throws Exception;
/** An event is data recorded on the journal, but it won't have any weight or deletes. It's always ready to be removed.
* It is useful on recovery data while in use with backup history journal. */
void appendAddEvent(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion completionCallback) throws Exception;
default void appendAddRecord(long id,
byte recordType,
EncodingSupport record,
@ -284,6 +302,17 @@ public interface Journal extends ActiveMQComponent {
*/
void synchronizationUnlock();
/**
* It will rename temporary files and place them on the copy folder, by resotring the original file name.
*/
default void processBackup() {
}
/**
* It will check max files and max days on files and remove extra files.
*/
default void processBackupCleanup() {
}
/**
* Force the usage of a new {@link JournalFile}.
*

View File

@ -101,7 +101,18 @@ public final class FileWrapperJournal extends JournalBase {
boolean sync,
IOCompletion callback) throws Exception {
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
writeRecord(addRecord, false, -1, false, callback);
}
@Override
public void appendAddEvent(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion callback) throws Exception {
JournalInternalRecord addRecord = new JournalAddRecord(JournalImpl.EVENT_RECORD, id, recordType, persister, record);
writeRecord(addRecord, false, -1, false, callback);
}

View File

@ -20,6 +20,13 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
public interface JournalFile {
default boolean isReclaimable() {
return true;
}
default void setReclaimable(boolean reclaimable) {
}
int getNegCount(JournalFile file);
void incNegCount(JournalFile file);

View File

@ -35,6 +35,13 @@ public class JournalFileImpl implements JournalFile {
private long offset;
boolean reclaimable = true;
@Override
public void setReclaimable(boolean reclaimable) {
this.reclaimable = reclaimable;
}
private static final AtomicIntegerFieldUpdater<JournalFileImpl> posCountUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "posCountField");
private static final AtomicIntegerFieldUpdater<JournalFileImpl> addRecordUpdate = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "addRecordField");
private static final AtomicIntegerFieldUpdater<JournalFileImpl> liveBytesUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "liveBytesField");
@ -92,7 +99,7 @@ public class JournalFileImpl implements JournalFile {
@Override
public boolean isCanReclaim() {
return posReclaimCriteria && negReclaimCriteria && !file.isPending();
return reclaimable && posReclaimCriteria && negReclaimCriteria && !file.isPending();
}
@Override

View File

@ -192,7 +192,7 @@ public class JournalFilesRepository {
for (JournalFile file : files) {
final long fileIdFromFile = file.getFileID();
final long fileIdFromName = getFileNameID(file.getFile().getFileName());
final long fileIdFromName = getFileNameID(filePrefix, file.getFile().getFileName());
// The compactor could create a fileName but use a previously assigned ID.
// Because of that we need to take both parts into account
@ -718,11 +718,15 @@ public class JournalFilesRepository {
/**
* Get the ID part of the name
*/
private long getFileNameID(final String fileName) {
public static long getFileNameID(String filePrefix, final String fileName) {
try {
return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorRetrievingID(e, fileName);
try {
return Long.parseLong(fileName.substring(fileName.lastIndexOf("-") + 1, fileName.indexOf('.')));
} catch (Throwable e2) {
ActiveMQJournalLogger.LOGGER.errorRetrievingID(e, fileName);
}
return 0;
}
}

View File

@ -16,16 +16,24 @@
*/
package org.apache.activemq.artemis.core.journal.impl;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@ -108,6 +116,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
*
* */
public static final double UPDATE_FACTOR;
private static final String BKP_EXTENSION = "bkp";
public static final String BKP = "." + BKP_EXTENSION;
static {
String UPDATE_FACTOR_STR = System.getProperty(JournalImpl.class.getName() + ".UPDATE_FACTOR");
@ -146,6 +157,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Record markers - they must be all unique
public static final byte EVENT_RECORD = 10;
public static final byte ADD_RECORD = 11;
public static final byte UPDATE_RECORD = 12;
@ -202,6 +216,75 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private final JournalFilesRepository filesRepository;
private File journalRetentionFolder;
private long journalRetentionPeriod = -1;
private int journalRetentionMaxFiles = -1;
private final List<JournalFile> historyPendingFiles = Collections.synchronizedList(new LinkedList<>());
// This is to guarantee only one thread is making a copy of a file
// the processBackup is pretty much single threaded happening at the compactorExecutor
// there are a few exceptions like startup, or during a replica-copy-catch-up in a small possibility
private final Object processBackupLock = new Object();
@Override
public boolean isHistory() {
return journalRetentionFolder != null;
}
@Override
public JournalImpl setHistoryFolder(File historyFolder, long maxBytes, long period) throws Exception {
if (this.state != JournalState.STOPPED) {
throw new IllegalStateException("State = " + state);
}
this.journalRetentionFolder = historyFolder;
this.journalRetentionFolder.mkdirs();
this.journalRetentionMaxFiles = (int) (maxBytes / this.fileSize);
this.journalRetentionPeriod = period;
try {
List<String> files = this.fileFactory.listFiles(BKP_EXTENSION);
for (String name : files) {
SequentialFile file = fileFactory.createSequentialFile(name);
JournalFileImpl journalFile;
try {
file.open();
journalFile = readFileHeader(file);
} finally {
file.close();
}
historyPendingFiles.add(journalFile);
}
for (JournalFile file : historyPendingFiles) {
File[] repeatFiles = historyFolder.listFiles((a, name) -> name.startsWith(getFilePrefix()) && name.endsWith(file.getFileID() + "." + filesRepository.getFileExtension()));
for (File f : repeatFiles) {
logger.warn("File " + f + " was partially copied before, removing the file");
f.delete();
}
}
processBackup();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (criticalErrorListener != null) {
criticalErrorListener.onIOException(e, e.getMessage(), null);
}
}
return this;
}
// Compacting may replace this structure
private final ConcurrentLongHashMap<JournalRecord> records = new ConcurrentLongHashMap<>();
@ -230,6 +313,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
protected ExecutorFactory ioExecutorFactory;
private ThreadPoolExecutor threadPool;
ThreadLocal<GregorianCalendar> calendarThreadLocal = ThreadLocal.withInitial(() -> new GregorianCalendar());
/**
* We don't lock the journal during the whole compacting operation. During compacting we only
* lock it (i) when gathering the initial structure, and (ii) when replicating the structures
@ -339,7 +424,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final String fileExtension,
final int maxAIO,
final int userVersion) {
this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion, null, 0);
this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion, (a, b, c) -> logger.warn(a.getMessage(), a), 0);
}
@ -510,7 +595,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
byte recordType = wholeFileBuffer.get();
if (recordType < JournalImpl.ADD_RECORD || recordType > JournalImpl.ROLLBACK_RECORD) {
if (recordType < JournalImpl.EVENT_RECORD || recordType > JournalImpl.ROLLBACK_RECORD) {
// I - We scan for any valid record on the file. If a hole
// happened on the middle of the file we keep looking until all
// the possibilities are gone
@ -711,6 +796,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
switch (recordType) {
case EVENT_RECORD: {
reader.onReadEventRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount));
break;
}
case ADD_RECORD: {
reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount));
break;
@ -781,6 +871,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
lastDataPos = wholeFileBuffer.position();
}
reader.done();
return lastDataPos;
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorReadingFile(e);
@ -846,16 +937,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
if (logger.isTraceEnabled()) {
logger.trace("appendAddRecord::id=" + id +
", userRecordType=" +
recordType +
", record = " + record +
", usedFile = " +
usedFile);
", userRecordType=" +
recordType +
", record = " + record +
", usedFile = " +
usedFile);
}
result.set(true);
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendPrepareRecord:" + e, e);
logger.error("appendAddRecord:" + e, e);
} catch (Throwable e) {
result.fail(e);
setErrorCondition(callback, null, e);
@ -870,6 +961,64 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
result.get();
}
@Override
public void appendAddEvent(final long id,
final byte recordType,
final Persister persister,
final Object record,
final boolean sync,
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
lineUpContext(callback);
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendAddEvent::id=" + id +
", userRecordType=" +
recordType +
", record = " + record);
}
final long maxRecordSize = getMaxRecordSize();
final JournalInternalRecord addRecord = new JournalAddRecord(JournalImpl.EVENT_RECORD, id, recordType, persister, record);
final int addRecordEncodeSize = addRecord.getEncodeSize();
if (addRecordEncodeSize > maxRecordSize) {
//The record size should be larger than max record size only on the large messages case.
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize);
}
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(() -> {
journalLock.readLock().lock();
try {
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
if (logger.isTraceEnabled()) {
logger.trace("appendAddEvent:id=" + id +
", userRecordType=" +
recordType +
", record = " + record +
", usedFile = " +
usedFile);
}
result.set(true);
} catch (ActiveMQShutdownException e) {
result.fail(e);
logger.error("appendAddEvent:" + e, e);
} catch (Throwable e) {
result.fail(e);
setErrorCondition(callback, null, e);
logger.error("appendAddEvent::" + e, e);
} finally {
pendingRecords.remove(id);
journalLock.readLock().unlock();
}
});
result.get();
}
@Override
public void appendUpdateRecord(final long id,
final byte recordType,
@ -1676,7 +1825,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
onCompactStart();
dataFilesToProcess = getDataListToProcess();
dataFilesToProcess = getDataListToCompact();
if (dataFilesToProcess == null)
return;
@ -1812,7 +1961,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
/** this private method will return a list of data files that need to be cleaned up.
* It will get the list, and replace it on the journal structure, while a separate thread would be able
* to read it, and append to a new list that will be replaced on the journal. */
private ArrayList<JournalFile> getDataListToProcess() throws Exception {
private ArrayList<JournalFile> getDataListToCompact() throws Exception {
ArrayList<JournalFile> dataFilesToProcess = new ArrayList<>(filesRepository.getDataFilesCount());
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
@ -1858,6 +2007,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
journalLock.writeLock().unlock();
}
processBackup();
for (JournalFile file : dataFilesToProcess) {
file.getFile().waitNotPending();
}
@ -2216,7 +2367,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
checkReclaimStatus();
if (changeData) {
checkReclaimStatus();
}
return new JournalLoadInformation(records.size(), maxID.longValue());
}
@ -2245,6 +2398,183 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
@Override
public void processBackupCleanup() {
if (logger.isDebugEnabled()) {
logger.debug("processBackupCleanup with maxFiles = " + journalRetentionMaxFiles + " and period = " + journalRetentionPeriod);
}
if (journalRetentionFolder != null && (journalRetentionMaxFiles > 0 || journalRetentionPeriod > 0)) {
FilenameFilter fnf = (d, name) -> name.endsWith("." + filesRepository.getFileExtension());
if (journalRetentionPeriod > 0) {
String[] fileNames = journalRetentionFolder.list(fnf);
Arrays.sort(fileNames);
GregorianCalendar calendar = this.calendarThreadLocal.get();
calendar.setTimeInMillis(System.currentTimeMillis() - journalRetentionPeriod);
long timeCutOf = calendar.getTimeInMillis();
for (String fileName : fileNames) {
long timeOnFile = getDatePortionMillis(fileName);
if (timeOnFile < timeCutOf) {
logger.debug("File " + fileName + " is too old and should go");
File fileToRemove = new File(journalRetentionFolder, fileName);
if (!fileToRemove.delete()) {
logger.debug("Could not remove " + fileToRemove);
}
} else {
break;
}
}
}
if (journalRetentionMaxFiles > 0) {
String[] fileNames = journalRetentionFolder.list(fnf);
Arrays.sort(fileNames);
if (fileNames.length > journalRetentionMaxFiles) {
int toRemove = fileNames.length - journalRetentionMaxFiles;
for (String file : fileNames) {
logger.debug("Removing " + file);
File fileToRemove = new File(journalRetentionFolder, file);
fileToRemove.delete();
toRemove--;
if (toRemove <= 0) {
break;
}
}
}
}
}
}
/** With the exception of initialization, this has to be always called within the compactorExecutor */
@Override
public void processBackup() {
if (this.journalRetentionFolder == null) {
return;
}
synchronized (processBackupLock) {
ArrayList<JournalFile> filesToMove;
filesToMove = new ArrayList<>(historyPendingFiles.size());
filesToMove.addAll(historyPendingFiles);
historyPendingFiles.clear();
for (JournalFile fileToCopy : filesToMove) {
copyFile(fileToCopy);
}
}
if (compactorExecutor != null) {
compactorExecutor.execute(this::processBackupCleanup);
} else {
processBackupCleanup();
}
}
// This exists to avoid a race with copying the files on initial replica
// we get the list, and check each individual file if they have the pending copy
private void checkRetentionFile(JournalFile file) {
if (this.journalRetentionFolder == null) {
return;
}
// It is cheaper to check without a lock
if (!file.getFile().getFileName().endsWith(BKP)) {
return;
}
copyFile(file);
}
// you need to synchronize processBackupLock before calling this
private void copyFile(JournalFile fileToCopy) {
synchronized (processBackupLock) {
if (fileToCopy == null || !fileToCopy.getFile().getFileName().endsWith(BKP)) {
return;
}
long fileId = fileToCopy.getFileID();
GregorianCalendar calendar = calendarThreadLocal.get();
calendar.setTimeInMillis(System.currentTimeMillis());
String fileName = getHistoryFileName(fileId, calendar);
File copyFrom = fileToCopy.getFile().getJavaFile();
File copyTo = new File(journalRetentionFolder, fileName);
if (logger.isDebugEnabled()) {
logger.debug("Copying journal retention from " + copyFrom + " to " + copyTo);
}
try {
Files.copy(copyFrom.toPath(), copyTo.toPath(), StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
try {
criticalIO(e);
} catch (Exception ignored) {
}
}
try {
fileToCopy.getFile().renameTo(removeBackupExtension(fileToCopy.getFile().getFileName()));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (criticalErrorListener != null) {
criticalErrorListener.onIOException(e, e.getMessage(), fileToCopy.getFile());
}
}
fileToCopy.setReclaimable(true);
}
}
public String getHistoryFileName(long sequence, Calendar calendar) {
String fileName = String.format("%s-%04d%02d%02d%02d%02d%02d-%d.%s", filesRepository.getFilePrefix(), calendar.get(Calendar.YEAR),
calendar.get(Calendar.MONTH), calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY),
calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND), sequence, filesRepository.getFileExtension());
return fileName;
}
public String removeBackupExtension(String name) {
int indexOfBKP = name.indexOf(BKP);
if (indexOfBKP >= 0) {
return name.substring(0, name.indexOf(BKP));
} else {
return name;
}
}
public long getDatePortionMillis(String name) {
String datePortion = getDatePortion(name);
GregorianCalendar calendar = calendarThreadLocal.get();
int year = Integer.parseInt(datePortion.substring(0, 4));
int month = Integer.parseInt(datePortion.substring(4, 6));
int day = Integer.parseInt(datePortion.substring(6, 8));
int hour = Integer.parseInt(datePortion.substring(8, 10));
int minutes = Integer.parseInt(datePortion.substring(10, 12));
int seconds = Integer.parseInt(datePortion.substring(12, 14));
calendar.set(year, month, day, hour, minutes, seconds);
return calendar.getTimeInMillis();
}
public String getDatePortion(String name) {
return name.substring(filesRepository.getFilePrefix().length() + 1, name.indexOf("-", filesRepository.getFilePrefix().length() + 1));
}
/**
* @return true if cleanup was called
*/
@ -2487,7 +2817,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public JournalFile[] getDataFiles() {
return filesRepository.getDataFilesArray();
JournalFile[] files = filesRepository.getDataFilesArray();
for (JournalFile file : files) {
checkRetentionFile(file);
}
return files;
}
@Override
@ -2775,7 +3109,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
private static boolean isContainsBody(final byte recordType) {
return recordType >= JournalImpl.ADD_RECORD && recordType <= JournalImpl.DELETE_RECORD_TX;
return recordType >= JournalImpl.EVENT_RECORD && recordType <= JournalImpl.DELETE_RECORD_TX;
}
private static int getRecordSize(final byte recordType, final int journalVersion) {
@ -2783,6 +3117,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
int recordSize = 0;
switch (recordType) {
case ADD_RECORD:
case EVENT_RECORD:
recordSize = JournalImpl.SIZE_ADD_RECORD;
break;
case UPDATE_RECORD:
@ -2827,7 +3162,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
* @return
* @throws Exception
*/
private JournalFileImpl readFileHeader(final SequentialFile file) throws Exception {
public JournalFileImpl readFileHeader(final SequentialFile file) throws Exception {
ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
file.read(bb);
@ -2973,6 +3308,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public void run() {
try {
processBackup();
if (!checkReclaimStatus()) {
checkCompact();
}
@ -3257,6 +3593,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
protected void moveNextFile(final boolean scheduleReclaim, boolean blockOnClose) throws Exception {
filesRepository.closeFile(currentFile, blockOnClose);
if (this.journalRetentionFolder != null) {
currentFile.setReclaimable(false);
currentFile.getFile().renameTo(currentFile.getFile().getFileName() + BKP);
this.historyPendingFiles.add(currentFile);
}
currentFile = filesRepository.openFile();
if (scheduleReclaim) {

View File

@ -20,6 +20,12 @@ import org.apache.activemq.artemis.core.journal.RecordInfo;
public interface JournalReaderCallback {
default void onReadEventRecord(RecordInfo info) throws Exception {
}
default void done() {
}
void onReadAddRecord(RecordInfo info) throws Exception;
/**

View File

@ -30,7 +30,24 @@ public class JournalAddRecord extends JournalInternalRecord {
protected final byte recordType;
protected final boolean add;
protected final byte journalType;
/**
* @param id
* @param recordType
* @param record
*/
public JournalAddRecord(final byte journalType, final long id, final byte recordType, final Persister persister, Object record) {
this.id = id;
this.record = record;
this.recordType = recordType;
this.journalType = journalType;
this.persister = persister;
}
/**
* @param id
@ -38,24 +55,12 @@ public class JournalAddRecord extends JournalInternalRecord {
* @param record
*/
public JournalAddRecord(final boolean add, final long id, final byte recordType, final Persister persister, Object record) {
this.id = id;
this.record = record;
this.recordType = recordType;
this.add = add;
this.persister = persister;
this(add ? JournalImpl.ADD_RECORD : JournalImpl.UPDATE_RECORD, id, recordType, persister, record);
}
@Override
public void encode(final ActiveMQBuffer buffer) {
if (add) {
buffer.writeByte(JournalImpl.ADD_RECORD);
} else {
buffer.writeByte(JournalImpl.UPDATE_RECORD);
}
buffer.writeByte(journalType);
buffer.writeInt(fileID);

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
@ -668,6 +669,24 @@ public interface Configuration {
*/
Configuration setJournalDirectory(String dir);
String getJournalRetentionDirectory();
/**
* Sets the file system directory used to store historical backup journal.
*/
Configuration setJournalRetentionDirectory(String dir);
File getJournalRetentionLocation();
/** The retention period for the journal in milliseconds (always in milliseconds, a conversion is performed on set) */
long getJournalRetentionPeriod();
Configuration setJournalRetentionPeriod(TimeUnit unit, long limit);
long getJournalRetentionMaxBytes();
Configuration setJournalRetentionMaxBytes(long bytes);
/**
* Returns the type of journal used by this server ({@code NIO}, {@code ASYNCIO} or {@code MAPPED}).
* <br>
@ -1350,4 +1369,5 @@ public interface Configuration {
String getTemporaryQueueNamespace();
Configuration setTemporaryQueueNamespace(String temporaryQueueNamespace);
}

View File

@ -39,6 +39,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
@ -199,6 +200,12 @@ public class ConfigurationImpl implements Configuration, Serializable {
protected String journalDirectory = ActiveMQDefaultConfiguration.getDefaultJournalDir();
protected String journalRetentionDirectory = null;
protected long journalRetentionMaxBytes = 0;
protected long journalRetentionPeriod;
protected String nodeManagerLockDirectory = null;
protected boolean createJournalDir = ActiveMQDefaultConfiguration.isDefaultCreateJournalDir();
@ -367,6 +374,52 @@ public class ConfigurationImpl implements Configuration, Serializable {
// Public -------------------------------------------------------------------------
@Override
public String getJournalRetentionDirectory() {
return journalRetentionDirectory;
}
@Override
public ConfigurationImpl setJournalRetentionDirectory(String dir) {
this.journalRetentionDirectory = dir;
return this;
}
@Override
public File getJournalRetentionLocation() {
if (journalRetentionDirectory == null) {
return null;
} else {
return subFolder(getJournalRetentionDirectory());
}
}
@Override
public long getJournalRetentionPeriod() {
return this.journalRetentionPeriod;
}
@Override
public Configuration setJournalRetentionPeriod(TimeUnit unit, long period) {
if (period <= 0) {
this.journalRetentionPeriod = -1;
} else {
this.journalRetentionPeriod = unit.toMillis(period);
}
return this;
}
@Override
public long getJournalRetentionMaxBytes() {
return journalRetentionMaxBytes;
}
@Override
public ConfigurationImpl setJournalRetentionMaxBytes(long bytes) {
this.journalRetentionMaxBytes = bytes;
return this;
}
@Override
public Configuration setSystemPropertyPrefix(String systemPropertyPrefix) {
this.systemPropertyPrefix = systemPropertyPrefix;
@ -2505,7 +2558,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
/**
* It will find the right location of a subFolder, related to artemisInstance
*/
private File subFolder(String subFolder) {
public File subFolder(String subFolder) {
try {
return getBrokerInstance().toPath().resolve(subFolder).toFile();
} catch (Exception e) {

View File

@ -31,6 +31,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@ -626,6 +627,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setJournalDirectory(getString(e, "journal-directory", config.getJournalDirectory(), Validators.NOT_NULL_OR_EMPTY));
parseJournalRetention(e, config);
config.setNodeManagerLockDirectory(getString(e, "node-manager-lock-directory", null, Validators.NO_CHECK));
config.setPageMaxConcurrentIO(getInteger(e, "page-max-concurrent-io", config.getPageMaxConcurrentIO(), Validators.MINUS_ONE_OR_GT_ZERO));
@ -768,6 +772,47 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
}
}
private void parseJournalRetention(final Element e, final Configuration config) {
NodeList retention = e.getElementsByTagName("journal-retention-directory");
if (retention.getLength() != 0) {
Element node = (Element) retention.item(0);
String directory = node.getTextContent().trim();
String storageLimitStr = getAttributeValue(node, "storage-limit");
long storageLimit;
if (storageLimitStr == null) {
storageLimit = -1;
} else {
storageLimit = ByteUtil.convertTextBytes(storageLimitStr.trim());
}
int period = getAttributeInteger(node, "period", -1, Validators.GT_ZERO);
String unitStr = getAttributeValue(node, "unit");
if (unitStr == null) {
unitStr = "DAYS";
}
TimeUnit unit = TimeUnit.valueOf(unitStr.toUpperCase());
config.setJournalRetentionDirectory(directory);
config.setJournalRetentionMaxBytes(storageLimit);
config.setJournalRetentionPeriod(unit, period);
if (directory == null || directory.equals("")) {
throw new IllegalArgumentException("journal-retention-directory=null");
}
if (storageLimit == -1 && period == -1) {
throw new IllegalArgumentException("configure either storage-limit or period on journal-retention-directory");
}
}
}
/**
* @param e
* @param config

View File

@ -449,6 +449,13 @@ public final class Page implements Comparable<Page> {
}
public synchronized void write(final PagedMessage message) throws Exception {
writeDirect(message);
storageManager.pageWrite(message, pageId);
}
/** This write will not interact back with the storage manager.
* To avoid ping pongs with Journal retaining events and any other stuff. */
public void writeDirect(PagedMessage message) throws Exception {
if (!file.isOpen()) {
throw ActiveMQMessageBundle.BUNDLE.cannotWriteToClosedFile(file);
}
@ -471,7 +478,6 @@ public final class Page implements Comparable<Page> {
//lighter than addAndGet when single writer
numberOfMessages.lazySet(numberOfMessages.get() + 1);
size.lazySet(size.get() + bufferSize);
storageManager.pageWrite(message, pageId);
}
public void sync() throws Exception {

View File

@ -1550,6 +1550,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
bindingsJournal.start();
if (config.getJournalRetentionLocation() != null) {
messageJournal.setHistoryFolder(config.getJournalRetentionLocation(), config.getJournalRetentionMaxBytes(), config.getJournalRetentionPeriod());
}
messageJournal.start();
started = true;

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
/** this class will split a big buffer into smaller buffers */
public class BufferSplitter {
public static void split(ActiveMQBuffer buffer, int splitSize, Consumer<EncodingSupport> target) {
byte[] bytesBuffer = new byte[buffer.readableBytes()];
buffer.readBytes(bytesBuffer);
split(bytesBuffer, splitSize, target);
}
public static void split(byte[] buffer, int splitSize, Consumer<EncodingSupport> target) {
int location = 0;
while (location < buffer.length) {
int maxSize = Math.min(splitSize, buffer.length - location);
target.accept(new PartialEncoding(buffer, location, maxSize));
location += maxSize;
}
}
protected static class PartialEncoding implements EncodingSupport {
final byte[] data;
final int begin;
final int length;
public PartialEncoding(final byte[] data, final int begin, final int length) {
this.data = data;
this.begin = begin;
this.length = length;
}
// Public --------------------------------------------------------
@Override
public void decode(final ActiveMQBuffer buffer) {
throw new IllegalStateException("operation not supported");
}
@Override
public void encode(final ActiveMQBuffer buffer) {
buffer.writeBytes(data, begin, length);
}
@Override
public int getEncodeSize() {
return length;
}
}
}

View File

@ -213,6 +213,10 @@ public final class DescribeJournal {
recordsPrintStream.println("#" + file + " (size=" + file.getFile().size() + ")");
JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback() {
@Override
public void onReadEventRecord(RecordInfo recordInfo) throws Exception {
recordsPrintStream.println("operation@Event;" + describeRecord(recordInfo, safe));
}
@Override
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception {

View File

@ -96,4 +96,7 @@ public final class JournalRecordIds {
public static final byte ROLE_RECORD = 48;
// Used to record the large message body on the journal when history is on
public static final byte ADD_MESSAGE_BODY = 49;
}

View File

@ -46,6 +46,8 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
@ -55,6 +57,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
import org.apache.activemq.artemis.core.replication.ReplicatedJournal;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
@ -405,6 +408,23 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
@Override
public void pageWrite(final PagedMessage message, final int pageNumber) {
if (messageJournal.isHistory()) {
try (ArtemisCloseable lock = closeableReadLock()) {
Message theMessage = message.getMessage();
if (theMessage.isLargeMessage() && theMessage instanceof LargeServerMessageImpl) {
messageJournal.appendAddEvent(theMessage.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, LargeMessagePersister.getInstance(), theMessage, false, getContext(false));
} else {
messageJournal.appendAddEvent(theMessage.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, theMessage.getPersister(), theMessage, false, getContext(false));
}
for (long queueID : message.getQueueIDs()) {
messageJournal.appendAddEvent(message.getMessage().getMessageID(), JournalRecordIds.ADD_REF, EncoderPersister.getInstance(), new RefEncoding(queueID), false, getContext(false));
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
if (isReplicated()) {
// Note: (https://issues.jboss.org/browse/HORNETQ-1059)
// We have to replicate durable and non-durable messages on paging
@ -840,6 +860,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
final long messageId,
final ActiveMQBuffer bytes) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
if (messageJournal.isHistory()) {
BufferSplitter.split(bytes, 10 * 1024, (c) -> historyBody(messageId, c));
}
file.position(file.size());
if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) {
final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes());
@ -859,11 +882,24 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
}
private void historyBody(long messageId, EncodingSupport partialBuffer) {
try {
messageJournal.appendAddEvent(messageId, JournalRecordIds.ADD_MESSAGE_BODY, EncoderPersister.getInstance(), partialBuffer, true, null);
} catch (Exception e) {
logger.warn("Error processing history large message body for " + messageId + " - " + e.getMessage(), e);
}
}
@Override
public final void addBytesToLargeMessage(final SequentialFile file,
final long messageId,
final byte[] bytes) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
if (messageJournal.isHistory()) {
BufferSplitter.split(bytes, 10 * 1024, (c) -> historyBody(messageId, c));
}
file.position(file.size());
//that's an additional precaution to avoid ByteBuffer to be pooled:
//NIOSequentialFileFactory doesn't pool heap ByteBuffer, but better to make evident

View File

@ -78,7 +78,7 @@ public final class ReplicationAddMessage extends PacketImpl {
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
buffer.writeBoolean(operation.toBoolean());
buffer.writeByte(operation.toRecord());
buffer.writeLong(id);
buffer.writeByte(journalRecordType);
buffer.writeInt(persister.getEncodeSize(encodingData));
@ -88,7 +88,7 @@ public final class ReplicationAddMessage extends PacketImpl {
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
journalID = buffer.readByte();
operation = ADD_OPERATION_TYPE.toOperation(buffer.readBoolean());
operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
id = buffer.readLong();
journalRecordType = buffer.readByte();
final int recordDataSize = buffer.readInt();

View File

@ -83,7 +83,7 @@ public class ReplicationAddTXMessage extends PacketImpl {
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
buffer.writeBoolean(operation.toBoolean());
buffer.writeByte(operation.toRecord());
buffer.writeLong(txId);
buffer.writeLong(id);
buffer.writeByte(recordType);
@ -94,7 +94,7 @@ public class ReplicationAddTXMessage extends PacketImpl {
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
journalID = buffer.readByte();
operation = ADD_OPERATION_TYPE.toOperation(buffer.readBoolean());
operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
txId = buffer.readLong();
id = buffer.readLong();
recordType = buffer.readByte();

View File

@ -129,6 +129,20 @@ public class ReplicatedJournal implements Journal {
localJournal.appendAddRecord(id, recordType, persister, record, sync, completionCallback);
}
@Override
public void appendAddEvent(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion completionCallback) throws Exception {
if (log.isTraceEnabled()) {
log.trace("Append record id = " + id + " recordType = " + recordType);
}
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.EVENT, id, recordType, persister, record);
localJournal.appendAddEvent(id, recordType, persister, record, sync, completionCallback);
}
/**
* @param txID
* @param id

View File

@ -39,11 +39,13 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.Journal.JournalState;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.impl.FileWrapperJournal;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.impl.Page;
@ -750,16 +752,25 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
*/
private void handleAppendAddRecord(final ReplicationAddMessage packet) throws Exception {
Journal journalToUse = getJournal(packet.getJournalID());
if (packet.getRecord() == ADD_OPERATION_TYPE.UPDATE) {
if (logger.isTraceEnabled()) {
logger.trace("Endpoint appendUpdate id = " + packet.getId());
}
journalToUse.appendUpdateRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync);
} else {
if (logger.isTraceEnabled()) {
logger.trace("Endpoint append id = " + packet.getId());
}
journalToUse.appendAddRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync);
switch (packet.getRecord()) {
case UPDATE:
if (logger.isTraceEnabled()) {
logger.trace("Endpoint appendUpdate id = " + packet.getId());
}
journalToUse.appendUpdateRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync);
break;
case ADD:
if (logger.isTraceEnabled()) {
logger.trace("Endpoint append id = " + packet.getId());
}
journalToUse.appendAddRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync);
break;
case EVENT:
if (logger.isTraceEnabled()) {
logger.trace("Endpoint append id = " + packet.getId());
}
journalToUse.appendAddEvent(packet.getId(), packet.getJournalRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(packet.getRecordData()), noSync, null);
break;
}
}
@ -800,7 +811,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
pgdMessage.initMessage(storageManager);
Message msg = pgdMessage.getMessage();
Page page = getPage(msg.getAddressSimpleString(), packet.getPageNumber());
page.write(pgdMessage);
page.writeDirect(pgdMessage);
}
private ConcurrentMap<Integer, Page> getPageMap(final SimpleString storeName) {

View File

@ -103,20 +103,33 @@ public final class ReplicationManager implements ActiveMQComponent {
public enum ADD_OPERATION_TYPE {
UPDATE {
@Override
public boolean toBoolean() {
return true;
public byte toRecord() {
return 0;
}
}, ADD {
@Override
public boolean toBoolean() {
return false;
public byte toRecord() {
return 1;
}
}, EVENT {
@Override
public byte toRecord() {
return 2;
}
};
public abstract boolean toBoolean();
public abstract byte toRecord();
public static ADD_OPERATION_TYPE toOperation(boolean isUpdate) {
return isUpdate ? UPDATE : ADD;
public static ADD_OPERATION_TYPE toOperation(byte recordType) {
switch (recordType) {
case 0: // 0: it used to be false, we need to use 0 for compatibility reasons with writeBoolean on the channel
return UPDATE;
case 1: // 1: it used to be true, we need to use 1 for compatibility reasons with writeBoolean
return ADD;
case 2: // 2: this represents the new value
return EVENT;
}
return ADD;
}
}

View File

@ -675,6 +675,53 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="journal-retention-directory" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the directory to store journal-retention message in and rention configuraion.
</xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:simpleContent>
<xsd:extension base="xsd:string">
<xsd:attribute name="unit" use="optional">
<xsd:annotation>
<xsd:documentation>
This configures the period type to use on limit. By default it is DAYS.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="DAYS"/>
<xsd:enumeration value="HOURS"/>
<xsd:enumeration value="MINUTES"/>
<xsd:enumeration value="SECONDS"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="period" type="xsd:integer" use="optional">
<xsd:annotation>
<xsd:documentation>
The amount of time used to keep files.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="storage-limit" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
Size (in bytes) before we starting removing files from the retention area.
this is an extra protection on top of the period.
Notice we first remove files based on period and if you're using more storage then you
configured we start removing older files.
By default this is unlimited (not filled).
Supports byte notation like "K", "Mb", "GB", etc.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:simpleContent>
</xsd:complexType>
</xsd:element>
<xsd:element name="node-manager-lock-directory" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -17,10 +17,12 @@
package org.apache.activemq.artemis.core.config.impl;
import java.io.ByteArrayInputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -38,6 +40,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
import org.apache.activemq.artemis.utils.StringPrintStream;
import org.junit.Assert;
import org.junit.Test;
@ -390,6 +393,126 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
assertEquals(expected, config.getPageSyncTimeout());
}
@Test
public void testMinimalXML() throws Exception {
StringPrintStream stringPrintStream = new StringPrintStream();
PrintStream stream = stringPrintStream.newStream();
stream.println("<configuration><core>");
stream.println("</core></configuration>");
ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes());
FileConfigurationParser parser = new FileConfigurationParser();
Configuration configuration = parser.parseMainConfig(inputStream);
}
@Test
public void testRetentionJournalOptionsDays() throws Exception {
testStreamDatesOption("DAYS", TimeUnit.DAYS);
}
@Test
public void testRetentionJournalOptionsHours() throws Exception {
testStreamDatesOption("HOURS", TimeUnit.HOURS);
}
@Test
public void testRetentionJournalOptionsMinutes() throws Exception {
testStreamDatesOption("MINUTES", TimeUnit.MINUTES);
}
@Test
public void testRetentionJournalOptionsSeconds() throws Exception {
testStreamDatesOption("SECONDS", TimeUnit.SECONDS);
}
private void testStreamDatesOption(String option, TimeUnit expected) throws Exception {
StringPrintStream stringPrintStream = new StringPrintStream();
PrintStream stream = stringPrintStream.newStream();
stream.println("<configuration><core>");
stream.println("<journal-retention-directory unit=\"" + option + "\" period=\"365\" storage-limit=\"10G\">history</journal-retention-directory>");
stream.println("</core></configuration>");
ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes());
FileConfigurationParser parser = new FileConfigurationParser();
Configuration configuration = parser.parseMainConfig(inputStream);
Assert.assertEquals("history", configuration.getJournalRetentionDirectory());
Assert.assertEquals(expected.toMillis(365), configuration.getJournalRetentionPeriod());
}
@Test
public void unlimitedJustHistory() throws Throwable {
StringPrintStream stringPrintStream = new StringPrintStream();
PrintStream stream = stringPrintStream.newStream();
stream.println("<configuration><core>");
stream.println("<journal-retention-directory>directory</journal-retention-directory>");
stream.println("</core></configuration>");
ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes());
FileConfigurationParser parser = new FileConfigurationParser();
Configuration configuration = null;
boolean exceptionHappened = false;
try {
configuration = parser.parseMainConfig(inputStream);
} catch (Exception e) {
exceptionHappened = true;
}
Assert.assertTrue(exceptionHappened);
}
@Test
public void noRetention() throws Throwable {
StringPrintStream stringPrintStream = new StringPrintStream();
PrintStream stream = stringPrintStream.newStream();
stream.println("<configuration><core>");
stream.println("<journal-directory>journal</journal-directory>");
stream.println("</core></configuration>");
ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes());
FileConfigurationParser parser = new FileConfigurationParser();
Configuration configuration = null;
configuration = parser.parseMainConfig(inputStream);
Assert.assertNull(configuration.getJournalRetentionLocation());
Assert.assertNull(configuration.getJournalRetentionDirectory());
Assert.assertEquals("journal", configuration.getJournalDirectory());
}
@Test
public void noFolderOnRetention() throws Throwable {
StringPrintStream stringPrintStream = new StringPrintStream();
PrintStream stream = stringPrintStream.newStream();
stream.println("<configuration><core>");
stream.println("<journal-retention-directory period=\"3\"></journal-retention-directory>");
stream.println("</core></configuration>");
FileConfigurationParser parser = new FileConfigurationParser();
ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes());
boolean exception = false;
try {
Configuration configuration = parser.parseMainConfig(inputStream);
} catch (Exception e) {
exception = true;
}
Assert.assertTrue(exception);
}
private static String firstPart = "<core xmlns=\"urn:activemq:core\">" + "\n" +
"<name>ActiveMQ.main.config</name>" + "\n" +
"<log-delegate-factory-class-name>org.apache.activemq.artemis.integration.logging.Log4jLogDelegateFactory</log-delegate-factory-class-name>" + "\n" +

View File

@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@ -131,6 +132,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals("max concurrent io", 17, conf.getPageMaxConcurrentIO());
Assert.assertEquals(true, conf.isReadWholePage());
Assert.assertEquals("somedir2", conf.getJournalDirectory());
Assert.assertEquals("history", conf.getJournalRetentionDirectory());
Assert.assertEquals(10L * 1024L * 1024L * 1024L, conf.getJournalRetentionMaxBytes());
Assert.assertEquals(TimeUnit.DAYS.toMillis(365), conf.getJournalRetentionPeriod());
Assert.assertEquals(false, conf.isCreateJournalDir());
Assert.assertEquals(JournalType.NIO, conf.getJournalType());
Assert.assertEquals(10000, conf.getJournalBufferSize_NIO());

View File

@ -0,0 +1,52 @@
/*
* Copyright The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.utils.DataConstants;
import org.junit.Assert;
import org.junit.Test;
public class BufferSplitterTest {
@Test
public void testSplitting() {
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(1000 * DataConstants.SIZE_INT);
for (int i = 0; i < 1000; i++) {
buffer.writeInt(i);
}
ActiveMQBuffer outputBuffer = ActiveMQBuffers.fixedBuffer(1000 * DataConstants.SIZE_INT);
BufferSplitter.split(buffer, 77, (c) -> {
Assert.assertTrue(c.getEncodeSize() <= 77);
c.encode(outputBuffer);
});
outputBuffer.resetReaderIndex();
buffer.resetReaderIndex();
byte[] sourceBytes = new byte[1000 * DataConstants.SIZE_INT];
buffer.readBytes(sourceBytes);
byte[] targetBytes = new byte[1000 * DataConstants.SIZE_INT];
outputBuffer.readBytes(targetBytes);
Assert.assertArrayEquals(sourceBytes, targetBytes);
}
}

View File

@ -406,6 +406,7 @@
<page-max-concurrent-io>17</page-max-concurrent-io>
<read-whole-page>true</read-whole-page>
<journal-directory>somedir2</journal-directory>
<journal-retention-directory unit="DAYS" period="365" storage-limit="10G">history</journal-retention-directory>
<create-journal-dir>false</create-journal-dir>
<journal-type>NIO</journal-type>
<journal-buffer-timeout>1000</journal-buffer-timeout>

View File

@ -275,6 +275,7 @@
<page-max-concurrent-io>17</page-max-concurrent-io>
<read-whole-page>true</read-whole-page>
<journal-directory>somedir2</journal-directory>
<journal-retention-directory unit="DAYS" period="365" storage-limit="10G">history</journal-retention-directory>
<create-journal-dir>false</create-journal-dir>
<journal-type>NIO</journal-type>
<journal-buffer-timeout>1000</journal-buffer-timeout>

View File

@ -9,6 +9,7 @@ Name | Description
exp | Export the message data using a special and independent XML format
imp | Imports the journal to a running broker using the output from expt
data | Prints a report about journal records and summary of existent records, as well a report on paging
recover | Revive data from the journal. It can be used in conjunction with historic journaling.
encode | shows an internal format of the journal encoded to String
decode | imports the internal journal format from encode

View File

@ -43,6 +43,46 @@ The majority of the journal is written in Java, however we abstract out
the interaction with the actual file system to allow different pluggable
implementations. Apache ActiveMQ Artemis ships with two implementations:
### Journal Retention
If you enable ``journal-retention`` on broker.xml, ActiveMQ Artemis will keep copy of every data that has passed through the broker on this folder.
```xml
...
<journal-retention unit="DAYS" directory="history" period="365" storage-limit="10G"/>
...
```
ActiveMQ Artemis will keep a copy of each generated journal file, up to the configured retention period, at the unit chose. On the example above the system would keep all the journal files up to 365 days.
It is also possible to limit the number of files kept on the retention directory. You can keep a storage-limit, and the system will start removing older files when you have more files than the configured storage limit.
Notice the storage limit is optional however you need to be careful to not run out of disk space at the retention folder or the broker might be shutdown because of a critical IO failure.
You can use the CLI tools to inspect and recover data from the history, by just passing the journal folder being the retention directory.
Example:
```shell
./artemis data print --journal ../data/history
```
To recover the messages from the history:
```shell
./artemis data recovery --journal ../data/history --target ../data/recovered --large-messages ../data/large-messages
```
It is important that you don't call recover into a the journal while the broker is alive. As a matter of fact the current recommendations is to do that on a new journal directory. Perhaps on a new broker so you can inspect and transfer these messages.
The retention feature is in its current form very simple and intended for emergency situations. If you think it is useful new options to recover the data could be added, perhaps thorugh the admin console and other possibilities. Please share your feedback on this area, and as always Pull Requests are welcomed!
Also the recovery CLI tool will recover every data on the selected folder. It is important that you do some maintenance and copy the files and interval you need to a new location before you call recover.
### Java [NIO](https://en.wikipedia.org/wiki/New_I/O)
The first implementation uses standard Java NIO to interface with

View File

@ -23,64 +23,121 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.activemq.artemis.cli.commands.tools.RecoverMessages;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class RecoverTest extends JMSTestBase {
boolean useTX;
String protocol;
boolean paging;
boolean large;
String journalType;
public RecoverTest(boolean useTX, String protocol, boolean paging, boolean large, String journalType) {
this.useTX = useTX;
this.protocol = protocol;
this.paging = paging;
this.large = large;
this.journalType = journalType;
}
@Parameterized.Parameters(name = "useTX={0}, protocol={1}, paging={2}, largeMessage={3}, journal-type={4}")
public static Collection<Object[]> data() {
Object[] journalType;
if (LibaioContext.isLoaded()) {
journalType = new Object[]{"AIO", "NIO", "MAPPED"};
} else {
journalType = new Object[]{"NIO", "MAPPED"};
}
return combine(new Object[]{true, false}, new Object[]{"AMQP", "CORE", "OPENWIRE"}, new Object[]{true, false}, new Object[]{true, false}, journalType);
}
protected static Collection<Object[]> combine(Object[] one, Object[] two, Object[] three, Object[] four, Object[] five) {
ArrayList<Object[]> combinations = new ArrayList<>();
for (Object o1 : one) {
for (Object o2 : two) {
for (Object o3 : three) {
for (Object o4 : four) {
for (Object o5 : five) {
combinations.add(new Object[]{o1, o2, o3, o4, o5});
}
}
}
}
}
return combinations;
}
@Override
protected Configuration createDefaultConfig(boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(netty).setJMXManagementEnabled(true);
configuration.setJournalRetentionDirectory(getTestDir() + "/historyJournal");
switch (journalType) {
case "NIO":
configuration.setJournalType(JournalType.NIO);
break;
case "MAPPED":
configuration.setJournalType(JournalType.MAPPED);
break;
case "AIO":
configuration.setJournalType(JournalType.ASYNCIO);
break;
}
return configuration;
}
@Override
protected boolean usePersistence() {
return true;
}
@Test
public void testRecoverCoreNoTx() throws Exception {
testRecover(false, "CORE");
}
@Test
public void testRecoverCORETx() throws Exception {
testRecover(true, "CORE");
}
@Test
public void testRecoverAMQPNoTx() throws Exception {
testRecover(false, "AMQP");
}
@Test
public void testRecoverAMQPTx() throws Exception {
testRecover(true, "AMQP");
}
@Test
public void testRecoverOpenWireNoTx() throws Exception {
testRecover(false, "OPENWIRE");
}
@Test
public void testRecoverOpenWireTx() throws Exception {
testRecover(true, "OPENWIRE");
}
public void testRecover(boolean useTX, String protocol) throws Exception {
public void testRecover() throws Exception {
createQueue(true, "TestQueue");
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue("TestQueue");
if (paging) {
serverQueue.getPagingStore().startPaging();
}
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(useTX, useTX ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("TestQueue");
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 1000; i++) {
producer.send(session.createTextMessage("test1"));
MessageProducer producer = session.createProducer(queue);
String messageBody;
{
StringBuffer stringBuffer = new StringBuffer();
if (large) {
int i = 0;
while (stringBuffer.length() < 110 * 1024) {
//stringBuffer.append("this is " + (i++));
stringBuffer.append(" ");
}
} else {
stringBuffer.append("hello");
}
messageBody = stringBuffer.toString();
}
int maxMessage = large ? 10 : 1000;
for (int i = 0; i < maxMessage; i++) {
producer.send(session.createTextMessage(i + messageBody));
}
if (useTX) {
@ -94,8 +151,13 @@ public class RecoverTest extends JMSTestBase {
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < 1000; i++) {
Assert.assertNotNull(consumer.receive(5000));
for (int i = 0; i < maxMessage; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
if (!protocol.equals("OPENWIRE")) {
// openwire won't support large message or its conversions
Assert.assertEquals(i + messageBody, message.getText());
}
}
if (useTX) {
@ -104,11 +166,22 @@ public class RecoverTest extends JMSTestBase {
connection.close();
// need to wait no paging, otherwise an eventual page cleanup would remove large message bodies from the recovery
Wait.assertFalse(serverQueue.getPagingStore()::isPaging);
server.stop();
File newJournalLocation = new File(server.getConfiguration().getJournalLocation().getParentFile(), "recovered");
RecoverMessages.recover(server.getConfiguration(), newJournalLocation, true);
RecoverMessages.recover(server.getConfiguration(), server.getConfiguration().getJournalRetentionDirectory(), newJournalLocation, server.getConfiguration().getLargeMessagesLocation(), false);
if (large) {
File[] largeMessageFiles = server.getConfiguration().getLargeMessagesLocation().listFiles();
Assert.assertEquals(maxMessage, largeMessageFiles.length);
for (File f : largeMessageFiles) {
Assert.assertTrue("File length was " + f.length(), f.length() > 0);
}
}
server.getConfiguration().setJournalDirectory(newJournalLocation.getAbsolutePath());
@ -122,8 +195,13 @@ public class RecoverTest extends JMSTestBase {
consumer = session.createConsumer(queue);
for (int i = 0; i < 1000; i++) {
Assert.assertNotNull(consumer.receive(5000));
for (int i = 0; i < maxMessage; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
if (!protocol.equals("OPENWIRE")) {
// openwire won't support large message or its conversions
Assert.assertEquals(i + messageBody, message.getText());
}
}
Assert.assertNull(consumer.receiveNoWait());

View File

@ -206,6 +206,9 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true));
liveServer = createTestableServer(liveConfig);
liveServer.getServer().getConfiguration().setJournalRetentionDirectory(getJournalDir(0, false) + "_retention");
backupServer.getServer().getConfiguration().setJournalRetentionDirectory(getJournalDir(0, true) + "_retention");
}
protected void setupHAPolicyConfiguration() {

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.AbstractJournalUpdateTask;
@ -44,6 +45,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalCompactor;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalFileImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
@ -549,6 +551,10 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
}
}
for (int i = 0; i < 10; i++) {
journal.appendAddEvent(idGenerator.generateID(), (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(new byte[10]), false, null);
}
if (pendingTransactions) {
for (long i = 0; i < 100; i++) {
long recordID = idGenerator.generateID();
@ -1200,6 +1206,10 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
}
}
for (int i = 0; i < 10; i++) {
journal.appendAddEvent(idGenerator.generateID(), (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(new byte[10]), false, null);
}
journal.forceMoveNextFile();
instanceLog.debug("Number of Files: " + journal.getDataFilesCount());

View File

@ -652,6 +652,16 @@ public final class ReplicationTest extends ActiveMQTestBase {
}
@Override
public void appendAddEvent(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion completionCallback) throws Exception {
}
@Override
public boolean isRemoveExtraFilesOnLoad() {
return false;

View File

@ -30,6 +30,8 @@ under the License.
<paging-directory>./data/paging</paging-directory>
<journal-retention-directory storage-limit="20M">./data/retention</journal-retention-directory>
<cluster-user>exampleUser</cluster-user>
<cluster-password>secret</cluster-password>

View File

@ -34,6 +34,8 @@ under the License.
<cluster-password>secret</cluster-password>
<journal-retention-directory storage-limit="20M">./data/retention</journal-retention-directory>
<ha-policy>
<replication>
<slave>

View File

@ -23,6 +23,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.io.File;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -34,6 +35,7 @@ import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -171,6 +173,17 @@ public class ReplicationFlowControlTest extends SmokeTestBase {
consumer.join();
}
}
assertRetentionFolder(getServerLocation(SERVER_NAME_0));
assertRetentionFolder(getServerLocation(SERVER_NAME_1));
}
private void assertRetentionFolder(String serverLocation) {
File retentionFolder = new File(serverLocation + "/data/retention");
System.out.println("retention folder = " + retentionFolder.getAbsolutePath());
File[] files = retentionFolder.listFiles();
// it should be max = 2, however I'm giving some extra due to async factors..
Assert.assertTrue(retentionFolder.getAbsolutePath() + " has " + (files == null ? "no files" : files.length + " elements"), files != null && files.length <= 10);
}
void startConsumers(boolean useAMQP) {

View File

@ -26,6 +26,11 @@ public class FakeJournalImplTest extends JournalImplTestUnit {
return new FakeSequentialFileFactory();
}
@Override
protected boolean suportsRetention() {
return false;
}
@Override
protected int getAlignment() {
return 1;

View File

@ -0,0 +1,312 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.unit.core.journal.impl;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalFilesRepository;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
public class JournaHistorylBackupTest extends ActiveMQTestBase {
@Test
public void testDoubleReplacement() throws Throwable {
File history = new File(getTestDirfile(), "history");
history.mkdirs();
File journalFolder = new File(getTestDirfile(), "journal");
journalFolder.mkdirs();
NIOSequentialFileFactory nioSequentialFileFactory = new NIOSequentialFileFactory(journalFolder, 1);
JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, nioSequentialFileFactory, "test", "journal", 1);
journal.setHistoryFolder(history, -1, -1);
journal.start();
journal.loadInternalOnly();
SequentialFile file = nioSequentialFileFactory.createSequentialFile("test-4.journal");
file.open();
JournalFile journalFile = journal.readFileHeader(file);
file.close();
journalFile.getFile().renameTo(journalFile.getFile().getFileName() + ".bkp");
journal.stop();
Calendar oldCalendar = new GregorianCalendar();
oldCalendar.setTimeInMillis(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1));
String toBeReplacedFileName = journal.getHistoryFileName(journalFile.getFileID(), oldCalendar);
File historyFile = new File(history, toBeReplacedFileName);
FileOutputStream outputStream = new FileOutputStream(historyFile);
outputStream.write(0);
outputStream.close();
nioSequentialFileFactory = new NIOSequentialFileFactory(journalFolder, 1);
journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, nioSequentialFileFactory, "test", "journal", 1);
journal.setHistoryFolder(history, -1, -1);
journal.start();
journal.loadInternalOnly();
File[] fileList = history.listFiles((a, name) -> name.endsWith(".journal"));
Assert.assertEquals(1, fileList.length);
}
@Test
public void verifyFileName() throws Throwable {
GregorianCalendar clebertsBirthday = new GregorianCalendar(1972, 1, 19, 4, 5, 7);
JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, new FakeSequentialFileFactory(), "cleberts", "birthday", 1);
String fileNameGenerated = journal.getHistoryFileName(1, clebertsBirthday);
// I was actually born at 4:30 :) but I need all numbers lower than 2 digits on the test
Assert.assertEquals("cleberts-19720119040507-1.birthday", fileNameGenerated);
Assert.assertEquals("19720119040507", journal.getDatePortion(fileNameGenerated));
long d = journal.getDatePortionMillis(fileNameGenerated);
GregorianCalendar compareCalendar = new GregorianCalendar();
compareCalendar.setTimeInMillis(d);
Assert.assertEquals(1972, compareCalendar.get(Calendar.YEAR));
Assert.assertEquals(1, compareCalendar.get(Calendar.MONTH));
Assert.assertEquals(19, compareCalendar.get(Calendar.DAY_OF_MONTH));
Assert.assertEquals(4, compareCalendar.get(Calendar.HOUR_OF_DAY));
Assert.assertEquals(5, compareCalendar.get(Calendar.MINUTE));
Assert.assertEquals(7, compareCalendar.get(Calendar.SECOND));
Assert.assertFalse(d < clebertsBirthday.getTimeInMillis());
compareCalendar.set(Calendar.YEAR, 1971);
Assert.assertTrue(compareCalendar.getTimeInMillis() < clebertsBirthday.getTimeInMillis());
}
@Test
public void removeBKPExtension() throws Throwable {
JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, new FakeSequentialFileFactory(), "jrn", "data", 1);
String withoutBkp = "jrn-1.data";
String withBKP = withoutBkp + ".bkp";
// I was actually born at 4:30 :) but I need all numbers lower than 2 digits on the test
Assert.assertEquals(withoutBkp, journal.removeBackupExtension(withBKP));
Assert.assertEquals(withoutBkp, journal.removeBackupExtension(withoutBkp)); // it should be possible to do it
String withoutBKP = "jrn-1.data";
}
@Test
public void testFileID() throws Throwable {
JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, new FakeSequentialFileFactory(), "jrn", "data", 1);
GregorianCalendar calendar = new GregorianCalendar();
calendar.setTimeInMillis(System.currentTimeMillis());
String fileName = journal.getHistoryFileName(3, calendar);
long id = JournalFilesRepository.getFileNameID("jrn", fileName);
Assert.assertEquals(3, id);
}
@Test
public void testRemoveOldFiles() throws Exception {
GregorianCalendar todayCalendar = new GregorianCalendar();
todayCalendar.setTimeInMillis(System.currentTimeMillis());
File tempFolder = new File(getTestDirfile(), "history");
tempFolder.mkdirs();
JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, new FakeSequentialFileFactory(), "jrn", "data", 1);
journal.setHistoryFolder(tempFolder, -1, TimeUnit.HOURS.toMillis(24));
Calendar dayOlderCalendar = new GregorianCalendar();
dayOlderCalendar.setTimeInMillis(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(25));
for (int i = 0; i < 100; i++) {
String fileName = journal.getHistoryFileName(i, dayOlderCalendar);
File file = new File(tempFolder, fileName);
FileOutputStream outputStream = new FileOutputStream(file);
outputStream.write(0);
outputStream.close();
}
for (int i = 0; i < 100; i++) {
String fileName = journal.getHistoryFileName(i, todayCalendar);
File file = new File(tempFolder, fileName);
FileOutputStream outputStream = new FileOutputStream(file);
outputStream.write(0);
outputStream.close();
}
journal.processBackupCleanup();
FilenameFilter fnf = new FilenameFilter() {
@Override
public boolean accept(final File file, final String name) {
return name.endsWith(".data");
}
};
File[] files = tempFolder.listFiles(fnf);
Assert.assertEquals(100, files.length);
HashSet<String> hashSet = new HashSet<>();
for (File file : files) {
hashSet.add(file.getName());
}
for (int i = 0; i < 100; i++) {
Assert.assertTrue(hashSet.contains(journal.getHistoryFileName(i, todayCalendar)));
}
}
@Test
public void testKeepOldFiles() throws Exception {
GregorianCalendar todayCalendar = new GregorianCalendar();
todayCalendar.setTimeInMillis(System.currentTimeMillis());
File tempFolder = new File(getTestDirfile(), "history");
tempFolder.mkdirs();
JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, new FakeSequentialFileFactory(), "jrn", "data", 1);
journal.setHistoryFolder(tempFolder, -1, TimeUnit.HOURS.toMillis(24));
Calendar oldCalendar = new GregorianCalendar();
oldCalendar.setTimeInMillis(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1));
for (int i = 0; i < 100; i++) {
String fileName = journal.getHistoryFileName(i, oldCalendar);
File file = new File(tempFolder, fileName);
FileOutputStream outputStream = new FileOutputStream(file);
outputStream.write(0);
outputStream.close();
}
for (int i = 0; i < 100; i++) {
String fileName = journal.getHistoryFileName(i, todayCalendar);
File file = new File(tempFolder, fileName);
FileOutputStream outputStream = new FileOutputStream(file);
outputStream.write(0);
outputStream.close();
}
journal.processBackupCleanup();
FilenameFilter fnf = new FilenameFilter() {
@Override
public boolean accept(final File file, final String name) {
return name.endsWith(".data");
}
};
File[] files = tempFolder.listFiles(fnf);
Assert.assertEquals(200, files.length);
HashSet<String> hashSet = new HashSet<>();
for (File file : files) {
hashSet.add(file.getName());
}
for (int i = 0; i < 100; i++) {
Assert.assertTrue(hashSet.contains(journal.getHistoryFileName(i, todayCalendar)));
}
for (int i = 0; i < 100; i++) {
Assert.assertTrue(hashSet.contains(journal.getHistoryFileName(i, oldCalendar)));
}
}
@Test
public void testMaxFiles() throws Exception {
GregorianCalendar todayCalendar = new GregorianCalendar();
todayCalendar.setTimeInMillis(System.currentTimeMillis());
File tempFolder = new File(getTestDirfile(), "history");
tempFolder.mkdirs();
JournalImpl journal = new JournalImpl(10 * 1024, 10, 10, 0, 100, new FakeSequentialFileFactory(), "jrn", "data", 1);
journal.setHistoryFolder(tempFolder, 10 * journal.getFileSize(), TimeUnit.HOURS.toMillis(24));
Calendar oldCalendar = new GregorianCalendar();
oldCalendar.setTimeInMillis(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1));
for (int i = 0; i < 100; i++) {
String fileName = journal.getHistoryFileName(i, oldCalendar);
File file = new File(tempFolder, fileName);
FileOutputStream outputStream = new FileOutputStream(file);
outputStream.write(0);
outputStream.close();
}
for (int i = 0; i < 100; i++) {
String fileName = journal.getHistoryFileName(i, todayCalendar);
File file = new File(tempFolder, fileName);
FileOutputStream outputStream = new FileOutputStream(file);
outputStream.write(0);
outputStream.close();
}
journal.processBackupCleanup();
FilenameFilter fnf = new FilenameFilter() {
@Override
public boolean accept(final File file, final String name) {
return name.endsWith(".data");
}
};
File[] files = tempFolder.listFiles(fnf);
Assert.assertEquals(10, files.length);
HashSet<String> hashSet = new HashSet<>();
for (File file : files) {
hashSet.add(file.getName());
}
for (int i = 90; i < 100; i++) {
Assert.assertTrue(hashSet.contains(journal.getHistoryFileName(i, todayCalendar)));
}
}
}

View File

@ -159,6 +159,10 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
maxAIO = 50;
}
protected boolean suportsRetention() {
return true;
}
public void createJournal() throws Exception {
journal = new JournalImpl(fileSize, minFiles, poolSize, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
@Override
@ -172,6 +176,13 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
}
};
if (suportsRetention()) {
// FakeSequentialFile won't support retention
File fileBackup = new File(getTestDir(), "backupFoler");
fileBackup.mkdirs();
((JournalImpl) journal).setHistoryFolder(fileBackup, -1, -1);
}
journal.setAutoReclaim(false);
addActiveMQComponent(journal);
}

View File

@ -557,6 +557,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(91, journal.getIDMapSize());
journal.processBackup();
List<String> files2 = fileFactory.listFiles(fileExtension);
// The Journal will aways have a file ready to be opened
@ -601,6 +602,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(200, journal.getIDMapSize());
journal.processBackup();
List<String> files4 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(numberOfFiles + 2, files4.size());
@ -746,6 +748,8 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(initialNumberOfAddRecords, journal.getIDMapSize());
journal.processBackup();
List<String> files4 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(11, files4.size());
@ -827,6 +831,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
update(1);
delete(1);
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(4, files1.size());
@ -866,6 +871,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
update(1);
add(2);
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(4, files1.size());
@ -920,6 +926,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addTx(1, i);
}
journal.processBackup();
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
@ -941,6 +948,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
journal.processBackup();
List<String> files3 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength) + 2, files3.size());
@ -960,6 +968,8 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
journal.processBackup();
List<String> files4 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength) + 2, files4.size());
@ -976,6 +986,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
journal.processBackup();
List<String> files5 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(24, files5.size());
@ -994,6 +1005,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
journal.processBackup();
List<String> files7 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(1, journal.getOpenedFilesCount());
@ -1007,6 +1019,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
journal.processBackup();
List<String> files8 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(1, journal.getOpenedFilesCount());
@ -1032,6 +1045,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(10, journal.getIDMapSize());
journal.processBackup();
List<String> files9 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(1, journal.getOpenedFilesCount());
@ -1053,6 +1067,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
createJournal();
startJournal();
load();
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(2, files1.size());
@ -1070,6 +1085,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
log.debug("journal tmp :" + journal.debug());
journal.processBackup();
List<String> files2 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(3, files2.size());
@ -1087,6 +1103,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
log.debug("journal tmp2 :" + journal.debug());
journal.processBackup();
List<String> files3 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(4, files3.size());
@ -1101,6 +1118,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
commit(1); // in file 3
journal.processBackup();
List<String> files4 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(5, files4.size());
@ -1114,6 +1132,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 3); // in file 4
journal.processBackup();
List<String> files5 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(6, files5.size());
@ -1125,6 +1144,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
checkAndReclaimFiles();
journal.processBackup();
List<String> files6 = fileFactory.listFiles(fileExtension);
// Three should get deleted (files 0, 1, 3)
@ -1162,6 +1182,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
startJournal();
load();
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(10, files1.size());
@ -1200,6 +1221,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
startJournal();
load();
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(10, files1.size());
@ -1231,6 +1253,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
startJournal();
load();
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(10, files1.size());
@ -1269,6 +1292,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
startJournal();
load();
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(10, files1.size());
@ -1307,6 +1331,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
startJournal();
load();
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(10, files1.size());
@ -1338,6 +1363,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
startJournal();
load();
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(10, files1.size());
@ -1376,6 +1402,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
startJournal();
load();
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(10, files1.size());
@ -1408,6 +1435,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
startJournal();
load();
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(2, files1.size());
@ -1419,6 +1447,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addTx(1, 1);
journal.processBackup();
List<String> files2 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(2, files2.size());
@ -1432,6 +1461,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
commit(1);
journal.processBackup();
List<String> files3 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(3, files3.size());
@ -1445,6 +1475,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
// Move on to another file
journal.processBackup();
List<String> files4 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(4, files4.size());
@ -1458,6 +1489,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
// Nothing should be reclaimed
journal.processBackup();
List<String> files5 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(4, files5.size());
@ -1479,6 +1511,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
startJournal();
load();
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(2, files1.size());
@ -1494,6 +1527,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 1
journal.processBackup();
List<String> files2 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(3, files2.size());
@ -1505,6 +1539,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
commit(1); // in file 1
journal.processBackup();
List<String> files3 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size());
@ -1516,6 +1551,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
delete(2); // in file 1
journal.processBackup();
List<String> files4 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
@ -1529,6 +1565,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 2
journal.processBackup();
List<String> files5 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(0, journal.getFreeFilesCount());
@ -1537,6 +1574,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
checkAndReclaimFiles();
journal.processBackup();
List<String> files6 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(0, journal.getFreeFilesCount());
@ -1562,6 +1600,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
startJournal();
load();
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(2, files1.size());
@ -1577,6 +1616,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 1
journal.processBackup();
List<String> files2 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(3, files2.size());
@ -1597,6 +1637,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
delete(2); // in file 1
journal.processBackup();
List<String> files4 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
@ -1618,6 +1659,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
checkAndReclaimFiles();
journal.processBackup();
List<String> files6 = fileFactory.listFiles(fileExtension);
// files 0 and 1 should be deleted
@ -1636,6 +1678,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
startJournal();
loadAndCheck();
journal.processBackup();
List<String> files7 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(journal.getAlignment() == 1 ? 2 : 3, files7.size());
@ -1687,6 +1730,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
startJournal();
load();
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(2, files1.size());
@ -1702,6 +1746,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1
journal.processBackup();
List<String> files2 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(3, files2.size());
@ -1736,6 +1781,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
checkAndReclaimFiles();
journal.processBackup();
List<String> files6 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(4, files6.size());
@ -1747,6 +1793,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 4); // in file 3
journal.processBackup();
List<String> files7 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(5, files7.size());
@ -1758,6 +1805,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
commit(1); // in file 4
journal.processBackup();
List<String> files8 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(5, files8.size());
@ -1783,6 +1831,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
startJournal();
load();
journal.processBackup();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(2, files1.size());
@ -1794,6 +1843,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addTx(1, 1); // in file 0
journal.processBackup();
files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(2, files1.size());
@ -1811,6 +1861,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1
journal.processBackup();
List<String> files2 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(3, files2.size());
@ -1822,6 +1873,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
EncodingSupport xid = new SimpleEncoding(10, (byte) 0);
prepare(1, xid); // in file 1
journal.processBackup();
List<String> files3 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(3, files3.size());
@ -1833,6 +1885,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
delete(2); // in file 1
journal.processBackup();
List<String> files4 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(3, files4.size());
@ -1851,6 +1904,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
checkAndReclaimFiles();
journal.processBackup();
List<String> files5 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(4, files5.size());
@ -1862,6 +1916,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
checkAndReclaimFiles();
journal.processBackup();
List<String> files6 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(4, files6.size());
@ -1875,6 +1930,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD, 4); // in file 3
journal.processBackup();
List<String> files7 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(5, files7.size());
@ -1886,6 +1942,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
commit(1); // in file 3
journal.processBackup();
List<String> files8 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(5, files8.size());
@ -1897,6 +1954,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
delete(1); // in file 3
journal.processBackup();
List<String> files9 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(5, files9.size());
@ -1908,6 +1966,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
checkAndReclaimFiles();
journal.processBackup();
List<String> files10 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(journal.getAlignment() == 1 ? 5 : 5, files10.size());
@ -1920,6 +1979,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addWithSize(1024 - JournalImpl.SIZE_ADD_RECORD, 5); // in file 4
journal.processBackup();
List<String> files11 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(6, files11.size());
@ -1931,6 +1991,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
checkAndReclaimFiles();
journal.processBackup();
List<String> files12 = fileFactory.listFiles(fileExtension);
// File 0, and File 1 should be deleted
@ -1968,6 +2029,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
// file 3 should now be deleted
journal.processBackup();
List<String> files15 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(4, files15.size());
@ -3092,6 +3154,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
// log.debug(journal.debug());
// log.debug("*****************************************");
journal.processBackup();
stopJournal();
createJournal();
startJournal();