NIFI-3373: Add nifi.flow.configuration.archive.max.count

- Add 'nifi.flow.configuration.archive.max.count' in nifi.properties
- Change default archive limit so that it uses archive max time(30 days)
  and storage (500MB) if no limitation is specified
- Simplified logic to delete old archives

This closes #1460.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Koji Kawamura 2017-01-24 09:23:01 +09:00
parent 76fcf5def1
commit 1eb98aefee
6 changed files with 240 additions and 111 deletions

View File

@ -53,6 +53,7 @@ public abstract class NiFiProperties {
public static final String FLOW_CONFIGURATION_ARCHIVE_DIR = "nifi.flow.configuration.archive.dir";
public static final String FLOW_CONFIGURATION_ARCHIVE_MAX_TIME = "nifi.flow.configuration.archive.max.time";
public static final String FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE = "nifi.flow.configuration.archive.max.storage";
public static final String FLOW_CONFIGURATION_ARCHIVE_MAX_COUNT = "nifi.flow.configuration.archive.max.count";
public static final String AUTHORIZER_CONFIGURATION_FILE = "nifi.authorizer.configuration.file";
public static final String LOGIN_IDENTITY_PROVIDER_CONFIGURATION_FILE = "nifi.login.identity.provider.configuration.file";
public static final String REPOSITORY_DATABASE_DIRECTORY = "nifi.database.directory";
@ -974,11 +975,15 @@ public abstract class NiFiProperties {
}
public String getFlowConfigurationArchiveMaxTime() {
return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_TIME, DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_TIME);
return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_TIME, null);
}
public String getFlowConfigurationArchiveMaxStorage() {
return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE, DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE);
return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE, null);
}
public Integer getFlowConfigurationArchiveMaxCount() {
return getIntegerProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_COUNT, null);
}
public String getVariableRegistryProperties() {

View File

@ -1935,9 +1935,11 @@ The first section of the _nifi.properties_ file is for the Core Properties. Thes
|nifi.version|The version number of the current release. If upgrading but reusing this file, be sure to update this value.
|nifi.flow.configuration.file*|The location of the flow configuration file (i.e., the file that contains what is currently displayed on the NiFi graph). The default value is ./conf/flow.xml.gz.
|nifi.flow.configuration.archive.enabled*|Specifies whether NiFi creates a backup copy of the flow automatically when the flow is updated. The default value is _true_.
|nifi.flow.configuration.archive.dir*|The location of the archive directory where backup copies of the flow.xml are saved. The default value is ./conf/archive. NiFi removes old archive files to limit disk usage based on file lifespan and total size, as specified with max.time and max.storage properties below. However, this cleanup mechanism takes into account only automatically created archived flow.xml files. That is, if there are other files or directories in this archive directory, NiFi will ignore them. Automatically created archives have filename with ISO 8601 format timestamp prefix followed by '_<original-filename>'. That is <year><month><day>T<hour><minute><second>+<timezone offset>_<original filename>. For example, `20160706T160719+0900_flow.xml.gz`. NiFi checks filenames when it cleans archive directory. If you would like to keep a particular archive in this directory without worrying about NiFi deleting it, you can do so by copying it with a different filename pattern.
|nifi.flow.configuration.archive.max.time*|The lifespan of archived flow.xml files. NiFi will delete expired archive files when it updates flow.xml. Expiration is determined based on current system time and the last modified timestamp of an archived flow.xml. The default value is 30 days.
|nifi.flow.configuration.archive.max.storage*|The total data size allowed for the archived flow.xml files. NiFi will delete the oldest archive files until the total archived file size becomes less than this configuration value. The default value is 500 MB.
|nifi.flow.configuration.archive.dir*|The location of the archive directory where backup copies of the flow.xml are saved. The default value is ./conf/archive. NiFi removes old archive files to limit disk usage based on archived file lifespan, total size, and number of files, as specified with nifi.flow.configuration.archive.max.time, max.storage and max.count properties respectively. If none of these limitation for archiving is specified, NiFi uses default condition, that is "30 days" for max.time and "500 MB" for max.storage. +
This cleanup mechanism takes into account only automatically created archived flow.xml files. If there are other files or directories in this archive directory, NiFi will ignore them. Automatically created archives have filename with ISO 8601 format timestamp prefix followed by '_<original-filename>'. That is <year><month><day>T<hour><minute><second>+<timezone offset>_<original filename>. For example, `20160706T160719+0900_flow.xml.gz`. NiFi checks filenames when it cleans archive directory. If you would like to keep a particular archive in this directory without worrying about NiFi deleting it, you can do so by copying it with a different filename pattern.
|nifi.flow.configuration.archive.max.time*|The lifespan of archived flow.xml files. NiFi will delete expired archive files when it updates flow.xml if this property is specified. Expiration is determined based on current system time and the last modified timestamp of an archived flow.xml. If no archive limitation is specified in nifi.properties, NiFi removes archives older than "30 days".
|nifi.flow.configuration.archive.max.storage*|The total data size allowed for the archived flow.xml files. NiFi will delete the oldest archive files until the total archived file size becomes less than this configuration value, if this property is specified. If no archive limitation is specified in nifi.properties, NiFi uses "500 MB" for this.
|nifi.flow.configuration.archive.max.count*|The number of archive files allowed. NiFi will delete the oldest archive files so that only N latest archives can be kept, if this property is specified.
|nifi.flowcontroller.autoResumeState|Indicates whether -upon restart- the components on the NiFi graph should return to their last state. The default value is _true_.
|nifi.flowcontroller.graceful.shutdown.period|Indicates the shutdown period. The default value is 10 sec.
|nifi.flowservice.writedelay.interval|When many changes are made to the flow.xml, this property specifies how long to wait before writing out the changes, so as to batch the changes into a single write. The default value is 500 ms.

View File

@ -19,6 +19,7 @@ package org.apache.nifi.persistence;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -28,14 +29,16 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Calendar;
import java.util.Comparator;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class FlowConfigurationArchiveManager {
@ -48,35 +51,38 @@ public class FlowConfigurationArchiveManager {
* <li>yyyyMMddTHHmmssZ_original-file-name</li>
*/
private final Pattern archiveFilenamePattern = Pattern.compile("^([\\d]{8}T[\\d]{6}([\\+\\-][\\d]{4}|Z))_.+$");
private final Path flowFile;
private final Path flowConfigFile;
private final Path archiveDir;
private final long maxTimeMillis;
private final long maxStorageBytes;
private final Integer maxCount;
private final Long maxTimeMillis;
private final Long maxStorageBytes;
public FlowConfigurationArchiveManager(final Path flowFile, NiFiProperties properties) {
public FlowConfigurationArchiveManager(final Path flowConfigFile, final NiFiProperties properties) {
final String archiveDirVal = properties.getFlowConfigurationArchiveDir();
final Path archiveDir = (archiveDirVal == null || archiveDirVal.equals(""))
? flowFile.getParent().resolve("archive") : new File(archiveDirVal).toPath();
? flowConfigFile.getParent().resolve("archive") : new File(archiveDirVal).toPath();
final long archiveMaxTime =
FormatUtils.getTimeDuration(properties.getFlowConfigurationArchiveMaxTime(), TimeUnit.MILLISECONDS);
final long archiveMaxStorage =
DataUnit.parseDataSize(properties.getFlowConfigurationArchiveMaxStorage(), DataUnit.B).longValue();
this.maxCount = properties.getFlowConfigurationArchiveMaxCount();
this.flowFile = flowFile;
this.archiveDir = archiveDir;
this.maxTimeMillis = archiveMaxTime;
this.maxStorageBytes = archiveMaxStorage;
String maxTime = properties.getFlowConfigurationArchiveMaxTime();
String maxStorage = properties.getFlowConfigurationArchiveMaxStorage();
if (maxCount == null && StringUtils.isBlank(maxTime) && StringUtils.isBlank(maxStorage)) {
// None of limitation is specified, fall back to the default configuration;
maxTime = NiFiProperties.DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_TIME;
maxStorage = NiFiProperties.DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE;
logger.info("None of archive max limitation is specified, fall back to the default configuration, maxTime={}, maxStorage={}",
maxTime, maxStorage);
}
public FlowConfigurationArchiveManager(final Path flowFile, final Path archiveDir, long maxTimeMillis, long maxStorageBytes) {
this.flowFile = flowFile;
this.maxTimeMillis = StringUtils.isBlank(maxTime) ? null : FormatUtils.getTimeDuration(maxTime, TimeUnit.MILLISECONDS);
this.maxStorageBytes = StringUtils.isBlank(maxStorage) ? null : DataUnit.parseDataSize(maxStorage, DataUnit.B).longValue();
this.flowConfigFile = flowConfigFile;
this.archiveDir = archiveDir;
this.maxTimeMillis = maxTimeMillis;
this.maxStorageBytes = maxStorageBytes;
}
private String createArchiveFileName(final String originalFlowFileName) {
private String createArchiveFileName(final String originalFlowConfigFileName) {
TimeZone tz = TimeZone.getDefault();
Calendar cal = GregorianCalendar.getInstance(tz);
int offsetInMillis = tz.getOffset(cal.getTimeInMillis());
@ -93,7 +99,7 @@ public class FlowConfigurationArchiveManager {
Math.abs((offsetInMillis / 60000) % 60));
return String.format("%d%02d%02dT%02d%02d%02d%s_%s",
year, month, day, hour, min, sec, offset, originalFlowFileName);
year, month, day, hour, min, sec, offset, originalFlowConfigFileName);
}
/**
@ -107,16 +113,19 @@ public class FlowConfigurationArchiveManager {
if (!Files.isDirectory(archiveDir)) {
throw new IOException("Archive directory doesn't appear to be a directory " + archiveDir);
}
final String originalFlowFileName = flowFile.getFileName().toString();
final Path archiveFile = archiveDir.resolve(createArchiveFileName(originalFlowFileName));
final String originalFlowConfigFileName = flowConfigFile.getFileName().toString();
final Path archiveFile = archiveDir.resolve(createArchiveFileName(originalFlowConfigFileName));
return archiveFile.toFile();
}
/**
* Archive current flow configuration file by copying the original file to the archive directory.
* After creating new archive file:
* <li>It removes expired archive files based on its last modification date and maxTimeMillis</li>
* <li>It removes old files until total size of archive files becomes less than maxStorageBytes</li>
* Before creating new archive file, this method removes old archives to satisfy following conditions:
* <ul>
* <li>Number of archive files less than or equal to maxCount</li>
* <li>Keep archive files which has last modified timestamp no older than current system timestamp - maxTimeMillis</li>
* <li>Total size of archive files less than or equal to maxStorageBytes</li>
* </ul>
* This method keeps other files intact, so that users can keep particular archive by copying it with different name.
* Whether a given file is archive file or not is determined by the filename.
* Since archive file name consists of timestamp up to seconds, if archive is called multiple times within a second,
@ -127,65 +136,64 @@ public class FlowConfigurationArchiveManager {
* Although, other IOExceptions like the ones thrown during removing expired archive files will not be thrown.
*/
public File archive() throws IOException {
final String originalFlowFileName = flowFile.getFileName().toString();
final File archiveFile = setupArchiveFile();
Files.copy(flowFile, archiveFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
final String originalFlowConfigFileName = flowConfigFile.getFileName().toString();
// Collect archive files by its name, and group by expiry state.
final File archiveFile = setupArchiveFile();
// Collect archive files by its name
final long now = System.currentTimeMillis();
final Map<Boolean, List<Path>> oldArchives = Files.walk(archiveDir, 1).filter(p -> {
final AtomicLong totalArchiveSize = new AtomicLong(0);
final List<Path> archives = Files.walk(archiveDir, 1).filter(p -> {
final String filename = p.getFileName().toString();
if (Files.isRegularFile(p) && filename.endsWith("_" + originalFlowFileName)) {
if (Files.isRegularFile(p) && filename.endsWith("_" + originalFlowConfigFileName)) {
final Matcher matcher = archiveFilenamePattern.matcher(filename);
if (matcher.matches() && filename.equals(matcher.group(1) + "_" + originalFlowFileName)) {
if (matcher.matches() && filename.equals(matcher.group(1) + "_" + originalFlowConfigFileName)) {
try {
totalArchiveSize.getAndAdd(Files.size(p));
} catch (IOException e) {
logger.warn("Failed to get file size of {} due to {}", p, e);
}
return true;
}
}
return false;
}).collect(Collectors.groupingBy(p -> (now - p.toFile().lastModified()) > maxTimeMillis, Collectors.toList()));
}).collect(Collectors.toList());
logger.debug("oldArchives={}", oldArchives);
// Sort by timestamp.
archives.sort(Comparator.comparingLong(path -> path.toFile().lastModified()));
// Remove expired files
final List<Path> expiredArchives = oldArchives.get(true);
if (expiredArchives != null) {
expiredArchives.stream().forEach(p -> {
logger.debug("archives={}", archives);
final int archiveCount = archives.size();
final long flowConfigFileSize = Files.size(flowConfigFile);
IntStream.range(0, archiveCount).filter(i -> {
// If maxCount is specified, remove old archives
boolean old = maxCount != null && maxCount > 0 && (archiveCount - i) > maxCount - 1;
// If maxTime is specified, remove expired archives
final File archive = archives.get(i).toFile();
old = old || (maxTimeMillis != null && maxTimeMillis > 0 && (now - archive.lastModified()) > maxTimeMillis);
// If maxStorage is specified, remove old archives
old = old || (maxStorageBytes != null && maxStorageBytes > 0 && (totalArchiveSize.get() + flowConfigFileSize > maxStorageBytes));
if (old) {
totalArchiveSize.getAndAdd(archive.length() * -1);
logger.info("Removing old archive file {} to reduce storage usage. currentSize={}", archive, totalArchiveSize);
}
return old;
}).forEach(i -> {
try {
logger.info("Removing expired archive file {}", p);
Files.delete(p);
Files.delete(archives.get(i));
} catch (IOException e) {
logger.warn("Failed to delete expired archive {} due to {}", p, e.toString());
logger.warn("Failed to delete {} to reduce storage usage, due to {}", archives.get(i), e);
}
});
}
// Calculate size
final List<Path> remainingArchives = oldArchives.get(false);
final long totalArchiveSize = remainingArchives.stream().mapToLong(p -> {
try {
return Files.size(p);
} catch (IOException e) {
logger.warn("Failed to get file size of {} due to {}", p, e.toString());
return 0;
}
}).sum();
logger.debug("totalArchiveSize={}", totalArchiveSize);
// Create new archive file.
Files.copy(flowConfigFile, archiveFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
// Remove old files until total size gets less than max storage size
remainingArchives.sort((a, b)
-> Long.valueOf(a.toFile().lastModified()).compareTo(Long.valueOf(b.toFile().lastModified())));
long reducedTotalArchiveSize = totalArchiveSize;
for (int i = 0; i < remainingArchives.size()
&& reducedTotalArchiveSize > maxStorageBytes; i++) {
final Path path = remainingArchives.get(i);
try {
logger.info("Removing archive file {} to reduce storage usage. currentSize={}", path, reducedTotalArchiveSize);
final long size = Files.size(path);
Files.delete(path);
reducedTotalArchiveSize -= size;
} catch (IOException e) {
logger.warn("Failed to delete {} to reduce storage usage, due to {}", path, e.toString());
}
if (maxStorageBytes != null && maxStorageBytes > 0 && flowConfigFileSize > maxStorageBytes) {
logger.warn("Size of {} ({}) exceeds configured maxStorage size ({}). Archive won't be able to keep old files.",
flowConfigFile, flowConfigFileSize, maxStorageBytes);
}
return archiveFile;

View File

@ -173,9 +173,9 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
try {
archiveManager.archive();
} catch (final Exception ex) {
LOG.warn("Unable to archive flow configuration as requested due to " + ex);
LOG.error("Unable to archive flow configuration as requested due to " + ex);
if (LOG.isDebugEnabled()) {
LOG.warn("", ex);
LOG.error("", ex);
}
}
}

View File

@ -16,16 +16,14 @@
*/
package org.apache.nifi.persistence;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
@ -36,18 +34,17 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNull;
public class TestFlowConfigurationArchiveManager {
private static final Logger logger = LoggerFactory.getLogger(TestFlowConfigurationArchiveManager.class);
private final File flowFile = new File("./target/flow-archive/flow.xml.gz");
private final File archiveDir = new File("./target/flow-archive");
private final long maxTime = FormatUtils.getTimeDuration("30 days", TimeUnit.MILLISECONDS);
private long maxStorage = DataUnit.parseDataSize("500 MB", DataUnit.B).longValue();
@Before
public void before() throws Exception {
@ -73,11 +70,81 @@ public class TestFlowConfigurationArchiveManager {
}
private Object getPrivateFieldValue(final FlowConfigurationArchiveManager archiveManager, final String fieldName)
throws NoSuchFieldException, IllegalAccessException {
final Field field = FlowConfigurationArchiveManager.class.getDeclaredField(fieldName);
field.setAccessible(true);
return field.get(archiveManager);
}
@Test
public void testNiFiPropertiesDefault() throws Exception {
final NiFiProperties defaultProperties = mock(NiFiProperties.class);
when(defaultProperties.getFlowConfigurationArchiveMaxCount()).thenReturn(null);
when(defaultProperties.getFlowConfigurationArchiveMaxTime()).thenReturn(null);
when(defaultProperties.getFlowConfigurationArchiveMaxStorage()).thenReturn(null);
final FlowConfigurationArchiveManager archiveManager = new FlowConfigurationArchiveManager(flowFile.toPath(), defaultProperties);
assertNull(getPrivateFieldValue(archiveManager, "maxCount"));
assertEquals(60L * 60L * 24L * 30L * 1000L, getPrivateFieldValue(archiveManager, "maxTimeMillis"));
assertEquals(500L * 1024L * 1024L, getPrivateFieldValue(archiveManager, "maxStorageBytes"));
}
@Test
public void testNiFiPropertiesMaxTime() throws Exception {
final NiFiProperties withMaxTime = mock(NiFiProperties.class);
when(withMaxTime.getFlowConfigurationArchiveMaxCount()).thenReturn(null);
when(withMaxTime.getFlowConfigurationArchiveMaxTime()).thenReturn("10 days");
when(withMaxTime.getFlowConfigurationArchiveMaxStorage()).thenReturn(null);
final FlowConfigurationArchiveManager archiveManager = new FlowConfigurationArchiveManager(flowFile.toPath(), withMaxTime);
assertNull(getPrivateFieldValue(archiveManager, "maxCount"));
assertEquals(60L * 60L * 24L * 10L * 1000L, getPrivateFieldValue(archiveManager, "maxTimeMillis"));
assertNull(getPrivateFieldValue(archiveManager, "maxStorageBytes"));
}
@Test
public void testNiFiPropertiesMaxStorage() throws Exception {
final NiFiProperties withMaxTime = mock(NiFiProperties.class);
when(withMaxTime.getFlowConfigurationArchiveMaxCount()).thenReturn(null);
when(withMaxTime.getFlowConfigurationArchiveMaxTime()).thenReturn(null);
when(withMaxTime.getFlowConfigurationArchiveMaxStorage()).thenReturn("10 MB");
final FlowConfigurationArchiveManager archiveManager = new FlowConfigurationArchiveManager(flowFile.toPath(), withMaxTime);
assertNull(getPrivateFieldValue(archiveManager, "maxCount"));
assertNull(getPrivateFieldValue(archiveManager, "maxTimeMillis"));
assertEquals(10L * 1024L * 1024L, getPrivateFieldValue(archiveManager, "maxStorageBytes"));
}
@Test
public void testNiFiPropertiesCount() throws Exception {
final NiFiProperties onlyCount = mock(NiFiProperties.class);
when(onlyCount.getFlowConfigurationArchiveMaxCount()).thenReturn(10);
when(onlyCount.getFlowConfigurationArchiveMaxTime()).thenReturn(null);
when(onlyCount.getFlowConfigurationArchiveMaxStorage()).thenReturn(null);
final FlowConfigurationArchiveManager archiveManager = new FlowConfigurationArchiveManager(flowFile.toPath(), onlyCount);
assertEquals(10, getPrivateFieldValue(archiveManager, "maxCount"));
assertNull(getPrivateFieldValue(archiveManager, "maxTimeMillis"));
assertNull(getPrivateFieldValue(archiveManager, "maxStorageBytes"));
}
@Test(expected = NoSuchFileException.class)
public void testArchiveWithoutOriginalFile() throws Exception {
final NiFiProperties properties = mock(NiFiProperties.class);
when(properties.getFlowConfigurationArchiveDir()).thenReturn(archiveDir.getPath());
final File flowFile = new File("does-not-exist");
final FlowConfigurationArchiveManager archiveManager =
new FlowConfigurationArchiveManager(flowFile.toPath(), archiveDir.toPath(), maxTime, maxStorage);
new FlowConfigurationArchiveManager(flowFile.toPath(), properties);
archiveManager.archive();
}
@ -88,8 +155,7 @@ public class TestFlowConfigurationArchiveManager {
final long now = System.currentTimeMillis();
final SimpleDateFormat dateFormat = new SimpleDateFormat("HHmmss");
FlowConfigurationArchiveManager archiveManager =
new FlowConfigurationArchiveManager(flowFile.toPath(), archiveDir.toPath(), maxTime, maxStorage);
final FlowConfigurationArchiveManager archiveManager = createArchiveManager(null,null, null);
for (int i = oldArchives.length; i > 0; i--) {
final Date date = new Date(now - (intervalMillis * i));
@ -106,6 +172,15 @@ public class TestFlowConfigurationArchiveManager {
}
}
private FlowConfigurationArchiveManager createArchiveManager(final Integer maxCount, final String maxTime, final String maxStorage) {
final NiFiProperties properties = mock(NiFiProperties.class);
when(properties.getFlowConfigurationArchiveDir()).thenReturn(archiveDir.getPath());
when(properties.getFlowConfigurationArchiveMaxCount()).thenReturn(maxCount);
when(properties.getFlowConfigurationArchiveMaxTime()).thenReturn(maxTime);
when(properties.getFlowConfigurationArchiveMaxStorage()).thenReturn(maxStorage);
return new FlowConfigurationArchiveManager(flowFile.toPath(), properties);
}
@Test
public void testArchiveExpiration() throws Exception {
@ -115,25 +190,22 @@ public class TestFlowConfigurationArchiveManager {
// Now, we will test expiration. There should be following old archives created above:
// -5 min, -4 min, -3min, -2min, -1min
// if maxTime = 3.5min, The oldest two files should be removed, -5 min and -4 min,
// resulting four files of -3min, -2min, -1min, and newly created archive.
final long maxTimeForExpirationTest = intervalMillis * 3 + (intervalMillis / 2);
FlowConfigurationArchiveManager archiveManager = new FlowConfigurationArchiveManager(flowFile.toPath(),
archiveDir.toPath(), maxTimeForExpirationTest, maxStorage);
final FlowConfigurationArchiveManager archiveManager = createArchiveManager(null, maxTimeForExpirationTest + "ms", null);
final File archive = archiveManager.archive();
assertTrue(archive.isFile());
assertFalse(oldArchives[0].exists());
assertFalse(oldArchives[1].exists());
assertTrue(oldArchives[2].isFile());
assertTrue(oldArchives[3].isFile());
assertTrue(oldArchives[4].isFile());
assertTrue(!oldArchives[0].exists()); // -5 min
assertTrue(!oldArchives[1].exists()); // -4 min
assertTrue(oldArchives[2].isFile()); // -3 min
assertTrue(oldArchives[3].isFile()); // -2 min
assertTrue(oldArchives[4].isFile()); // -1 min
assertTrue(archive.exists()); // new archive
assertTrue("Original file should remain intact", flowFile.isFile());
}
@Test
public void testArchiveStorageSizeLimit() throws Exception {
@ -143,22 +215,63 @@ public class TestFlowConfigurationArchiveManager {
// Now, we will test storage size limit. There should be following old archives created above:
// -5 min, -4 min, -3min, -2min, -1min, each of those have 10 bytes.
// if maxStorage = 20 bytes, The oldest four files should be removed,
// resulting two files of -1min, and newly created archive, 20 bytes in total.
FlowConfigurationArchiveManager archiveManager = new FlowConfigurationArchiveManager(flowFile.toPath(),
archiveDir.toPath(), maxTime, 20);
final FlowConfigurationArchiveManager archiveManager = createArchiveManager(null,null, "20b");
final File archive = archiveManager.archive();
assertTrue(archive.isFile());
assertFalse(oldArchives[0].exists());
assertFalse(oldArchives[1].exists());
assertFalse(oldArchives[2].exists());
assertFalse(oldArchives[3].exists());
assertTrue(oldArchives[4].isFile());
assertTrue(!oldArchives[0].exists()); // -5 min
assertTrue(!oldArchives[1].exists()); // -4 min
assertTrue(!oldArchives[2].exists()); // -3 min
assertTrue(!oldArchives[3].exists()); // -2 min
assertTrue(oldArchives[4].exists()); // -1 min
assertTrue(archive.exists()); // new archive
assertTrue("Original file should remain intact", flowFile.isFile());
}
@Test
public void testArchiveStorageCountLimit() throws Exception {
final long intervalMillis = 60_000;
File[] oldArchives = new File[5];
createSimulatedOldArchives(oldArchives, intervalMillis);
// Now, we will test count limit. There should be following old archives created above:
// -5 min, -4 min, -3min, -2min, -1min, each of those have 10 bytes.
final FlowConfigurationArchiveManager archiveManager = createArchiveManager(2,null, null);
final File archive = archiveManager.archive();
assertTrue(!oldArchives[0].exists()); // -5 min
assertTrue(!oldArchives[1].exists()); // -4 min
assertTrue(!oldArchives[2].exists()); // -3 min
assertTrue(!oldArchives[3].exists()); // -2 min
assertTrue(oldArchives[4].exists()); // -1 min
assertTrue(archive.exists()); // new archive
assertTrue("Original file should remain intact", flowFile.isFile());
}
@Test
public void testLargeConfigFile() throws Exception{
final long intervalMillis = 60_000;
File[] oldArchives = new File[5];
createSimulatedOldArchives(oldArchives, intervalMillis);
// Now, we will test storage size limit. There should be following old archives created above:
// -5 min, -4 min, -3min, -2min, -1min, each of those have 10 bytes.
final FlowConfigurationArchiveManager archiveManager = createArchiveManager(null,null, "3b");
final File archive = archiveManager.archive();
assertTrue(!oldArchives[0].exists()); // -5 min
assertTrue(!oldArchives[1].exists()); // -4 min
assertTrue(!oldArchives[2].exists()); // -3 min
assertTrue(!oldArchives[3].exists()); // -2 min
assertTrue(!oldArchives[4].exists()); // -1 min
assertTrue("Even if flow config file is larger than maxStorage file, it can be archived", archive.exists()); // new archive
assertTrue("Original file should remain intact", flowFile.isFile());
}
}

View File

@ -20,6 +20,7 @@ nifi.flow.configuration.archive.enabled=${nifi.flow.configuration.archive.enable
nifi.flow.configuration.archive.dir=${nifi.flow.configuration.archive.dir}
nifi.flow.configuration.archive.max.time=${nifi.flow.configuration.archive.max.time}
nifi.flow.configuration.archive.max.storage=${nifi.flow.configuration.archive.max.storage}
nifi.flow.configuration.archive.max.count=
nifi.flowcontroller.autoResumeState=${nifi.flowcontroller.autoResumeState}
nifi.flowcontroller.graceful.shutdown.period=${nifi.flowcontroller.graceful.shutdown.period}
nifi.flowservice.writedelay.interval=${nifi.flowservice.writedelay.interval}