mirror of https://github.com/apache/nifi.git
NIFI-7964 Force PutAzureBlobStorage to stream writes
NIFI-7964 Remove unused import NIFI-7964 Reverted to using BufferedInputStream along with unmarkable NIFI-7964 Made UnmarkableInputStream static and added factory method NIFI-7964 Remove unnecessary factory method from UnmarkableInputStream This closes #4632. Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
parent
bb178f371b
commit
1e227ca643
|
@ -19,6 +19,7 @@ package org.apache.nifi.processors.azure.storage;
|
|||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.FilterInputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
@ -131,8 +132,16 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
|
|||
in = new BufferedInputStream(rawIn);
|
||||
}
|
||||
|
||||
// If markSupported() is true and a file length is provided,
|
||||
// Blobs are not uploaded in blocks resulting in OOME for large
|
||||
// files. The UnmarkableInputStream wrapper class disables
|
||||
// mark() and reset() to help force uploading files in chunks.
|
||||
if (in.markSupported()) {
|
||||
in = new UnmarkableInputStream(in);
|
||||
}
|
||||
|
||||
try {
|
||||
blob.upload(in, length, null, null, operationContext);
|
||||
blob.upload(in, -1, null, null, operationContext);
|
||||
BlobProperties properties = blob.getProperties();
|
||||
attributes.put("azure.container", containerName);
|
||||
attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
|
||||
|
@ -165,4 +174,24 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
// Used to help force Azure Blob SDK to write in blocks
|
||||
private static class UnmarkableInputStream extends FilterInputStream {
|
||||
public UnmarkableInputStream(InputStream in) {
|
||||
super(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void mark(int readlimit) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean markSupported() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue