NIFI-11158 PutSalesforceObject processor improvements

This closes #6959.

Reviewed-by: Lehel <lehel44@hotmail.com>
Reviewed-by: Mark Bathori <bathori.mark@gmail.com>

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
krisztina-zsihovszki 2023-02-15 12:37:31 +01:00 committed by Peter Turcsanyi
parent 9773105841
commit 60c02225d5
GPG Key ID: 55A813F1C3E553DC
8 changed files with 126 additions and 24 deletions

StandardProcessorTestRunner.java

@@ -40,6 +40,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
@@ -72,6 +73,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class StandardProcessorTestRunner implements TestRunner {
@@ -366,7 +368,7 @@ public class StandardProcessorTestRunner implements TestRunner {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
)
.collect(Collectors.toSet());
.collect(toSet());
assertEquals(expectedAttributes, actualAttributes);
}
@@ -1055,4 +1057,13 @@ public class StandardProcessorTestRunner implements TestRunner {
public void setRunSchedule(long runSchedule) {
this.runSchedule = runSchedule;
}
@Override
public void assertProvenanceEvent(final ProvenanceEventType eventType) {
Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(eventType);
Set<ProvenanceEventType> actualEventTypes = getProvenanceEvents().stream()
.map(ProvenanceEventRecord::getEventType)
.collect(toSet());
assertEquals(expectedEventTypes, actualEventTypes);
}
}

TestRunner.java

@@ -28,6 +28,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
@@ -1062,4 +1063,10 @@ public interface TestRunner {
*/
void setRunSchedule(long runSchedule);
/**
* Assert that a provenance event was created with the specified event type.
*
* @param eventType Provenance event type
*/
void assertProvenanceEvent(ProvenanceEventType eventType);
}
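
For illustration only (not part of this change set), a minimal sketch of how a processor unit test can call the new assertion. The SendingProcessor below is a throw-away processor defined just for the example; the transit URI and test data are hypothetical.

import java.util.Collections;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;

class AssertProvenanceEventExampleTest {

    // throw-away processor for the example: reports a SEND event and transfers to success
    static class SendingProcessor extends AbstractProcessor {
        static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();

        @Override
        public Set<Relationship> getRelationships() {
            return Collections.singleton(REL_SUCCESS);
        }

        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) {
            FlowFile flowFile = session.get();
            if (flowFile == null) {
                return;
            }
            session.getProvenanceReporter().send(flowFile, "https://example.invalid/target"); // hypothetical transit URI
            session.transfer(flowFile, REL_SUCCESS);
        }
    }

    @Test
    void testSendProvenanceEventIsRecorded() {
        TestRunner runner = TestRunners.newTestRunner(new SendingProcessor());
        runner.enqueue("payload");
        runner.run();

        // passes only if the set of recorded provenance event types equals {SEND}
        runner.assertProvenanceEvent(ProvenanceEventType.SEND);
    }
}

Because the implementation compares the full set of recorded event types against a singleton set, a run that also records other event types fails the assertion.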

PutSalesforceObject.java

@@ -20,7 +20,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nifi.NullSuppression;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
@ -64,9 +66,13 @@ import static org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
@CapabilityDescription("Creates new records for the specified Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
+ " 'objectType' attribute. This processor cannot update existing records.")
@ReadsAttribute(attribute = "objectType", description = "The Salesforce object type to upload records to. E.g. Account, Contact, Campaign.")
@WritesAttribute(attribute = "error.message", description = "The error message returned by Salesforce.")
@SeeAlso(QuerySalesforceObject.class)
public class PutSalesforceObject extends AbstractProcessor {
private static final int MAX_RECORD_COUNT = 200;
private static final String ATTR_OBJECT_TYPE = "objectType";
private static final String ATTR_ERROR_MESSAGE = "error.message";
protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
.name("record-reader")
@@ -138,15 +144,19 @@ public class PutSalesforceObject extends AbstractProcessor {
return;
}
String objectType = flowFile.getAttribute("objectType");
String objectType = flowFile.getAttribute(ATTR_OBJECT_TYPE);
if (objectType == null) {
throw new ProcessException("Salesforce object type not found among the incoming flowfile attributes");
getLogger().error("Salesforce object type not found among the incoming FlowFile attributes");
flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, "Salesforce object type not found among FlowFile attributes");
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
RecordExtender extender;
long startNanos = System.nanoTime();
try {
try (InputStream in = session.read(flowFile);
RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -170,24 +180,30 @@ public class PutSalesforceObject extends AbstractProcessor {
out.reset();
}
}
if (writer.isActiveRecordSet()) {
processRecords(objectType, out, writer, extender);
}
session.transfer(flowFile, REL_SUCCESS);
}
session.transfer(flowFile, REL_SUCCESS);
long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, salesforceRestService.getVersionedBaseUrl() + "/composite/tree/" + objectType, transferMillis);
} catch (MalformedRecordException e) {
getLogger().error("Couldn't read records from input", e);
session.transfer(flowFile, REL_FAILURE);
transferToFailure(session, flowFile, e);
} catch (SchemaNotFoundException e) {
getLogger().error("Couldn't create record writer", e);
session.transfer(flowFile, REL_FAILURE);
transferToFailure(session, flowFile, e);
} catch (Exception e) {
getLogger().error("Failed to put records to Salesforce.", e);
session.transfer(flowFile, REL_FAILURE);
transferToFailure(session, flowFile, e);
}
}
private void transferToFailure(ProcessSession session, FlowFile flowFile, Exception e) {
flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, e.getMessage());
session.transfer(session.penalize(flowFile), REL_FAILURE);
}
private void processRecords(String objectType, ByteArrayOutputStream out, WriteJsonResult writer, RecordExtender extender) throws IOException {
writer.finishRecordSet();
writer.flush();
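
For context only (not part of the diff): PutSalesforceObject posts records in batches of at most MAX_RECORD_COUNT (200) to <versioned base URL>/composite/tree/<objectType>, the same URL it now reports as the provenance transit URI. Below is a minimal sketch of one such request body, assuming the standard Salesforce sObject Tree shape that RecordExtender targets; the field values and referenceId are illustrative.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

class CompositeTreePayloadSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // one entry per record in the batch
        ObjectNode record = mapper.createObjectNode();
        ObjectNode attributes = record.putObject("attributes");
        attributes.put("type", "Account");     // taken from the flowfile's objectType attribute
        attributes.put("referenceId", "ref1"); // illustrative; unique per record within the batch
        record.put("name", "SampleAccount1");  // fields whose record value is null are omitted (see RecordExtender below)

        ObjectNode body = mapper.createObjectNode();
        ArrayNode records = body.putArray("records");
        records.add(record);

        System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(body));
    }
}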

QuerySalesforceObject.java

@@ -32,6 +32,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
@ -115,6 +116,7 @@ import static org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
@WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.")
})
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
@SeeAlso(PutSalesforceObject.class)
public class QuerySalesforceObject extends AbstractProcessor {
static final AllowableValue PROPERTY_BASED_QUERY = new AllowableValue("property-based-query", "Property Based Query", "Provide query by properties.");
@@ -398,7 +400,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
Map<String, String> attributes = new HashMap<>();
AtomicInteger recordCountHolder = new AtomicInteger();
long startNanos = System.nanoTime();
flowFile = session.write(flowFile, out -> {
try (
InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl.get(), querySObject);
@@ -465,6 +467,10 @@
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().receive(flowFile, salesforceRestService.getVersionedBaseUrl() + "/composite/tree/" + sObject,
transferMillis);
session.adjustCounter("Records Processed", recordCount, false);
getLogger().info("Successfully written {} records for {}", recordCount, flowFile);
}

RecordExtender.java

@@ -66,7 +66,9 @@ public class RecordExtender {
public MapRecord getExtendedRecord(String objectType, int count, Record record) {
Set<String> rawFieldNames = record.getRawFieldNames();
Set<String> rawFieldNames = record.getRawFieldNames().stream()
.filter(fieldName -> record.getValue(fieldName) != null)
.collect(Collectors.toSet());
Map<String, Object> objectMap = rawFieldNames.stream()
.collect(Collectors.toMap(Function.identity(), record::getValue));
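
A minimal standalone sketch (not part of the commit) of what the filter above changes: a field whose record value is null is now dropped before the Salesforce payload is built, instead of being written as an explicit JSON null. The schema and values below are made up for the example.

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class NullFieldFilterSketch {
    public static void main(String[] args) {
        SimpleRecordSchema schema = new SimpleRecordSchema(Arrays.asList(
                new RecordField("name", RecordFieldType.STRING.getDataType()),
                new RecordField("phone", RecordFieldType.STRING.getDataType())));

        Map<String, Object> values = new HashMap<>();
        values.put("name", "SampleAccount1");
        values.put("phone", null); // this field will not reach the Salesforce request body

        Record record = new MapRecord(schema, values);

        Set<String> nonNullFieldNames = record.getRawFieldNames().stream()
                .filter(fieldName -> record.getValue(fieldName) != null)
                .collect(Collectors.toSet());

        System.out.println(nonNullFieldNames); // only "name" remains
    }
}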

SalesforceRestService.java

@@ -44,7 +44,7 @@ public class SalesforceRestService {
}
public InputStream describeSObject(String sObject) {
String url = baseUrl + "/services/data/v" + version + "/sobjects/" + sObject + "/describe?maxRecords=1";
String url = getVersionedBaseUrl() + "/sobjects/" + sObject + "/describe?maxRecords=1";
Request request = new Request.Builder()
.addHeader("Authorization", "Bearer " + accessTokenProvider.get())
@@ -56,7 +56,7 @@
}
public InputStream query(String query) {
String url = baseUrl + "/services/data/v" + version + "/query";
String url = getVersionedBaseUrl() + "/query";
HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
.addQueryParameter("q", query)
@@ -87,7 +87,7 @@
}
public InputStream postRecord(String sObjectApiName, String body) {
String url = baseUrl + "/services/data/v" + version + "/composite/tree/" + sObjectApiName;
String url = getVersionedBaseUrl() + "/composite/tree/" + sObjectApiName;
HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
.build();
@@ -103,6 +103,10 @@
return request(request);
}
public String getVersionedBaseUrl() {
return baseUrl + "/services/data/v" + version;
}
private InputStream request(Request request) {
Response response = null;
try {
@@ -115,6 +119,11 @@
);
}
return response.body().byteStream();
} catch (ProcessException e) {
if (response != null) {
response.close();
}
throw e;
} catch (Exception e) {
if (response != null) {
response.close();
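
Illustration only (hypothetical host and version): the new getVersionedBaseUrl() helper composes the prefix shared by every endpoint in this class, and the same value is what PutSalesforceObject and QuerySalesforceObject now use to build their provenance transit URIs.

class VersionedUrlSketch {
    public static void main(String[] args) {
        String baseUrl = "https://example.my.salesforce.com"; // hypothetical API URL property value
        String version = "57.0";                              // hypothetical API version property value

        String versionedBaseUrl = baseUrl + "/services/data/v" + version;

        System.out.println(versionedBaseUrl + "/sobjects/Account/describe?maxRecords=1"); // describeSObject
        System.out.println(versionedBaseUrl + "/query");                                  // query
        System.out.println(versionedBaseUrl + "/composite/tree/Account");                 // postRecord
    }
}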

PutSalesforceObjectIT.java

@@ -20,6 +20,8 @@ import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties;
import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
@@ -32,6 +34,7 @@ import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
class PutSalesforceObjectIT implements SalesforceConfigAware {
@@ -62,25 +65,71 @@ class PutSalesforceObjectIT implements SalesforceConfigAware {
reader.addSchemaField("numberOfEmployees", RecordFieldType.STRING);
reader.addSchemaField("industry", RecordFieldType.STRING);
reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", "100", "Banking");
reader.addRecord("SampleAccount2", "222222", "www.salesforce2.com", "200", "Banking");
reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", null, "Banking");
reader.addRecord("SampleAccount2", null, "www.salesforce2.com", "200", "Banking");
reader.addRecord("SampleAccount3", "333333", "www.salesforce3.com", "300", "Banking");
reader.addRecord("SampleAccount4", "444444", "www.salesforce4.com", "400", "Banking");
reader.addRecord("SampleAccount4", "444444", null, "400", "Banking");
reader.addRecord("SampleAccount5", "555555", "www.salesforce5.com", "500", "Banking");
runner.enqueue("", Collections.singletonMap("objectType", "Account"));
runner.addControllerService("reader", reader);
runner.enableControllerService(reader);
runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
runner.setProperty(PutSalesforceObject.RECORD_READER_FACTORY, reader.getIdentifier());
configureProcessor(reader);
runner.run();
List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_SUCCESS);
assertEquals(1, results.size());
runner.assertProvenanceEvent(ProvenanceEventType.SEND);
}
@Test
void testMissingObjectType() throws Exception {
MockRecordParser reader = new MockRecordParser();
runner.enqueue("");
configureProcessor(reader);
runner.run();
List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
assertEquals(1, results.size());
assertTrue(runner.getProvenanceEvents().isEmpty());
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeExists("error.message");
}
@Test
void testErrorForInvalidRecordField() throws Exception {
MockRecordParser reader = new MockRecordParser();
reader.addSchemaField("invalidField", RecordFieldType.STRING);
reader.addRecord("invalidField");
runner.enqueue("", Collections.singletonMap("objectType", "Account"));
configureProcessor(reader);
runner.run();
List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
assertEquals(1, results.size());
assertTrue(runner.getProvenanceEvents().isEmpty());
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeExists("error.message");
}
private void configureProcessor(final MockRecordParser reader) throws InitializationException {
runner.addControllerService("reader", reader);
runner.enableControllerService(reader);
runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
runner.setProperty(PutSalesforceObject.RECORD_READER_FACTORY, reader.getIdentifier());
}
}

QuerySalesforceObjectIT.java

@@ -20,6 +20,7 @@ import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties;
import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
@@ -77,6 +78,7 @@ class QuerySalesforceObjectIT implements SalesforceConfigAware {
List<MockFlowFile> results = runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS);
assertNotNull(results.get(0).getContent());
runner.assertProvenanceEvent(ProvenanceEventType.RECEIVE);
}
@Test