NIFI-12124: This closes #7791. Added a new RenameRecordField processor. In testing, also noticed that the Descendant Wildcard operator (//*) and the Descendant Field Path operator (//name, for instance) did not properly account for arrays of records or map elements, so addressed those concerns.

Signed-off-by: Joseph Witt <joewitt@apache.org>
Mark Payne 2023-09-25 13:19:30 -04:00 committed by Joseph Witt
parent f4ae292a45
commit 3ae0eedee6
23 changed files with 740 additions and 42 deletions
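
For context, the newly covered map case in miniature; a sketch adapted from the testRecursiveWithMap test added in this commit:

    final List<RecordField> fields = new ArrayList<>();
    fields.add(new RecordField("map", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType())));
    final RecordSchema schema = new SimpleRecordSchema(fields);

    final Map<String, Object> values = new HashMap<>();
    values.put("map", new HashMap<>(Map.of("a", "z", "b", "Y")));
    final Record record = new MapRecord(schema, values);

    // Before this change, //* stopped at the map field itself; it now descends into the entries.
    final Object match = RecordPath.compile("//*[. = toUpperCase(.)]")
            .evaluate(record).getSelectedFields().findFirst().orElseThrow().getValue(); // "Y"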

View File

@@ -172,6 +172,7 @@ jobs:
if: failure()
- name: Post Disk Usage
run: df
if: ${{ always() }}
macos-build-jp:
timeout-minutes: 150
@@ -235,6 +236,7 @@ jobs:
if: failure()
- name: Post Disk Usage
run: df
if: ${{ always() }}
windows-build:
timeout-minutes: 150
@@ -300,3 +302,4 @@ jobs:
if: failure()
- name: Post Disk Usage
run: df
if: ${{ always() }}

View File

@@ -20,14 +20,18 @@ package org.apache.nifi.record.path.paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.MapEntryFieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.util.Filters;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.MapDataType;
public class DescendantFieldPath extends RecordPathSegment {
private final String descendantName;
@@ -74,6 +78,26 @@ public class DescendantFieldPath extends RecordPathSegment {
if (Filters.isRecord(childField.getDataType(), recordValue)) {
final FieldValue childFieldValue = new StandardFieldValue(recordValue, childField, fieldValue);
matchingValues.addAll(findDescendants(childFieldValue));
} else if (Filters.isRecordArray(childField.getDataType(), recordValue)) {
final Object[] arrayValues = (Object[]) recordValue;
for (final Object arrayValue : arrayValues) {
final FieldValue childFieldValue = new StandardFieldValue(arrayValue, childField, fieldValue);
matchingValues.addAll(findDescendants(childFieldValue));
}
} else if (childField.getDataType().getFieldType() == RecordFieldType.MAP) {
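// Wrap each entry as a MapEntryFieldValue so the entry's key is carried along with its value.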
final Map<String, ?> map = (Map<String, ?>) recordValue;
final DataType valueType = ((MapDataType) childField.getDataType()).getValueType();
for (final Map.Entry<String, ?> entry : map.entrySet()) {
final String mapKey = entry.getKey();
final Object mapValue = entry.getValue();
final RecordField elementField = new RecordField(fieldValue.getField().getFieldName(), valueType);
final FieldValue mapFieldValue = new MapEntryFieldValue(mapValue, elementField, fieldValue, mapKey);
matchingValues.add(mapFieldValue);
}
}
}

View File

@@ -20,14 +20,18 @@ package org.apache.nifi.record.path.paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.MapEntryFieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.util.Filters;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.MapDataType;
public class WildcardDescendantPath extends RecordPathSegment {
@@ -66,6 +70,26 @@ public class WildcardDescendantPath extends RecordPathSegment {
if (Filters.isRecord(childField.getDataType(), value)) {
final FieldValue childFieldValue = new StandardFieldValue(value, childField, fieldValue);
matchingValues.addAll(findDescendants(childFieldValue));
} else if (Filters.isRecordArray(childField.getDataType(), value)) {
final Object[] arrayValues = (Object[]) value;
for (final Object arrayValue : arrayValues) {
final FieldValue childFieldValue = new StandardFieldValue(arrayValue, childField, fieldValue);
matchingValues.addAll(findDescendants(childFieldValue));
}
} else if (childField.getDataType().getFieldType() == RecordFieldType.MAP) {
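// As in DescendantFieldPath, each entry is wrapped as a MapEntryFieldValue so the map key is preserved.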
final Map<String, ?> map = (Map<String, ?>) value;
final DataType valueType = ((MapDataType) childField.getDataType()).getValueType();
for (final Map.Entry<String, ?> entry : map.entrySet()) {
final String mapKey = entry.getKey();
final Object mapValue = entry.getValue();
final RecordField elementField = new RecordField(fieldValue.getField().getFieldName(), valueType);
final FieldValue mapFieldValue = new MapEntryFieldValue(mapValue, elementField, fieldValue, mapKey);
matchingValues.add(mapFieldValue);
}
}
}

View File

@@ -17,14 +17,15 @@
package org.apache.nifi.record.path.util;
import java.lang.reflect.Array;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.ArrayDataType;
public class Filters {
@@ -71,4 +72,39 @@ public class Filters {
return false;
}
public static boolean isRecordArray(final DataType dataType, final Object value) {
if (dataType.getFieldType() != RecordFieldType.ARRAY) {
return false;
}
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
final DataType elementType = arrayDataType.getElementType();
if (elementType != null && elementType.getFieldType() == RecordFieldType.RECORD) {
return true;
}
if (value == null) {
return false;
}
if (!value.getClass().isArray()) {
return false;
}
final int length = Array.getLength(value);
if (length == 0) {
return false;
}
for (int i = 0; i < length; i++) {
final Object val = Array.get(value, i);
if (!(val instanceof Record)) {
return false;
}
}
return true;
}
}
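
The runtime inspection in isRecordArray matters when the declared element type is inconclusive, such as a CHOICE. A hedged sketch, where personSchema and personRecord are hypothetical stand-ins:

    // The declared element type is a CHOICE, so the declared-type check alone cannot decide...
    final DataType choiceType = RecordFieldType.CHOICE.getChoiceDataType(
            RecordFieldType.STRING.getDataType(),
            RecordFieldType.RECORD.getRecordDataType(personSchema));
    final DataType arrayType = RecordFieldType.ARRAY.getArrayDataType(choiceType);

    // ...but every element of the actual value is a Record, so the check passes.
    Filters.isRecordArray(arrayType, new Object[] {personRecord});    // true
    Filters.isRecordArray(arrayType, new Object[] {"not a record"});  // false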

View File

@@ -17,21 +17,6 @@
package org.apache.nifi.record.path;
import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.uuid5.Uuid5Util;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
@@ -52,6 +37,20 @@ import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.uuid5.Uuid5Util;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -1364,6 +1363,7 @@ public class TestRecordPath {
assertEquals("John Smith", results.get(0).getValue());
assertEquals("Jane Smith", results.get(1).getValue());
}
@Test
public void testFieldName() {
final List<RecordField> fields = new ArrayList<>();
@@ -1382,6 +1382,52 @@ public class TestRecordPath {
assertEquals(0L, RecordPath.compile("//name[not(startsWith(fieldName(.), 'n'))]").evaluate(record).getSelectedFields().count());
}
@Test
public void testRecursiveWithMap() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("map", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType())));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, String> mapValues = new HashMap<>();
mapValues.put("a", "z");
mapValues.put("b", "Y");
mapValues.put("c", "x");
final Map<String, Object> values = new HashMap<>();
values.put("map", mapValues);
final Record record = new MapRecord(schema, values);
assertEquals("Y", RecordPath.compile("//*[. = toUpperCase(.)]").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testRecursiveWithChoiceThatIncludesRecord() {
final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("age", RecordFieldType.INT.getDataType())
));
final DataType personDataType = RecordFieldType.RECORD.getRecordDataType(personSchema);
final DataType stringDataType = RecordFieldType.STRING.getDataType();
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("person", RecordFieldType.CHOICE.getChoiceDataType(stringDataType, personDataType)));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> personValueMap = new HashMap<>();
personValueMap.put("name", "John Doe");
personValueMap.put("age", 30);
final Record personRecord = new MapRecord(personSchema, personValueMap);
final Map<String, Object> values = new HashMap<>();
values.put("person", personRecord);
final Record record = new MapRecord(schema, values);
final List<Object> expectedValues = List.of(personRecord, "John Doe", 30);
assertEquals(expectedValues, RecordPath.compile("//*").evaluate(record).getSelectedFields().map(FieldValue::getValue).toList());
}
@Test
public void testToDateFromString() {
final List<RecordField> fields = new ArrayList<>();

View File

@@ -17,12 +17,6 @@
package org.apache.nifi.serialization;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldRemovalPath;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -31,6 +25,11 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldRemovalPath;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
public class SimpleRecordSchema implements RecordSchema {
private List<RecordField> fields = null;
@@ -188,12 +187,17 @@ public class SimpleRecordSchema implements RecordSchema {
public int hashCode() {
int computed = this.hashCode;
if (computed == 0) {
computed = this.hashCode = 143 + 3 * fields.hashCode();
computed = this.hashCode = calculateHashCode();
}
return computed;
}
private int calculateHashCode() {
return 143 + 3 * fields.hashCode();
}
private static String createText(final List<RecordField> fields) {
final StringBuilder sb = new StringBuilder("[");
@@ -264,18 +268,15 @@ public class SimpleRecordSchema implements RecordSchema {
@Override
public void removeField(final String fieldName) {
final List<RecordField> remainingFields = fields.stream()
.filter(field -> !field.getFieldName().equals(fieldName)).collect(Collectors.toList());
final List<RecordField> remainingFields = new ArrayList<>();
for (final RecordField field : fields) {
if (!field.getFieldName().equals(fieldName)) {
remainingFields.add(field);
}
}
if (remainingFields.size() != fields.size()) {
fields = null;
setFields(remainingFields);
text.set(createText(fields));
textAvailable = true;
schemaFormat = null;
schemaIdentifier = SchemaIdentifier.EMPTY;
hashCode = 0; // set to 0 to trigger re-calculation
hashCode = hashCode();
resetFields(remainingFields);
}
}
@@ -288,6 +289,41 @@
}
}
@Override
public boolean renameField(final String currentName, final String newName) {
final List<RecordField> updatedFields = new ArrayList<>(fields.size());
boolean renamed = false;
for (final RecordField recordField : fields) {
if (recordField.getFieldName().equals(currentName)) {
final RecordField updated = new RecordField(newName, recordField.getDataType(), recordField.getDefaultValue(), recordField.getAliases(), recordField.isNullable());
updatedFields.add(updated);
renamed = true;
} else {
updatedFields.add(recordField);
}
}
if (!renamed) {
return false;
}
resetFields(updatedFields);
return true;
}
private void resetFields(final List<RecordField> updatedFields) {
this.fields = null;
setFields(updatedFields);
textAvailable = false;
text.set(null);
schemaFormat = null;
schemaIdentifier = SchemaIdentifier.EMPTY;
hashCode = calculateHashCode();
}
@Override
public boolean isRecursive() {
return getFields().stream().anyMatch(field -> field.getDataType().isRecursive(Collections.singletonList(this)));
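
A sketch of the renameField contract added above (field names here are illustrative):

    final SimpleRecordSchema schema = new SimpleRecordSchema(List.of(
            new RecordField("name", RecordFieldType.STRING.getDataType())));

    schema.renameField("name", "full_name");  // true: field renamed; text, format, and hashCode are reset
    schema.renameField("missing", "other");   // false: no field with the given current name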

View File

@@ -25,6 +25,8 @@ import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
@@ -42,6 +44,8 @@ import java.util.Set;
import java.util.function.Supplier;
public class MapRecord implements Record {
private static final Logger logger = LoggerFactory.getLogger(MapRecord.class);
private RecordSchema schema;
private final Map<String, Object> values;
private Optional<SerializedForm> serializedForm;
@@ -406,6 +410,30 @@
existingField.ifPresent(recordField -> values.remove(recordField.getFieldName()));
}
@Override
public boolean rename(final RecordField field, final String newName) {
final Optional<RecordField> resolvedField = resolveField(field);
if (resolvedField.isEmpty()) {
logger.debug("Could not rename {} to {} because the field could not be resolved to any field in the schema", field, newName);
return false;
}
// If a field with the new name already exists in the schema, do not rename.
if (schema.getField(newName).isPresent()) {
throw new IllegalArgumentException("Could not rename [" + field + "] to [" + newName + "] because a field already exists with the name [" + newName + "]");
}
final String currentName = resolvedField.get().getFieldName();
final boolean renamed = schema.renameField(currentName, newName);
if (!renamed) {
return false;
}
final Object currentValue = values.remove(currentName);
values.put(newName, currentValue);
return true;
}
@Override
public void regenerateSchema() {
final List<RecordField> schemaFields = new ArrayList<>(schema.getFieldCount());
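
The record-level counterpart, again as an illustrative sketch (assumes a schema containing a `name` field and a matching values map):

    final Record record = new MapRecord(schema, values);
    final RecordField nameField = record.getSchema().getField("name").orElseThrow();

    record.rename(nameField, "full_name");  // true: schema updated, value moved under the new key
    record.getValue("full_name");           // the value previously stored under "name"
    // Renaming onto a name that already exists in the schema throws IllegalArgumentException.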

View File

@@ -17,12 +17,11 @@
package org.apache.nifi.serialization.record;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
public interface Record {
@@ -152,6 +151,17 @@ public interface Record {
*/
void remove(RecordField field);
/**
* Renames the given field to the new name
*
* @param field the RecordField to update
* @param newName the new name for the field
* @return <code>true</code> if the field was renamed, <code>false</code> if the field could not be found
* @throws IllegalArgumentException if unable to rename field due to a naming conflict, such as the new name already existing in the schema
*/
boolean rename(RecordField field, String newName);
/**
* Creates a new schema for the Record based on the Record's field types.
* In case any of the Record's fields were changed, this method propagates the changes to the parent Record.

View File

@@ -97,6 +97,16 @@ public interface RecordSchema {
*/
void removePath(RecordFieldRemovalPath path);
/**
* Renames the field that corresponds to the given RecordField so that its new name is equal to the given name
*
* @param currentName the current name of the field
* @param newName the new name for the field
* @return <code>true</code> if the field is renamed, <code>false</code> if the field with the given current name cannot be found
* @throws IllegalArgumentException if unable to rename the field due to a naming conflict
*/
boolean renameField(String currentName, String newName);
/**
* @return true if the schema contains itself as a nested field type, false if it does not
*/

View File

@@ -18,6 +18,10 @@
package org.apache.nifi.accumulo.data;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
@@ -26,11 +30,6 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
public class KeySchema implements RecordSchema {
private static final List<RecordField> KEY_FIELDS = new ArrayList<>();
@@ -125,6 +124,11 @@ public class KeySchema implements RecordSchema {
throw new NotImplementedException("Path removal from Accumulo KeySchema is not implemented.");
}
@Override
public boolean renameField(final String currentName, final String newName) {
throw new NotImplementedException("Field renaming from Accumulo KeySchema is not implemented.");
}
@Override
public boolean isRecursive() {
throw new NotImplementedException("Determining if an Accumulo KeySchema is recursive is not implemented.");

View File

@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathPropertyNameValidator;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
@SideEffectFree
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"update", "record", "rename", "field", "generic", "schema", "json", "csv", "avro", "log", "logs"})
@CapabilityDescription("Renames one or more fields in each Record of a FlowFile. "
+ "This Processor requires that at least one user-defined Property be added. The name of the Property should indicate a RecordPath that determines the field that should "
+ "be updated. The value of the Property is the new name to assign to the Record Field that matches the RecordPath. The property value may use Expression Language to reference "
+ "FlowFile attributes as well as the variables `field.name`, `field.value`, `field.type`, and `record.index`")
@WritesAttributes({
@WritesAttribute(attribute = "record.index", description = "This attribute provides the current row index and is only available inside the literal value expression.")
})
@DynamicProperty(name = "A RecordPath that identifies which field(s) to update",
value = "The new name to assign to the Record field",
description = "Allows users to specify a new name for each field that matches the RecordPath.",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@SeeAlso({UpdateRecord.class, RemoveRecordField.class})
@UseCase(
description = "Rename a field in each Record to a specific, known name.",
keywords = {"rename", "field", "static", "specific", "name"},
configuration = """
Configure the 'Record Reader' according to the input format.
Configure the 'Record Writer' according to the desired output format.
Add a property to the Processor such that the name of the property is a RecordPath that identifies the field to rename. \
The value of the property is the new name for the field.
For example, to rename the `name` field to `full_name`, add a property with a name of `/name` and a value of `full_name`.
Many properties can be added following this pattern in order to rename multiple fields.
"""
)
@UseCase(
description = "Rename a field in each Record to a name that is derived from a FlowFile attribute.",
keywords = {"rename", "field", "expression language", "EL", "flowfile", "attribute"},
configuration = """
Configure the 'Record Reader' according to the input format.
Configure the 'Record Writer' according to the desired output format.
Add a property to the Processor such that the name of the property is a RecordPath that identifies the field to rename. \
The value of the property is an Expression Language expression that can be used to determine the new name of the field.
For example, to rename the `addr` field to whatever value is stored in the `preferred_address_name` attribute, \
add a property with a name of `/addr` and a value of `${preferred_address_name}`.
Many properties can be added following this pattern in order to rename multiple fields.
"""
)
@UseCase(
description = "Rename a field in each Record to a new name that is derived from the current field name.",
notes = "This might be used, for example, to add a prefix or a suffix to some fields, or to transform the name of the field by making it uppercase.",
keywords = {"rename", "field", "expression language", "EL", "field.name"},
configuration = """
Configure the 'Record Reader' according to the input format.
Configure the 'Record Writer' according to the desired output format.
Add a property to the Processor such that the name of the property is a RecordPath that identifies the field to rename. \
The value of the property is an Expression Language expression that references the `field.name` variable.
For example, to rename all fields with a prefix of `pre_`, we add a property named `/*` with a value of `pre_${field.name}`. \
If we would like this to happen recursively, to nested fields as well, we use a property name of `//*` with the value of `pre_${field.name}`.
To make all field names uppercase, we can add a property named `//*` with a value of `${field.name:toUpper()}`.
Many properties can be added following this pattern in order to rename multiple fields.
"""
)
public class RenameRecordField extends AbstractRecordProcessor {
private static final String FIELD_NAME = "field.name";
private static final String FIELD_VALUE = "field.value";
private static final String FIELD_TYPE = "field.type";
private static final String RECORD_INDEX = "record.index";
private volatile RecordPathCache recordPathCache;
private volatile List<String> recordPaths;
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.description("Specifies the new name to use for any record fields that match the RecordPath: " + propertyDescriptorName)
.required(false)
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(new RecordPathPropertyNameValidator())
.build();
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final boolean containsDynamic = validationContext.getProperties().keySet().stream().anyMatch(PropertyDescriptor::isDynamic);
if (containsDynamic) {
return Collections.emptyList();
}
return Collections.singleton(new ValidationResult.Builder()
.subject("User-defined Properties")
.valid(false)
.explanation("At least one RecordPath must be specified")
.build());
}
@OnScheduled
public void createRecordPaths(final ProcessContext context) {
recordPathCache = new RecordPathCache(context.getProperties().size());
final List<String> recordPaths = new ArrayList<>(context.getProperties().size() - 2);
for (final PropertyDescriptor property : context.getProperties().keySet()) {
if (property.isDynamic()) {
recordPaths.add(property.getName());
}
}
this.recordPaths = recordPaths;
}
@Override
protected Record process(final Record record, final FlowFile flowFile, final ProcessContext context, final long count) {
for (final String propertyName : recordPaths) {
final RecordPath recordPath = recordPathCache.getCompiled(propertyName);
final RecordPathResult result = recordPath.evaluate(record);
final PropertyValue newFieldNamePropertyValue = context.getProperty(propertyName);
if (newFieldNamePropertyValue.isExpressionLanguagePresent()) {
final Map<String, String> fieldVariables = new HashMap<>();
result.getSelectedFields().forEach(fieldVal -> {
fieldVariables.clear();
fieldVariables.put(FIELD_NAME, fieldVal.getField().getFieldName());
fieldVariables.put(FIELD_VALUE, DataTypeUtils.toString(fieldVal.getValue(), (String) null));
fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name());
fieldVariables.put(RECORD_INDEX, String.valueOf(count));
final String newFieldName = newFieldNamePropertyValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue();
fieldVal.getParentRecord().ifPresent(parentRecord -> {
parentRecord.rename(fieldVal.getField(), newFieldName);
});
});
} else {
final String newFieldName = newFieldNamePropertyValue.evaluateAttributeExpressions(flowFile).getValue();
result.getSelectedFields().forEach(fieldVal -> {
fieldVal.getParentRecord().ifPresent(parentRecord -> {
parentRecord.rename(fieldVal.getField(), newFieldName);
});
});
}
}
return record;
}
}

View File

@@ -99,6 +99,7 @@ org.apache.nifi.processors.standard.QueryDatabaseTable
org.apache.nifi.processors.standard.QueryDatabaseTableRecord
org.apache.nifi.processors.standard.QueryRecord
org.apache.nifi.processors.standard.RemoveRecordField
org.apache.nifi.processors.standard.RenameRecordField
org.apache.nifi.processors.standard.ReplaceText
org.apache.nifi.processors.standard.ReplaceTextWithMapping
org.apache.nifi.processors.standard.RetryFlowFile

View File

@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.avro.Schema;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
public class TestRenameRecordField {
private final Path INPUT_FILES = Paths.get("src/test/resources/TestRenameRecordField/input");
private final Path OUTPUT_FILES = Paths.get("src/test/resources/TestRenameRecordField/output");
private TestRunner runner;
@BeforeEach
public void setup() throws InitializationException {
runner = TestRunners.newTestRunner(RenameRecordField.class);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
runner.setProperty(jsonWriter, JsonRecordSetWriter.SUPPRESS_NULLS, JsonRecordSetWriter.NEVER_SUPPRESS);
runner.setProperty(jsonWriter, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true");
runner.setProperty(jsonWriter, jsonWriter.getSchemaWriteStrategyDescriptor(), SchemaRegistryRecordSetWriter.AVRO_SCHEMA_ATTRIBUTE);
runner.enableControllerService(jsonWriter);
runner.setProperty(AbstractRecordProcessor.RECORD_WRITER, "writer");
final JsonTreeReader reader = new JsonTreeReader();
runner.addControllerService("reader", reader);
runner.enableControllerService(reader);
runner.setProperty(AbstractRecordProcessor.RECORD_READER, "reader");
}
@Test
public void testRenameFieldStaticValue() throws IOException {
runner.setProperty("/application", "favoriteApplication");
runner.enqueue(INPUT_FILES.resolve("simple-person.json"));
runner.run();
runner.assertAllFlowFilesTransferred(AbstractRecordProcessor.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(AbstractRecordProcessor.REL_SUCCESS).get(0);
assertContentsEqual(out, OUTPUT_FILES.resolve("testRenameFieldStaticValue.json"));
}
@Test
public void testRenameFieldUsingAttribute() throws IOException {
runner.setProperty("/application", "${desiredKey}");
runner.enqueue(INPUT_FILES.resolve("simple-person.json"), Collections.singletonMap("desiredKey", "favorite"));
runner.run();
runner.assertAllFlowFilesTransferred(AbstractRecordProcessor.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(AbstractRecordProcessor.REL_SUCCESS).get(0);
assertContentsEqual(out, OUTPUT_FILES.resolve("testRenameFieldUsingAttribute.json"));
}
@Test
public void testRenameMultipleFields() throws IOException {
runner.setProperty("/application", "app");
runner.setProperty("/name", "${nameKey}");
runner.enqueue(INPUT_FILES.resolve("simple-person.json"), Collections.singletonMap("nameKey", "full_name"));
runner.run();
runner.assertAllFlowFilesTransferred(AbstractRecordProcessor.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(AbstractRecordProcessor.REL_SUCCESS).get(0);
assertContentsEqual(out, OUTPUT_FILES.resolve("testRenameMultipleFields.json"));
final String outputSchemaText = out.getAttribute("avro.schema");
final Schema outputSchema = new Schema.Parser().parse(outputSchemaText);
assertEquals(3, outputSchema.getFields().size());
assertNull(outputSchema.getField("application"));
assertNotNull(outputSchema.getField("app"));
assertNull(outputSchema.getField("name"));
assertNotNull(outputSchema.getField("full_name"));
}
@Test
public void testRenameArray() throws IOException {
runner.setProperty("/addresses", "addrs");
runner.enqueue(INPUT_FILES.resolve("complex-person.json"));
runner.run();
runner.assertAllFlowFilesTransferred(AbstractRecordProcessor.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(AbstractRecordProcessor.REL_SUCCESS).get(0);
assertContentsEqual(out, OUTPUT_FILES.resolve("testRenameArray.json"));
}
@Test
public void testNestedPath() throws IOException {
runner.setProperty("/addresses[*]/street", "streetAddress");
runner.enqueue(INPUT_FILES.resolve("complex-person.json"));
runner.run();
runner.assertAllFlowFilesTransferred(AbstractRecordProcessor.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(AbstractRecordProcessor.REL_SUCCESS).get(0);
assertContentsEqual(out, OUTPUT_FILES.resolve("testNestedPath.json"));
}
@Test
public void testNamingConflict() throws IOException {
runner.setProperty("/application", "name");
runner.enqueue(INPUT_FILES.resolve("simple-person.json"));
runner.run();
runner.assertAllFlowFilesTransferred(AbstractRecordProcessor.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(AbstractRecordProcessor.REL_FAILURE).get(0);
// Output should be unchanged.
assertContentsEqual(out, INPUT_FILES.resolve("simple-person.json"));
}
@Test
public void testReferencingFieldName() throws IOException {
runner.setProperty("/*", "UPDATED_${field.name}");
runner.enqueue(INPUT_FILES.resolve("simple-person.json"));
runner.run();
runner.assertAllFlowFilesTransferred(AbstractRecordProcessor.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(AbstractRecordProcessor.REL_SUCCESS).get(0);
assertContentsEqual(out, OUTPUT_FILES.resolve("testReferencingFieldName.json"));
}
@Test
public void testRecursivelyReferencingAllFields() throws IOException {
runner.setProperty("//*", "${field.name:toUpper()}");
runner.enqueue(INPUT_FILES.resolve("complex-person.json"));
runner.run();
runner.assertAllFlowFilesTransferred(AbstractRecordProcessor.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(AbstractRecordProcessor.REL_SUCCESS).get(0);
assertContentsEqual(out, OUTPUT_FILES.resolve("testRecursivelyReferencingAllFields.json"));
}
@Test
public void testRecursivelyReferencingFieldName() throws IOException {
runner.setProperty("//name", "${field.name:toUpper()}");
runner.enqueue(INPUT_FILES.resolve("complex-person.json"));
runner.run();
runner.assertAllFlowFilesTransferred(AbstractRecordProcessor.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(AbstractRecordProcessor.REL_SUCCESS).get(0);
assertContentsEqual(out, OUTPUT_FILES.resolve("testRecursivelyReferencingFieldName.json"));
}
private void assertContentsEqual(final MockFlowFile flowFile, final Path expectedContent) throws IOException {
final String flowFileContent = flowFile.getContent();
final String fileContent = new String(Files.readAllBytes(expectedContent));
assertEquals(fileContent.replace("\r", ""), flowFileContent.replace("\r", ""));
}
}

View File

@@ -0,0 +1,12 @@
[ {
"name" : "John Doe",
"age" : 30,
"application" : "Apache NiFi",
"addresses" : [ {
"name" : "Home",
"street" : "7777 Main Street",
"city" : "My City",
"state" : "MS",
"zip" : "90210"
} ]
} ]

View File

@@ -0,0 +1,5 @@
[ {
"name" : "John Doe",
"age" : 30,
"application" : "Apache NiFi"
} ]

View File

@@ -0,0 +1,12 @@
[ {
"name" : "John Doe",
"age" : 30,
"application" : "Apache NiFi",
"addresses" : [ {
"name" : "Home",
"streetAddress" : "7777 Main Street",
"city" : "My City",
"state" : "MS",
"zip" : "90210"
} ]
} ]

View File

@@ -0,0 +1,12 @@
[ {
"NAME" : "John Doe",
"AGE" : 30,
"APPLICATION" : "Apache NiFi",
"ADDRESSES" : [ {
"NAME" : "Home",
"STREET" : "7777 Main Street",
"CITY" : "My City",
"STATE" : "MS",
"ZIP" : "90210"
} ]
} ]

View File

@@ -0,0 +1,12 @@
[ {
"NAME" : "John Doe",
"age" : 30,
"application" : "Apache NiFi",
"addresses" : [ {
"NAME" : "Home",
"street" : "7777 Main Street",
"city" : "My City",
"state" : "MS",
"zip" : "90210"
} ]
} ]

View File

@@ -0,0 +1,5 @@
[ {
"UPDATED_name" : "John Doe",
"UPDATED_age" : 30,
"UPDATED_application" : "Apache NiFi"
} ]

View File

@@ -0,0 +1,12 @@
[ {
"name" : "John Doe",
"age" : 30,
"application" : "Apache NiFi",
"addrs" : [ {
"name" : "Home",
"street" : "7777 Main Street",
"city" : "My City",
"state" : "MS",
"zip" : "90210"
} ]
} ]

View File

@@ -0,0 +1,5 @@
[ {
"name" : "John Doe",
"age" : 30,
"favoriteApplication" : "Apache NiFi"
} ]

View File

@@ -0,0 +1,5 @@
[ {
"name" : "John Doe",
"age" : 30,
"favorite" : "Apache NiFi"
} ]

View File

@@ -0,0 +1,5 @@
[ {
"full_name" : "John Doe",
"age" : 30,
"app" : "Apache NiFi"
} ]