NIFI-5868: Added instrumentation around ListFile such that all disk accesses are timed and any unusually long listing times or disk access operations can be logged. Additionally, information is logged at a debug level including significant amounts of troubleshooting information when configured to do so

This closes #3202.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2018-12-05 09:37:20 -05:00 committed by Bryan Bende
parent 8ebb4d1974
commit 72ea93a657
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
2 changed files with 1032 additions and 96 deletions

View File

@ -17,10 +17,22 @@
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListProcessorTestWatcher;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.Description;
import java.io.File;
import java.io.FileOutputStream;
@ -28,6 +40,8 @@ import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@ -38,27 +52,15 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListProcessorTestWatcher;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.Description;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class TestListFile {
@ -160,13 +162,99 @@ public class TestListFile {
dumpState.dumpState(startedAtMillis);
}
@Test
public void testGetRelationships() throws Exception {
Set<Relationship> relationships = processor.getRelationships();
assertEquals(1, relationships.size());
assertEquals(AbstractListProcessor.REL_SUCCESS, relationships.toArray()[0]);
@Ignore("Intended only for manual testing, as is very expensive to run as a unit test. Performs listing of 1,000,000 files (doesn't actually create the files, though - injects them in) to " +
"ensure performance is not harmed")
public void testPerformanceOnLargeListing() {
final List<Path> paths = new ArrayList<>(1_000_000);
final File base = new File("target");
for (int firstLevel=0; firstLevel < 1000; firstLevel++) {
final File dir = new File(base, String.valueOf(firstLevel));
for (int secondLevel = 0; secondLevel < 1000; secondLevel++) {
final File file = new File(dir, String.valueOf(secondLevel));
paths.add(file.toPath());
}
}
final BasicFileAttributes basicFileAttributes = new BasicFileAttributes() {
@Override
public FileTime lastModifiedTime() {
return FileTime.fromMillis(System.currentTimeMillis());
}
@Override
public FileTime lastAccessTime() {
return FileTime.fromMillis(System.currentTimeMillis());
}
@Override
public FileTime creationTime() {
return FileTime.fromMillis(System.currentTimeMillis());
}
@Override
public boolean isRegularFile() {
return false;
}
@Override
public boolean isDirectory() {
return false;
}
@Override
public boolean isSymbolicLink() {
return false;
}
@Override
public boolean isOther() {
return false;
}
@Override
public long size() {
return 0;
}
@Override
public Object fileKey() {
return null;
}
};
processor = new ListFile() {
@Override
protected Stream<Path> getPathStream(final Path basePath, final int maxDepth, final BiPredicate<Path, BasicFileAttributes> matcher) throws IOException {
return paths.stream()
.filter(path -> matcher.test(path, basicFileAttributes));
}
};
runner = TestRunners.newTestRunner(processor);
runner.setProperty(AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, AbstractListProcessor.PRECISION_SECONDS.getValue());
runner.setProperty(ListFile.TRACK_PERFORMANCE, "true");
runner.setProperty(ListFile.MAX_TRACKED_FILES, "100000");
runner.setProperty(ListFile.DIRECTORY, "target");
runner.run();
final ListFile.PerformanceTracker tracker = processor.getPerformanceTracker();
assertEquals(100_000, tracker.getTrackedFileCount());
final ListFile.MonitorActiveTasks monitorActiveTasks = new ListFile.MonitorActiveTasks(tracker, runner.getLogger(), 1000, 1000, 1);
while (tracker.getTrackedFileCount() > 0) {
monitorActiveTasks.run();
}
assertEquals(0, tracker.getTrackedFileCount());
}
@Test
public void testGetPath() {
runner.setProperty(ListFile.DIRECTORY, "/dir/test1");
@ -636,14 +724,14 @@ public class TestListFile {
@Test
public void testIsListingResetNecessary() throws Exception {
assertEquals(true, processor.isListingResetNecessary(ListFile.DIRECTORY));
assertEquals(true, processor.isListingResetNecessary(ListFile.RECURSE));
assertEquals(true, processor.isListingResetNecessary(ListFile.FILE_FILTER));
assertEquals(true, processor.isListingResetNecessary(ListFile.PATH_FILTER));
assertEquals(true, processor.isListingResetNecessary(ListFile.MIN_AGE));
assertEquals(true, processor.isListingResetNecessary(ListFile.MAX_AGE));
assertEquals(true, processor.isListingResetNecessary(ListFile.MIN_SIZE));
assertEquals(true, processor.isListingResetNecessary(ListFile.MAX_SIZE));
assertTrue(processor.isListingResetNecessary(ListFile.DIRECTORY));
assertTrue(processor.isListingResetNecessary(ListFile.RECURSE));
assertTrue(processor.isListingResetNecessary(ListFile.FILE_FILTER));
assertTrue(processor.isListingResetNecessary(ListFile.PATH_FILTER));
assertTrue(processor.isListingResetNecessary(ListFile.MIN_AGE));
assertTrue(processor.isListingResetNecessary(ListFile.MAX_AGE));
assertTrue(processor.isListingResetNecessary(ListFile.MIN_SIZE));
assertTrue(processor.isListingResetNecessary(ListFile.MAX_SIZE));
assertEquals(true, processor.isListingResetNecessary(ListFile.IGNORE_HIDDEN_FILES));
assertEquals(false, processor.isListingResetNecessary(new PropertyDescriptor.Builder().name("x").build()));
}