NIFI-7183 - This closes #4073. Improve ReplaceText when removing FF's content

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Pierre Villard 2020-02-21 18:35:27 -08:00 committed by Joe Witt
parent 722b99432c
commit 9a8a551e03
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A

View File

@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException; import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
import org.apache.nifi.attribute.expression.language.exception.IllegalAttributeException; import org.apache.nifi.attribute.expression.language.exception.IllegalAttributeException;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
@ -199,6 +200,7 @@ public class ReplaceText extends AbstractProcessor {
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private Set<Relationship> relationships; private Set<Relationship> relationships;
private ReplacementStrategyExecutor replacementStrategyExecutor;
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
@ -254,21 +256,12 @@ public class ReplaceText extends AbstractProcessor {
return errors; return errors;
} }
@Override @OnScheduled
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void setup(ProcessContext context) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
final String replacementStrategy = context.getProperty(REPLACEMENT_STRATEGY).getValue(); final String replacementStrategy = context.getProperty(REPLACEMENT_STRATEGY).getValue();
final String evaluateMode = context.getProperty(EVALUATION_MODE).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String evaluateMode = context.getProperty(EVALUATION_MODE).getValue();
final byte[] buffer; final byte[] buffer;
if (replacementStrategy.equalsIgnoreCase(regexReplaceValue) || replacementStrategy.equalsIgnoreCase(literalReplaceValue)) { if (replacementStrategy.equalsIgnoreCase(regexReplaceValue) || replacementStrategy.equalsIgnoreCase(literalReplaceValue)) {
buffer = new byte[maxBufferSize]; buffer = new byte[maxBufferSize];
@ -276,7 +269,6 @@ public class ReplaceText extends AbstractProcessor {
buffer = null; buffer = null;
} }
ReplacementStrategyExecutor replacementStrategyExecutor;
switch (replacementStrategy) { switch (replacementStrategy) {
case prependValue: case prependValue:
replacementStrategyExecutor = new PrependReplace(); replacementStrategyExecutor = new PrependReplace();
@ -288,6 +280,10 @@ public class ReplaceText extends AbstractProcessor {
// for backward compatibility - if replacement regex is ".*" then we will simply always replace the content. // for backward compatibility - if replacement regex is ".*" then we will simply always replace the content.
if (context.getProperty(SEARCH_VALUE).getValue().equals(".*")) { if (context.getProperty(SEARCH_VALUE).getValue().equals(".*")) {
replacementStrategyExecutor = new AlwaysReplace(); replacementStrategyExecutor = new AlwaysReplace();
} else if (context.getProperty(SEARCH_VALUE).getValue().equals(DEFAULT_REGEX)
&& evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)
&& context.getProperty(REPLACEMENT_VALUE).getValue().isEmpty()) {
replacementStrategyExecutor = new AlwaysReplace();
} else { } else {
replacementStrategyExecutor = new RegexReplace(buffer, context); replacementStrategyExecutor = new RegexReplace(buffer, context);
} }
@ -302,6 +298,20 @@ public class ReplaceText extends AbstractProcessor {
default: default:
throw new AssertionError(); throw new AssertionError();
} }
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String evaluateMode = context.getProperty(EVALUATION_MODE).getValue();
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
if (flowFile.getSize() > maxBufferSize && replacementStrategyExecutor.isAllDataBufferedForEntireText()) { if (flowFile.getSize() > maxBufferSize && replacementStrategyExecutor.isAllDataBufferedForEntireText()) {
@ -325,6 +335,7 @@ public class ReplaceText extends AbstractProcessor {
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
return; return;
} }
logger.info("Transferred {} to 'success'", new Object[] {flowFile}); logger.info("Transferred {} to 'success'", new Object[] {flowFile});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);