NIFI-7830: Support large files in PutAzureDataLakeStorage

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4556.
Peter Turcsanyi authored 2020-09-26 00:32:23 +02:00, committed by Pierre Villard
parent a57d38c58d
commit f9ae3bb9c9
5 changed files with 36 additions and 8 deletions


@@ -83,7 +83,7 @@
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-file-datalake</artifactId>
-            <version>12.1.1</version>
+            <version>12.2.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>


@@ -21,6 +21,8 @@ import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -71,6 +73,8 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
     public static final String REPLACE_RESOLUTION = "replace";
     public static final String IGNORE_RESOLUTION = "ignore";
+    public static long MAX_CHUNK_SIZE = 100 * 1024 * 1024; // current chunk limit is 100 MiB on Azure
     public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
             .name("conflict-resolution-strategy")
             .displayName("Conflict Resolution Strategy")
@@ -120,11 +124,10 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
             final long length = flowFile.getSize();
             if (length > 0) {
-                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
-                    fileClient.append(in, 0, length);
+                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
+                    uploadContent(fileClient, bufferedIn, length);
                 }
             }
-            fileClient.flush(length);

             final Map<String, String> attributes = new HashMap<>();
             attributes.put(ATTR_NAME_FILESYSTEM, fileSystem);
@@ -158,4 +161,24 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+
+    @VisibleForTesting
+    static void uploadContent(DataLakeFileClient fileClient, InputStream in, long length) {
+        long chunkStart = 0;
+        long chunkSize;
+
+        while (chunkStart < length) {
+            chunkSize = Math.min(length - chunkStart, MAX_CHUNK_SIZE);
+
+            // com.azure.storage.common.Utility.convertStreamToByteBuffer() throws an exception
+            // if there are more available bytes in the stream after reading the chunk
+            BoundedInputStream boundedIn = new BoundedInputStream(in, chunkSize);
+
+            fileClient.append(boundedIn, chunkStart, chunkSize);
+
+            chunkStart += chunkSize;
+        }
+
+        fileClient.flush(length);
+    }
 }
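
The chunking above can be sanity-checked outside Azure. Below is a minimal, self-contained sketch (editorial, not part of the commit): a plain ByteArrayInputStream and println calls stand in for DataLakeFileClient.append()/flush(), and a hypothetical 10-byte demo limit replaces the 100 MiB MAX_CHUNK_SIZE. It shows that BoundedInputStream hands each append exactly one chunk, even though the underlying stream still has bytes left.

import org.apache.commons.io.input.BoundedInputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ChunkedAppendSketch {

    private static final long DEMO_CHUNK_SIZE = 10; // stand-in for MAX_CHUNK_SIZE (100 MiB)

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[35]; // expect appends of 10 + 10 + 10 + 5 bytes
        InputStream in = new ByteArrayInputStream(data);

        long chunkStart = 0;
        while (chunkStart < data.length) {
            long chunkSize = Math.min(data.length - chunkStart, DEMO_CHUNK_SIZE);
            // The bounded view reports EOF after chunkSize bytes, so a consumer that
            // drains it to the end (as the Azure SDK does with each chunk) only ever
            // sees the current chunk, never the rest of the stream.
            BoundedInputStream boundedIn = new BoundedInputStream(in, chunkSize);
            long drained = 0;
            while (boundedIn.read() != -1) {
                drained++;
            }
            // stands in for fileClient.append(boundedIn, chunkStart, chunkSize)
            System.out.println("append offset=" + chunkStart + " length=" + drained);
            chunkStart += chunkSize;
        }
        // stands in for fileClient.flush(length)
        System.out.println("flush length=" + data.length);
    }
}

Running it prints append calls at offsets 0, 10, 20 and 30 with lengths 10, 10, 10 and 5, then a single flush of 35 bytes, which is the same pattern uploadContent() produces with 100 MiB chunks.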


@@ -84,8 +84,7 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorageIT {
         DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
         DataLakeFileClient fileClient = directoryClient.createFile(filename);
-        fileClient.append(new ByteArrayInputStream(fileContentBytes), 0, fileContentBytes.length);
-        fileClient.flush(fileContentBytes.length);
+        PutAzureDataLakeStorage.uploadContent(fileClient, new ByteArrayInputStream(fileContentBytes), fileContentBytes.length);
     }

     protected void uploadFile(TestFile testFile) {


@@ -29,6 +29,7 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -222,7 +223,9 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
         // GIVEN
         String directory = "TestDirectory";
         String filename = "testFile.txt";
-        byte[] fileContentBytes = new byte[100_000_000];
+        Random random = new Random();
+        byte[] fileContentBytes = new byte[120_000_000];
+        random.nextBytes(fileContentBytes);
         String fileContent = new String(fileContentBytes);
         String inputFlowFileContent = "InputFlowFileContent";
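
The test payload grows from 100,000,000 to 120,000,000 random bytes because 100,000,000 bytes still fits under the 100 MiB chunk limit (100 * 1024 * 1024 = 104,857,600 bytes) and would go up in a single append; 120,000,000 bytes forces the new multi-chunk path. A quick sketch of the arithmetic (illustrative only, not part of the test):

long maxChunk = 100L * 1024 * 1024;                  // 104,857,600 bytes, mirrors MAX_CHUNK_SIZE
long fileSize = 120_000_000L;
long chunks = (fileSize + maxChunk - 1) / maxChunk;  // ceiling division -> 2 append calls
long lastChunk = fileSize - (chunks - 1) * maxChunk; // 15,142,400 bytes in the final chunk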


@@ -32,6 +32,7 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -137,7 +138,9 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
     // ignore excessive test with larger file size
     @Test
     public void testPutBigFile() throws Exception {
-        byte[] fileData = new byte[100_000_000];
+        Random random = new Random();
+        byte[] fileData = new byte[120_000_000];
+        random.nextBytes(fileData);

         runProcessor(fileData);