diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index f985e4d19c..2d6a446d06 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -43,6 +43,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
@@ -82,6 +83,7 @@ public class FetchS3Object extends AbstractS3Processor {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
             .build();
+
     public static final PropertyDescriptor REQUESTER_PAYS = new PropertyDescriptor.Builder()
             .name("requester-pays")
             .displayName("Requester Pays")
@@ -95,10 +97,30 @@ public class FetchS3Object extends AbstractS3Processor {
             .defaultValue("false")
             .build();
 
+    public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
+            .name("range-start")
+            .displayName("Range Start")
+            .description("The byte position at which to start reading from the object. An empty value or a value of " +
+                    "zero will start reading at the beginning of the object.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
+            .name("range-length")
+            .displayName("Range Length")
+            .description("The number of bytes to download from the object, starting from the Range Start. An empty " +
+                    "value or a value that extends beyond the end of the object will read to the end of the object.")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
             Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
                     SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, ENCRYPTION_SERVICE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST,
-                    PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, REQUESTER_PAYS));
+                    PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, REQUESTER_PAYS, RANGE_START, RANGE_LENGTH));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -138,6 +160,8 @@ public class FetchS3Object extends AbstractS3Processor {
         final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
         final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
         final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
+        final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
+        final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
 
         final AmazonS3 client = getClient();
         final GetObjectRequest request;
@@ -147,6 +171,11 @@ public class FetchS3Object extends AbstractS3Processor {
             request = new GetObjectRequest(bucket, key, versionId);
         }
         request.setRequesterPays(requesterPays);
+        if (rangeLength != null) {
+            request.setRange(rangeStart, rangeStart + rangeLength - 1);
+        } else {
+            request.setRange(rangeStart);
+        }
 
         final Map<String, String> attributes = new HashMap<>();
 
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index ed3fdfe3c1..25a1e4de64 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -268,7 +268,7 @@ public class TestFetchS3Object {
     public void testGetPropertyDescriptors() throws Exception {
         FetchS3Object processor = new FetchS3Object();
         List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
-        assertEquals("size should be eq", 19, pd.size());
+        assertEquals("size should be eq", 21, pd.size());
         assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
         assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
         assertTrue(pd.contains(FetchS3Object.BUCKET));
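Reviewer note on the FetchS3Object hunk (not part of the patch): GetObjectRequest.setRange(start, end) takes an inclusive end offset, which the SDK turns into an HTTP "Range: bytes=start-end" header; that is why the end position is computed as rangeStart + rangeLength - 1 and why the one-argument overload is used when no length is set. A minimal standalone sketch of the same SDK calls, with placeholder bucket and key names:

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.GetObjectRequest;
    import com.amazonaws.services.s3.model.S3Object;

    public class S3RangeSketch {
        public static void main(String[] args) {
            final long rangeStart = 23;   // Range Start property = "23B"
            final long rangeLength = 3;   // Range Length property = "3B"

            // Inclusive [start, end]: bytes 23, 24 and 25 -> "Range: bytes=23-25"
            final GetObjectRequest request = new GetObjectRequest("my-bucket", "my-key")
                    .withRange(rangeStart, rangeStart + rangeLength - 1);

            final AmazonS3 client = AmazonS3ClientBuilder.defaultClient();
            final S3Object object = client.getObject(request);
            System.out.println(object.getObjectMetadata().getContentLength());  // 3
        }
    }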
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
index 2300cea472..11996f3432 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
@@ -18,7 +18,9 @@ package org.apache.nifi.processors.azure.storage;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -32,10 +34,14 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 
@@ -53,6 +59,34 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer;
 })
 public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
 
+    public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
+            .name("range-start")
+            .displayName("Range Start")
+            .description("The byte position at which to start reading from the blob. An empty value or a value of " +
+                    "zero will start reading at the beginning of the blob.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
+            .name("range-length")
+            .displayName("Range Length")
+            .description("The number of bytes to download from the blob, starting from the Range Start. An empty " +
+                    "value or a value that extends beyond the end of the blob will read to the end of the blob.")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(RANGE_START);
+        properties.add(RANGE_LENGTH);
+        return properties;
+    }
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
@@ -62,8 +96,10 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
 
         final long startNanos = System.nanoTime();
 
-        String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
-        String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
+        final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
+        final String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
+        final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
+        final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
 
         AtomicReference<Exception> storedException = new AtomicReference<>();
         try {
@@ -80,7 +116,7 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
                 // distribution of download over threads, investigate
                 flowFile = session.write(flowFile, os -> {
                     try {
-                        blob.download(os, null, null, operationContext);
+                        blob.downloadRange(rangeStart, rangeLength, os, null, null, operationContext);
                     } catch (StorageException e) {
                         storedException.set(e);
                         throw new IOException(e);
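Reviewer note on the FetchAzureBlobStorage hunk: unlike the S3 code above, no branch is needed for an unset Range Length, because the legacy Azure Storage SDK's downloadRange(offset, length, ...) accepts a null length to mean "read to the end of the blob", so the nullable rangeLength is passed straight through. A hedged sketch of the same call in isolation; the connection string, container and blob names are placeholders:

    import java.io.ByteArrayOutputStream;
    import com.microsoft.azure.storage.CloudStorageAccount;
    import com.microsoft.azure.storage.blob.CloudBlockBlob;

    public class BlobRangeSketch {
        public static void main(String[] args) throws Exception {
            CloudStorageAccount account = CloudStorageAccount.parse(System.getenv("AZURE_STORAGE_CONNECTION_STRING"));
            CloudBlockBlob blob = account.createCloudBlobClient()
                    .getContainerReference("nifi-test-container")
                    .getBlockBlobReference("nifi-test-blob");

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            blob.downloadRange(23, 3L, out);   // offset plus a count (bytes 23..25), not an end position
            blob.downloadRange(0, null, out);  // null length: read from offset 0 to the end of the blob
        }
    }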
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index a4febcb1ba..0e98858e16 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -16,21 +16,30 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.core.util.Context;
 import com.azure.storage.file.datalake.DataLakeDirectoryClient;
 import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DownloadRetryOptions;
+import com.azure.storage.file.datalake.models.FileRange;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@@ -39,6 +48,45 @@ import java.util.concurrent.TimeUnit;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
 
+    public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
+            .name("range-start")
+            .displayName("Range Start")
+            .description("The byte position at which to start reading from the object. An empty value or a value of " +
+                    "zero will start reading at the beginning of the object.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
+            .name("range-length")
+            .displayName("Range Length")
+            .description("The number of bytes to download from the object, starting from the Range Start. An empty " +
+                    "value or a value that extends beyond the end of the object will read to the end of the object.")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor NUM_RETRIES = new PropertyDescriptor.Builder()
+            .name("number-of-retries")
+            .displayName("Number of Retries")
+            .description("The number of automatic retries to perform if the download fails.")
+            .addValidator(StandardValidators.createLongValidator(0L, Integer.MAX_VALUE, true))
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .defaultValue("0")
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(RANGE_START);
+        properties.add(RANGE_LENGTH);
+        properties.add(NUM_RETRIES);
+        return properties;
+    }
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
@@ -48,10 +96,16 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
 
         final long startNanos = System.nanoTime();
         try {
+            final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
+            final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
+            final int numRetries = (context.getProperty(NUM_RETRIES).isSet() ? context.getProperty(NUM_RETRIES).evaluateAttributeExpressions(flowFile).asInteger() : 0);
+
+            final FileRange fileRange = new FileRange(rangeStart, rangeLength);
+            final DownloadRetryOptions retryOptions = new DownloadRetryOptions();
+            retryOptions.setMaxRetryRequests(numRetries);
+
             final String fileSystem = evaluateFileSystemProperty(context, flowFile);
             final String directory = evaluateDirectoryProperty(context, flowFile);
             final String fileName = evaluateFileNameProperty(context, flowFile);
-
             final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
             final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
             final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
@@ -61,7 +115,7 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
                 throw new ProcessException(FILE.getDisplayName() + " (" + fileName + ") points to a directory. Full path: " + fileClient.getFilePath());
             }
 
-            flowFile = session.write(flowFile, os -> fileClient.read(os));
+            flowFile = session.write(flowFile, os -> fileClient.readWithResponse(os, fileRange, retryOptions, null, false, null, Context.NONE));
 
             session.getProvenanceReporter().modifyContent(flowFile);
             session.transfer(flowFile, REL_SUCCESS);
@@ -73,4 +127,4 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
             session.transfer(flowFile, REL_FAILURE);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java
index 714688f112..3685a1c7b2 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java
@@ -32,6 +32,7 @@ public abstract class AbstractAzureBlobStorageIT extends AbstractAzureStorageIT
     protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container";
     protected static final String TEST_BLOB_NAME = "nifi-test-blob";
     protected static final String TEST_FILE_NAME = "nifi-test-file";
+    protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
 
     protected CloudBlobContainer container;
 
@@ -52,8 +53,8 @@ public abstract class AbstractAzureBlobStorageIT extends AbstractAzureStorageIT
 
     protected void uploadTestBlob() throws Exception {
         CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME);
-        byte[] buf = "0123456789".getBytes();
+        byte[] buf = TEST_FILE_CONTENT.getBytes();
         InputStream in = new ByteArrayInputStream(buf);
-        blob.upload(in, 10);
+        blob.upload(in, buf.length);
     }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
index fccdbf92c0..8da1892da9 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
@@ -36,7 +36,8 @@ import java.util.UUID;
 public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorageIT {
 
     private static final String FILESYSTEM_NAME_PREFIX = "nifi-test-filesystem";
-    private static final String TEST_FILE_CONTENT = "test";
+
+    protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
 
     protected String fileSystemName;
     protected DataLakeFileSystemClient fileSystemClient;
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
index 873390cb61..2acda67b86 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
@@ -47,6 +47,71 @@ public class ITFetchAzureBlobStorage extends AbstractAzureBlobStorageIT {
         assertResult();
     }
 
+    @Test
+    public void testFetchBlobWithRangeZeroOne() throws Exception {
+        runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
+        runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
+        runner.assertValid();
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        assertResult(TEST_FILE_CONTENT.substring(0, 1));
+    }
+
+    @Test
+    public void testFetchBlobWithRangeOneOne() throws Exception {
+        runner.setProperty(FetchAzureBlobStorage.RANGE_START, "1B");
+        runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
+        runner.assertValid();
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        assertResult(TEST_FILE_CONTENT.substring(1, 2));
+    }
+
+    @Test
+    public void testFetchBlobWithRangeTwentyThreeTwentySix() throws Exception {
+        runner.setProperty(FetchAzureBlobStorage.RANGE_START, "23B");
+        runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "3B");
+        runner.assertValid();
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        assertResult(TEST_FILE_CONTENT.substring(23, 26));
+    }
+
+    @Test
+    public void testFetchBlobWithRangeLengthGreater() throws Exception {
+        runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
+        runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1KB");
+        runner.assertValid();
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        assertResult(TEST_FILE_CONTENT);
+    }
+
+    @Test
+    public void testFetchBlobWithRangeLengthUnset() throws Exception {
+        runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
+        runner.assertValid();
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        assertResult(TEST_FILE_CONTENT);
+    }
+
+    @Test
+    public void testFetchBlobWithRangeStartOutOfRange() throws Exception {
+        runner.setProperty(FetchAzureBlobStorage.RANGE_START, String.format("%sB", TEST_FILE_CONTENT.length() + 1));
+        runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
+        runner.assertValid();
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_FAILURE, 1);
+    }
+
     @Test
     public void testFetchBlobUsingCredentialService() throws Exception {
         configureCredentialsService();
@@ -59,11 +124,15 @@ public class ITFetchAzureBlobStorage extends AbstractAzureBlobStorageIT {
     }
 
     private void assertResult() throws Exception {
+        assertResult(TEST_FILE_CONTENT);
+    }
+
+    private void assertResult(final String expectedContent) throws Exception {
         runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
         List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
         for (MockFlowFile flowFile : flowFilesForRelationship) {
-            flowFile.assertContentEquals("0123456789".getBytes());
-            flowFile.assertAttributeEquals("azure.length", "10");
+            flowFile.assertContentEquals(expectedContent);
+            flowFile.assertAttributeEquals("azure.length", String.valueOf(TEST_FILE_CONTENT.length()));
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
index 4dc91237ff..8edcdd3c46 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
@@ -47,14 +47,13 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         // GIVEN
         String directory = "TestDirectory";
         String filename = "testFile.txt";
-        String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
 
-        createDirectoryAndUploadFile(directory, filename, fileContent);
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
 
         // WHEN
         // THEN
-        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
     }
 
     @Test
@@ -62,14 +61,13 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         // GIVEN
         String directory= "";
         String filename = "testFile.txt";
-        String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
 
-        uploadFile(directory, filename, fileContent);
+        uploadFile(directory, filename, TEST_FILE_CONTENT);
 
         // WHEN
         // THEN
-        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
     }
 
     @Test
@@ -77,14 +75,13 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         // GIVEN
         String directory= "A Test Directory";
         String filename = "testFile.txt";
-        String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
 
-        createDirectoryAndUploadFile(directory, filename, fileContent);
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
 
         // WHEN
         // THEN
-        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
     }
 
     @Test
@@ -92,14 +89,13 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         // GIVEN
         String directory= "TestDirectory";
         String filename = "A test file.txt";
-        String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
 
-        createDirectoryAndUploadFile(directory, filename, fileContent);
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
 
         // WHEN
         // THEN
-        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
     }
 
     @Test
@@ -152,14 +148,13 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
                 + "Directory08/Directory09/Directory10/Directory11/Directory12/Directory13/Directory14/Directory15/"
                 + "Directory16/Directory17/Directory18/Directory19/Directory20/TestDirectory";
         String filename = "testFile.txt";
-        String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
 
-        createDirectoryAndUploadFile(directory, filename, fileContent);
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
 
         // WHEN
         // THEN
-        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
     }
 
     @Test
@@ -168,10 +163,9 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         String parentDirectory = "ParentDirectory";
         String childDirectory = "ChildDirectory";
         String filename = "testFile.txt";
-        String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
 
-        createDirectoryAndUploadFile(parentDirectory + "/" + childDirectory, filename, fileContent);
+        createDirectoryAndUploadFile(parentDirectory + "/" + childDirectory, filename, TEST_FILE_CONTENT);
 
         // WHEN
         // THEN
@@ -229,11 +223,11 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         String fileContent = new String(fileContentBytes);
         String inputFlowFileContent = "InputFlowFileContent";
 
-        createDirectoryAndUploadFile(directory, filename, fileContent);
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
 
         // WHEN
         // THEN
-        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
     }
 
     @Test
@@ -242,10 +236,9 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         String directory = "TestDirectory";
         String invalidDirectoryName = "Test/\\Directory";
         String filename = "testFile.txt";
-        String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
 
-        createDirectoryAndUploadFile(directory, filename, fileContent);
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
 
         // WHEN
         // THEN
@@ -258,10 +251,9 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         String directory = "TestDirectory";
         String filename = "testFile.txt";
         String invalidFilename = "test/\\File.txt";
-        String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
 
-        createDirectoryAndUploadFile(directory, filename, fileContent);
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
 
         // WHEN
         // THEN
@@ -277,7 +269,6 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
 
         String directory = "TestDirectory";
         String filename = "testFile.txt";
-        String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
 
         Map<String, String> attributes = new HashMap<>();
@@ -286,7 +277,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         attributes.put(expLangDirectory, directory);
         attributes.put(expLangFilename, filename);
 
-        createDirectoryAndUploadFile(directory, filename, fileContent);
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
 
         // WHEN
         // THEN
@@ -295,7 +286,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
                 "${" + expLangFilename + "}",
                 attributes,
                 inputFlowFileContent,
-                fileContent);
+                TEST_FILE_CONTENT);
     }
 
     @Test
@@ -307,7 +298,6 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
 
         String directory = "TestDirectory";
         String filename = "testFile.txt";
-        String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
 
         Map<String, String> attributes = new HashMap<>();
@@ -315,7 +305,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         attributes.put(expLangDirectory, directory);
         attributes.put(expLangFilename, filename);
 
-        createDirectoryAndUploadFile(directory, filename, fileContent);
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
 
         // WHEN
         // THEN
@@ -336,7 +326,6 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
 
         String directory = "TestDirectory";
         String filename = "testFile.txt";
-        String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
 
         Map<String, String> attributes = new HashMap<>();
@@ -344,7 +333,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         attributes.put(expLangFileSystem, fileSystemName);
         attributes.put(expLangDirectory, directory);
 
-        createDirectoryAndUploadFile(directory, filename, fileContent);
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
 
         // WHEN
         // THEN
@@ -356,15 +345,113 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
             inputFlowFileContent);
     }
 
+    @Test
+    public void testFetchWithRangeZeroOne() throws Exception {
+        // GIVEN
+        String directory= "A Test Directory";
+        String filename = "testFile.txt";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, directory, filename, "0B", "1B", inputFlowFileContent, TEST_FILE_CONTENT.substring(0, 1));
+    }
+
+    @Test
+    public void testFetchWithRangeOneOne() throws Exception {
+        // GIVEN
+        String directory= "A Test Directory";
+        String filename = "testFile.txt";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, directory, filename, "1B", "1B", inputFlowFileContent, TEST_FILE_CONTENT.substring(1, 2));
+    }
+
+    @Test
+    public void testFetchWithRangeTwentyThreeTwentySix() throws Exception {
+        // GIVEN
+        String directory= "A Test Directory";
+        String filename = "testFile.txt";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, directory, filename, "23B", "3B", inputFlowFileContent, TEST_FILE_CONTENT.substring(23, 26));
+    }
+
+    @Test
+    public void testFetchWithRangeLengthGreater() throws Exception {
+        // GIVEN
+        String directory= "A Test Directory";
+        String filename = "testFile.txt";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, directory, filename, "0B", "1KB", inputFlowFileContent, TEST_FILE_CONTENT);
+    }
+
+    @Test
+    public void testFetchWithRangeLengthUnset() throws Exception {
+        // GIVEN
+        String directory= "A Test Directory";
+        String filename = "testFile.txt";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, directory, filename, "0B", null, inputFlowFileContent, TEST_FILE_CONTENT);
+    }
+
+    @Test
+    public void testFetchWithRangeStartOutOfRange() throws Exception {
+        // GIVEN
+        String directory= "A Test Directory";
+        String filename = "testFile.txt";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
+
+        setRunnerProperties(fileSystemName, directory, filename, String.format("%sB", TEST_FILE_CONTENT.length() + 1), "1B");
+
+        // WHEN
+        startRunner(inputFlowFileContent, Collections.emptyMap());
+
+        // THEN
+        DataLakeStorageException e = (DataLakeStorageException) runner.getLogger().getErrorMessages().get(0).getThrowable();
+        assertEquals(416, e.getStatusCode());
+    }
+
     private void testSuccessfulFetch(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) {
         testSuccessfulFetch(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent);
     }
 
+    private void testSuccessfulFetch(String fileSystem, String directory, String filename, String rangeStart, String rangeLength, String inputFlowFileContent, String expectedFlowFileContent) {
+        testSuccessfulFetch(fileSystem, directory, filename, rangeStart, rangeLength, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent);
+    }
+
     private void testSuccessfulFetch(String fileSystem, String directory, String filename, Map<String, String> attributes, String inputFlowFileContent, String expectedFlowFileContent) {
+        testSuccessfulFetch(fileSystem, directory, filename, null, null, attributes, inputFlowFileContent, expectedFlowFileContent);
+    }
+
+    private void testSuccessfulFetch(String fileSystem, String directory, String filename, String rangeStart, String rangeLength,
+                                     Map<String, String> attributes, String inputFlowFileContent, String expectedFlowFileContent) {
         // GIVEN
         Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.CONTENT_MODIFIED, ProvenanceEventType.FETCH);
 
-        setRunnerProperties(fileSystem, directory, filename);
+        setRunnerProperties(fileSystem, directory, filename, rangeStart, rangeLength);
 
         // WHEN
         startRunner(inputFlowFileContent, attributes);
@@ -412,9 +499,22 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
     }
 
     private void setRunnerProperties(String fileSystem, String directory, String filename) {
+        setRunnerProperties(fileSystem, directory, filename, null, null);
+    }
+
+    private void setRunnerProperties(String fileSystem, String directory, String filename, String rangeStart, String rangeLength) {
         runner.setProperty(FetchAzureDataLakeStorage.FILESYSTEM, fileSystem);
         runner.setProperty(FetchAzureDataLakeStorage.DIRECTORY, directory);
         runner.setProperty(FetchAzureDataLakeStorage.FILE, filename);
+
+        if (rangeStart != null) {
+            runner.setProperty(FetchAzureDataLakeStorage.RANGE_START, rangeStart);
+        }
+
+        if (rangeLength != null) {
+            runner.setProperty(FetchAzureDataLakeStorage.RANGE_LENGTH, rangeLength);
+        }
+
         runner.assertValid();
     }
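A detail that is easy to miss in the tests above: Range Start and Range Length are validated as NiFi data sizes, so values like "0B", "1B" and "1KB" are unit strings rather than raw integers, and the processors normalize them to bytes with asDataSize(DataUnit.B). A tiny sketch of the equivalent conversion, assuming the public DataUnit.parseDataSize helper behaves as in current NiFi (returning the converted size as a Double):

    import org.apache.nifi.processor.DataUnit;

    public class DataSizeSketch {
        public static void main(String[] args) {
            // "1KB" -> 1024 bytes; mirrors what asDataSize(DataUnit.B) yields for the property value
            final double bytes = DataUnit.parseDataSize("1KB", DataUnit.B);
            System.out.println((long) bytes);  // 1024
        }
    }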
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
index 724a88fdfc..f04eb05bf7 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
@@ -23,6 +23,7 @@ import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageException;
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -33,11 +34,13 @@
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
+import java.io.IOException;
 import java.nio.channels.Channels;
 import java.util.ArrayList;
 import java.util.List;
@@ -159,6 +162,26 @@ public class FetchGCSObject extends AbstractGCSProcessor {
             .sensitive(true)
             .build();
 
+    public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
+            .name("gcs-object-range-start")
+            .displayName("Range Start")
+            .description("The byte position at which to start reading from the object. An empty value or a value of " +
+                    "zero will start reading at the beginning of the object.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
+            .name("gcs-object-range-length")
+            .displayName("Range Length")
+            .description("The number of bytes to download from the object, starting from the Range Start. An empty " +
+                    "value or a value that extends beyond the end of the object will read to the end of the object.")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return ImmutableList.<PropertyDescriptor>builder()
@@ -167,6 +190,8 @@ public class FetchGCSObject extends AbstractGCSProcessor {
                 .addAll(super.getSupportedPropertyDescriptors())
                 .add(GENERATION)
                 .add(ENCRYPTION_KEY)
+                .add(RANGE_START)
+                .add(RANGE_LENGTH)
                 .build();
     }
 
@@ -189,6 +214,9 @@ public class FetchGCSObject extends AbstractGCSProcessor {
         final Storage storage = getCloudService();
         final BlobId blobId = BlobId.of(bucketName, key, generation);
 
+        final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
+        final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
+
         try {
             final List<Storage.BlobSourceOption> blobSourceOptions = new ArrayList<>(2);
 
@@ -205,12 +233,26 @@ public class FetchGCSObject extends AbstractGCSProcessor {
                 throw new StorageException(404, "Blob " + blobId + " not found");
             }
 
+            if (rangeStart > 0 && rangeStart >= blob.getSize()) {
+                if (getLogger().isDebugEnabled()) {
+                    getLogger().debug("Start position: {}, blob size: {}", new Object[] {rangeStart, blob.getSize()});
+                }
+                throw new StorageException(416, "The range specified is not valid for the blob " + blobId
+                        + ". Range Start is beyond the end of the blob.");
+            }
+
             final ReadChannel reader = storage.reader(blobId, blobSourceOptions.toArray(new Storage.BlobSourceOption[0]));
-            flowFile = session.importFrom(Channels.newInputStream(reader), flowFile);
+            reader.seek(rangeStart);
+
+            if (rangeLength == null) {
+                flowFile = session.importFrom(Channels.newInputStream(reader), flowFile);
+            } else {
+                flowFile = session.importFrom(new BoundedInputStream(Channels.newInputStream(reader), rangeLength), flowFile);
+            }
 
             final Map<String, String> attributes = StorageAttributes.createAttributes(blob);
             flowFile = session.putAllAttributes(flowFile, attributes);
-        } catch (StorageException e) {
+        } catch (StorageException | IOException e) {
             getLogger().error("Failed to fetch GCS Object due to {}", new Object[] {e}, e);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
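Reviewer note on the FetchGCSObject hunk: there is no single ranged-download call in play here. The patch positions the ReadChannel with seek(rangeStart), which is why IOException joins the catch clause, and when a length is set it caps the stream client-side with commons-io's BoundedInputStream; the explicit 416 check gives a clear failure instead of relying on whatever the channel does when positioned past the end of the object. A standalone sketch of the same read path, with placeholder bucket and object names:

    import java.io.InputStream;
    import java.nio.channels.Channels;
    import com.google.cloud.ReadChannel;
    import com.google.cloud.storage.BlobId;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import org.apache.commons.io.input.BoundedInputStream;

    public class GcsRangeSketch {
        public static void main(String[] args) throws Exception {
            Storage storage = StorageOptions.getDefaultInstance().getService();
            ReadChannel reader = storage.reader(BlobId.of("my-bucket", "my-object"));

            reader.seek(23);  // position the channel at Range Start; declared to throw IOException
            try (InputStream in = new BoundedInputStream(Channels.newInputStream(reader), 3L)) {
                byte[] range = in.readAllBytes();  // at most 3 bytes: offsets 23..25
                System.out.println(range.length);
            }
        }
    }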