From 0395b903152caef57644662dc4a00bcd490e26d7 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 26 Jan 2016 16:21:31 -0500 Subject: [PATCH] NIFI-1200 fixed CPU saturation in FileSystemRepository added minimal interval added WARN message if set interval is lower NIFI-1200 fixed unused imports --- .../repository/FileSystemRepository.java | 39 +++++++++++++------ .../repository/TestFileSystemRepository.java | 35 ++++++++++++++++- 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index c72a19c00a..9fec793442 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -84,6 +84,7 @@ import org.slf4j.LoggerFactory; public class FileSystemRepository implements ContentRepository { public static final int SECTIONS_PER_CONTAINER = 1024; + public static final long MIN_CLEANUP_INTERVAL_MILLIS = 1000; public static final String ARCHIVE_DIR_NAME = "archive"; public static final Pattern MAX_ARCHIVE_SIZE_PATTERN = Pattern.compile("\\d{1,2}%"); private static final Logger LOG = LoggerFactory.getLogger(FileSystemRepository.class); @@ -226,17 +227,8 @@ public class FileSystemRepository implements ContentRepository { executor.scheduleWithFixedDelay(new ArchiveOrDestroyDestructableClaims(), 1, 1, TimeUnit.SECONDS); } - final String archiveCleanupFrequency = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY); - final long cleanupMillis; - if (archiveCleanupFrequency == null) { - cleanupMillis = 1000L; - } else { - try { - cleanupMillis = FormatUtils.getTimeDuration(archiveCleanupFrequency.trim(), TimeUnit.MILLISECONDS); - } catch (final Exception e) { - throw new RuntimeException("Invalid value set for property " + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY); - } - } + final long cleanupMillis = this.determineCleanupInterval(properties); + for (final Map.Entry containerEntry : containers.entrySet()) { final String containerName = containerEntry.getKey(); final Path containerPath = containerEntry.getValue(); @@ -1704,4 +1696,29 @@ public class FileSystemRepository implements ContentRepository { } } + /** + * Will determine the scheduling interval to be used by archive cleanup task + * (in milliseconds). This method will enforce the minimum allowed value of + * 1 second (1000 milliseconds). If attempt is made to set lower value a + * warning will be logged and the method will return minimum value of 1000 + */ + private long determineCleanupInterval(NiFiProperties properties) { + long cleanupInterval = MIN_CLEANUP_INTERVAL_MILLIS; + String archiveCleanupFrequency = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY); + if (archiveCleanupFrequency != null) { + try { + cleanupInterval = FormatUtils.getTimeDuration(archiveCleanupFrequency.trim(), TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw new RuntimeException( + "Invalid value set for property " + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY); + } + if (cleanupInterval < MIN_CLEANUP_INTERVAL_MILLIS) { + LOG.warn("The value of " + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY + " property is set to '" + + archiveCleanupFrequency + "' which is " + + "below the allowed minimum of 1 second (1000 milliseconds). Minimum value of 1 sec will be used as scheduling interval for archive cleanup task."); + cleanupInterval = MIN_CLEANUP_INTERVAL_MILLIS; + } + } + return cleanupInterval; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index 5da67a081a..c40d0e3c0c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -45,6 +45,12 @@ import org.apache.nifi.util.NiFiProperties; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; public class TestFileSystemRepository { @@ -61,7 +67,6 @@ public class TestFileSystemRepository { if (rootFile.exists()) { DiskUtils.deleteRecursively(rootFile); } - repository = new FileSystemRepository(); repository.initialize(new StandardResourceClaimManager()); repository.purge(); @@ -72,6 +77,34 @@ public class TestFileSystemRepository { repository.shutdown(); } + @Test + public void testMinimalArchiveCleanupIntervalHonoredAndLogged() throws Exception { + Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); + ListAppender testAppender = new ListAppender<>(); + testAppender.setName("Test"); + testAppender.start(); + root.addAppender(testAppender); + NiFiProperties.getInstance().setProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 millis"); + repository = new FileSystemRepository(); + repository.initialize(new StandardResourceClaimManager()); + repository.purge(); + + + boolean messageFound = false; + String message = "The value of nifi.content.repository.archive.cleanup.frequency property " + + "is set to '1 millis' which is below the allowed minimum of 1 second (1000 milliseconds). " + + "Minimum value of 1 sec will be used as scheduling interval for archive cleanup task."; + for (ILoggingEvent event : testAppender.list) { + String actualMessage = event.getFormattedMessage(); + if (actualMessage.equals(message)) { + assertEquals(event.getLevel(), Level.WARN); + messageFound = true; + break; + } + } + assertTrue(messageFound); + } + @Test public void testBogusFile() throws IOException { repository.shutdown();