NIFI-8474: Added new Replacement Strategy for variable substitution in ReplaceText.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5032.
This commit is contained in:
Peter Turcsanyi 2021-04-26 21:30:17 +02:00 committed by Pierre Villard
parent f9d3bb7f69
commit 525bfe00b3
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 89 additions and 1 deletions

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -95,6 +96,7 @@ public class ReplaceText extends AbstractProcessor {
public static final String regexReplaceValue = "Regex Replace";
public static final String literalReplaceValue = "Literal Replace";
public static final String alwaysReplace = "Always Replace";
public static final String SUBSTITUTE_VARIABLES_VALUE = "Substitute Variables";
private static final Pattern unescapedBackReferencePattern = Pattern.compile("[^\\\\]\\$(\\d+)");
private static final String DEFAULT_REGEX = "(?s)(^.*$)";
private static final String DEFAULT_REPLACEMENT_VALUE = "$1";
@ -120,6 +122,9 @@ public class ReplaceText extends AbstractProcessor {
static final AllowableValue ALWAYS_REPLACE = new AllowableValue(alwaysReplace, alwaysReplace,
"Always replaces the entire line or the entire contents of the FlowFile (depending on the value of the <Evaluation Mode> property) and does not bother searching "
+ "for any value. When this strategy is chosen, the <Search Value> property is ignored.");
static final AllowableValue SUBSTITUTE_VARIABLES = new AllowableValue(SUBSTITUTE_VARIABLES_VALUE, SUBSTITUTE_VARIABLES_VALUE,
"Substitute variable references (specified in ${var} form) using FlowFile attributes for looking up the replacement value by variable name. "
+ "When this strategy is chosen, both the <Search Value> and <Replacement Value> properties are ignored.");
public static final PropertyDescriptor SEARCH_VALUE = new PropertyDescriptor.Builder()
.name("Regular Expression")
@ -163,7 +168,7 @@ public class ReplaceText extends AbstractProcessor {
public static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder()
.name("Replacement Strategy")
.description("The strategy for how and what to replace within the FlowFile's text content.")
.allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE)
.allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE, SUBSTITUTE_VARIABLES)
.defaultValue(REGEX_REPLACE.getValue())
.required(true)
.build();
@ -246,6 +251,7 @@ public class ReplaceText extends AbstractProcessor {
case appendValue:
case prependValue:
case alwaysReplace:
case SUBSTITUTE_VARIABLES_VALUE:
default:
// nothing to check, search value is not used
break;
@ -286,6 +292,9 @@ public class ReplaceText extends AbstractProcessor {
case alwaysReplace:
replacementStrategyExecutor = new AlwaysReplace();
break;
case SUBSTITUTE_VARIABLES_VALUE:
replacementStrategyExecutor = new SubstituteVariablesReplace();
break;
default:
throw new AssertionError();
}
@ -652,6 +661,41 @@ public class ReplaceText extends AbstractProcessor {
}
}
private static class SubstituteVariablesReplace implements ReplacementStrategyExecutor {
@Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
final Map<String, String> flowFileAttributes = flowFile.getAttributes();
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
final int flowFileSize = (int) flowFile.getSize();
final int bufferSize = Math.min(maxBufferSize, flowFileSize);
final byte[] buffer = new byte[bufferSize];
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
StreamUtils.fillBuffer(in, buffer, false);
final String originalContent = new String(buffer, 0, flowFileSize, charset);
final String substitutedContent = StringSubstitutor.replace(originalContent, flowFileAttributes);
out.write(substitutedContent.getBytes(charset));
}
});
} else {
flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
(bw, oneLine) -> {
final String substitutedLine = StringSubstitutor.replace(oneLine, flowFileAttributes);
bw.write(substitutedLine);
}));
}
return flowFile;
}
@Override
public boolean isAllDataBufferedForEntireText() {
return true;
}
}
/**
* If we have a '$' followed by anything other than a number, then escape
* it. E.g., '$d' becomes '\$d' so that it can be used as a literal in a

View File

@ -1641,6 +1641,50 @@ public class TestReplaceText {
out.assertContentEquals("WO$1R$2D");
}
@Test
public void testSubstituteVariablesWithEvaluationModeEntireText() {
testSubstituteVariables("Line1: ${var1}\nLine2: ${var2}", "Line1: foo\nLine2: bar", ReplaceText.ENTIRE_TEXT, createAttributesMap());
}
@Test
public void testSubstituteVariablesWithEvaluationModeLineByLine() {
testSubstituteVariables("Line1: ${var1}\nLine2: ${var2}", "Line1: foo\nLine2: bar", ReplaceText.LINE_BY_LINE, createAttributesMap());
}
@Test
public void testSubstituteVariablesWhenVariableValueMissing() {
testSubstituteVariables("Line1: ${var1}\nLine2: ${var2}", "Line1: ${var1}\nLine2: ${var2}", ReplaceText.ENTIRE_TEXT, Collections.emptyMap());
}
@Test
public void testSubstituteVariablesWhenVariableReferenceEscaped() {
testSubstituteVariables("Line1: $${var1}\nLine2: $${var2}", "Line1: ${var1}\nLine2: ${var2}", ReplaceText.ENTIRE_TEXT, createAttributesMap());
}
@Test
public void testSubstituteVariablesWhenVariableNameEmpty() {
testSubstituteVariables("Line1: ${}\nLine2: ${}", "Line1: ${}\nLine2: ${}", ReplaceText.ENTIRE_TEXT, createAttributesMap());
}
private void testSubstituteVariables(String inputContent, String expectedContent, String evaluationMode, Map<String, String> attributesMap) {
final TestRunner runner = getRunner();
runner.setProperty(ReplaceText.EVALUATION_MODE, evaluationMode);
runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.SUBSTITUTE_VARIABLES.getValue());
runner.enqueue(inputContent, attributesMap);
runner.run();
runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
out.assertContentEquals(expectedContent);
}
private Map<String, String> createAttributesMap() {
final Map<String, String> attributesMap = new HashMap<>();
attributesMap.put("var1", "foo");
attributesMap.put("var2", "bar");
return attributesMap;
}
/*
* A repeated alternation regex such as (A|B)* can lead to StackOverflowError
* on large input strings.