This closes #3297
This commit is contained in:
commit
65c1f80dea
|
@ -291,6 +291,9 @@ public final class ActiveMQDefaultConfiguration {
|
|||
// The minimal number of data files before we can start compacting
|
||||
private static int DEFAULT_JOURNAL_COMPACT_MIN_FILES = 10;
|
||||
|
||||
// The maximal number of data files before we can start deleting corrupted files instead of moving them to attic.
|
||||
private static int DEFAULT_JOURNAL_MAX_ATTIC_FILES = 10;
|
||||
|
||||
// Interval to log server specific information (e.g. memory usage etc)
|
||||
private static long DEFAULT_SERVER_DUMP_INTERVAL = -1;
|
||||
|
||||
|
@ -998,6 +1001,13 @@ public final class ActiveMQDefaultConfiguration {
|
|||
return DEFAULT_JOURNAL_COMPACT_MIN_FILES;
|
||||
}
|
||||
|
||||
/**
|
||||
* how many journal files to be stored in the attic.
|
||||
*/
|
||||
public static int getDefaultJournalMaxAtticFiles() {
|
||||
return DEFAULT_JOURNAL_MAX_ATTIC_FILES;
|
||||
}
|
||||
|
||||
/**
|
||||
* Interval to log server specific information (e.g. memory usage etc)
|
||||
*/
|
||||
|
|
|
@ -16,6 +16,10 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.journal.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
@ -30,6 +34,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
|
@ -83,6 +88,8 @@ public class JournalFilesRepository {
|
|||
|
||||
private final int journalFileOpenTimeout;
|
||||
|
||||
private final int maxAtticFiles;
|
||||
|
||||
private Executor openFilesExecutor;
|
||||
|
||||
private final Runnable pushOpenRunnable = new Runnable() {
|
||||
|
@ -109,7 +116,8 @@ public class JournalFilesRepository {
|
|||
final int fileSize,
|
||||
final int minFiles,
|
||||
final int poolSize,
|
||||
final int journalFileOpenTimeout) {
|
||||
final int journalFileOpenTimeout,
|
||||
final int maxAtticFiles) {
|
||||
if (filePrefix == null) {
|
||||
throw new IllegalArgumentException("filePrefix cannot be null");
|
||||
}
|
||||
|
@ -129,6 +137,7 @@ public class JournalFilesRepository {
|
|||
this.userVersion = userVersion;
|
||||
this.journal = journal;
|
||||
this.journalFileOpenTimeout = journalFileOpenTimeout;
|
||||
this.maxAtticFiles = maxAtticFiles;
|
||||
}
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
@ -365,8 +374,7 @@ public class JournalFilesRepository {
|
|||
throw new IllegalStateException(e.getMessage() + " file: " + file);
|
||||
}
|
||||
if (calculatedSize != fileSize) {
|
||||
ActiveMQJournalLogger.LOGGER.deletingFile(file);
|
||||
file.getFile().delete();
|
||||
damagedFile(file);
|
||||
} else if (!checkDelete || (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size() < poolSize) || (poolSize < 0)) {
|
||||
// Re-initialise it
|
||||
|
||||
|
@ -400,6 +408,30 @@ public class JournalFilesRepository {
|
|||
}
|
||||
}
|
||||
|
||||
private void damagedFile(JournalFile file) throws Exception {
|
||||
if (file.getFile().isOpen()) {
|
||||
file.getFile().close(false);
|
||||
}
|
||||
if (file.getFile().exists()) {
|
||||
final Path journalPath = file.getFile().getJavaFile().toPath();
|
||||
final Path atticPath = journalPath.getParent().resolve("attic");
|
||||
Files.createDirectories(atticPath);
|
||||
if (listFiles(atticPath) < maxAtticFiles) {
|
||||
ActiveMQJournalLogger.LOGGER.movingFileToAttic(file.getFile().getFileName());
|
||||
Files.move(journalPath, atticPath.resolve(journalPath.getFileName()), StandardCopyOption.REPLACE_EXISTING);
|
||||
} else {
|
||||
ActiveMQJournalLogger.LOGGER.deletingFile(file);
|
||||
Files.delete(journalPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int listFiles(Path path) throws IOException {
|
||||
try (Stream<Path> files = Files.list(path)) {
|
||||
return files.mapToInt(e -> 1).sum();
|
||||
}
|
||||
}
|
||||
|
||||
public Collection<JournalFile> getFreeFiles() {
|
||||
return freeFiles;
|
||||
}
|
||||
|
|
|
@ -294,7 +294,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
final String fileExtension,
|
||||
final int maxAIO,
|
||||
final int userVersion) {
|
||||
this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion, null);
|
||||
this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion, null, 0);
|
||||
}
|
||||
|
||||
|
||||
|
@ -310,7 +310,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
final String fileExtension,
|
||||
final int maxAIO,
|
||||
final int userVersion,
|
||||
IOCriticalErrorListener criticalErrorListener) {
|
||||
IOCriticalErrorListener criticalErrorListener,
|
||||
final int maxAtticFiles) {
|
||||
|
||||
super(fileFactory.isSupportsCallbacks(), fileSize);
|
||||
|
||||
|
@ -340,7 +341,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
this.fileFactory = fileFactory;
|
||||
|
||||
filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles, poolSize, journalFileOpenTimeout);
|
||||
filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles, poolSize, journalFileOpenTimeout, maxAtticFiles);
|
||||
|
||||
this.userVersion = userVersion;
|
||||
}
|
||||
|
|
|
@ -128,7 +128,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
|
|||
void couldNotRemoveFile(JournalFile file);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 142009, value = "Deleting {0} as it does not have the configured size",
|
||||
@Message(id = 142009, value = "*******************************************************************************************************************************\nThe File Storage Attic is full, as the file {0} does not have the configured size, and the file will be removed\n*******************************************************************************************************************************",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void deletingFile(JournalFile file);
|
||||
|
||||
|
@ -277,4 +277,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
|
|||
@Message(id = 144007, value = "Ignoring journal file {0}: file is shorter then minimum header size. This file is being removed.", format = Message.Format.MESSAGE_FORMAT)
|
||||
void ignoringShortFile(String fileName);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 144008, value = "*******************************************************************************************************************************\nFile {0}: was moved under attic, please review it and remove it.\n*******************************************************************************************************************************", format = Message.Format.MESSAGE_FORMAT)
|
||||
void movingFileToAttic(String fileName);
|
||||
}
|
||||
|
|
|
@ -845,6 +845,18 @@ public interface Configuration {
|
|||
*/
|
||||
Configuration setJournalBufferSize_NIO(int journalBufferSize);
|
||||
|
||||
/**
|
||||
* Returns the maximal number of data files before we can start deleting corrupted files instead of moving them to attic.
|
||||
* <br>
|
||||
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_MAX_ATTIC_FILES}.
|
||||
*/
|
||||
int getJournalMaxAtticFiles();
|
||||
|
||||
/**
|
||||
* Sets the maximal number of data files before we can start deleting corrupted files instead of moving them to attic.
|
||||
*/
|
||||
Configuration setJournalMaxAtticFiles(int maxAtticFiles);
|
||||
|
||||
/**
|
||||
* Returns whether the bindings directory is created on this server startup. <br>
|
||||
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_CREATE_BINDINGS_DIR}.
|
||||
|
|
|
@ -218,6 +218,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
|
||||
protected int journalMinFiles = ActiveMQDefaultConfiguration.getDefaultJournalMinFiles();
|
||||
|
||||
protected int journalMaxAtticFilesFiles = ActiveMQDefaultConfiguration.getDefaultJournalMaxAtticFiles();
|
||||
|
||||
// AIO and NIO need different values for these attributes
|
||||
|
||||
protected int journalMaxIO_AIO = ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio();
|
||||
|
@ -2508,4 +2510,15 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getJournalMaxAtticFiles() {
|
||||
return journalMaxAtticFilesFiles;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration setJournalMaxAtticFiles(int maxAtticFiles) {
|
||||
this.journalMaxAtticFilesFiles = maxAtticFiles;
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -637,6 +637,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
config.setJournalFileSize(getTextBytesAsIntBytes(e, "journal-file-size", config.getJournalFileSize(), Validators.POSITIVE_INT));
|
||||
|
||||
config.setJournalMaxAtticFiles(getInteger(e, "journal-max-attic-files", config.getJournalMaxAtticFiles(), Validators.NO_CHECK));
|
||||
|
||||
int journalBufferTimeout = getInteger(e, "journal-buffer-timeout", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, Validators.GE_ZERO);
|
||||
|
||||
int journalBufferSize = getTextBytesAsIntBytes(e, "journal-buffer-size", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, Validators.POSITIVE_INT);
|
||||
|
|
|
@ -130,7 +130,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
|||
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
|
||||
bindingsFF.setDatasync(config.isJournalDatasync());
|
||||
|
||||
Journal localBindings = new JournalImpl(ioExecutorFactory, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener);
|
||||
Journal localBindings = new JournalImpl(ioExecutorFactory, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener, config.getJournalMaxAtticFiles());
|
||||
|
||||
bindingsJournal = localBindings;
|
||||
originalBindingsJournal = localBindings;
|
||||
|
@ -210,7 +210,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
|||
protected Journal createMessageJournal(Configuration config,
|
||||
IOCriticalErrorListener criticalErrorListener,
|
||||
int fileSize) {
|
||||
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener);
|
||||
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles());
|
||||
}
|
||||
|
||||
// Life Cycle Handlers
|
||||
|
|
|
@ -90,6 +90,8 @@ public class DefaultsFileConfigurationTest extends ConfigurationImplTest {
|
|||
|
||||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), conf.getJournalFileSize());
|
||||
|
||||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalMaxAtticFiles(), conf.getJournalMaxAtticFiles());
|
||||
|
||||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalCompactMinFiles(), conf.getJournalCompactMinFiles());
|
||||
|
||||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), conf.getJournalCompactPercentage());
|
||||
|
|
|
@ -97,7 +97,7 @@ public class ShutdownOnCriticalIOErrorMoveNextTest extends ActiveMQTestBase {
|
|||
protected Journal createMessageJournal(Configuration config,
|
||||
IOCriticalErrorListener criticalErrorListener,
|
||||
int fileSize) {
|
||||
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener) {
|
||||
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles()) {
|
||||
@Override
|
||||
protected void moveNextFile(boolean scheduleReclaim) throws Exception {
|
||||
super.moveNextFile(scheduleReclaim);
|
||||
|
|
Loading…
Reference in New Issue