From 63aac1a31d5d35fb133d5768abf99201964a16b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lehel=20Bo=C3=A9r?= Date: Fri, 23 Sep 2022 05:10:12 +0200 Subject: [PATCH] NIFI-10513: Added capture non-record fields to JsonTreeRowRecordReader, added pagination to QuerySalesforceObject This closes #6444. Signed-off-by: Tamas Palfy --- .../json/AbstractJsonRowRecordReader.java | 77 +++++++++- .../nifi/json/JsonTreeRowRecordReader.java | 11 +- .../salesforce/QuerySalesforceObject.java | 137 ++++++++++-------- .../util/SalesforceRestService.java | 15 ++ .../org/apache/nifi/json/JsonTreeReader.java | 4 +- .../json/TestJsonTreeRowRecordReader.java | 87 +++++++++-- .../test/resources/json/capture-fields.json | 20 +++ 7 files changed, 263 insertions(+), 88 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/capture-fields.json diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java index a3515d03db..b024beed6c 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.core.io.SerializedString; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import 
com.fasterxml.jackson.databind.node.ArrayNode; @@ -48,9 +47,11 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.BiPredicate; import java.util.function.Supplier; public abstract class AbstractJsonRowRecordReader implements RecordReader { + private final ComponentLog logger; private final Supplier LAZY_DATE_FORMAT; private final Supplier LAZY_TIME_FORMAT; @@ -64,6 +65,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { private JsonNode firstJsonNode; private StartingFieldStrategy strategy; + private Map capturedFields; + private BiPredicate captureFieldPredicate; + private AbstractJsonRowRecordReader(final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat) { this.logger = logger; @@ -76,27 +80,61 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { LAZY_TIMESTAMP_FORMAT = () -> tsf; } - protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat) + protected AbstractJsonRowRecordReader(final InputStream in, + final ComponentLog logger, + final String dateFormat, + final String timeFormat, + final String timestampFormat) throws IOException, MalformedRecordException { - this(in, logger, dateFormat, timeFormat, timestampFormat, null, null); + this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null); } - protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat, - final StartingFieldStrategy strategy, final String nestedFieldName) throws IOException, MalformedRecordException { + /** + * Constructor with initial logic for JSON to NiFi record parsing. 
+ * + * @param in the input stream to parse + * @param logger ComponentLog + * @param dateFormat format for parsing date fields + * @param timeFormat format for parsing time fields + * @param timestampFormat format for parsing timestamp fields + * @param strategy whether to start processing from a specific field + * @param nestedFieldName the name of the field to start the processing from + * @param captureFieldPredicate predicate that takes a JSON fieldName and fieldValue to capture top-level non-processed fields which can + * be accessed by calling {@link #getCapturedFields()} + * @throws IOException in case of JSON stream processing failure + * @throws MalformedRecordException in case of malformed JSON input + */ + protected AbstractJsonRowRecordReader(final InputStream in, + final ComponentLog logger, + final String dateFormat, + final String timeFormat, + final String timestampFormat, + final StartingFieldStrategy strategy, + final String nestedFieldName, + final BiPredicate captureFieldPredicate) + throws IOException, MalformedRecordException { this(logger, dateFormat, timeFormat, timestampFormat); this.strategy = strategy; + this.captureFieldPredicate = captureFieldPredicate; + capturedFields = new HashMap<>(); try { jsonParser = jsonFactory.createParser(in); jsonParser.setCodec(codec); if (strategy == StartingFieldStrategy.NESTED_FIELD) { - final SerializedString serializedStartingFieldName = new SerializedString(nestedFieldName); - while (!jsonParser.nextFieldName(serializedStartingFieldName) && jsonParser.hasCurrentToken()); - logger.debug("Parsing starting at nested field [{}]", nestedFieldName); + while (jsonParser.nextToken() != null) { + if (nestedFieldName.equals(jsonParser.getCurrentName())) { + logger.debug("Parsing starting at nested field [{}]", nestedFieldName); + break; + } + if (captureFieldPredicate != null) { + captureCurrentField(captureFieldPredicate); + } + } } JsonToken token = jsonParser.nextToken(); @@ -130,6 +168,11 @@ public abstract 
class AbstractJsonRowRecordReader implements RecordReader { public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException { final JsonNode nextNode = getNextJsonNode(); if (nextNode == null) { + if (captureFieldPredicate != null) { + while (jsonParser.nextToken() != null) { + captureCurrentField(captureFieldPredicate); + } + } return null; } @@ -242,6 +285,19 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { return null; } + private void captureCurrentField(BiPredicate captureFieldPredicate) throws IOException { + if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME) { + jsonParser.nextToken(); + + final String fieldName = jsonParser.getCurrentName(); + final String fieldValue = jsonParser.getValueAsString(); + + if (captureFieldPredicate.test(fieldName, fieldValue)) { + capturedFields.put(fieldName, fieldValue); + } + } + } + private Map getMapFromRawValue(final JsonNode fieldNode, final DataType dataType, final String fieldName) throws IOException { if (dataType == null || dataType.getFieldType() != RecordFieldType.MAP) { return null; @@ -389,4 +445,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { } protected abstract Record convertJsonNodeToRecord(JsonNode nextNode, RecordSchema schema, boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException; + + + public Map getCapturedFields() { + return capturedFields; + } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java index ae2ae11afb..fce50e35c9 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java +++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java @@ -37,13 +37,14 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.LinkedHashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Queue; +import java.util.function.BiPredicate; import java.util.function.Supplier; public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { @@ -52,16 +53,16 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException { - this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null); + this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null); } public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final String dateFormat, final String timeFormat, final String timestampFormat, final StartingFieldStrategy startingFieldStrategy, final String startingFieldName, - final SchemaApplicationStrategy schemaApplicationStrategy) + final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate captureFieldPredicate) throws IOException, MalformedRecordException { - super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName); + super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate); if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && schemaApplicationStrategy == 
SchemaApplicationStrategy.WHOLE_JSON) { this.schema = getSelectedSchema(schema, startingFieldName); } else { @@ -79,7 +80,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { return getChildSchemaFromField(optionalRecordField.get()); } else { for (RecordField field : currentSchema.getFields()) { - if (field.getDataType() instanceof ArrayDataType || field.getDataType() instanceof RecordDataType) { + if (field.getDataType() instanceof ArrayDataType || field.getDataType() instanceof RecordDataType) { schemas.add(getChildSchemaFromField(field)); } } diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java index df3ff3c6b3..5d1ba58d11 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java @@ -75,6 +75,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiPredicate; @PrimaryNodeOnly @TriggerSerially @@ -220,6 +222,8 @@ public class QuerySalesforceObject extends AbstractProcessor { private static final String DATE_FORMAT = "yyyy-MM-dd"; private static final String TIME_FORMAT = "HH:mm:ss.SSSX"; private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ"; + private static final String NEXT_RECORDS_URL = "nextRecordsUrl"; + private static final BiPredicate CAPTURE_PREDICATE = (fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName); private volatile 
SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter; private volatile SalesforceRestService salesforceRestService; @@ -330,76 +334,93 @@ public class QuerySalesforceObject extends AbstractProcessor { ageFilterUpper ); - FlowFile flowFile = session.create(); + AtomicReference nextRecordsUrl = new AtomicReference<>(); - Map originalAttributes = flowFile.getAttributes(); - Map attributes = new HashMap<>(); + do { - AtomicInteger recordCountHolder = new AtomicInteger(); + FlowFile flowFile = session.create(); + Map originalAttributes = flowFile.getAttributes(); + Map attributes = new HashMap<>(); - flowFile = session.write(flowFile, out -> { - try ( - InputStream querySObjectResultInputStream = salesforceRestService.query(querySObject); - JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader( - querySObjectResultInputStream, - getLogger(), - convertedSalesforceSchema.recordSchema, - DATE_FORMAT, - TIME_FORMAT, - DATE_TIME_FORMAT, - StartingFieldStrategy.NESTED_FIELD, - STARTING_FIELD_NAME, - SchemaApplicationStrategy.SELECTED_PART - ); + AtomicInteger recordCountHolder = new AtomicInteger(); - RecordSetWriter writer = writerFactory.createWriter( - getLogger(), - writerFactory.getSchema( - originalAttributes, - convertedSalesforceSchema.recordSchema - ), - out, - originalAttributes - ) - ) { - writer.beginRecordSet(); + flowFile = session.write(flowFile, out -> { + try ( + InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl, querySObject); - Record querySObjectRecord; - while ((querySObjectRecord = jsonReader.nextRecord()) != null) { - writer.write(querySObjectRecord); + JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader( + querySObjectResultInputStream, + getLogger(), + convertedSalesforceSchema.recordSchema, + DATE_FORMAT, + TIME_FORMAT, + DATE_TIME_FORMAT, + StartingFieldStrategy.NESTED_FIELD, + STARTING_FIELD_NAME, + SchemaApplicationStrategy.SELECTED_PART, + CAPTURE_PREDICATE + ); + + 
RecordSetWriter writer = writerFactory.createWriter( + getLogger(), + writerFactory.getSchema( + originalAttributes, + convertedSalesforceSchema.recordSchema + ), + out, + originalAttributes + ) + ) { + writer.beginRecordSet(); + + Record querySObjectRecord; + while ((querySObjectRecord = jsonReader.nextRecord()) != null) { + writer.write(querySObjectRecord); + } + + WriteResult writeResult = writer.finishRecordSet(); + + Map capturedFields = jsonReader.getCapturedFields(); + + nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, null)); + + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + + recordCountHolder.set(writeResult.getRecordCount()); + + if (ageFilterUpper != null) { + Map newState = new HashMap<>(state.toMap()); + newState.put(LAST_AGE_FILTER, ageFilterUpper); + updateState(context, newState); + } + } catch (SchemaNotFoundException e) { + throw new ProcessException("Couldn't create record writer", e); + } catch (MalformedRecordException e) { + throw new ProcessException("Couldn't read records from input", e); } + }); - WriteResult writeResult = writer.finishRecordSet(); + int recordCount = recordCountHolder.get(); - attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); - attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); - attributes.putAll(writeResult.getAttributes()); + if (!createZeroRecordFlowFiles && recordCount == 0) { + session.remove(flowFile); + } else { + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(flowFile, REL_SUCCESS); - recordCountHolder.set(writeResult.getRecordCount()); - - if (ageFilterUpper != null) { - Map newState = new HashMap<>(state.toMap()); - newState.put(LAST_AGE_FILTER, ageFilterUpper); - updateState(context, newState); - } - } catch (SchemaNotFoundException e) { - throw new 
ProcessException("Couldn't create record writer", e); - } catch (MalformedRecordException e) { - throw new ProcessException("Couldn't read records from input", e); + session.adjustCounter("Records Processed", recordCount, false); + getLogger().info("Successfully written {} records for {}", recordCount, flowFile); } - }); + } while (nextRecordsUrl.get() != null); + } - int recordCount = recordCountHolder.get(); - - if (!createZeroRecordFlowFiles && recordCount == 0) { - session.remove(flowFile); - } else { - flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, REL_SUCCESS); - - session.adjustCounter("Records Processed", recordCount, false); - getLogger().info("Successfully written {} records for {}", recordCount, flowFile); + private InputStream getResultInputStream(AtomicReference nextRecordsUrl, String querySObject) { + if (nextRecordsUrl.get() == null) { + return salesforceRestService.query(querySObject); } + return salesforceRestService.getNextRecords(nextRecordsUrl.get()); } private ConvertedSalesforceSchema getConvertedSalesforceSchema(String sObject, String fields) { diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java index bc3f746158..8d6a8fddae 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java @@ -69,6 +69,21 @@ public class SalesforceRestService { return request(request); } + public InputStream getNextRecords(String nextRecordsUrl) { + String url = baseUrl + nextRecordsUrl; + + HttpUrl httpUrl = 
HttpUrl.get(url).newBuilder() + .build(); + + Request request = new Request.Builder() + .addHeader("Authorization", "Bearer " + accessTokenProvider.get()) + .url(httpUrl) + .get() + .build(); + + return request(request); + } + private InputStream request(Request request) { Response response = null; try { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java index 5de7d74c7d..1d4caa8dbc 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java @@ -76,7 +76,6 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade private volatile StartingFieldStrategy startingFieldStrategy; private volatile SchemaApplicationStrategy schemaApplicationStrategy; - public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder() .name("starting-field-strategy") .displayName("Starting Field Strategy") @@ -165,6 +164,7 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade public RecordReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { final RecordSchema schema = getSchema(variables, in, null); - return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy); + 
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, + schemaApplicationStrategy, null); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java index e885c95df7..e8b0b6c48c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java @@ -51,7 +51,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.BiPredicate; import java.util.function.Function; import java.util.stream.Collectors; @@ -94,14 +96,6 @@ class TestJsonTreeRowRecordReader { return new SimpleRecordSchema(accountFields); } - private RecordSchema getSchema() { - final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); - final List fields = getDefaultFields(); - fields.add(new RecordField("account", accountType)); - fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); - return new SimpleRecordSchema(fields); - } - @Test void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecordException { final File schemaFile = new File("src/test/resources/json/choice-of-string-or-array-record.avsc"); @@ -1253,6 +1247,54 @@ class TestJsonTreeRowRecordReader { 
"nestedLevel2Record", SchemaApplicationStrategy.WHOLE_JSON); } + @Test + void testCaptureFields() throws IOException, MalformedRecordException { + Map expectedCapturedFields = new HashMap<>(); + expectedCapturedFields.put("id", "1"); + expectedCapturedFields.put("zipCode", "11111"); + expectedCapturedFields.put("country", "USA"); + expectedCapturedFields.put("job", null); + Set fieldsToCapture = expectedCapturedFields.keySet(); + BiPredicate capturePredicate = (fieldName, fieldValue) -> fieldsToCapture.contains(fieldName); + String startingFieldName = "accounts"; + + + SimpleRecordSchema accountRecordSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("id", RecordFieldType.INT.getDataType()), + new RecordField("balance", RecordFieldType.DOUBLE.getDataType()) + )); + + SimpleRecordSchema jobRecordSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("salary", RecordFieldType.INT.getDataType()), + new RecordField("position", RecordFieldType.STRING.getDataType()) + )); + + SimpleRecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("id", RecordFieldType.INT.getDataType()), + new RecordField("accounts", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(accountRecordSchema))), + new RecordField("name", RecordFieldType.STRING.getDataType()), + new RecordField("address", RecordFieldType.STRING.getDataType()), + new RecordField("city", RecordFieldType.STRING.getDataType()), + new RecordField("job", RecordFieldType.RECORD.getRecordDataType(jobRecordSchema)), + new RecordField("state", RecordFieldType.STRING.getDataType()), + new RecordField("zipCode", RecordFieldType.STRING.getDataType()), + new RecordField("country", RecordFieldType.STRING.getDataType()) + )); + + try (InputStream in = new FileInputStream("src/test/resources/json/capture-fields.json")) { + JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader( + in, mock(ComponentLog.class), recordSchema, + dateFormat, timeFormat, 
timestampFormat, + StartingFieldStrategy.NESTED_FIELD, startingFieldName, + SchemaApplicationStrategy.SELECTED_PART, capturePredicate); + + while (reader.nextRecord() != null); + Map capturedFields = reader.getCapturedFields(); + + assertEquals(expectedCapturedFields, capturedFields); + } + } + private void testReadRecords(String jsonPath, List expected) throws IOException, MalformedRecordException { final File jsonFile = new File(jsonPath); try ( @@ -1263,8 +1305,12 @@ class TestJsonTreeRowRecordReader { } } - private void testReadRecords(String jsonPath, List expected, StartingFieldStrategy strategy, - String startingFieldName) throws IOException, MalformedRecordException { + private void testReadRecords(String jsonPath, + List expected, + StartingFieldStrategy strategy, + String startingFieldName) + throws IOException, MalformedRecordException { + final File jsonFile = new File(jsonPath); try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) { RecordSchema schema = inferSchema(jsonStream, strategy, startingFieldName); @@ -1279,8 +1325,14 @@ class TestJsonTreeRowRecordReader { } } - private void testReadRecords(String jsonPath, RecordSchema schema, List expected, StartingFieldStrategy strategy, - String startingFieldName, SchemaApplicationStrategy schemaApplicationStrategy) throws IOException, MalformedRecordException { + private void testReadRecords(String jsonPath, + RecordSchema schema, + List expected, + StartingFieldStrategy strategy, + String startingFieldName, + SchemaApplicationStrategy schemaApplicationStrategy + ) throws IOException, MalformedRecordException { + final File jsonFile = new File(jsonPath); try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) { testReadRecords(jsonStream, schema, expected, strategy, startingFieldName, schemaApplicationStrategy); @@ -1314,11 +1366,16 @@ class TestJsonTreeRowRecordReader { } } - private void testReadRecords(InputStream 
jsonStream, RecordSchema schema, List expected, StartingFieldStrategy strategy, - String startingFieldName, SchemaApplicationStrategy schemaApplicationStrategy) + private void testReadRecords(InputStream jsonStream, + RecordSchema schema, + List expected, + StartingFieldStrategy strategy, + String startingFieldName, + SchemaApplicationStrategy schemaApplicationStrategy) throws IOException, MalformedRecordException { + try (JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat, - strategy, startingFieldName, schemaApplicationStrategy)) { + strategy, startingFieldName, schemaApplicationStrategy, null)) { List actual = new ArrayList<>(); Record record; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/capture-fields.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/capture-fields.json new file mode 100644 index 0000000000..2011528f23 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/capture-fields.json @@ -0,0 +1,20 @@ +{ + "id": 1, + "accounts": [{ + "id": 42, + "balance": 4750.89 + }, { + "id": 43, + "balance": 48212.38 + }], + "name": "John Doe", + "address": "123 My Street", + "city": "My City", + "job" : { + "salary": 115431, + "position": "accountant" + }, + "state": "MS", + "zipCode": "11111", + "country": "USA" +} \ No newline at end of file