NIFI-10379 - In FetchGoogleDrive removed record-based input processing.

This closes #6327.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Tamas Palfy 2022-08-23 20:38:32 +02:00 committed by Peter Turcsanyi
parent 9c2a778fe4
commit 63905c5fc1
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
2 changed files with 19 additions and 313 deletions

View File

@ -38,21 +38,13 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.processors.gcp.util.GoogleUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@ -77,15 +69,6 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
/**
 * Optional property ({@code required(false)}) that identifies a {@link RecordReaderFactory}
 * controller service. Where this property is set, the processor reads the incoming flowfile
 * content as records and takes the file id from each record; where it is not set, the file id
 * is taken from the FILE_ID property evaluated against the incoming flowfile.
 */
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for reading incoming Google Driver File meta-data as NiFi Records."
+ " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.")
.identifiesControllerService(RecordReaderFactory.class)
.required(false)
.build();
public static final Relationship REL_SUCCESS =
new Relationship.Builder()
.name("success")
@ -97,22 +80,15 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
.description("A flowfile will be routed here for each File for which fetch was attempted but failed.")
.build();
/**
 * Relationship named "input_failure". The record-based code path transfers the incoming
 * flowfile here when its content cannot be read as records, or when any other exception
 * escapes while the records are being processed.
 */
public static final Relationship REL_INPUT_FAILURE =
new Relationship.Builder().name("input_failure")
.description("The incoming flowfile will be routed here if it's content could not be processed.")
.build();
// Unmodifiable list of property descriptors, in declaration order: GCP credentials provider
// service, file id, record reader, and a proxy configuration descriptor built from
// ProxyAwareTransportFactory.PROXY_SPECS.
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE,
FILE_ID,
RECORD_READER,
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS)
));
// Unmodifiable set of the relationships this processor can route flowfiles to.
// NOTE(review): this listing appears to carry entries from two revisions at once (a
// "REL_FAILURE, REL_INPUT_FAILURE" pair followed by a lone "REL_FAILURE") and is not valid
// Java as shown; confirm which entries belong to the revision you are reading.
public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS,
REL_FAILURE,
REL_INPUT_FAILURE
REL_FAILURE
)));
private volatile Drive driveService;
@ -145,84 +121,36 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
return;
}
if (context.getProperty(RECORD_READER).isSet()) {
RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
String fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
try (InputStream inFlowFile = session.read(flowFile)) {
final Map<String, String> flowFileAttributes = flowFile.getAttributes();
final RecordReader reader = recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(), getLogger());
FlowFile outFlowFile = flowFile;
try {
outFlowFile = fetchFile(fileId, session, outFlowFile);
Record record;
while ((record = reader.nextRecord()) != null) {
String fileId = record.getAsString(GoogleDriveFileInfo.ID);
FlowFile outFlowFile = session.create(flowFile);
try {
addAttributes(session, outFlowFile, record);
fetchFile(fileId, session, outFlowFile);
session.transfer(outFlowFile, REL_SUCCESS);
} catch (GoogleJsonResponseException e) {
handleErrorResponse(session, fileId, outFlowFile, e);
} catch (Exception e) {
handleUnexpectedError(session, outFlowFile, fileId, e);
}
}
session.remove(flowFile);
} catch (IOException | MalformedRecordException | SchemaNotFoundException e) {
getLogger().error("Couldn't read file metadata content as records from incoming flowfile", e);
session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
session.transfer(flowFile, REL_INPUT_FAILURE);
} catch (Exception e) {
getLogger().error("Unexpected error while processing incoming flowfile", e);
session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
session.transfer(flowFile, REL_INPUT_FAILURE);
}
} else {
String fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
FlowFile outFlowFile = flowFile;
try {
fetchFile(fileId, session, outFlowFile);
session.transfer(outFlowFile, REL_SUCCESS);
} catch (GoogleJsonResponseException e) {
handleErrorResponse(session, fileId, flowFile, e);
} catch (Exception e) {
handleUnexpectedError(session, flowFile, fileId, e);
}
session.transfer(outFlowFile, REL_SUCCESS);
} catch (GoogleJsonResponseException e) {
handleErrorResponse(session, fileId, flowFile, e);
} catch (Exception e) {
handleUnexpectedError(session, flowFile, fileId, e);
}
session.commitAsync();
}
/**
 * Copies Google Drive metadata from a record onto a flowfile as attributes.
 * Every {@link GoogleDriveFlowFileAttribute} is evaluated against the record; attributes whose
 * value is {@code null} are skipped, the rest are added under the attribute's name.
 *
 * @param session     session used to write the attributes
 * @param outFlowFile flowfile that receives the attributes
 * @param record      record the attribute values are read from
 */
private void addAttributes(ProcessSession session, FlowFile outFlowFile, Record record) {
Map<String, String> attributes = new HashMap<>();
for (GoogleDriveFlowFileAttribute attribute : GoogleDriveFlowFileAttribute.values()) {
Optional.ofNullable(attribute.getValue(record))
.ifPresent(value -> attributes.put(attribute.getName(), value));
}
// NOTE(review): the FlowFile returned by putAllAttributes is not captured here.
session.putAllAttributes(outFlowFile, attributes);
}
/**
 * Downloads the media content of the Drive file with the given id and imports it into the
 * given flowfile via the session.
 * <p>
 * NOTE(review): this listing shows two signature lines (one returning {@code void}, one
 * returning {@code FlowFile}) and two forms of the import statement, so it appears to mix two
 * revisions of the method; confirm which lines are current before relying on the return type.
 *
 * @param fileId      id of the Google Drive file to fetch
 * @param session     session used to import the downloaded content
 * @param outFlowFile flowfile whose content is replaced by the downloaded stream
 * @throws IOException declared by the method; presumably raised by the Drive request - confirm
 */
void fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException {
FlowFile fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException {
InputStream driveFileInputStream = driveService
.files()
.get(fileId)
.executeMediaAsInputStream();
session.importFrom(driveFileInputStream, outFlowFile);
// This form keeps the FlowFile reference returned by importFrom and hands it back to the caller.
outFlowFile = session.importFrom(driveFileInputStream, outFlowFile);
return outFlowFile;
}
/**
 * Handles a {@link GoogleJsonResponseException} raised while fetching a file: logs the error
 * with the file id, stores the response status code under ERROR_CODE_ATTRIBUTE and the
 * exception message under ERROR_MESSAGE_ATTRIBUTE, then transfers the flowfile to REL_FAILURE.
 * <p>
 * NOTE(review): the attribute writes appear twice below, once discarding and once keeping the
 * FlowFile returned by putAttribute; the listing seems to mix two revisions - confirm which
 * pair is current.
 *
 * @param session     session used to update and transfer the flowfile
 * @param fileId      id of the file whose fetch failed (used in the log message)
 * @param outFlowFile flowfile to annotate and route to failure
 * @param e           the error response returned for the fetch request
 */
private void handleErrorResponse(ProcessSession session, String fileId, FlowFile outFlowFile, GoogleJsonResponseException e) {
getLogger().error("Couldn't fetch file with id '{}'", fileId, e);
session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, "" + e.getStatusCode());
session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
outFlowFile = session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, "" + e.getStatusCode());
outFlowFile = session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
session.transfer(outFlowFile, REL_FAILURE);
}
@ -230,8 +158,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
/**
 * Handles any exception that is not a Google JSON error response: logs it with the file id,
 * writes error attributes on the flowfile and transfers it to REL_FAILURE.
 * <p>
 * NOTE(review): the statements below seem to come from two revisions (two writes that discard
 * the FlowFile returned by putAttribute, and one that keeps it); confirm which are current,
 * in particular whether the "N/A" error code is still written.
 *
 * @param session  session used to update and transfer the flowfile
 * @param flowFile flowfile to annotate and route to failure
 * @param fileId   id of the file being fetched (used in the log message)
 * @param e        the unexpected exception
 */
private void handleUnexpectedError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) {
getLogger().error("Unexpected error while fetching and processing file with id '{}'", fileId, e);
session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, "N/A");
session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
session.transfer(flowFile, REL_FAILURE);
}

View File

@ -17,15 +17,9 @@
package org.apache.nifi.processors.gcp.drive;
import com.google.api.services.drive.model.File;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@ -33,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* See Javadoc {@link AbstractGoogleDriveIT} for instructions how to run this test.
@ -47,7 +40,7 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
}
@Test
void testFetchSingleFileByInputAttributes() throws Exception {
void testFetch() throws Exception {
// GIVEN
File file = createFileWithDefaultContent("test_file.txt", mainFolderId);
@ -64,7 +57,6 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedAttributes);
checkContent(FetchGoogleDrive.REL_SUCCESS, expectedContent);
@ -91,7 +83,6 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
}
@ -126,10 +117,7 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
};
Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList(
new HashMap<String, String>() {{
putAll(inputFlowFileAttributes);
put("error.code", "N/A");
}}
inputFlowFileAttributes
));
// WHEN
@ -138,219 +126,10 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive>
// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
}
/**
 * Record-based happy path: with a JSON record reader configured, a single input flowfile
 * containing two records (drive.id, filename) for two newly created files must yield nothing
 * on the failure and input_failure relationships, and success flowfiles whose content and
 * attributes match both files.
 */
@Test
void testFetchMultipleFilesByInputRecords() throws Exception {
// GIVEN
addJsonRecordReaderFactory();
File file1 = createFile("test_file_1.txt", "test_content_1", mainFolderId);
File file2 = createFile("test_file_2.txt", "test_content_2", mainFolderId);
// JSON array with one record per file to fetch.
String input = "[" +
"{" +
"\"drive.id\":\"" + file1.getId() + "\"," +
"\"filename\":\"" + file1.getName() + "\"" +
"}," +
"{" +
"\"drive.id\":\"" + file2.getId() + "\"," +
"\"filename\":\"" + file2.getName() + "\"" +
"}" +
"]";
List<String> expectedContent = Arrays.asList(
"test_content_1",
"test_content_2"
);
Set<Map<String, String>> expectedAttributes = new HashSet<>(Arrays.asList(
new HashMap<String, String>() {{
put("drive.id", "" + file1.getId());
put("filename", file1.getName());
}},
new HashMap<String, String>() {{
put("drive.id", "" + file2.getId());
put("filename", file2.getName());
}}
));
// WHEN
testRunner.enqueue(input);
testRunner.run();
// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
checkContent(FetchGoogleDrive.REL_SUCCESS, expectedContent);
checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedAttributes);
}
/**
 * Record-based failure path: a record whose drive.id is "missing" must produce no success or
 * input_failure flowfiles, and a failure flowfile carrying the record's attributes plus
 * error.code "404".
 */
@Test
void testInputRecordReferencesMissingFile() throws Exception {
// GIVEN
addJsonRecordReaderFactory();
String input = "[" +
"{" +
"\"drive.id\":\"missing\"," +
"\"filename\":\"missing_filename\"" +
"}" +
"]";
Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList(
new HashMap<String, String>() {{
put("drive.id", "missing");
put("filename", "missing_filename");
put("error.code", "404");
}}
));
// WHEN
testRunner.enqueue(input);
testRunner.run();
// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
}
/**
 * Unparseable input: with the JSON record reader configured, the content "invalid json" must
 * produce no success or failure flowfiles, and the original content must arrive unchanged on
 * the input_failure relationship.
 */
@Test
void testInputRecordsAreInvalid() throws Exception {
// GIVEN
addJsonRecordReaderFactory();
String input = "invalid json";
List<String> expectedContents = Arrays.asList("invalid json");
// WHEN
testRunner.enqueue(input);
testRunner.run();
// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
checkContent(FetchGoogleDrive.REL_INPUT_FAILURE, expectedContents);
}
/**
 * Exception before any record is read: the input is a MockFlowFile whose getAttributes()
 * throws while its content is valid JSON. Nothing may reach success or failure, and the
 * input content must be routed to input_failure.
 */
@Test
void testThrowExceptionBeforeRecordsAreProcessed() throws Exception {
// GIVEN
addJsonRecordReaderFactory();
File file = createFile("test_file.txt", mainFolderId);
String validInputContent = "[" +
"{" +
"\"drive.id\":\"" + file.getId() + "\"," +
"\"filename\":\"" + file.getName() + "\"" +
"}" +
"]";
// Flowfile stub: valid content, but reading its attributes fails on purpose.
MockFlowFile input = new MockFlowFile(1) {
@Override
public Map<String, String> getAttributes() {
throw new RuntimeException("Intentional exception");
}
@Override
public String getContent() {
return validInputContent;
}
};
List<String> expectedContents = Arrays.asList(validInputContent);
// WHEN
testRunner.enqueue(input);
testRunner.run();
// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
checkContent(FetchGoogleDrive.REL_INPUT_FAILURE, expectedContents);
}
/**
 * Partial failure within one input: the processor under test overrides fetchFile to throw a
 * RuntimeException for the second file's id. The first record must end up on success with its
 * content and attributes; the second must end up on failure with error code "N/A" and empty
 * content; nothing may be routed to input_failure.
 */
@Test
void testOneInputRecordOutOfManyThrowsUnexpectedException() throws Exception {
// GIVEN
// Holds the id for which the overridden fetchFile throws; set once the files exist.
AtomicReference<String> fileIdToThrowException = new AtomicReference<>();
testSubject = new FetchGoogleDrive() {
@Override
void fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException {
if (fileId.equals(fileIdToThrowException.get())) {
throw new RuntimeException(fileId + " intentionally forces exception");
}
super.fetchFile(fileId, session, outFlowFile);
}
};
testRunner = createTestRunner();
addJsonRecordReaderFactory();
File file1 = createFile("test_file_1.txt", "test_content_1", mainFolderId);
File file2 = createFile("test_file_2.txt", "test_content_2", mainFolderId);
String input = "[" +
"{" +
"\"drive.id\":\"" + file1.getId() + "\"," +
"\"filename\":\"" + file1.getName() + "\"" +
"}," +
"{" +
"\"drive.id\":\"" + file2.getId() + "\"," +
"\"filename\":\"" + file2.getName() + "\"" +
"}" +
"]";
fileIdToThrowException.set(file2.getId());
Set<Map<String, String>> expectedSuccessAttributes = new HashSet<>(Arrays.asList(
new HashMap<String, String>() {{
put("drive.id", file1.getId());
put("filename", file1.getName());
}}
));
List<String> expectedSuccessContents = Arrays.asList("test_content_1");
Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList(
new HashMap<String, String>() {{
put("drive.id", file2.getId());
put("filename", file2.getName());
put(FetchGoogleDrive.ERROR_CODE_ATTRIBUTE, "N/A");
}}
));
// WHEN
testRunner.enqueue(input);
testRunner.run();
// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedSuccessAttributes);
checkContent(FetchGoogleDrive.REL_SUCCESS, expectedSuccessContents);
checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
checkContent(FetchGoogleDrive.REL_FAILURE, Arrays.asList(""));
}
/**
 * Registers a {@link JsonTreeReader} with the test runner under the id "record_reader",
 * enables it, and points the processor's RECORD_READER property at it.
 *
 * @throws InitializationException declared by the method; presumably raised when the
 *                                 controller service cannot be added - confirm
 */
private void addJsonRecordReaderFactory() throws InitializationException {
RecordReaderFactory recordReader = new JsonTreeReader();
testRunner.addControllerService("record_reader", recordReader);
testRunner.enableControllerService(recordReader);
testRunner.setProperty(FetchGoogleDrive.RECORD_READER, "record_reader");
}
public Set<String> getCheckedAttributeNames() {
Set<String> checkedAttributeNames = OutputChecker.super.getCheckedAttributeNames();