Lazily open InputStream during complete MPU

Previously the filesystem provider could exhaust file descriptors by
eagerly opening up to 10,000 parts.  This partially undoes
JCLOUDS-1367.
This commit is contained in:
Andrew Gaul 2019-01-28 21:45:20 -08:00
parent 29eec441e9
commit 13f32b28c9
2 changed files with 77 additions and 10 deletions

View File

@ -164,6 +164,34 @@ public class FilesystemBlobIntegrationTest extends BaseBlobIntegrationTest {
}
}
@Test(groups = { "integration", "live" })
public void test10000PartMultipartUpload() throws Exception {
BlobStore blobStore = view.getBlobStore();
String container = getContainerName();
int partSize = (int) blobStore.getMinimumMultipartPartSize();
try {
String name = "blob-name";
BlobBuilder blobBuilder = blobStore.blobBuilder(name);
Blob blob = blobBuilder.build();
MultipartUpload mpu = blobStore.initiateMultipartUpload(container, blob.getMetadata(), new PutOptions());
ImmutableList.Builder<MultipartPart> parts = ImmutableList.builder();
byte[] content = new byte[partSize];
for (int i = 0; i < 10 * 1000; ++i) {
Payload payload = Payloads.newByteArrayPayload(content);
payload.getContentMetadata().setContentLength((long) partSize);
parts.add(blobStore.uploadMultipartPart(mpu, i, payload));
}
blobStore.completeMultipartUpload(mpu, parts.build());
BlobMetadata newBlobMetadata = blobStore.blobMetadata(container, name);
assertThat(newBlobMetadata.getSize()).isEqualTo(10 * 1000 * partSize);
} finally {
returnContainer(container);
}
}
protected void checkExtendedAttributesSupport() {
if (isMacOSX()) {
throw new SkipException("filesystem does not support extended attributes in Mac OSX");

View File

@ -28,12 +28,14 @@ import static com.google.common.collect.Sets.newTreeSet;
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -756,7 +758,14 @@ public final class LocalBlobStore implements BlobStore {
// return InputStream to more closely follow real blobstore
Payload payload;
try {
payload = new InputStreamPayload(blob.getPayload().openStream());
InputStream is = blob.getPayload().openStream();
if (is instanceof FileInputStream) {
// except for FileInputStream since large MPU can open too many fds
is.close();
payload = blob.getPayload();
} else {
payload = new InputStreamPayload(blob.getPayload().openStream());
}
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
@ -825,20 +834,14 @@ public final class LocalBlobStore implements BlobStore {
@Override
public String completeMultipartUpload(MultipartUpload mpu, List<MultipartPart> parts) {
ImmutableList.Builder<InputStream> streams = ImmutableList.builder();
ImmutableList.Builder<Blob> blobs = ImmutableList.builder();
long contentLength = 0;
Hasher md5Hasher = Hashing.md5().newHasher();
for (MultipartPart part : parts) {
Blob blobPart = getBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber());
contentLength += blobPart.getMetadata().getContentMetadata().getContentLength();
InputStream is;
try {
is = blobPart.getPayload().openStream();
} catch (IOException ioe) {
throw propagate(ioe);
}
streams.add(is);
blobs.add(blobPart);
md5Hasher.putBytes(BaseEncoding.base16().lowerCase().decode(blobPart.getMetadata().getETag()));
}
String mpuETag = new StringBuilder("\"")
@ -849,7 +852,7 @@ public final class LocalBlobStore implements BlobStore {
.toString();
PayloadBlobBuilder blobBuilder = blobBuilder(mpu.blobName())
.userMetadata(mpu.blobMetadata().getUserMetadata())
.payload(new SequenceInputStream(Iterators.asEnumeration(streams.build().iterator())))
.payload(new MultiBlobInputStream(blobs.build()))
.contentLength(contentLength)
.eTag(mpuETag);
String cacheControl = mpu.blobMetadata().getContentMetadata().getCacheControl();
@ -998,5 +1001,41 @@ public final class LocalBlobStore implements BlobStore {
return eTag;
}
private static final class MultiBlobInputStream extends InputStream {
private final Iterator<Blob> blobs;
private InputStream current;
MultiBlobInputStream(List<Blob> blobs) {
this.blobs = blobs.iterator();
}
@Override
public int read() throws IOException {
byte[] b = new byte[1];
int result = read(b, 0, b.length);
if (result == -1) {
return -1;
}
return b[0] & 0x000000FF;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
while (true) {
if (current == null) {
if (!blobs.hasNext()) {
return -1;
}
current = blobs.next().getPayload().openStream();
}
int result = current.read(b, off, len);
if (result == -1) {
current.close();
current = null;
continue;
}
return result;
}
}
}
}