NIFI-5979 Add Line-by-Line Evaluation Mode to ReplaceText

Refactored to use functions to better handle strategy specific variables
via closure.
This commit is contained in:
Koji Kawamura 2019-04-08 11:35:30 +09:00
parent d222f14a9e
commit 1c588f10b2
1 changed files with 114 additions and 137 deletions

View File

@ -53,7 +53,6 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.LineDemarcator; import org.apache.nifi.stream.io.util.LineDemarcator;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import javax.annotation.Nullable;
import java.io.BufferedWriter; import java.io.BufferedWriter;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -386,33 +385,30 @@ public class ReplaceText extends AbstractProcessor {
} }
}); });
} else { } else {
flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, null)); flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
((bw, oneLine) -> {
// We need to determine what line ending was used and use that after our replacement value.
lineEndingBuilder.setLength(0);
for (int i = oneLine.length() - 1; i >= 0; i--) {
final char c = oneLine.charAt(i);
if (c == '\r' || c == '\n') {
lineEndingBuilder.append(c);
} else {
break;
}
}
bw.write(replacementValue);
// Preserve original line endings. Reverse string because we iterated over original line ending in reverse order, appending to builder.
// So if builder has multiple characters, they are now reversed from the original string's ordering.
bw.write(lineEndingBuilder.reverse().toString());
})));
} }
return flowFile; return flowFile;
} }
public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
final StringBuilder lineEndingBuilder = new StringBuilder(2);
// We need to determine what line ending was used and use that after our replacement value.
lineEndingBuilder.setLength(0);
for (int i = oneLine.length() - 1; i >= 0; i--) {
final char c = oneLine.charAt(i);
if (c == '\r' || c == '\n') {
lineEndingBuilder.append(c);
} else {
break;
}
}
bw.write(replacementValue);
// Preserve original line endings. Reverse string because we iterated over original line ending in reverse order, appending to builder.
// So if builder has multiple characters, they are now reversed from the original string's ordering.
bw.write(lineEndingBuilder.reverse().toString());
}
@Override @Override
public boolean isAllDataBufferedForEntireText() { public boolean isAllDataBufferedForEntireText() {
return false; return false;
@ -433,7 +429,8 @@ public class ReplaceText extends AbstractProcessor {
} }
}); });
} else { } else {
flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, null)); flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
(bw, oneLine) -> bw.write(replacementValue.concat(oneLine))));
} }
return flowFile; return flowFile;
} }
@ -443,11 +440,6 @@ public class ReplaceText extends AbstractProcessor {
return false; return false;
} }
@Override
public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
bw.write(replacementValue.concat(oneLine));
}
} }
private class AppendReplace implements ReplacementStrategyExecutor { private class AppendReplace implements ReplacementStrategyExecutor {
@ -465,38 +457,36 @@ public class ReplaceText extends AbstractProcessor {
} }
}); });
} else { } else {
flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, null)); flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
(bw, oneLine) -> {
// we need to find the first carriage return or new-line so that we can append the new value
// before the line separate. However, we don't want to do this using a regular expression due
// to performance concerns. So we will find the first occurrence of either \r or \n and use
// that to insert the replacement value.
boolean foundNewLine = false;
for (int i = 0; i < oneLine.length(); i++) {
final char c = oneLine.charAt(i);
if (foundNewLine) {
bw.write(c);
continue;
}
if (c == '\r' || c == '\n') {
bw.write(replacementValue);
foundNewLine = true;
}
bw.write(c);
}
if (!foundNewLine) {
bw.write(replacementValue);
}
}));
} }
return flowFile; return flowFile;
} }
public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
// we need to find the first carriage return or new-line so that we can append the new value
// before the line separate. However, we don't want to do this using a regular expression due
// to performance concerns. So we will find the first occurrence of either \r or \n and use
// that to insert the replacement value.
boolean foundNewLine = false;
for (int i = 0; i < oneLine.length(); i++) {
final char c = oneLine.charAt(i);
if (foundNewLine) {
bw.write(c);
continue;
}
if (c == '\r' || c == '\n') {
bw.write(replacementValue);
foundNewLine = true;
}
bw.write(c);
}
if (!foundNewLine) {
bw.write(replacementValue);
}
}
@Override @Override
public boolean isAllDataBufferedForEntireText() { public boolean isAllDataBufferedForEntireText() {
return false; return false;
@ -580,48 +570,44 @@ public class ReplaceText extends AbstractProcessor {
} }
} else { } else {
updatedFlowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, searchPattern)); final Matcher matcher = searchPattern.matcher("");
updatedFlowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
(bw, oneLine) -> {
additionalAttrs.clear();
matcher.reset(oneLine);
int matches = 0;
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
matches++;
for (int i=0; i <= matcher.groupCount(); i++) {
additionalAttrs.put("$" + i, matcher.group(i));
}
String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue();
replacement = escapeLiteralBackReferences(replacement, numCapturingGroups);
String replacementFinal = normalizeReplacementString(replacement);
matcher.appendReplacement(sb, replacementFinal);
}
if (matches > 0) {
matcher.appendTail(sb);
final String updatedValue = sb.toString();
bw.write(updatedValue);
} else {
// No match. Just write out the line as it was.
bw.write(oneLine);
}
}));
} }
return updatedFlowFile; return updatedFlowFile;
} }
public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
additionalAttrs.clear();
if (matcher == null) {
matcher = searchPattern.matcher(oneLine);
} else {
matcher.reset(oneLine);
}
int matches = 0;
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
matches++;
for (int i=0; i <= matcher.groupCount(); i++) {
additionalAttrs.put("$" + i, matcher.group(i));
}
String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue();
replacement = escapeLiteralBackReferences(replacement, numCapturingGroups);
String replacementFinal = normalizeReplacementString(replacement);
matcher.appendReplacement(sb, replacementFinal);
}
if (matches > 0) {
matcher.appendTail(sb);
final String updatedValue = sb.toString();
bw.write(updatedValue);
} else {
// No match. Just write out the line as it was.
bw.write(oneLine);
}
}
@Override @Override
public boolean isAllDataBufferedForEntireText() { public boolean isAllDataBufferedForEntireText() {
return true; return true;
@ -657,32 +643,30 @@ public class ReplaceText extends AbstractProcessor {
} else { } else {
final Pattern searchPattern = Pattern.compile(searchValue, Pattern.LITERAL); final Pattern searchPattern = Pattern.compile(searchValue, Pattern.LITERAL);
flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, searchPattern)); flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
(bw, oneLine) -> {
int matches = 0;
int lastEnd = 0;
final Matcher matcher = searchPattern.matcher(oneLine);
while (matcher.find()) {
bw.write(oneLine, lastEnd, matcher.start() - lastEnd);
bw.write(replacementValue);
matches++;
lastEnd = matcher.end();
}
if (matches > 0) {
bw.write(oneLine, lastEnd, oneLine.length() - lastEnd);
} else {
bw.write(oneLine);
}
}));
} }
return flowFile; return flowFile;
} }
public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
int matches = 0;
int lastEnd = 0;
while (matcher.find()) {
bw.write(oneLine, lastEnd, matcher.start() - lastEnd);
bw.write(replacementValue);
matches++;
lastEnd = matcher.end();
}
if (matches > 0) {
bw.write(oneLine, lastEnd, oneLine.length() - lastEnd);
} else {
bw.write(oneLine);
}
}
@Override @Override
public boolean isAllDataBufferedForEntireText() { public boolean isAllDataBufferedForEntireText() {
return true; return true;
@ -706,49 +690,43 @@ public class ReplaceText extends AbstractProcessor {
FlowFile replace(FlowFile flowFile, ProcessSession session, ProcessContext context, String evaluateMode, Charset charset, int maxBufferSize); FlowFile replace(FlowFile flowFile, ProcessSession session, ProcessContext context, String evaluateMode, Charset charset, int maxBufferSize);
boolean isAllDataBufferedForEntireText(); boolean isAllDataBufferedForEntireText();
}
void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException ; @FunctionalInterface
private interface ReplaceLine {
void apply(BufferedWriter bw, String oneLine) throws IOException;
} }
private class StreamReplaceCallback implements StreamCallback { private class StreamReplaceCallback implements StreamCallback {
private final Charset charset; private final Charset charset;
private final int maxBufferSize; private final int maxBufferSize;
private final ProcessContext context; private final String lineByLineEvaluationMode;
private final FlowFile flowFile; private final ReplaceLine replaceLine;
private final ReplacementStrategyExecutor replacementStrategyExecutor;
private final Pattern searchPattern;
public StreamReplaceCallback(ReplacementStrategyExecutor replacementStrategyExecutor, private StreamReplaceCallback(Charset charset,
Charset charset,
int maxBufferSize, int maxBufferSize,
ProcessContext context, String lineByLineEvaluationMode,
FlowFile flowFile, ReplaceLine replaceLine) {
@Nullable Pattern searchPattern) {
this.replacementStrategyExecutor = replacementStrategyExecutor;
this.charset = charset; this.charset = charset;
this.maxBufferSize = maxBufferSize; this.maxBufferSize = maxBufferSize;
this.context = context; this.lineByLineEvaluationMode = lineByLineEvaluationMode;
this.flowFile = flowFile; this.replaceLine = replaceLine;
this.searchPattern = searchPattern;
} }
@Override @Override
public void process(final InputStream in, final OutputStream out) throws IOException { public void process(final InputStream in, final OutputStream out) throws IOException {
final String lineByLineEvaluationMode = context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue();
try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192); try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192);
final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) { final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
String precedingLine = demarcator.nextLine(); String precedingLine = demarcator.nextLine();
String succeedingLine; String succeedingLine;
Matcher matcher = null;
boolean firstLine = true; boolean firstLine = true;
while (null != (succeedingLine = demarcator.nextLine())) { while (null != (succeedingLine = demarcator.nextLine())) {
matcher = null != searchPattern ? searchPattern.matcher(precedingLine) : null;
if(firstLine && lineByLineEvaluationMode.equalsIgnoreCase(FIRST_LINE)){ if(firstLine && lineByLineEvaluationMode.equalsIgnoreCase(FIRST_LINE)){
replacementStrategyExecutor.replaceInLine(bw, precedingLine, matcher, searchPattern, context, flowFile); replaceLine.apply(bw, precedingLine);
firstLine = false; firstLine = false;
} else if(firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE)) { } else if(firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE)) {
firstLine = false; firstLine = false;
@ -757,7 +735,7 @@ public class ReplaceText extends AbstractProcessor {
|| lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_LAST_LINE) || lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_LAST_LINE)
|| lineByLineEvaluationMode.equalsIgnoreCase(ALL) || lineByLineEvaluationMode.equalsIgnoreCase(ALL)
|| (!firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE))) { || (!firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE))) {
replacementStrategyExecutor.replaceInLine(bw, precedingLine, matcher, searchPattern, context, flowFile); replaceLine.apply(bw, precedingLine);
} else { } else {
bw.write(precedingLine); bw.write(precedingLine);
} }
@ -771,8 +749,7 @@ public class ReplaceText extends AbstractProcessor {
|| (firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE))) { || (firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE))) {
bw.write(precedingLine); bw.write(precedingLine);
} else { } else {
matcher = null != searchPattern ? searchPattern.matcher(precedingLine) : null; replaceLine.apply(bw, precedingLine);
replacementStrategyExecutor.replaceInLine(bw, precedingLine, matcher, searchPattern, context, flowFile);
} }
} }
} }