mirror of
https://github.com/apache/nifi.git
synced 2025-02-08 19:14:57 +00:00
NIFI-7994: Fixed ReplaceText concurrency issue
Moving RegexReplace.additionalAttrs field to local variable in replace() method. Concurrent usage of the additionalAttrs map caused data corruption.
This commit is contained in:
parent
479ee6e3db
commit
018778a25d
@ -494,7 +494,6 @@ public class ReplaceText extends AbstractProcessor {
|
|||||||
|
|
||||||
private static class RegexReplace implements ReplacementStrategyExecutor {
|
private static class RegexReplace implements ReplacementStrategyExecutor {
|
||||||
private final int numCapturingGroups;
|
private final int numCapturingGroups;
|
||||||
private final Map<String, String> additionalAttrs;
|
|
||||||
|
|
||||||
// back references are not supported in the evaluated expression
|
// back references are not supported in the evaluated expression
|
||||||
private final AttributeValueDecorator escapeBackRefDecorator = new AttributeValueDecorator() {
|
private final AttributeValueDecorator escapeBackRefDecorator = new AttributeValueDecorator() {
|
||||||
@ -507,7 +506,6 @@ public class ReplaceText extends AbstractProcessor {
|
|||||||
|
|
||||||
public RegexReplace(final String regex) {
|
public RegexReplace(final String regex) {
|
||||||
numCapturingGroups = Pattern.compile(regex).matcher("").groupCount();
|
numCapturingGroups = Pattern.compile(regex).matcher("").groupCount();
|
||||||
additionalAttrs = new HashMap<>(numCapturingGroups);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -516,6 +514,7 @@ public class ReplaceText extends AbstractProcessor {
|
|||||||
|
|
||||||
final String searchRegex = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
|
final String searchRegex = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
|
||||||
final Pattern searchPattern = Pattern.compile(searchRegex);
|
final Pattern searchPattern = Pattern.compile(searchRegex);
|
||||||
|
final Map<String, String> additionalAttrs = new HashMap<>(numCapturingGroups);
|
||||||
|
|
||||||
FlowFile updatedFlowFile;
|
FlowFile updatedFlowFile;
|
||||||
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
|
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
|
||||||
@ -526,7 +525,6 @@ public class ReplaceText extends AbstractProcessor {
|
|||||||
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer, false));
|
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer, false));
|
||||||
|
|
||||||
final String contentString = new String(buffer, 0, flowFileSize, charset);
|
final String contentString = new String(buffer, 0, flowFileSize, charset);
|
||||||
additionalAttrs.clear();
|
|
||||||
final Matcher matcher = searchPattern.matcher(contentString);
|
final Matcher matcher = searchPattern.matcher(contentString);
|
||||||
|
|
||||||
final PropertyValue replacementValueProperty = context.getProperty(REPLACEMENT_VALUE);
|
final PropertyValue replacementValueProperty = context.getProperty(REPLACEMENT_VALUE);
|
||||||
@ -560,8 +558,7 @@ public class ReplaceText extends AbstractProcessor {
|
|||||||
final Matcher matcher = searchPattern.matcher("");
|
final Matcher matcher = searchPattern.matcher("");
|
||||||
updatedFlowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
|
updatedFlowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
|
||||||
(bw, oneLine) -> {
|
(bw, oneLine) -> {
|
||||||
additionalAttrs.clear();
|
matcher.reset(oneLine);
|
||||||
matcher.reset(oneLine);
|
|
||||||
|
|
||||||
int matches = 0;
|
int matches = 0;
|
||||||
StringBuffer sb = new StringBuffer();
|
StringBuffer sb = new StringBuffer();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user