NIFI-6691: This closes #3754. Fixed bug in MapRecord's algorithm for incorporating Inactive Fields, by recursing into any 'child fields' that contain records; also fixed bug in UpdateRecord that caused it to incorrectly map the first result to all elements if the key of the property points to an array

This commit is contained in:
Mark Payne 2019-09-19 16:22:19 -04:00 committed by Joe Witt
parent 13381c2254
commit d0371a5ef1
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
6 changed files with 183 additions and 28 deletions

View File

@ -20,7 +20,9 @@ package org.apache.nifi.serialization.record;
import org.apache.nifi.serialization.SchemaValidationException; import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType; import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
@ -434,22 +436,125 @@ public class MapRecord implements Record {
this.schema = DataTypeUtils.merge(this.schema, other); this.schema = DataTypeUtils.merge(this.schema, other);
} }
@Override @Override
public void incorporateInactiveFields() { public void incorporateInactiveFields() {
if (inactiveFields == null) { final List<RecordField> updatedFields = new ArrayList<>();
return;
for (final RecordField field : schema.getFields()) {
updatedFields.add(getUpdatedRecordField(field));
} }
final List<RecordField> allFields = new ArrayList<>(schema.getFieldCount() + inactiveFields.size()); if (inactiveFields != null) {
allFields.addAll(schema.getFields()); for (final RecordField field : inactiveFields) {
if (!updatedFields.contains(field)) {
for (final RecordField field : inactiveFields) { updatedFields.add(field);
if (!allFields.contains(field)) { }
allFields.add(field);
} }
} }
this.schema = new SimpleRecordSchema(allFields); this.schema = new SimpleRecordSchema(updatedFields);
}
private RecordField getUpdatedRecordField(final RecordField field) {
final DataType dataType = field.getDataType();
final RecordFieldType fieldType = dataType.getFieldType();
if (isSimpleType(fieldType)) {
return field;
}
final Object value = getValue(field);
if (value == null) {
return field;
}
if (fieldType == RecordFieldType.RECORD && value instanceof Record) {
final Record childRecord = (Record) value;
childRecord.incorporateInactiveFields();
final RecordSchema definedChildSchema = ((RecordDataType) dataType).getChildSchema();
final RecordSchema actualChildSchema = childRecord.getSchema();
final RecordSchema combinedChildSchema = DataTypeUtils.merge(definedChildSchema, actualChildSchema);
final DataType combinedDataType = RecordFieldType.RECORD.getRecordDataType(combinedChildSchema);
final RecordField updatedField = new RecordField(field.getFieldName(), combinedDataType, field.getDefaultValue(), field.getAliases(), field.isNullable());
return updatedField;
}
if (fieldType == RecordFieldType.ARRAY && value instanceof Object[]) {
final DataType elementType = ((ArrayDataType) dataType).getElementType();
final RecordFieldType elementFieldType = elementType.getFieldType();
if (elementFieldType == RecordFieldType.RECORD) {
final Object[] array = (Object[]) value;
RecordSchema mergedSchema = ((RecordDataType) elementType).getChildSchema();
for (final Object element : array) {
if (element == null) {
continue;
}
final Record record = (Record) element;
record.incorporateInactiveFields();
mergedSchema = DataTypeUtils.merge(mergedSchema, record.getSchema());
}
final DataType mergedRecordType = RecordFieldType.RECORD.getRecordDataType(mergedSchema);
final DataType mergedDataType = RecordFieldType.ARRAY.getArrayDataType(mergedRecordType);
final RecordField updatedField = new RecordField(field.getFieldName(), mergedDataType, field.getDefaultValue(), field.getAliases(), field.isNullable());
return updatedField;
}
return field;
}
if (fieldType == RecordFieldType.CHOICE) {
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
final List<DataType> possibleTypes = choiceDataType.getPossibleSubTypes();
final DataType chosenDataType = DataTypeUtils.chooseDataType(value, choiceDataType);
if (chosenDataType.getFieldType() != RecordFieldType.RECORD || !(value instanceof Record)) {
return field;
}
final RecordDataType recordDataType = (RecordDataType) chosenDataType;
final Record childRecord = (Record) value;
childRecord.incorporateInactiveFields();
final RecordSchema definedChildSchema = recordDataType.getChildSchema();
final RecordSchema actualChildSchema = childRecord.getSchema();
final RecordSchema combinedChildSchema = DataTypeUtils.merge(definedChildSchema, actualChildSchema);
final DataType combinedDataType = RecordFieldType.RECORD.getRecordDataType(combinedChildSchema);
final List<DataType> updatedPossibleTypes = new ArrayList<>(possibleTypes.size());
for (final DataType possibleType : possibleTypes) {
if (possibleType.equals(chosenDataType)) {
updatedPossibleTypes.add(combinedDataType);
} else {
updatedPossibleTypes.add(possibleType);
}
}
final DataType mergedDataType = RecordFieldType.CHOICE.getChoiceDataType(updatedPossibleTypes);
return new RecordField(field.getFieldName(), mergedDataType, field.getDefaultValue(), field.getAliases(), field.isNullable());
}
return field;
}
private boolean isSimpleType(final RecordFieldType fieldType) {
switch (fieldType) {
case ARRAY:
case RECORD:
case MAP:
case CHOICE:
return false;
}
return true;
} }
@Override @Override

View File

@ -1,13 +1,13 @@
<?xml version="1.0"?> <?xml version="1.0"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. --> language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
@ -554,6 +554,8 @@
<exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and1.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and1.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and2.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and2.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-streets.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-streets.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/input/addresses.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/full-addresses.json</exclude>
<exclude>src/test/resources/xxe_template.xml</exclude> <exclude>src/test/resources/xxe_template.xml</exclude>
<exclude>src/test/resources/xxe_from_report.xml</exclude> <exclude>src/test/resources/xxe_from_report.xml</exclude>
<exclude>src/test/resources/TestForkRecord/single-element-nested-array-strings.json</exclude> <exclude>src/test/resources/TestForkRecord/single-element-nested-array-strings.json</exclude>

View File

@ -203,8 +203,6 @@ public class UpdateRecord extends AbstractRecordProcessor {
final List<FieldValue> selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList()); final List<FieldValue> selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList());
final Object replacementObject = getReplacementObject(selectedFields); final Object replacementObject = getReplacementObject(selectedFields);
updateFieldValue(fieldVal, replacementObject); updateFieldValue(fieldVal, replacementObject);
record = updateRecord(destinationFieldValues, selectedFields, record);
} }
return record; return record;

View File

@ -17,19 +17,12 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordFieldType;
@ -39,6 +32,14 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestUpdateRecord { public class TestUpdateRecord {
private TestRunner runner; private TestRunner runner;
@ -158,6 +159,31 @@ public class TestUpdateRecord {
mff.assertContentEquals("header\nJohnny,Johnny\n"); mff.assertContentEquals("header\nJohnny,Johnny\n");
} }
@Test
public void testConcatWithArrayInferredSchema() throws InitializationException, IOException {
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
runner.enableControllerService(jsonReader);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
runner.enableControllerService(jsonWriter);
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/addresses.json"));
runner.setProperty("/addresses[*]/full", "concat(../street, ' ', ../city, ' ', ../state)");
runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/full-addresses.json")));
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
}
@Test @Test
public void testChangingSchema() throws InitializationException, IOException { public void testChangingSchema() throws InitializationException, IOException {
final JsonTreeReader jsonReader = new JsonTreeReader(); final JsonTreeReader jsonReader = new JsonTreeReader();

View File

@ -0,0 +1,11 @@
[ {
"addresses" : [ {
"street" : "1234 My Street",
"city" : "My City",
"state" : "MS"
}, {
"street" : "4321 Your Street",
"city" : "Your City",
"state" : "YS"
} ]
} ]

View File

@ -0,0 +1,13 @@
[ {
"addresses" : [ {
"street" : "1234 My Street",
"city" : "My City",
"state" : "MS",
"full" : "1234 My Street My City MS"
}, {
"street" : "4321 Your Street",
"city" : "Your City",
"state" : "YS",
"full" : "4321 Your Street Your City YS"
} ]
} ]