[ARTEMIS-2939]: Artemis should not delete corrupt log files.
* Moving corrupted journal files to the attic folder. Jira: https://issues.apache.org/jira/browse/ARTEMIS-2939
This commit is contained in:
parent
9a954188d8
commit
fdfc58171b
|
@ -291,6 +291,9 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
// The minimal number of data files before we can start compacting
|
// The minimal number of data files before we can start compacting
|
||||||
private static int DEFAULT_JOURNAL_COMPACT_MIN_FILES = 10;
|
private static int DEFAULT_JOURNAL_COMPACT_MIN_FILES = 10;
|
||||||
|
|
||||||
|
// The maximal number of data files before we can start deleting corrupted files instead of moving them to attic.
|
||||||
|
private static int DEFAULT_JOURNAL_MAX_ATTIC_FILES = 10;
|
||||||
|
|
||||||
// Interval to log server specific information (e.g. memory usage etc)
|
// Interval to log server specific information (e.g. memory usage etc)
|
||||||
private static long DEFAULT_SERVER_DUMP_INTERVAL = -1;
|
private static long DEFAULT_SERVER_DUMP_INTERVAL = -1;
|
||||||
|
|
||||||
|
@ -998,6 +1001,13 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
return DEFAULT_JOURNAL_COMPACT_MIN_FILES;
|
return DEFAULT_JOURNAL_COMPACT_MIN_FILES;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* how many journal files to be stored in the attic.
|
||||||
|
*/
|
||||||
|
public static int getDefaultJournalMaxAtticFiles() {
|
||||||
|
return DEFAULT_JOURNAL_MAX_ATTIC_FILES;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interval to log server specific information (e.g. memory usage etc)
|
* Interval to log server specific information (e.g. memory usage etc)
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -16,6 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.journal.impl;
|
package org.apache.activemq.artemis.core.journal.impl;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.StandardCopyOption;
|
||||||
import java.security.AccessController;
|
import java.security.AccessController;
|
||||||
import java.security.PrivilegedActionException;
|
import java.security.PrivilegedActionException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
@ -30,6 +34,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
|
@ -83,6 +88,8 @@ public class JournalFilesRepository {
|
||||||
|
|
||||||
private final int journalFileOpenTimeout;
|
private final int journalFileOpenTimeout;
|
||||||
|
|
||||||
|
private final int maxAtticFiles;
|
||||||
|
|
||||||
private Executor openFilesExecutor;
|
private Executor openFilesExecutor;
|
||||||
|
|
||||||
private final Runnable pushOpenRunnable = new Runnable() {
|
private final Runnable pushOpenRunnable = new Runnable() {
|
||||||
|
@ -109,7 +116,8 @@ public class JournalFilesRepository {
|
||||||
final int fileSize,
|
final int fileSize,
|
||||||
final int minFiles,
|
final int minFiles,
|
||||||
final int poolSize,
|
final int poolSize,
|
||||||
final int journalFileOpenTimeout) {
|
final int journalFileOpenTimeout,
|
||||||
|
final int maxAtticFiles) {
|
||||||
if (filePrefix == null) {
|
if (filePrefix == null) {
|
||||||
throw new IllegalArgumentException("filePrefix cannot be null");
|
throw new IllegalArgumentException("filePrefix cannot be null");
|
||||||
}
|
}
|
||||||
|
@ -129,6 +137,7 @@ public class JournalFilesRepository {
|
||||||
this.userVersion = userVersion;
|
this.userVersion = userVersion;
|
||||||
this.journal = journal;
|
this.journal = journal;
|
||||||
this.journalFileOpenTimeout = journalFileOpenTimeout;
|
this.journalFileOpenTimeout = journalFileOpenTimeout;
|
||||||
|
this.maxAtticFiles = maxAtticFiles;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Public --------------------------------------------------------
|
// Public --------------------------------------------------------
|
||||||
|
@ -365,8 +374,7 @@ public class JournalFilesRepository {
|
||||||
throw new IllegalStateException(e.getMessage() + " file: " + file);
|
throw new IllegalStateException(e.getMessage() + " file: " + file);
|
||||||
}
|
}
|
||||||
if (calculatedSize != fileSize) {
|
if (calculatedSize != fileSize) {
|
||||||
ActiveMQJournalLogger.LOGGER.deletingFile(file);
|
damagedFile(file);
|
||||||
file.getFile().delete();
|
|
||||||
} else if (!checkDelete || (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size() < poolSize) || (poolSize < 0)) {
|
} else if (!checkDelete || (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size() < poolSize) || (poolSize < 0)) {
|
||||||
// Re-initialise it
|
// Re-initialise it
|
||||||
|
|
||||||
|
@ -400,6 +408,30 @@ public class JournalFilesRepository {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void damagedFile(JournalFile file) throws Exception {
|
||||||
|
if (file.getFile().isOpen()) {
|
||||||
|
file.getFile().close(false);
|
||||||
|
}
|
||||||
|
if (file.getFile().exists()) {
|
||||||
|
final Path journalPath = file.getFile().getJavaFile().toPath();
|
||||||
|
final Path atticPath = journalPath.getParent().resolve("attic");
|
||||||
|
Files.createDirectories(atticPath);
|
||||||
|
if (listFiles(atticPath) < maxAtticFiles) {
|
||||||
|
ActiveMQJournalLogger.LOGGER.movingFileToAttic(file.getFile().getFileName());
|
||||||
|
Files.move(journalPath, atticPath.resolve(journalPath.getFileName()), StandardCopyOption.REPLACE_EXISTING);
|
||||||
|
} else {
|
||||||
|
ActiveMQJournalLogger.LOGGER.deletingFile(file);
|
||||||
|
Files.delete(journalPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int listFiles(Path path) throws IOException {
|
||||||
|
try (Stream<Path> files = Files.list(path)) {
|
||||||
|
return files.mapToInt(e -> 1).sum();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Collection<JournalFile> getFreeFiles() {
|
public Collection<JournalFile> getFreeFiles() {
|
||||||
return freeFiles;
|
return freeFiles;
|
||||||
}
|
}
|
||||||
|
|
|
@ -294,7 +294,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
final String fileExtension,
|
final String fileExtension,
|
||||||
final int maxAIO,
|
final int maxAIO,
|
||||||
final int userVersion) {
|
final int userVersion) {
|
||||||
this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion, null);
|
this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion, null, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -310,7 +310,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
final String fileExtension,
|
final String fileExtension,
|
||||||
final int maxAIO,
|
final int maxAIO,
|
||||||
final int userVersion,
|
final int userVersion,
|
||||||
IOCriticalErrorListener criticalErrorListener) {
|
IOCriticalErrorListener criticalErrorListener,
|
||||||
|
final int maxAtticFiles) {
|
||||||
|
|
||||||
super(fileFactory.isSupportsCallbacks(), fileSize);
|
super(fileFactory.isSupportsCallbacks(), fileSize);
|
||||||
|
|
||||||
|
@ -340,7 +341,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
|
|
||||||
this.fileFactory = fileFactory;
|
this.fileFactory = fileFactory;
|
||||||
|
|
||||||
filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles, poolSize, journalFileOpenTimeout);
|
filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles, poolSize, journalFileOpenTimeout, maxAtticFiles);
|
||||||
|
|
||||||
this.userVersion = userVersion;
|
this.userVersion = userVersion;
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,7 +128,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
|
||||||
void couldNotRemoveFile(JournalFile file);
|
void couldNotRemoveFile(JournalFile file);
|
||||||
|
|
||||||
@LogMessage(level = Logger.Level.WARN)
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
@Message(id = 142009, value = "Deleting {0} as it does not have the configured size",
|
@Message(id = 142009, value = "*******************************************************************************************************************************\nThe File Storage Attic is full, as the file {0} does not have the configured size, and the file will be removed\n*******************************************************************************************************************************",
|
||||||
format = Message.Format.MESSAGE_FORMAT)
|
format = Message.Format.MESSAGE_FORMAT)
|
||||||
void deletingFile(JournalFile file);
|
void deletingFile(JournalFile file);
|
||||||
|
|
||||||
|
@ -277,4 +277,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
|
||||||
@Message(id = 144007, value = "Ignoring journal file {0}: file is shorter then minimum header size. This file is being removed.", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 144007, value = "Ignoring journal file {0}: file is shorter then minimum header size. This file is being removed.", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void ignoringShortFile(String fileName);
|
void ignoringShortFile(String fileName);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
|
@Message(id = 144008, value = "*******************************************************************************************************************************\nFile {0}: was moved under attic, please review it and remove it.\n*******************************************************************************************************************************", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void movingFileToAttic(String fileName);
|
||||||
}
|
}
|
||||||
|
|
|
@ -845,6 +845,18 @@ public interface Configuration {
|
||||||
*/
|
*/
|
||||||
Configuration setJournalBufferSize_NIO(int journalBufferSize);
|
Configuration setJournalBufferSize_NIO(int journalBufferSize);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the maximal number of data files before we can start deleting corrupted files instead of moving them to attic.
|
||||||
|
* <br>
|
||||||
|
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_MAX_ATTIC_FILES}.
|
||||||
|
*/
|
||||||
|
int getJournalMaxAtticFiles();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the maximal number of data files before we can start deleting corrupted files instead of moving them to attic.
|
||||||
|
*/
|
||||||
|
Configuration setJournalMaxAtticFiles(int maxAtticFiles);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns whether the bindings directory is created on this server startup. <br>
|
* Returns whether the bindings directory is created on this server startup. <br>
|
||||||
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_CREATE_BINDINGS_DIR}.
|
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_CREATE_BINDINGS_DIR}.
|
||||||
|
|
|
@ -218,6 +218,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
|
|
||||||
protected int journalMinFiles = ActiveMQDefaultConfiguration.getDefaultJournalMinFiles();
|
protected int journalMinFiles = ActiveMQDefaultConfiguration.getDefaultJournalMinFiles();
|
||||||
|
|
||||||
|
protected int journalMaxAtticFilesFiles = ActiveMQDefaultConfiguration.getDefaultJournalMaxAtticFiles();
|
||||||
|
|
||||||
// AIO and NIO need different values for these attributes
|
// AIO and NIO need different values for these attributes
|
||||||
|
|
||||||
protected int journalMaxIO_AIO = ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio();
|
protected int journalMaxIO_AIO = ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio();
|
||||||
|
@ -2508,4 +2510,15 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getJournalMaxAtticFiles() {
|
||||||
|
return journalMaxAtticFilesFiles;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration setJournalMaxAtticFiles(int maxAtticFiles) {
|
||||||
|
this.journalMaxAtticFilesFiles = maxAtticFiles;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -637,6 +637,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
|
|
||||||
config.setJournalFileSize(getTextBytesAsIntBytes(e, "journal-file-size", config.getJournalFileSize(), Validators.POSITIVE_INT));
|
config.setJournalFileSize(getTextBytesAsIntBytes(e, "journal-file-size", config.getJournalFileSize(), Validators.POSITIVE_INT));
|
||||||
|
|
||||||
|
config.setJournalMaxAtticFiles(getInteger(e, "journal-max-attic-files", config.getJournalMaxAtticFiles(), Validators.NO_CHECK));
|
||||||
|
|
||||||
int journalBufferTimeout = getInteger(e, "journal-buffer-timeout", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, Validators.GE_ZERO);
|
int journalBufferTimeout = getInteger(e, "journal-buffer-timeout", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, Validators.GE_ZERO);
|
||||||
|
|
||||||
int journalBufferSize = getTextBytesAsIntBytes(e, "journal-buffer-size", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, Validators.POSITIVE_INT);
|
int journalBufferSize = getTextBytesAsIntBytes(e, "journal-buffer-size", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, Validators.POSITIVE_INT);
|
||||||
|
|
|
@ -130,7 +130,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
|
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
|
||||||
bindingsFF.setDatasync(config.isJournalDatasync());
|
bindingsFF.setDatasync(config.isJournalDatasync());
|
||||||
|
|
||||||
Journal localBindings = new JournalImpl(ioExecutorFactory, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener);
|
Journal localBindings = new JournalImpl(ioExecutorFactory, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener, config.getJournalMaxAtticFiles());
|
||||||
|
|
||||||
bindingsJournal = localBindings;
|
bindingsJournal = localBindings;
|
||||||
originalBindingsJournal = localBindings;
|
originalBindingsJournal = localBindings;
|
||||||
|
@ -210,7 +210,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
protected Journal createMessageJournal(Configuration config,
|
protected Journal createMessageJournal(Configuration config,
|
||||||
IOCriticalErrorListener criticalErrorListener,
|
IOCriticalErrorListener criticalErrorListener,
|
||||||
int fileSize) {
|
int fileSize) {
|
||||||
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener);
|
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Life Cycle Handlers
|
// Life Cycle Handlers
|
||||||
|
|
|
@ -90,6 +90,8 @@ public class DefaultsFileConfigurationTest extends ConfigurationImplTest {
|
||||||
|
|
||||||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), conf.getJournalFileSize());
|
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), conf.getJournalFileSize());
|
||||||
|
|
||||||
|
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalMaxAtticFiles(), conf.getJournalMaxAtticFiles());
|
||||||
|
|
||||||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalCompactMinFiles(), conf.getJournalCompactMinFiles());
|
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalCompactMinFiles(), conf.getJournalCompactMinFiles());
|
||||||
|
|
||||||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), conf.getJournalCompactPercentage());
|
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), conf.getJournalCompactPercentage());
|
||||||
|
|
|
@ -97,7 +97,7 @@ public class ShutdownOnCriticalIOErrorMoveNextTest extends ActiveMQTestBase {
|
||||||
protected Journal createMessageJournal(Configuration config,
|
protected Journal createMessageJournal(Configuration config,
|
||||||
IOCriticalErrorListener criticalErrorListener,
|
IOCriticalErrorListener criticalErrorListener,
|
||||||
int fileSize) {
|
int fileSize) {
|
||||||
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener) {
|
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles()) {
|
||||||
@Override
|
@Override
|
||||||
protected void moveNextFile(boolean scheduleReclaim) throws Exception {
|
protected void moveNextFile(boolean scheduleReclaim) throws Exception {
|
||||||
super.moveNextFile(scheduleReclaim);
|
super.moveNextFile(scheduleReclaim);
|
||||||
|
|
Loading…
Reference in New Issue