NIFI-11147: Query all fields in QuerySalesforceObject

Fix review comments
This commit is contained in:
Lehel Boér 2023-02-15 18:37:15 +01:00 committed by Matthew Burgess
parent 48689a2567
commit 7295e3dc21
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
4 changed files with 78 additions and 38 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.salesforce;
import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
import org.apache.camel.component.salesforce.api.dto.SObjectField;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@ -59,6 +61,7 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
@ -79,6 +82,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL;
import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION;
@ -118,8 +122,8 @@ public class QuerySalesforceObject extends AbstractProcessor {
static final PropertyDescriptor FIELD_NAMES = new PropertyDescriptor.Builder()
.name("field-names")
.displayName("Field Names")
.description("Comma-separated list of field names requested from the sObject to be queried")
.required(true)
.description("Comma-separated list of field names requested from the sObject to be queried. When this field is left empty, all fields are queried.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
@ -293,7 +297,14 @@ public class QuerySalesforceObject extends AbstractProcessor {
ageFilterUpper = ageFilterUpperTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
}
ConvertedSalesforceSchema convertedSalesforceSchema = getConvertedSalesforceSchema(sObject, fields);
SalesforceSchemaHolder salesForceSchemaHolder = getConvertedSalesforceSchema(sObject, fields);
if (StringUtils.isBlank(fields)) {
fields = salesForceSchemaHolder.getSalesforceObject().getFields()
.stream()
.map(SObjectField::getName)
.collect(Collectors.joining(","));
}
String querySObject = buildQuery(
sObject,
@ -322,7 +333,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader(
querySObjectResultInputStream,
getLogger(),
convertedSalesforceSchema.recordSchema,
salesForceSchemaHolder.recordSchema,
DATE_FORMAT,
TIME_FORMAT,
DATE_TIME_FORMAT,
@ -336,7 +347,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
getLogger(),
writerFactory.getSchema(
originalAttributes,
convertedSalesforceSchema.recordSchema
salesForceSchemaHolder.recordSchema
),
out,
originalAttributes
@ -394,7 +405,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
return salesforceRestService.getNextRecords(nextRecordsUrl.get());
}
private ConvertedSalesforceSchema getConvertedSalesforceSchema(String sObject, String fields) {
private SalesforceSchemaHolder getConvertedSalesforceSchema(String sObject, String fields) {
try (InputStream describeSObjectResult = salesforceRestService.describeSObject(sObject)) {
return convertSchema(describeSObjectResult, fields);
} catch (IOException e) {
@ -410,9 +421,10 @@ public class QuerySalesforceObject extends AbstractProcessor {
}
}
protected ConvertedSalesforceSchema convertSchema(InputStream describeSObjectResult, String fields) {
protected SalesforceSchemaHolder convertSchema(InputStream describeSObjectResult, String fieldsOfInterest) {
try {
RecordSchema recordSchema = salesForceToRecordSchemaConverter.convertSchema(describeSObjectResult, fields);
SObjectDescription salesforceObject = salesForceToRecordSchemaConverter.getSalesforceObject(describeSObjectResult);
RecordSchema recordSchema = salesForceToRecordSchemaConverter.convertSchema(salesforceObject, fieldsOfInterest);
RecordSchema querySObjectResultSchema = new SimpleRecordSchema(Collections.singletonList(
new RecordField(STARTING_FIELD_NAME, RecordFieldType.ARRAY.getArrayDataType(
@ -422,7 +434,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
))
));
return new ConvertedSalesforceSchema(querySObjectResultSchema, recordSchema);
return new SalesforceSchemaHolder(querySObjectResultSchema, recordSchema, salesforceObject);
} catch (IOException e) {
throw new ProcessException("SObject to Record schema conversion failed", e);
}
@ -465,13 +477,19 @@ public class QuerySalesforceObject extends AbstractProcessor {
return queryBuilder.toString();
}
static class ConvertedSalesforceSchema {
static class SalesforceSchemaHolder {
RecordSchema querySObjectResultSchema;
RecordSchema recordSchema;
SObjectDescription salesforceObject;
public ConvertedSalesforceSchema(RecordSchema querySObjectResultSchema, RecordSchema recordSchema) {
public SalesforceSchemaHolder(RecordSchema querySObjectResultSchema, RecordSchema recordSchema, SObjectDescription salesforceObject) {
this.querySObjectResultSchema = querySObjectResultSchema;
this.recordSchema = recordSchema;
this.salesforceObject = salesforceObject;
}
public SObjectDescription getSalesforceObject() {
return salesforceObject;
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
@ -33,27 +34,30 @@ import java.util.List;
import java.util.stream.Collectors;
public class SalesforceToRecordSchemaConverter {
private static final ObjectMapper OBJECT_MAPPER = JsonUtils.createObjectMapper();
private final String dateFormat;
private final String dateTimeFormat;
private final String timeFormat;
private final ObjectMapper objectMapper;
public SalesforceToRecordSchemaConverter(String dateFormat, String dateTimeFormat, String timeFormat) {
this.dateFormat = dateFormat;
this.dateTimeFormat = dateTimeFormat;
this.timeFormat = timeFormat;
objectMapper = JsonUtils.createObjectMapper();
}
public RecordSchema convertSchema(final InputStream describeSOjbectResultJsonString, final String fieldNamesOfInterest) throws IOException {
public SObjectDescription getSalesforceObject(InputStream salesforceObjectResultJsonString) throws IOException {
return OBJECT_MAPPER.readValue(salesforceObjectResultJsonString, SObjectDescription.class);
}
final SObjectDescription sObjectDescription = objectMapper.readValue(describeSOjbectResultJsonString, SObjectDescription.class);
public RecordSchema convertSchema(SObjectDescription salesforceObject, String fieldNamesOfInterest) {
List<SObjectField> fields = salesforceObject.getFields();
if (StringUtils.isNotBlank(fieldNamesOfInterest)) {
final List<String> listOfFieldNamesOfInterest = Arrays.asList(fieldNamesOfInterest.toLowerCase().split("\\s*,\\s*"));
final List<SObjectField> fields = sObjectDescription.getFields()
fields = fields
.stream()
.filter(sObjectField -> listOfFieldNamesOfInterest.contains(sObjectField.getName().toLowerCase()))
.collect(Collectors.toList());
}
final List<RecordField> recordFields = new ArrayList<>();
@ -110,8 +114,8 @@ public class SalesforceToRecordSchemaConverter {
recordFields.add(new RecordField(field.getName(), RecordFieldType.RECORD.getRecordDataType(locationSchema), field.getDefaultValue(), field.isNillable()));
break;
default:
throw new IllegalArgumentException(String.format("Could not create determine schema for '%s'. Could not convert field '%s' of soap type '%s'.",
sObjectDescription.getName(), field.getName(), soapType));
throw new IllegalArgumentException(String.format("Could not determine schema for '%s'. Could not convert field '%s' of soap type '%s'.",
salesforceObject.getName(), field.getName(), soapType));
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.salesforce.util;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
@ -29,7 +30,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -69,7 +69,8 @@ class SalesforceToRecordSchemaConverterTest {
));
try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) {
final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames);
final SObjectDescription salesforceObject = converter.getSalesforceObject(sfSchema);
final RecordSchema actual = converter.convertSchema(salesforceObject, fieldNames);
assertEquals(expected, actual);
}
@ -102,7 +103,8 @@ class SalesforceToRecordSchemaConverterTest {
));
try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) {
final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames);
final SObjectDescription salesforceObject = converter.getSalesforceObject(sfSchema);
final RecordSchema actual = converter.convertSchema(salesforceObject, fieldNames);
assertEquals(expected, actual);
}
@ -119,21 +121,36 @@ class SalesforceToRecordSchemaConverterTest {
));
try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) {
final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames);
final SObjectDescription salesforceObject = converter.getSalesforceObject(sfSchema);
final RecordSchema actual = converter.convertSchema(salesforceObject, fieldNames);
assertEquals(expected, actual);
}
}
@Test
void testSelectEmptyFields() throws IOException {
void testSelectAllFields() throws IOException {
final String salesforceSchemaFileName = "simple_sf_schema.json";
final String fieldNames = "";
final RecordSchema expected = new SimpleRecordSchema(Collections.emptyList());
final RecordSchema expected = new SimpleRecordSchema(Arrays.asList(
new RecordField("ExampleInt", RecordFieldType.INT.getDataType()),
new RecordField("ExampleLong", RecordFieldType.LONG.getDataType()),
new RecordField("ExampleDouble", RecordFieldType.DOUBLE.getDataType()),
new RecordField("ExampleBoolean", RecordFieldType.BOOLEAN.getDataType()),
new RecordField("ExampleID", RecordFieldType.STRING.getDataType()),
new RecordField("ExampleString", RecordFieldType.STRING.getDataType()),
new RecordField("ExampleJson", RecordFieldType.STRING.getDataType()),
new RecordField("ExampleBase64Binary", RecordFieldType.STRING.getDataType()),
new RecordField("ExampleAnyType", RecordFieldType.STRING.getDataType()),
new RecordField("ExampleDate", RecordFieldType.DATE.getDataType("yyyy-mm-dd")),
new RecordField("ExampleDateTime", RecordFieldType.TIMESTAMP.getDataType("yyyy-mm-dd / hh:mm:ss")),
new RecordField("ExampleTime", RecordFieldType.TIME.getDataType("hh:mm:ss"))
));
try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) {
final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames);
final SObjectDescription salesforceObject = converter.getSalesforceObject(sfSchema);
final RecordSchema actual = converter.convertSchema(salesforceObject, fieldNames);
assertEquals(expected, actual);
}
@ -142,22 +159,23 @@ class SalesforceToRecordSchemaConverterTest {
@Test
void testConvertEmptySchema() throws IOException {
try (final InputStream sfSchema = new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8))) {
assertThrows(MismatchedInputException.class, () -> converter.convertSchema(sfSchema, "ExampleField"));
assertThrows(MismatchedInputException.class, () -> converter.getSalesforceObject(sfSchema));
}
}
@Test
void testConvertNullSchema() {
void testConvertNullSchema() throws IOException {
final InputStream sfSchema = null;
assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(sfSchema, "ExampleField"));
assertThrows(IllegalArgumentException.class, () -> converter.getSalesforceObject(sfSchema));
}
@Test
void testConvertUnknownDataType() throws IOException {
try (final InputStream sfSchema = readFile(TEST_PATH + "unknown_type_sf_schema.json")) {
final String fieldNames = "FieldWithUnknownType";
final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(sfSchema, fieldNames));
final String errorMessage = "Could not create determine schema for 'SObjectWithUnknownFieldType'. Could not convert field 'FieldWithUnknownType' of soap type 'xsd:unknown'.";
final SObjectDescription salesforceObject = converter.getSalesforceObject(sfSchema);
final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(salesforceObject, fieldNames));
final String errorMessage = "Could not determine schema for 'SObjectWithUnknownFieldType'. Could not convert field 'FieldWithUnknownType' of soap type 'xsd:unknown'.";
assertEquals(errorMessage, exception.getMessage());
}
}

View File

@ -94,15 +94,15 @@ class TestRecordExtender {
int referenceId = 0;
String objectType = "Account";
MapRecord testRecord = new MapRecord(ORIGINAL_SCHEMA, new HashMap<String, Object>() {{
MapRecord testRecord = new MapRecord(ORIGINAL_SCHEMA, new HashMap<>() {{
put("testRecordField1", "testRecordValue1");
put("testRecordField2", "testRecordValue2");
}});
MapRecord expectedRecord = new MapRecord(EXPECTED_EXTENDED_SCHEMA, new HashMap<String, Object>() {{
MapRecord expectedRecord = new MapRecord(EXPECTED_EXTENDED_SCHEMA, new HashMap<>() {{
put("attributes",
new MapRecord(ATTRIBUTES_RECORD_SCHEMA, new HashMap<String, Object>() {{
new MapRecord(ATTRIBUTES_RECORD_SCHEMA, new HashMap<>() {{
put("type", objectType);
put("referenceId", referenceId);
}})