mirror of https://github.com/apache/nifi.git
NIFI-12118: refactored the RemoveRecordField member variable that was caching values, and improved performance by using Pattern.matcher().find() instead of .matches().
This closes #7783 Signed-off-by: Chris Sampson <chris.sampson82@gmail.com>
This commit is contained in:
parent
4b9eb8361c
commit
c28d040bca
|
@ -17,15 +17,14 @@
|
|||
|
||||
package org.apache.nifi.record.path;
|
||||
|
||||
import org.apache.nifi.record.path.util.RecordPathCache;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordFieldRemovalPath;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.nifi.record.path.util.RecordPathCache;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordFieldRemovalPath;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
public class RecordFieldRemover {
|
||||
private final RecordPathCache recordPathCache;
|
||||
|
@ -50,20 +49,22 @@ public class RecordFieldRemover {
|
|||
final RecordPathResult recordPathResult = recordPath.evaluate(record);
|
||||
final List<FieldValue> selectedFields = recordPathResult.getSelectedFields().collect(Collectors.toList());
|
||||
|
||||
if (!selectedFields.isEmpty()) {
|
||||
if (recordPathRemovalProperties.isAppliedToAllElementsInCollection()) {
|
||||
// all elements have the same parent, so navigate up from the first element in the collection
|
||||
selectedFields.get(0).getParent().ifPresent(FieldValue::removeContent);
|
||||
} else {
|
||||
selectedFields.forEach(FieldValue::remove);
|
||||
}
|
||||
|
||||
if (recordPathRemovalProperties.isRemovingFieldsNotJustElementsFromWithinCollection()) {
|
||||
removeFieldsFromSchema(selectedFields);
|
||||
}
|
||||
|
||||
fieldsChanged = true;
|
||||
if (selectedFields.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (recordPathRemovalProperties.isAppliedToAllElementsInCollection()) {
|
||||
// all elements have the same parent, so navigate up from the first element in the collection
|
||||
selectedFields.get(0).getParent().ifPresent(FieldValue::removeContent);
|
||||
} else {
|
||||
selectedFields.forEach(FieldValue::remove);
|
||||
}
|
||||
|
||||
if (recordPathRemovalProperties.isRemovingFieldsNotJustElementsFromWithinCollection()) {
|
||||
removeFieldsFromSchema(selectedFields);
|
||||
}
|
||||
|
||||
fieldsChanged = true;
|
||||
}
|
||||
|
||||
private void removeFieldsFromSchema(final List<FieldValue> selectedFields) {
|
||||
|
@ -92,7 +93,7 @@ public class RecordFieldRemover {
|
|||
}
|
||||
|
||||
public static class RecordPathRemovalProperties {
|
||||
private static final Pattern ALL_ELEMENTS_REGEX = Pattern.compile(".*\\[\\s*(?:\\*|0\\s*\\.\\.\\s*-1)\\s*]$");
|
||||
private static final Pattern ALL_ELEMENTS_REGEX = Pattern.compile("\\[\\s*(?:\\*|0\\s*\\.\\.\\s*-1)\\s*]$");
|
||||
private static final Pattern ARRAY_ELEMENTS_REGEX = Pattern.compile("\\[\\s*-?\\d+(?:\\s*,\\s*-?\\d+)*+\\s*]");
|
||||
private static final Pattern MAP_ELEMENTS_REGEX = Pattern.compile("\\[\\s*'[^']+'(?:\\s*,\\s*'[^']+')*+\\s*]");
|
||||
|
||||
|
@ -106,7 +107,7 @@ public class RecordFieldRemover {
|
|||
this.recordPath = recordPath;
|
||||
|
||||
// ends with [*] or [0..-1]
|
||||
this.appliedToAllElementsInCollection = ALL_ELEMENTS_REGEX.matcher(recordPath).matches();
|
||||
this.appliedToAllElementsInCollection = ALL_ELEMENTS_REGEX.matcher(recordPath).find();
|
||||
|
||||
// contains an array reference [] with one or more element references, e.g. [1], [ 1, -1]
|
||||
this.appliedToIndividualArrayElements = ARRAY_ELEMENTS_REGEX.matcher(recordPath).find();
|
||||
|
|
|
@ -17,6 +17,10 @@
|
|||
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
|
@ -40,11 +44,6 @@ import org.apache.nifi.record.path.util.RecordPathCache;
|
|||
import org.apache.nifi.record.path.validation.RecordPathValidator;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
|
@ -65,7 +64,6 @@ import java.util.List;
|
|||
@SeeAlso({UpdateRecord.class})
|
||||
public class RemoveRecordField extends AbstractRecordProcessor {
|
||||
private volatile RecordPathCache recordPathCache;
|
||||
private volatile List<RecordFieldRemover.RecordPathRemovalProperties> recordPathsToRemove;
|
||||
|
||||
private static final String ROOT_PATH = "/";
|
||||
|
||||
|
@ -111,24 +109,21 @@ public class RemoveRecordField extends AbstractRecordProcessor {
|
|||
@OnScheduled
|
||||
public void collectRecordPaths(final ProcessContext context) {
|
||||
recordPathCache = new RecordPathCache(context.getProperties().size() * 2);
|
||||
|
||||
recordPathsToRemove = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Record process(final Record record, final FlowFile flowFile, final ProcessContext context, final long count) {
|
||||
if (recordPathsToRemove == null) {
|
||||
recordPathsToRemove = new ArrayList<>(context.getProperties().size());
|
||||
context.getProperties().keySet().forEach(property -> {
|
||||
if (property.isDynamic()) {
|
||||
// validate RecordPath from Expression Language (if applicable)
|
||||
final String recordPath = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
|
||||
if (ROOT_PATH.equals(recordPath)) {
|
||||
throw new ProcessException(String.format("The root Record Path %s cannot be removed for %s", ROOT_PATH, property.getDisplayName()));
|
||||
}
|
||||
recordPathsToRemove.add(new RecordFieldRemover.RecordPathRemovalProperties(recordPath));
|
||||
final List<RecordFieldRemover.RecordPathRemovalProperties> recordPathsToRemove = new ArrayList<>();
|
||||
for (final PropertyDescriptor property : context.getProperties().keySet()) {
|
||||
if (property.isDynamic()) {
|
||||
// validate RecordPath from Expression Language (if applicable)
|
||||
final String recordPath = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
|
||||
if (ROOT_PATH.equals(recordPath)) {
|
||||
throw new ProcessException(String.format("The root Record Path %s cannot be removed for %s", ROOT_PATH, property.getDisplayName()));
|
||||
}
|
||||
});
|
||||
|
||||
recordPathsToRemove.add(new RecordFieldRemover.RecordPathRemovalProperties(recordPath));
|
||||
}
|
||||
}
|
||||
|
||||
final RecordFieldRemover recordFieldRemover = new RecordFieldRemover(record, recordPathCache);
|
||||
|
|
Loading…
Reference in New Issue