NIFI-7886 Added byte-range reads to the Azure, GCS, and S3 fetch processors

This closes #4576

Co-authored-by: Joey Frazee <jfrazee@apache.org>
Signed-off-by: Joey Frazee <jfrazee@apache.org>
Authored by Paul Kelly on 2020-10-06 10:34:02 +00:00, committed by Joey Frazee
parent 16dc61e151
commit 0ed3534524
9 changed files with 378 additions and 46 deletions
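
The same pattern repeats across the four processors (FetchS3Object, FetchAzureBlobStorage, FetchAzureDataLakeStorage, FetchGCSObject): each gains two optional properties, Range Start and Range Length, which support FlowFile-attribute Expression Language and are evaluated as data sizes, so values such as "1 KB" are converted to a byte count. Below is a minimal configuration sketch using NiFi's mock framework; the bucket, key, and the range.start/range.length attribute names are hypothetical and not part of this change.

import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.processors.aws.s3.FetchS3Object;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

// A minimal sketch, assuming the standard NiFi TestRunner API; values are illustrative.
final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class);
runner.setProperty(FetchS3Object.BUCKET, "example-bucket");          // hypothetical bucket
runner.setProperty(FetchS3Object.KEY, "large-object.bin");           // hypothetical key
runner.setProperty(FetchS3Object.RANGE_START, "${range.start}");     // evaluated per FlowFile
runner.setProperty(FetchS3Object.RANGE_LENGTH, "${range.length}");

final Map<String, String> attributes = new HashMap<>();
attributes.put("range.start", "10 MB");                              // data-size syntax, converted to bytes
attributes.put("range.length", "5 MB");
runner.enqueue(new byte[0], attributes);
runner.run();

Leaving Range Length unset fetches from Range Start to the end of the object; leaving both unset preserves the previous whole-object behaviour.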

FetchS3Object.java

@ -43,6 +43,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.FlowFileAccessException;
@ -82,6 +83,7 @@ public class FetchS3Object extends AbstractS3Processor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
public static final PropertyDescriptor REQUESTER_PAYS = new PropertyDescriptor.Builder()
.name("requester-pays")
.displayName("Requester Pays")
@ -95,10 +97,30 @@ public class FetchS3Object extends AbstractS3Processor {
.defaultValue("false")
.build();
public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
.name("range-start")
.displayName("Range Start")
.description("The byte position at which to start reading from the object. An empty value or a value of " +
"zero will start reading at the beginning of the object.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
.name("range-length")
.displayName("Range Length")
.description("The number of bytes to download from the object, starting from the Range Start. An empty " +
"value or a value that extends beyond the end of the object will read to the end of the object.")
.addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, ENCRYPTION_SERVICE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST,
PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, REQUESTER_PAYS));
PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, REQUESTER_PAYS, RANGE_START, RANGE_LENGTH));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -138,6 +160,8 @@ public class FetchS3Object extends AbstractS3Processor {
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
final AmazonS3 client = getClient();
final GetObjectRequest request;
@ -147,6 +171,11 @@ public class FetchS3Object extends AbstractS3Processor {
request = new GetObjectRequest(bucket, key, versionId);
}
request.setRequesterPays(requesterPays);
if (rangeLength != null) {
request.setRange(rangeStart, rangeStart + rangeLength - 1);
} else {
request.setRange(rangeStart);
}
final Map<String, String> attributes = new HashMap<>();
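
The S3 change maps the two properties onto the AWS SDK's inclusive byte range: with a length the request becomes setRange(start, start + length - 1); without one it becomes setRange(start), which reads to the end of the object. A worked example of that arithmetic, with illustrative values (bucket and key are hypothetical):

import com.amazonaws.services.s3.model.GetObjectRequest;

// Range Start = "23 B" and Range Length = "3 B" after data-size conversion.
final long rangeStart = 23L;
final Long rangeLength = 3L;
final GetObjectRequest request = new GetObjectRequest("example-bucket", "example-key");
request.setRange(rangeStart, rangeStart + rangeLength - 1);  // inclusive end -> HTTP "Range: bytes=23-25", i.e. 3 bytes
// With Range Length unset the processor calls request.setRange(rangeStart) and reads to the end of the object.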

TestFetchS3Object.java

@ -268,7 +268,7 @@ public class TestFetchS3Object {
public void testGetPropertyDescriptors() throws Exception {
FetchS3Object processor = new FetchS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 19, pd.size());
assertEquals("size should be eq", 21, pd.size());
assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(FetchS3Object.BUCKET));

FetchAzureBlobStorage.java

@ -18,7 +18,9 @@ package org.apache.nifi.processors.azure.storage;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@ -32,10 +34,14 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
@ -53,6 +59,34 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer;
})
public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
.name("range-start")
.displayName("Range Start")
.description("The byte position at which to start reading from the blob. An empty value or a value of " +
"zero will start reading at the beginning of the blob.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
.name("range-length")
.displayName("Range Length")
.description("The number of bytes to download from the blob, starting from the Range Start. An empty " +
"value or a value that extends beyond the end of the blob will read to the end of the blob.")
.addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
properties.add(RANGE_START);
properties.add(RANGE_LENGTH);
return properties;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
@ -62,8 +96,10 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
final long startNanos = System.nanoTime();
String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
final String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
AtomicReference<Exception> storedException = new AtomicReference<>();
try {
@ -80,7 +116,7 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
// distribution of download over threads, investigate
flowFile = session.write(flowFile, os -> {
try {
blob.download(os, null, null, operationContext);
blob.downloadRange(rangeStart, rangeLength, os, null, null, operationContext);
} catch (StorageException e) {
storedException.set(e);
throw new IOException(e);
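
In the Blob Storage processor the download switches from blob.download(...) to blob.downloadRange(...). In the legacy com.microsoft.azure.storage SDK the length parameter of downloadRange is a boxed Long, and a null length means "from the offset to the end of the blob", which is why rangeLength is kept nullable instead of being defaulted. A minimal standalone sketch of the call, with illustrative values and exception handling omitted:

// A sketch assuming the azure-storage (v8) SDK already used by this processor.
final CloudBlob blob = container.getBlockBlobReference("example-blob");  // hypothetical blob name
final long rangeStart = 1024L;   // "1 KB" after data-size conversion
final Long rangeLength = null;   // null -> download from rangeStart to the end of the blob
blob.downloadRange(rangeStart, rangeLength, outputStream, null, null, operationContext);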

FetchAzureDataLakeStorage.java

@ -16,21 +16,30 @@
*/
package org.apache.nifi.processors.azure.storage;
import com.azure.core.util.Context;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DownloadRetryOptions;
import com.azure.storage.file.datalake.models.FileRange;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@ -39,6 +48,45 @@ import java.util.concurrent.TimeUnit;
@InputRequirement(Requirement.INPUT_REQUIRED)
public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
.name("range-start")
.displayName("Range Start")
.description("The byte position at which to start reading from the object. An empty value or a value of " +
"zero will start reading at the beginning of the object.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
.name("range-length")
.displayName("Range Length")
.description("The number of bytes to download from the object, starting from the Range Start. An empty " +
"value or a value that extends beyond the end of the object will read to the end of the object.")
.addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
public static final PropertyDescriptor NUM_RETRIES = new PropertyDescriptor.Builder()
.name("number-of-retries")
.displayName("Number of Retries")
.description("The number of automatic retries to perform if the download fails.")
.addValidator(StandardValidators.createLongValidator(0L, Integer.MAX_VALUE, true))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.defaultValue("0")
.build();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
properties.add(RANGE_START);
properties.add(RANGE_LENGTH);
properties.add(NUM_RETRIES);
return properties;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
@ -48,10 +96,16 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
final long startNanos = System.nanoTime();
try {
final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
final int numRetries = (context.getProperty(NUM_RETRIES).isSet() ? context.getProperty(NUM_RETRIES).evaluateAttributeExpressions(flowFile).asInteger() : 0);
final FileRange fileRange = new FileRange(rangeStart, rangeLength);
final DownloadRetryOptions retryOptions = new DownloadRetryOptions();
retryOptions.setMaxRetryRequests(numRetries);
final String fileSystem = evaluateFileSystemProperty(context, flowFile);
final String directory = evaluateDirectoryProperty(context, flowFile);
final String fileName = evaluateFileNameProperty(context, flowFile);
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
@ -61,7 +115,7 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
throw new ProcessException(FILE.getDisplayName() + " (" + fileName + ") points to a directory. Full path: " + fileClient.getFilePath());
}
flowFile = session.write(flowFile, os -> fileClient.read(os));
flowFile = session.write(flowFile, os -> fileClient.readWithResponse(os, fileRange, retryOptions, null, false, null, Context.NONE));
session.getProvenanceReporter().modifyContent(flowFile);
session.transfer(flowFile, REL_SUCCESS);
@ -73,4 +127,4 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
session.transfer(flowFile, REL_FAILURE);
}
}
}
}
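
For ADLS Gen2 the range is expressed through the v12 SDK's FileRange(offset, count), where a null count reads to the end of the file, and per-download retries are configured with DownloadRetryOptions. A Range Start past the end of the file is rejected by the service with HTTP 416, which the integration test further below asserts. A minimal sketch of the ranged read, assuming a DataLakeFileClient obtained as in the processor (values are illustrative):

// Offset 23, count 3; passing a null count would read from the offset to end-of-file.
final FileRange fileRange = new FileRange(23L, 3L);
final DownloadRetryOptions retryOptions = new DownloadRetryOptions().setMaxRetryRequests(2);
fileClient.readWithResponse(outputStream, fileRange, retryOptions, null, false, null, Context.NONE);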

AbstractAzureBlobStorageIT.java

@ -32,6 +32,7 @@ public abstract class AbstractAzureBlobStorageIT extends AbstractAzureStorageIT
protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container";
protected static final String TEST_BLOB_NAME = "nifi-test-blob";
protected static final String TEST_FILE_NAME = "nifi-test-file";
protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
protected CloudBlobContainer container;
@ -52,8 +53,8 @@ public abstract class AbstractAzureBlobStorageIT extends AbstractAzureStorageIT
protected void uploadTestBlob() throws Exception {
CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME);
byte[] buf = "0123456789".getBytes();
byte[] buf = TEST_FILE_CONTENT.getBytes();
InputStream in = new ByteArrayInputStream(buf);
blob.upload(in, 10);
blob.upload(in, buf.length);
}
}

AbstractAzureDataLakeStorageIT.java

@ -36,7 +36,8 @@ import java.util.UUID;
public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorageIT {
private static final String FILESYSTEM_NAME_PREFIX = "nifi-test-filesystem";
private static final String TEST_FILE_CONTENT = "test";
protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
protected String fileSystemName;
protected DataLakeFileSystemClient fileSystemClient;

ITFetchAzureBlobStorage.java

@ -47,6 +47,71 @@ public class ITFetchAzureBlobStorage extends AbstractAzureBlobStorageIT {
assertResult();
}
@Test
public void testFetchBlobWithRangeZeroOne() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
assertResult(TEST_FILE_CONTENT.substring(0, 1));
}
@Test
public void testFetchBlobWithRangeOneOne() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "1B");
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
assertResult(TEST_FILE_CONTENT.substring(1, 2));
}
@Test
public void testFetchBlobWithRangeTwentyThreeTwentySix() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "23B");
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "3B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
assertResult(TEST_FILE_CONTENT.substring(23, 26));
}
@Test
public void testFetchBlobWithRangeLengthGreater() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1KB");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
assertResult(TEST_FILE_CONTENT);
}
@Test
public void testFetchBlobWithRangeLengthUnset() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
assertResult(TEST_FILE_CONTENT);
}
@Test
public void testFetchBlobWithRangeStartOutOfRange() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, String.format("%sB", TEST_FILE_CONTENT.length() + 1));
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_FAILURE, 1);
}
@Test
public void testFetchBlobUsingCredentialService() throws Exception {
configureCredentialsService();
@ -59,11 +124,15 @@ public class ITFetchAzureBlobStorage extends AbstractAzureBlobStorageIT {
}
private void assertResult() throws Exception {
assertResult(TEST_FILE_CONTENT);
}
private void assertResult(final String expectedContent) throws Exception {
runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesForRelationship) {
flowFile.assertContentEquals("0123456789".getBytes());
flowFile.assertAttributeEquals("azure.length", "10");
flowFile.assertContentEquals(expectedContent);
flowFile.assertAttributeEquals("azure.length", String.valueOf(TEST_FILE_CONTENT.length()));
}
}
}

ITFetchAzureDataLakeStorage.java

@ -47,14 +47,13 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
// GIVEN
String directory = "TestDirectory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
}
@Test
@ -62,14 +61,13 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
// GIVEN
String directory= "";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
uploadFile(directory, filename, fileContent);
uploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
}
@Test
@ -77,14 +75,13 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
}
@Test
@ -92,14 +89,13 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
// GIVEN
String directory= "TestDirectory";
String filename = "A test file.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
}
@Test
@ -152,14 +148,13 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
+ "Directory08/Directory09/Directory10/Directory11/Directory12/Directory13/Directory14/Directory15/"
+ "Directory16/Directory17/Directory18/Directory19/Directory20/TestDirectory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
}
@Test
@ -168,10 +163,9 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
String parentDirectory = "ParentDirectory";
String childDirectory = "ChildDirectory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(parentDirectory + "/" + childDirectory, filename, fileContent);
createDirectoryAndUploadFile(parentDirectory + "/" + childDirectory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
@ -229,11 +223,11 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
String fileContent = new String(fileContentBytes);
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
}
@Test
@ -242,10 +236,9 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
String directory = "TestDirectory";
String invalidDirectoryName = "Test/\\Directory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
@ -258,10 +251,9 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
String directory = "TestDirectory";
String filename = "testFile.txt";
String invalidFilename = "test/\\File.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
@ -277,7 +269,6 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
String directory = "TestDirectory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
@ -286,7 +277,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
attributes.put(expLangDirectory, directory);
attributes.put(expLangFilename, filename);
createDirectoryAndUploadFile(directory, filename, fileContent);
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
@ -295,7 +286,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
"${" + expLangFilename + "}",
attributes,
inputFlowFileContent,
fileContent);
TEST_FILE_CONTENT);
}
@Test
@ -307,7 +298,6 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
String directory = "TestDirectory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
@ -315,7 +305,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
attributes.put(expLangDirectory, directory);
attributes.put(expLangFilename, filename);
createDirectoryAndUploadFile(directory, filename, fileContent);
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
@ -336,7 +326,6 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
String directory = "TestDirectory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
@ -344,7 +333,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
attributes.put(expLangFileSystem, fileSystemName);
attributes.put(expLangDirectory, directory);
createDirectoryAndUploadFile(directory, filename, fileContent);
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
@ -356,15 +345,113 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
inputFlowFileContent);
}
@Test
public void testFetchWithRangeZeroOne() throws Exception {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
testSuccessfulFetch(fileSystemName, directory, filename, "0B", "1B", inputFlowFileContent, TEST_FILE_CONTENT.substring(0, 1));
}
@Test
public void testFetchWithRangeOneOne() throws Exception {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
testSuccessfulFetch(fileSystemName, directory, filename, "1B", "1B", inputFlowFileContent, TEST_FILE_CONTENT.substring(1, 2));
}
@Test
public void testFetchWithRangeTwentyThreeTwentySix() throws Exception {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
testSuccessfulFetch(fileSystemName, directory, filename, "23B", "3B", inputFlowFileContent, TEST_FILE_CONTENT.substring(23, 26));
}
@Test
public void testFetchWithRangeLengthGreater() throws Exception {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
testSuccessfulFetch(fileSystemName, directory, filename, "0B", "1KB", inputFlowFileContent, TEST_FILE_CONTENT);
}
@Test
public void testFetchWithRangeLengthUnset() throws Exception {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
// WHEN
// THEN
testSuccessfulFetch(fileSystemName, directory, filename, "0B", null, inputFlowFileContent, TEST_FILE_CONTENT);
}
@Test
public void testFetchWithRangeStartOutOfRange() throws Exception {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
setRunnerProperties(fileSystemName, directory, filename, String.format("%sB", TEST_FILE_CONTENT.length() + 1), "1B");
// WHEN
startRunner(inputFlowFileContent, Collections.emptyMap());
// THEN
DataLakeStorageException e = (DataLakeStorageException)runner.getLogger().getErrorMessages().get(0).getThrowable();
assertEquals(416, e.getStatusCode());
}
private void testSuccessfulFetch(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) {
testSuccessfulFetch(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent);
}
private void testSuccessfulFetch(String fileSystem, String directory, String filename, String rangeStart, String rangeLength, String inputFlowFileContent, String expectedFlowFileContent) {
testSuccessfulFetch(fileSystem, directory, filename, rangeStart, rangeLength, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent);
}
private void testSuccessfulFetch(String fileSystem, String directory, String filename, Map<String, String> attributes, String inputFlowFileContent, String expectedFlowFileContent) {
testSuccessfulFetch(fileSystem, directory, filename, null, null, attributes, inputFlowFileContent, expectedFlowFileContent);
}
private void testSuccessfulFetch(String fileSystem, String directory, String filename, String rangeStart, String rangeLength,
Map<String, String> attributes, String inputFlowFileContent, String expectedFlowFileContent) {
// GIVEN
Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.CONTENT_MODIFIED, ProvenanceEventType.FETCH);
setRunnerProperties(fileSystem, directory, filename);
setRunnerProperties(fileSystem, directory, filename, rangeStart, rangeLength);
// WHEN
startRunner(inputFlowFileContent, attributes);
@ -412,9 +499,22 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
private void setRunnerProperties(String fileSystem, String directory, String filename) {
setRunnerProperties(fileSystem, directory, filename, null, null);
}
private void setRunnerProperties(String fileSystem, String directory, String filename, String rangeStart, String rangeLength) {
runner.setProperty(FetchAzureDataLakeStorage.FILESYSTEM, fileSystem);
runner.setProperty(FetchAzureDataLakeStorage.DIRECTORY, directory);
runner.setProperty(FetchAzureDataLakeStorage.FILE, filename);
if (rangeStart != null) {
runner.setProperty(FetchAzureDataLakeStorage.RANGE_START, rangeStart);
}
if (rangeLength != null) {
runner.setProperty(FetchAzureDataLakeStorage.RANGE_LENGTH, rangeLength);
}
runner.assertValid();
}

FetchGCSObject.java

@ -23,6 +23,7 @@ import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@ -33,11 +34,13 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
@ -159,6 +162,26 @@ public class FetchGCSObject extends AbstractGCSProcessor {
.sensitive(true)
.build();
public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
.name("gcs-object-range-start")
.displayName("Range Start")
.description("The byte position at which to start reading from the object. An empty value or a value of " +
"zero will start reading at the beginning of the object.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
.name("gcs-object-range-length")
.displayName("Range Length")
.description("The number of bytes to download from the object, starting from the Range Start. An empty " +
"value or a value that extends beyond the end of the object will read to the end of the object.")
.addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor>builder()
@ -167,6 +190,8 @@ public class FetchGCSObject extends AbstractGCSProcessor {
.addAll(super.getSupportedPropertyDescriptors())
.add(GENERATION)
.add(ENCRYPTION_KEY)
.add(RANGE_START)
.add(RANGE_LENGTH)
.build();
}
@ -189,6 +214,9 @@ public class FetchGCSObject extends AbstractGCSProcessor {
final Storage storage = getCloudService();
final BlobId blobId = BlobId.of(bucketName, key, generation);
final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
try {
final List<Storage.BlobSourceOption> blobSourceOptions = new ArrayList<>(2);
@ -205,12 +233,26 @@ public class FetchGCSObject extends AbstractGCSProcessor {
throw new StorageException(404, "Blob " + blobId + " not found");
}
if (rangeStart > 0 && rangeStart >= blob.getSize()) {
if (getLogger().isDebugEnabled()) {
getLogger().debug("Start position: {}, blob size: {}", new Object[] {rangeStart, blob.getSize()});
}
throw new StorageException(416, "The range specified is not valid for the blob " + blobId
+ ". Range Start is beyond the end of the blob.");
}
final ReadChannel reader = storage.reader(blobId, blobSourceOptions.toArray(new Storage.BlobSourceOption[0]));
flowFile = session.importFrom(Channels.newInputStream(reader), flowFile);
reader.seek(rangeStart);
if (rangeLength == null) {
flowFile = session.importFrom(Channels.newInputStream(reader), flowFile);
} else {
flowFile = session.importFrom(new BoundedInputStream(Channels.newInputStream(reader), rangeLength), flowFile);
}
final Map<String, String> attributes = StorageAttributes.createAttributes(blob);
flowFile = session.putAllAttributes(flowFile, attributes);
} catch (StorageException e) {
} catch (StorageException | IOException e) {
getLogger().error("Failed to fetch GCS Object due to {}", new Object[] {e}, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
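
The GCS client has no single ranged-download call in this code path, so the processor seeks the ReadChannel to Range Start and, when Range Length is set, wraps the stream in commons-io's BoundedInputStream so that at most that many bytes are imported; because ReadChannel.seek can throw IOException, the catch clause is widened to StorageException | IOException. A minimal sketch of the seek-and-bound pattern, with illustrative values and exception handling omitted:

// A sketch assuming a com.google.cloud.ReadChannel returned by storage.reader().
final ReadChannel reader = storage.reader(blobId);
reader.seek(23L);                                                                          // skip to Range Start
final InputStream bounded = new BoundedInputStream(Channels.newInputStream(reader), 3L);  // cap at Range Length bytes
flowFile = session.importFrom(bounded, flowFile);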