From 8ad3c731dabdf9f951b949ce733e81cdebc06749 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 13 Feb 2024 08:31:29 -0500 Subject: [PATCH] NIFI-12797 Refactored Record.incorporateInactiveFields Refactored Record.incorporateInactiveFields to handle when an updated field and an inactive field have the same name (which can happen if incorporateInactiveFields is called multiple times). Also refactored the setValue(String, Object) method to call setValue(RecordField, Object) because the logic had diverged. Also exposed the text of Expression Language, which led to the discovery of this bug. This closes #8413 Signed-off-by: David Handermann --- .../language/CompiledExpression.java | 1 + .../language/EmptyPreparedQuery.java | 6 +++ .../expression/language/Expression.java | 5 ++ .../language/InvalidPreparedQuery.java | 6 +++ .../language/ParameterExpression.java | 5 ++ .../expression/language/PreparedQuery.java | 6 +++ .../language/StandardPreparedQuery.java | 5 ++ .../language/StringLiteralExpression.java | 5 ++ nifi-commons/nifi-record-path/pom.xml | 4 -- .../nifi/record/path/functions/Format.java | 2 +- .../nifi/record/path/functions/PadLeft.java | 2 +- .../nifi/record/path/functions/PadRight.java | 2 +- .../nifi/record/path/functions/ToDate.java | 2 +- .../nifi/serialization/record/MapRecord.java | 52 ++++++++++++------- .../record/util/DataTypeUtils.java | 7 +-- .../serialization/record/TestMapRecord.java | 50 ++++++++++++++++++ 16 files changed, 129 insertions(+), 31 deletions(-) diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/CompiledExpression.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/CompiledExpression.java index 74741b157f..5ac4f559c4 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/CompiledExpression.java +++ 
b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/CompiledExpression.java @@ -44,6 +44,7 @@ public class CompiledExpression implements Expression { return tree; } + @Override public String getExpression() { return expression; } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java index b33fcf4c43..e00d171912 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java @@ -20,6 +20,7 @@ import org.apache.nifi.expression.AttributeValueDecorator; import org.apache.nifi.processor.exception.ProcessException; import java.util.Collections; +import java.util.List; import java.util.Set; public class EmptyPreparedQuery implements PreparedQuery { @@ -49,4 +50,9 @@ public class EmptyPreparedQuery implements PreparedQuery { public Set<String> getExplicitlyReferencedAttributes() { return Collections.emptySet(); } + + @Override + public List<Expression> getExpressions() { + return List.of(); + } } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Expression.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Expression.java index 188c503ced..3196a75f7d 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Expression.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Expression.java @@ -28,4 +28,9 @@ public interface Expression { * @return the evaluated value */ String evaluate(EvaluationContext evaluationContext, 
AttributeValueDecorator decorator); + + /** + * @return the expression as a String + */ + String getExpression(); } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java index 953e9ff100..9670e6769e 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java @@ -22,6 +22,7 @@ import org.apache.nifi.expression.AttributeValueDecorator; import org.apache.nifi.processor.exception.ProcessException; import java.util.Collections; +import java.util.List; import java.util.Set; /** @@ -59,4 +60,9 @@ public class InvalidPreparedQuery implements PreparedQuery { public Set<String> getExplicitlyReferencedAttributes() { return Collections.emptySet(); } + + @Override + public List<Expression> getExpressions() { + return List.of(); + } } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/ParameterExpression.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/ParameterExpression.java index fe8595f574..660c3b3772 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/ParameterExpression.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/ParameterExpression.java @@ -41,4 +41,9 @@ public class ParameterExpression implements Expression { return parameter.getValue(); } + + @Override + public String getExpression() { + return "#{" + parameterName + "}"; + } } diff --git 
a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java index f7a73e33c1..004d198129 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java @@ -19,6 +19,7 @@ package org.apache.nifi.attribute.expression.language; import org.apache.nifi.expression.AttributeValueDecorator; import org.apache.nifi.processor.exception.ProcessException; +import java.util.List; import java.util.Set; public interface PreparedQuery { @@ -45,4 +46,9 @@ public interface PreparedQuery { * @return a Set of all attributes that are explicitly referenced by the Prepared Query */ Set<String> getExplicitlyReferencedAttributes(); + + /** + * @return the list of all Expressions that are used to make up the Prepared Query + */ + List<Expression> getExpressions(); } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java index 787ded85d7..546ef5ebd4 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java @@ -178,4 +178,9 @@ public class StandardPreparedQuery implements PreparedQuery { this.variableImpact = impact; return impact; } + + @Override + public List<Expression> getExpressions() { + return Collections.unmodifiableList(expressions); + } } diff --git 
a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StringLiteralExpression.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StringLiteralExpression.java index d6a87d58dd..c77fa5e623 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StringLiteralExpression.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StringLiteralExpression.java @@ -30,4 +30,9 @@ public class StringLiteralExpression implements Expression { public String evaluate(final EvaluationContext evaluationContext, AttributeValueDecorator decorator) { return value; } + + @Override + public String getExpression() { + return value; + } } diff --git a/nifi-commons/nifi-record-path/pom.xml b/nifi-commons/nifi-record-path/pom.xml index ead5937227..9a33fad567 100644 --- a/nifi-commons/nifi-record-path/pom.xml +++ b/nifi-commons/nifi-record-path/pom.xml @@ -94,10 +94,6 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-record</artifactId> </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-properties</artifactId> - </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-uuid5</artifactId> diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Format.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Format.java index b621f3dd58..0ac3ad0dc7 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Format.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Format.java @@ -16,12 +16,12 @@ */ package org.apache.nifi.record.path.functions; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.record.path.FieldValue; import org.apache.nifi.record.path.RecordPathEvaluationContext; import org.apache.nifi.record.path.StandardFieldValue; import org.apache.nifi.record.path.paths.RecordPathSegment; import org.apache.nifi.record.path.util.RecordPathUtils; 
-import org.apache.nifi.util.StringUtils; import java.time.Instant; import java.time.ZoneId; diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadLeft.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadLeft.java index 67087a3a61..3b819f400a 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadLeft.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadLeft.java @@ -17,8 +17,8 @@ package org.apache.nifi.record.path.functions; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.record.path.paths.RecordPathSegment; -import org.apache.nifi.util.StringUtils; public class PadLeft extends Padding { diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadRight.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadRight.java index f45d4910af..9da50a6f58 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadRight.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadRight.java @@ -17,8 +17,8 @@ package org.apache.nifi.record.path.functions; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.record.path.paths.RecordPathSegment; -import org.apache.nifi.util.StringUtils; public class PadRight extends Padding { diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToDate.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToDate.java index 6637cc4079..9a076c2450 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToDate.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToDate.java @@ -16,12 +16,12 @@ */ package org.apache.nifi.record.path.functions; 
+import org.apache.commons.lang3.StringUtils; import org.apache.nifi.record.path.FieldValue; import org.apache.nifi.record.path.RecordPathEvaluationContext; import org.apache.nifi.record.path.StandardFieldValue; import org.apache.nifi.record.path.paths.RecordPathSegment; import org.apache.nifi.record.path.util.RecordPathUtils; -import org.apache.nifi.util.StringUtils; import java.time.Instant; import java.time.ZoneId; diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java index 1ab6539297..35f3974a24 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java @@ -473,6 +473,25 @@ public class MapRecord implements Record { } } + @Override + public void setValue(final String fieldName, final Object value) { + final Optional<RecordField> existingField = getSchema().getField(fieldName); + RecordField recordField = null; + if (existingField.isPresent()) { + final DataType existingDataType = existingField.get().getDataType(); + final boolean compatible = DataTypeUtils.isCompatibleDataType(value, existingDataType); + if (compatible) { + recordField = existingField.get(); + } + } + if (recordField == null) { + final DataType inferredDataType = DataTypeUtils.inferDataType(value, RecordFieldType.STRING.getDataType()); + recordField = new RecordField(fieldName, inferredDataType); + } + + setValue(recordField, value); + } + @Override public void remove(final RecordField field) { final Optional<RecordField> existingField = resolveField(field); @@ -521,21 +540,6 @@ public class MapRecord implements Record { schema = new SimpleRecordSchema(schemaFields); } - @Override - public void setValue(final String fieldName, final Object value) { - final Optional<RecordField> existingField = setValueAndGetField(fieldName, value); - - if 
(existingField.isEmpty()) { - if (inactiveFields == null) { - inactiveFields = new LinkedHashSet<>(); - } - - final DataType inferredDataType = DataTypeUtils.inferDataType(value, RecordFieldType.STRING.getDataType()); - final RecordField field = new RecordField(fieldName, inferredDataType); - inactiveFields.add(field); - } - } - private Optional<RecordField> setValueAndGetField(final String fieldName, final Object value) { final Optional<RecordField> field = getSchema().getField(fieldName); @@ -642,7 +646,7 @@ public class MapRecord implements Record { @Override public void incorporateInactiveFields() { - final List<RecordField> updatedFields = new ArrayList<>(); + final Map<String, RecordField> fieldsByName = new LinkedHashMap<>(); boolean fieldUpdated = false; for (final RecordField field : schema.getFields()) { @@ -651,7 +655,7 @@ public class MapRecord implements Record { fieldUpdated = true; } - updatedFields.add(updated); + fieldsByName.put(updated.getFieldName(), updated); } if (!fieldUpdated && (inactiveFields == null || inactiveFields.isEmpty())) { @@ -660,13 +664,21 @@ public class MapRecord implements Record { if (inactiveFields != null) { for (final RecordField field : inactiveFields) { - if (!updatedFields.contains(field)) { - updatedFields.add(field); + final RecordField existingField = fieldsByName.get(field.getFieldName()); + if (existingField == null) { + fieldsByName.put(field.getFieldName(), field); + } else { + if (Objects.equals(existingField, field)) { + continue; + } + + final RecordField merged = DataTypeUtils.merge(existingField, field); + fieldsByName.put(field.getFieldName(), merged); } } } - this.schema = new SimpleRecordSchema(updatedFields); + this.schema = new SimpleRecordSchema(new ArrayList<>(fieldsByName.values())); } private RecordField getUpdatedRecordField(final RecordField field) { diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index cd7fe2da82..de4ddf27a7 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -798,13 +798,14 @@ public class DataTypeUtils { return false; } // Either an object array (check the element type) or a String to be converted to byte[] - if (value instanceof Object[]) { - for (Object o : ((Object[]) value)) { + if (value instanceof final Object[] array) { + for (final Object element : array) { // Check each element to ensure its type is the same or can be coerced (if need be) - if (!isCompatibleDataType(o, elementDataType, strict)) { + if (!isCompatibleDataType(element, elementDataType, strict)) { return false; } } + return true; } else { return value instanceof String && RecordFieldType.BYTE.getDataType().equals(elementDataType); diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java index 2240e57270..70219ac8a3 100644 --- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java @@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.serialization.record.type.RecordDataType; import org.junit.jupiter.api.Test; @@ -32,11 +33,60 @@ import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static 
org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestMapRecord { + @Test + public void testIncorporateInactiveFieldsWithUpdate() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("string", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("number", RecordFieldType.INT.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + final Map<String, Object> values = new HashMap<>(); + final Record record = new MapRecord(schema, values); + record.setValue("number", "value"); + record.incorporateInactiveFields(); + + final RecordSchema updatedSchema = record.getSchema(); + final DataType dataType = updatedSchema.getDataType("number").orElseThrow(); + assertSame(RecordFieldType.CHOICE, dataType.getFieldType()); + + final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + final List<DataType> subTypes = choiceDataType.getPossibleSubTypes(); + assertEquals(2, subTypes.size()); + assertTrue(subTypes.contains(RecordFieldType.INT.getDataType())); + assertTrue(subTypes.contains(RecordFieldType.STRING.getDataType())); + } + + @Test + public void testIncorporateInactiveFieldsWithConflict() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("string", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("number", RecordFieldType.INT.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + final Map<String, Object> values = new HashMap<>(); + final Record record = new MapRecord(schema, values); + record.setValue("new", 8); + record.incorporateInactiveFields(); + + record.setValue("new", "eight"); + record.incorporateInactiveFields(); + + final DataType dataType = record.getSchema().getDataType("new").orElseThrow(); + assertSame(RecordFieldType.CHOICE, dataType.getFieldType()); + + final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + final List<DataType> 
subTypes = choiceDataType.getPossibleSubTypes(); + assertEquals(2, subTypes.size()); + assertTrue(subTypes.contains(RecordFieldType.INT.getDataType())); + assertTrue(subTypes.contains(RecordFieldType.STRING.getDataType())); + } + @Test public void testDefaultValue() { final List fields = new ArrayList<>();