NIFI-12797 Refactored Record.incorporateInactiveFields

Refactored Record.incorporateInactiveFields to handle when an updated field and an inactive field have the same name (which can happen if incorporateInactiveFields is called multiple times). Also refactored the setValue(String, Object) method to call setValue(RecordField, Object) because the logic had diverged. Also exposed the text of Expression Language, which led to the discovery of this bug.

This closes #8413

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2024-02-13 08:31:29 -05:00 committed by exceptionfactory
parent 942d13c118
commit 8ad3c731da
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
16 changed files with 129 additions and 31 deletions

View File

@ -44,6 +44,7 @@ public class CompiledExpression implements Expression {
return tree;
}
@Override
public String getExpression() {
return expression;
}

View File

@ -20,6 +20,7 @@ import org.apache.nifi.expression.AttributeValueDecorator;
import org.apache.nifi.processor.exception.ProcessException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
public class EmptyPreparedQuery implements PreparedQuery {
@ -49,4 +50,9 @@ public class EmptyPreparedQuery implements PreparedQuery {
public Set<String> getExplicitlyReferencedAttributes() {
return Collections.emptySet();
}
@Override
public List<Expression> getExpressions() {
return List.of();
}
}

View File

@ -28,4 +28,9 @@ public interface Expression {
* @return the evaluated value
*/
String evaluate(EvaluationContext evaluationContext, AttributeValueDecorator decorator);
/**
* @return the expression as a String
*/
String getExpression();
}

View File

@ -22,6 +22,7 @@ import org.apache.nifi.expression.AttributeValueDecorator;
import org.apache.nifi.processor.exception.ProcessException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
@ -59,4 +60,9 @@ public class InvalidPreparedQuery implements PreparedQuery {
public Set<String> getExplicitlyReferencedAttributes() {
return Collections.emptySet();
}
@Override
public List<Expression> getExpressions() {
return List.of();
}
}

View File

@ -41,4 +41,9 @@ public class ParameterExpression implements Expression {
return parameter.getValue();
}
@Override
public String getExpression() {
return "#{" + parameterName + "}";
}
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.attribute.expression.language;
import org.apache.nifi.expression.AttributeValueDecorator;
import org.apache.nifi.processor.exception.ProcessException;
import java.util.List;
import java.util.Set;
public interface PreparedQuery {
@ -45,4 +46,9 @@ public interface PreparedQuery {
* @return a Set of all attributes that are explicitly referenced by the Prepared Query
*/
Set<String> getExplicitlyReferencedAttributes();
/**
* @return the list of all Expressions that are used to make up the Prepared Query
*/
List<Expression> getExpressions();
}

View File

@ -178,4 +178,9 @@ public class StandardPreparedQuery implements PreparedQuery {
this.variableImpact = impact;
return impact;
}
@Override
public List<Expression> getExpressions() {
return Collections.unmodifiableList(expressions);
}
}

View File

@ -30,4 +30,9 @@ public class StringLiteralExpression implements Expression {
public String evaluate(final EvaluationContext evaluationContext, AttributeValueDecorator decorator) {
return value;
}
@Override
public String getExpression() {
return value;
}
}

View File

@ -94,10 +94,6 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-uuid5</artifactId>

View File

@ -16,12 +16,12 @@
*/
package org.apache.nifi.record.path.functions;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.util.StringUtils;
import java.time.Instant;
import java.time.ZoneId;

View File

@ -17,8 +17,8 @@
package org.apache.nifi.record.path.functions;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.util.StringUtils;
public class PadLeft extends Padding {

View File

@ -17,8 +17,8 @@
package org.apache.nifi.record.path.functions;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.util.StringUtils;
public class PadRight extends Padding {

View File

@ -16,12 +16,12 @@
*/
package org.apache.nifi.record.path.functions;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.util.StringUtils;
import java.time.Instant;
import java.time.ZoneId;

View File

@ -473,6 +473,25 @@ public class MapRecord implements Record {
}
}
@Override
public void setValue(final String fieldName, final Object value) {
final Optional<RecordField> existingField = getSchema().getField(fieldName);
RecordField recordField = null;
if (existingField.isPresent()) {
final DataType existingDataType = existingField.get().getDataType();
final boolean compatible = DataTypeUtils.isCompatibleDataType(value, existingDataType);
if (compatible) {
recordField = existingField.get();
}
}
if (recordField == null) {
final DataType inferredDataType = DataTypeUtils.inferDataType(value, RecordFieldType.STRING.getDataType());
recordField = new RecordField(fieldName, inferredDataType);
}
setValue(recordField, value);
}
@Override
public void remove(final RecordField field) {
final Optional<RecordField> existingField = resolveField(field);
@ -521,21 +540,6 @@ public class MapRecord implements Record {
schema = new SimpleRecordSchema(schemaFields);
}
@Override
public void setValue(final String fieldName, final Object value) {
final Optional<RecordField> existingField = setValueAndGetField(fieldName, value);
if (existingField.isEmpty()) {
if (inactiveFields == null) {
inactiveFields = new LinkedHashSet<>();
}
final DataType inferredDataType = DataTypeUtils.inferDataType(value, RecordFieldType.STRING.getDataType());
final RecordField field = new RecordField(fieldName, inferredDataType);
inactiveFields.add(field);
}
}
private Optional<RecordField> setValueAndGetField(final String fieldName, final Object value) {
final Optional<RecordField> field = getSchema().getField(fieldName);
@ -642,7 +646,7 @@ public class MapRecord implements Record {
@Override
public void incorporateInactiveFields() {
final List<RecordField> updatedFields = new ArrayList<>();
final Map<String, RecordField> fieldsByName = new LinkedHashMap<>();
boolean fieldUpdated = false;
for (final RecordField field : schema.getFields()) {
@ -651,7 +655,7 @@ public class MapRecord implements Record {
fieldUpdated = true;
}
updatedFields.add(updated);
fieldsByName.put(updated.getFieldName(), updated);
}
if (!fieldUpdated && (inactiveFields == null || inactiveFields.isEmpty())) {
@ -660,13 +664,21 @@ public class MapRecord implements Record {
if (inactiveFields != null) {
for (final RecordField field : inactiveFields) {
if (!updatedFields.contains(field)) {
updatedFields.add(field);
final RecordField existingField = fieldsByName.get(field.getFieldName());
if (existingField == null) {
fieldsByName.put(field.getFieldName(), field);
} else {
if (Objects.equals(existingField, field)) {
continue;
}
final RecordField merged = DataTypeUtils.merge(existingField, field);
fieldsByName.put(field.getFieldName(), merged);
}
}
}
this.schema = new SimpleRecordSchema(updatedFields);
this.schema = new SimpleRecordSchema(new ArrayList<>(fieldsByName.values()));
}
private RecordField getUpdatedRecordField(final RecordField field) {

View File

@ -798,13 +798,14 @@ public class DataTypeUtils {
return false;
}
// Either an object array (check the element type) or a String to be converted to byte[]
if (value instanceof Object[]) {
for (Object o : ((Object[]) value)) {
if (value instanceof final Object[] array) {
for (final Object element : array) {
// Check each element to ensure its type is the same or can be coerced (if need be)
if (!isCompatibleDataType(o, elementDataType, strict)) {
if (!isCompatibleDataType(element, elementDataType, strict)) {
return false;
}
}
return true;
} else {
return value instanceof String && RecordFieldType.BYTE.getDataType().equals(elementDataType);

View File

@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.junit.jupiter.api.Test;
@ -32,11 +33,60 @@ import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestMapRecord {
@Test
public void testIncorporateInactiveFieldsWithUpdate() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("string", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("number", RecordFieldType.INT.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
final Record record = new MapRecord(schema, values);
record.setValue("number", "value");
record.incorporateInactiveFields();
final RecordSchema updatedSchema = record.getSchema();
final DataType dataType = updatedSchema.getDataType("number").orElseThrow();
assertSame(RecordFieldType.CHOICE, dataType.getFieldType());
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
final List<DataType> subTypes = choiceDataType.getPossibleSubTypes();
assertEquals(2, subTypes.size());
assertTrue(subTypes.contains(RecordFieldType.INT.getDataType()));
assertTrue(subTypes.contains(RecordFieldType.STRING.getDataType()));
}
@Test
public void testIncorporateInactiveFieldsWithConflict() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("string", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("number", RecordFieldType.INT.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
final Record record = new MapRecord(schema, values);
record.setValue("new", 8);
record.incorporateInactiveFields();
record.setValue("new", "eight");
record.incorporateInactiveFields();
final DataType dataType = record.getSchema().getDataType("new").orElseThrow();
assertSame(RecordFieldType.CHOICE, dataType.getFieldType());
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
final List<DataType> subTypes = choiceDataType.getPossibleSubTypes();
assertEquals(2, subTypes.size());
assertTrue(subTypes.contains(RecordFieldType.INT.getDataType()));
assertTrue(subTypes.contains(RecordFieldType.STRING.getDataType()));
}
@Test
public void testDefaultValue() {
final List<RecordField> fields = new ArrayList<>();