NIFI-13989 Reduced constructors in JsonTreeRowRecordReader to one (#9506)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
dan-s1 2024-11-12 09:24:30 -05:00 committed by GitHub
parent 21791b09da
commit f15b7caaa4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 130 additions and 129 deletions

View File

@ -53,23 +53,6 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
private final RecordSchema schema;
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
final String dateFormat, final String timeFormat, final String timestampFormat)
throws IOException, MalformedRecordException {
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null);
}
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
final String dateFormat, final String timeFormat, final String timestampFormat,
final StartingFieldStrategy startingFieldStrategy, final String startingFieldName,
final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate<String, String> captureFieldPredicate)
throws IOException, MalformedRecordException {
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy,
captureFieldPredicate, false, null, new JsonParserFactory());
}
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
final String dateFormat, final String timeFormat, final String timestampFormat,
final StartingFieldStrategy startingFieldStrategy, final String startingFieldName,

View File

@ -23,6 +23,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.json.JsonParserFactory;
import org.apache.nifi.json.JsonRecordSource;
import org.apache.nifi.json.JsonSchemaInference;
import org.apache.nifi.json.JsonTreeRowRecordReader;
@ -301,7 +302,8 @@ public class RecordTransformProxy extends PythonProcessorProxy<RecordTransform>
}
try (final InputStream in = new ByteArrayInputStream(jsonBytes)) {
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, getLogger(), schema, null, null, null);
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, getLogger(), schema, null, null, null, null,
null, null, null, false, null, new JsonParserFactory());
final Record record = reader.nextRecord(false, false);
return record;
}

View File

@ -44,6 +44,7 @@ import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.json.JsonParserFactory;
import org.apache.nifi.json.JsonTreeRowRecordReader;
import org.apache.nifi.json.SchemaApplicationStrategy;
import org.apache.nifi.json.StartingFieldStrategy;
@ -505,7 +506,10 @@ public class QuerySalesforceObject extends AbstractProcessor {
StartingFieldStrategy.NESTED_FIELD,
STARTING_FIELD_NAME,
SchemaApplicationStrategy.SELECTED_PART,
CAPTURE_PREDICATE
CAPTURE_PREDICATE,
false,
null,
new JsonParserFactory()
);
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.json.JsonParserFactory;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.json.JsonTreeRowRecordReader;
@ -460,15 +461,14 @@ public class TestForkRecord {
@Override
public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat);
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, false, null, new JsonParserFactory());
}
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger)
throws MalformedRecordException, IOException, SchemaNotFoundException {
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat);
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, false, null, new JsonParserFactory());
}
}
private class CustomRecordWriter extends MockRecordWriter {

View File

@ -35,9 +35,11 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.util.EqualsWrapper;
import org.apache.nifi.util.MockComponentLog;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,6 +51,8 @@ import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -63,19 +67,23 @@ import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@ExtendWith(MockitoExtension.class)
class TestJsonTreeRowRecordReader {
private static final Logger LOGGER = LoggerFactory.getLogger(TestJsonTreeRowRecordReader.class);
private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
@Mock
private ComponentLog log;
private List<RecordField> getDefaultFields() {
return getFields(RecordFieldType.DOUBLE.getDataType());
}
@ -102,7 +110,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecordException {
void testReadChoiceOfStringOrArrayOfRecords() throws Exception {
final File schemaFile = new File("src/test/resources/json/choice-of-string-or-array-record.avsc");
final File jsonFile = new File("src/test/resources/json/choice-of-string-or-array-record.json");
@ -110,23 +118,23 @@ class TestJsonTreeRowRecordReader {
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
try (final InputStream fis = new FileInputStream(jsonFile);
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(fis, new MockComponentLog("id", "id"), recordSchema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(fis, recordSchema)) {
final Record record = reader.nextRecord();
final Object[] fieldsArray = record.getAsArray("fields");
assertEquals(2, fieldsArray.length);
final Object firstElement = fieldsArray[0];
assertTrue(firstElement instanceof Record);
assertInstanceOf(Record.class, firstElement);
assertEquals("string", ((Record) firstElement).getAsString("type"));
final Object secondElement = fieldsArray[1];
assertTrue(secondElement instanceof Record);
assertInstanceOf(Record.class, secondElement);
final Object[] typeArray = ((Record) secondElement).getAsArray("type");
assertEquals(1, typeArray.length);
final Object firstType = typeArray[0];
assertTrue(firstType instanceof Record);
assertInstanceOf(Record.class, firstType);
final Record firstTypeRecord = (Record) firstType;
assertEquals("string", firstTypeRecord.getAsString("type"));
}
@ -135,14 +143,12 @@ class TestJsonTreeRowRecordReader {
@Test
@Disabled("Intended only for manual testing to determine performance before/after modifications")
void testPerformanceOnLocalFile() throws IOException, MalformedRecordException {
void testPerformanceOnLocalFile() throws Exception {
final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList());
final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/prov/16812193969219289");
final byte[] data = Files.readAllBytes(file.toPath());
final ComponentLog logger = mock(ComponentLog.class);
int recordCount = 0;
final int iterations = 1000;
@ -150,10 +156,10 @@ class TestJsonTreeRowRecordReader {
final long start = System.nanoTime();
for (int i = 0; i < iterations; i++) {
try (final InputStream in = new ByteArrayInputStream(data);
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat)) {
while (reader.nextRecord() != null) {
recordCount++;
}
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
while (reader.nextRecord() != null) {
recordCount++;
}
}
}
final long nanos = System.nanoTime() - start;
@ -164,14 +170,12 @@ class TestJsonTreeRowRecordReader {
@Test
@Disabled("Intended only for manual testing to determine performance before/after modifications")
void testPerformanceOnIndividualMessages() throws IOException, MalformedRecordException {
void testPerformanceOnIndividualMessages() throws Exception {
final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList());
final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/1.prov.json");
final byte[] data = Files.readAllBytes(file.toPath());
final ComponentLog logger = mock(ComponentLog.class);
int recordCount = 0;
final int iterations = 1_000_000;
@ -179,7 +183,7 @@ class TestJsonTreeRowRecordReader {
final long start = System.nanoTime();
for (int i = 0; i < iterations; i++) {
try (final InputStream in = new ByteArrayInputStream(data);
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
while (reader.nextRecord() != null) {
recordCount++;
}
@ -192,12 +196,12 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testChoiceOfRecordTypes() throws IOException, MalformedRecordException {
void testChoiceOfRecordTypes() throws Exception {
final Schema avroSchema = new Schema.Parser().parse(new File("src/test/resources/json/record-choice.avsc"));
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
try (final InputStream in = new FileInputStream("src/test/resources/json/elements-for-record-choice.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, recordSchema)) {
// evaluate first record
final Record firstRecord = reader.nextRecord();
@ -214,7 +218,7 @@ class TestJsonTreeRowRecordReader {
// child record should have a schema with "id" as the only field
final Object childObject = firstRecord.getValue("child");
assertTrue(childObject instanceof Record);
assertInstanceOf(Record.class, childObject);
final Record firstChildRecord = (Record) childObject;
final RecordSchema firstChildSchema = firstChildRecord.getSchema();
@ -236,7 +240,7 @@ class TestJsonTreeRowRecordReader {
// child record should have a schema with "name" as the only field
final Object secondChildObject = secondRecord.getValue("child");
assertTrue(secondChildObject instanceof Record);
assertInstanceOf(Record.class, secondChildObject);
final Record secondChildRecord = (Record) secondChildObject;
final RecordSchema secondChildSchema = secondChildRecord.getSchema();
@ -248,11 +252,11 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testReadArray() throws IOException, MalformedRecordException {
void testReadArray() throws Exception {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
@ -274,11 +278,11 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testReadOneLinePerJSON() throws IOException, MalformedRecordException {
void testReadOneLinePerJSON() throws Exception {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-oneline.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
@ -300,7 +304,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testReadMultilineJSON() throws IOException, MalformedRecordException {
void testReadMultilineJSON() throws Exception {
testReadAccountJson("src/test/resources/json/bank-account-multiline.json", false, null);
}
@ -313,23 +317,22 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testReadJSONComments() throws IOException, MalformedRecordException {
void testReadJSONComments() throws Exception {
testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", true, StreamReadConstraints.builder().maxStringLength(20_000).build());
}
@Test
void testReadJSONDisallowComments() {
final MalformedRecordException mre = assertThrows(MalformedRecordException.class, () ->
assertThrows(MalformedRecordException.class, () ->
testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", false, StreamReadConstraints.builder().maxStringLength(20_000).build()));
}
private void testReadAccountJson(final String inputFile, final boolean allowComments, final StreamReadConstraints streamReadConstraints) throws IOException, MalformedRecordException {
private void testReadAccountJson(final String inputFile, final boolean allowComments, final StreamReadConstraints streamReadConstraints) throws Exception {
final List<RecordField> fields = getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream in = new FileInputStream(inputFile);
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat,
null, null, null, null, allowComments, streamReadConstraints, new JsonParserFactory())) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema, null, null, null, null, null, null, null, allowComments, streamReadConstraints)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
@ -351,11 +354,11 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testReadMultilineArrays() throws IOException, MalformedRecordException {
void testReadMultilineArrays() throws Exception {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-multiarray.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
@ -383,11 +386,11 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testReadMixedJSON() throws IOException, MalformedRecordException {
void testReadMixedJSON() throws Exception {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-mixed.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
@ -416,14 +419,14 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, MalformedRecordException {
void testReadRawRecordIncludesFieldsNotInSchema() throws Exception {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final Record schemaValidatedRecord = reader.nextRecord(true, true);
assertEquals(1, schemaValidatedRecord.getValue("id"));
@ -432,7 +435,7 @@ class TestJsonTreeRowRecordReader {
}
try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final Record rawRecord = reader.nextRecord(false, false);
assertEquals(1, rawRecord.getValue("id"));
@ -447,7 +450,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testReadRawRecordFieldOrderPreserved() throws IOException, MalformedRecordException {
void testReadRawRecordFieldOrderPreserved() throws Exception {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
@ -458,7 +461,7 @@ class TestJsonTreeRowRecordReader {
final String expectedMap = "{id=1, name=John Doe, address=123 My Street, city=My City, state=MS, zipCode=11111, country=USA, account={\"id\":42,\"balance\":4750.89}}";
try (final InputStream in = new FileInputStream("src/test/resources/json/single-element-nested.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final Record rawRecord = reader.nextRecord(false, false);
@ -470,14 +473,14 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testReadRawRecordTypeCoercion() throws IOException, MalformedRecordException {
void testReadRawRecordTypeCoercion() throws Exception {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final Record schemaValidatedRecord = reader.nextRecord(true, true);
assertEquals("1", schemaValidatedRecord.getValue("id")); // will be coerced into a STRING as per the schema
@ -488,7 +491,7 @@ class TestJsonTreeRowRecordReader {
}
try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final Record rawRecord = reader.nextRecord(false, false);
assertEquals(1, rawRecord.getValue("id")); // will return raw value of (int) 1
@ -505,7 +508,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testDateCoercedFromString() throws IOException, MalformedRecordException {
void testDateCoercedFromString() throws Exception {
final String dateField = "date";
final List<RecordField> recordFields = Collections.singletonList(new RecordField(dateField, RecordFieldType.DATE.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(recordFields);
@ -515,38 +518,38 @@ class TestJsonTreeRowRecordReader {
final String json = String.format("{ \"%s\": \"%s\" }", dateField, date);
for (final boolean coerceTypes : new boolean[] {true, false}) {
try (final InputStream in = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, datePattern, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema, datePattern, timeFormat, timestampFormat, null, null, null, null, false, null)) {
final Record record = reader.nextRecord(coerceTypes, false);
final Object value = record.getValue(dateField);
assertTrue(value instanceof java.sql.Date, "With coerceTypes set to " + coerceTypes + ", value is not a Date");
assertInstanceOf(Date.class, value, "With coerceTypes set to " + coerceTypes + ", value is not a Date");
assertEquals(date, value.toString());
}
}
}
@Test
void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
void testTimestampCoercedFromString() throws Exception {
final List<RecordField> recordFields = Collections.singletonList(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(recordFields);
for (final boolean coerceTypes : new boolean[] {true, false}) {
try (final InputStream in = new FileInputStream("src/test/resources/json/timestamp.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema, dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss", null, null, null, null, false, null)) {
final Record record = reader.nextRecord(coerceTypes, false);
final Object value = record.getValue("timestamp");
assertTrue(value instanceof java.sql.Timestamp, "With coerceTypes set to " + coerceTypes + ", value is not a Timestamp");
assertInstanceOf(Timestamp.class, value, "With coerceTypes set to " + coerceTypes + ", value is not a Timestamp");
}
}
}
@Test
void testSingleJsonElement() throws IOException, MalformedRecordException {
void testSingleJsonElement() throws Exception {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
try (final InputStream in = new FileInputStream("src/test/resources/json/single-bank-account.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
@ -565,14 +568,14 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testSingleJsonElementWithChoiceFields() throws IOException, MalformedRecordException {
void testSingleJsonElementWithChoiceFields() throws Exception {
// Wraps default fields by Choice data type to test mapping to a Choice type.
final List<RecordField> choiceFields = getDefaultFields().stream()
.map(f -> new RecordField(f.getFieldName(), RecordFieldType.CHOICE.getChoiceDataType(f.getDataType()))).collect(Collectors.toList());
final RecordSchema schema = new SimpleRecordSchema(choiceFields);
try (final InputStream in = new FileInputStream("src/test/resources/json/single-bank-account.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
@ -582,9 +585,9 @@ class TestJsonTreeRowRecordReader {
RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
final List<RecordField> fields = schema.getFields();
for (int i = 0; i < schema.getFields().size(); i++) {
assertTrue(fields.get(i).getDataType() instanceof ChoiceDataType);
assertInstanceOf(ChoiceDataType.class, fields.get(i).getDataType());
final ChoiceDataType choiceDataType = (ChoiceDataType) fields.get(i).getDataType();
assertEquals(expectedTypes.get(i), choiceDataType.getPossibleSubTypes().get(0).getFieldType());
assertEquals(expectedTypes.get(i), choiceDataType.getPossibleSubTypes().getFirst().getFieldType());
}
final Object[] firstRecordValues = reader.nextRecord().getValues();
@ -595,7 +598,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testElementWithNestedData() throws IOException, MalformedRecordException {
void testElementWithNestedData() throws Exception {
final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
final List<RecordField> fields = getDefaultFields();
fields.add(new RecordField("account", accountType));
@ -603,7 +606,7 @@ class TestJsonTreeRowRecordReader {
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream in = new FileInputStream("src/test/resources/json/single-element-nested.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
@ -625,7 +628,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testElementWithNestedArray() throws IOException, MalformedRecordException {
void testElementWithNestedArray() throws Exception {
final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
@ -635,7 +638,7 @@ class TestJsonTreeRowRecordReader {
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream in = new FileInputStream("src/test/resources/json/single-element-nested-array.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id", "name", "address", "city", "state", "zipCode", "country", "accounts");
@ -658,11 +661,11 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
void testReadArrayDifferentSchemas() throws Exception {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array-different-schemas.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
@ -687,11 +690,11 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws IOException, MalformedRecordException {
void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws Exception {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array-optional-balance.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
@ -716,7 +719,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
public void testMultipleInputRecordsWithStartingFieldArray() throws IOException, MalformedRecordException {
public void testMultipleInputRecordsWithStartingFieldArray() throws Exception {
final String inputJson = """
[{
"books": [{
@ -743,8 +746,8 @@ class TestJsonTreeRowRecordReader {
final List<String> ids = new ArrayList<>();
try (final InputStream in = new ByteArrayInputStream(inputJson.getBytes(StandardCharsets.UTF_8));
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), bookSchema, dateFormat, timeFormat, timestampFormat,
StartingFieldStrategy.NESTED_FIELD, "books", SchemaApplicationStrategy.SELECTED_PART, null)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, bookSchema, dateFormat, timeFormat, timestampFormat,
StartingFieldStrategy.NESTED_FIELD, "books", SchemaApplicationStrategy.SELECTED_PART, null, false, null)) {
Record record;
while ((record = reader.nextRecord()) != null) {
@ -757,7 +760,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
public void testMultipleInputRecordsWithStartingFieldSingleObject() throws IOException, MalformedRecordException {
public void testMultipleInputRecordsWithStartingFieldSingleObject() throws Exception {
final String inputJson = """
{"book": {"id": 1,"title": "Book 1"}}
{"book": {"id": 2,"title": "Book 2"}}
@ -772,8 +775,8 @@ class TestJsonTreeRowRecordReader {
final List<String> ids = new ArrayList<>();
try (final InputStream in = new ByteArrayInputStream(inputJson.getBytes(StandardCharsets.UTF_8));
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), bookSchema, dateFormat, timeFormat, timestampFormat,
StartingFieldStrategy.NESTED_FIELD, "book", SchemaApplicationStrategy.SELECTED_PART, null)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, bookSchema, dateFormat, timeFormat, timestampFormat,
StartingFieldStrategy.NESTED_FIELD, "book", SchemaApplicationStrategy.SELECTED_PART, null, false, null)) {
Record record;
while ((record = reader.nextRecord()) != null) {
@ -788,7 +791,7 @@ class TestJsonTreeRowRecordReader {
@Test
void testReadUnicodeCharacters() throws IOException, MalformedRecordException {
void testReadUnicodeCharacters() throws Exception {
final List<RecordField> fromFields = new ArrayList<>();
fromFields.add(new RecordField("id", RecordFieldType.LONG.getDataType()));
@ -804,12 +807,12 @@ class TestJsonTreeRowRecordReader {
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream in = new FileInputStream("src/test/resources/json/json-with-unicode.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
final Object[] firstRecordValues = reader.nextRecord().getValues();
final Object secondValue = firstRecordValues[1];
assertTrue(secondValue instanceof Long);
assertInstanceOf(Long.class, secondValue);
assertEquals(832036744985577473L, secondValue);
final Object unicodeValue = firstRecordValues[2];
@ -830,7 +833,7 @@ class TestJsonTreeRowRecordReader {
MalformedRecordException mre = assertThrows(MalformedRecordException.class, () -> {
try (final InputStream in = new FileInputStream("src/test/resources/json/single-bank-account-wrong-field-type.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, schema)) {
reader.nextRecord().getValues();
}
@ -858,12 +861,12 @@ class TestJsonTreeRowRecordReader {
));
List<Object> expected = Arrays.asList(
new MapRecord(expectedSchema, new HashMap<String, Object>() {{
new MapRecord(expectedSchema, new HashMap<>() {{
put("integer", 1);
put("boolean", true);
put("booleanOrString", true);
}}),
new MapRecord(expectedSchema, new HashMap<String, Object>() {{
new MapRecord(expectedSchema, new HashMap<>() {{
put("integer", 2);
put("string", "stringValue2");
put("booleanOrString", "booleanOrStringValue2");
@ -893,14 +896,14 @@ class TestJsonTreeRowRecordReader {
));
List<Object> expected = Arrays.asList(
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>() {{
put("record", new MapRecord(expectedRecordSchema1, new HashMap<String, Object>() {{
new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
put("record", new MapRecord(expectedRecordSchema1, new HashMap<>() {{
put("integer", 1);
put("boolean", true);
}}));
}}),
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>() {{
put("record", new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{
new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
put("record", new MapRecord(expectedRecordSchema2, new HashMap<>() {{
put("integer", 2);
put("string", "stringValue2");
}}));
@ -992,7 +995,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testStartFromNestedArray() throws IOException, MalformedRecordException {
void testStartFromNestedArray() throws Exception {
String jsonPath = "src/test/resources/json/single-element-nested-array.json";
SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
@ -1009,7 +1012,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testStartFromNestedObject() throws IOException, MalformedRecordException {
void testStartFromNestedObject() throws Exception {
String jsonPath = "src/test/resources/json/single-element-nested.json";
SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
@ -1023,7 +1026,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testStartFromMultipleNestedField() throws IOException, MalformedRecordException {
void testStartFromMultipleNestedField() throws Exception {
String jsonPath = "src/test/resources/json/multiple-nested-field.json";
SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
@ -1040,14 +1043,14 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testStartFromSimpleFieldReturnsEmptyJson() throws IOException, MalformedRecordException {
void testStartFromSimpleFieldReturnsEmptyJson() throws Exception {
String jsonPath = "src/test/resources/json/single-element-nested.json";
testReadRecords(jsonPath, Collections.emptyList(), StartingFieldStrategy.NESTED_FIELD, "name");
}
@Test
void testStartFromNonExistentFieldWithDefinedSchema() throws IOException, MalformedRecordException {
void testStartFromNonExistentFieldWithDefinedSchema() throws Exception {
String jsonPath = "src/test/resources/json/single-element-nested.json";
SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(getDefaultFields());
@ -1058,7 +1061,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testStartFromNestedFieldThenStartObject() throws IOException, MalformedRecordException {
void testStartFromNestedFieldThenStartObject() throws Exception {
String jsonPath = "src/test/resources/json/nested-array-then-start-object.json";
final SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
@ -1076,7 +1079,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testStartFromNestedObjectWithWholeJsonSchemaScope() throws IOException, MalformedRecordException {
void testStartFromNestedObjectWithWholeJsonSchemaScope() throws Exception {
String jsonPath = "src/test/resources/json/single-element-nested.json";
final RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(
@ -1099,7 +1102,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testStartFromNestedArrayWithWholeJsonSchemaScope() throws IOException, MalformedRecordException {
void testStartFromNestedArrayWithWholeJsonSchemaScope() throws Exception {
String jsonPath = "src/test/resources/json/single-element-nested-array.json";
RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(
@ -1123,7 +1126,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testStartFromDeepNestedObject() throws IOException, MalformedRecordException {
void testStartFromDeepNestedObject() throws Exception {
String jsonPath = "src/test/resources/json/single-element-deep-nested.json";
RecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList(
@ -1149,7 +1152,7 @@ class TestJsonTreeRowRecordReader {
));
List<Object> expected = Collections.singletonList(
new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{
new MapRecord(expectedRecordSchema, new HashMap<>() {{
put("nestedLevel2Int", 111);
put("nestedLevel2String", "root.level1.level2:string");
}})
@ -1160,7 +1163,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testCaptureFields() throws IOException, MalformedRecordException {
void testCaptureFields() throws Exception {
Map<String, String> expectedCapturedFields = new HashMap<>();
expectedCapturedFields.put("id", "1");
expectedCapturedFields.put("zipCode", "11111");
@ -1194,11 +1197,9 @@ class TestJsonTreeRowRecordReader {
));
try (InputStream in = new FileInputStream("src/test/resources/json/capture-fields.json")) {
JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(
in, mock(ComponentLog.class), recordSchema,
dateFormat, timeFormat, timestampFormat,
StartingFieldStrategy.NESTED_FIELD, startingFieldName,
SchemaApplicationStrategy.SELECTED_PART, capturePredicate);
JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(in, recordSchema, dateFormat, timeFormat, timestampFormat,
StartingFieldStrategy.NESTED_FIELD, startingFieldName, SchemaApplicationStrategy.SELECTED_PART,
capturePredicate, false, null);
while (reader.nextRecord() != null);
Map<String, String> capturedFields = reader.getCapturedFields();
@ -1207,7 +1208,7 @@ class TestJsonTreeRowRecordReader {
}
}
private void testReadRecords(String jsonFilename, List<Object> expected) throws IOException, MalformedRecordException {
private void testReadRecords(String jsonFilename, List<Object> expected) throws Exception {
final File jsonFile = new File(jsonFilename);
try (final InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
final RecordSchema schema = inferSchema(jsonStream, StartingFieldStrategy.ROOT_NODE, null);
@ -1219,7 +1220,7 @@ class TestJsonTreeRowRecordReader {
List<Object> expected,
StartingFieldStrategy strategy,
String startingFieldName)
throws IOException, MalformedRecordException {
throws Exception {
final File jsonFile = new File(jsonPath);
try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
@ -1228,7 +1229,7 @@ class TestJsonTreeRowRecordReader {
}
}
private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException {
private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected) throws Exception {
final File jsonFile = new File(jsonPath);
try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
testReadRecords(jsonStream, schema, expected);
@ -1240,15 +1241,15 @@ class TestJsonTreeRowRecordReader {
List<Object> expected,
StartingFieldStrategy strategy,
String startingFieldName,
SchemaApplicationStrategy schemaApplicationStrategy) throws IOException, MalformedRecordException {
SchemaApplicationStrategy schemaApplicationStrategy) throws Exception {
final File jsonFile = new File(jsonPath);
try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
testReadRecords(jsonStream, schema, expected, strategy, startingFieldName, schemaApplicationStrategy);
}
}
private void testReadRecords(InputStream jsonStream, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException {
try (JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
private void testReadRecords(InputStream jsonStream, RecordSchema schema, List<Object> expected) throws Exception {
try (JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(jsonStream, schema)) {
List<Object> actual = new ArrayList<>();
Record record;
while ((record = reader.nextRecord()) != null) {
@ -1280,10 +1281,10 @@ class TestJsonTreeRowRecordReader {
StartingFieldStrategy strategy,
String startingFieldName,
SchemaApplicationStrategy schemaApplicationStrategy)
throws IOException, MalformedRecordException {
throws Exception {
try (JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat,
strategy, startingFieldName, schemaApplicationStrategy, null)) {
try (JsonTreeRowRecordReader reader = createJsonTreeRowRecordReader(jsonStream, schema, dateFormat, timeFormat, timestampFormat,
strategy, startingFieldName, schemaApplicationStrategy, null, false, null)) {
List<Object> actual = new ArrayList<>();
Record record;
@ -1312,12 +1313,23 @@ class TestJsonTreeRowRecordReader {
private RecordSchema inferSchema(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName) throws IOException {
RecordSchema schema = new InferSchemaAccessStrategy<>(
(__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName, StreamReadConstraints.defaults()),
new JsonSchemaInference(new TimeValueInference(null, null, null)),
mock(ComponentLog.class)
new JsonSchemaInference(new TimeValueInference(null, null, null)), log
).getSchema(Collections.emptyMap(), jsonStream, null);
jsonStream.reset();
return schema;
}
private JsonTreeRowRecordReader createJsonTreeRowRecordReader(InputStream inputStream, RecordSchema recordSchema) throws Exception {
return createJsonTreeRowRecordReader(inputStream, recordSchema, dateFormat, timeFormat, timestampFormat, null, null, null, null, false, null);
}
private JsonTreeRowRecordReader createJsonTreeRowRecordReader(InputStream inputStream, RecordSchema recordSchema, String dateFormat, String timeFormat, String timestampFormat,
StartingFieldStrategy startingFieldStrategy, String startingFieldName, SchemaApplicationStrategy schemaApplicationStrategy,
BiPredicate<String, String> captureFieldPredicate, boolean allowComments, StreamReadConstraints streamReadConstraints)
throws Exception {
return new JsonTreeRowRecordReader(inputStream, log, recordSchema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy,
captureFieldPredicate, allowComments, streamReadConstraints, new JsonParserFactory());
}
}