diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java index 5301700ee5..c38e3c603d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.azure.storage; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.FilterInputStream; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; @@ -131,8 +132,16 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { in = new BufferedInputStream(rawIn); } + // If markSupported() is true and a file length is provided, + // Blobs are not uploaded in blocks resulting in OOME for large + // files. The UnmarkableInputStream wrapper class disables + // mark() and reset() to help force uploading files in chunks. + if (in.markSupported()) { + in = new UnmarkableInputStream(in); + } + try { - blob.upload(in, length, null, null, operationContext); + blob.upload(in, -1, null, null, operationContext); BlobProperties properties = blob.getProperties(); attributes.put("azure.container", containerName); attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString()); @@ -165,4 +174,24 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { } } + + // Used to help force Azure Blob SDK to write in blocks + private static class UnmarkableInputStream extends FilterInputStream { + public UnmarkableInputStream(InputStream in) { + super(in); + } + + @Override + public void mark(int readlimit) { + } + + @Override + public void reset() throws IOException { + } + + @Override + public boolean markSupported() { + return false; + } + } }