diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java index fb0cbb8184..7d5eae4d84 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard; import org.apache.commons.io.IOUtils; +import org.apache.commons.text.StringSubstitutor; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -95,6 +96,7 @@ public class ReplaceText extends AbstractProcessor { public static final String regexReplaceValue = "Regex Replace"; public static final String literalReplaceValue = "Literal Replace"; public static final String alwaysReplace = "Always Replace"; + public static final String SUBSTITUTE_VARIABLES_VALUE = "Substitute Variables"; private static final Pattern unescapedBackReferencePattern = Pattern.compile("[^\\\\]\\$(\\d+)"); private static final String DEFAULT_REGEX = "(?s)(^.*$)"; private static final String DEFAULT_REPLACEMENT_VALUE = "$1"; @@ -120,6 +122,9 @@ public class ReplaceText extends AbstractProcessor { static final AllowableValue ALWAYS_REPLACE = new AllowableValue(alwaysReplace, alwaysReplace, "Always replaces the entire line or the entire contents of the FlowFile (depending on the value of the property) and does not bother searching " + "for any value. When this strategy is chosen, the property is ignored."); + static final AllowableValue SUBSTITUTE_VARIABLES = new AllowableValue(SUBSTITUTE_VARIABLES_VALUE, SUBSTITUTE_VARIABLES_VALUE, + "Substitute variable references (specified in ${var} form) using FlowFile attributes for looking up the replacement value by variable name. " + + "When this strategy is chosen, both the and properties are ignored."); public static final PropertyDescriptor SEARCH_VALUE = new PropertyDescriptor.Builder() .name("Regular Expression") @@ -163,7 +168,7 @@ public class ReplaceText extends AbstractProcessor { public static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder() .name("Replacement Strategy") .description("The strategy for how and what to replace within the FlowFile's text content.") - .allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE) + .allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE, SUBSTITUTE_VARIABLES) .defaultValue(REGEX_REPLACE.getValue()) .required(true) .build(); @@ -246,6 +251,7 @@ public class ReplaceText extends AbstractProcessor { case appendValue: case prependValue: case alwaysReplace: + case SUBSTITUTE_VARIABLES_VALUE: default: // nothing to check, search value is not used break; @@ -286,6 +292,9 @@ public class ReplaceText extends AbstractProcessor { case alwaysReplace: replacementStrategyExecutor = new AlwaysReplace(); break; + case SUBSTITUTE_VARIABLES_VALUE: + replacementStrategyExecutor = new SubstituteVariablesReplace(); + break; default: throw new AssertionError(); } @@ -652,6 +661,41 @@ public class ReplaceText extends AbstractProcessor { } } + private static class SubstituteVariablesReplace implements ReplacementStrategyExecutor { + @Override + public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) { + final Map flowFileAttributes = flowFile.getAttributes(); + + if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { + final int flowFileSize = (int) flowFile.getSize(); + final int bufferSize = Math.min(maxBufferSize, flowFileSize); + final byte[] buffer = new byte[bufferSize]; + + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(final InputStream in, final OutputStream out) throws IOException { + StreamUtils.fillBuffer(in, buffer, false); + final String originalContent = new String(buffer, 0, flowFileSize, charset); + final String substitutedContent = StringSubstitutor.replace(originalContent, flowFileAttributes); + out.write(substitutedContent.getBytes(charset)); + } + }); + } else { + flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(), + (bw, oneLine) -> { + final String substitutedLine = StringSubstitutor.replace(oneLine, flowFileAttributes); + bw.write(substitutedLine); + })); + } + return flowFile; + } + + @Override + public boolean isAllDataBufferedForEntireText() { + return true; + } + } + /** * If we have a '$' followed by anything other than a number, then escape * it. E.g., '$d' becomes '\$d' so that it can be used as a literal in a diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java index 341b75e911..88ed09747e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java @@ -1641,6 +1641,50 @@ public class TestReplaceText { out.assertContentEquals("WO$1R$2D"); } + @Test + public void testSubstituteVariablesWithEvaluationModeEntireText() { + testSubstituteVariables("Line1: ${var1}\nLine2: ${var2}", "Line1: foo\nLine2: bar", ReplaceText.ENTIRE_TEXT, createAttributesMap()); + } + + @Test + public void testSubstituteVariablesWithEvaluationModeLineByLine() { + testSubstituteVariables("Line1: ${var1}\nLine2: ${var2}", "Line1: foo\nLine2: bar", ReplaceText.LINE_BY_LINE, createAttributesMap()); + } + + @Test + public void testSubstituteVariablesWhenVariableValueMissing() { + testSubstituteVariables("Line1: ${var1}\nLine2: ${var2}", "Line1: ${var1}\nLine2: ${var2}", ReplaceText.ENTIRE_TEXT, Collections.emptyMap()); + } + + @Test + public void testSubstituteVariablesWhenVariableReferenceEscaped() { + testSubstituteVariables("Line1: $${var1}\nLine2: $${var2}", "Line1: ${var1}\nLine2: ${var2}", ReplaceText.ENTIRE_TEXT, createAttributesMap()); + } + + @Test + public void testSubstituteVariablesWhenVariableNameEmpty() { + testSubstituteVariables("Line1: ${}\nLine2: ${}", "Line1: ${}\nLine2: ${}", ReplaceText.ENTIRE_TEXT, createAttributesMap()); + } + + private void testSubstituteVariables(String inputContent, String expectedContent, String evaluationMode, Map attributesMap) { + final TestRunner runner = getRunner(); + runner.setProperty(ReplaceText.EVALUATION_MODE, evaluationMode); + runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.SUBSTITUTE_VARIABLES.getValue()); + runner.enqueue(inputContent, attributesMap); + runner.run(); + + runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0); + out.assertContentEquals(expectedContent); + } + + private Map createAttributesMap() { + final Map attributesMap = new HashMap<>(); + attributesMap.put("var1", "foo"); + attributesMap.put("var2", "bar"); + return attributesMap; + } + /* * A repeated alternation regex such as (A|B)* can lead to StackOverflowError * on large input strings.