diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/CompiledExpression.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/CompiledExpression.java index 74741b157f..5ac4f559c4 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/CompiledExpression.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/CompiledExpression.java @@ -44,6 +44,7 @@ public class CompiledExpression implements Expression { return tree; } + @Override public String getExpression() { return expression; } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java index b33fcf4c43..e00d171912 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java @@ -20,6 +20,7 @@ import org.apache.nifi.expression.AttributeValueDecorator; import org.apache.nifi.processor.exception.ProcessException; import java.util.Collections; +import java.util.List; import java.util.Set; public class EmptyPreparedQuery implements PreparedQuery { @@ -49,4 +50,9 @@ public class EmptyPreparedQuery implements PreparedQuery { public Set getExplicitlyReferencedAttributes() { return Collections.emptySet(); } + + @Override + public List getExpressions() { + return List.of(); + } } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Expression.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Expression.java index 188c503ced..3196a75f7d 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Expression.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Expression.java @@ -28,4 +28,9 @@ public interface Expression { * @return the evaluated value */ String evaluate(EvaluationContext evaluationContext, AttributeValueDecorator decorator); + + /** + * @return the expression as a String + */ + String getExpression(); } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java index 953e9ff100..9670e6769e 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java @@ -22,6 +22,7 @@ import org.apache.nifi.expression.AttributeValueDecorator; import org.apache.nifi.processor.exception.ProcessException; import java.util.Collections; +import java.util.List; import java.util.Set; /** @@ -59,4 +60,9 @@ public class InvalidPreparedQuery implements PreparedQuery { public Set getExplicitlyReferencedAttributes() { return Collections.emptySet(); } + + @Override + public List getExpressions() { + return List.of(); + } } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/ParameterExpression.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/ParameterExpression.java index fe8595f574..660c3b3772 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/ParameterExpression.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/ParameterExpression.java @@ -41,4 +41,9 @@ public class ParameterExpression implements Expression { return parameter.getValue(); } + + @Override + public String getExpression() { + return "#{" + parameterName + "}"; + } } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java index f7a73e33c1..004d198129 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java @@ -19,6 +19,7 @@ package org.apache.nifi.attribute.expression.language; import org.apache.nifi.expression.AttributeValueDecorator; import org.apache.nifi.processor.exception.ProcessException; +import java.util.List; import java.util.Set; public interface PreparedQuery { @@ -45,4 +46,9 @@ public interface PreparedQuery { * @return a Set of all attributes that are explicitly referenced by the Prepared Query */ Set getExplicitlyReferencedAttributes(); + + /** + * @return the list of all Expressions that are used to make up the Prepared Query + */ + List getExpressions(); } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java index 787ded85d7..546ef5ebd4 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java @@ -178,4 +178,9 @@ public class StandardPreparedQuery implements PreparedQuery { this.variableImpact = impact; return impact; } + + @Override + public List getExpressions() { + return Collections.unmodifiableList(expressions); + } } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StringLiteralExpression.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StringLiteralExpression.java index d6a87d58dd..c77fa5e623 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StringLiteralExpression.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StringLiteralExpression.java @@ -30,4 +30,9 @@ public class StringLiteralExpression implements Expression { public String evaluate(final EvaluationContext evaluationContext, AttributeValueDecorator decorator) { return value; } + + @Override + public String getExpression() { + return value; + } } diff --git a/nifi-commons/nifi-record-path/pom.xml b/nifi-commons/nifi-record-path/pom.xml index ead5937227..9a33fad567 100644 --- a/nifi-commons/nifi-record-path/pom.xml +++ b/nifi-commons/nifi-record-path/pom.xml @@ -94,10 +94,6 @@ org.apache.nifi nifi-record - - org.apache.nifi - nifi-properties - org.apache.nifi nifi-uuid5 diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Format.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Format.java index b621f3dd58..0ac3ad0dc7 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Format.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Format.java @@ -16,12 +16,12 @@ */ package org.apache.nifi.record.path.functions; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.record.path.FieldValue; import org.apache.nifi.record.path.RecordPathEvaluationContext; import org.apache.nifi.record.path.StandardFieldValue; import org.apache.nifi.record.path.paths.RecordPathSegment; import org.apache.nifi.record.path.util.RecordPathUtils; -import org.apache.nifi.util.StringUtils; import java.time.Instant; import java.time.ZoneId; diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadLeft.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadLeft.java index 67087a3a61..3b819f400a 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadLeft.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadLeft.java @@ -17,8 +17,8 @@ package org.apache.nifi.record.path.functions; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.record.path.paths.RecordPathSegment; -import org.apache.nifi.util.StringUtils; public class PadLeft extends Padding { diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadRight.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadRight.java index f45d4910af..9da50a6f58 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadRight.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/PadRight.java @@ -17,8 +17,8 @@ package org.apache.nifi.record.path.functions; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.record.path.paths.RecordPathSegment; -import org.apache.nifi.util.StringUtils; public class PadRight extends Padding { diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToDate.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToDate.java index 6637cc4079..9a076c2450 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToDate.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToDate.java @@ -16,12 +16,12 @@ */ package org.apache.nifi.record.path.functions; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.record.path.FieldValue; import org.apache.nifi.record.path.RecordPathEvaluationContext; import org.apache.nifi.record.path.StandardFieldValue; import org.apache.nifi.record.path.paths.RecordPathSegment; import org.apache.nifi.record.path.util.RecordPathUtils; -import org.apache.nifi.util.StringUtils; import java.time.Instant; import java.time.ZoneId; diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java index 1ab6539297..35f3974a24 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java @@ -473,6 +473,25 @@ public class MapRecord implements Record { } } + @Override + public void setValue(final String fieldName, final Object value) { + final Optional existingField = getSchema().getField(fieldName); + RecordField recordField = null; + if (existingField.isPresent()) { + final DataType existingDataType = existingField.get().getDataType(); + final boolean compatible = DataTypeUtils.isCompatibleDataType(value, existingDataType); + if (compatible) { + recordField = existingField.get(); + } + } + if (recordField == null) { + final DataType inferredDataType = DataTypeUtils.inferDataType(value, RecordFieldType.STRING.getDataType()); + recordField = new RecordField(fieldName, inferredDataType); + } + + setValue(recordField, value); + } + @Override public void remove(final RecordField field) { final Optional existingField = resolveField(field); @@ -521,21 +540,6 @@ public class MapRecord implements Record { schema = new SimpleRecordSchema(schemaFields); } - @Override - public void setValue(final String fieldName, final Object value) { - final Optional existingField = setValueAndGetField(fieldName, value); - - if (existingField.isEmpty()) { - if (inactiveFields == null) { - inactiveFields = new LinkedHashSet<>(); - } - - final DataType inferredDataType = DataTypeUtils.inferDataType(value, RecordFieldType.STRING.getDataType()); - final RecordField field = new RecordField(fieldName, inferredDataType); - inactiveFields.add(field); - } - } - private Optional setValueAndGetField(final String fieldName, final Object value) { final Optional field = getSchema().getField(fieldName); @@ -642,7 +646,7 @@ public class MapRecord implements Record { @Override public void incorporateInactiveFields() { - final List updatedFields = new ArrayList<>(); + final Map fieldsByName = new LinkedHashMap<>(); boolean fieldUpdated = false; for (final RecordField field : schema.getFields()) { @@ -651,7 +655,7 @@ public class MapRecord implements Record { fieldUpdated = true; } - updatedFields.add(updated); + fieldsByName.put(updated.getFieldName(), updated); } if (!fieldUpdated && (inactiveFields == null || inactiveFields.isEmpty())) { @@ -660,13 +664,21 @@ public class MapRecord implements Record { if (inactiveFields != null) { for (final RecordField field : inactiveFields) { - if (!updatedFields.contains(field)) { - updatedFields.add(field); + final RecordField existingField = fieldsByName.get(field.getFieldName()); + if (existingField == null) { + fieldsByName.put(field.getFieldName(), field); + } else { + if (Objects.equals(existingField, field)) { + continue; + } + + final RecordField merged = DataTypeUtils.merge(existingField, field); + fieldsByName.put(field.getFieldName(), merged); } } } - this.schema = new SimpleRecordSchema(updatedFields); + this.schema = new SimpleRecordSchema(new ArrayList<>(fieldsByName.values())); } private RecordField getUpdatedRecordField(final RecordField field) { diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index cd7fe2da82..de4ddf27a7 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -798,13 +798,14 @@ public class DataTypeUtils { return false; } // Either an object array (check the element type) or a String to be converted to byte[] - if (value instanceof Object[]) { - for (Object o : ((Object[]) value)) { + if (value instanceof final Object[] array) { + for (final Object element : array) { // Check each element to ensure its type is the same or can be coerced (if need be) - if (!isCompatibleDataType(o, elementDataType, strict)) { + if (!isCompatibleDataType(element, elementDataType, strict)) { return false; } } + return true; } else { return value instanceof String && RecordFieldType.BYTE.getDataType().equals(elementDataType); diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java index 2240e57270..70219ac8a3 100644 --- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java @@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.serialization.record.type.RecordDataType; import org.junit.jupiter.api.Test; @@ -32,11 +33,60 @@ import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestMapRecord { + @Test + public void testIncorporateInactiveFieldsWithUpdate() { + final List fields = new ArrayList<>(); + fields.add(new RecordField("string", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("number", RecordFieldType.INT.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + final Map values = new HashMap<>(); + final Record record = new MapRecord(schema, values); + record.setValue("number", "value"); + record.incorporateInactiveFields(); + + final RecordSchema updatedSchema = record.getSchema(); + final DataType dataType = updatedSchema.getDataType("number").orElseThrow(); + assertSame(RecordFieldType.CHOICE, dataType.getFieldType()); + + final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + final List subTypes = choiceDataType.getPossibleSubTypes(); + assertEquals(2, subTypes.size()); + assertTrue(subTypes.contains(RecordFieldType.INT.getDataType())); + assertTrue(subTypes.contains(RecordFieldType.STRING.getDataType())); + } + + @Test + public void testIncorporateInactiveFieldsWithConflict() { + final List fields = new ArrayList<>(); + fields.add(new RecordField("string", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("number", RecordFieldType.INT.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + final Map values = new HashMap<>(); + final Record record = new MapRecord(schema, values); + record.setValue("new", 8); + record.incorporateInactiveFields(); + + record.setValue("new", "eight"); + record.incorporateInactiveFields(); + + final DataType dataType = record.getSchema().getDataType("new").orElseThrow(); + assertSame(RecordFieldType.CHOICE, dataType.getFieldType()); + + final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + final List subTypes = choiceDataType.getPossibleSubTypes(); + assertEquals(2, subTypes.size()); + assertTrue(subTypes.contains(RecordFieldType.INT.getDataType())); + assertTrue(subTypes.contains(RecordFieldType.STRING.getDataType())); + } + @Test public void testDefaultValue() { final List fields = new ArrayList<>();