NIFI-10513: Added capturing of non-record fields to JsonTreeRowRecordReader, added pagination to QuerySalesforceObject

This closes #6444.

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
This commit is contained in:
Lehel Boér 2022-09-23 05:10:12 +02:00 committed by Tamas Palfy
parent 1d9e119084
commit 63aac1a31d
7 changed files with 263 additions and 88 deletions

View File

@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.io.SerializedString;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@ -48,9 +47,11 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
public abstract class AbstractJsonRowRecordReader implements RecordReader {
private final ComponentLog logger;
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
@ -64,6 +65,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
private JsonNode firstJsonNode;
private StartingFieldStrategy strategy;
private Map<String, String> capturedFields;
private BiPredicate<String, String> captureFieldPredicate;
private AbstractJsonRowRecordReader(final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat) {
this.logger = logger;
@ -76,27 +80,61 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
LAZY_TIMESTAMP_FORMAT = () -> tsf;
}
protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat)
protected AbstractJsonRowRecordReader(final InputStream in,
final ComponentLog logger,
final String dateFormat,
final String timeFormat,
final String timestampFormat)
throws IOException, MalformedRecordException {
this(in, logger, dateFormat, timeFormat, timestampFormat, null, null);
this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null);
}
protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,
final StartingFieldStrategy strategy, final String nestedFieldName) throws IOException, MalformedRecordException {
/**
* Constructor with initial logic for JSON to NiFi record parsing.
*
* @param in the input stream to parse
* @param logger ComponentLog
* @param dateFormat format for parsing date fields
* @param timeFormat format for parsing time fields
* @param timestampFormat format for parsing timestamp fields
* @param strategy whether to start processing from a specific field
* @param nestedFieldName the name of the field to start the processing from
* @param captureFieldPredicate predicate that takes a JSON fieldName and fieldValue to capture top-level non-processed fields which can
* be accessed by calling {@link #getCapturedFields()}
* @throws IOException in case of JSON stream processing failure
* @throws MalformedRecordException in case of malformed JSON input
*/
protected AbstractJsonRowRecordReader(final InputStream in,
final ComponentLog logger,
final String dateFormat,
final String timeFormat,
final String timestampFormat,
final StartingFieldStrategy strategy,
final String nestedFieldName,
final BiPredicate<String, String> captureFieldPredicate)
throws IOException, MalformedRecordException {
this(logger, dateFormat, timeFormat, timestampFormat);
this.strategy = strategy;
this.captureFieldPredicate = captureFieldPredicate;
capturedFields = new HashMap<>();
try {
jsonParser = jsonFactory.createParser(in);
jsonParser.setCodec(codec);
if (strategy == StartingFieldStrategy.NESTED_FIELD) {
final SerializedString serializedStartingFieldName = new SerializedString(nestedFieldName);
while (!jsonParser.nextFieldName(serializedStartingFieldName) && jsonParser.hasCurrentToken());
logger.debug("Parsing starting at nested field [{}]", nestedFieldName);
while (jsonParser.nextToken() != null) {
if (nestedFieldName.equals(jsonParser.getCurrentName())) {
logger.debug("Parsing starting at nested field [{}]", nestedFieldName);
break;
}
if (captureFieldPredicate != null) {
captureCurrentField(captureFieldPredicate);
}
}
}
JsonToken token = jsonParser.nextToken();
@ -130,6 +168,11 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
final JsonNode nextNode = getNextJsonNode();
if (nextNode == null) {
if (captureFieldPredicate != null) {
while (jsonParser.nextToken() != null) {
captureCurrentField(captureFieldPredicate);
}
}
return null;
}
@ -242,6 +285,19 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
return null;
}
/**
 * Offers the field the parser is currently positioned on to the given predicate and,
 * when the predicate accepts it, stores it in {@code capturedFields}.
 *
 * Does nothing unless the current token is a field name. Otherwise the parser is advanced
 * by one token (onto the field's value) and the value is read as a String; for a value that
 * is not a scalar (e.g. an object) the stored value is {@code null}.
 *
 * NOTE(review): only a single token is consumed here, so when the value is an object its
 * inner field names appear to be visited by the callers' loops as well - confirm this is intended.
 *
 * @param captureFieldPredicate receives the field name and its String value and decides whether to capture
 * @throws IOException if the parser fails to advance or read the value
 */
private void captureCurrentField(BiPredicate<String, String> captureFieldPredicate) throws IOException {
    if (jsonParser.getCurrentToken() != JsonToken.FIELD_NAME) {
        return;
    }

    // Step from the field name onto its value; the parser still reports the same name.
    jsonParser.nextToken();

    final String name = jsonParser.getCurrentName();
    final String value = jsonParser.getValueAsString();
    final boolean shouldCapture = captureFieldPredicate.test(name, value);
    if (shouldCapture) {
        capturedFields.put(name, value);
    }
}
private Map<String, Object> getMapFromRawValue(final JsonNode fieldNode, final DataType dataType, final String fieldName) throws IOException {
if (dataType == null || dataType.getFieldType() != RecordFieldType.MAP) {
return null;
@ -389,4 +445,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
}
/**
 * Implemented by subclasses to turn a single parsed JSON node into a {@link Record}.
 *
 * @param nextNode the JSON node to convert
 * @param schema the record schema to apply
 * @param coerceTypes flag passed through from the caller; presumably controls whether values are coerced to the schema's types - confirm in implementations
 * @param dropUnknownFields flag passed through from the caller; presumably controls whether fields missing from the schema are dropped - confirm in implementations
 * @return the resulting record
 * @throws IOException declared for implementations that fail while reading JSON
 * @throws MalformedRecordException declared for implementations that cannot map the node to a record
 */
protected abstract Record convertJsonNodeToRecord(JsonNode nextNode, RecordSchema schema, boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException;
/**
 * Returns the fields accepted by the capture predicate so far, keyed by field name.
 *
 * Fields located before the nested starting field are captured while the reader is constructed;
 * fields that follow the records are only captured once {@code nextRecord} has found no further
 * record, so read all records before relying on this map being complete.
 *
 * The reader's own map instance is returned, not a copy.
 *
 * @return captured field names mapped to their String values (a value may be {@code null})
 */
public Map<String, String> getCapturedFields() {
    return this.capturedFields;
}
}

View File

@ -37,13 +37,14 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
@ -52,16 +53,16 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException {
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null);
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null);
}
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
final String dateFormat, final String timeFormat, final String timestampFormat,
final StartingFieldStrategy startingFieldStrategy, final String startingFieldName,
final SchemaApplicationStrategy schemaApplicationStrategy)
final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate<String, String> captureFieldPredicate)
throws IOException, MalformedRecordException {
super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName);
super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate);
if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) {
this.schema = getSelectedSchema(schema, startingFieldName);
} else {
@ -79,7 +80,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
return getChildSchemaFromField(optionalRecordField.get());
} else {
for (RecordField field : currentSchema.getFields()) {
if (field.getDataType() instanceof ArrayDataType || field.getDataType() instanceof RecordDataType) {
if (field.getDataType() instanceof ArrayDataType || field.getDataType() instanceof RecordDataType) {
schemas.add(getChildSchemaFromField(field));
}
}

View File

@ -75,6 +75,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
@PrimaryNodeOnly
@TriggerSerially
@ -220,6 +222,8 @@ public class QuerySalesforceObject extends AbstractProcessor {
private static final String DATE_FORMAT = "yyyy-MM-dd";
private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";
private static final String NEXT_RECORDS_URL = "nextRecordsUrl";
private static final BiPredicate<String, String> CAPTURE_PREDICATE = (fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName);
private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter;
private volatile SalesforceRestService salesforceRestService;
@ -330,76 +334,93 @@ public class QuerySalesforceObject extends AbstractProcessor {
ageFilterUpper
);
FlowFile flowFile = session.create();
AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
Map<String, String> originalAttributes = flowFile.getAttributes();
Map<String, String> attributes = new HashMap<>();
do {
AtomicInteger recordCountHolder = new AtomicInteger();
FlowFile flowFile = session.create();
Map<String, String> originalAttributes = flowFile.getAttributes();
Map<String, String> attributes = new HashMap<>();
flowFile = session.write(flowFile, out -> {
try (
InputStream querySObjectResultInputStream = salesforceRestService.query(querySObject);
JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader(
querySObjectResultInputStream,
getLogger(),
convertedSalesforceSchema.recordSchema,
DATE_FORMAT,
TIME_FORMAT,
DATE_TIME_FORMAT,
StartingFieldStrategy.NESTED_FIELD,
STARTING_FIELD_NAME,
SchemaApplicationStrategy.SELECTED_PART
);
AtomicInteger recordCountHolder = new AtomicInteger();
RecordSetWriter writer = writerFactory.createWriter(
getLogger(),
writerFactory.getSchema(
originalAttributes,
convertedSalesforceSchema.recordSchema
),
out,
originalAttributes
)
) {
writer.beginRecordSet();
flowFile = session.write(flowFile, out -> {
try (
InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl, querySObject);
Record querySObjectRecord;
while ((querySObjectRecord = jsonReader.nextRecord()) != null) {
writer.write(querySObjectRecord);
JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader(
querySObjectResultInputStream,
getLogger(),
convertedSalesforceSchema.recordSchema,
DATE_FORMAT,
TIME_FORMAT,
DATE_TIME_FORMAT,
StartingFieldStrategy.NESTED_FIELD,
STARTING_FIELD_NAME,
SchemaApplicationStrategy.SELECTED_PART,
CAPTURE_PREDICATE
);
RecordSetWriter writer = writerFactory.createWriter(
getLogger(),
writerFactory.getSchema(
originalAttributes,
convertedSalesforceSchema.recordSchema
),
out,
originalAttributes
)
) {
writer.beginRecordSet();
Record querySObjectRecord;
while ((querySObjectRecord = jsonReader.nextRecord()) != null) {
writer.write(querySObjectRecord);
}
WriteResult writeResult = writer.finishRecordSet();
Map<String, String> capturedFields = jsonReader.getCapturedFields();
nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, null));
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
recordCountHolder.set(writeResult.getRecordCount());
if (ageFilterUpper != null) {
Map<String, String> newState = new HashMap<>(state.toMap());
newState.put(LAST_AGE_FILTER, ageFilterUpper);
updateState(context, newState);
}
} catch (SchemaNotFoundException e) {
throw new ProcessException("Couldn't create record writer", e);
} catch (MalformedRecordException e) {
throw new ProcessException("Couldn't read records from input", e);
}
});
WriteResult writeResult = writer.finishRecordSet();
int recordCount = recordCountHolder.get();
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
if (!createZeroRecordFlowFiles && recordCount == 0) {
session.remove(flowFile);
} else {
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
recordCountHolder.set(writeResult.getRecordCount());
if (ageFilterUpper != null) {
Map<String, String> newState = new HashMap<>(state.toMap());
newState.put(LAST_AGE_FILTER, ageFilterUpper);
updateState(context, newState);
}
} catch (SchemaNotFoundException e) {
throw new ProcessException("Couldn't create record writer", e);
} catch (MalformedRecordException e) {
throw new ProcessException("Couldn't read records from input", e);
session.adjustCounter("Records Processed", recordCount, false);
getLogger().info("Successfully written {} records for {}", recordCount, flowFile);
}
});
} while (nextRecordsUrl.get() != null);
}
int recordCount = recordCountHolder.get();
if (!createZeroRecordFlowFiles && recordCount == 0) {
session.remove(flowFile);
} else {
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
session.adjustCounter("Records Processed", recordCount, false);
getLogger().info("Successfully written {} records for {}", recordCount, flowFile);
/**
 * Opens the response stream for the current page of query results.
 *
 * When no next-records URL is held, the initial query is executed; otherwise the
 * page referenced by the held URL is fetched.
 *
 * @param nextRecordsUrl holder of the next page's URL, or of {@code null} for the first page
 * @param querySObject the query to run for the first page
 * @return the response body stream supplied by the Salesforce REST service
 */
private InputStream getResultInputStream(AtomicReference<String> nextRecordsUrl, String querySObject) {
    final String nextPageUrl = nextRecordsUrl.get();
    return nextPageUrl == null
            ? salesforceRestService.query(querySObject)
            : salesforceRestService.getNextRecords(nextPageUrl);
}
private ConvertedSalesforceSchema getConvertedSalesforceSchema(String sObject, String fields) {

View File

@ -69,6 +69,21 @@ public class SalesforceRestService {
return request(request);
}
/**
 * Fetches a further page of results with a GET request that carries a Bearer token.
 *
 * The request URL is formed by appending {@code nextRecordsUrl} to {@code baseUrl} as-is.
 *
 * @param nextRecordsUrl the value to append to the base URL; presumably the relative path
 *                       returned by a previous response - verify against the caller
 * @return the response body stream produced by {@code request}
 */
public InputStream getNextRecords(String nextRecordsUrl) {
    // Build the URL first so an invalid URL fails before an access token is requested.
    final HttpUrl nextPageUrl = HttpUrl.get(baseUrl + nextRecordsUrl)
            .newBuilder()
            .build();

    final String authorization = "Bearer " + accessTokenProvider.get();
    final Request nextPageRequest = new Request.Builder()
            .url(nextPageUrl)
            .addHeader("Authorization", authorization)
            .get()
            .build();

    return request(nextPageRequest);
}
private InputStream request(Request request) {
Response response = null;
try {

View File

@ -76,7 +76,6 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
private volatile StartingFieldStrategy startingFieldStrategy;
private volatile SchemaApplicationStrategy schemaApplicationStrategy;
public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder()
.name("starting-field-strategy")
.displayName("Starting Field Strategy")
@ -165,6 +164,7 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
throws IOException, MalformedRecordException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy);
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName,
schemaApplicationStrategy, null);
}
}

View File

@ -51,7 +51,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -94,14 +96,6 @@ class TestJsonTreeRowRecordReader {
return new SimpleRecordSchema(accountFields);
}
/**
 * Builds a schema from the default fields, adding an "account" record field
 * (typed by the account schema) and removing the double-typed "balance" field.
 *
 * @return the assembled schema
 */
private RecordSchema getSchema() {
    final RecordSchema accountSchema = getAccountSchema();
    final List<RecordField> schemaFields = getDefaultFields();

    schemaFields.add(new RecordField("account", RecordFieldType.RECORD.getRecordDataType(accountSchema)));
    // Removal is by element equality - presumably RecordField compares by name and type; confirm.
    schemaFields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));

    return new SimpleRecordSchema(schemaFields);
}
@Test
void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecordException {
final File schemaFile = new File("src/test/resources/json/choice-of-string-or-array-record.avsc");
@ -1253,6 +1247,54 @@ class TestJsonTreeRowRecordReader {
"nestedLevel2Record", SchemaApplicationStrategy.WHOLE_JSON);
}
/**
 * Verifies that fields outside the nested starting field ("accounts") are offered to the
 * capture predicate and exposed through {@code getCapturedFields()} once all records are read.
 */
@Test
void testCaptureFields() throws IOException, MalformedRecordException {
    // "job" holds an object rather than a scalar, so its captured value is expected to be null.
    Map<String, String> expectedCapturedFields = new HashMap<>();
    expectedCapturedFields.put("id", "1");
    expectedCapturedFields.put("zipCode", "11111");
    expectedCapturedFields.put("country", "USA");
    expectedCapturedFields.put("job", null);

    Set<String> fieldsToCapture = expectedCapturedFields.keySet();
    BiPredicate<String, String> capturePredicate = (fieldName, fieldValue) -> fieldsToCapture.contains(fieldName);
    String startingFieldName = "accounts";

    SimpleRecordSchema accountRecordSchema = new SimpleRecordSchema(Arrays.asList(
            new RecordField("id", RecordFieldType.INT.getDataType()),
            new RecordField("balance", RecordFieldType.DOUBLE.getDataType())
    ));

    SimpleRecordSchema jobRecordSchema = new SimpleRecordSchema(Arrays.asList(
            new RecordField("salary", RecordFieldType.INT.getDataType()),
            new RecordField("position", RecordFieldType.STRING.getDataType())
    ));

    SimpleRecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList(
            new RecordField("id", RecordFieldType.INT.getDataType()),
            new RecordField("accounts", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(accountRecordSchema))),
            new RecordField("name", RecordFieldType.STRING.getDataType()),
            new RecordField("address", RecordFieldType.STRING.getDataType()),
            new RecordField("city", RecordFieldType.STRING.getDataType()),
            new RecordField("job", RecordFieldType.RECORD.getRecordDataType(jobRecordSchema)),
            new RecordField("state", RecordFieldType.STRING.getDataType()),
            new RecordField("zipCode", RecordFieldType.STRING.getDataType()),
            new RecordField("country", RecordFieldType.STRING.getDataType())
    ));

    // Both the stream and the reader are managed resources so the reader is closed even if an assertion fails.
    try (InputStream in = new FileInputStream("src/test/resources/json/capture-fields.json");
         JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(
                 in, mock(ComponentLog.class), recordSchema,
                 dateFormat, timeFormat, timestampFormat,
                 StartingFieldStrategy.NESTED_FIELD, startingFieldName,
                 SchemaApplicationStrategy.SELECTED_PART, capturePredicate)) {

        // Fields that follow the records are only captured once the reader runs out of records.
        while (reader.nextRecord() != null) {
            // records themselves are not under test here
        }

        Map<String, String> capturedFields = reader.getCapturedFields();
        assertEquals(expectedCapturedFields, capturedFields);
    }
}
private void testReadRecords(String jsonPath, List<Object> expected) throws IOException, MalformedRecordException {
final File jsonFile = new File(jsonPath);
try (
@ -1263,8 +1305,12 @@ class TestJsonTreeRowRecordReader {
}
}
private void testReadRecords(String jsonPath, List<Object> expected, StartingFieldStrategy strategy,
String startingFieldName) throws IOException, MalformedRecordException {
private void testReadRecords(String jsonPath,
List<Object> expected,
StartingFieldStrategy strategy,
String startingFieldName)
throws IOException, MalformedRecordException {
final File jsonFile = new File(jsonPath);
try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
RecordSchema schema = inferSchema(jsonStream, strategy, startingFieldName);
@ -1279,8 +1325,14 @@ class TestJsonTreeRowRecordReader {
}
}
private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected, StartingFieldStrategy strategy,
String startingFieldName, SchemaApplicationStrategy schemaApplicationStrategy) throws IOException, MalformedRecordException {
private void testReadRecords(String jsonPath,
RecordSchema schema,
List<Object> expected,
StartingFieldStrategy strategy,
String startingFieldName,
SchemaApplicationStrategy schemaApplicationStrategy
) throws IOException, MalformedRecordException {
final File jsonFile = new File(jsonPath);
try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
testReadRecords(jsonStream, schema, expected, strategy, startingFieldName, schemaApplicationStrategy);
@ -1314,11 +1366,16 @@ class TestJsonTreeRowRecordReader {
}
}
private void testReadRecords(InputStream jsonStream, RecordSchema schema, List<Object> expected, StartingFieldStrategy strategy,
String startingFieldName, SchemaApplicationStrategy schemaApplicationStrategy)
private void testReadRecords(InputStream jsonStream,
RecordSchema schema,
List<Object> expected,
StartingFieldStrategy strategy,
String startingFieldName,
SchemaApplicationStrategy schemaApplicationStrategy)
throws IOException, MalformedRecordException {
try (JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat,
strategy, startingFieldName, schemaApplicationStrategy)) {
strategy, startingFieldName, schemaApplicationStrategy, null)) {
List<Object> actual = new ArrayList<>();
Record record;

View File

@ -0,0 +1,20 @@
{
"id": 1,
"accounts": [{
"id": 42,
"balance": 4750.89
}, {
"id": 43,
"balance": 48212.38
}],
"name": "John Doe",
"address": "123 My Street",
"city": "My City",
"job" : {
"salary": 115431,
"position": "accountant"
},
"state": "MS",
"zipCode": "11111",
"country": "USA"
}