From 45df23b1e09f340fb2c7f3c3234462570ddb5f6b Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Tue, 14 Nov 2017 13:27:35 -0500
Subject: [PATCH] NIFI-4607, NIFI-3975, NIFI-4602, NIFI-4606: This closes
#2272. Fixed bug in TailFile that caused new Primary Node to not pull current
Clustered State when File Location was set to Remote. Fixed bug that caused
TailFile to occasionally become 'stuck' when the file it is tailing is
renamed and a new file is created with the same name. Removed the 'Rolling
Strategy' property from TailFile because it is not actually used in the
processor anymore. Deleted MonitorMemoryTest because the unit test was
testing the behavior of FlowController more than the behavior of the
reporting task itself and in order to do so had a dependency in the pom.xml
on nifi-framework-core, which means that it no longer compiles when
FlowController is modified.
Signed-off-by: joewitt
---
.../nifi/processors/standard/TailFile.java | 205 ++++++++++--------
.../additionalDetails.html | 23 +-
.../processors/standard/TestTailFile.java | 38 +++-
.../nifi-standard-reporting-tasks/pom.xml | 6 -
.../nifi/controller/MonitorMemoryTest.java | 167 --------------
5 files changed, 160 insertions(+), 279 deletions(-)
delete mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index f5d340961f..2234265f8b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -16,39 +16,9 @@
*/
package org.apache.nifi.processors.standard;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.Restricted;
-import org.apache.nifi.annotation.behavior.Stateful;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.NullOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -77,6 +47,37 @@ import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+
// note: it is important that this Processor is not marked as @SupportsBatching because the session commits must complete before persisting state locally; otherwise, data loss may occur
@TriggerSerially
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -89,9 +90,7 @@ import java.util.zip.Checksum;
+ "ingesting files that have been compressed when 'rolled over'.")
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Stores state about where in the Tailed File it left off so that on restart it does not have to duplicate data. "
+ "State is stored either local or clustered depend on the property.")
-@WritesAttributes({
- @WritesAttribute(attribute = "tailfile.original.path", description = "Path of the original file the flow file comes from.")
- })
+@WritesAttribute(attribute = "tailfile.original.path", description = "Path of the original file the flow file comes from.")
@Restricted("Provides operator the ability to read from any file that NiFi has access to.")
public class TailFile extends AbstractProcessor {
@@ -195,15 +194,6 @@ public class TailFile extends AbstractProcessor {
.required(true)
.build();
- static final PropertyDescriptor ROLLING_STRATEGY = new PropertyDescriptor.Builder()
- .name("tailfile-rolling-strategy")
- .displayName("Rolling Strategy")
- .description("Specifies if the files to tail have a fixed name or not.")
- .required(true)
- .allowableValues(FIXED_NAME, CHANGING_NAME)
- .defaultValue(FIXED_NAME.getValue())
- .build();
-
static final PropertyDescriptor LOOKUP_FREQUENCY = new PropertyDescriptor.Builder()
.name("tailfile-lookup-frequency")
.displayName("Lookup frequency")
@@ -234,6 +224,7 @@ public class TailFile extends AbstractProcessor {
private volatile Map states = new HashMap();
private volatile AtomicLong lastLookup = new AtomicLong(0L);
private volatile AtomicBoolean isMultiChanging = new AtomicBoolean(false);
+ private volatile boolean requireStateLookup = true;
@Override
protected List getSupportedPropertyDescriptors() {
@@ -245,7 +236,6 @@ public class TailFile extends AbstractProcessor {
properties.add(START_POSITION);
properties.add(STATE_LOCATION);
properties.add(RECURSIVE);
- properties.add(ROLLING_STRATEGY);
properties.add(LOOKUP_FREQUENCY);
properties.add(MAXIMUM_AGE);
return properties;
@@ -267,46 +257,31 @@ public class TailFile extends AbstractProcessor {
protected Collection customValidate(ValidationContext context) {
final List results = new ArrayList<>(super.customValidate(context));
- if(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
+ if (context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
String path = context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue();
- if(path == null) {
- results.add(new ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false)
- .explanation("Base directory property cannot be empty in Multifile mode.").build());
+ if (path == null) {
+ results.add(new ValidationResult.Builder()
+ .subject(BASE_DIRECTORY.getName())
+ .valid(false)
+ .explanation("Base directory property cannot be empty in Multifile mode.")
+ .build());
} else if (!new File(path).isDirectory()) {
- results.add(new ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false)
- .explanation(path + " is not a directory.").build());
- }
-
- if(context.getProperty(ROLLING_STRATEGY).getValue().equals(CHANGING_NAME.getValue())) {
- String freq = context.getProperty(LOOKUP_FREQUENCY).getValue();
- if(freq == null) {
- results.add(new ValidationResult.Builder().subject(LOOKUP_FREQUENCY.getName()).valid(false)
- .explanation("In Multiple files mode and Changing name rolling strategy, lookup frequency "
- + "property must be specified.").build());
- }
- String maxAge = context.getProperty(MAXIMUM_AGE).getValue();
- if(maxAge == null) {
- results.add(new ValidationResult.Builder().subject(MAXIMUM_AGE.getName()).valid(false)
- .explanation("In Multiple files mode and Changing name rolling strategy, maximum age "
- + "property must be specified.").build());
- }
- } else {
- long max = context.getProperty(MAXIMUM_AGE).getValue() == null ? Long.MAX_VALUE : context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
- List filesToTail = getFilesToTail(context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue(),
- context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(),
- context.getProperty(RECURSIVE).asBoolean(),
- max);
-
- if(filesToTail.isEmpty()) {
- results.add(new ValidationResult.Builder().subject(FILENAME.getName()).valid(false)
- .explanation("There is no file to tail. Files must exist when starting this processor.").build());
- }
+ results.add(new ValidationResult.Builder()
+ .subject(BASE_DIRECTORY.getName())
+ .valid(false)
+ .explanation(path + " is not a directory.")
+ .build());
}
}
return results;
}
+ @OnPrimaryNodeStateChange
+ public void onPrimaryNodeChange() {
+ this.requireStateLookup = true;
+ }
+
@OnScheduled
public void recoverState(final ProcessContext context) throws IOException {
// set isMultiChanging
@@ -429,10 +404,10 @@ public class TailFile extends AbstractProcessor {
* @return List of files to tail
*/
private List getFilesToTail(final String baseDir, String fileRegex, boolean isRecursive, long maxAge) {
- Collection files = FileUtils.listFiles(new File(baseDir), null, isRecursive);
- List result = new ArrayList();
+ final Collection files = FileUtils.listFiles(new File(baseDir), null, isRecursive);
+ final List result = new ArrayList();
- String baseDirNoTrailingSeparator = baseDir.endsWith(File.separator) ? baseDir.substring(0, baseDir.length() -1) : baseDir;
+ final String baseDirNoTrailingSeparator = baseDir.endsWith(File.separator) ? baseDir.substring(0, baseDir.length() - 1) : baseDir;
final String fullRegex;
if (File.separator.equals("/")) {
// handle unix-style paths
@@ -441,13 +416,13 @@ public class TailFile extends AbstractProcessor {
// handle windows-style paths, need to quote backslash characters
fullRegex = baseDirNoTrailingSeparator + Pattern.quote(File.separator) + fileRegex;
}
- Pattern p = Pattern.compile(fullRegex);
+ final Pattern p = Pattern.compile(fullRegex);
- for(File file : files) {
- String path = file.getPath();
- if(p.matcher(path).matches()) {
- if(isMultiChanging.get()) {
- if((new Date().getTime() - file.lastModified()) < maxAge) {
+ for (File file : files) {
+ final String path = file.getPath();
+ if (p.matcher(path).matches()) {
+ if (isMultiChanging.get()) {
+ if ((new Date().getTime() - file.lastModified()) < maxAge) {
result.add(path);
}
} else {
@@ -515,7 +490,14 @@ public class TailFile extends AbstractProcessor {
if (existingTailFile.length() >= position) {
try (final InputStream tailFileIs = new FileInputStream(existingTailFile);
final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) {
- StreamUtils.copy(in, new NullOutputStream(), states.get(filePath).getState().getPosition());
+
+ try {
+ StreamUtils.copy(in, new NullOutputStream(), states.get(filePath).getState().getPosition());
+ } catch (final EOFException eof) {
+ // If we hit EOFException, then the file is smaller than we expected. Assume rollover.
+ getLogger().debug("When recovering state, file being tailed has less data than was stored in the state. "
+ + "Assuming rollover. Will begin tailing current file from beginning.");
+ }
final long checksumResult = in.getChecksum().getValue();
if (checksumResult == states.get(filePath).getExpectedRecoveryChecksum()) {
@@ -589,16 +571,30 @@ public class TailFile extends AbstractProcessor {
try {
recoverState(context);
} catch (IOException e) {
- getLogger().error("Exception raised while looking up for new files", e);
+ getLogger().error("Exception raised while attempting to recover state about where the tailing last left off", e);
context.yield();
return;
}
}
}
- if(states.isEmpty()) {
+
+ if (requireStateLookup) {
+ try {
+ recoverState(context);
+ } catch (IOException e) {
+ getLogger().error("Exception raised while attempting to recover state about where the tailing last left off", e);
+ context.yield();
+ return;
+ }
+
+ requireStateLookup = false;
+ }
+
+ if (states.isEmpty()) {
context.yield();
return;
}
+
for (String tailFile : states.keySet()) {
processTailFile(context, session, tailFile);
}
@@ -688,9 +684,38 @@ public class TailFile extends AbstractProcessor {
final long startNanos = System.nanoTime();
// Check if file has rotated
- if (rolloverOccurred
- || (timestamp <= file.lastModified() && length > file.length())) {
+ // We determine that the file has rotated if any of the following conditions are met:
+ // 1. 'rolloverOccured' == true, which indicates that we have found a new file matching the rollover pattern.
+ // 2. The file was modified after the timestamp in our state, AND the file is smaller than we expected. This satisfies
+ // the case where we are tailing File A, and that file is then renamed (say to B) and a new file named A is created
+ // and is written to. In such a case, File A may have a file size smaller than we have in our state, so we know that
+ // it rolled over.
+ // 3. The File Channel that we have indicates that the size of the file is different than file.length() indicates, AND
+ // the File Channel also indicates that we have read all data in the file. This case may also occur in the same scenario
+ // as #2, above. In this case, the File Channel is pointing to File A, but the 'file' object is pointing to File B. They
+ // both have the same name but are different files. As a result, once we have consumed all data from the File Channel,
+ // we want to roll over and consume data from the new file.
+ boolean rotated = rolloverOccurred;
+ if (!rotated) {
+ final long fileLength = file.length();
+ if (length > fileLength) {
+ rotated = true;
+ } else {
+ try {
+ final long readerSize = reader.size();
+ final long readerPosition = reader.position();
+ if (readerSize == readerPosition && readerSize != fileLength) {
+ rotated = true;
+ }
+ } catch (final IOException e) {
+ getLogger().warn("Failed to determined the size or position of the File Channel when "
+ + "determining if the file has rolled over. Will assume that the file being tailed has not rolled over", e);
+ }
+ }
+ }
+
+ if (rotated) {
// Since file has rotated, we close the reader, create a new one, and then reset our state.
try {
reader.close();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
index 87767a5d7f..75fb867798 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
@@ -105,28 +105,23 @@
and new log messages are always appended in log file of the current day.
- If the processor is configured with 'Multiple files' mode and 'Fixed name' rolling strategy, the processor will list the
- files to tail in the 'Base directory' (recursively or not) and matching the regular expression. This listing will only occur once when
- the processor is started. In this configuration, the processor will act as in 'Single file' mode for all files listed by the processor
- when started.
-
-
- If the processor is configured with 'Multiple files' mode and 'Changing name' rolling strategy, two new properties are
- mandatory:
+ If the processor is configured with 'Multiple files' mode, two additional properties are relevant:
- Lookup frequency: specifies the minimum duration the processor will wait before listing again the files to tail.
- Maximum age: specifies the necessary minimum duration to consider that no new messages will be appended in a file
- regarding its last modification date.
+ regarding its last modification date. If the amount of time that has elapsed since the file was modified is larger than this
+ period of time, the file will not be tailed. For example, if a file was modified 24 hours ago and this property is set to 12 hours,
+ the file will not be tailed. But if this property is set to 36 hours, then the file will continue to be tailed.
- It is necessary to pay attention to 'Lookup frequency' and 'Maximum age' properties as well as the frequency at which the processor is
- triggered in order to keep good performances. It is recommended to keep 'Maximum age' > 'Lookup frequency' > processor scheduling
- frequency to avoid loosing messages. It also recommended not to set 'Maximum Age' too low because if messages are appended in a file
- after this file has been considered "too old", all the messages in the file may be read again leading to data duplication.
+ It is necessary to pay attention to 'Lookup frequency' and 'Maximum age' properties, as well as the frequency at which the processor is
+ triggered, in order to achieve high performance. It is recommended to keep 'Maximum age' > 'Lookup frequency' > processor scheduling
+ frequency to avoid missing data. It also recommended not to set 'Maximum Age' too low because if messages are appended in a file
+ after this file has been considered "too old", all the messages in the file may be read again, leading to data duplication.
- Besides, if the processor is configured with 'Multiple files' mode and 'Changing name' rolling strategy, the 'Rolling
+ If the processor is configured with 'Multiple files' mode, the 'Rolling
filename pattern' property must be specific enough to ensure that only the rolling files will be listed and not other currently tailed
files in the same directory (this can be achieved using ${filename} tag).
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index 485b8e3d14..4c82703083 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -319,6 +319,42 @@ public class TestTailFile {
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
}
+ @Test
+ public void testRolloverWriteMoreDataThanPrevious() throws IOException, InterruptedException {
+ // If we have read all data in a file, and that file does not end with a new-line, then the last line
+ // in the file will have been read, added to the checksum, and then we would re-seek to "unread" that
+ // last line since it didn't have a new-line. We need to ensure that if the data is then rolled over
+ // that our checksum does not take into account those bytes that have been "unread."
+
+ // this mimics the case when we are reading a log file that rolls over while processor is running.
+ runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+ runner.run(1, false, true);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+ raf.write("hello\n".getBytes());
+ runner.run(1, true, false);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+ runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+ runner.clearTransferState();
+
+ raf.write("world".getBytes());
+
+ runner.run(1);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); // should not pull in data because no \n
+
+ raf.close();
+ file.renameTo(new File("target/log.1"));
+
+ raf = new RandomAccessFile(new File("target/log.txt"), "rw");
+ raf.write("longer than hello\n".getBytes());
+ runner.run(1);
+
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+ runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+ runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("longer than hello\n");
+ }
+
+
@Test
public void testMultipleRolloversAfterHavingReadAllData() throws IOException, InterruptedException {
// this mimics the case when we are reading a log file that rolls over while processor is running.
@@ -670,7 +706,6 @@ public class TestTailFile {
runner.setProperty(TailFile.LOOKUP_FREQUENCY, "1 sec");
runner.setProperty(TailFile.FILENAME, "log_[0-9]*\\.txt");
runner.setProperty(TailFile.RECURSIVE, "false");
- runner.setProperty(TailFile.ROLLING_STRATEGY, TailFile.FIXED_NAME);
initializeFile("target/log_1.txt", "firstLine\n");
@@ -795,7 +830,6 @@ public class TestTailFile {
public void testMultipleFilesChangingNameStrategy() throws IOException, InterruptedException {
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE);
runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
- runner.setProperty(TailFile.ROLLING_STRATEGY, TailFile.CHANGING_NAME);
runner.setProperty(TailFile.BASE_DIRECTORY, "target");
runner.setProperty(TailFile.FILENAME, ".*app-.*.log");
runner.setProperty(TailFile.LOOKUP_FREQUENCY, "2s");
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml
index b408910f66..722eef2973 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml
@@ -52,12 +52,6 @@
mockito-all
test
-
- org.apache.nifi
- nifi-framework-core
- ${project.version}
- test
-
org.apache.nifi
nifi-nar-utils
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java
deleted file mode 100644
index 815b855829..0000000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/test/java/org/apache/nifi/controller/MonitorMemoryTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.nifi.admin.service.AuditService;
-import org.apache.nifi.authorization.Authorizer;
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.controller.repository.FlowFileEventRepository;
-import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.nar.SystemBundle;
-import org.apache.nifi.provenance.MockProvenanceRepository;
-import org.apache.nifi.util.CapturingLogger;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.Tuple;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class MonitorMemoryTest {
-
- private FlowController fc;
- private Bundle systemBundle;
-
- @Before
- public void before() throws Exception {
- System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
- final Map addProps = new HashMap<>();
- addProps.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec");
- addProps.put(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml");
- addProps.put(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider");
-
- final Tuple tuple = this.buildFlowControllerForTest(addProps);
- fc = tuple.getKey();
- systemBundle = tuple.getValue();
- }
-
- @After
- public void after() throws Exception {
- fc.shutdown(true);
- FileUtils.deleteDirectory(new File("./target/test-repo"));
- FileUtils.deleteDirectory(new File("./target/content_repository"));
- }
-
- @Test(expected = IllegalStateException.class)
- public void validatevalidationKicksInOnWrongPoolNames() throws Exception {
- ReportingTaskNode reportingTask = fc.createReportingTask(MonitorMemory.class.getName(), systemBundle.getBundleDetails().getCoordinate());
-
- Map props = new HashMap<>();
- props.put(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "foo");
- reportingTask.setProperties(props);
- ProcessScheduler ps = fc.getProcessScheduler();
- ps.schedule(reportingTask);
- }
-
- @Test
- @Ignore // temporarily ignoring it since it fails intermittently due to
- // unpredictability during full build
- // still keeping it for local testing
- public void validateWarnWhenPercentThresholdReached() throws Exception {
- this.doValidate("10%");
- }
-
- /*
- * We're ignoring this tests as it is practically impossible to run it
- * reliably together with automated Maven build since we can't control the
- * state of the JVM on each machine during the build. However, you can run
- * it selectively for further validation
- */
- @Test
- @Ignore
- public void validateWarnWhenSizeThresholdReached() throws Exception {
- this.doValidate("10 MB");
- }
-
- public void doValidate(String threshold) throws Exception {
- CapturingLogger capturingLogger = this.wrapAndReturnCapturingLogger();
- ReportingTaskNode reportingTask = fc.createReportingTask(MonitorMemory.class.getName(), systemBundle.getBundleDetails().getCoordinate());
- reportingTask.setSchedulingPeriod("1 sec");
-
- Map props = new HashMap<>();
- props.put(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "PS Old Gen");
- props.put(MonitorMemory.REPORTING_INTERVAL.getName(), "100 millis");
- props.put(MonitorMemory.THRESHOLD_PROPERTY.getName(), threshold);
- reportingTask.setProperties(props);
-
- ProcessScheduler ps = fc.getProcessScheduler();
- ps.schedule(reportingTask);
-
- Thread.sleep(2000);
- // ensure no memory warning were issued
- assertTrue(capturingLogger.getWarnMessages().size() == 0);
-
- // throw something on the heap
- @SuppressWarnings("unused")
- byte[] b = new byte[Integer.MAX_VALUE / 3];
- Thread.sleep(200);
- assertTrue(capturingLogger.getWarnMessages().size() > 0);
- assertTrue(capturingLogger.getWarnMessages().get(0).getMsg()
- .startsWith("Memory Pool 'PS Old Gen' has exceeded the configured Threshold of " + threshold));
-
- // now try to clear the heap and see memory being reclaimed
- b = null;
- System.gc();
- Thread.sleep(1000);
- assertTrue(capturingLogger.getInfoMessages().get(0).getMsg().startsWith(
- "Memory Pool 'PS Old Gen' is no longer exceeding the configured Threshold of " + threshold));
- }
-
- private CapturingLogger wrapAndReturnCapturingLogger() throws Exception {
- Field loggerField = MonitorMemory.class.getDeclaredField("logger");
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
- modifiersField.setInt(loggerField, loggerField.getModifiers() & ~Modifier.FINAL);
-
- loggerField.setAccessible(true);
- CapturingLogger capturingLogger = new CapturingLogger((Logger) loggerField.get(null));
- loggerField.set(null, capturingLogger);
- return capturingLogger;
- }
-
- private Tuple buildFlowControllerForTest(final Map addProps) throws Exception {
- addProps.put(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, MockProvenanceRepository.class.getName());
- addProps.put("nifi.remote.input.socket.port", "");
- addProps.put("nifi.remote.input.secure", "");
- final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, addProps);
-
- // build the system bundle
- final Bundle bundle = SystemBundle.create(nifiProperties);
- ExtensionManager.discoverExtensions(bundle, Collections.emptySet());
-
- return new Tuple<>(FlowController.createStandaloneInstance(
- mock(FlowFileEventRepository.class),
- nifiProperties,
- mock(Authorizer.class),
- mock(AuditService.class),
- null,
- null,
- null), bundle);
- }
-}