From 7295e3dc212e0ce6d6ed4fdc9d837d3076a56a7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lehel=20Bo=C3=A9r?= Date: Wed, 15 Feb 2023 18:37:15 +0100 Subject: [PATCH] NIFI-11147: Query all fields in QuerySalesforceObject Fix review comments --- .../salesforce/QuerySalesforceObject.java | 40 +++++++++++++----- .../SalesforceToRecordSchemaConverter.java | 28 +++++++------ ...SalesforceToRecordSchemaConverterTest.java | 42 +++++++++++++------ .../salesforce/util/TestRecordExtender.java | 6 +-- 4 files changed, 78 insertions(+), 38 deletions(-) diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java index 8235375d69..8a2a465c7a 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.processors.salesforce; +import org.apache.camel.component.salesforce.api.dto.SObjectDescription; +import org.apache.camel.component.salesforce.api.dto.SObjectField; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; @@ -59,6 +61,7 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.StringUtils; import java.io.IOException; import java.io.InputStream; @@ -79,6 +82,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; +import java.util.stream.Collectors; import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL; import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION; @@ -118,8 +122,8 @@ public class QuerySalesforceObject extends AbstractProcessor { static final PropertyDescriptor FIELD_NAMES = new PropertyDescriptor.Builder() .name("field-names") .displayName("Field Names") - .description("Comma-separated list of field names requested from the sObject to be queried") - .required(true) + .description("Comma-separated list of field names requested from the sObject to be queried. When this field is left empty, all fields are queried.") + .required(false) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .build(); @@ -293,7 +297,14 @@ public class QuerySalesforceObject extends AbstractProcessor { ageFilterUpper = ageFilterUpperTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); } - ConvertedSalesforceSchema convertedSalesforceSchema = getConvertedSalesforceSchema(sObject, fields); + SalesforceSchemaHolder salesForceSchemaHolder = getConvertedSalesforceSchema(sObject, fields); + + if (StringUtils.isBlank(fields)) { + fields = salesForceSchemaHolder.getSalesforceObject().getFields() + .stream() + .map(SObjectField::getName) + .collect(Collectors.joining(",")); + } String querySObject = buildQuery( sObject, @@ -322,7 +333,7 @@ public class QuerySalesforceObject extends AbstractProcessor { JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader( querySObjectResultInputStream, getLogger(), - convertedSalesforceSchema.recordSchema, + salesForceSchemaHolder.recordSchema, DATE_FORMAT, TIME_FORMAT, DATE_TIME_FORMAT, @@ -336,7 +347,7 @@ public class QuerySalesforceObject extends AbstractProcessor { getLogger(), writerFactory.getSchema( originalAttributes, - convertedSalesforceSchema.recordSchema + salesForceSchemaHolder.recordSchema ), out, originalAttributes @@ -394,7 +405,7 @@ public class QuerySalesforceObject extends AbstractProcessor { return salesforceRestService.getNextRecords(nextRecordsUrl.get()); } - private ConvertedSalesforceSchema getConvertedSalesforceSchema(String sObject, String fields) { + private SalesforceSchemaHolder getConvertedSalesforceSchema(String sObject, String fields) { try (InputStream describeSObjectResult = salesforceRestService.describeSObject(sObject)) { return convertSchema(describeSObjectResult, fields); } catch (IOException e) { @@ -410,9 +421,10 @@ public class QuerySalesforceObject extends AbstractProcessor { } } - protected ConvertedSalesforceSchema convertSchema(InputStream describeSObjectResult, String fields) { + protected SalesforceSchemaHolder convertSchema(InputStream describeSObjectResult, String fieldsOfInterest) { try { - RecordSchema recordSchema = salesForceToRecordSchemaConverter.convertSchema(describeSObjectResult, fields); + SObjectDescription salesforceObject = salesForceToRecordSchemaConverter.getSalesforceObject(describeSObjectResult); + RecordSchema recordSchema = salesForceToRecordSchemaConverter.convertSchema(salesforceObject, fieldsOfInterest); RecordSchema querySObjectResultSchema = new SimpleRecordSchema(Collections.singletonList( new RecordField(STARTING_FIELD_NAME, RecordFieldType.ARRAY.getArrayDataType( @@ -422,7 +434,7 @@ public class QuerySalesforceObject extends AbstractProcessor { )) )); - return new ConvertedSalesforceSchema(querySObjectResultSchema, recordSchema); + return new SalesforceSchemaHolder(querySObjectResultSchema, recordSchema, salesforceObject); } catch (IOException e) { throw new ProcessException("SObject to Record schema conversion failed", e); } @@ -465,13 +477,19 @@ public class QuerySalesforceObject extends AbstractProcessor { return queryBuilder.toString(); } - static class ConvertedSalesforceSchema { + static class SalesforceSchemaHolder { RecordSchema querySObjectResultSchema; RecordSchema recordSchema; + SObjectDescription salesforceObject; - public ConvertedSalesforceSchema(RecordSchema querySObjectResultSchema, RecordSchema recordSchema) { + public SalesforceSchemaHolder(RecordSchema querySObjectResultSchema, RecordSchema recordSchema, SObjectDescription salesforceObject) { this.querySObjectResultSchema = querySObjectResultSchema; this.recordSchema = recordSchema; + this.salesforceObject = salesforceObject; + } + + public SObjectDescription getSalesforceObject() { + return salesforceObject; } } } diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java index f4396415ea..5bc6637161 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java @@ -24,6 +24,7 @@ import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.StringUtils; import java.io.IOException; import java.io.InputStream; @@ -33,27 +34,30 @@ import java.util.List; import java.util.stream.Collectors; public class SalesforceToRecordSchemaConverter { - + private static final ObjectMapper OBJECT_MAPPER = JsonUtils.createObjectMapper(); private final String dateFormat; private final String dateTimeFormat; private final String timeFormat; - private final ObjectMapper objectMapper; public SalesforceToRecordSchemaConverter(String dateFormat, String dateTimeFormat, String timeFormat) { this.dateFormat = dateFormat; this.dateTimeFormat = dateTimeFormat; this.timeFormat = timeFormat; - objectMapper = JsonUtils.createObjectMapper(); } - public RecordSchema convertSchema(final InputStream describeSOjbectResultJsonString, final String fieldNamesOfInterest) throws IOException { + public SObjectDescription getSalesforceObject(InputStream salesforceObjectResultJsonString) throws IOException { + return OBJECT_MAPPER.readValue(salesforceObjectResultJsonString, SObjectDescription.class); + } - final SObjectDescription sObjectDescription = objectMapper.readValue(describeSOjbectResultJsonString, SObjectDescription.class); - final List listOfFieldNamesOfInterest = Arrays.asList(fieldNamesOfInterest.toLowerCase().split("\\s*,\\s*")); - final List fields = sObjectDescription.getFields() - .stream() - .filter(sObjectField -> listOfFieldNamesOfInterest.contains(sObjectField.getName().toLowerCase())) - .collect(Collectors.toList()); + public RecordSchema convertSchema(SObjectDescription salesforceObject, String fieldNamesOfInterest) { + List fields = salesforceObject.getFields(); + if (StringUtils.isNotBlank(fieldNamesOfInterest)) { + final List listOfFieldNamesOfInterest = Arrays.asList(fieldNamesOfInterest.toLowerCase().split("\\s*,\\s*")); + fields = fields + .stream() + .filter(sObjectField -> listOfFieldNamesOfInterest.contains(sObjectField.getName().toLowerCase())) + .collect(Collectors.toList()); + } final List recordFields = new ArrayList<>(); @@ -110,8 +114,8 @@ public class SalesforceToRecordSchemaConverter { recordFields.add(new RecordField(field.getName(), RecordFieldType.RECORD.getRecordDataType(locationSchema), field.getDefaultValue(), field.isNillable())); break; default: - throw new IllegalArgumentException(String.format("Could not create determine schema for '%s'. Could not convert field '%s' of soap type '%s'.", - sObjectDescription.getName(), field.getName(), soapType)); + throw new IllegalArgumentException(String.format("Could not determine schema for '%s'. Could not convert field '%s' of soap type '%s'.", + salesforceObject.getName(), field.getName(), soapType)); } } diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java index c0bc3a80c1..408bd62394 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.salesforce.util; import com.fasterxml.jackson.databind.exc.MismatchedInputException; +import org.apache.camel.component.salesforce.api.dto.SObjectDescription; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; @@ -29,7 +30,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import java.util.Collections; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -69,7 +69,8 @@ class SalesforceToRecordSchemaConverterTest { )); try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) { - final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames); + final SObjectDescription salesforceObject = converter.getSalesforceObject(sfSchema); + final RecordSchema actual = converter.convertSchema(salesforceObject, fieldNames); assertEquals(expected, actual); } @@ -102,7 +103,8 @@ class SalesforceToRecordSchemaConverterTest { )); try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) { - final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames); + final SObjectDescription salesforceObject = converter.getSalesforceObject(sfSchema); + final RecordSchema actual = converter.convertSchema(salesforceObject, fieldNames); assertEquals(expected, actual); } @@ -119,21 +121,36 @@ class SalesforceToRecordSchemaConverterTest { )); try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) { - final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames); + final SObjectDescription salesforceObject = converter.getSalesforceObject(sfSchema); + final RecordSchema actual = converter.convertSchema(salesforceObject, fieldNames); assertEquals(expected, actual); } } @Test - void testSelectEmptyFields() throws IOException { + void testSelectAllFields() throws IOException { final String salesforceSchemaFileName = "simple_sf_schema.json"; final String fieldNames = ""; - final RecordSchema expected = new SimpleRecordSchema(Collections.emptyList()); + final RecordSchema expected = new SimpleRecordSchema(Arrays.asList( + new RecordField("ExampleInt", RecordFieldType.INT.getDataType()), + new RecordField("ExampleLong", RecordFieldType.LONG.getDataType()), + new RecordField("ExampleDouble", RecordFieldType.DOUBLE.getDataType()), + new RecordField("ExampleBoolean", RecordFieldType.BOOLEAN.getDataType()), + new RecordField("ExampleID", RecordFieldType.STRING.getDataType()), + new RecordField("ExampleString", RecordFieldType.STRING.getDataType()), + new RecordField("ExampleJson", RecordFieldType.STRING.getDataType()), + new RecordField("ExampleBase64Binary", RecordFieldType.STRING.getDataType()), + new RecordField("ExampleAnyType", RecordFieldType.STRING.getDataType()), + new RecordField("ExampleDate", RecordFieldType.DATE.getDataType("yyyy-mm-dd")), + new RecordField("ExampleDateTime", RecordFieldType.TIMESTAMP.getDataType("yyyy-mm-dd / hh:mm:ss")), + new RecordField("ExampleTime", RecordFieldType.TIME.getDataType("hh:mm:ss")) + )); try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) { - final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames); + final SObjectDescription salesforceObject = converter.getSalesforceObject(sfSchema); + final RecordSchema actual = converter.convertSchema(salesforceObject, fieldNames); assertEquals(expected, actual); } @@ -142,22 +159,23 @@ class SalesforceToRecordSchemaConverterTest { @Test void testConvertEmptySchema() throws IOException { try (final InputStream sfSchema = new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8))) { - assertThrows(MismatchedInputException.class, () -> converter.convertSchema(sfSchema, "ExampleField")); + assertThrows(MismatchedInputException.class, () -> converter.getSalesforceObject(sfSchema)); } } @Test - void testConvertNullSchema() { + void testConvertNullSchema() throws IOException { final InputStream sfSchema = null; - assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(sfSchema, "ExampleField")); + assertThrows(IllegalArgumentException.class, () -> converter.getSalesforceObject(sfSchema)); } @Test void testConvertUnknownDataType() throws IOException { try (final InputStream sfSchema = readFile(TEST_PATH + "unknown_type_sf_schema.json")) { final String fieldNames = "FieldWithUnknownType"; - final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(sfSchema, fieldNames)); - final String errorMessage = "Could not create determine schema for 'SObjectWithUnknownFieldType'. Could not convert field 'FieldWithUnknownType' of soap type 'xsd:unknown'."; + final SObjectDescription salesforceObject = converter.getSalesforceObject(sfSchema); + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(salesforceObject, fieldNames)); + final String errorMessage = "Could not determine schema for 'SObjectWithUnknownFieldType'. Could not convert field 'FieldWithUnknownType' of soap type 'xsd:unknown'."; assertEquals(errorMessage, exception.getMessage()); } } diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java index a55f10b8c1..bd48c5b5b6 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java @@ -94,15 +94,15 @@ class TestRecordExtender { int referenceId = 0; String objectType = "Account"; - MapRecord testRecord = new MapRecord(ORIGINAL_SCHEMA, new HashMap() {{ + MapRecord testRecord = new MapRecord(ORIGINAL_SCHEMA, new HashMap<>() {{ put("testRecordField1", "testRecordValue1"); put("testRecordField2", "testRecordValue2"); }}); - MapRecord expectedRecord = new MapRecord(EXPECTED_EXTENDED_SCHEMA, new HashMap() {{ + MapRecord expectedRecord = new MapRecord(EXPECTED_EXTENDED_SCHEMA, new HashMap<>() {{ put("attributes", - new MapRecord(ATTRIBUTES_RECORD_SCHEMA, new HashMap() {{ + new MapRecord(ATTRIBUTES_RECORD_SCHEMA, new HashMap<>() {{ put("type", objectType); put("referenceId", referenceId); }})