NIFI-8659: Support multiple output records in JoltTransformRecord (#5126)

NIFI-8659: Support multiple output records in JoltTransformRecord
This commit is contained in:
Matthew Burgess 2021-06-07 15:30:41 -04:00 committed by GitHub
parent f07e17ba74
commit 659fda10f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 169 additions and 13 deletions

View File

@@ -113,8 +113,10 @@
<exclude>src/test/resources/TestJoltTransformRecord/defaultrSpec.json</exclude> <exclude>src/test/resources/TestJoltTransformRecord/defaultrSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/defaultrOutput.json</exclude> <exclude>src/test/resources/TestJoltTransformRecord/defaultrOutput.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/shiftrSpec.json</exclude> <exclude>src/test/resources/TestJoltTransformRecord/shiftrSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/shiftrSpecMultipleOutputRecords.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/sortrOutput.json</exclude> <exclude>src/test/resources/TestJoltTransformRecord/sortrOutput.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/shiftrOutput.json</exclude> <exclude>src/test/resources/TestJoltTransformRecord/shiftrOutput.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/shiftrOutputMultipleOutputRecords.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/removrSpec.json</exclude> <exclude>src/test/resources/TestJoltTransformRecord/removrSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/removrOutput.json</exclude> <exclude>src/test/resources/TestJoltTransformRecord/removrOutput.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/defaultrSpec.json</exclude> <exclude>src/test/resources/TestJoltTransformRecord/defaultrSpec.json</exclude>
@@ -135,6 +137,7 @@
<exclude>src/test/resources/TestTransformFactory/cardrSpec.json</exclude> <exclude>src/test/resources/TestTransformFactory/cardrSpec.json</exclude>
<exclude>src/test/resources/TestTransformFactory/defaultrSpec.json</exclude> <exclude>src/test/resources/TestTransformFactory/defaultrSpec.json</exclude>
<exclude>src/test/resources/TestTransformFactory/shiftrSpec.json</exclude> <exclude>src/test/resources/TestTransformFactory/shiftrSpec.json</exclude>
<exclude>src/test/resources/TestTransformFactory/shiftrSpecMultipleOutputRecords.json</exclude>
<exclude>src/test/resources/TestTransformFactory/removrSpec.json</exclude> <exclude>src/test/resources/TestTransformFactory/removrSpec.json</exclude>
<exclude>src/test/resources/TestTransformFactory/defaultrSpec.json</exclude> <exclude>src/test/resources/TestTransformFactory/defaultrSpec.json</exclude>
<exclude>src/test/resources/TestTransformFactory/modifierDefaultSpec.json</exclude> <exclude>src/test/resources/TestTransformFactory/modifierDefaultSpec.json</exclude>

View File

@@ -331,12 +331,16 @@ public class JoltTransformRecord extends AbstractProcessor {
} else { } else {
final JoltTransform transform = getTransform(context, original); final JoltTransform transform = getTransform(context, original);
final Record transformedFirstRecord = transform(firstRecord, transform); final List<Record> transformedFirstRecords = transform(firstRecord, transform);
if (transformedFirstRecord == null) { if (transformedFirstRecords.isEmpty()) {
throw new ProcessException("Error transforming the first record"); throw new ProcessException("Error transforming the first record");
} }
final Record transformedFirstRecord = transformedFirstRecords.get(0);
if (transformedFirstRecord == null) {
throw new ProcessException("Error transforming the first record");
}
final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), transformedFirstRecord.getSchema()); final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), transformedFirstRecord.getSchema());
// TODO: Is it possible that two Records with the same input schema could have different schemas after transformation? // TODO: Is it possible that two Records with the same input schema could have different schemas after transformation?
@@ -349,11 +353,24 @@ public class JoltTransformRecord extends AbstractProcessor {
writer.beginRecordSet(); writer.beginRecordSet();
writer.write(transformedFirstRecord); writer.write(transformedFirstRecord);
Record record; Record record;
// If multiple output records were generated, write them out
for (int i = 1; i < transformedFirstRecords.size(); i++) {
record = transformedFirstRecords.get(i);
if (record == null) {
throw new ProcessException("Error transforming the first record");
}
writer.write(record);
}
while ((record = reader.nextRecord()) != null) { while ((record = reader.nextRecord()) != null) {
final Record transformedRecord = transform(record, transform); final List<Record> transformedRecords = transform(record, transform);
writer.write(transformedRecord); if (transformedRecords == null) {
throw new ProcessException("Error transforming the record");
}
for (Record transformedRecord : transformedRecords) {
writer.write(transformedRecord);
}
} }
writeResult = writer.finishRecordSet(); writeResult = writer.finishRecordSet();
@@ -388,7 +405,7 @@
session.transfer(original, REL_ORIGINAL); session.transfer(original, REL_ORIGINAL);
} }
private Record transform(final Record record, final JoltTransform transform) { private List<Record> transform(final Record record, final JoltTransform transform) {
Map<String, Object> recordMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema())); Map<String, Object> recordMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
// JOLT expects arrays to be of type List where our Record code uses Object[]. // JOLT expects arrays to be of type List where our Record code uses Object[].
@@ -399,8 +416,23 @@
// JOLT expects arrays to be of type List where our Record code uses Object[]. // JOLT expects arrays to be of type List where our Record code uses Object[].
// Make another pass of the transformed objects to change List to Object[]. // Make another pass of the transformed objects to change List to Object[].
final Object normalizedRecordValues = normalizeRecordObjects(transformedObject); final Object normalizedRecordValues = normalizeRecordObjects(transformedObject);
final Record updatedRecord = DataTypeUtils.toRecord(normalizedRecordValues, "r"); final List<Record> recordList = new ArrayList<>();
return updatedRecord;
if (normalizedRecordValues == null) {
return recordList;
}
// If the top-level object is an array, return a list of the records inside. Otherwise return a singleton list with the single transformed record
if (normalizedRecordValues instanceof Object[]) {
for (Object o : (Object[]) normalizedRecordValues) {
if (o != null) {
recordList.add(DataTypeUtils.toRecord(o, "r"));
}
}
} else {
recordList.add(DataTypeUtils.toRecord(normalizedRecordValues, "r"));
}
return recordList;
} }
private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) { private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) {
@@ -460,7 +492,7 @@
} else if (o instanceof Object[]) { } else if (o instanceof Object[]) {
return Arrays.stream(((Object[]) o)).map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList()); return Arrays.stream(((Object[]) o)).map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList());
} else if (o instanceof Collection) { } else if (o instanceof Collection) {
Collection c = (Collection) o; Collection<?> c = (Collection<?>) o;
return c.stream().map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList()); return c.stream().map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList());
} else { } else {
return o; return o;
@@ -474,13 +506,21 @@
m.forEach((k, v) -> m.put(k, normalizeRecordObjects(v))); m.forEach((k, v) -> m.put(k, normalizeRecordObjects(v)));
return m; return m;
} else if (o instanceof List) { } else if (o instanceof List) {
return ((List<Object>) o).stream().map(JoltTransformRecord::normalizeRecordObjects).toArray(Object[]::new); final List<Object> objectList = (List<Object>) o;
final Object[] objectArray = new Object[objectList.size()];
for (int i = 0; i < objectArray.length; i++) {
objectArray[i] = normalizeRecordObjects(objectList.get(i));
}
return objectArray;
} else if (o instanceof Collection) { } else if (o instanceof Collection) {
Collection c = (Collection) o; Collection<?> c = (Collection<?>) o;
return c.stream().map(JoltTransformRecord::normalizeRecordObjects).collect(Collectors.toList()); final List<Object> objectList = new ArrayList<>();
for (Object obj : c) {
objectList.add(normalizeRecordObjects(obj));
}
return objectList;
} else { } else {
return o; return o;
} }
} }
} }

View File

@@ -44,6 +44,7 @@ import java.nio.file.Paths;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@@ -308,6 +309,66 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json"))), assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json"))),
new String(transformed.toByteArray())); new String(transformed.toByteArray()));
}
@Test
public void testTransformInputWithShiftrMultipleOutputRecords() throws IOException {
RecordField aField = new RecordField("a", RecordFieldType.INT.getDataType());
RecordField bField = new RecordField("b", RecordFieldType.INT.getDataType());
RecordField cField = new RecordField("c", RecordFieldType.INT.getDataType());
List<RecordField> abcFields = Arrays.asList(aField, bField, cField);
RecordSchema xSchema = new SimpleRecordSchema(abcFields);
RecordField xRecord = new RecordField("x", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(xSchema)));
parser.addSchemaField(xRecord);
final Record record1 = new MapRecord(xSchema, new HashMap<String, Object>() {{
put("a", 1);
put("b", 2);
put("c", 3);
}});
final Record record2 = new MapRecord(xSchema, new HashMap<String, Object>() {{
put("a", 11);
put("b", 21);
put("c", 31);
}});
final Record record3 = new MapRecord(xSchema, new HashMap<String, Object>() {{
put("a", 21);
put("b", 2);
put("c", 3);
}});
final Object[] recordArray1 = new Object[] {record1, record2, record3};
parser.addRecord((Object) recordArray1);
final Record record4 = new MapRecord(xSchema, new HashMap<String, Object>() {{
put("a", 100);
put("b", 200);
put("c", 300);
}});
final Record record5 = new MapRecord(xSchema, new HashMap<String, Object>() {{
put("a", 101);
put("b", 201);
put("c", 301);
}});
final Object[] recordArray2 = new Object[] {record4, record5};
parser.addRecord((Object) recordArray2);
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc")));
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(writer, "Pretty Print JSON", "true");
runner.enableControllerService(writer);
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrSpecMultipleOutputRecords.json")));
runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SHIFTR);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputMultipleOutputRecords.json"))),
new String(transformed.toByteArray()));
} }

View File

@@ -0,0 +1,21 @@
[ {
"a" : "1",
"b" : "2",
"c" : "3"
}, {
"a" : "11",
"b" : "21",
"c" : "31"
}, {
"a" : "21",
"b" : "2",
"c" : "3"
}, {
"a" : "100",
"b" : "200",
"c" : "300"
}, {
"a" : "101",
"b" : "201",
"c" : "301"
} ]

View File

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"type": "record",
"name": "x",
"namespace": "org.apache.nifi",
"fields": [
{"name": "a", "type": "string"},
{"name": "b", "type": "string"},
{"name": "c", "type": "string"}
]
}