diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java index 9804999854..70d97cc511 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java @@ -38,21 +38,13 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory; import org.apache.nifi.processors.gcp.util.GoogleUtils; import org.apache.nifi.proxy.ProxyConfiguration; -import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.serialization.MalformedRecordException; -import org.apache.nifi.serialization.RecordReader; -import org.apache.nifi.serialization.RecordReaderFactory; -import org.apache.nifi.serialization.record.Record; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.Set; @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @@ -77,15 +69,6 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() - .name("record-reader") - .displayName("Record Reader") - .description("Specifies the Controller Service to use for reading incoming Google Driver File meta-data as NiFi Records." - + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.") - .identifiesControllerService(RecordReaderFactory.class) - .required(false) - .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -97,22 +80,15 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr .description("A flowfile will be routed here for each File for which fetch was attempted but failed.") .build(); - public static final Relationship REL_INPUT_FAILURE = - new Relationship.Builder().name("input_failure") - .description("The incoming flowfile will be routed here if it's content could not be processed.") - .build(); - private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, FILE_ID, - RECORD_READER, ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS) )); public static final Set relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( REL_SUCCESS, - REL_FAILURE, - REL_INPUT_FAILURE + REL_FAILURE ))); private volatile Drive driveService; @@ -145,84 +121,36 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr return; } - if (context.getProperty(RECORD_READER).isSet()) { - RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + String fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue(); - try (InputStream inFlowFile = session.read(flowFile)) { - final Map flowFileAttributes = flowFile.getAttributes(); - final RecordReader reader = recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(), getLogger()); + FlowFile outFlowFile = flowFile; + try { + outFlowFile = fetchFile(fileId, session, outFlowFile); - Record record; - while ((record = reader.nextRecord()) != null) { - String fileId = record.getAsString(GoogleDriveFileInfo.ID); - FlowFile outFlowFile = session.create(flowFile); - try { - addAttributes(session, outFlowFile, record); - - fetchFile(fileId, session, outFlowFile); - - session.transfer(outFlowFile, REL_SUCCESS); - } catch (GoogleJsonResponseException e) { - handleErrorResponse(session, fileId, outFlowFile, e); - } catch (Exception e) { - handleUnexpectedError(session, outFlowFile, fileId, e); - } - } - session.remove(flowFile); - } catch (IOException | MalformedRecordException | SchemaNotFoundException e) { - getLogger().error("Couldn't read file metadata content as records from incoming flowfile", e); - - session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); - - session.transfer(flowFile, REL_INPUT_FAILURE); - } catch (Exception e) { - getLogger().error("Unexpected error while processing incoming flowfile", e); - - session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); - - session.transfer(flowFile, REL_INPUT_FAILURE); - } - } else { - String fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue(); - FlowFile outFlowFile = flowFile; - try { - fetchFile(fileId, session, outFlowFile); - - session.transfer(outFlowFile, REL_SUCCESS); - } catch (GoogleJsonResponseException e) { - handleErrorResponse(session, fileId, flowFile, e); - } catch (Exception e) { - handleUnexpectedError(session, flowFile, fileId, e); - } + session.transfer(outFlowFile, REL_SUCCESS); + } catch (GoogleJsonResponseException e) { + handleErrorResponse(session, fileId, flowFile, e); + } catch (Exception e) { + handleUnexpectedError(session, flowFile, fileId, e); } - session.commitAsync(); } - private void addAttributes(ProcessSession session, FlowFile outFlowFile, Record record) { - Map attributes = new HashMap<>(); - - for (GoogleDriveFlowFileAttribute attribute : GoogleDriveFlowFileAttribute.values()) { - Optional.ofNullable(attribute.getValue(record)) - .ifPresent(value -> attributes.put(attribute.getName(), value)); - } - - session.putAllAttributes(outFlowFile, attributes); - } - - void fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException { + FlowFile fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException { InputStream driveFileInputStream = driveService .files() .get(fileId) .executeMediaAsInputStream(); - session.importFrom(driveFileInputStream, outFlowFile); + outFlowFile = session.importFrom(driveFileInputStream, outFlowFile); + + return outFlowFile; } private void handleErrorResponse(ProcessSession session, String fileId, FlowFile outFlowFile, GoogleJsonResponseException e) { getLogger().error("Couldn't fetch file with id '{}'", fileId, e); - session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, "" + e.getStatusCode()); - session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); + outFlowFile = session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, "" + e.getStatusCode()); + outFlowFile = session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); session.transfer(outFlowFile, REL_FAILURE); } @@ -230,8 +158,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr private void handleUnexpectedError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) { getLogger().error("Unexpected error while fetching and processing file with id '{}'", fileId, e); - session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, "N/A"); - session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); + flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); session.transfer(flowFile, REL_FAILURE); } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java index 48487b2609..471adffba5 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java @@ -17,15 +17,9 @@ package org.apache.nifi.processors.gcp.drive; import com.google.api.services.drive.model.File; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.json.JsonTreeReader; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.util.MockFlowFile; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -33,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; /** * See Javadoc {@link AbstractGoogleDriveIT} for instructions how to run this test. @@ -47,7 +40,7 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT } @Test - void testFetchSingleFileByInputAttributes() throws Exception { + void testFetch() throws Exception { // GIVEN File file = createFileWithDefaultContent("test_file.txt", mainFolderId); @@ -64,7 +57,6 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT // THEN testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0); - testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0); checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedAttributes); checkContent(FetchGoogleDrive.REL_SUCCESS, expectedContent); @@ -91,7 +83,6 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT // THEN testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0); - testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0); checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes); } @@ -126,10 +117,7 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT }; Set> expectedFailureAttributes = new HashSet<>(Arrays.asList( - new HashMap() {{ - putAll(inputFlowFileAttributes); - put("error.code", "N/A"); - }} + inputFlowFileAttributes )); // WHEN @@ -138,219 +126,10 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT // THEN testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0); - testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0); checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes); } - @Test - void testFetchMultipleFilesByInputRecords() throws Exception { - // GIVEN - addJsonRecordReaderFactory(); - - File file1 = createFile("test_file_1.txt", "test_content_1", mainFolderId); - File file2 = createFile("test_file_2.txt", "test_content_2", mainFolderId); - - String input = "[" + - "{" + - "\"drive.id\":\"" + file1.getId() + "\"," + - "\"filename\":\"" + file1.getName() + "\"" + - "}," + - "{" + - "\"drive.id\":\"" + file2.getId() + "\"," + - "\"filename\":\"" + file2.getName() + "\"" + - "}" + - "]"; - - List expectedContent = Arrays.asList( - "test_content_1", - "test_content_2" - ); - - Set> expectedAttributes = new HashSet<>(Arrays.asList( - new HashMap() {{ - put("drive.id", "" + file1.getId()); - put("filename", file1.getName()); - }}, - new HashMap() {{ - put("drive.id", "" + file2.getId()); - put("filename", file2.getName()); - }} - )); - - // WHEN - testRunner.enqueue(input); - testRunner.run(); - - // THEN - testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0); - testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0); - - checkContent(FetchGoogleDrive.REL_SUCCESS, expectedContent); - checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedAttributes); - } - - @Test - void testInputRecordReferencesMissingFile() throws Exception { - // GIVEN - addJsonRecordReaderFactory(); - - String input = "[" + - "{" + - "\"drive.id\":\"missing\"," + - "\"filename\":\"missing_filename\"" + - "}" + - "]"; - - Set> expectedFailureAttributes = new HashSet<>(Arrays.asList( - new HashMap() {{ - put("drive.id", "missing"); - put("filename", "missing_filename"); - put("error.code", "404"); - }} - )); - - // WHEN - testRunner.enqueue(input); - testRunner.run(); - - // THEN - testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0); - testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0); - - checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes); - } - - @Test - void testInputRecordsAreInvalid() throws Exception { - // GIVEN - addJsonRecordReaderFactory(); - - String input = "invalid json"; - - List expectedContents = Arrays.asList("invalid json"); - - // WHEN - testRunner.enqueue(input); - testRunner.run(); - - // THEN - testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0); - testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0); - - checkContent(FetchGoogleDrive.REL_INPUT_FAILURE, expectedContents); - } - - @Test - void testThrowExceptionBeforeRecordsAreProcessed() throws Exception { - // GIVEN - addJsonRecordReaderFactory(); - - File file = createFile("test_file.txt", mainFolderId); - - String validInputContent = "[" + - "{" + - "\"drive.id\":\"" + file.getId() + "\"," + - "\"filename\":\"" + file.getName() + "\"" + - "}" + - "]"; - - MockFlowFile input = new MockFlowFile(1) { - @Override - public Map getAttributes() { - throw new RuntimeException("Intentional exception"); - } - - @Override - public String getContent() { - return validInputContent; - } - }; - - List expectedContents = Arrays.asList(validInputContent); - - // WHEN - testRunner.enqueue(input); - testRunner.run(); - - // THEN - testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0); - testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0); - - checkContent(FetchGoogleDrive.REL_INPUT_FAILURE, expectedContents); - } - - @Test - void testOneInputRecordOutOfManyThrowsUnexpectedException() throws Exception { - // GIVEN - AtomicReference fileIdToThrowException = new AtomicReference<>(); - - testSubject = new FetchGoogleDrive() { - @Override - void fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException { - if (fileId.equals(fileIdToThrowException.get())) { - throw new RuntimeException(fileId + " intentionally forces exception"); - } - super.fetchFile(fileId, session, outFlowFile); - } - }; - testRunner = createTestRunner(); - - addJsonRecordReaderFactory(); - - File file1 = createFile("test_file_1.txt", "test_content_1", mainFolderId); - File file2 = createFile("test_file_2.txt", "test_content_2", mainFolderId); - - String input = "[" + - "{" + - "\"drive.id\":\"" + file1.getId() + "\"," + - "\"filename\":\"" + file1.getName() + "\"" + - "}," + - "{" + - "\"drive.id\":\"" + file2.getId() + "\"," + - "\"filename\":\"" + file2.getName() + "\"" + - "}" + - "]"; - - fileIdToThrowException.set(file2.getId()); - - Set> expectedSuccessAttributes = new HashSet<>(Arrays.asList( - new HashMap() {{ - put("drive.id", file1.getId()); - put("filename", file1.getName()); - }} - )); - List expectedSuccessContents = Arrays.asList("test_content_1"); - - Set> expectedFailureAttributes = new HashSet<>(Arrays.asList( - new HashMap() {{ - put("drive.id", file2.getId()); - put("filename", file2.getName()); - put(FetchGoogleDrive.ERROR_CODE_ATTRIBUTE, "N/A"); - }} - )); - - // WHEN - testRunner.enqueue(input); - testRunner.run(); - - // THEN - testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0); - - checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedSuccessAttributes); - checkContent(FetchGoogleDrive.REL_SUCCESS, expectedSuccessContents); - - checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes); - checkContent(FetchGoogleDrive.REL_FAILURE, Arrays.asList("")); - } - - private void addJsonRecordReaderFactory() throws InitializationException { - RecordReaderFactory recordReader = new JsonTreeReader(); - testRunner.addControllerService("record_reader", recordReader); - testRunner.enableControllerService(recordReader); - testRunner.setProperty(FetchGoogleDrive.RECORD_READER, "record_reader"); - } - public Set getCheckedAttributeNames() { Set checkedAttributeNames = OutputChecker.super.getCheckedAttributeNames();