NIFI-1200 fixed CPU saturation in FileSystemRepository

added minimal interval
added WARN message if set interval is lower

NIFI-1200 fixed unused imports
This commit is contained in:
Oleg Zhurakousky 2016-01-26 16:21:31 -05:00
parent 68975dc2d7
commit 0395b90315
2 changed files with 62 additions and 12 deletions

View File

@ -84,6 +84,7 @@ import org.slf4j.LoggerFactory;
public class FileSystemRepository implements ContentRepository { public class FileSystemRepository implements ContentRepository {
public static final int SECTIONS_PER_CONTAINER = 1024; public static final int SECTIONS_PER_CONTAINER = 1024;
public static final long MIN_CLEANUP_INTERVAL_MILLIS = 1000;
public static final String ARCHIVE_DIR_NAME = "archive"; public static final String ARCHIVE_DIR_NAME = "archive";
public static final Pattern MAX_ARCHIVE_SIZE_PATTERN = Pattern.compile("\\d{1,2}%"); public static final Pattern MAX_ARCHIVE_SIZE_PATTERN = Pattern.compile("\\d{1,2}%");
private static final Logger LOG = LoggerFactory.getLogger(FileSystemRepository.class); private static final Logger LOG = LoggerFactory.getLogger(FileSystemRepository.class);
@ -226,17 +227,8 @@ public class FileSystemRepository implements ContentRepository {
executor.scheduleWithFixedDelay(new ArchiveOrDestroyDestructableClaims(), 1, 1, TimeUnit.SECONDS); executor.scheduleWithFixedDelay(new ArchiveOrDestroyDestructableClaims(), 1, 1, TimeUnit.SECONDS);
} }
final String archiveCleanupFrequency = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY); final long cleanupMillis = this.determineCleanupInterval(properties);
final long cleanupMillis;
if (archiveCleanupFrequency == null) {
cleanupMillis = 1000L;
} else {
try {
cleanupMillis = FormatUtils.getTimeDuration(archiveCleanupFrequency.trim(), TimeUnit.MILLISECONDS);
} catch (final Exception e) {
throw new RuntimeException("Invalid value set for property " + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
}
}
for (final Map.Entry<String, Path> containerEntry : containers.entrySet()) { for (final Map.Entry<String, Path> containerEntry : containers.entrySet()) {
final String containerName = containerEntry.getKey(); final String containerName = containerEntry.getKey();
final Path containerPath = containerEntry.getValue(); final Path containerPath = containerEntry.getValue();
@ -1704,4 +1696,29 @@ public class FileSystemRepository implements ContentRepository {
} }
} }
/**
* Will determine the scheduling interval to be used by archive cleanup task
* (in milliseconds). This method will enforce the minimum allowed value of
* 1 second (1000 milliseconds). If attempt is made to set lower value a
* warning will be logged and the method will return minimum value of 1000
*/
private long determineCleanupInterval(NiFiProperties properties) {
long cleanupInterval = MIN_CLEANUP_INTERVAL_MILLIS;
String archiveCleanupFrequency = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
if (archiveCleanupFrequency != null) {
try {
cleanupInterval = FormatUtils.getTimeDuration(archiveCleanupFrequency.trim(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new RuntimeException(
"Invalid value set for property " + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
}
if (cleanupInterval < MIN_CLEANUP_INTERVAL_MILLIS) {
LOG.warn("The value of " + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY + " property is set to '"
+ archiveCleanupFrequency + "' which is "
+ "below the allowed minimum of 1 second (1000 milliseconds). Minimum value of 1 sec will be used as scheduling interval for archive cleanup task.");
cleanupInterval = MIN_CLEANUP_INTERVAL_MILLIS;
}
}
return cleanupInterval;
}
} }

View File

@ -45,6 +45,12 @@ import org.apache.nifi.util.NiFiProperties;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
public class TestFileSystemRepository { public class TestFileSystemRepository {
@ -61,7 +67,6 @@ public class TestFileSystemRepository {
if (rootFile.exists()) { if (rootFile.exists()) {
DiskUtils.deleteRecursively(rootFile); DiskUtils.deleteRecursively(rootFile);
} }
repository = new FileSystemRepository(); repository = new FileSystemRepository();
repository.initialize(new StandardResourceClaimManager()); repository.initialize(new StandardResourceClaimManager());
repository.purge(); repository.purge();
@ -72,6 +77,34 @@ public class TestFileSystemRepository {
repository.shutdown(); repository.shutdown();
} }
@Test
public void testMinimalArchiveCleanupIntervalHonoredAndLogged() throws Exception {
Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
ListAppender<ILoggingEvent> testAppender = new ListAppender<>();
testAppender.setName("Test");
testAppender.start();
root.addAppender(testAppender);
NiFiProperties.getInstance().setProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 millis");
repository = new FileSystemRepository();
repository.initialize(new StandardResourceClaimManager());
repository.purge();
boolean messageFound = false;
String message = "The value of nifi.content.repository.archive.cleanup.frequency property "
+ "is set to '1 millis' which is below the allowed minimum of 1 second (1000 milliseconds). "
+ "Minimum value of 1 sec will be used as scheduling interval for archive cleanup task.";
for (ILoggingEvent event : testAppender.list) {
String actualMessage = event.getFormattedMessage();
if (actualMessage.equals(message)) {
assertEquals(event.getLevel(), Level.WARN);
messageFound = true;
break;
}
}
assertTrue(messageFound);
}
@Test @Test
public void testBogusFile() throws IOException { public void testBogusFile() throws IOException {
repository.shutdown(); repository.shutdown();